I am trying to use the multiprocessing map function to execute a function over a list. However, inside this function I make some API calls that must not run at the same time, since that would overload the API. My code is also spread over multiple files, since it is quite long.
I have tried to follow this solution, but it does not work for me. I suspect that is because I am working with more than one file. The specific error I get is: NameError: name 'lock' is not defined.
Here is a simplification of my code:
main.py
import file2
import apicaller
from multiprocessing import Pool, Lock

def evaluate_entry_of_list(entry):
    response = file2.get_data(entry)
    # some calculations
    return results

def init_pool(given_lock):
    global lock
    lock = given_lock

if __name__ == '__main__':
    entries = apicaller.get_list()
    t_lock = Lock()
    with Pool(8, initializer=init_pool, initargs=(t_lock,)) as pool:
        results = pool.map(evaluate_entry_of_list, entries)
    process_results(results)
file2.py
import requests

def make_call(url, body) -> requests.Response:
    lock.acquire()
    # Make API call
    lock.release()
    return response
Other solutions I tried: defining a variable in main and importing it into file2, and using a separate class to hold a static variable and importing that into file2.
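For reference, the import-based attempt looked roughly like this (illustrative, not my exact code):

# file2.py (failed attempt)
from main import lock  # does not work: lock is only created inside the
                       # if __name__ == '__main__' guard, so it is not
                       # defined when file2 imports main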
The simple solution is to place the pool initializer function in the same module as the worker function that uses the lock: the global statement then creates the name lock in file2's namespace, which is exactly where make_call looks it up.
file2.py
import requests

def init_pool(given_lock):
    global lock
    lock = given_lock

def make_call(url, body) -> requests.Response:
    lock.acquire()
    # Make API call
    lock.release()
    return response
main.py
...
if __name__ == '__main__':
    entries = apicaller.get_list()
    t_lock = Lock()
    with Pool(8, initializer=file2.init_pool, initargs=(t_lock,)) as pool:
        results = pool.map(evaluate_entry_of_list, entries)
    process_results(results)
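As a side note, make_call can take the lock as a context manager, so it is released even if the API call raises. A minimal sketch, where the requests.post call is just illustrative for the elided API call:

import requests

def make_call(url, body) -> requests.Response:
    with lock:  # released automatically, even if the request raises
        response = requests.post(url, json=body)  # illustrative call
    return response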
There are naturally many ways to solve this type of problem, but I would suggest creating your own “pool” using Process objects instead. Things like Lock, Queue, etc. can only be passed at process creation (which is why with a Pool they must be passed to the initializer). If you control the process creation, it is a bit easier to pass these objects around.
Here’s a demo of what that might look like (just to get an idea of how simple you can go):
from multiprocessing import Process, Lock, Queue, cpu_count
from api import make_call, task_list

def worker(lock, in_queue, out_queue):
    # pull tasks until the shutdown sentinel (None) arrives
    for task in iter(in_queue.get, None):
        with lock:  # serialize API calls across all workers
            out_queue.put(make_call(task))

if __name__ == "__main__":
    api_lock = Lock()
    in_queue = Queue()
    out_queue = Queue()
    workers = [Process(target=worker, args=(api_lock, in_queue, out_queue))
               for _ in range(cpu_count())]
    for w in workers: w.start()
    for task in task_list: in_queue.put(task)
    results = [out_queue.get() for _ in task_list]
    for _ in workers: in_queue.put(None)  # worker shutdown sentinel
    for w in workers: w.join()
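One caveat with this demo: unlike pool.map, results come off out_queue in completion order, not submission order. If order matters, a minimal variant (a sketch, reusing the names above) is to tag each task with its index and sort at the end:

def worker(lock, in_queue, out_queue):
    for i, task in iter(in_queue.get, None):  # items are (index, task) pairs
        with lock:
            out_queue.put((i, make_call(task)))

# ... in the main block ...
for i, task in enumerate(task_list):
    in_queue.put((i, task))
indexed = [out_queue.get() for _ in task_list]
results = [r for _, r in sorted(indexed, key=lambda p: p[0])]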