We are using Embulk v0.10.12.
We collect files with the SFTP input plugin and push them to Kafka with embulk-output-kafka.
From time to time we see duplicated messages in the output Kafka topic, although the Embulk logs show that each file is processed only once and the Embulk Kafka producer pushes each message only once.
What could be the reason for such duplication?
Please find below the input configuration and the Kafka producer configuration:
Input configuration
type: sftp
host: myhost
port: 22
user: xxxx
password: xxxx
incremental: true
user_directory_is_root: false
max_sub_directory_depth: -1
timeout: 600
path_prefix: mypathprefix
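As we understand it, with incremental: true the SFTP input records the last processed path (last_path) in the config diff and, on the next run, only picks up paths that sort after it; if that diff is not persisted and passed back in on the next run, the same files are fetched again. A rough conceptual sketch of that selection logic (the file names are made up for illustration; this is a model of the behavior, not the plugin's actual code):

```java
import java.util.ArrayList;
import java.util.List;

public class IncrementalSketch {
    // Conceptual model of incremental path selection: only paths that sort
    // strictly after lastPath are picked up. lastPath == null means no config
    // diff was carried over from the previous run.
    public static List<String> selectNewFiles(List<String> allFiles, String lastPath) {
        List<String> selected = new ArrayList<>();
        for (String f : allFiles) {
            if (lastPath == null || f.compareTo(lastPath) > 0) {
                selected.add(f);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> files = List.of("data/file-001.csv", "data/file-002.csv", "data/file-003.csv");
        // With last_path persisted, only newer files are selected.
        System.out.println(selectNewFiles(files, "data/file-002.csv")); // prints [data/file-003.csv]
        // If the config diff is lost, every file is re-read, and every
        // re-read file is re-pushed to Kafka -> duplicates downstream.
        System.out.println(selectNewFiles(files, null));
    }
}
```

In other words, one thing worth checking is whether the run that produced duplicates actually received the config diff from the previous run.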
Kafka Producer Configuration
acks = all
batch.size = 16384
bootstrap.servers = [kafka-headless:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-3
compression.type = snappy
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
interceptor.classes = []
internal.auto.downgrade.txn.commit = false
key.serializer = class org.apache.kafka.common.serialization.LongSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 15728640
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 1
retry.backoff.ms = 100
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
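For reference, the duplicate-relevant subset of the dump above can be expressed as a plain Properties object. Note that transactional.id is null in this dump, so the producer is idempotent but not transactional; idempotence only deduplicates broker-side retries within a single producer session (one producer ID). This sketch just restates values from the dump (nothing here is specific to embulk-output-kafka):

```java
import java.util.Properties;

public class ProducerConfigSketch {
    public static Properties build() {
        Properties props = new Properties();
        // Values copied from the configuration dump above.
        props.put("bootstrap.servers", "kafka-headless:9092");
        props.put("acks", "all");
        props.put("enable.idempotence", "true");
        props.put("retries", "1");
        props.put("max.in.flight.requests.per.connection", "5");
        // transactional.id is null: the broker deduplicates retried batches
        // within one producer session, but a NEW producer instance gets a new
        // producer ID, so a resend after a restart is not deduplicated.
        return props;
    }

    public static void main(String[] args) {
        Properties p = build();
        System.out.println(p.getProperty("enable.idempotence")); // prints "true"
    }
}
```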
We thought this could be due to a transient network issue between Embulk and Kafka, so we set the Embulk Kafka producer parameters acks = all (-1) and enable.idempotence = true, but duplicated Kafka messages still appear from time to time.
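One possibility: enable.idempotence only covers retries performed internally by a single producer instance. If the whole send is repeated at the application level, for example because an Embulk task is retried or the job is re-run over the same files, a fresh producer resends the records and the broker cannot recognize them as duplicates. In that case deduplication has to happen downstream. A minimal consumer-side sketch, assuming each record carries a stable unique key (the keys and the redelivery scenario here are made up for illustration):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DedupSketch {
    private final Set<Long> seenKeys = new HashSet<>();

    // Returns true only the first time a given key is seen.
    public boolean accept(long key) {
        return seenKeys.add(key);
    }

    public static void main(String[] args) {
        DedupSketch dedup = new DedupSketch();
        // Simulated redelivery: key 42 arrives twice.
        List<Long> delivered = List.of(7L, 42L, 42L, 99L);
        List<Long> processed = new ArrayList<>();
        for (long key : delivered) {
            if (dedup.accept(key)) {
                processed.add(key);
            }
        }
        System.out.println(processed); // prints [7, 42, 99]
    }
}
```

A real deployment would need a persistent or bounded store instead of an in-memory set, but the idea is the same: make the consumer idempotent instead of relying on the producer session.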