kafka-configure-connection-profile
Configure the following properties to create a connection profile for Kafka.
Connection Profile Name: Assign a unique name to identify the Kafka connection profile.
Namespace: Select the namespace where the connection profile will be stored. Ensure all users have READ and SELECT permissions for the chosen namespace.
Endpoint Type: Choose Kafka.
Configuring connection properties
Configure the following connection properties:
Broker Address: Specify the Kafka broker addresses in host:port format. For multiple brokers, use a comma-separated list (for example, host1:port1,host2:port2). When you specify multiple brokers, all addresses must belong to the same Kafka cluster.
For Confluent Cloud Kafka, copy the Bootstrap Server URL from the Cluster Settings. For example: pkc-xxxxxx.region.cloud_provider.confluent.cloud:9092.
Authentication Type: Select the type of authentication you want to use, then set the properties for that type as described below.
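The host:port format for Broker Address can be checked before a value is pasted into the profile. The following is a minimal sketch, not part of Striim; the function name `parse_broker_addresses` is hypothetical, and the check only covers syntax (it cannot confirm that the brokers belong to the same cluster).

```python
def parse_broker_addresses(value):
    """Split a comma-separated broker list into (host, port) pairs.

    Raises ValueError if any entry is not in host:port form.
    """
    brokers = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            raise ValueError("empty broker entry")
        # Split on the last colon so the port is always the final segment.
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        port_number = int(port)
        if not 1 <= port_number <= 65535:
            raise ValueError(f"port out of range in {entry!r}")
        brokers.append((host, port_number))
    return brokers


if __name__ == "__main__":
    print(parse_broker_addresses("host1:9092,host2:9093"))
```

A value such as `host1` (no port) or `host1:9092,,host2:9092` (empty entry) raises `ValueError` instead of being silently accepted.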
Configuring SSL settings
Use SSL is turned off by default. For production environments, it is recommended to turn on Use SSL so that data stays confidential over the network.
When Use SSL is enabled with mutual TLS authentication, configure SSL using one of the following options. For SSL encryption without client authentication, see Setting no authentication properties.
Option 1: Using PEM certificate files
Set Use Certificate to True and configure the following properties:
Set Authentication Type to Mutual TLS.
Provide the CA certificate file path in CA Certificate. You can also upload the CA Certificate.
SSL Keystore Key (client private key PEM).
SSL Key Password (if the private key is encrypted).
Note
You can securely upload the certificates to a Vault and reference them using this property, or copy and paste the path of the certificates.
Note
Striim supports certificate rotation for enhanced security through connection profiles only. Adapters pick up the rotated certificates when they retry after an exception or when the application restarts.
Option 2: Using Java Keystore/Truststore
Set Use Certificate to False and configure the following properties:
Set Authentication Type to Mutual TLS.
Provide the path to the truststore file (JKS/PKCS12) in SSL Truststore Location.
SSL Truststore Password.
SSL Keystore Password.
SSL Key Password (if required).
Note
You can use Vault to securely configure authentication settings (JAAS Config, SSL Certificates).
Note
Striim supports certificate rotation for enhanced security through connection profiles only. Adapters pick up the rotated certificates when they retry after an exception or when the application restarts.
Setting no authentication properties
Set Authentication Type to NONE.
Note
When Authentication Type is set to NONE and Use SSL is turned on, this mode ensures data confidentiality over the network but does not enforce identity verification of the client.
Encrypting with SSL (no client authentication)
You can enable SSL encryption without client authentication to secure data in transit between Striim and the Kafka broker. This mode is suitable when data must be encrypted but the broker does not require client identity verification, such as in internal trusted networks.
To configure SSL encryption without authentication, set Use SSL to True and configure the SSL properties based on the certificate format:
| Property | Value |
|---|---|
| Authentication Type | NONE |
| Use SSL | True |
| Use Certificate | True (recommended) or False. |
| CA Certificate | Path to the PEM file. Required when Use Certificate is True. |
| SSL Truststore Location | Path to the truststore file (JKS/PKCS12). Required when Use Certificate is False. |
| SSL Truststore Password | Password for the truststore. Required when Use Certificate is False and the truststore is password-protected. |
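For readers who know the Apache Kafka client settings, the table above corresponds roughly to the standard `security.protocol` and `ssl.truststore.*` client properties. The sketch below shows that correspondence under stated assumptions: how Striim translates the profile internally is not documented here, and the function name `ssl_only_client_properties` is hypothetical. The property names themselves are standard Kafka client properties.

```python
def ssl_only_client_properties(use_certificate, trust_path, truststore_password=None):
    """Return Kafka client properties for SSL encryption without client auth.

    use_certificate=True  -> trust_path is a PEM file holding the CA certificate.
    use_certificate=False -> trust_path is a JKS/PKCS12 truststore.
    """
    props = {
        "security.protocol": "SSL",
        "ssl.truststore.location": trust_path,
    }
    if use_certificate:
        # PEM trust stores are declared by type and take no password.
        props["ssl.truststore.type"] = "PEM"
    elif truststore_password is not None:
        # Only needed when the truststore file is password-protected.
        props["ssl.truststore.password"] = truststore_password
    return props


if __name__ == "__main__":
    for key, value in ssl_only_client_properties(True, "/path/to/ca-cert.pem").items():
        print(f"{key}={value}")
```

No keystore properties appear because the client does not present a certificate in this mode.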
Setting PLAIN authentication properties
Set Authentication Type to PLAIN.
For Apache Kafka, specify the following JAAS config:
org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
Replace <username> and <password> with the actual values.
For Confluent Cloud Kafka, specify the following JAAS config:
org.apache.kafka.common.security.plain.PlainLoginModule required username="<API_KEY>" password="<API_SECRET>";
Replace <API_KEY> and <API_SECRET> with the actual values.
Use SSL must be enabled.
Note
Always enable Use SSL for Confluent Cloud Kafka.
Note
It is recommended to store the JAAS Config in a vault and reference the vault key in the JAAS Config property.
Setting SCRAM authentication properties
Set Authentication Type to SCRAM_SHA_256 or SCRAM_SHA_512. SCRAM_SHA_512 provides better security.
Specify the following JAAS config:
org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";
Replace <username> and <password> with the actual values.
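The PLAIN and SCRAM JAAS strings share one shape: a login module class, the `required` flag, quoted credentials, and a terminating semicolon. The sketch below assembles that string so that a missing quote or semicolon cannot slip in. The helper names are hypothetical, and the backslash escaping of quotes inside credentials is an assumption about how the JAAS parser reads quoted values.

```python
LOGIN_MODULES = {
    "PLAIN": "org.apache.kafka.common.security.plain.PlainLoginModule",
    "SCRAM_SHA_256": "org.apache.kafka.common.security.scram.ScramLoginModule",
    "SCRAM_SHA_512": "org.apache.kafka.common.security.scram.ScramLoginModule",
}


def _quote(value):
    # Assumption: backslashes and double quotes are escaped with a backslash.
    return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'


def build_jaas_config(auth_type, username, password):
    """Build the JAAS Config value for PLAIN or SCRAM authentication."""
    if auth_type not in LOGIN_MODULES:
        raise ValueError(f"unsupported authentication type: {auth_type}")
    module = LOGIN_MODULES[auth_type]
    return f"{module} required username={_quote(username)} password={_quote(password)};"


if __name__ == "__main__":
    print(build_jaas_config("SCRAM_SHA_512", "<username>", "<password>"))
```

For Confluent Cloud, the same function applies with the API key as `username` and the API secret as `password`.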
Setting GSSAPI (Kerberos) authentication properties
Set Authentication Type to GSSAPI.
For detailed information on configuring JAAS for Kerberos authentication, see Authentication using SASL/Kerberos.
Setting Mutual TLS (mTLS) authentication properties
Set Authentication Type to MutualTLS. This method requires both client and server certificates signed by a trusted Certificate Authority (CA).
Mutual TLS setup for self-managed Kafka
For detailed information on setting up SSL/TLS for self-managed Kafka, see Kafka Documentation: Encryption and Authentication using SSL.
When Use Certificate is set to False, configure the following properties:
SSL Truststore Location.
SSL Truststore Password.
SSL Keystore Location.
SSL Keystore Password.
SSL Key Password (when Private Key in Keystore is encrypted).
When Use Certificate is set to True, configure the following properties:
CA Certificate.
SSL Keystore Certificate Chain.
SSL Keystore Key.
SSL Key Password.
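The two property lists above can be expressed as a small pre-flight check that reports which mutual TLS settings are still empty. This is a sketch only: the `settings` dictionary keyed by the property labels is a hypothetical stand-in, not a Striim API, and SSL Key Password is treated as optional because it applies only to encrypted private keys.

```python
REQUIRED_WITH_CERTIFICATE = [
    "CA Certificate",
    "SSL Keystore Certificate Chain",
    "SSL Keystore Key",
]
REQUIRED_WITH_KEYSTORE = [
    "SSL Truststore Location",
    "SSL Truststore Password",
    "SSL Keystore Location",
    "SSL Keystore Password",
]


def missing_mtls_properties(settings):
    """Return the required mutual TLS properties that are absent or empty."""
    if settings.get("Use Certificate") is True:
        required = REQUIRED_WITH_CERTIFICATE
    else:
        required = REQUIRED_WITH_KEYSTORE
    return [name for name in required if not settings.get(name)]


if __name__ == "__main__":
    profile = {"Use Certificate": True, "CA Certificate": "/path/to/ca-cert.pem"}
    print(missing_mtls_properties(profile))
```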
Mutual TLS setup for Confluent Cloud Kafka
Refer to Configure mutual TLS (mTLS) authentication for Confluent Cloud.
Client certificate setup
Follow these steps to set up the client certificate.
Create client certificate:
openssl genpkey -algorithm RSA -out client.key -aes256
openssl req -new -key client.key -out client.csr
openssl x509 -req -in client.csr -CA ca-cert.pem -CAkey ca-key.pem \
  -CAcreateserial -out client.crt -days 365 -sha256
Create client keystore:
openssl pkcs12 -export -in client.crt -inkey client.key \
  -certfile ca-cert.pem -out client.keystore.p12 -name kafka-client
keytool -importkeystore \
  -destkeystore client.keystore.jks \
  -srckeystore client.keystore.p12 \
  -srcstoretype PKCS12 \
  -alias kafka-client
Create client truststore (trust server CA):
keytool -import -alias myCA -file ca-cert.pem -keystore client.truststore.jks
Connecting to AWS MSK
Ensure your MSK cluster is provisioned and running. The Bootstrap Server URL is available in the AWS MSK Console under cluster details.
Required permissions
Configure the following IAM permissions for Kafka operations with AWS MSK.
KafkaWriter permissions
KafkaWriter requires different permissions depending on the processing mode.
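To see what the processing mode changes, the sketch below compares the `kafka-cluster` actions of the two KafkaWriter policies listed in this section. The action lists are copied from those policies; the variable names are illustrative only.

```python
E1P_ACTIONS = {
    "kafka-cluster:DeleteGroup", "kafka-cluster:WriteDataIdempotently",
    "kafka-cluster:DescribeCluster", "kafka-cluster:ReadData",
    "kafka-cluster:DescribeTransactionalId", "kafka-cluster:AlterTransactionalId",
    "kafka-cluster:DescribeTopicDynamicConfiguration",
    "kafka-cluster:AlterTopicDynamicConfiguration", "kafka-cluster:AlterGroup",
    "kafka-cluster:AlterClusterDynamicConfiguration", "kafka-cluster:AlterTopic",
    "kafka-cluster:CreateTopic", "kafka-cluster:DescribeTopic",
    "kafka-cluster:AlterCluster", "kafka-cluster:DescribeGroup",
    "kafka-cluster:DescribeClusterDynamicConfiguration", "kafka-cluster:Connect",
    "kafka-cluster:DeleteTopic", "kafka-cluster:WriteData",
}

A1P_ACTIONS = {
    "kafka-cluster:DeleteGroup", "kafka-cluster:DescribeCluster",
    "kafka-cluster:ReadData", "kafka-cluster:DescribeTopicDynamicConfiguration",
    "kafka-cluster:AlterTopicDynamicConfiguration", "kafka-cluster:AlterGroup",
    "kafka-cluster:AlterClusterDynamicConfiguration", "kafka-cluster:AlterTopic",
    "kafka-cluster:CreateTopic", "kafka-cluster:DescribeTopic",
    "kafka-cluster:AlterCluster", "kafka-cluster:DescribeGroup",
    "kafka-cluster:DescribeClusterDynamicConfiguration", "kafka-cluster:Connect",
    "kafka-cluster:DeleteTopic", "kafka-cluster:WriteData",
}

# Actions that only the exactly-once policy grants.
E1P_ONLY = sorted(E1P_ACTIONS - A1P_ACTIONS)

if __name__ == "__main__":
    for action in E1P_ONLY:
        print(action)
```

The E1P policy is a superset of the A1P policy: it adds the idempotent-write and transactional-ID actions, which is also why only the E1P policy lists a `transactional-id` resource.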
Exactly-once processing (E1P):
{
"Version": "<versionNo>",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"kafka:GetBootstrapBrokers",
"kafka:ListTopics",
"kafka:DescribeTopic",
"kafka:DescribeTopicPartitions"
],
"Resource": "*"
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": [
"kafka-cluster:DeleteGroup",
"kafka-cluster:WriteDataIdempotently",
"kafka-cluster:DescribeCluster",
"kafka-cluster:ReadData",
"kafka-cluster:DescribeTransactionalId",
"kafka-cluster:AlterTransactionalId",
"kafka-cluster:DescribeTopicDynamicConfiguration",
"kafka-cluster:AlterTopicDynamicConfiguration",
"kafka-cluster:AlterGroup",
"kafka-cluster:AlterClusterDynamicConfiguration",
"kafka-cluster:AlterTopic",
"kafka-cluster:CreateTopic",
"kafka-cluster:DescribeTopic",
"kafka-cluster:AlterCluster",
"kafka-cluster:DescribeGroup",
"kafka-cluster:DescribeClusterDynamicConfiguration",
"kafka-cluster:Connect",
"kafka-cluster:DeleteTopic",
"kafka-cluster:WriteData"
],
"Resource": [
"arn:aws:kafka:*:{AWS_ACCOUNT_ID}:transactional-id/*/*/*",
"arn:aws:kafka:*:{AWS_ACCOUNT_ID}:cluster/*/*",
"arn:aws:kafka:*:{AWS_ACCOUNT_ID}:topic/*/*/*",
"arn:aws:kafka:*:{AWS_ACCOUNT_ID}:group/*/*/*"
]
}
]
}
At-least-once processing (A1P):
{
"Version": "<versionNo>",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"kafka:GetBootstrapBrokers",
"kafka:ListTopics",
"kafka:DescribeTopic",
"kafka:DescribeTopicPartitions"
],
"Resource": "*"
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": [
"kafka-cluster:DeleteGroup",
"kafka-cluster:DescribeCluster",
"kafka-cluster:ReadData",
"kafka-cluster:DescribeTopicDynamicConfiguration",
"kafka-cluster:AlterTopicDynamicConfiguration",
"kafka-cluster:AlterGroup",
"kafka-cluster:AlterClusterDynamicConfiguration",
"kafka-cluster:AlterTopic",
"kafka-cluster:CreateTopic",
"kafka-cluster:DescribeTopic",
"kafka-cluster:AlterCluster",
"kafka-cluster:DescribeGroup",
"kafka-cluster:DescribeClusterDynamicConfiguration",
"kafka-cluster:Connect",
"kafka-cluster:DeleteTopic",
"kafka-cluster:WriteData"
],
"Resource": [
"arn:aws:kafka:*:{AWS_ACCOUNT_ID}:cluster/*/*",
"arn:aws:kafka:*:{AWS_ACCOUNT_ID}:topic/*/*/*",
"arn:aws:kafka:*:{AWS_ACCOUNT_ID}:group/*/*/*"
]
}
]
}
KafkaReader permissions
KafkaReader requires the following IAM permissions.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "MSKClusterDiscovery",
"Effect": "Allow",
"Action": [
"kafka:GetBootstrapBrokers",
"kafka:ListTopics",
"kafka:DescribeTopic",
"kafka:DescribeTopicPartitions"
],
"Resource": "*"
},
{
"Sid": "MSKReaderPermissions",
"Effect": "Allow",
"Action": [
"kafka-cluster:Connect",
"kafka-cluster:DescribeTopic",
"kafka-cluster:DescribeTopicDynamicConfiguration",
"kafka-cluster:ReadData"
],
"Resource": [
"arn:aws:kafka:*:<AWS_ACCOUNT_ID>:cluster/*/*",
"arn:aws:kafka:*:<AWS_ACCOUNT_ID>:topic/*/*/*",
"arn:aws:kafka:*:<AWS_ACCOUNT_ID>:group/*/*/*"
]
}
]
}
MSK broker configuration defaults
AWS MSK uses different default values for some broker configuration properties than Apache Kafka. If your MSK cluster has fewer than 3 brokers, you must adjust these settings in the cluster configuration to avoid producer initialization issues.
| Property | Apache Kafka default | AWS MSK default |
|---|---|---|
| default.replication.factor | 1 | 3 |
| min.insync.replicas | 1 | 2 |
| offsets.topic.replication.factor | 3 | 3 |
| transaction.state.log.replication.factor | 3 | 3 |
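The reason small clusters need adjustment is arithmetic: a replication or in-sync-replica setting larger than the number of brokers cannot be satisfied. The sketch below flags such settings using the MSK defaults from the table; the function name `settings_to_adjust` is hypothetical, and the rule "value greater than broker count" is a simplification rather than a complete sizing guide.

```python
MSK_DEFAULTS = {
    "default.replication.factor": 3,
    "min.insync.replicas": 2,
    "offsets.topic.replication.factor": 3,
    "transaction.state.log.replication.factor": 3,
}


def settings_to_adjust(broker_count, config=None):
    """Return the settings whose value exceeds the number of brokers."""
    config = MSK_DEFAULTS if config is None else config
    return {name: value for name, value in config.items() if value > broker_count}


if __name__ == "__main__":
    for name, value in settings_to_adjust(2).items():
        print(f"{name}={value} exceeds a 2-broker cluster")
```

With three or more brokers the function returns an empty dictionary, which matches the statement that only clusters with fewer than 3 brokers need changes.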
Exactly-once processing (E1P) topic configuration for MSK
When using exactly-once processing (E1P) with AWS MSK, you must remove the following topic configurations to avoid application halts. The application halts gracefully and provides suggestions if these properties are detected.
min.cleanable.dirty.ratio
segment.ms
segment.bytes
At-least-once processing (A1P) works with default MSK topic configurations without modification.
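A quick way to find the offending settings before starting an E1P application is to compare a topic's configuration overrides against the three property names above. This is a sketch under the assumption that the overrides are already available as a dictionary; how they are fetched (console, CLI, or admin client) is left out, and the function name is hypothetical.

```python
E1P_UNSUPPORTED_TOPIC_CONFIGS = (
    "min.cleanable.dirty.ratio",
    "segment.ms",
    "segment.bytes",
)


def e1p_conflicting_configs(topic_overrides):
    """Return the topic-level overrides that must be removed for E1P."""
    return [name for name in E1P_UNSUPPORTED_TOPIC_CONFIGS if name in topic_overrides]


if __name__ == "__main__":
    overrides = {"segment.ms": "604800000", "retention.ms": "86400000"}
    print(e1p_conflicting_configs(overrides))
```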
Broker address configuration
Navigate to Amazon MSK, select Clusters, choose your cluster, and view Client Information. Enter the bootstrap server addresses provided by AWS MSK.
The Broker Address (bootstrap.servers) used to connect to AWS MSK depends on the authentication mechanism configured for your cluster.
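As a rough illustration of that dependency, the sketch below maps each authentication mechanism to the broker port AWS documents for access from within AWS. The port numbers are not stated on this page, so treat them as an assumption and confirm them against the Client Information view of your cluster (publicly accessible clusters use different ports). The function name is hypothetical.

```python
# (authentication type, Use SSL) -> broker port for access from within AWS.
MSK_PORTS = {
    ("NONE", False): 9092,          # plaintext
    ("NONE", True): 9094,           # TLS encryption only
    ("SCRAM_SHA_512", True): 9096,  # SASL/SCRAM over TLS
    ("AWS_MSK_IAM", True): 9098,    # IAM access control over TLS
}


def expected_msk_port(auth_type, use_ssl):
    """Return the port expected in the Broker Address for this combination."""
    key = (auth_type.replace("-", "_").upper(), bool(use_ssl))
    if key not in MSK_PORTS:
        raise ValueError(f"unsupported combination: {auth_type}, Use SSL={use_ssl}")
    return MSK_PORTS[key]


if __name__ == "__main__":
    print(expected_msk_port("SCRAM-SHA-512", True))
```

Combinations that the sections below rule out, such as SCRAM or IAM without SSL, raise `ValueError`.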
Connecting with no authentication
Configure the following properties for connections without authentication.
| Property name | Value |
|---|---|
| Broker Address | For None (no authentication) or plaintext connections, use the non-TLS plaintext port. Example: |
| Authentication Type | NONE |
| Use SSL | True (recommended). Set to True if the cluster requires TLS encryption in transit; otherwise it can be set to False, although AWS MSK recommends enabling TLS encryption for data in transit. |
Connecting with SCRAM-SHA-512 authentication
Set up your MSK cluster with SCRAM authentication by following the AWS guide Set up SCRAM Authentication for Amazon MSK.
| Property name | Value |
|---|---|
| Broker Address | For SCRAM-SHA-512 (SASL/SCRAM) authentication, connections require TLS encryption, so use the TLS-enabled port. Example: |
| Authentication Type | SCRAM-SHA-512. SCRAM-SHA-256 is not supported on AWS MSK; use SCRAM-SHA-512 instead. |
| Use SSL | True. SSL must be enabled when using SCRAM because SASL/SCRAM authentication requires an encrypted connection. |
| JAAS Config | org.apache.kafka.common.security.scram.ScramLoginModule required username="<MSK_USERNAME>" password="<MSK_PASSWORD>"; Replace |
Connecting with AWS_MSK_IAM authentication
Ensure IAM access control is enabled on your MSK cluster. You will use AWS IAM credentials for authentication and authorization. Refer to Amazon MSK IAM Access Control.
| Property name | Value |
|---|---|
| Broker Address | For IAM authentication, copy the IAM enabled broker address. Example: |
| Authentication Type | AWS_MSK_IAM |
| Use SSL | True |
| JAAS Config | The default value is software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName="my-profile"; |
Advanced configuration
Configure additional Kafka connection and client properties to optimize behavior for your specific use case.
Additional Connection Config: Specify other Kafka connection properties as key-value pairs. For example:
request.timeout.ms: Controls the maximum amount of time the client will wait for the response of a request.
retry.backoff.ms: Time between retries.
Producer Config: Define additional Kafka producer configurations as a list of key-value pairs to fine-tune producer client behavior. If not specified, default values are used.
The following properties are used by default (if not explicitly specified by the user):
enable.idempotence=true: Ensures ordering and prevents duplicate messages during client-level retries.
acks=all: Ensures messages are acknowledged by all in-sync replicas for maximum durability.
retries=3: Enables automatic retries on transient failures.
max.in.flight.requests.per.connection=5: Preserves ordering of events.
Note
Overriding any of the default values may lead to unexpected behavior, including data duplication or message loss.
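One way to stay aware of such overrides is to merge the user-supplied Producer Config over the documented defaults and list every default that changed. The sketch below does that with plain dictionaries; it illustrates the precedence described above and is not Striim's implementation. The function name is hypothetical.

```python
PRODUCER_DEFAULTS = {
    "enable.idempotence": "true",
    "acks": "all",
    "retries": "3",
    "max.in.flight.requests.per.connection": "5",
}


def merge_producer_config(user_config):
    """Overlay user settings on the defaults.

    Returns (merged_config, overridden_defaults) so the caller can warn
    about every default whose value was changed.
    """
    merged = {**PRODUCER_DEFAULTS, **user_config}
    overridden = [
        name
        for name, default in PRODUCER_DEFAULTS.items()
        if name in user_config and user_config[name] != default
    ]
    return merged, overridden


if __name__ == "__main__":
    merged, overridden = merge_producer_config({"acks": "1", "linger.ms": "20"})
    print(merged)
    print("overridden defaults:", overridden)
```

Settings that are not among the defaults, such as `linger.ms` in the usage example, pass through without being reported.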
Consumer Config: Define additional consumer-specific settings as a list of key-value pairs.
max.partition.fetch.bytes=10485760: Maximum amount of data per partition (10 MB) the server will return. Helps control memory usage.
fetch.min.bytes=1048576: Minimum amount of data (1 MB) the server should return for a fetch request. Improves throughput by reducing the number of fetch requests.
fetch.max.wait.ms=1000: Maximum time (1 second) the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes.
receive.buffer.bytes=2000000: Size of the TCP receive buffer (2 MB) to use when reading data. Optimizes network performance.
poll.timeout.ms=10000: Maximum time (10 seconds) to block waiting for records in a single poll() call.