New to Python multiprocessing.
I have a task that involves hitting a webservice about a million times and saving each response to a file (a separate file per request).
I have high-level working code, but I'm a little confused about a couple of things.
- What is the difference between the following two syntaxes?
pool = Pool(processes=4)
pool.starmap(task, listOfInputParametersTuple)
and
with Pool(processes=4) as pool:
    pool.starmap(task, listOfInputParametersTuple)
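For context, my understanding is that the with form is roughly equivalent to the sketch below, because Pool's context manager calls terminate() on exit rather than close():

pool = Pool(processes=4)
try:
    pool.starmap(task, listOfInputParametersTuple)
finally:
    # exiting the with-block calls pool.terminate(), not pool.close()/pool.join()
    pool.terminate()

Is that right, and does the difference matter here?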
- Is there a way to avoid reading the entire input file before starting the multiprocessing Pool? Basically, read each line and immediately submit a pool task. My current code builds the whole list first:
with open(list_of_ids, 'r') as infile:
    for id in infile:
        listOfInputParametersTuple.append((id, queue, requestBodyTemplate))
pool.starmap(task, listOfInputParametersTuple)
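From what I've read, imap_unordered with a generator might do this lazily. Here is a minimal sketch of what I have in mind (unpack_task is a hypothetical module-level wrapper, since the imap_* methods take single-argument functions):

def args_from_file(path, queue, template):
    # yield one argument tuple per line instead of building the full list in memory
    with open(path, 'r') as infile:
        for line in infile:
            yield (line.strip(), queue, template)

def unpack_task(args):
    # imap_unordered passes a single argument, so unpack the tuple here
    return task(*args)

with Pool(processes=4) as pool:
    # chunksize batches lines to workers; the generator is consumed as tasks are dispatched
    for _ in pool.imap_unordered(unpack_task, args_from_file(list_of_ids, queue, requestBodyTemplate), chunksize=100):
        pass

Would that be the recommended approach?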
- In the task, I use a fairly large requestBodyTemplate together with the passed id to create a JSON request. This requestBodyTemplate variable is duplicated in every element of the input tuple. Is there a way to pass it to the task function outside of the input tuple? (See the initializer sketch below.)
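I came across the initializer/initargs arguments of Pool. Would something like this sketch work, assuming task is changed to read the module-level _template instead of taking it as a parameter?

_template = None

def init_worker(template):
    # runs once in each worker process; stores the big template in a global
    global _template
    _template = template

with Pool(processes=4, initializer=init_worker, initargs=(requestBodyTemplate,)) as pool:
    # the argument tuples no longer need to carry the template
    pool.starmap(task, [(id, queue) for id in ids])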
- How does one ensure that all spawned tasks have finished before the main program exits? (See the close()/join() sketch below.)
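My current understanding is that this is exactly what close() and join() are for:

pool.close()  # no new tasks may be submitted
pool.join()   # blocks until every submitted task has completed

Is anything more needed when a Manager queue and a listener process are involved?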
- Any pointers on timing out a task that takes too long, so that its worker is released back to the pool?
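The only approach I've found so far is apply_async with a timeout on get(), sketched below with a hypothetical 30-second limit. As far as I can tell it only abandons the wait and does not actually free the worker, so I'd welcome alternatives (perhaps a timeout on the HTTP call itself):

from multiprocessing import TimeoutError

results = [pool.apply_async(task, args) for args in listOfInputParametersTuple]
for r in results:
    try:
        r.get(timeout=30)  # hypothetical per-task limit
    except TimeoutError:
        # note: the worker process keeps running; only the wait is abandoned
        print("task timed out")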
Feel free to share any other suggestions you may have.
Thanks
Objective:
- Read a list of 1 million IDs from a file
- For each ID, craft a JSON request body.
- Call the webservice with the JSON created above (it takes a few seconds to get a response).
- Save the response output to a directory (one file per request).
- Save status to a unified “status” file
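For reference, the task below only simulates the work. The real one would look roughly like this sketch (SERVICE_URL and OUTPUT_DIR are hypothetical names, and I'm assuming the requests library for the webservice calls):

import requests

def real_task(id, queue, template):
    # build the JSON request body from the shared template (hypothetical structure)
    body = dict(template)
    body["id"] = id
    try:
        resp = requests.post(SERVICE_URL, json=body, timeout=30)
        # save the response to its own file in the output directory
        with open(f"{OUTPUT_DIR}/{id}.json", "w") as out:
            out.write(resp.text)
        # report status to the listener via the shared queue
        queue.put(f"{id} OK {resp.status_code}")
    except requests.RequestException as exc:
        queue.put(f"{id} FAILED {exc}")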
Task –
import random
from time import sleep
from multiprocessing import current_process

def task(i, queue):
    # get the current process
    process = current_process()
    # generate some work
    s = random.randint(1, 10)
    # block to simulate work
    print(f"TASK function: {process} - sleep for {s} sec")
    sleep(s)
    data = f"{process} - sleep {s} sec - {i} - {queue}"
    print(f"TASK function: {data}")
    # put the result on the shared queue for the listener to record
    queue.put(data)
Main method –
from multiprocessing import Pool, Process, Manager, set_start_method

def main():
    # set the spawn start method
    set_start_method('spawn')
    requestBodyTemplate = getRequestBodyTemplateJSON()
    # create the manager
    with Manager() as manager:
        # create the shared queue (must exist before the argument tuples are built)
        queue = manager.Queue()
        # start the listener that writes the unified status file
        listener_process = Process(target=listener, args=(queue,))
        listener_process.start()
        print("back in main after starting listener")
        # build the argument tuples
        listOfInputParametersTuple = []
        with open(list_of_ids, 'r') as infile:
            for line in infile:
                listOfInputParametersTuple.append((line.strip(), queue))
        # execute the tasks in parallel; starmap unpacks each tuple into task's parameters
        pool = Pool(processes=4)
        pool.starmap(task, listOfInputParametersTuple)
        # pool.starmap(task, zip(args_i, itertools.repeat(queue)))
        pool.close()
        # join() blocks until all tasks are done, so no extra sleep is needed
        pool.join()
        print("\nSending None message to queue")
        queue.put(None)
        # wait for the listener to drain the queue before the manager shuts down
        listener_process.join()
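One more thing I noticed while reading the docs: with the spawn start method, the entry point needs the usual guard, otherwise the worker processes re-import the module and try to spawn again:

if __name__ == '__main__':
    main()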