Snowflake is becoming increasingly popular as a data platform, and with it grows the need to integrate Snowflake into production systems. Since type-safe programming languages like Java lead to more stable production environments and are often preferred over dynamically typed languages (e.g. Python), this raises the challenge of connecting to Snowflake databases from those type-safe languages. While Snowflake comes with dedicated methods for loading data from cloud storage (e.g. Snowpipe), we have to rely on other implementation approaches when ingesting data directly from our type-safe Java applications.
So in this blog post, we will cover the following native data ingestion methods for Java: Batch and Merge insertion using the JDBC template and Stream insertion via the Snowflake Streaming API.
We will also compare their performance in terms of ingestion time and Snowflake compute usage. This provides you with a practical guide to choosing the right implementation for your application and is applicable to all languages that provide a Snowflake JDBC driver (in particular it should be applicable to other JVM-based languages like Kotlin or Scala).
Preliminary remarks
Throughout this post, we will use a Snowflake database with a table named INGEST_TABLE, which we create in SnowSQL with the following command:
```sql
CREATE OR REPLACE TABLE INGEST_TABLE(
    id int,
    name varchar,
    price double,
    jsonData variant
);
```
The VARIANT column, which holds JSON-formatted data, is used to put a realistic load on the database. We implement a corresponding Java class named "SnowflakeTableEntity":
```java
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

// Lombok is assumed here: @Getter generates the accessors used later on,
// @Slf4j provides the "log" field.
@Getter
@Slf4j
public class SnowflakeTableEntity {

    private int id;
    private String name;
    private Double price;
    private final String jsonData = "{'someBigJson':0}";

    @JsonIgnore
    private final ObjectMapper objectMapper = new ObjectMapper();

    public String toJson() {
        try {
            return objectMapper.writeValueAsString(this);
        } catch (JsonProcessingException e) {
            log.error("Could not serialize entity", e);
            return null;
        }
    }
}
```
Snowflake Connection in Java
Like most other database systems, Snowflake provides a JDBC driver for Java, which we will use throughout this blog post. Before the JDBC driver can talk to Snowflake, we first have to authenticate it.
For that, we have to create an RSA key pair (e.g. with OpenSSL, as described in Snowflake's key-pair authentication guide) and assign the public key to a Snowflake user with the SnowSQL command
```sql
ALTER USER sf_user SET rsa_public_key='MII...';
```
where sf_user is the Snowflake user in our example. With the private key of the generated key pair, which is stored in a private key file, say 'rsa_key.p8', we can configure the data source that we use to instantiate the JDBC driver.
To do so, we have to convert the private key, which is stored as Base64-encoded text, into an instance of Java's PrivateKey interface:
```java
import java.io.File;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.security.KeyFactory;
import java.security.PrivateKey;
import java.security.spec.PKCS8EncodedKeySpec;
import java.util.Base64;

/**
 * Creates an RSA private key from a P8 file
 *
 * @param file a private key P8 file
 * @return PrivateKey instance
 * @throws Exception arises if any error occurs
 */
private static PrivateKey readPrivateKey(File file) throws Exception {
    String key = Files.readString(file.toPath(), Charset.defaultCharset());

    // Strip the PEM header/footer and line breaks before Base64-decoding
    String privateKeyPEM = key
            .replace("-----BEGIN PRIVATE KEY-----", "")
            .replaceAll(System.lineSeparator(), "")
            .replace("-----END PRIVATE KEY-----", "");

    byte[] encoded = Base64.getDecoder().decode(privateKeyPEM);
    KeyFactory keyFactory = KeyFactory.getInstance("RSA");
    PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(encoded);
    return keyFactory.generatePrivate(keySpec);
}
```
Note how the -----BEGIN PRIVATE KEY----- and -----END PRIVATE KEY----- markers have to be removed from the private key before decoding it.
With that, we can create our JdbcTemplate on top of the corresponding data source:
```java
SnowflakeBasicDataSource dataSource = new SnowflakeBasicDataSource();
dataSource.setPrivateKey(readPrivateKey(file));
dataSource.setSchema(schema);
dataSource.setDatabaseName(databaseName);
dataSource.setRole(role);
dataSource.setUser(user);
dataSource.setWarehouse(warehouse);
dataSource.setUrl(sfUrl);

HikariDataSource pooledDataSource = new HikariDataSource();
pooledDataSource.setDataSource(dataSource);

SnowflakeConnection sfConnection = pooledDataSource.getConnection()
        .unwrap(SnowflakeConnection.class);
JdbcTemplate sfJdbcTemplate = new JdbcTemplate(pooledDataSource);
```
where the parameters of the data source are the names of the corresponding Snowflake objects and file is your private key file. The sfUrl should be of the form
```
jdbc:snowflake://{snowflake-instance-locator}.{snowflake-instance-region}.snowflakecomputing.com
```
Having set up the Snowflake connection correctly, we should now be able to communicate with our Snowflake database. Here is a test selection query:
```java
String query = "SELECT id FROM INGEST_TABLE;";
List<Integer> idList = sfJdbcTemplate.queryForList(query, Integer.class);
```
Data Ingestion with Batch Insert
This method should be rather familiar to users who have already worked with JDBC. It relies on traditional OLTP insert and update statements and uses the JdbcTemplate's batchUpdate functionality:
```java
public void insert(List<SnowflakeTableEntity> entities) {
    String sql = "INSERT INTO INGEST_TABLE (ID, NAME, PRICE, JSONDATA) SELECT ?, ?, ?, PARSE_JSON(?);";

    sfJdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
        @Override
        public void setValues(PreparedStatement preparedStatement, int i) throws SQLException {
            SnowflakeTableEntity entity = entities.get(i);
            preparedStatement.setInt(1, entity.getId());
            preparedStatement.setString(2, entity.getName());
            preparedStatement.setDouble(3, entity.getPrice());
            preparedStatement.setString(4, entity.getJsonData());
        }

        @Override
        public int getBatchSize() {
            return entities.size();
        }
    });
}
```
Here, we use Snowflake's PARSE_JSON function to convert our JSON string into a VARIANT value.
If we wanted to upsert entries instead of just inserting them, we would first have to determine which entities already exist and update those, while inserting the remaining ones (see the sketch below).
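A minimal sketch of such a manual upsert, assuming the sfJdbcTemplate and the entity getters from above (this helper is not part of the original implementation and matches entries by ID only), could look like this:

```java
public void upsert(List<SnowflakeTableEntity> entities) {
    // Fetch the IDs that already exist in the target table
    Set<Integer> existingIds = new HashSet<>(
            sfJdbcTemplate.queryForList("SELECT id FROM INGEST_TABLE", Integer.class));

    // Split the batch into entities to update and entities to insert
    Map<Boolean, List<SnowflakeTableEntity>> partitioned = entities.stream()
            .collect(Collectors.partitioningBy(e -> existingIds.contains(e.getId())));

    // Update rows that already exist
    sfJdbcTemplate.batchUpdate(
            "UPDATE INGEST_TABLE SET NAME = ?, PRICE = ?, JSONDATA = PARSE_JSON(?) WHERE ID = ?",
            partitioned.get(true).stream()
                    .map(e -> new Object[]{e.getName(), e.getPrice(), e.getJsonData(), e.getId()})
                    .toList());

    // Insert the remaining rows
    sfJdbcTemplate.batchUpdate(
            "INSERT INTO INGEST_TABLE (ID, NAME, PRICE, JSONDATA) SELECT ?, ?, ?, PARSE_JSON(?)",
            partitioned.get(false).stream()
                    .map(e -> new Object[]{e.getId(), e.getName(), e.getPrice(), e.getJsonData()})
                    .toList());
}
```

Note that this requires an extra round trip to fetch the existing IDs, which is one more reason why the merge insert described below is usually the better choice for upserts.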
Although the batch insert works in principle, we will see in the Performance Comparison section that the other methods should be preferred for efficient data ingestion.
Data Ingestion with Merge Insert
This solution is based on native Snowflake ingestion mechanisms rather than traditional JDBC statements. Here, we use a Snowflake stage for intermediate data storage and then merge the staged data into our table. In general, Snowflake stages are intended as gateways to a variety of data sources, such as cloud storage and uploaded data files. Thus, we first create an internal Snowflake stage (alternatively, one could use the table stage of our ingest table, which Snowflake creates by default; see the Snowflake documentation):
```sql
CREATE OR REPLACE STAGE INGEST_STAGE FILE_FORMAT = (TYPE = JSON);
```
The Snowflake JDBC driver provides us with an API extension to directly load data into this internal stage from a Java stream:
```java
public void uploadDataToSnowflakeStage(List<SnowflakeTableEntity> entities) throws SQLException {
    // Serialize the entities into newline-delimited JSON
    String json = entities
            .stream()
            .map(SnowflakeTableEntity::toJson)
            .collect(Collectors.joining("\n"));

    InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));

    String stageName = "@".concat("INGEST_STAGE");
    String pathPrefix = LocalDate.now().toString();
    String destinationFile = String.format("%s.json", LocalDateTime.now());
    boolean compressData = true;

    sfConnection.uploadStream(stageName, pathPrefix, inputStream, destinationFile, compressData);
}
```
Next, we can insert the uploaded data into our table by merging the stage:
```java
public void mergeDataIntoTable() {
    String sql = """
            MERGE INTO INGEST_TABLE
            USING (SELECT $1:id id, $1:name name, $1:price price, PARSE_JSON($1:jsonData) jsonData
                   FROM @INGEST_STAGE) tempTable
            ON INGEST_TABLE.id = tempTable.id AND INGEST_TABLE.name = tempTable.name
            WHEN MATCHED THEN
                UPDATE SET jsondata = tempTable.jsonData
            WHEN NOT MATCHED THEN
                INSERT (id, name, price, jsondata)
                VALUES (tempTable.id, tempTable.name, tempTable.price, tempTable.jsonData);
            """;
    sfJdbcTemplate.execute(sql);
}
```
Note that upserting is built in via the MATCHED and NOT MATCHED clauses. This feature distinguishes the merge insert from the other methods and makes it the preferred one whenever the data model requires upserts.
One thing to consider, however, is the size of the resulting uploaded data file. As the Snowflake documentation suggests, this file should be around 100-250 MB compressed. We will experiment with different upload sizes in the Performance Comparison section; a simple way to cap the file size is sketched below.
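As a rough illustration, the following hypothetical helper (not part of the original implementation) splits a large batch into several staged files based on an uncompressed byte budget, which you would tune so that the compressed files end up in the recommended range:

```java
// Hypothetical helper: splits a large batch into several staged files so that
// each uncompressed JSON file stays below a configurable byte budget.
public void uploadInChunks(List<SnowflakeTableEntity> entities, long maxBytesPerFile) throws SQLException {
    List<SnowflakeTableEntity> chunk = new ArrayList<>();
    long chunkBytes = 0;

    for (SnowflakeTableEntity entity : entities) {
        long entityBytes = entity.toJson().getBytes(StandardCharsets.UTF_8).length;
        if (!chunk.isEmpty() && chunkBytes + entityBytes > maxBytesPerFile) {
            uploadDataToSnowflakeStage(chunk); // one staged file per chunk
            chunk = new ArrayList<>();
            chunkBytes = 0;
        }
        chunk.add(entity);
        chunkBytes += entityBytes;
    }
    if (!chunk.isEmpty()) {
        uploadDataToSnowflakeStage(chunk);
    }
}
```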
Data Ingestion using Snowflake Streaming
This method promises low-latency data ingestion and is suitable for real-time data streams according to the Snowflake documentation. We can ingest data using the Snowflake streaming API. For this, we first have to create a dedicated Snowflake Channel:
```java
Properties props = new Properties();
props.put("private_key", readPrivateKey(file));
props.put("user", user);
props.put("database", database);
props.put("schema", schema);
props.put("warehouse", warehouse);
props.put("role", role);
props.put("connect_string", url);
props.put("host", account);
props.put("scheme", "https");
props.put("ssl", "on");
props.put("port", 443);

SnowflakeStreamingIngestClient client = SnowflakeStreamingIngestClientFactory
        .builder("CLIENT")
        .setProperties(props)
        .build();

OpenChannelRequest request = OpenChannelRequest.builder(channelName)
        .setDBName(database)
        .setSchemaName(schema)
        .setTableName(tableName)
        .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
        .build();

SnowflakeStreamingIngestChannel channel = client.openChannel(request);
```
Using this channel, we can now ingest data by transforming entities to map objects:
```java
public void insert(List<SnowflakeTableEntity> entities) {
    List<Map<String, Object>> rows = entities
            .stream()
            .map(entity -> toRow(entity))
            .toList();

    InsertValidationResponse response = channel.insertRows(rows, "offsetToken");
    if (response.hasErrors()) {
        throw response.getInsertErrors().get(0).getException();
    }
}

public static Map<String, Object> toRow(SnowflakeTableEntity entity) {
    Map<String, Object> row = new HashMap<>();
    row.put("ID", entity.getId());
    row.put("NAME", entity.getName());
    row.put("PRICE", entity.getPrice());
    row.put("JSONDATA", entity.getJsonData());
    return row;
}
```
This method does not offer any way to update existing rows, so it is limited to pure insertion, which is its major drawback compared to the other methods when considering Snowflake in an OLTP context. Also note that the channel should be closed once ingestion is finished, as sketched below.
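The snippets above do not show how to finish the streaming ingestion. A minimal sketch, following the official SnowflakeStreamingIngestExample linked at the end of this post and assuming the channel and client from above, waits until the last offset token has been committed and then closes both:

```java
public void closeChannel(SnowflakeStreamingIngestChannel channel,
                         SnowflakeStreamingIngestClient client,
                         String lastOffsetToken) throws Exception {
    // Poll until Snowflake reports the last submitted offset token as committed
    while (!lastOffsetToken.equals(channel.getLatestCommittedOffsetToken())) {
        Thread.sleep(1_000);
    }
    channel.close().get(); // flushes any buffered rows
    client.close();
}
```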
Performance Comparison
To evaluate which method is the most suitable for ingesting data into Snowflake with Java, we tested each method by ingesting several thousand data entries into an empty table with various batch sizes and two different warehouse sizes (XS and S). In the resulting graph, we omit the run times of the batch insert, as it was slower than the other methods by almost a factor of 100!
The other two methods, however, performed comparably well, with the stream insert being a little faster than one might expect. Also note how the ingestion time decreases with larger batch sizes and increases again once the optimal batch size is exceeded. This optimal batch size corresponds to a compressed file size of ~100 MB, which is consistent with the Snowflake documentation. In our case, the size of the warehouse did not play a significant role; however, this might change for different data schemas or volumes.
Recommendations
Based on the results of the previous section, we recommend using stream insertion wherever possible. Since stream insertion is append-only, fall back to merge insertion whenever you have to upsert entries with unique keys. In either case, you should experiment with different batch sizes to determine the optimal one for your use case (a simple timing harness is sketched below). Also try out different warehouse sizes and use the smallest one if you do not notice a performance difference.
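As a starting point for such experiments, a hypothetical timing helper (not part of the original setup, using java.util.function.Consumer) could look like this, where ingest is any of the insert methods shown above and the batch sizes are just examples:

```java
public void benchmarkBatchSizes(List<SnowflakeTableEntity> entities,
                                Consumer<List<SnowflakeTableEntity>> ingest) {
    for (int batchSize : List.of(1_000, 10_000, 100_000)) {
        long start = System.nanoTime();
        for (int from = 0; from < entities.size(); from += batchSize) {
            // Hand one batch at a time to the ingestion method under test
            ingest.accept(entities.subList(from, Math.min(from + batchSize, entities.size())));
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.printf("batch size %d: %d ms%n", batchSize, elapsedMs);
    }
}
```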
Useful Links:
- https://docs.snowflake.com/en/developer-guide/sql-api/authenticating
- https://docs.snowflake.com/en/user-guide/data-load-local-file-system-create-stage
- https://docs.snowflake.com/en/developer-guide/jdbc/jdbc-using#label-jdbc-upload-from-stream-to-stage
- https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview
- https://github.com/snowflakedb/snowflake-ingest-java/blob/master/src/main/java/net/snowflake/ingest/streaming/example/SnowflakeStreamingIngestExample.java
- https://docs.snowflake.com/en/user-guide/data-load-considerations-prepare