I would like to stop the execution of a process and FASTAPI with Ctrl+C in Python. But I recieving error KeyboardInterrupt exceptions, how to properly stop threads and FAST API
from fastapi import FastAPI
import threading
import time
import signal
from datetime import datetime, timedelta
from model import core
from model.database import engine, SessionLocal, get_db
from model.core import Timestamp
from routers.timestamps import router as timestamps_router
core.Base.metadata.create_all(bind=engine)
app = FastAPI()
app.include_router(router=timestamps_router)
def record_timestamp():
while True:
db = SessionLocal()
new_timestamp = Timestamp(timestamp=datetime.utcnow())
db.add(new_timestamp)
db.commit()
db.close()
time.sleep(5)
def delete_old_timestamps():
while True:
db = SessionLocal()
one_minute_ago = datetime.utcnow() - timedelta(minutes=1)
db.query(Timestamp).filter(Timestamp.timestamp < one_minute_ago).delete()
db.commit()
db.close()
time.sleep(10)
@app.on_event("startup")
def startup_event():
get_db()
t1 = threading.Thread(target=record_timestamp)
t2 = threading.Thread(target=delete_old_timestamps)
t1.start()
t2.start()
t1.join()
t2.join()
i triend to wite a signal with ChatGPT but also had failed
New contributor
Valentine Saint is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.