I am new to Kafka and Kafka Connect.
I want to migrate data from MongoDB to MongoDB Atlas via the MongoDB Kafka Connector.
I am using Kafka 2.12-2.3.0 and mongo-kafka-connect-1.13.0-all.jar for Kafka Connect.
After starting ZooKeeper and Kafka, I run connect-distributed like this:
cd /opt/kafka_2.12-2.3.0
sudo bash bin/connect-distributed.sh config/connect-distributed.properties &disown
After connect-distributed starts, the log shows both the ProducerConfig and ConsumerConfig values.
ProducerConfig
[2024-09-25 07:13:37,742] INFO ProducerConfig values:
acks = all
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id =
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 2147483647
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 1
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
(org.apache.kafka.clients.producer.ProducerConfig:347)
ConsumerConfig
[2024-09-25 07:13:37,750] INFO ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = connect-cluster
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
(org.apache.kafka.clients.consumer.ConsumerConfig:347)
What I want to achieve is to override the ProducerConfig default of max.request.size = 1048576 and increase it to something like max.request.size = 10485760. Likewise, I want to override the ConsumerConfig default of max.partition.fetch.bytes = 1048576 and increase it to max.partition.fetch.bytes = 10485760.
My problem is that I cannot find where these properties are configured.
Below is the list of configuration files in /opt/kafka_2.12-2.3.0/config:
butitoy@my-server:/opt/kafka_2.12-2.3.0$ ls -l config/
total 68
-rw-r--r-- 1 root root 906 Jun 19 2019 connect-console-sink.properties
-rw-r--r-- 1 root root 909 Jun 19 2019 connect-console-source.properties
-rw-r--r-- 1 root root 5482 Sep 25 02:36 connect-distributed.properties
-rw-r--r-- 1 root root 883 Jun 19 2019 connect-file-sink.properties
-rw-r--r-- 1 root root 881 Jun 19 2019 connect-file-source.properties
-rw-r--r-- 1 root root 1552 Jun 19 2019 connect-log4j.properties
-rw-r--r-- 1 root root 2290 Sep 24 04:03 connect-standalone.properties
-rw-r--r-- 1 root root 1344 Sep 25 05:33 consumer.properties
-rw-r--r-- 1 root root 4727 Jun 19 2019 log4j.properties
-rw-r--r-- 1 root root 1933 Sep 25 05:33 producer.properties
-rw-r--r-- 1 root root 6925 Sep 24 15:24 server.properties
-rw-r--r-- 1 root root 1032 Jun 19 2019 tools-log4j.properties
-rw-r--r-- 1 root root 1169 Jun 19 2019 trogdor.conf
-rw-r--r-- 1 root root 1023 Jun 19 2019 zookeeper.properties
butitoy@my-server:/opt/kafka_2.12-2.3.0$
I tried adding max.request.size to server.properties and producer.properties, and max.partition.fetch.bytes to consumer.properties. Even when I added them to connect-distributed.properties, the changes were not reflected in the ProducerConfig and ConsumerConfig values logged at startup.
How do I change these default values, and which config/properties files do I need to edit?
Kafka Connect does not read the server, consumer, or producer properties files.
If you need to set client configs, you'd do it in the connect-distributed.properties file, as described in the docs:
For configuration of the producers used by Kafka source tasks and the consumers used by Kafka sink tasks, the same parameters can be used but need to be prefixed with producer. and consumer. respectively.
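As a minimal sketch, the worker-level overrides in connect-distributed.properties would look like the following (the 10485760 values mirror the sizes from the question; tune them to your payloads):

# connect-distributed.properties
# producer.* applies to the producer used by source tasks
producer.max.request.size=10485760
# consumer.* applies to the consumer used by sink tasks
consumer.max.partition.fetch.bytes=10485760

The Connect worker needs to be restarted for worker-level changes to be picked up.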
Or at the individual connector level:

Starting with 2.3.0, client configuration overrides can be configured individually per connector by using the prefixes producer.override. and consumer.override. respectively.
https://kafka.apache.org/documentation/#connect_running
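As a sketch of the per-connector approach: the worker must first allow overrides (the default connector.client.config.override.policy is None), and then the override goes into the connector's own config, submitted to the worker's REST API (POST /connectors). The connector name, class, and connection.uri below are placeholders for illustration:

# connect-distributed.properties: allow per-connector client config overrides
connector.client.config.override.policy=All

# connector config submitted to the REST API
{
  "name": "mongo-source",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://localhost:27017",
    "producer.override.max.request.size": "10485760"
  }
}

A sink connector writing to Atlas would use consumer.override.max.partition.fetch.bytes in the same way.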
You shouldn't need sudo to run Kafka Connect, and if possible, I'd recommend upgrading to the latest Kafka, as there have been improvements in client rebalancing protocols, transactions, and Connect features.