I am running Airflow version 2.7.2, and I constantly see this warning in my triggerer log.
<code>Triggerer's async thread was blocked for 0.50 seconds, likely by a badly-written trigger. Set PYTHONASYNCIODEBUG=1 to get more information on overrunning coroutines.
</code>
My trigger's run function is shown below; it is my implementation of BaseTrigger.run.
<code>async def run(self):
    while not await self.check_if_job_finished():
        await asyncio.sleep(60)
    # Send our single event and then we're done
    self.finished = True
    yield TriggerEvent(self.job_id)
</code>
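For reference, this is a minimal sketch of how I could drive this run() generator outside Airflow with asyncio debug mode enabled (the same diagnostics that PYTHONASYNCIODEBUG=1 turns on); MyJobTrigger and its job_id argument are placeholders for my actual trigger class:
<code>import asyncio

from my_triggers import MyJobTrigger  # placeholder import for the trigger above


async def main():
    # Report any step that holds the event loop longer than 0.2 seconds,
    # mirroring the triggerer's 0.5 second warning threshold.
    asyncio.get_running_loop().slow_callback_duration = 0.2
    trigger = MyJobTrigger(job_id="some-job-id")  # placeholder constructor
    async for event in trigger.run():
        print("Received event:", event)


if __name__ == "__main__":
    # debug=True enables the same diagnostics as PYTHONASYNCIODEBUG=1 for this loop.
    asyncio.run(main(), debug=True)
</code>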
This is the check_if_job_finished function, which sends the HTTP request.
<code>async def check_if_job_finished(self) -> bool:
    self.log.info("Check if job finished.")
    endpoint = 'abc.com'
    headers = {'Content-type': 'application/json', 'Accept': 'application/json'}
    response_json = await async_http_util.async_get(
        url=endpoint + "/api/runtime/" + self.job_id, headers=headers, verify_ssl=False
    )
    if response_json['code'] == '0000':
        status = response_json['result']['status']
        if status == 'SUCCESS' or status == 'FAILED':
            return True
        else:
            return False
    else:
        raise AirflowException("job status failed, response is: {}".format(response_json))
</code>
For sending HTTP requests, I use aiohttp.
<code>import asyncio
import logging

from aiohttp import ClientSession

from airflow import AirflowException

log = logging.getLogger(__name__)


async def async_get(url, retry_times=10, retry_wait_seconds=10, **kwargs):
    """
    Send an asynchronous HTTP GET request with retry logic.

    :param url: Request URL
    :param retry_times: Maximum number of retry attempts
    :param retry_wait_seconds: Retry interval (seconds)
    :param kwargs: Additional parameters
    :return: Response.json()
    """
    for i in range(retry_times):
        try:
            async with ClientSession() as session:
                async with session.get(url, **kwargs) as response:
                    if response.status == 200:
                        log.info(f"Request successful with {response.status}, text: {await response.text()}")
                        return await response.json()
                    else:
                        raise Exception(f"Http code is {response.status}, text: {await response.text()}, need to retry.")
        except Exception as e:
            log.error(f"Request failed with {e}, retrying in {retry_wait_seconds} seconds...")
            if i < retry_times - 1:
                await asyncio.sleep(retry_wait_seconds)
            else:
                raise AirflowException(f"All retries failed. {e}")
</code>
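As a rough way to see, outside the triggerer, whether a single call to async_get holds up the event loop, a concurrent heartbeat task should tick about every 0.1 seconds, and a large gap points at a blocking step. This is only a sketch that assumes it runs in the same module as async_get above; the URL is a placeholder:
<code>import time


async def _heartbeat(interval=0.1):
    # Each tick should arrive roughly `interval` seconds apart; a much larger
    # gap means something held the event loop without awaiting.
    while True:
        start = time.monotonic()
        await asyncio.sleep(interval)
        lag = time.monotonic() - start - interval
        if lag > 0.25:
            log.warning(f"Event loop appears blocked for ~{lag:.2f}s")


async def _probe():
    hb = asyncio.create_task(_heartbeat())
    try:
        # Placeholder URL; verify_ssl=False matches how the trigger calls async_get.
        await async_get("https://example.com/api/runtime/some-job-id",
                        retry_times=1, verify_ssl=False)
    except Exception as e:
        log.info(f"Probe request finished with: {e}")
    finally:
        hb.cancel()


if __name__ == "__main__":
    asyncio.run(_probe())
</code>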
I have tried to increase the resources in my pod, but it doesn’t help.