CREATE EXTERNAL SOURCE
Kafka Connect is a component of Apache Kafka that serves as a centralized hub for data integration between Kafka and external systems such as databases, key-value stores, search indexes, and file systems. The Connect framework consists of source connector components that ingest and stream data from external sources into Kafka topics, and sink connector components that deliver data from Kafka topics to external targets.
Striim has a persisted stream feature that allows data captured from external sources by reader adapters to be stored in Kafka topics by persisted stream senders. Data from these Kafka topics can then be retrieved by persisted stream pullers and delivered to external targets using writer adapters.
Striim Connect is a new solution from Striim that uses the first half of the Kafka Connect pipeline and combines it with the second half of the Striim persisted stream pipeline. This allows external sources for which Striim does not have a reader adapter to be integrated into the Striim data pipeline by developing a Kafka source connector for them.
Features of Striim Connect
The following are features of Striim Connect:
Infer/create Striim types for incoming table data from Debezium SourceRecords or Avro data/schema definitions.
Support Initial Loads along with CDC events.
Publish appropriate monitoring metrics in Striim’s monitoring app for the source connectors.
Support for reading data from Debezium or Debezium-compliant source connectors
Through persisted streams, Striim Connect can read change event records containing row-level database changes from Kafka topics written by any Kafka source connector. Striim has certified support for the Debezium-compliant YugabyteDB connector.
The following are requirements for reading the data:
The records are serialized in Avro format using the Confluent Schema Registry.
The schema of the Avro records conforms to that of the Debezium connectors, where the schema name for the change event value record is Envelope, and the payload for the Envelope records contains:
A source field containing a schema and a table field, the combination of which provides the qualified table name in the database
An op field with c (insert), u (update), d (delete), r (initial load) or t (truncate, currently ignored) as possible values
A before field containing the full before image of the data when op is u or d
An after field containing the full after image of the data when op is c, u or r
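For illustration, the following is a minimal sketch (rendered as JSON for readability) of a change event value payload that satisfies these requirements; the table, column names, and values are hypothetical, and additional Debezium metadata fields are omitted:

{
  "source": { "schema": "public", "table": "customers" },
  "op": "u",
  "before": { "id": 1, "name": "old name" },
  "after": { "id": 1, "name": "new name" }
}

Here the source.schema and source.table fields combine to give the qualified table name public.customers, and because op is u (update) both the before and after images are present.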
Setting up the environment for Striim Connect
The following steps show an example of setting up the environment for Striim Connect. These steps may vary depending on your source and target.
Set up the Kafka broker. Download Kafka 3.3.2 (or above), then start ZooKeeper and a Kafka broker.
Set up the Confluent Schema Registry. For example:
docker run -it --rm --name csr -p 8085:8085 -e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=<kafka_broker_ip>:9092 -e SCHEMA_REGISTRY_HOST_NAME=localhost -e SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8085 -e SCHEMA_REGISTRY_KAFKASTORE_TIMEOUT_MS=5000 confluentinc/cp-schema-registry:latest
Set up the Kafka Connect service with the connector plugin. For example:
docker run -it --rm --name connect -p 8083:8083 -e BOOTSTRAP_SERVERS=<kafka_broker_ip>:9092 -e GROUP_ID=connect_cluster -e CONFIG_STORAGE_TOPIC=connect_configs -e OFFSET_STORAGE_TOPIC=connect_offsets -e STATUS_STORAGE_TOPIC=connect_status quay.io/yugabyte/debezium-connector:latest
Set up the database instance. For example:
docker run -d -p 7000:7000 -p 7100:7100 -p 9000:9000 -p 9100:9100 -p 15433:15433 -p 5433:5433 -p 9042:9042 --name yugabyte yugabytedb/yugabyte:latest bin/yugabyted start --daemon=false
In Docker, perform the following steps:
Check the host and port of the instance.
Connect to Yugabyte SQL (YSQL), create a few tables as the data sources, and insert data in the tables.
Create a change data stream.
docker exec -it yugabyte /bin/bash
(a) yugabyted status
(b) bin/ysqlsh -h <docker_host_ip> -U yugabyte -d yugabyte
(c) bin/yb-admin --master_addresses <docker_host_ip>:7100 create_change_data_stream ysql.yugabyte IMPLICIT ALL
Note the CDC stream ID; it is required in the connector configuration.
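If you need to retrieve the stream ID again later, yb-admin provides a listing command (a hedged example; confirm that your YugabyteDB release supports it):

docker exec -it yugabyte bin/yb-admin --master_addresses <docker_host_ip>:7100 list_change_data_streams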
Set up PostgreSQL as the target database and create the target tables in it.
docker run -d -p 5432:5432 --name postgres12 123456789012.dkr.ecr.us-west-1.amazonaws.com/postgres-12.2:latest
Start the Striim server. Create a demo app with an external source writing to the PostgreSQL target:
create application demo recovery 30 second interval;

create PropertySet KConnPropSet (
    zk.address: '10.0.0.92:2181',
    bootstrap.brokers: '10.0.0.92:9092',
    kafkaversion: '2.1',
    partitions: '1',
    replication.factor: '1',
    dataformat: 'avro-debezium',
    schemaregistry: 'http://10.0.0.92:8085'
);

create stream yb_data of Global.WAEvent persist using KConnPropSet;

create external source yb_source (
    connector: 'yugabyteDB',
    configFile: './config/debezium-yugabyte-source-connector.json'
) output to yb_data;

create target pg_replica using Global.DatabaseWriter (
    ConnectionURL: 'jdbc:postgresql://<postgresql_ip>:5432/webaction',
    ConnectionRetryPolicy: 'retryInterval=5, maxRetries=3',
    CDDLAction: 'Process',
    Username: 'waction',
    Password: 'w@ct10n',
    BatchPolicy: 'Eventcount:10,Interval:10',
    CommitPolicy: 'Interval:10,Eventcount:10',
    Tables: '<list_of_table_mappings>'
) input from yb_data;

create target con_disp using SysOut(name:Disp) input from yb_data;

end application demo;
deploy application demo;
start demo;
Start the source connector.
curl -H "Content-Type: application/json" -X POST http://<kafka_connect_ip>:8083/connectors -d @"debezium-yugabyte-source-connector.json"
Configure the source connector as follows:
{ "name" : "debezium-yugabyte-source-connector", "config": { "connector.class" : "io.debezium.connector.yugabytedb.YugabyteDBConnector", "tasks.max" : 1, "snapshot.mode" : "initial", "exactly.once.support" : "requested", "database.hostname" : "<yugabytedb_ip>", "database.port" : 5433, "database.master.addresses" : "<yugabytedb_ip>:7100", "database.user" : "yugabyte", "database.password" : "yugabyte", "database.dbname" : "yugabyte", "database.server.name" : "dbserver1", "database.streamid" : "<cdc_stream_id>", "table.include.list" : "<list_of_tables>", "tombstones.on.delete" : false, "topic.creation.enable" : true, "topic.prefix" : "debezium", "topic.creation.default.partitions" : 1, "topic.creation.default.replication.factor" : 1, "include.schema.changes" : true, "schema.history.internal.kafka.bootstrap.servers" : "<kafka_broker_ip>:9092", "schema.history.internal.kafka.topic" : "debezium_schemahistory", "key.converter" : "io.confluent.connect.avro.AvroConverter", "value.converter" : "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url" : "http://<schema_registry_ip>:8085", "value.converter.schema.registry.url" : "http://<schema_registry_ip>:8085", "key.converter.key.subject.name.strategy" : "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy", "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy", "transforms" : "Reroute", "transforms.Reroute.type" : "org.apache.kafka.connect.transforms.RegexRouter", "transforms.Reroute.regex" : ".*", "transforms.Reroute.replacement" : "admin_yb_data" } }
The existing records from the source database tables will appear in the PostgreSQL tables. Perform some DML operations on the source tables and verify that the changes appear in the PostgreSQL tables as well.
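For example, assuming a hypothetical test_table with id and name columns was created earlier in YSQL, DML such as the following should be reflected in the corresponding PostgreSQL table shortly after it is applied:

docker exec -it yugabyte bin/ysqlsh -h <docker_host_ip> -U yugabyte -d yugabyte \
  -c "INSERT INTO test_table (id, name) VALUES (100, 'striim connect test'); UPDATE test_table SET name = 'updated row' WHERE id = 100;"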
Note
Another option for testing Striim Connect is to configure and test the Kafka Connect components separately from the Striim components before combining them. First, set up the infrastructure and test the data flow from the source database to the Kafka topic. Then test the actual Striim Connect functionality and verify that Striim can read from the Kafka topic and deliver to the targets.
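For the first stage, one way to verify that change events are reaching the Kafka topic is Confluent's Avro console consumer (a sketch that assumes the topic name and addresses from the example above):

kafka-avro-console-consumer \
  --bootstrap-server <kafka_broker_ip>:9092 \
  --topic admin_yb_data \
  --from-beginning \
  --property schema.registry.url=http://<schema_registry_ip>:8085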
Configuring Striim
The following sections describe the configuration for Striim.
Configuring a Kafka PropertySet
When creating a Kafka PropertySet for persisted streams that will receive data from Kafka source connectors, the following changes are required:
You must define the dataformat property with the value avro-debezium to read data from Debezium connectors, or avro-striim to read data from third-party source connectors that publish data using the Striim-prescribed format.
You must define the new property schemaregistry with the URL of the Schema Registry for the Avro data being published in the Kafka topics used by the persisted stream.
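For example, a minimal PropertySet sketch based on the demo app above (addresses are placeholders; substitute avro-striim for avro-debezium when reading from third-party connectors that use the Striim-prescribed format):

create PropertySet KConnPropSet (
    zk.address: '<zookeeper_ip>:2181',
    bootstrap.brokers: '<kafka_broker_ip>:9092',
    kafkaversion: '2.1',
    partitions: '1',
    replication.factor: '1',
    dataformat: 'avro-debezium',
    schemaregistry: 'http://<schema_registry_ip>:8085'
);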
Configuring the persisted stream
Note the following requirements for the persisted stream:
The persisted streams that will receive data from Kafka source connectors must be created before starting the connectors. You must provide the name of the Kafka topic created by the persisted stream in the corresponding source connector configuration as its target.
No Striim component can write to persisted streams with the data format avro-debezium or avro-striim. Striim components are only allowed to read from them, while Kafka source connectors write to them.
The type of such persisted streams must be Global.WAEvent. They cannot be of any other event type or a user-defined type.
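In the example above, the persisted stream admin.yb_data is backed by the Kafka topic admin_yb_data, which is why the connector configuration routes all of its records to that topic with a RegexRouter transform (an excerpt from the earlier configuration; the namespace-prefixed topic name is an inference from that example, so confirm the actual topic name created for your stream):

"transforms": "Reroute",
"transforms.Reroute.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.Reroute.regex": ".*",
"transforms.Reroute.replacement": "admin_yb_data"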
Configuring target mapping in the Striim app
For the source database, the ordering of columns in the WAEvents coming out of the persisted stream will match the ordering of columns in the source table only if the primary key columns are defined, in order, at the beginning of the table's column list. Otherwise, in the WAEvents, the primary key will appear first if it is a single key; if it is a composite key, there is no guarantee on the ordering of those columns. There is also no guarantee on the ordering of the remaining non-PK columns.
For these reasons, rather than assuming the source table column ordering, you should use an explicit column mapping by name in the targets.
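For example, here is a hedged sketch of a DatabaseWriter target that maps columns explicitly by name using ColumnMap (the table and column names are hypothetical; verify the exact ColumnMap syntax supported by your Striim release):

create target pg_replica using Global.DatabaseWriter (
    ConnectionURL: 'jdbc:postgresql://<postgresql_ip>:5432/webaction',
    Username: 'waction',
    Password: 'w@ct10n',
    Tables: 'public.customers,public.customers ColumnMap(id=id, name=name, created_at=created_at)'
) input from yb_data;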
Database mapping
Any source database data will be written to the Kafka topic as one of the Avro datatypes. This may cause loss of type information, as Avro types are limited compared to those of the source databases. To mitigate this issue, Debezium can provide an optional semantic type along with every literal type (Avro schema type) definition in the data records. The source connectors must provide the semantic types when they are required for correct datatype handling by Striim Connect.
If a semantic type is present, the following is the datatype mapping from Avro data to Striim events:
Semantic type | Description | Literal type | Java type |
---|---|---|---|
io.debezium.time.Date | # of days since the epoch | int | java.time.LocalDate |
io.debezium.time.Time | # of milliseconds past midnight | int | java.time.LocalTime |
io.debezium.time.MicroTime | # of microseconds past midnight | long | java.time.LocalTime |
io.debezium.time.NanoTime | # of nanoseconds past midnight | long | java.time.LocalTime |
io.debezium.time.Timestamp | # of milliseconds past the epoch | long | java.time.LocalDateTime |
io.debezium.time.MicroTimestamp | # of microseconds past the epoch | long | java.time.LocalDateTime |
io.debezium.time.NanoTimestamp | # of nanoseconds past the epoch | long | java.time.LocalDateTime |
io.debezium.time.ZonedTimestamp | timestamp in ISO 8601 format | String | java.time.LocalDateTime |
org.apache.kafka.connect.data.Date | # of days since the epoch | int | java.time.LocalDate |
org.apache.kafka.connect.data.Time | # of microseconds past midnight | long | java.time.LocalTime |
org.apache.kafka.connect.data.Timestamp | # of milliseconds since epoch | long | java.time.LocalDateTime |
If there is no semantic type, the following is the datatype mapping from Avro data to Striim events:
Literal type | Java type | Notes |
---|---|---|
boolean | java.lang.Boolean | |
int | java.lang.Integer | |
long | java.lang.Long | |
float | java.lang.Float | |
double | java.lang.Double | |
string | java.lang.String | |
enum | java.lang.String | |
fixed | java.lang.String | |
bytes | java.lang.String | Hex encoded |
array | java.lang.String | Comma separated, curly bracket enclosed |
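As a hedged illustration of how a semantic type can accompany a literal type in the Avro schema: with Confluent's AvroConverter the semantic type is typically carried as a connect.name annotation on the field's type, as in the hypothetical field below (the exact representation depends on the converter and connector versions):

{
  "name": "created_at",
  "type": {
    "type": "long",
    "connect.version": 1,
    "connect.name": "io.debezium.time.MicroTimestamp"
  }
}

Per the first table above, Striim Connect would map such a field to a java.time.LocalDateTime value.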
Creating an EXTERNAL SOURCE using TQL
A new Striim EntityType named External Source allows you to indicate the original source the Kafka source connector is reading from. It exists only as metadata and does not get deployed as part of a Striim application. This entity has an optional set of properties to describe it, such as the connector name and the connector's configuration file location; these properties are only for descriptive purposes. In contrast, the stream an external source writes to is a Striim component and is deployed with its application. The stream must be a persisted stream of type Global.WAEvent with proper values for dataformat and schemaregistry defined in its property set.
W (admin) > create propertySet KConnPropSet (
    zk.address: '10.0.0.92:2181',
    bootstrap.brokers: '10.0.0.92:9092',
    kafkaversion: '2.1',
    partitions: '1',
    replication.factor: '1',
    dataformat: 'avro-debezium',
    schemaregistry: 'http://10.0.0.92:8085'
);
W (admin) > create stream example_data of Global.WAEvent persist using KConnPropSet;
W (admin) > create external source example_source (
    connector: 'exampleDB',
    configFile: './config/source-connector.json'
) output to example_data;
Or create the stream implicitly along with the external source:
W (admin) > create propertySet KConnPropSet (
    zk.address: '10.0.0.92:2181',
    bootstrap.brokers: '10.0.0.92:9092',
    kafkaversion: '2.1',
    partitions: '1',
    replication.factor: '1',
    dataformat: 'avro-debezium',
    schemaregistry: 'http://10.0.0.92:8085'
);
W (admin) > create external source example_source (
    connector: 'exampleDB',
    configFile: './config/source-connector.json'
) output to example_data persist using KConnPropSet;
You can use the list, describe, and drop TQL commands on this new EntityType:
W (admin) > list external sources;
EXTERNALSOURCE
1 => admin.example_source

W (admin) > describe external source example_source;
EXTERNALSOURCE admin.example_source CREATED 2024-02-27 21:05:49
WITH PROPERTIES (
    configFile: ./config/source-connector.json,
    connector: exampleDB
)
OUTPUTS TO STREAM example_data

W (admin) > drop external source example_source;
EXTERNALSOURCE example_source dropped successfully