I am writing a Flask API to retrieve data from Apache Kafka topics. The consumer subscribes to a different topic each time according to the API path and uses the confluent_kafka.Consumer.consume
method to fetch a given number of events each time. Here is the consumer configuration I am using:
consumerConfig = {
'bootstrap.servers': "localhost:9092,0.0.0.0:9092",
'client.id': socket.gethostname(),
'group.id': "group",
'auto.offset.reset': "latest",
'enable.auto.commit': False,
'fetch.max.bytes': 52428800, # 50MB
'fetch.min.bytes': 1,
'max.poll.interval.ms': 300000,
'session.timeout.ms': 300000,
}
The code to consume events from the given topic is as follows:
def consumeEvent(topic,consumer, messages_number=1, time_out=60):
consumer.subscribe(topic, on_assign=assignment_callback)
try:
messages= consumer.consume(num_messages=messages_number,timeout=time_out)
if(len(messages)==0):
return{
"status":"fail",
"messsage":"topic empty or timeout exeeded"
}
elif(len(messages)<messages_number):
print(messages)
return{
"status":"fail",
"message":"topic does not contain this number of messages or timeout exeeded"
}
else:
return{
"status":"success",
"message":"request completed succesfully",
"data":messages_parser(messages)
}
except KeyboardInterrupt:
print('Canceled by user.')
the assignment_callback
and the messages_parser
are other methods that i built .
The problem is that it takes a long time (1-2 minutes) to return the answer. Is this normal behavior, or is there something wrong with the configuration or usage?
additional infomration: I am using kafka as a docker container for an experimental environment
i tried to adjust some parameters in the consumer configs, sometimes it went better but stil take too much time
Abd Elghani Meliani is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.