I set up Kafka Connect on an Ubuntu server and I want to consume data from MongoDB.
Kafka connector configuration (connector-config.properties):
name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=mongodb://user:<password>@192.168.1.10:27017/?authMechanism=SCRAM-SHA-256
database=main
collection=messages
topic=msg-test
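One thing I have not ruled out: as far as I understand the MongoDB connector documentation, MongoSourceConnector does not read a `topic` property. It seems to build the topic name from the namespace (database and collection), optionally wrapped in `topic.prefix` and `topic.suffix`. Below is a minimal Python sketch of that naming rule as I understand it. `parse_properties` and `expected_topic` are my own hypothetical helpers, not part of any library, and the rule itself is an assumption I have not verified against my setup.

```python
def parse_properties(text):
    """Parse simple key=value lines of a .properties file into a dict."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        # Skip blank lines, comments, and anything that is not key=value.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def expected_topic(props):
    """Join topic.prefix, database, collection and topic.suffix with dots.

    Assumption: this mirrors the connector's default naming; empty parts are skipped.
    """
    parts = [
        props.get("topic.prefix", ""),
        props.get("database", ""),
        props.get("collection", ""),
        props.get("topic.suffix", ""),
    ]
    return ".".join(p for p in parts if p)


# Relevant lines from my connector-config.properties.
config = """
name=mongo-source
database=main
collection=messages
topic=msg-test
"""

print(expected_topic(parse_properties(config)))  # main.messages
```

If that rule applies here, the change events would land in `main.messages` and the `topic=msg-test` line would simply be ignored.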
I start Kafka Connect with this command:
bin/connect-standalone.sh config/connect-standalone.properties connector-config.properties
These are the latest terminal log lines after running the command above:
[2024-06-26 00:49:59,810] INFO [mongo-source|task-0] Opened connection [connectionId{localValue:5, serverValue:98}] to 192.168.1.10:27017 (org.mongodb.driver.connection:71)
[2024-06-26 00:49:59,814] INFO [mongo-source|task-0] Watching for collection changes on 'main.messages' (com.mongodb.kafka.connect.source.MongoSourceTask:661)
[2024-06-26 00:49:59,805] INFO [mongo-source|task-0] Monitor thread successfully connected to server with description ServerDescription{address=192.168.1.10:27017, type=REPLICA_SET_PRIMARY, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=13, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=7402926, setName='rs0', canonicalAddress=xxxxxxxxx:27017, hosts=[xxxxxxxxx:27017], passives=[], arbiters=[], primary='xxxxxxxxx:27017', tagSet=TagSet{[]}, electionId=7fffffff0000000000000003, setVersion=1, topologyVersion=TopologyVersion{processId=667ba7a9a4ba0f0eb803b5fc, counter=6}, lastWriteDate=Wed Jun 26 00:49:50 PDT 2024, lastUpdateTimeNanos=44430480840798} (org.mongodb.driver.cluster:71)
[2024-06-26 00:49:59,907] INFO [mongo-source|task-0] Resuming the change stream after the previous offset: {"_data": "82667BB87A000000012B022C0100296E5A10043096FDA5112647D2B21F031C92FDD33046645F69640064667BB8793EE6AA48299610C10004"} (com.mongodb.kafka.connect.source.MongoSourceTask:407)
[2024-06-26 00:50:00,014] INFO [mongo-source|task-0] Opened connection [connectionId{localValue:6, serverValue:99}] to 192.168.1.10:27017 (org.mongodb.driver.connection:71)
[2024-06-26 00:50:00,064] INFO [mongo-source|task-0] Started MongoDB source task (com.mongodb.kafka.connect.source.MongoSourceTask:170)
[2024-06-26 00:50:00,064] INFO [mongo-source|task-0] WorkerSourceTask{id=mongo-source-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask:281)
These lines are logged when I add new data to the DB:
[2024-06-25 23:29:24,628] INFO [mongo-source|task-0|offsets] WorkerSourceTask{id=mongo-source-0} Committing offsets for 0 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
[2024-06-25 23:29:34,630] INFO [mongo-source|task-0|offsets] WorkerSourceTask{id=mongo-source-0} Committing offsets for 1 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
I run a console consumer with this command:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic msg-test --from-beginning
but nothing is printed in the console!
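Since the log reports "Committing offsets for 1 acknowledged messages", the record probably went to some topic. A rough Python sketch for checking which topics exist on the broker and which of them look related to my collection. `find_candidate_topics` and `list_broker_topics` are hypothetical helper names of mine; `KafkaConsumer.topics()` is the kafka-python call, and the sample set below is made up for illustration, not real output from my broker.

```python
def find_candidate_topics(topics, database, collection):
    """Return the topic names that mention the database or collection name."""
    needles = (database.lower(), collection.lower())
    return sorted(t for t in topics if any(n in t.lower() for n in needles))


def list_broker_topics(bootstrap_servers="localhost:9092"):
    """Ask the broker for all topic names (needs kafka-python and a running broker)."""
    from kafka import KafkaConsumer  # imported lazily so the helper above works without it

    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
    try:
        return consumer.topics()
    finally:
        consumer.close()


# Made-up topic names for illustration; on a real broker use list_broker_topics() instead.
sample_topics = {"__consumer_offsets", "connect-offsets", "main.messages", "msg-test"}
print(find_candidate_topics(sample_topics, "main", "messages"))  # ['main.messages']
```

Against the real broker, the call would be `find_candidate_topics(list_broker_topics(), "main", "messages")`.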
I also tested with kafka-python, but the result is the same:
from kafka import KafkaConsumer
import json

# Subscribe to the topic and decode each record value as UTF-8 JSON.
consumer = KafkaConsumer(
    'msg-test',  # Topic name (same topic as the console consumer)
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

for message in consumer:
    print(message.value)
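A caveat about the script above: kafka-python's `auto_offset_reset` defaults to `'latest'`, so unlike the console consumer with `--from-beginning` it only shows records produced after it starts. Here is a sketch of a variant that reads from the oldest record. It assumes the worker may be using JsonConverter with schemas enabled, in which case each value would arrive wrapped in a `{"schema": ..., "payload": ...}` envelope; I have not confirmed that for my worker config. `decode_value` and `read_from_beginning` are hypothetical helper names.

```python
import json


def decode_value(raw):
    """Decode a record value; unwrap a {"schema", "payload"} envelope if present."""
    if raw is None:
        return None
    doc = json.loads(raw.decode("utf-8"))
    if isinstance(doc, dict) and set(doc) == {"schema", "payload"}:
        doc = doc["payload"]
    # The payload may itself be a JSON document serialized as a string.
    if isinstance(doc, str):
        try:
            return json.loads(doc)
        except ValueError:
            return doc
    return doc


def read_from_beginning(topic, bootstrap_servers="localhost:9092"):
    """Print every record in the topic, then stop after 10 s without new data."""
    from kafka import KafkaConsumer  # lazy import: needs kafka-python and a running broker

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset="earliest",  # start at the oldest record when no offset is committed
        enable_auto_commit=False,
        consumer_timeout_ms=10000,
        value_deserializer=decode_value,
    )
    try:
        for message in consumer:
            print(message.topic, message.offset, message.value)
    finally:
        consumer.close()
```

Calling `read_from_beginning("msg-test")` and then `read_from_beginning("main.messages")` should show which of the two topics, if either, actually holds the change events.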