I’m new to this domain and was trying to implement an inter-service communication system using the kafka-python library. I’m having a problem with said implementation where the consumer in the Main block is stalled and nothing really seems to happen. Checking the kafka queue via a GUI tool tells me that the message has been sent from the viewing microservice to the main one in the topic ‘send_topic’, leading me to wonder if running both a producer and a consumer in the same service might be causing this. Any suggestions with respect to the code or alternate approaches would be appreciated, thanks!
The code I’m trying to implement looks something like this:
Main:
#imports and other routes omitted
@app.route("/view_data")
def view():
p = KafkaProducer(bootstrap_servers = 'Localhost:9092', value_serializer = lambda x: json.dumps(x).encode('utf-8'))
file_id = int(input("Enter a file id: "))
p.send('view_topic', value=file_id)
c = KafkaConsumer('send_topic', bootstrap_servers='localhost:9092', value_deserializer = lambda x: json.loads(x.decode('utf-8')))
print('consumed') #testing purposes
for file_data in c:
file_data = file_data.value
print('test')
return(file_data)
Viewing Microservice:
app = flask.Flask(__name__)
mongo_client = PyMongo(app, uri='mongodb://localhost:27017/LRTest')
db = mongo_client.db
col = db.col
fs = gridfs.GridFS(db)
p = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer = lambda x: json.dumps(x).encode('utf-8'))
c = KafkaConsumer('view_topic', bootstrap_servers = 'localhost:9092', auto_offset_reset = 'latest', value_deserializer = lambda x: json.loads(x.decode('utf-8')))
for i in c:
file = fs.find_one({'fid':int(i.value)})
file_data = str(file.read())
print('test 1')
p.send('send_topic', value=file_data)
print('sent')
Snak is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.