My logic app flow is shown below in the image.
In my Azure Logic App, I call an Azure Function, http_python_sp_exec_sf (Execute SP Call), inside a For each loop that runs over 10 stored procedures (SPs). The function connects to Snowflake and submits a query that calls an SP; the SP name is passed to the function as input along with other parameters. The function returns the query ID of the submitted query, and I then poll the status of that query ID (using http_python_sf_query_fetchall), with a 30-second delay before each status check. When the status comes back as success, the polling loop exits and I fetch the result returned by the executed statement. I do it this way to avoid timeout errors, so that the Azure Function finishes within its time limit. At the end, I append the table newly generated by the SP to another table (again using http_python_sf_query_fetchall).

With 4 or 5 SPs the For each works fine, but with 10 SPs the Azure Function times out randomly in different iterations. My questions:
- I don't understand why there is a timeout error. All the Azure Function does is submit the query and return. Is this a problem with parallel Snowflake connections? By default I am allowed to run 16 Snowflake connections in parallel.
- Do I need a delay between opening connections to Snowflake? I tried adding a 30-second delay after every Snowflake query, but it didn't help.
P.S. I connect to Snowflake from the Azure Logic App through an Azure Function because the Snowflake connector in Azure Logic Apps was not working with the public/private key authentication we use.
Below is the Azure Function code.
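For reference, the submit-then-poll flow described above can be sketched in Python roughly as follows. This is only an illustration of the Logic App loop, not the actual workflow definition: `poll_until_done`, `get_status`, the status strings, and `max_polls` are all hypothetical names and values, and `get_status` stands in for the HTTP call to http_python_sf_query_fetchall.

```python
import time
from typing import Callable


def poll_until_done(get_status: Callable[[str], str], query_id: str,
                    delay_s: float = 30.0, max_polls: int = 20,
                    sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll get_status(query_id) until it stops reporting RUNNING.

    get_status is a hypothetical stand-in for the status call made by the
    Logic App; the status strings used here are assumptions.
    """
    for _ in range(max_polls):
        sleep(delay_s)  # wait before each status check, as in the Logic App
        status = get_status(query_id)
        if status != "RUNNING":
            return status
    return "TIMED_OUT"  # gave up after max_polls checks


# Usage with a fake status source that succeeds on the third poll.
_fake_statuses = iter(["RUNNING", "RUNNING", "SUCCESS"])
result = poll_until_done(lambda qid: next(_fake_statuses), "query-id-123",
                         sleep=lambda s: None)
print(result)
```

The `sleep` parameter is injected only so the sketch can be run without actually waiting 30 seconds per poll.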
import json
import logging

import azure.functions as func

# `app` and `snowflake_connection_public_private_key` are defined elsewhere in the project.


@app.route(route="http_python_sp_exec_sf")
def http_python_sp_exec_sf(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request. - http_python_sp_exec_sf')
    token = req.params.get('token')  # secret
    sp_name = req.params.get('sp_name')
    start_date = req.params.get('start_date')
    end_date = req.params.get('end_date')
    if not token:
        # Parameters were not in the query string, so read them from the JSON body.
        try:
            req_body = req.get_json()
        except Exception as e:
            return func.HttpResponse(str(e))
        else:
            try:
                token = req_body.get('token')
                sp_name = req_body.get('sp_name')
                start_date = req_body.get('start_date')
                end_date = req_body.get('end_date')
                cur, conn = snowflake_connection_public_private_key()
                logging.info('connection Established with snowflake - http_python_sp_exec_sf')
                query_tab1 = "select col1 from test1"
                query_tab2 = "select col2 from test1"
                # Build the CALL statement; every argument is passed as a quoted string literal.
                sf_call_query = "CALL sc." + "SP_" + sp_name + "_SP2" + "('" + sp_name + "_tmp','" + query_tab1 + "','" + query_tab2 + "','" + start_date + "','" + end_date + "');"
                response_sf_call_query = cur.execute(sf_call_query)
                logging.info('Query submitted to snowflake')
                query_id = cur.sfqid
                logging.info('Returning Query ID')
                cur.close()
                conn.close()
                logging.info('Snowflake Connection Closed')
                ret_json = {"SP_Name": sp_name, "query_id": query_id}
                return func.HttpResponse(json.dumps(ret_json), mimetype="application/json")
            except Exception as e:
                return func.HttpResponse(str(e))
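One point that may be relevant to the timeouts: in the Snowflake Python connector, `cur.execute` waits for the statement to finish before it returns, so the function above stays busy for as long as the SP runs. The sketch below shows how the submit step could return right after submission instead, assuming the cursor comes from snowflake-connector-python and its `execute_async` method is used. The helper name `submit_sp_async`, the SP name in the usage line, and the `FakeCursor` class are made up for illustration. Whether the query keeps running after the connection is closed is an assumption that should be verified (it may depend on session settings such as ABORT_DETACHED_QUERY).

```python
def submit_sp_async(cur, sql: str, params=None) -> str:
    """Submit sql without waiting for it to finish and return its query ID.

    Assumes cur is a snowflake-connector-python cursor, whose execute_async
    returns once Snowflake has accepted the statement.
    """
    cur.execute_async(sql, params)
    return cur.sfqid


class FakeCursor:
    """Hypothetical stand-in so the sketch runs without a Snowflake account."""

    sfqid = None

    def execute_async(self, sql, params=None):
        # A real cursor would send the statement to Snowflake here.
        self.sfqid = "fake-query-id"
        return {"queryId": self.sfqid}


query_id = submit_sp_async(FakeCursor(), "CALL sc.SP_DEMO_SP2(%s, %s)",
                           ("2024-01-01", "2024-01-31"))
print(query_id)
```

With a real connection, the status of the returned ID could then be checked in the polling function through the connector's `get_query_status` and `is_still_running` methods rather than by running a separate query.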
@app.route(route="http_python_sf_query_fetchall", auth_level=func.AuthLevel.ANONYMOUS)
def http_python_sf_query_fetchall(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request - http_python_sf_query_fetchall')
    query = req.params.get('query')
    if not query:
        try:
            req_body = req.get_json()
        except ValueError:
            pass
        else:
            try:
                query = req_body.get('query')
                cur, conn = snowflake_connection_public_private_key()
                logging.info('connection Established with snowflake - http_python_sf_query_fetchall')
                ret = cur.execute(query).fetchall()
            except Exception as e:
                return func.HttpResponse(str(e))
            cur.close()
            conn.close()
            ret_json = {"response_query": ret[0][0], "response_all": ret}
            return func.HttpResponse(json.dumps(ret_json), mimetype="application/json")
    if query:
        return func.HttpResponse(f"Hello from http_python_sf_query, {query}. This HTTP triggered function executed successfully.")
    else:
        return func.HttpResponse(
            "This HTTP triggered function http_python_sf_query executed successfully. Pass a name in the query string or in the request body for a personalized response.",
            status_code=200
        )
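In both functions above, `cur.close()` and `conn.close()` are skipped whenever an exception is raised after the connection has been opened, which could leave connections open and might matter with a limit of 16 parallel connections. A minimal sketch of closing them in a `finally` block follows. `run_query`, `connect`, and `FakeResource` are hypothetical names, with `connect` standing in for snowflake_connection_public_private_key; this is not a confirmed fix for the timeouts.

```python
def run_query(connect, query: str):
    """Run query and always close the cursor and connection afterwards.

    connect is a hypothetical factory returning (cursor, connection),
    like snowflake_connection_public_private_key in the code above.
    """
    cur, conn = connect()
    try:
        return cur.execute(query).fetchall()
    finally:
        # Runs on success and on error, so no connection is left open.
        cur.close()
        conn.close()


class FakeResource:
    """Stand-in for a cursor or connection that records close() calls."""

    def __init__(self, fail=False):
        self.closed = False
        self.fail = fail

    def execute(self, query):
        if self.fail:
            raise RuntimeError("query failed")
        return self

    def fetchall(self):
        return [("ok",)]

    def close(self):
        self.closed = True


# Even when the query fails, both objects end up closed.
cur, conn = FakeResource(fail=True), FakeResource()
try:
    run_query(lambda: (cur, conn), "select 1")
except RuntimeError:
    pass
print(cur.closed, conn.closed)
```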