Hello, I am using Redis to allow the vertical scaling of my WebSockets, but I am not sure that my two endpoints use the right logic: I first kept the connection state in local memory, and then decided to switch to Redis's in-memory store.
Here's the code:
@router.post("/send_message", response_model=MessageResponse)
async def send_message(message: MessageCreate, db: Session = Depends(get_db)):
    """Store a chat message and publish it on its conversation's channel.

    The sender and the conversation are looked up first; the message row is
    then inserted and committed, and a JSON summary of the stored row is
    published on ``conversation_{conversation_id}``.

    Args:
        message: Incoming message payload.
        db: Database session supplied by ``get_db``.

    Returns:
        The stored ``Message`` row, after ``db.refresh``.

    Raises:
        HTTPException: 404 when the sender or the conversation is not found.
    """
    print("#########")

    # Both referenced rows must exist before anything is written.
    author = db.query(User).filter(User.id == message.sender_id).first()
    if not author:
        raise HTTPException(status_code=404, detail="Sender not found")

    thread = db.query(Conversation).filter(Conversation.id == message.conversation_id).first()
    if not thread:
        raise HTTPException(status_code=404, detail="Conversation not found")

    # Persist the message.
    row_fields = {
        "content": message.content,
        "sender_id": message.sender_id,
        "conversation_id": message.conversation_id,
        "content_type": message.content_type,
        "status": message.status,
    }
    stored = Message(**row_fields)
    db.add(stored)
    db.commit()
    print(f"db :{stored}")
    db.refresh(stored)

    # Publish a JSON summary of the stored row through the broadcast backend.
    payload = json.dumps(
        {
            "content": stored.content,
            "sender_id": stored.sender_id,
            "conversation_id": stored.conversation_id,
            "created_at": stored.created_at.isoformat(),
        }
    )
    channel_name = f"conversation_{message.conversation_id}"
    await broadcast.publish(channel=channel_name, message=payload)
    return stored
@router.websocket("/ws/{conversation_id}/{client_id}")
async def websocket_endpoint(websocket: WebSocket, conversation_id: int, client_id: int, db: Session = Depends(get_db)):
    """Relay chat traffic between one WebSocket client and a conversation.

    Two tasks run side by side in an anyio task group:

    * ``receiver`` reads JSON text frames from the socket, builds a
      ``MessageCreate`` from each one and passes it to ``send_message``.
    * ``sender`` subscribes to ``conversation_{conversation_id}`` on the
      broadcast backend and forwards every event to this socket.

    Every open connection runs its own ``sender`` subscription, so one
    published message is forwarded, and the ``message:`` line printed, once
    per connection subscribed to the conversation in this process. Seeing
    that line twice with two clients connected is therefore expected and
    does not mean the message was published twice.

    Args:
        websocket: The client connection.
        conversation_id: Conversation whose channel this socket listens to.
        client_id: Identifier of the connecting client (used for logging).
        db: Database session supplied by ``get_db``; reused for every
            message received on this socket.
    """
    await manager.connect(conversation_id, websocket)
    print("manager connected")
    channel_name = f"conversation_{conversation_id}"

    async def receiver() -> None:
        """Read frames from the client and store/publish each message."""
        try:
            while True:
                data = await websocket.receive_text()
                try:
                    message_data = json.loads(data)
                    # Create a MessageCreate instance from the received data
                    message_create = MessageCreate(
                        content=message_data['content'],
                        sender_id=message_data['sender_id'],
                        conversation_id=message_data['conversation_id'],
                        content_type=message_data['content_type'],
                        status=True
                    )
                except (ValueError, KeyError, TypeError) as exc:
                    # Bad JSON, a missing key or a non-object payload must not
                    # drop the connection; skip the frame and keep reading.
                    # NOTE(review): assumes MessageCreate validation errors
                    # derive from ValueError (true for pydantic) - confirm.
                    print(f"Ignoring malformed message from client {client_id}: {exc!r}")
                    continue
                try:
                    await send_message(message_create, db=db)
                except HTTPException as exc:
                    # send_message raises 404 for an unknown sender or
                    # conversation; there is no HTTP response to attach it to
                    # here, so log it and keep the socket open.
                    print(f"Message from client {client_id} rejected: {exc.detail}")
                    continue
                print(f"Publishing to Redis channel conversation_{conversation_id}: {message_create.content}")
        except WebSocketDisconnect:
            print(f"Client {client_id} disconnected from conversation {conversation_id}")
            # Stop the sender task as well so the task group can exit.
            task_group.cancel_scope.cancel()

    async def sender() -> None:
        """Forward every event published on the channel to this socket."""
        try:
            async with broadcast.subscribe(channel_name) as subscriber:
                async for event in subscriber:
                    await websocket.send_text(event.message)
                    # Printed once per subscribed connection (see docstring).
                    print(f"message: {event.message}")
        except WebSocketDisconnect:
            # Handle a disconnect seen while sending inside the task, so it
            # does not leave the task group wrapped in an exception group.
            print(f"Client {client_id} disconnected from conversation {conversation_id}")
            task_group.cancel_scope.cancel()

    try:
        async with anyio.create_task_group() as task_group:
            task_group.start_soon(receiver)
            task_group.start_soon(sender)
    except WebSocketDisconnect:
        print(f"Client {client_id} disconnected from conversation {conversation_id}")
    finally:
        # Single cleanup point: runs exactly once however the tasks ended
        # (disconnect, cancellation or an unexpected error).
        manager.disconnect(conversation_id, websocket)
Can you tell me whether this is correct?
When I send a message it works, but the printed message is duplicated, so I assume there is some confusion in my process:
message: {"content": "bonjour", "sender_id": 2, "conversation_id": 1, "created_at": "2024-08-02T12:55:52"}
message: {"content": "bonjour", "sender_id": 2, "conversation_id": 1, "created_at": "2024-08-02T12:55:52"}
programmer_1 is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.