We are developing an Azure Functions ASGI application using the following documentation:
https://learn.microsoft.com/en-us/samples/azure-samples/fastapi-on-azure-functions/fastapi-on-azure-functions/
We want to connect routes from the ASGI application with a storage queue by adding an output binding.
Here is what we are trying to do:
# function_app.py
import azure.functions as func
from src.main import app as fastapi

app = func.AsgiFunctionApp(app=fastapi, http_auth_level=func.AuthLevel.FUNCTION)

@fastapi.post('/test')
@app.queue_output(...)  # "queue_output" not offered for AsgiFunctionApp
async def receive_post_and_forward_to_queue(payload, msg: func.Out[str]):
    msg.set(payload)  # put payload into storage queue
Is it correct that output bindings are not supported for ASGI applications? If so, is there another way to write to a storage queue using only standard Azure Functions features? (We don't want to use an additional SDK.)
One idea we came up with was to use the blueprint concept and create an additional (“standard”) Azure function with an output binding and call this function from our FastAPI routes.
# blueprint.py
import azure.functions as func

bp01 = func.Blueprint()

# Note: no trigger decorator here, which is the problem described below.
@bp01.function_name(name="QueueOutputFunction")
@bp01.queue_output(arg_name="output",
                   queue_name="test-queue",
                   connection="Storage")
def send_message_to_queue(payload: str, output: func.Out[str]):
    output.set(payload)  # must match arg_name="output"
# function_app.py
import azure.functions as func
from blueprint import bp01, send_message_to_queue
from src.main import app as fastapi

app = func.AsgiFunctionApp(app=fastapi, http_auth_level=func.AuthLevel.FUNCTION)

@fastapi.post("/test")
async def receive_post_and_forward_to_queue(payload):
    send_message_to_queue(payload)

app.register_blueprint(bp01)
With this approach we ran into the following issues:
- You cannot create an Azure function without a trigger. It must have exactly one trigger.
- There is no reasonable choice of standard trigger for the blueprint function, so we tried to implement a "generic trigger". Generic triggers are not documented for the Python v2 programming model. Does anyone have a working example of this?
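Apart from the trigger question, we suspect the direct call from the route cannot work anyway: as far as we understand, the binding argument is supplied by the Functions host on a triggered invocation, not by a plain Python call. The sketch below illustrates this with plain Python only. `FakeOut` and `call_like_the_route` are hypothetical stand-ins we made up for illustration; they are not part of `azure.functions`, and the decorators are left out on purpose.

```python
# Hypothetical stand-in for the binding object; NOT azure.functions.Out.
class FakeOut:
    def __init__(self):
        self.value = None

    def set(self, val):
        # Only stores the value in memory; nothing reaches a queue.
        self.value = val


def send_message_to_queue(payload: str, output: FakeOut) -> None:
    # Same shape as the blueprint function, without the decorators.
    output.set(payload)


def call_like_the_route(payload: str) -> str:
    # Mirrors the call in our FastAPI route, which passes no `output`.
    try:
        send_message_to_queue(payload)
    except TypeError as exc:
        return f"TypeError: {exc}"
    return "ok"


# A plain call without `output` fails with a TypeError.
result = call_like_the_route("hello")

# Passing our own object works, but the value only lives in that object.
manual = FakeOut()
send_message_to_queue("hello", manual)
```

So even with a suitable trigger on the blueprint function, the route would have to cause a real invocation through the host rather than call the Python function directly, assuming our understanding of the binding mechanism is right.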