Tools I am using:
- Cloudera Streams Messaging Manager
- Kafka Connect
- io.debezium.connector.sqlserver.SqlServerConnector
- io.tabular.iceberg.connect.IcebergSinkConnector
- Schema Registry
I have enabled CDC on the desired table in my Microsoft SQL Server database.
The Debezium SQL Server connector seems to work as expected: it auto-creates the topic for the table and produces a message for each event (insert/delete) on the table.
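For reference, the value of each message is the standard Debezium change-event envelope; an insert looks roughly like this (the column names are made up for illustration, not copied from my topic):

{
  "before": null,
  "after": {
    "EmployeeID": 101,
    "FirstName": "Jane",
    "LastName": "Doe"
  },
  "source": { "schema": "dbo", "table": "Employees", "...": "..." },
  "op": "c",
  "ts_ms": 1718000000000
}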
But the Tabular Iceberg sink connector does not work as expected.
When I create the Iceberg table myself (in the Nessie catalog) with a schema matching the source table in the SQL Server database, the connector writes a row for each record, but every column is empty/null.
When I instead use iceberg.tables.auto-create-enabled, the connector creates a table, but its schema does not match the source table's schema, and no rows are inserted into it regardless of data being inserted into the source table.
Here is the configuration of the Sink connector I am using:
{
"connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
"consumer.override.sasl.jaas.config": "",
"iceberg..s3.secret-access-key": "",
"iceberg..s3a.path.style.access": "true",
"iceberg.catalog.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog",
"iceberg.catalog.client.region": "default",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
"iceberg.catalog.ref": "main",
"iceberg.catalog.s3.access-key-id": "",
"iceberg.catalog.s3.compat": "true",
"iceberg.catalog.s3.endpoint": "https://myminio.mydomain.com:21000",
"iceberg.catalog.s3.path-style-access": "true",
"iceberg.catalog.s3.secret-access-key": "",
"iceberg.catalog.uri": "http://mynessie.mydomain.com:19120/api/v1",
"iceberg.catalog.warehouse": "s3://testbucket",
"iceberg.fs.defaultFS": "s3://testbucket",
"iceberg.fs.s3a.impl": "org.apache.fs.s3a.S3AFileSystem",
"iceberg.tables": "schema.sqlserver_kafka_cdc_test",
"iceberg.tables.auto-create-enabled": "true",
"iceberg.tables.evolve-schema-enabled": "true",
"iceberg.tables.schema-case-insensitive": "true",
"key.converter": "com.cloudera.dim.kafka.connect.converts.AvroConverter",
"key.converter.passthrough.enabled": "false",
"key.converter.schema.registry.url": "https://schemareg.mydomain.com:7790/api/v1/",
"tasks.max": "1",
"topics": " sqls_cdc_new.dbo.Employees",
"value.converter": "com.cloudera.dim.kafka.connect.converts.AvroConverter",
"value.converter.passthrough.enabled": "false",
"value.converter.schema.registry.url": "https://schemareg.mydomain.com:7790/api/v1/",
"secret.properties": "consumer.override.sasl.jaas.config",
"name": "sqlserver_kafka_cdc_sink_iceberg_test"
}
To summarize: I tried creating the destination table myself to match the source table's schema, and I also tried letting the connector auto-create it. With the custom table the connector writes rows with empty values; with the auto-created table the schema does not match the source and no rows are written at all.
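One thing I am unsure about: since I have not configured any transforms, the sink is consuming the raw Debezium envelope shown above, which might also explain why the auto-created table's schema (before/after/op/... fields) does not match the source table. If the sink expects flat rows, I assume I would need to add Debezium's ExtractNewRecordState SMT to the sink configuration, something like this sketch (assuming the Debezium transform jars are available on the Connect worker's plugin path):

"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite"

Is that the right direction, or does the Tabular sink handle the Debezium envelope in some other way?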