I facing a problem when trying to use Prefect Storage Block in Fastapi. The error is below when I trying to call load
or newly created save
Block :
File "/usr/lib/python3.8/json/__init__.py", line 357, in loads
return _default_decoder.decode(s)
File "/usr/lib/python3.8/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/lib/python3.8/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
Expecting value: line 1 column 1 (char 0)
Below are the code service.py
:
from prefect import get_client
from prefect_aws.s3 import S3Bucket
import importlib, traceback
from prefect.deployments.deployments import Deployment
class PrefectProvider:
def __init__(self):
self.prefect_client = get_client()
async def register_flow(self, flow_name: str, schedule):
try:
minio_storage = await S3Bucket.load("minio-s3")
flow_fun = importlib.import_module(f"flows.{flow_name}")
deployment = await Deployment.build_from_flow(
flow=getattr(flow_fun, flow_name),
flow_name=flow_name,
name=f"{flow_name}-deployment",
schedules=[schedule],
infra_overrides={"env": {"PREFECT_LOGGING_LEVEL": "DEBUG"}},
storage=minio_storage,
work_queue_name="minio-worker"
)
deployment_id = await deployment.apply()
return deployment_id
except Exception as e:
traceback.print_exc()
raise e
main.py
:
from fastapi import FastAPI, Depends
from service import PrefectProvider
import uvicorn
from prefect.client.schemas.schedules import CronSchedule
from module import prefect_provider_di
app = FastAPI()
@app.get("/")
async def root(provider: PrefectProvider = Depends(prefect_provider_di)):
await provider.register_flow("my_test_flow", CronSchedule(cron="* * * * *"))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
module.py
:
from service import PrefectProvider
def prefect_provider_di():
return PrefectProvider()
And my flows/my_test_flow.py
:
from prefect import flow
@flow
def my_test_flow():
print("Hello guys")
if __name__ == "__main__":
my_test_flow()
But when I trying call load without await
it complains coroutine 'Block.load' was never awaited
, but I works perfectly when I Run code in debug mode using VSCode.
I am using docker environment for this, the service are :
Prefect Server : prefecthq/prefect:2.20-python3.8
Prefect Worker : prefecthq/prefect:2.20-python3.8
with prefect-aws
library added
MinIO : minio/minio:RELEASE.2024-08-17T01-24-54Z
Edit :
I don’t understand it fixed now after restarting the PC
2