Striim Platform 5.0 documentation

CREATE EXTERNAL SOURCE

Kafka Connect is a component of Apache Kafka that serves as a centralized hub for data integration between databases, key-value stores, search indexes and file systems. The Connect framework consists of source connector components to ingest and stream data from external sources into Kafka topics and sink connector components to deliver data from the Kafka topics to the external targets.

Striim has a persisted stream feature that allows data captured from external sources by reader adapters to be stored in Kafka topics by persisted stream senders. Data from these Kafka topics can then be retrieved by persisted stream pullers and delivered to external targets using writer adapters.

Striim Connect is a new solution from Striim that combines the first half of the Kafka Connect pipeline with the second half of the Striim persisted stream pipeline. This allows data from external sources for which Striim does not have a reader adapter to be integrated into the Striim data pipeline by developing a Kafka source connector for them.

Features of Striim Connect

The following are features of Striim Connect:

  • Infer/create Striim types for incoming table data from Debezium SourceRecords or Avro data/schema definitions.

  • Support Initial Loads along with CDC events.

  • Publish appropriate monitoring metrics in Striim’s monitoring app for the source connectors.

Figure: striim-connect-diagram.png
Support for reading data from Debezium or Debezium-compliant source connectors

Striim Connect can read, through persisted streams, data change event records containing row-level database changes from Kafka topics written by any Kafka source connector. Striim has certified support for the Debezium-compliant YugabyteDB connector.

The following are requirements for reading the data:

  • The records are serialized in Avro format using the Confluent Schema Registry.

  • The schema of the Avro records conforms to that of the Debezium connectors, where the schema name for the change event value record is Envelope, and the payload of the Envelope records contains the following fields (see the example after this list):

    • A source field containing a schema and a table field, the combination of which provides the qualified table name in the database

    • An op field with c (insert), u (update), d (delete), r (initial load), or t (truncate, currently ignored) as possible values

    • A before field containing the full before image of data when op is u or d

    • An after field containing the full after image of data when op is c, u or r
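
For illustration, the payload of a change event value record for an insert into a hypothetical public.users table might look like the following (the field values and the exact set of source metadata fields are illustrative and vary by connector):

    {
        "before": null,
        "after": { "id": 1, "name": "Alice" },
        "source": { "db": "yugabyte", "schema": "public", "table": "users" },
        "op": "c",
        "ts_ms": 1700000000000
    }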

Setting up the environment for Striim Connect

The following steps show an example of setting up the environment for Striim Connect. These steps may vary depending on your source and target.

  1. Set up the Kafka broker. Download Kafka 3.3.2 (or later). Start ZooKeeper and a Kafka broker.
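
    For example, using the scripts bundled with the Kafka distribution (a minimal sketch; the paths assume you have extracted the download and are in its top-level directory):

    # Terminal 1: start ZooKeeper
    bin/zookeeper-server-start.sh config/zookeeper.properties
    # Terminal 2: start a Kafka broker
    bin/kafka-server-start.sh config/server.properties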

  2. Set up the Confluent Schema Registry. For example:

    docker run -it --rm --name csr -p 8085:8085 -e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=<kafka_broker_ip>:9092 -e SCHEMA_REGISTRY_HOST_NAME=localhost -e SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8085 -e SCHEMA_REGISTRY_KAFKASTORE_TIMEOUT_MS=5000 confluentinc/cp-schema-registry:latest
  3. Set up the Kafka Connect service with the connector plugin. For example:

    docker run -it --rm --name connect -p 8083:8083 -e BOOTSTRAP_SERVERS=<kafka_broker_ip>:9092 -e GROUP_ID=connect_cluster -e CONFIG_STORAGE_TOPIC=connect_configs -e OFFSET_STORAGE_TOPIC=connect_offsets -e STATUS_STORAGE_TOPIC=connect_status quay.io/yugabyte/debezium-connector:latest
  4. Set up the database instance. For example:

    docker run -d -p 7000:7000 -p 7100:7100 -p 9000:9000 -p 9100:9100 -p 15433:15433 -p 5433:5433 -p 9042:9042 --name yugabyte yugabytedb/yugabyte:latest bin/yugabyted start --daemon=false
  5. In the yugabyte Docker container, perform the following steps:

    1. Check the host and port of the instance.

    2. Connect to Yugabyte SQL (YSQL), create a few tables as the data sources, and insert data into the tables (a sample table is shown after these steps).

    3. Create a change data stream.

    docker exec -it  yugabyte /bin/bash
    (a) yugabyted status
    (b) bin/ysqlsh -h <docker_host_ip>  -U yugabyte -d yugabyte
    (c) bin/yb-admin --master_addresses <docker_host_ip>:7100 create_change_data_stream ysql.yugabyte IMPLICIT ALL

    Note the CDC stream ID; it will be required in the connector configuration.
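
    For step (b) above, a minimal illustrative table and test row (the users table is hypothetical; substitute your own tables):

    -- run inside ysqlsh
    CREATE TABLE users (id INT PRIMARY KEY, name TEXT);
    INSERT INTO users (id, name) VALUES (1, 'Alice');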

  6. Set up PostgreSQL as the target database and create the target tables in it. For example:

    docker run -d -p 5432:5432 --name postgres12 123456789012.dkr.ecr.us-west-1.amazonaws.com/postgres-12.2:latest
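
    To create a target table matching the illustrative users source table above (this assumes the webaction database and waction user referenced in the demo app below already exist in the image):

    docker exec -it postgres12 psql -U waction -d webaction \
        -c "CREATE TABLE users (id INT PRIMARY KEY, name TEXT);"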
  7. Start the Striim server. Create a demo app with an external source writing to a persisted stream and a DatabaseWriter target writing to PostgreSQL:

    create application demo recovery 30 second interval;
    create PropertySet KConnPropSet (
        zk.address: '10.0.0.92:2181',
        bootstrap.brokers: '10.0.0.92:9092',
        kafkaversion: '2.1',
        partitions: '1',
        replication.factor: '1',
        dataformat: 'avro-debezium',
        schemaregistry: 'http://10.0.0.92:8085'
    );
    create stream yb_data of Global.WAEvent persist using KConnPropSet;
    create external source yb_source (
        connector: 'yugabyteDB',
        configFile: './config/debezium-yugabyte-source-connector.json'
    ) output to yb_data;
    create target pg_replica using Global.DatabaseWriter (
        ConnectionURL:'jdbc:postgresql://<postgresql_ip>:5432/webaction',
        ConnectionRetryPolicy: 'retryInterval=5, maxRetries=3',
        CDDLAction: 'Process',
        Username:'waction',
        Password:'w@ct10n',
        BatchPolicy:'Eventcount:10,Interval:10',
        CommitPolicy:'Interval:10,Eventcount:10',
        Tables: '<list_of_table_mappings>'
    ) input from yb_data;
    create target con_disp using SysOut(name:Disp) input from yb_data;
    end application demo;
    deploy application demo;
    start demo;
  8. Start the source connector.

    curl -H "Content-Type: application/json" -X POST http://<kafka_connect_ip>:8083/connectors -d @"debezium-yugabyte-source-connector.json"

    Configure the source connector as follows:

    {
        "name" : "debezium-yugabyte-source-connector",
        "config": {
            "connector.class" : "io.debezium.connector.yugabytedb.YugabyteDBConnector",
            "tasks.max" : 1,
            "snapshot.mode" : "initial",
            "exactly.once.support" : "requested",
            "database.hostname" : "<yugabytedb_ip>",
            "database.port" : 5433,
            "database.master.addresses" : "<yugabytedb_ip>:7100",
            "database.user" : "yugabyte",
            "database.password" : "yugabyte",
            "database.dbname" : "yugabyte",
            "database.server.name" : "dbserver1",
            "database.streamid" : "<cdc_stream_id>",
            "table.include.list" : "<list_of_tables>",
            "tombstones.on.delete" : false,
            "topic.creation.enable" : true,
            "topic.prefix" : "debezium",
            "topic.creation.default.partitions" : 1,
            "topic.creation.default.replication.factor" : 1,
            "include.schema.changes" : true,
            "schema.history.internal.kafka.bootstrap.servers" : "<kafka_broker_ip>:9092",
            "schema.history.internal.kafka.topic" : "debezium_schemahistory",
            "key.converter" : "io.confluent.connect.avro.AvroConverter",
            "value.converter" : "io.confluent.connect.avro.AvroConverter",
            "key.converter.schema.registry.url" : "http://<schema_registry_ip>:8085",
            "value.converter.schema.registry.url" : "http://<schema_registry_ip>:8085",
            "key.converter.key.subject.name.strategy" : "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
            "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
            "transforms" : "Reroute",
            "transforms.Reroute.type" : "org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.Reroute.regex" : ".*",
            "transforms.Reroute.replacement" : "admin_yb_data"
        }
    }
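
    Optionally, confirm that the connector started successfully by querying the Kafka Connect REST API:

    curl http://<kafka_connect_ip>:8083/connectors/debezium-yugabyte-source-connector/status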
  9. The existing records from the source database tables will appear in the PostgreSQL tables. Perform some DML operations on the source database tables and verify that the changes appear in the PostgreSQL tables.
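
    For example, using the illustrative users table from the earlier steps:

    -- in YugabyteDB (ysqlsh): make some changes
    INSERT INTO users (id, name) VALUES (2, 'Bob');
    UPDATE users SET name = 'Alice B.' WHERE id = 1;
    -- in PostgreSQL (psql): verify that the changes arrived
    SELECT * FROM users;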

    Note

    Another option for testing Striim Connect is to configure and test the Kafka Connect components separately from the Striim components before combining them. First set up the infrastructure and test the data flow from the source database to the Kafka topic. Then test the Striim Connect functionality and verify that Striim can read from the Kafka topic and deliver to the targets.

Configuring Striim

The following sections describe the configuration for Striim.

Configuring a Kafka PropertySet

When creating a Kafka PropertySet for persisted streams that will receive data from Kafka source connectors, the following settings are required:

  • You must define the dataformat property with the value:

    • avro-debezium to read data from Debezium connectors or

    • avro-striim to read data from third-party source connectors that publish data in the Striim-prescribed format.

  • You must define the new schemaregistry property with the URL of the Schema Registry for the Avro data published to the Kafka topics used by the persisted stream.

Configuring the persisted stream

Note the following requirements for the persisted stream:

  • The persisted streams that will receive data from Kafka source connectors must be created before starting the connectors. You must provide the name of the Kafka topic created by the persisted stream in the corresponding source connector configuration as its target.

  • No Striim component can write to persisted streams with the data format avro-debezium or avro-striim. Striim components are only allowed to read from them, while Kafka source connectors will write to them.

  • The type of such persisted streams must be Global.WAEvent. They cannot be of any other event type or a user-defined type.

Configuring target mapping in the Striim app

For the source database, the ordering of columns in the WAEvents coming out of the persisted stream will match the ordering of columns in the source table only if the primary key columns are defined, in order, at the beginning of the table's column list. Otherwise, a single primary key column will appear first in the WAEvents; for a composite primary key, there is no guarantee on the ordering of the key columns. There is also no guarantee on the ordering of the remaining non-key columns.

For these reasons, rather than assuming the source table column ordering, you should use an explicit column mapping by name in the targets.
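
The following is a minimal sketch of such a mapping, assuming the ColumnMap option of the DatabaseWriter Tables property and the hypothetical users table used earlier (entries written as target_column=source_column):

    create target pg_replica using Global.DatabaseWriter (
        ConnectionURL: 'jdbc:postgresql://<postgresql_ip>:5432/webaction',
        Username: 'waction',
        Password: 'w@ct10n',
        Tables: 'public.users,public.users ColumnMap(id=id, name=name)'
    ) input from yb_data;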

Database mapping

Any source database data will be written to the Kafka topic as one of the Avro datatypes. This may cause loss of type information, since Avro types are limited compared to the source database types. To mitigate this issue, Debezium provides an optional semantic type with every literal type (Avro schema type) definition in the data records. The source connectors must provide the semantic types when they are required for correct datatype handling by Striim Connect.

If a semantic type is present, the following is the datatype mapping from Avro data to Striim events:

Semantic type | Description | Literal type | Java type
io.debezium.time.Date | # of days since the epoch | int | java.time.LocalDate
io.debezium.time.Time | # of milliseconds past midnight | int | java.time.LocalTime
io.debezium.time.MicroTime | # of microseconds past midnight | long | java.time.LocalTime
io.debezium.time.NanoTime | # of nanoseconds past midnight | long | java.time.LocalTime
io.debezium.time.Timestamp | # of milliseconds past the epoch | long | java.time.LocalDateTime
io.debezium.time.MicroTimestamp | # of microseconds past the epoch | long | java.time.LocalDateTime
io.debezium.time.NanoTimestamp | # of nanoseconds past the epoch | long | java.time.LocalDateTime
io.debezium.time.ZonedTimestamp | timestamp in ISO 8601 format | String | java.time.LocalDateTime
org.apache.kafka.connect.data.Date | # of days since the epoch | int | java.time.LocalDate
org.apache.kafka.connect.data.Time | # of microseconds past midnight | long | java.time.LocalTime
org.apache.kafka.connect.data.Timestamp | # of milliseconds since epoch | long | java.time.LocalDateTime

If there is no semantic type, the following is the datatype mapping from Avro data to Striim events:

Literal type | Java type | Notes
boolean | java.lang.Boolean |
int | java.lang.Integer |
long | java.lang.Long |
float | java.lang.Float |
double | java.lang.Double |
string | java.lang.String |
enum | java.lang.String |
fixed | java.lang.String |
bytes | java.lang.String | Hex encoded
array | java.lang.String | Comma separated, curly bracket enclosed

Creating an EXTERNAL SOURCE using TQL

A new Striim EntityType named External Source allows you to indicate the original source that the Kafka source connector reads from. It exists only as metadata and is not deployed as part of a Striim application. This entity has an optional set of properties to describe it, such as the connector name and the location of the connector's configuration file. These properties are purely descriptive. In contrast, the stream entity that an external source writes to is part of Striim and is deployed with its application. The stream must be a persisted stream of type Global.WAEvent, with proper values for dataformat and schemaregistry defined in its property set.

W (admin) > create propertySet KConnPropSet (
    zk.address: '10.0.0.92:2181',
    bootstrap.brokers: '10.0.0.92:9092',
    kafkaversion: '2.1',
    partitions: '1',
    replication.factor: '1',
    dataformat: 'avro-debezium',
    schemaregistry: 'http://10.0.0.92:8085'
);
W (admin) > create stream example_data of Global.WAEvent 
   persist using KConnPropSet;
W (admin) > create external source example_source (
    connector: 'exampleDB',
    configFile: './config/source-connector.json'
) output to example_data;

Or create the stream implicitly along with the external source:

W (admin) > create propertySet KConnPropSet (
    zk.address: '10.0.0.92:2181',
    bootstrap.brokers: '10.0.0.92:9092',
    kafkaversion: '2.1',
    partitions: '1',
    replication.factor: '1',
    dataformat: 'avro-debezium',
    schemaregistry: 'http://10.0.0.92:8085'
);
W (admin) > create external source example_source (
    connector: 'exampleDB',
    configFile: './config/source-connector.json'
) output to example_data persist using KConnPropSet;

You can use the list, describe and drop TQL commands on this new EntityType:

W (admin) > list external sources;
EXTERNALSOURCE 1 =>  admin.example_source
W (admin) > describe external source example_source;
EXTERNALSOURCE admin.example_source CREATED 2024-02-27 21:05:49
 WITH PROPERTIES (
   configFile: ./config/source-connector.json,
   connector: exampleDB
)
OUTPUTS TO STREAM example_data
W (admin) > drop external source example_source;
EXTERNALSOURCE example_source dropped successfully