I am using confluent-kafka to process messages from a Kafka topic.
Here is the code template:
<code>from multiprocessing import Process, Manager
def handle_records(records):
# extracted all details for each record. Hidden here.
# I am joining this properly
def foo(shared_dict, partition_num):
consumer = create_consumer()
consumer.assign([TopicPartition(topic, partition_num)]
# here I am consuming 2 messages each based on some calculation.
records = consumer.consume(num_messages=2, timeout=1)
total_partitions = 8 # Calculated using kafka method. Directly written here
for partition_num in range(total_partitions):
p = Process(target=foo, args=(shared_dict, partition_num), deamon=False)
total_processes.append(p)
for p in total_processes:
if __name__ == "__main__":
shared_dict = manager.dict()
# Added some key-value pairs to shared_dict
<code>from multiprocessing import Process, Manager
def handle_records(records):
    """Spawn one worker process per consumed record and wait for all of them.

    Each record's details (partition, offset, headers, key, value) are
    extracted (hidden here) and handed to ``process_msg`` in a child
    process. All children are joined before returning, so at most
    ``len(records)`` workers are alive at once.
    """
    children = []
    for record in records:
        # extracted all details for each record. Hidden here.
        process = Process(
            target=process_msg,
            args=(
                partition,
                offset,
                headers_dict,
                msg_key,
                msg,
                shared_dict,
            ),
        )
        # FIX: the original called p.start() on an undefined name `p`
        # (NameError) instead of starting the Process just created.
        process.start()
        children.append(process)
    # I am joining this properly — joining reaps finished children; unreaped
    # zombies accumulate across iterations and eventually exhaust OS
    # resources, which can surface as BrokenPipeError on a later spawn.
    for process in children:
        process.join()
def foo(shared_dict, partition_num):
    """Consume one assigned partition forever, two messages per batch.

    Runs in its own process; every non-empty batch is handed to
    handle_records. Loops until the process is terminated externally.
    """
    consumer = create_consumer()
    # FIX: the original line was missing the closing parenthesis on
    # assign(), which is a syntax error.
    consumer.assign([TopicPartition(topic, partition_num)])
    try:
        while True:
            # here I am consuming 2 messages each based on some calculation.
            records = consumer.consume(num_messages=2, timeout=1)
            if not records:
                # Nothing arrived within the timeout; poll again.
                continue
            handle_records(records)
    finally:
        # Close the consumer even if handle_records raises, so the broker
        # connection is released cleanly.
        consumer.close()
def main(shared_dict):
    """Start one consumer process per partition and block until all exit.

    Each child runs foo() against a single partition and receives the
    Manager-backed shared_dict proxy.
    """
    total_partitions = 8  # Calculated using kafka method. Directly written here
    total_processes = []
    for partition_num in range(total_partitions):
        # FIX: the keyword was misspelled `deamon`; Process() rejects
        # unknown keyword arguments with TypeError. Non-daemon is required
        # anyway: daemonic processes are not allowed to have children,
        # and foo() -> handle_records() spawns them.
        p = Process(target=foo, args=(shared_dict, partition_num), daemon=False)
        p.start()
        total_processes.append(p)
    for p in total_processes:
        p.join()
if __name__ == "__main__":
    # The Manager runs a server process; the shared dict proxy talks to it
    # over a pipe. It must outlive every child that uses shared_dict — if
    # the manager (or the parent's std streams) goes away early, proxy or
    # stream operations in children fail with BrokenPipeError.
    # NOTE(review): the reported traceback fails in sys.stderr.flush()
    # during Process.start(), which suggests the parent's stderr pipe was
    # closed — confirm how this script's stdout/stderr are wired when run.
    manager = Manager()
    shared_dict = manager.dict()
    # Added some key-value pairs to shared_dict
    main(shared_dict)
</code>
from multiprocessing import Process, Manager
def handle_records(records):
    """Spawn one worker process per consumed record and wait for all of them.

    Each record's details (partition, offset, headers, key, value) are
    extracted (hidden here) and handed to ``process_msg`` in a child
    process. All children are joined before returning, so at most
    ``len(records)`` workers are alive at once.
    """
    children = []
    for record in records:
        # extracted all details for each record. Hidden here.
        process = Process(
            target=process_msg,
            args=(
                partition,
                offset,
                headers_dict,
                msg_key,
                msg,
                shared_dict,
            ),
        )
        # FIX: the original called p.start() on an undefined name `p`
        # (NameError) instead of starting the Process just created.
        process.start()
        children.append(process)
    # I am joining this properly — joining reaps finished children; unreaped
    # zombies accumulate across iterations and eventually exhaust OS
    # resources, which can surface as BrokenPipeError on a later spawn.
    for process in children:
        process.join()
def foo(shared_dict, partition_num):
    """Consume one assigned partition forever, two messages per batch.

    Runs in its own process; every non-empty batch is handed to
    handle_records. Loops until the process is terminated externally.
    """
    consumer = create_consumer()
    # FIX: the original line was missing the closing parenthesis on
    # assign(), which is a syntax error.
    consumer.assign([TopicPartition(topic, partition_num)])
    try:
        while True:
            # here I am consuming 2 messages each based on some calculation.
            records = consumer.consume(num_messages=2, timeout=1)
            if not records:
                # Nothing arrived within the timeout; poll again.
                continue
            handle_records(records)
    finally:
        # Close the consumer even if handle_records raises, so the broker
        # connection is released cleanly.
        consumer.close()
def main(shared_dict):
    """Start one consumer process per partition and block until all exit.

    Each child runs foo() against a single partition and receives the
    Manager-backed shared_dict proxy.
    """
    total_partitions = 8  # Calculated using kafka method. Directly written here
    total_processes = []
    for partition_num in range(total_partitions):
        # FIX: the keyword was misspelled `deamon`; Process() rejects
        # unknown keyword arguments with TypeError. Non-daemon is required
        # anyway: daemonic processes are not allowed to have children,
        # and foo() -> handle_records() spawns them.
        p = Process(target=foo, args=(shared_dict, partition_num), daemon=False)
        p.start()
        total_processes.append(p)
    for p in total_processes:
        p.join()
if __name__ == "__main__":
    # The Manager runs a server process; the shared dict proxy talks to it
    # over a pipe. It must outlive every child that uses shared_dict — if
    # the manager (or the parent's std streams) goes away early, proxy or
    # stream operations in children fail with BrokenPipeError.
    # NOTE(review): the reported traceback fails in sys.stderr.flush()
    # during Process.start(), which suggests the parent's stderr pipe was
    # closed — confirm how this script's stdout/stderr are wired when run.
    manager = Manager()
    shared_dict = manager.dict()
    # Added some key-value pairs to shared_dict
    main(shared_dict)
Here I have only given the template code. The problem I am facing: I spawn 8 processes because there are 8 partitions, and each of those processes spawns 2 further processes, one per record it consumed. My code ran fine for the initial 30-40 minutes; after that I get a BrokenPipeError when I start a new process in handle_records.
Note: in the handle_records method I 'join' the processes that were spawned, after processing my message. I then spawn new processes again when I fetch/consume new records from the topic; it is an iterative process. I was able to do this for some records, but later I got the BrokenPipeError.
If my consumer does not receive any messages for about 1 hour, I close the consumer and join the processes that were spawned in the main function.
<code>2024-07-17 10:34:41, 480, Process-2, handle_records, Exception raised in handle_records: [Errno 32] Broken pipe
Traceback (most recent call last) :
File "kafka_main.py", line 2380, in handle_records
process.start ()
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/process.py", line 121, in start
self. popen = self. Popen (self)
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/context.py", line 224, in _Popen
return default_context.get_context () . Process ._ Popen (process_obj)
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/context.py", line 284, in Popen
return Popen (process_obj)
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in init
super (). _init_(process_obj)
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/popen_fork.py", line 16, in init
util. flush_std_streams ()
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/util.py", line 439, in _flush_std_streams
sys.stderr. flush ()
BrokenPipeError: [Errno 32] Broken pipe
</code>
<code>2024-07-17 10:34:41, 480, Process-2, handle_records, Exception raised in handle_records: [Errno 32] Broken pipe
Traceback (most recent call last) :
File "kafka_main.py", line 2380, in handle_records
process.start ()
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/process.py", line 121, in start
self. popen = self. Popen (self)
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/context.py", line 224, in _Popen
return default_context.get_context () . Process ._ Popen (process_obj)
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/context.py", line 284, in Popen
return Popen (process_obj)
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in init
super (). _init_(process_obj)
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/popen_fork.py", line 16, in init
util. flush_std_streams ()
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/util.py", line 439, in _flush_std_streams
sys.stderr. flush ()
BrokenPipeError: [Errno 32] Broken pipe
</code>
2024-07-17 10:34:41, 480, Process-2, handle_records, Exception raised in handle_records: [Errno 32] Broken pipe
Traceback (most recent call last) :
File "kafka_main.py", line 2380, in handle_records
process.start ()
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/process.py", line 121, in start
self. popen = self. Popen (self)
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/context.py", line 224, in _Popen
return default_context.get_context () . Process ._ Popen (process_obj)
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/context.py", line 284, in Popen
return Popen (process_obj)
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in init
super (). _init_(process_obj)
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/popen_fork.py", line 16, in init
util. flush_std_streams ()
File "/dpap/public/data/PythonProgramFiles/lib/python3.9/multiprocessing/util.py", line 439, in _flush_std_streams
sys.stderr. flush ()
BrokenPipeError: [Errno 32] Broken pipe