I’m looking for a thread-safe implementation of a Semaphore I can use in Python.
The standard libraries asyncio.Semaphore isn’t thread-safe.
The standard libraries threading.Semaphore doesn’t have awaitable
interface.
I am using sanic which has multiple threads (workers) but also an asynchronous loop on each thread. I want to be able to yield execution back to the event loop on each of the workers whenever it encounters a blocked semaphore, while it waits.
UPDATE: I meant to say process here, not threads. So these should be that Sanic splits across processes, and multiprocessing.Semaphore. I believe the answer given is still relevant to where I can apply a similar solution.
1
If you are saying that you need a single semaphore instance to be used across all the threads, then create a threading.Semaphore
instance and am async function acquire_semaphore
such as the following:
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor()
semaphore = threading.Semaphore()
async def acquire_semaphore():
"""Make a threading.Semaphore awaitable."""
def acquirer():
semaphore.acquire()
loop = asyncio.get_running_loop()
await loop.run_in_executor(executor, acquirer)
async def test():
"""Acquire and release a semaphore sharable across threads."""
await acquire_semaphore()
print('acquired')
await asyncio.sleep(1)
semaphore.release() # This never blocks
print('released')
def worker():
asyncio.run(test())
def main():
threads = [
threading.Thread(target=worker)
for _ in range(3)
]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
if __name__ == '__main__':
main()
Prints:
acquired
released
acquired
released
acquired
released
Try aiologic.Semaphore
(I’m the creator of aiologic):
import asyncio
from concurrent.futures import ThreadPoolExecutor
from aiologic import Semaphore
semaphore = Semaphore()
async def work():
async with semaphore:
print("acquired")
await asyncio.sleep(1)
print("released")
with ThreadPoolExecutor(3) as executor:
for _ in range(3):
executor.submit(asyncio.run, work())
Prints:
acquired
released
acquired
released
acquired
released
Unlike Booboo’s solution, it doesn’t use an executor, so it doesn’t have cancellation issues.