My goal is to pass a file path string from one @task to EmailOperators, so that I can apply logic to the dataset read from that file path and use it to build the operators that send the emails. My code looks like this:
```python
from airflow.decorators import dag, task
from airflow.operators.email import EmailOperator
from airflow.utils.dates import days_ago
import polars as pl


@dag(
    dag_id="data_to_taskgroup_dag",
    start_date=days_ago(2),
    schedule_interval=None,
)
def data_to_taskgroup_dag():
    @task
    def produce_file_path():
        file_path = "path/to/your/dataframe.csv"
        return file_path

    @task
    def process_dataframe(file_path, **kwargs):
        # Inside a @task, file_path has already been resolved from XCom to a str;
        # **kwargs receives the Airflow task context (ti, ds, ...)
        df = pl.read_csv(file_path)
        # iter_rows(named=True) yields one dict per row, keyed by column name
        for row in df.iter_rows(named=True):
            email_subject = f"Email for {row['column_name']}"
            email_body = f"Email body: {row['another_column']}"
            # Executed inline inside this task, so it never appears as its own task in the UI
            EmailOperator(
                task_id=f"send_email_{row['column_name']}",
                to='[email protected]',
                subject=email_subject,
                html_content=email_body,
            ).execute(context=kwargs)

    file_path = produce_file_path()
    process_dataframe(file_path)


data_to_taskgroup_dag()
```
The problem with this approach is that I am nesting EmailOperators inside the `process_dataframe` task, so I cannot see them in the UI. The nesting is mainly there so that I can get the XCom from the upstream task: if I don't decorate `process_dataframe` and keep it as a plain function, the line `df = pl.read_csv(file_path)` fails, because `file_path` is then not a string but a `PlainXComArg`. What is the recommended approach/practice for this kind of situation?
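One approach that may fit here, sketched under the assumption of Airflow 2.4 or newer, is dynamic task mapping: keep reading the file inside a @task (where `file_path` is already a plain string), have that task return one dict of EmailOperator arguments per row, and hand the result to `EmailOperator.partial(...).expand_kwargs(...)`. Each row then becomes a mapped task instance of a real operator instead of a hidden inline call. The helper `build_email_kwargs` and the task name `make_email_kwargs` are hypothetical; the helper uses the standard-library `csv` module in place of polars so it runs without Airflow or polars installed, and the Airflow wiring is shown only in comments.

```python
import csv
from typing import Dict, List


def build_email_kwargs(file_path: str) -> List[Dict[str, str]]:
    """Hypothetical helper: one dict of EmailOperator arguments per CSV row."""
    kwargs_list = []
    with open(file_path, newline="") as f:
        # DictReader yields one dict per row, keyed by the header line
        # (polars' df.iter_rows(named=True) would give the same shape)
        for row in csv.DictReader(f):
            kwargs_list.append(
                {
                    "subject": f"Email for {row['column_name']}",
                    "html_content": f"Email body: {row['another_column']}",
                }
            )
    return kwargs_list


# Sketch of the Airflow wiring (assumes Airflow 2.4+; not executed here):
#
#   @task
#   def make_email_kwargs(file_path):          # hypothetical task name
#       return build_email_kwargs(file_path)   # file_path is a str here
#
#   file_path = produce_file_path()
#   EmailOperator.partial(
#       task_id="send_email",                   # task_id cannot be mapped
#       to="[email protected]",                 # same recipient as in the DAG above
#   ).expand_kwargs(make_email_kwargs(file_path))
```

Because `task_id` cannot vary per mapped instance, it goes in `partial()`; the UI then shows a single `send_email` task whose mapped instances correspond to the CSV rows, each with its own log and state.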