I have an issue with the sink connector: it is not producing a table with the needed data, and there are no serious warnings displayed in the Kafka Connect log.
The data is available in Kafka itself, but I cannot get it out of Kafka and into MS SQL Server.
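I confirmed the data is there by printing the topic from the ksqlDB CLI, along these lines (the exact topic name is replaced with a placeholder, like the credentials further below):

-- List all topics, then peek at the one carrying the ORDERS change events
SHOW TOPICS;
PRINT '[ORDERS TOPIC]' FROM BEGINNING LIMIT 5;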
Connectors in use:
- SINK: Confluent JDBC sink connector
- SOURCE: Debezium SQL Server source connector
Docker Compose file:
---
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.6.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost

  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    image: confluentinc/cp-kafka-connect:7.6.1
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      #CONNECT_KEY_CONVERTER_SCHEMA_ENABLE: false
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      #CONNECT_VALUE_CONVERTER_SCHEMA_ENABLE: false
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/,/confluent-hub-components/'
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    volumes:
      - "./confluent-hub-components/:/confluent-hub-components/"

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.6.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
The connectors are created through ksqldb-server. Here are their definitions:
SOURCE
CREATE SOURCE CONNECTOR sqlserver_source_connector
WITH (
  'name' = 'test-connector',
  'tasks.max' = '1',
  'connector.class' = 'io.debezium.connector.sqlserver.SqlServerConnector',
  'database.hostname' = '[HOST]',
  'database.port' = '[PORT]',
  'database.user' = '[USER]',
  'database.password' = '[PW]',
  'database.names' = 'KafkaDB',
  'database.server.name' = '[SERVER]',
  'topic.prefix' = 'testing',
  'table.whitelist' = 'dbo.ORDERS',
  'database.encrypt' = 'false',
  'schema.history.internal.kafka.bootstrap.servers' = 'broker:29092',
  'schema.history.internal.kafka.topic' = 'schemahistory.fullfillment',
  'decimal.handling.mode' = 'double',
  'message.key.columns' = 'dbo.ORDERS:order_id',
  'key.converter' = 'io.confluent.connect.avro.AvroConverter',
  'key.converter.schema.registry.url' = 'http://schema-registry:8081',
  'value.converter' = 'io.confluent.connect.avro.AvroConverter',
  'value.converter.schema.registry.url' = 'http://schema-registry:8081'
);
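The connector state can be checked from the same ksqldb-server session; these are generic ksqlDB status commands, not part of the setup itself:

-- Lists all connectors, then shows the state (and any task error trace) of one
SHOW CONNECTORS;
DESCRIBE CONNECTOR "test-connector";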
SINK
CREATE SINK CONNECTOR sqlserver_sink_connector
WITH (
  'name' = 'jdbc-connector',
  'tasks.max' = '1',
  'topics' = 'KafkaDB.dbo.ORDERS',
  'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
  'connection.url' = 'jdbc:sqlserver://[CONNECTION STRING]',
  'connection.username' = '[LOGIN]',
  'connection.password' = '[PW]',
  'transforms' = 'unwrap',
  'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',
  'transforms.unwrap.drop.tombstones' = 'false',
  'auto.create' = 'true',
  'insert.mode' = 'insert',
  'delete.enabled' = 'true',
  'pk.fields' = 'id',
  'pk.mode' = 'record_key',
  'key.converter' = 'io.confluent.connect.avro.AvroConverter',
  'key.converter.schema.registry.url' = 'http://schema-registry:8081',
  'value.converter' = 'io.confluent.connect.avro.AvroConverter',
  'value.converter.schema.registry.url' = 'http://schema-registry:8081',
  'table.name.format' = 'dbo.ORDERS_'
);
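Since the sink only reads topics whose names match its 'topics' property character for character, listing the topics that actually exist is another quick check (again a generic command, not part of the setup):

-- The sink's 'topics' value must exactly match one of the names listed here
SHOW TOPICS;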
And here is the only WARN message from the log:
connect | [2024-06-18 11:56:43,646] WARN The deleted record handling configs "drop.tombstones" and "delete.handling.mode" have been deprecated, please use "delete.tombstone.handling.mode" instead. (io.debezium.transforms.AbstractExtractNewRecordState)
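If I read the deprecation notice correctly, the two old SMT options fold into a single new one, so keeping tombstones (the old 'drop.tombstones' = 'false') would presumably become the following line in the sink definition; this is my assumption based on the warning text, not something I have verified:

-- Assumed replacement for 'transforms.unwrap.drop.tombstones' = 'false':
-- 'tombstone' should keep the tombstone records that 'delete.enabled' = 'true' relies on
'transforms.unwrap.delete.tombstone.handling.mode' = 'tombstone',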