Summary: I need to send records to a Kafka topic, and the Microsoft SQL Server sink connector has to fetch the records from the topic and write them to the database.
**Approaches I tried for sending records to the topic:**
1) Manual approach (the **Produce a message** button in Confluent Cloud)
2) REST API (Postman)
3) Java producer application (using an IDE)
4) Boomi process
**Concern:**
- Whenever I produce records manually (using the **Produce a message** button in Confluent Cloud), the record is sent to the topic, and the MSSQL sink connector processes it and writes it to the DB → this case succeeds.
- But if I send with the help of third parties like Boomi, Postman, or the Java application, the records are sent to the topic, but the MSSQL sink connector pushes them into the DLQ; they are never written to the DB.
This is the schema I added for the topic:

This is the error I noticed in the DLQ message:

```json
[
  { "key": "__connect.errors.topic", "value": "sample_data_test" },
  { "key": "__connect.errors.partition", "value": "5" },
  { "key": "__connect.errors.offset", "value": "12" },
  { "key": "__connect.errors.connector.name", "value": "lcc-vk28vj" },
  { "key": "__connect.errors.task.id", "value": "0" },
  { "key": "__connect.errors.stage", "value": "VALUE_CONVERTER" },
  { "key": "__connect.errors.class.name", "value": "io.confluent.connect.json.JsonSchemaConverter" },
  { "key": "__connect.errors.exception.class.name", "value": "org.apache.kafka.connect.errors.DataException" },
  { "key": "__connect.errors.exception.message", "value": "Converting byte[] to Kafka Connect data failed due to serialization error of topic sample_data_test: " }
]
```

The `__connect.errors.exception.stacktrace` header, reformatted for readability:

```
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic sample_data_test:
	at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:144)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$5(WorkerSinkTask.java:546)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:217)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:254)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:189)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:546)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:521)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:347)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:247)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:216)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:247)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:302)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:339)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1
	at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:238)
	at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:315)
	at io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:193)
	at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:127)
	... 17 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
	at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:638)
	at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:129)
	... 20 more
```
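From what I understand, the root cause "Unknown magic byte!" means the value bytes don't start with the Confluent Schema Registry wire-format prefix (a 0x0 magic byte followed by a 4-byte schema ID), which is what `JsonSchemaConverter` expects. A minimal sketch of that check on raw record bytes (the class and method names here are mine, for illustration only):

```java
import java.nio.ByteBuffer;

public class WireFormatCheck {
    // Confluent wire format: [0x0 magic byte][4-byte schema ID][JSON payload]
    public static boolean hasWireFormatPrefix(byte[] value) {
        return value != null && value.length > 5 && value[0] == 0x0;
    }

    // The schema ID sits in bytes 1..4 as a big-endian int
    public static int schemaId(byte[] value) {
        return ByteBuffer.wrap(value, 1, 4).getInt();
    }
}
```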
So, in the Java producer application, what I did was serialize the message before sending it to the topic.
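For reference, here is a minimal sketch of what a Schema-Registry-aware producer would look like, assuming Confluent's `kafka-json-schema-serializer` dependency (all connection values and the `SampleRecord` POJO are placeholders I made up, not my real config):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;

public class SampleProducer {
    // Hypothetical POJO matching the JSON schema registered for the topic
    public static class SampleRecord {
        public int id;
        public String name;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "<bootstrap-server>"); // placeholder
        props.put("key.serializer", StringSerializer.class.getName());
        // This serializer writes the magic byte + schema ID prefix
        // and registers/looks up the schema in Schema Registry
        props.put("value.serializer", KafkaJsonSchemaSerializer.class.getName());
        props.put("schema.registry.url", "<schema-registry-url>"); // placeholder
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", "<sr-api-key>:<sr-api-secret>");
        // Confluent Cloud broker auth (values are placeholders)
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"<api-key>\" password=\"<api-secret>\";");

        SampleRecord value = new SampleRecord();
        value.id = 1;
        value.name = "test";

        try (KafkaProducer<String, SampleRecord> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("sample_data_test", "1", value));
            producer.flush();
        }
    }
}
```

As far as I can tell, the key point is that the value serializer must be the Confluent Schema Registry one rather than a plain JSON/String serializer, so that the connector's `JsonSchemaConverter` finds the magic byte it expects.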
**1) Is there any limitation of the MSSQL sink connector when sending records through third parties?
2) Is there any other approach?**
Whenever I send records to the Kafka topic via a Boomi process (using the Kafka connector), the REST API, or the Java producer application, the MSSQL sink connector should process the records and write them to the DB.
Note: with the above approaches the data is sent to the topic, but the MSSQL sink connector can't process these records.