I’m just starting out with multiprocessing in Python.
I use `multiprocessing.Queue` objects in my program to pass data from process to process. The problem is that when I call `myprocess.join()`, it only returns once the queue that process writes to has been emptied.
So I’m wondering why this happens, why it’s designed that way, and how to get around it. I could empty all my queues in my main process, but I’d prefer a cleaner solution.
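For context: a plain `multiprocessing.Queue` hands each `put()` to a background feeder thread that writes the pickled item into a pipe, and a process that has put items on a queue will not exit until that thread has flushed everything. The multiprocessing docs describe this under "Joining processes that use queues". One way to avoid draining in the main process before joining is a manager-backed queue, which stores items in the manager's server process instead. This is a minimal sketch, assuming it is acceptable for every `put()` to be a round trip to the manager (noticeably slower than a plain `Queue`); `producer` and `main` are hypothetical names for illustration.

```python
import multiprocessing


def producer(queue, n):
    # Put n items followed by a None sentinel.
    for i in range(n):
        queue.put(i)
    queue.put(None)


def main(n=10_000):
    with multiprocessing.Manager() as manager:
        # A manager queue lives in the manager's server process. Each put()
        # is sent there synchronously, so the producer has no feeder thread
        # left to flush when it exits.
        queue = manager.Queue()
        p = multiprocessing.Process(target=producer, args=(queue, n))
        p.start()
        p.join()  # returns even though nothing has been read yet
        results = []
        while (item := queue.get()) is not None:
            results.append(item)
    return results


if __name__ == "__main__":
    print(len(main()))
```

The docs also mention `Queue.cancel_join_thread()`, but that lets the process exit while discarding data the feeder thread has not flushed yet, so it does not fit a pipeline where every item matters.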
Bug part:
What I described above only happens for queues with more than 100 elements. In my example, if you change the initial data to 100 elements (`range(100)`), `p4.join()` returns as expected, but with 1000 or more it hangs as described. So I’m a little lost.
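The size dependence fits the feeder-thread behaviour: the feeder only blocks once the OS pipe buffer is full, so the cut-off is really a number of bytes rather than a number of elements, and it varies by platform. The sketch below is a hypothetical probe (`put_payload` and `exits_without_reader` are illustrative names) that puts one payload of a given size, waits with a timeout, and reports whether the child managed to exit while nobody had read its data.

```python
import multiprocessing


def put_payload(queue, size):
    # Queue a single bytes object of `size` bytes, then return (and try to exit).
    queue.put(b"x" * size)


def exits_without_reader(size, timeout=3.0):
    """Return True if the child exits while its queued data is still unread."""
    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=put_payload, args=(queue, size))
    p.start()
    p.join(timeout)        # wait at most `timeout` seconds
    exited = not p.is_alive()
    queue.get()            # read the data so a blocked child can finish
    p.join()
    return exited


if __name__ == "__main__":
    for size in (100, 10 * 1024 * 1024):
        print(size, exits_without_reader(size))
```

On typical systems a 100-byte payload fits in the pipe buffer, so the child exits, while a 10 MiB payload does not, so the child stays alive until the parent calls `get()`.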
Here is a small example that shows the problem:
```python
import random
import multiprocessing
import time


def process1(data, queue1):
    for item in data:
        queue1.put(item)
    queue1.put(None)


def process2(queue1, queue2):
    while True:
        item = queue1.get()
        if item is None:
            queue2.put(None)
            break
        queue2.put(item * 2)


def process3(queue2, queue3):
    while True:
        item = queue2.get()
        if item is None:
            queue3.put(None)
            break
        queue3.put(item * 2)


def process4(queue3, queue4):
    while True:
        item = queue3.get()
        if item is None:
            queue4.put(None)
            print("None received")
            break
        queue4.put(item * 2)
    print("End process4")


if __name__ == "__main__":
    start_time = time.time()
    data = [random.randint(1, 10000) for _ in range(10000)]
    queue1 = multiprocessing.Queue()
    queue2 = multiprocessing.Queue()
    queue3 = multiprocessing.Queue()
    queue4 = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=process1, args=(data, queue1))
    p2 = multiprocessing.Process(target=process2, args=(queue1, queue2))
    p3 = multiprocessing.Process(target=process3, args=(queue2, queue3))
    p4 = multiprocessing.Process(target=process4, args=(queue3, queue4))
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    print("Proc started")
    p1.join()
    print("p1 join")
    p2.join()
    print("p2 join")
    p3.join()
    print("p3 join")
    p4.join()
    print("p4 join")
    results = []
    print("While loop")
    while True:
        item = queue4.get()
        if item is None:
            break
        results.append(item)
    end_time = time.time()
    print(end_time - start_time)
```
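A sketch of the ordering the multiprocessing docs recommend for this pipeline: read everything from the last queue first, then join. Because every stage already consumes its input queue up to the `None` sentinel, only the final queue (`queue4` in the example) has to be drained by the main process, not all of them. `produce`, `double` and `run_pipeline` are hypothetical names; `double` stands in for process2, process3 and process4, which apply the same logic (minus process4's prints).

```python
import multiprocessing
import random


def produce(data, out_q):
    # Same as process1: send every item, then a None sentinel.
    for item in data:
        out_q.put(item)
    out_q.put(None)


def double(in_q, out_q):
    # Same logic as process2/3/4: double each item, then forward the sentinel.
    while (item := in_q.get()) is not None:
        out_q.put(item * 2)
    out_q.put(None)


def run_pipeline(data):
    queues = [multiprocessing.Queue() for _ in range(4)]
    procs = [multiprocessing.Process(target=produce, args=(data, queues[0]))]
    procs += [
        multiprocessing.Process(target=double, args=(queues[i], queues[i + 1]))
        for i in range(3)
    ]
    for p in procs:
        p.start()

    # Drain the last queue BEFORE joining: until its data has been read,
    # the last stage's feeder thread cannot finish, so that process cannot exit.
    results = []
    while (item := queues[-1].get()) is not None:
        results.append(item)

    # Every queue has now been consumed, so all joins can return.
    for p in procs:
        p.join()
    return results


if __name__ == "__main__":
    data = [random.randint(1, 10000) for _ in range(10000)]
    results = run_pipeline(data)
    print(len(results))  # 10000
```

Each queue has a single consumer, so item order is preserved and every result is the corresponding input multiplied by 8.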
Asked by The Gum machine (new contributor).