I have an Airflow DAG that runs a BashOperator task. When it fails, the notification email contains very little detail:
Try 1 out of 1
Exception:
Bash command failed. The command returned a non-zero exit code 1.
Log: Link
Host: 2db56ea2ab34
Mark success: Link
The error details that would tell me why my task failed are in the task log I see when I click the log link. How can I make Airflow attach that log so that the people getting the failure email don't have to click through? I am still on Airflow 2.6.1, but could upgrade if needed as part of troubleshooting this.
What I tried
I tried writing my own failure_callback() function and got it to send an email, but it could not successfully attach the logs.
import os
import tempfile
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.email import send_email
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.log_reader import TaskLogReader


def failure_callback(context):
    task_instance = context['task_instance']
    log_url = task_instance.log_url
    exception = context.get('exception')

    # Fetch the log content using TaskLogReader
    try:
        log_reader = TaskLogReader()
        log_content, _ = log_reader.read_log_chunks(
            task_instance, try_number=task_instance.try_number, metadata={}
        )
        log_content = ''.join([chunk['message'] for chunk in log_content[0]])
    except Exception as e:
        log_content = f"Could not fetch log content: {e}"

    subject = f"Airflow Task Failure: {task_instance.task_id}"
    html_content = f"""
    Task: {task_instance.task_id}<br>
    DAG: {task_instance.dag_id}<br>
    Execution Time: {context['logical_date']}<br>
    Log URL: <a href="{log_url}">{log_url}</a><br>
    Exception: {exception}<br>
    """

    # Write log content to a temporary file
    with tempfile.NamedTemporaryFile(delete=False, suffix='.log') as temp_log_file:
        temp_log_file.write(log_content.encode('utf-8'))
        temp_log_file_path = temp_log_file.name

    # Send email with log content as attachment
    send_email(
        to=default_args['email'],
        subject=subject,
        html_content=html_content,
        files=[temp_log_file_path],  # Attach the log file
    )
    os.remove(temp_log_file_path)
I added on_failure_callback=failure_callback
to my DAG definition.
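For what it's worth, the temp-file write/attach/cleanup part of the callback seems fine in isolation; this stdlib-only sketch (no Airflow, placeholder log text) round-trips the content the same way:

```python
import os
import tempfile

log_content = "sample log line\n"  # stand-in for the fetched log text

# Same pattern as in the callback: write, keep the path, clean up after.
with tempfile.NamedTemporaryFile(delete=False, suffix=".log") as f:
    f.write(log_content.encode("utf-8"))
    path = f.name

try:
    with open(path, encoding="utf-8") as f:
        assert f.read() == log_content  # content round-trips intact
finally:
    os.remove(path)
```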
The result
An email is sent with an attachment, but the attachment just says:
Could not fetch log content: tuple indices must be integers or slices, not str
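If I understand the TypeError, it suggests each log chunk is coming back as a tuple (e.g. (host, message)) rather than a dict, so my chunk['message'] lookup fails. A minimal stand-alone repro (the tuple contents here are made up):

```python
# Hypothetical stand-in for one log chunk: a (host, message) tuple,
# not a dict - so indexing with a string raises the same TypeError.
chunk = ("2db56ea2ab34", "task log text")

try:
    chunk["message"]
except TypeError as e:
    print(e)  # tuple indices must be integers or slices, not str
```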
And then when I examine the task logs in the Airflow UI it says:
[2024-08-08, 14:41:09 EDT] {file_task_handler.py:522} ERROR - Could not read served logs
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1407, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1558, in _execute_task_with_callbacks
result = self._execute_task(context, task_orig)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1628, in _execute_task
result = execute_callable(context=context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/bash.py", line 210, in execute
raise AirflowException(
airflow.exceptions.AirflowException: Bash command failed. The command returned a non-zero exit code 1.