Snowflake Writer
Writes to one or more existing tables in Snowflake. When not using Snowpipe Streaming, events are staged to local storage, AWS S3, or Azure Storage, then written to Snowflake as per the Upload Policy setting. Striim connects to Snowflake over JDBC with SSL enabled. Files are uploaded to Snowflake's staging area using Snowflake's PUT command and are encrypted using 128-bit keys.
Snowflake Writer properties
property | type | default value | notes |
---|---|---|---|
Append Only | Boolean | False | With the default value of False, updates and deletes in the source are handled as updates and deletes in the target. Set to True to handle updates and deletes as inserts in the target. With this setting, primary key updates result in two records in the target, one with the previous values and one with the new values. |
Authentication Type | enum | | When Use Connection Profile is True, this property is ignored and not shown in the UI. Selects the type of user authentication: select Password to authenticate with a username and password, KeyPair to use public/private key pair authentication, or ManualOAuth to use OAuth. |
CDDL Action | String | Process | |
Client Configuration | String | | This property appears in the UI only when External Stage Type is ADLSGen2 or S3. Otherwise it is ignored. If using a proxy, specify the proxy host and port here. |
Client ID | String | | This property appears in the UI only when Use Connection Profile is False and Authentication Type is Manual OAuth. Otherwise it is ignored. This property is required when Manual OAuth authentication is selected. |
Client Secret | encrypted password | | This property appears in the UI only when Use Connection Profile is False and Authentication Type is Manual OAuth. Otherwise it is ignored. This property is required when Manual OAuth authentication is selected. |
Column Delimiter | String | \| (UTF-8 007C) | The character(s) used to delimit fields in the delimited text files in which the adapter accumulates batched data. If the data will contain the default character, change this to a character or sequence of characters that will not appear in the data. |
Connection Profile Name | String | | When Use Connection Profile is True, select (or, in TQL, specify) an existing connection profile or select New Connection Profile to create a new one (see Introducing connection profiles). |
Connection Retry Policy | String | initialRetryDelay=10s, retryDelayMultiplier=2, maxRetryDelay=1m, maxAttempts=5, totalTimeout=10m | With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 10 seconds (initialRetryDelay=10s). If the second attempt is unsuccessful, in 20 seconds it will try a third time (initialRetryDelay=10s multiplied by retryDelayMultiplier=2). If that fails, the adapter will try again in 40 seconds (the previous retry interval of 20s multiplied by 2). If connection attempts continue to fail, the adapter will try again every 60 seconds (maxRetryDelay=1m) until a total of 5 connection attempts have been made (maxAttempts=5), after which the adapter will halt and log an exception. The adapter halts when either maxAttempts or totalTimeout is reached. initialRetryDelay, maxRetryDelay, and totalTimeout may be specified in milliseconds (ms), seconds (s, the default), or minutes (m). If retryDelayMultiplier is set to 1, connection will be attempted on the fixed interval set by initialRetryDelay. To disable connection retry, set maxAttempts=0. Negative values are not supported. |
Connection URL | String | | When Use Connection Profile is True, this property is ignored and not shown in the UI. The JDBC driver connection string for your Snowflake account. The syntax is jdbc:snowflake://<account identifier>.snowflakecomputing.com/?db=<database> (see the sample applications below). If not specified as part of the connection URL, the following property defaults are used: CLIENT_SESSION_KEEP_ALIVE is TRUE, CLIENT_SESSION_KEEP_ALIVE_HEARTBEAT_FREQUENCY is 900, networkTimeout is 800000, and queryTimeout is 1800. |
External Stage Connection Profile Name | String | | When Use Connection Profile is True and External Stage Type is ADLS Gen2 or S3, select (or, in TQL, specify) an existing connection profile or select New Connection Profile to create a new one (see Introducing connection profiles). |
External Stage Type | String | local | With the default value of local, events are staged to local storage on the Striim server. To stage to ADLS Gen2, set to ADLSGen2 and set the Azure Data Lake Storage properties described below. To stage to S3, set to S3 and set the S3 properties described below. |
File Format Options | String | null_if = "" | Do not change unless setting Null Marker (see below) or instructed to by Striim support. |
Ignorable Exception Code | String | | Set to TABLE_NOT_FOUND to prevent the application from terminating when Striim tries to write to a table that does not exist in the target. See Handling "table not found" errors for more information. Ignored exceptions will be written to the application's exception store (see CREATE EXCEPTIONSTORE). |
Null Marker | String | | Optionally, specify a string inserted into fields in the stage files to indicate that a field has a null value. These are converted back to nulls in the target tables. If any field might contain the string NULL, change this to a sequence of characters that will not appear in the data. When you set a value for Null Marker, set the same value for File Format Options: for example, if Null Marker is <marker>, set File Format Options to null_if = "<marker>" (see the sketch following this table). |
Optimized Merge | Boolean | false | Set to True only when Append Only is False and the target's input stream is the output of an HP NonStop reader, MySQL Reader, or Oracle Reader source and the source events will include partial records. For example, with Oracle Reader, when supplemental logging has not been enabled for all columns, partial records are sent for updates. When the source events will always include full records, leave this set to false. |
Parallel Threads | Integer | ||
Password | encrypted password | | This property appears in the UI only when Use Connection Profile is False and Authentication Type is Password. Otherwise it is ignored. The password for the specified username. See Encrypted passwords. |
Private Key | encrypted password | | This property appears in the UI only when Use Connection Profile is False and Authentication Type is Key Pair. The key may be stored in a vault. This property is required when using public/private key pair authentication. This property supports, but does not require, encrypted private keys. |
Private Key Passphrase | encrypted password | | This property appears in the UI only when Use Connection Profile is False and Authentication Type is Key Pair. This property is required when using key pair authentication with an encrypted private key. |
Refresh Token | encrypted password | | This property appears in the UI only when Use Connection Profile is False and Authentication Type is Manual OAuth. Otherwise it is ignored. This property is required when Manual OAuth is selected as the value of the Authentication Type property. The token expires after the interval set when you created the OAuth integration. If you do not update the token before it expires, the application will halt. |
Snowpipe Streaming (StreamingUpload in TQL) | Boolean | False | With the default value of False, Snowflake Writer will use the Snowflake JDBC driver. Set to True to use the Snowpipe Streaming API. When set to True, use either Key Pair or Manual OAuth authentication, and, optionally, adjust the Streaming Configuration property as appropriate. |
Streaming Configuration | String | MaxParallelRequests=6, MaxRequestSizeInMB=10, MaxRecordsPerRequest=100000 | When Snowpipe Streaming is False, this setting is ignored. |
Tables | String | | Specify the name(s) of the table(s) to write to, in uppercase, in the format <database>.<schema>.<table>. If the source is Database Reader and its Create Schema property is True, the specified schema(s) and table(s) will be created in the target (see the discussion of Create Schema in Database Reader properties for more details). Otherwise, the table(s) must exist in the target when the application is started, and if a specified target table does not exist, the application will terminate with an error. To skip writes to missing tables without terminating, specify TABLE_NOT_FOUND as an Ignorable Exception Code. When the target's input stream is a user-defined event, specify a single table. When the input stream of the target is the output of a DatabaseReader, IncrementalBatchReader, or SQL CDC source (that is, when replicating data from one database to another), it can write to multiple tables. In this case, specify the names of both the source and target tables, and you may use the % wildcard, for example: source.emp,MYSCHEMA.EMP or source.%,MYSCHEMA.% See Mapping columns for additional options. |
Upload Policy | String | eventcount:10000, interval:5m | The upload policy may include eventcount and/or interval (see Setting output names and rollover / upload policies for syntax). When Snowpipe Streaming is False, cached data is written to the staging area every time any of the specified values is exceeded. With the default value, data will be written every five minutes or sooner if the cache contains 10,000 events. When the app is undeployed, all remaining data is written to the staging area. When Snowpipe Streaming (StreamingUpload in TQL) is True, Optimized Merge is False, and Append Only is False, portions of the data may be uploaded to the staging area before the upload policy has been reached, to reduce the amount of memory required on the Striim server. Once the upload policy is reached, the remaining data is uploaded to the staging area and the whole batch is written to the target. Similarly, when Snowpipe Streaming is True and Append Only is True, portions of the data may be written to the target before the upload policy has been reached. Once the upload policy is reached, the remaining data is written to the target. |
Use Connection Profile | Boolean | False | Set to True to use a connection profile instead of specifying the connection properties here. See Introducing connection profiles. When Use Connection Profile is True and Snowpipe Streaming is False, you must also use a connection profile for the external stage. |
Username | String | | This property appears in the UI only when Use Connection Profile is False and Authentication Type is Password. Otherwise it is ignored. Specify the username you use to log in to Snowflake, or the name of a Snowflake user with SELECT, INSERT, UPDATE, and DELETE privileges on the tables to be written to, CREATE STAGE privileges on the schema(s) containing the tables, and CREATE TABLE privileges on the database specified in the connection URL. |
User Role | String | | When Snowpipe Streaming is True, optionally specify the role to use for the session (see Overview of Access Control in the Snowflake documentation). |
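The following sketch shows how Null Marker and File Format Options are paired, as referenced in the table above. It is illustrative only: the connection values, target and stream names, and the marker string xnullx are placeholders, and the TQL property name nullMarker is an assumption (fileFormatOptions appears in the examples later on this page).
-- Sketch only: the nullMarker property name and marker string are assumptions
CREATE TARGET SnowflakeNullMarkerSketch USING SnowflakeWriter (
  ConnectionURL: 'jdbc:snowflake://<account>.snowflakecomputing.com/?db=MYDB',
  username: 'striim',
  password: '********',
  Tables: 'MYDB.MYSCHEMA.POSDATA',
  nullMarker: 'xnullx',
  fileFormatOptions: 'null_if = \"xnullx\"'
)
INPUT FROM PosSource_TransformedStream;
Whatever marker you choose, the same string must appear in both properties so that the values written to the stage files are converted back to nulls when loaded.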
Azure Data Lake Storage (ADLS Gen2) properties for Snowflake Writer
property | type | default value | notes |
---|---|---|---|
Azure Account Access Key | encrypted password | | The account access key from Storage accounts > <account name> > Access keys. |
Azure Account Name | String | | The name of the Azure storage account for the blob container. |
Azure Container Name | String | | The blob container name from Storage accounts > <account name> > Containers. If it does not exist, it will be created. |
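As a sketch, the ADLS Gen2 staging properties might be combined as follows. The externalStageType value ADLSGen2 is named in the properties table above, but the TQL names azureAccountName, azureAccountAccessKey, and azureContainerName, like all the values shown, are assumptions for illustration.
-- Sketch only: Azure property names are assumed, values are placeholders
CREATE TARGET SnowflakeADLSSketch USING SnowflakeWriter (
  ConnectionURL: 'jdbc:snowflake://<account>.snowflakecomputing.com/?db=MYDB',
  username: 'striim',
  password: '********',
  Tables: 'MYDB.MYSCHEMA.POSDATA',
  externalStageType: 'ADLSGen2',
  azureAccountName: 'mystorageaccount',
  azureAccountAccessKey: '********',
  azureContainerName: 'striim-staging'
)
INPUT FROM PosSource_TransformedStream;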
S3 properties for Snowflake Writer
Specify either the access key and secret access key or an IAM role.
property | type | default value | notes |
---|---|---|---|
S3 Access Key | String | | An AWS access key ID (created on the AWS Security Credentials page) for a user with read and write permissions on the bucket. Leave blank if using an IAM role. |
S3 Bucket Name | String | | Specify the S3 bucket to be used for staging. If it does not exist, it will be created. |
S3 Region | String | | The AWS region of the bucket. |
S3 Secret Access Key | encrypted password | | The secret access key for the access key. |
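A comparable sketch for S3 staging, again with assumed TQL property names (s3BucketName, s3Region, s3AccessKey, s3SecretAccessKey) and placeholder values:
-- Sketch only: S3 property names are assumed, values are placeholders
CREATE TARGET SnowflakeS3Sketch USING SnowflakeWriter (
  ConnectionURL: 'jdbc:snowflake://<account>.snowflakecomputing.com/?db=MYDB',
  username: 'striim',
  password: '********',
  Tables: 'MYDB.MYSCHEMA.POSDATA',
  externalStageType: 'S3',
  s3BucketName: 'striim-staging',
  s3Region: 'us-west-2',
  s3AccessKey: 'AKIA...',
  s3SecretAccessKey: '********'
)
INPUT FROM PosSource_TransformedStream;
If using an IAM role instead of an access key, leave the access key properties blank as noted above.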
Authentication mechanisms
This adapter supports the following authentication mechanisms:
Username and password
Public/private key pair
OAuth
Username and password authentication
Set the values of the username and password properties as normal.
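For example, a minimal target might look like the following sketch. The connection values are placeholders, and the TQL value 'Password' for authenticationType is an assumption based on the property descriptions above.
-- Sketch only: connection values are placeholders
CREATE TARGET SnowflakePasswordAuthSketch USING SnowflakeWriter (
  ConnectionURL: 'jdbc:snowflake://<account>.snowflakecomputing.com/?db=MYDB',
  authenticationType: 'Password',
  username: 'striim',
  password: '********',
  Tables: 'MYDB.MYSCHEMA.POSDATA'
)
INPUT FROM PosSource_TransformedStream;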
Public/private key pair authentication
This procedure generates the public/private key pair. For details, see the Snowflake documentation on configuring key pair authentication.
From a terminal, execute the following command.
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out keyname.p8 -nocrypt
This command generates an unencrypted private key. Remove the
-nocrypt
option to generate an encrypted private key. Replace keyname with a file name for the private key.From a terminal, execute the following command.
openssl rsa -in keyname.p8 -pubout -out pubkeyname.pub
Replace keyname with the name chosen in the previous step. Replace pubkeyname with a file name for the public key.
In the Snowflake console, execute the following command to assign the public key to the user account.
ALTER USER any_snowflake_user SET RSA_PUBLIC_KEY='code string';
Replace code string with the public key, not including the start and end delimiters (the -----BEGIN PUBLIC KEY----- and -----END PUBLIC KEY----- lines).
Known issue DEV-47140: If Snowflake Writer fails with the error "Unable to establish connection as Private Key Passphrase provided is wrong for Encrypted Private Key" but the passphrase is correct, add BouncyCastle at the end of the list of security providers in the java.security file (see section 8 of How to Implement a Provider in the Java Cryptography Architecture for more information). For example:
# List of providers and their preference orders (see above):
security.provider.1=SUN
security.provider.2=SunRsaSign
security.provider.3=SunEC
security.provider.4=SunJSSE
security.provider.5=SunJCE
security.provider.6=SunJGSS
security.provider.7=SunSASL
security.provider.8=XMLDSig
security.provider.9=SunPCSC
security.provider.10=JdkLDAP
security.provider.11=JdkSASL
security.provider.12=org.bouncycastle.jce.provider.BouncyCastleProvider
The following example uses key pair authentication with Snowpipe Streaming enabled:
CREATE TARGET SfTgt USING SnowflakeWriter
(
ConnectionURL:'jdbc:snowflake://striim.snowflakecomputing.com/?db=DEMO_DB',
username:'infra_1780_oauth_bearer_encrypted',
appendOnly:'true',
IgnorableExceptionCode:'TABLE_NOT_FOUND',
Tables:'QATEST.oracleRawSRC,QATEST1679489873.SNOWFLAKERAWTGT',
uploadpolicy:'eventcount:1,interval:10s',
privateKey:'keydata',
streamingUpload:'TRUE',
userRole:'SYSADMIN',
authenticationType:'KeyPair',
privateKeyPassphrase:'striim'
)
INPUT FROM OracleInitStream;
The following is another key pair example, showing the full property list as it appears in TQL:
CREATE OR REPLACE TARGET sf USING Global.SnowflakeWriter (
userRole: 'sysadmin',
connectionUrl: 'jdbc:snowflake://striim.snowflakecomputing.com/?db=SAMPLEDB&schema=SANJAYPRATAP',
streamingUpload: 'true',
tables: 'public.sample_pk,SAMPLEDB.SANJAYPRATAP.SAMPLE_TB',
CDDLAction: 'Process',
optimizedMerge: 'false',
columnDelimiter: '|',
privateKey_encrypted: 'true',
appendOnly: 'false',
authenticationType: 'KeyPair',
username: 'rahul_mishra',
uploadPolicy: 'eventcount:10000,interval:5m',
privateKey: 'keydata',
externalStageType: 'Local',
adapterName: 'SnowflakeWriter',
fileFormatOptions: 'null_if = \"\"'
)
INPUT FROM sysout;
Manually configuring OAuth for Snowflake Writer
See How To: Generate and use an OAuth token using Snowflake OAuth for custom clients.
To simplify OAuth configuration, create a connection profile (see Introducing connection profiles).
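Once the OAuth integration exists, the Manual OAuth properties might be set as in the following sketch. The TQL value 'ManualOAuth' and the property names clientId, clientSecret, and refreshToken are assumptions based on the property descriptions above; all values are placeholders.
-- Sketch only: authentication value and property names are assumed
CREATE TARGET SnowflakeOAuthSketch USING SnowflakeWriter (
  ConnectionURL: 'jdbc:snowflake://<account>.snowflakecomputing.com/?db=MYDB',
  authenticationType: 'ManualOAuth',
  clientId: '<OAuth client ID>',
  clientSecret: '<OAuth client secret>',
  refreshToken: '<OAuth refresh token>',
  Tables: 'MYDB.MYSCHEMA.POSDATA'
)
INPUT FROM PosSource_TransformedStream;
Remember that the refresh token expires after the interval set when you created the OAuth integration; update it before it expires or the application will halt.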
Snowflake Writer sample application
The following sample application will write data from PosDataPreview.csv to Snowflake. The target table must exist.
CREATE SOURCE PosSource USING FileReader (
  wildcard: 'PosDataPreview.csv',
  directory: 'Samples/PosApp/appData',
  positionByEOF: false
)
PARSE USING DSVParser (
  header: Yes,
  trimquote: false
)
OUTPUT TO PosSource_Stream;

CREATE CQ PosSource_Stream_CQ
INSERT INTO PosSource_TransformedStream
SELECT TO_STRING(data[1]) AS MerchantId,
  TO_DATE(data[4]) AS DateTime,
  TO_DOUBLE(data[7]) AS AuthAmount,
  TO_STRING(data[9]) AS Zip
FROM PosSource_Stream;

CREATE TARGET SnowflakeDemo USING SnowflakeWriter (
  ConnectionURL: '<JDBC connection string>',
  username: 'striim',
  password: '********',
  Tables: 'MYDB.MYSCHEMA.POSDATA',
  appendOnly: true
)
INPUT FROM PosSource_TransformedStream;
The above example shows how to use SnowflakeWriter with an input of a user-defined type. For examples of applications where the input is the output of a CDC reader, DatabaseReader, or IncrementalBatchReader, see Replicating Oracle data to Snowflake.
The following is an example of the properties required to test the Snowpipe Streaming API:
CREATE TARGET SnowflakeDemo USING SnowflakeWriter (
  ConnectionURL: '<JDBC connection string>',
  username: 'striim',
  password: '********',
  Tables: 'MYDB.MYSCHEMA.POSDATA',
  appendOnly: true,
  streamingUpload: 'true',
  userRole: 'ACCOUNTADMIN',
  privateKey: '*****'
)
INPUT FROM PosSource_TransformedStream;
Snowflake Writer data type support and correspondence
See also Data type support & mapping for schema conversion & evolution.
TQL type | Snowflake type |
---|---|
Boolean | BOOLEAN |
Byte, Integer, Long, Short | BIGINT, DOUBLE, FLOAT, INT, NUMBER, SMALLINT |
DateTime | DATE, DATETIME, TIMESTAMP_LTZ, TIMESTAMP_NTZ, TIMESTAMP_TZ |
Double, Float | DOUBLE, FLOAT |
String | CHAR, VARCHAR |
When the input of a Snowflake Writer target is the output of an Oracle source (DatabaseReader, Incremental Batch Reader, or Oracle Reader):
Oracle type | Snowflake type |
---|---|
BINARY | BINARY |
BINARY_DOUBLE | FLOAT |
BINARY_FLOAT | FLOAT |
BLOB | BINARY |
CHAR | CHAR |
CLOB | VARCHAR |
DATE | DATE |
FLOAT | FLOAT |
INTEGER | NUMBER |
INTERVAL DAY TO SECOND | VARCHAR |
INTERVAL YEAR TO MONTH | VARCHAR |
LONG | not supported |
LONG RAW | not supported |
NCHAR | CHAR |
NCLOB | VARCHAR |
NUMBER | NUMBER |
NUMBER(<precision>,<scale>) | NUMBER(<precision>,<scale>) |
NVARCHAR2 | VARCHAR |
RAW | BINARY |
TIMESTAMP | TIMESTAMP_NTZ |
TIMESTAMP WITH LOCAL TIMEZONE | TIMESTAMP_LTZ |
TIMESTAMP WITH TIMEZONE | TIMESTAMP_TZ |
VARCHAR2 | VARCHAR |
XMLTYPE | VARCHAR |
See Oracle Reader and OJet WAEvent fields for additional information including limitations for some types.