I am going to use requests with sessions in my application. I have read some threads about thread-safety and process-safety issues, but they aren't exactly recent.
My application runs 800+ processes, so I am a little worried that data retrieved from APIs could get mixed up.
I came up with this model of implementation and was wondering whether it is a correct way of working with requests sessions in a multiprocess program.
import multiprocessing
import time
import sys
import requests
from requests.packages.urllib3.util.retry import Retry

session = None

def initialize_session():
    global session
    if session is None:
        session = requests.Session()
        retry_strategy = Retry(
            total=3,
            status_forcelist=[429,500,502,503,504],
            allowed_methods=["POST"],
            backoff_factor=1
        )
        adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy,
                                                pool_connections=1, pool_maxsize=1)
        session.mount("https://", adapter)

def worker(args):
    global session
    initialize_session()
    try:
        for i in range(10):
            print("I am process "+str(j)+" and my cookie is ")
            print(session.cookies.get_dict())
            session.cookies.set('worker', j)
            time.sleep(5) #do some api work, function calls, etc
    except:
        raise

processes = []
for j in range(0, 4):
    p = multiprocessing.Process(target=worker, args=(j,))
    processes.append(p)
    time.sleep(0.1)
    p.start()
for p in processes:
    p.join()
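The use of a session instance as per the OP is almost certainly not safe cross-platform. For example, it won't work on macOS, where multiprocessing defaults to the spawn start method: each child re-imports the main module, so the unguarded process-creation loop runs again and worker cannot rely on the global j. If your "API work" mostly involves POSTing to some API, the job is I/O-bound and you might be better off using multithreading. Having said that, I'm not sure whether requests.Session is thread-safe. If it isn't, you'll need to implement some kind of lock to protect it.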
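If you do want to stay with processes, the portability problem described above can usually be avoided by building the session inside each child and guarding the process-creation code. The following is only a minimal sketch under those assumptions: `build_session`, `run`, the `results` queue and the `worker(worker_id, results)` signature are names made up for illustration, and the sketch makes no network calls.

```python
import multiprocessing

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def build_session():
    """Create a Session with the retry policy from the question."""
    session = requests.Session()
    retry_strategy = Retry(
        total=3,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["POST"],
        backoff_factor=1,
    )
    adapter = HTTPAdapter(
        max_retries=retry_strategy, pool_connections=1, pool_maxsize=1
    )
    session.mount("https://", adapter)
    return session


def worker(worker_id, results):
    # The session is created inside the child process, so nothing is
    # inherited from, or shared with, the parent or sibling processes.
    session = build_session()
    session.cookies.set("worker", str(worker_id))
    # ... real API calls with `session` would go here (hypothetical) ...
    results.put((worker_id, session.cookies.get_dict()))


def run(n_workers=4):
    results = multiprocessing.Queue()
    processes = [
        multiprocessing.Process(target=worker, args=(i, results))
        for i in range(n_workers)
    ]
    for p in processes:
        p.start()
    # Drain the queue before joining; the timeout avoids waiting forever
    # if a child dies before reporting.
    collected = dict(results.get(timeout=30) for _ in processes)
    for p in processes:
        p.join()
    return collected


if __name__ == "__main__":
    # The guard keeps spawned children from re-running this block when
    # they re-import the module.
    print(run())
```

Each process passes its own id explicitly and only ever sees its own cookie jar, so there is no cross-process state to get mixed up.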
Here’s an example of how you could do this with multithreading:
from functools import cache
from threading import Lock, Thread

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

NTHREADS = 5
URL = "http://localhost:8000/post"


@cache
def getSession():
    # cache() memoizes the result, so later calls return the same Session
    session = requests.Session()
    retry_strategy = Retry(
        total=3,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["POST"],
        backoff_factor=1,
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("http://", adapter)
    return session


def process(lock):
    # Only one thread at a time may touch the shared session
    with lock:
        response = getSession().post(URL, json={"FOO": "BAR"})
    response.raise_for_status()
    print(response.json()["data"])


def main():
    lock = Lock()
    threads = []
    for _ in range(NTHREADS):
        thread = Thread(target=process, args=(lock,))
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()


if __name__ == "__main__":
    main()
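Note that the lock above serializes every request, so the threads never actually overlap on the network. An alternative that is often suggested is to give each thread its own session via `threading.local()`, which removes the need for a lock altogether. This is a rough sketch, not a drop-in replacement: `get_thread_session` and the `seen` dictionary are hypothetical names, and the actual POST is left as a comment so the sketch runs without a server.

```python
import threading

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Attributes set on this object are visible only to the thread that set them.
_thread_local = threading.local()


def get_thread_session():
    """Return the calling thread's own Session, creating it on first use."""
    session = getattr(_thread_local, "session", None)
    if session is None:
        session = requests.Session()
        retry_strategy = Retry(
            total=3,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["POST"],
            backoff_factor=1,
        )
        session.mount("http://", HTTPAdapter(max_retries=retry_strategy))
        _thread_local.session = session
    return session


def process(thread_id, seen):
    session = get_thread_session()
    # No lock needed: no other thread can reach this session.
    # e.g. session.post(URL, json={"FOO": "BAR"}) would go here.
    seen[thread_id] = session  # keep a reference so the demo can compare


if __name__ == "__main__":
    seen = {}
    threads = [
        threading.Thread(target=process, args=(i, seen)) for i in range(3)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # Number of distinct Session objects used across the threads
    print(len({id(s) for s in seen.values()}))
```

The trade-off is that cookies and connection pools are no longer shared between threads, which matches what the question seems to want (each worker keeping its own cookie) but would not suit a login cookie that all workers must reuse.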