My current DAG looks like:
def download_stage(dag_def: DailyDownloadDefinition, **additional_kwargs):
# Build and return the ShortCircuitOperator that runs the download step for
# the given DAG definition. Any extra keyword arguments are forwarded
# unchanged to the operator constructor.
#
# NOTE(review): the body below is elided in this excerpt; `task_id` and
# `download_task` are used further down but not defined in the visible code,
# so they are presumably set in the omitted part -- confirm.
...
# Number of retries handed to the operator via `retries=` below.
retries = 5
download = ShortCircuitOperator(
task_id=task_id,
python_callable=run_task_ignore_holidays,
# Positional arguments for `run_task_ignore_holidays`: the task plus the
# parser code, group name and parsing-day delta from the DAG definition.
op_args=(download_task, dag_def.parser_code,
dag_def.group_name, dag_def.parsing_day_delta),
op_kwargs={"send_task_kwargs": {"queue": QueuesNames.daily_pass1.value}},
# Timeout and retry delay are taken from the DAG definition.
execution_timeout=dag_def.download_timeout,
retries=retries,
retry_delay=dag_def.download_retry_delay,
# Callback registered for retries; per the Airflow BaseOperator docs this
# hook runs when the task is sent for a retry, not only on the final
# failure -- verify against the Airflow version in use.
on_retry_callback=on_attempted_download_failure_callback, <--- IMPORTANT!
**additional_kwargs
)
return download
def on_attempted_download_failure_callback(context):
    """Alert when downloads are still failing past the tolerance window.

    Slack messages and phone calls are issued (via ``issue_alerts``) only
    if the current UTC time is more than the configured number of
    tolerance hours past the run's ``next_execution_date``.

    Args:
        context: Task context mapping; must contain
            ``'next_execution_date'``.
    """
    tolerance_hours = get_downloads_tolerance()

    # A DAG run only starts one schedule interval after its execution
    # date, so "N hours since the DAG started running" has to be measured
    # from the *next* execution date rather than the execution date.
    current_time_utc = pendulum.now("UTC")
    time_since_start = current_time_utc - context['next_execution_date']
    allowed_window = timedelta(hours=tolerance_hours)

    if time_since_start > allowed_window:
        issue_alerts(context)
The code above alerts as soon as a download first fails. However, I want it to alert only when the last retry fails.
Is this possible?