I created an Apache Kafka cluster (3 brokers, 3 controllers, and 1 Connect worker for now) for a project, and have multiple topics receiving data from a 3rd party. The worker is running the Confluent Elasticsearch sink connector plugin.
I would like to understand where I'm going wrong with the configuration or with my understanding of the data. I'm relatively new to Apache Kafka and only a fledgling developer; I'd like to think I have a decent grasp of tech, but the Kafka ecosystem makes my head swim.
Example output from the console consumer for one topic; all topics are similarly formatted, with no schema:
{
  "location": "Battery Tester1",
  "dc": "16.20V",
  "ac": "12.01V",
  "curr": " 0.00A",
  "temperature": "32.00C",
  "status": [
    "Currently on AC power"
  ]
}
{
  "location": "Battery Tester2",
  "dc": "16.10V",
  "ac": "11.01V",
  "curr": " 2.00A",
  "temperature": "34.00C",
  "status": [
    "Currently on AC power"
  ]
}
{
  "location": "Battery Tester3",
  "status": [
    "Currently on AC power"
  ]
}
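As a quick sanity check, a record copied from that output does parse as JSON in Python (a minimal sketch, nothing more):

import json

# One of the records as printed by the console consumer.
sample = """
{
  "location": "Battery Tester1",
  "dc": "16.20V",
  "ac": "12.01V",
  "curr": " 0.00A",
  "temperature": "32.00C",
  "status": [
    "Currently on AC power"
  ]
}
"""

print(json.loads(sample))  # parses without error, so the value looks like valid JSON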
The connect-standalone.properties file contains:
bootstrap.servers=kafbrk01-4:9092,kafbrk01-5:9092,kafbrk01-6:9092
config.storage.topic: es-connect-kafwrk01-configs
offset.storage.topic: es-connect-kafwrk01-offsets
status.storage.topic: es-connect-kafwrk01-status
config.storage.replication.factor: -1
offset.storage.replication.factor: -1
status.storage.replication.factor: -1
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/opt/kafka/plugins
The connector configuration, quickstart-elasticsearch.properties, is:
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=Power,Router,Gateway
key.ignore=true
connection.url=https://<FQDN to es01>:9200,https://<FQDN to es02>:9200,https://<FQDN to es03>:9200
connection.username=es_sink_connector_user
connection.password=FakePasswordBecause!
type.name=kafka-connect
elastic.security.protocol = PLAINTEXT
schema.ignore=true
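For reference, this is the kind of quick check I can run to confirm the Elasticsearch endpoint and the sink user's credentials are reachable outside of Kafka Connect (just a sketch, using the same placeholders as above):

import requests

# Hit the ES root endpoint with the sink user's credentials
# (hostname placeholder as above; TLS verification left at the default).
resp = requests.get(
    "https://<FQDN to es01>:9200",
    auth=("es_sink_connector_user", "FakePasswordBecause!"),
    timeout=10,
)
print(resp.status_code, resp.json())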
When running with this configuration, I get the following error stack:
[2024-04-24 22:06:15,672] ERROR [elasticsearch-sink|task-0] WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:533)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:513)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:349)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:333)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:91)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:533)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
... 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (byte[])"key"; line: 1, column: 4]
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:69)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:331)
... 18 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (byte[])"key"; line: 1, column: 4]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:745)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3635)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2734)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:902)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:794)
at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4703)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3090)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:67)
... 19 more
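Reading the trace, the bytes it chokes on are literally "key", so it may be the record key rather than the value that isn't valid JSON. For what it's worth, this is roughly how I can dump the raw key and value bytes of a record to compare (a sketch with the confluent-kafka Python client; topic name and group id are just examples):

from confluent_kafka import Consumer

# Minimal consumer that prints the raw key/value bytes of one record.
c = Consumer({
    "bootstrap.servers": "kafbrk01-4:9092",
    "group.id": "debug-inspect",
    "auto.offset.reset": "earliest",
})
c.subscribe(["Power"])

msg = c.poll(10.0)
if msg is not None and msg.error() is None:
    print("key:  ", msg.key())    # is this literally b'key' rather than JSON?
    print("value:", msg.value())
c.close()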
I originally wrote a small Python application while building/testing the Kafka cluster to write entries into it, and the sink works with that data. I'm stumped, but based on my reading and research I feel like this is perhaps a JSON formatting issue. What I'm seeing looks like valid JSON to me, but does the error indicate it isn't, and the console consumer is just showing it to me that way by design?
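For context, that test producer was roughly along these lines (a simplified sketch; the topic name and payload here are just examples):

import json
from confluent_kafka import Producer

p = Producer({"bootstrap.servers": "kafbrk01-4:9092"})

record = {
    "location": "Battery Tester1",
    "dc": "16.20V",
    "status": ["Currently on AC power"],
}

# The value is serialized as JSON; no key is set on the record.
p.produce("Power", value=json.dumps(record).encode("utf-8"))
p.flush()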
I also tried using org.apache.kafka.connect.storage.StringConverter for the key and value converters, but that creates empty indices and errors out after 6 retries with errors referencing org.elasticsearch.common.compress.NotXContentException: Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes.
Any guidance, tips or feedback is appreciated.
Thank you for your time regardless.