I am trying to create an API that fetches the logs from a Docker container and returns them as an event stream.
I am using Python together with the FastAPI and Docker SDK packages.
Whenever I test the endpoint with Postman I only get the connection response but no further data, even though I do see the logs as console output in Python.
The endpoint:
@app.get("/servers/{server_id}/logs")
async def get_logs(server_id: str):
    print("Endpoint /logs reached")
    return StreamingResponse(get_server_logs(server_id), media_type="text/event-stream")
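For reference, outside of Postman I would expect to be able to consume the stream like this (host, port and server id are just placeholders):

curl -N http://localhost:8000/servers/my-server/logs

The -N flag disables curl's output buffering, so the events should show up as they arrive.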
The log_handling:
def get_server_logs(server_id: str):
    try:
        container = client.containers.get(server_id)
        for log in container.logs(stream=True, follow=True, tail=10):
            cleaned_log = log.decode("utf-8").strip()
            print(cleaned_log)
            yield f'data: {cleaned_log}\n\n'
    except DockerException as e:
        yield handle_docker_exception(e)
    except Exception as e:
        yield f'data: An error occurred: {str(e)}\n\n'
The Docker SDK docs state that container.logs(stream=True) returns a blocking generator, which I think is why my API does not process any other request while the event stream is open (and which also seems to prevent the logs from being sent as a response).
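For reference, this is roughly how I understand the blocking behaviour; the container name here is just a placeholder:

import docker

client = docker.from_env()
container = client.containers.get("my-server")

# stream=True returns a plain (blocking) generator of byte chunks
log_stream = container.logs(stream=True, follow=True, tail=10)
chunk = next(log_stream)  # blocks the calling thread until the container produces output
print(chunk.decode("utf-8").strip())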
So I now have the log streaming part working, but it does not close the connection properly and it still does not handle other requests in parallel in Postman (I don't know whether that is a limitation of Postman).
The endpoint:
@app.get("/servers/{server_id}/logs")
async def get_logs(server_id: str):
    print("Endpoint /logs reached")
    return StreamingResponse(get_server_logs(server_id), media_type="text/event-stream")
Async generator:
async def get_server_logs(server_id: str):
    try:
        container = client.containers.get(server_id)
        for log in run_blocking_logs_in_thread(container, follow=True, tail=10):
            cleaned_log = log.decode("utf-8").strip()
            print(cleaned_log)
            yield f'data: {cleaned_log}\n\n'
    except DockerException as e:
        yield handle_docker_exception(e)
    except Exception as e:
        yield f'data: An error occurred: {str(e)}\n\n'
Parallel thread:
# Runs the blocking container.logs() call in a separate thread
def run_blocking_logs_in_thread(container, **kwargs):
    queue = Queue()

    def worker():
        try:
            for log in container.logs(stream=True, **kwargs):
                queue.put(log)
        except Exception as e:
            queue.put(e)
        finally:
            queue.put(None)  # Signal the end of the logs

    thread = threading.Thread(target=worker)
    thread.start()

    while True:
        item = queue.get()
        if item is None:
            break
        if isinstance(item, Exception):
            raise item
        yield item

    thread.join()
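For comparison, this is a sketch of what I think an event-loop-friendly wrapper would have to look like, with the blocking queue.get handed off via asyncio.to_thread so the event loop stays free while waiting for the next log chunk (the name async_logs_generator is only illustrative and I have not verified this):

import asyncio
import threading
from queue import Queue

# Sketch only: same worker thread as above, but the consumer awaits the queue
# through asyncio.to_thread instead of blocking the event loop on queue.get()
async def async_logs_generator(container, **kwargs):
    queue = Queue()

    def worker():
        try:
            for log in container.logs(stream=True, **kwargs):
                queue.put(log)
        except Exception as e:
            queue.put(e)
        finally:
            queue.put(None)  # Signal the end of the logs

    threading.Thread(target=worker, daemon=True).start()

    while True:
        item = await asyncio.to_thread(queue.get)  # waits in a worker thread, not the event loop
        if item is None:
            break
        if isinstance(item, Exception):
            raise item
        yield item

It would then be consumed with async for inside get_server_logs instead of the plain for loop shown above; the daemon=True flag is there so the worker thread does not keep the process alive if the client disconnects.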