I’m trying to create a distributed semaphore using Redis to use in my Django application. This is to limit concurrent requests to an API. I’m using asyncio in redis-py. However, I want to create a connection pool to share across requests since I was getting a “Max clients reached” error. Thus, I created a shared connection pool in settings.py
which I use in my semaphore class. However, I then get an error got Future <Future pending> attached to a different loop
when I make concurrent requests. This is my code:
import os
import uuid
import asyncio
import time
from typing import Any
import random
from django.conf import settings
from redis import asyncio as aioredis
STARTING_BACKOFF_S = 4
MAX_BACKOFF_S = 16
class SemaphoreTimeoutError(Exception):
"""Exception raised when a semaphore acquisition times out."""
def __init__(self, message: str) -> None:
super().__init__(message)
class RedisSemaphore:
def __init__(
self,
key: str,
max_locks: int,
timeout: int = 30,
wait_timeout: int = 30,
) -> None:
"""
Initialize the RedisSemaphore.
:param redis_url: URL of the Redis server.
:param key: Redis key for the semaphore.
:param max_locks: Maximum number of concurrent locks.
:param timeout: How long until the lock should automatically be timed out in seconds.
:param wait_timeout: How long to wait before aborting attempting to acquire a lock.
"""
self.redis_url = os.environ["REDIS_URL"]
self.key = key
self.max_locks = max_locks
self.timeout = timeout
self.wait_timeout = wait_timeout
self.redis = aioredis.Redis(connection_pool=settings.REDIS_POOL)
self.identifier = "Only identifier"
async def acquire(self) -> str:
"""
Acquire a lock from the semaphore.
:raises SemaphoreTimeoutError: If the semaphore acquisition times out.
:return: The identifier for the acquired semaphore.
"""
czset = f"{self.key}:owner"
ctr = f"{self.key}:counter"
identifier = str(uuid.uuid4())
now = time.time()
start_time = now
backoff = STARTING_BACKOFF_S
while True:
# TODO: Redundant?
if time.time() - start_time > self.wait_timeout:
raise SemaphoreTimeoutError("Waited too long to acquire the semaphore.")
async with self.redis.pipeline(transaction=True) as pipe:
pipe.zremrangebyscore(self.key, "-inf", now - self.timeout)
pipe.zinterstore(czset, {czset: 1, self.key: 0})
pipe.incr(ctr)
counter = (await pipe.execute())[-1]
pipe.zadd(self.key, {identifier: now})
pipe.zadd(czset, {identifier: counter})
pipe.zrank(czset, identifier)
rank = (await pipe.execute())[-1]
print(rank)
if rank < self.max_locks:
return identifier
pipe.zrem(self.key, identifier)
pipe.zrem(czset, identifier)
await pipe.execute()
# Exponential backoff with randomness
sleep_time = backoff * (1 + random.random() * 0.3)
if (sleep_time + time.time() - start_time) > self.wait_timeout:
raise SemaphoreTimeoutError("Waited too long to acquire the semaphore.")
await asyncio.sleep(sleep_time)
backoff = min(backoff * 2, MAX_BACKOFF_S)
async def release(self, identifier: str) -> bool:
"""
Release a lock from the semaphore.
:param identifier: The identifier for the lock to be released.
:return: True if the semaphore was properly released, False if it had timed out.
"""
czset = f"{self.key}:owner"
async with self.redis.pipeline(transaction=True) as pipe:
pipe.zrem(self.key, identifier)
pipe.zrem(czset, identifier)
result = await pipe.execute()
return result[0] > 0
class RedisSemaphoreContext:
def __init__(self, semaphore: RedisSemaphore) -> None:
"""
Initialize the RedisSemaphoreContext.
:param semaphore: An instance of RedisSemaphore.
"""
self.semaphore = semaphore
self.identifier = None
async def __aenter__(self) -> "RedisSemaphoreContext":
"""Enter the async context manager."""
self.identifier = await self.semaphore.acquire()
return self
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
"""Exit the async context manager."""
await self.semaphore.release(self.identifier)
which I then use in my adrf async views.
What am I doing wrong? Is this possible?