I have a Flask application with a Kafka consumer receiving messages. I want to put these messages into a queue and have a background thread move them into a SQLite database, applying some minor logic along the way. Looking into Celery, it seems I'd need Redis as a broker and all the setup that comes with it, which feels like a lot for this. All I really need is for the Kafka task to push each message onto a queue, and for a different thread/gevent/multiprocess(?) to read it and stick it in the database:
import queue

some_q_data_structure = queue.Queue()
some_other_q_data_structure = queue.Queue()

def kafka_read(consumer):
    while True:
        message = consumer.poll(0.1)
        if message is not None:
            if condition:
                some_q_data_structure.put(message)
            else:
                some_other_q_data_structure.put(message)

def q_process1():
    while True:
        message = some_q_data_structure.get()  # blocks until an item is available
        # logic
        dbase.add(message)

def q_process2():
    while True:
        message = some_other_q_data_structure.get()  # blocks until an item is available
        # some other logic
        dbase.add(message)
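To make that concrete, here's a minimal self-contained version of the pattern using only the standard library. The Kafka consumer is faked and the db/table names are made up, since those details aren't really the question:

    import queue
    import sqlite3
    import threading
    import time

    q1 = queue.Queue()
    q2 = queue.Queue()

    def fake_kafka_read():
        # Stand-in for the real consumer.poll() loop
        for i in range(10):
            message = f"msg-{i}"
            if i % 2 == 0:
                q1.put(message)
            else:
                q2.put(message)
            time.sleep(0.1)

    def writer(q, table):
        # Each thread opens its own connection; sqlite3 connections
        # are bound to the creating thread by default
        db = sqlite3.connect("messages.db")
        db.execute(f"CREATE TABLE IF NOT EXISTS {table} (body TEXT)")
        while True:
            message = q.get()  # blocks until an item is available
            db.execute(f"INSERT INTO {table} VALUES (?)", (message,))
            db.commit()
            q.task_done()

    threading.Thread(target=writer, args=(q1, "t1"), daemon=True).start()
    threading.Thread(target=writer, args=(q2, "t2"), daemon=True).start()
    fake_kafka_read()
    q1.join()  # wait until the writers have drained both queues
    q2.join()

As far as I understand, queue.Queue handles the locking itself, so the threads shouldn't need any extra synchronization.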
All of this runs on gunicorn with 1 worker. I spawn greenlets with gevent and yield between them to accomplish the various tasks. I looked into Celery, and while I could accomplish this through it, it seems like overkill since I'd have to bring Redis into the fold as well. I'm not very familiar with multiprocessing/threading in Python, so any advice helps. A gevent version of what I'm picturing is sketched below.
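If I stick with gevent, I assume the same pattern would look roughly like this (gevent.queue.Queue is cooperative, so get() yields to other greenlets instead of blocking the worker; the producer and the print are stand-ins for the Kafka poll loop and the db write):

    import gevent
    from gevent.queue import Queue

    q = Queue()

    def producer():
        # Stand-in for the consumer.poll() loop
        for i in range(10):
            q.put(f"msg-{i}")
            gevent.sleep(0.1)  # cooperative yield, like poll(0.1)

    def writer():
        while True:
            message = q.get()  # yields to other greenlets while the queue is empty
            print("writing", message)  # stand-in for dbase.add(message)

    gevent.spawn(writer)
    gevent.spawn(producer).join()  # run until the producer finishes

What I'm not sure about is whether a real Kafka client's poll() (which blocks in C) would actually yield to the other greenlets here, or stall the whole worker.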
Looked at Celery and other task queues here: https://www.fullstackpython.com/task-queues.html
Also looked at the multiprocessing docs (https://docs.python.org/3/library/multiprocessing.html); my rough guess at what a multiprocessing version would look like is below.
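For completeness, this is roughly how I imagine the multiprocessing version (the producer loop is stubbed, and I understand each process would need its own SQLite connection, which is one reason I'm unsure it's the right tool):

    import multiprocessing as mp
    import sqlite3

    def writer(q):
        # A separate process must open its own SQLite connection;
        # connections can't be shared across processes
        db = sqlite3.connect("messages.db")
        db.execute("CREATE TABLE IF NOT EXISTS messages (body TEXT)")
        while True:
            message = q.get()
            if message is None:  # sentinel: shut down cleanly
                break
            db.execute("INSERT INTO messages VALUES (?)", (message,))
            db.commit()

    if __name__ == "__main__":
        q = mp.Queue()
        p = mp.Process(target=writer, args=(q,))
        p.start()
        for i in range(10):  # stand-in for the Kafka poll loop
            q.put(f"msg-{i}")
        q.put(None)  # tell the writer to stop
        p.join()

This seems heavier than I need, though, since every message then has to cross a process boundary.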