I have a FastAPI app which interacts with the backend through a singleton class. The singleton object holds the values which should be displayed in the frontend.
Multiple processes work with this singleton object (called `Database`) and have no problems: changes in the singleton object are visible between these processes. But my FastAPI app is not getting any updates.
I can change values inside the singleton object from the app, but this change has no effect on the other processes' singleton object, and vice versa.
App class:
#imports ...
# Shared Database instance that the routes read from.
# NOTE(review): the singleton is cached on a class attribute, so it is unique
# only within one Python process — confirm how other processes are expected
# to observe the same state.
db = Database()  # singleton object
# FastAPI application on which the routes are registered.
app = FastAPI()
# example route
@app.websocket("/stream/volume")
async def websocket_volume(websocket: WebSocket):
    """Push the current volume to a WebSocket client whenever it changes.

    The handler polls ``db.get_volume()`` once per second and sends a JSON
    message of the form ``{"volume": <value>}`` only when the value differs
    from the one sent last (the first poll always sends).

    Args:
        websocket: The incoming WebSocket connection.
    """
    # Imported locally so this handler does not depend on the module-level
    # import list.
    from fastapi import WebSocketDisconnect

    await websocket.accept()
    volume_old = None  # last value sent; None forces an initial send
    try:
        while True:
            volume = db.get_volume()
            if volume_old != volume:
                volume_old = volume
                await websocket.send_text(json.dumps({"volume": volume}))
            # asyncio-compatible sleep: yields to the event loop between polls
            await asyncio.sleep(1)
    except WebSocketDisconnect:
        # The peer is gone, so the socket is already closed; calling close()
        # here would raise instead of helping.
        print("WebSocket disconnected")
    except Exception as exc:
        # Report the real failure instead of labelling it a disconnect.
        print(f"WebSocket error: {exc!r}")
        try:
            await websocket.close()
        except RuntimeError:
            # The connection was already closed; nothing left to do.
            pass
Singleton class:
class Singleton:
    """Base class that hands out one shared instance per concrete class.

    The first call to ``cls()`` creates the instance; later calls return the
    same object. The instance is stored on the concrete class itself, so a
    subclass never receives an instance created for a parent class.

    Each new instance gets the attribute ``_Singleton__initialized`` set to
    ``False``; subclasses can use it to make ``__init__`` run only once.

    Note: the cache is a plain class attribute, so "single" means single
    within one Python process.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        # Look only at cls's own namespace: an inherited ``_instance`` belongs
        # to a parent class and must not be returned for this class.
        # Compare with ``is None`` rather than truthiness so that instances
        # which evaluate as falsy (e.g. define ``__len__``) are still cached.
        if cls.__dict__.get("_instance") is None:
            with cls._lock:
                # Another thread could have created the instance before we
                # acquired the lock, so check again.
                if cls.__dict__.get("_instance") is None:
                    instance = super().__new__(cls)
                    # Name-mangled to ``_Singleton__initialized``. Set it
                    # before publishing the instance so no other thread can
                    # observe an instance that lacks the flag.
                    instance.__initialized = False
                    cls._instance = instance
        return cls._instance
class Database(Singleton):
    """Singleton holding the current volume behind a thread lock.

    ``__init__`` runs on every ``Database()`` call, so it uses the
    ``_Singleton__initialized`` flag set by ``Singleton.__new__`` to set up
    the state only once.

    Note: ``volume`` is an ordinary attribute guarded by a
    ``threading.Lock``, which coordinates threads inside one process.
    """

    def __init__(self) -> None:
        """Create the lock and the initial volume on first construction."""
        first_time = not self._Singleton__initialized
        if first_time:
            # Mark as initialised first, then build the state.
            self._Singleton__initialized = True
            self.lock = threading.Lock()
            self.volume = 0

    def replace_volume(self, value: int) -> None:
        """Store ``value`` as the new volume while holding the lock."""
        with self.lock:
            self.volume = value

    def get_volume(self):
        """Return the current volume, read while holding the lock."""
        with self.lock:
            current = self.volume
        return current