I’m working on an Airflow DAG in which each file from a list should go through a sequential pipeline (extract → transform → generate embeddings → load into a database), while different files are processed in parallel. I wrote the DAG below, but running it fails with: TypeError: process_file() takes 1 positional argument but 5 were given
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta

def fetch_files():
    return [f"file{i}" for i in range(1, 10)]

def extract_content(file):
    print(f"Extracting content from {file}")
    return f"content_of_{file}"

def transform_content(content):
    print(f"Transforming content: {content}")
    return f"transformed_{content}"

def generate_embeddings(content):
    print(f"Generating embeddings for {content}")
    return f"embeddings_for_{content}"

def load_into_db(embedding):
    print(f"Loading {embedding} into the database")

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='fetch_and_process',
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    fetch_task = PythonOperator(
        task_id='fetch_files',
        python_callable=fetch_files
    )

    def process_file(file):
        with TaskGroup(group_id=f'process_{file}') as group:
            extract_task = PythonOperator(
                task_id="extract",
                python_callable=extract_content,
                op_args=[file]
            )
            transform_task = PythonOperator(
                task_id="transform",
                python_callable=transform_content,
                op_args=[extract_task.output]
            )
            embeddings_task = PythonOperator(
                task_id="generate_embeddings",
                python_callable=generate_embeddings,
                op_args=[transform_task.output]
            )
            load_task = PythonOperator(
                task_id="load",
                python_callable=load_into_db,
                op_args=[embeddings_task.output]
            )
            extract_task >> transform_task >> embeddings_task >> load_task
        return group

    process_tasks = PythonOperator.partial(
        task_id="process_file",
        python_callable=process_file
    ).expand(op_args=fetch_task.output)

    fetch_task >> process_tasks
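
From the dynamic task mapping docs, my understanding is that expand(op_args=...) treats each element of the mapped list as the complete positional-argument list for one task instance. A plain string like "file1" is itself an iterable of five characters, which would explain the "5 were given" part of the error. If that reading is right, fetch_files would need to return a list of single-element lists (my interpretation, not verified):

def fetch_files():
    # each inner list is the full op_args for one mapped task instance
    return [[f"file{i}"] for i in range(1, 10)]

Even so, I'm not sure this is enough: process_file builds a TaskGroup at execution time, and as far as I can tell, operators created inside a PythonOperator callable are never registered with the DAG.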
What changes should I make so that exactly one file at a time is passed into each mapped process_file group, while separate files still run in parallel?
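
For clarity, here is a minimal sketch of the behavior I'm after, written with the TaskFlow API and assuming Airflow 2.5+, where a @task_group can be mapped with .expand() (the names and structure here are illustrative, not tested code):

from datetime import datetime
from airflow.decorators import dag, task, task_group

@dag(dag_id='fetch_and_process_mapped', start_date=datetime(2023, 1, 1), schedule=None, catchup=False)
def fetch_and_process():

    @task
    def fetch_files():
        return [f"file{i}" for i in range(1, 10)]

    @task
    def extract_content(file):
        return f"content_of_{file}"

    @task
    def transform_content(content):
        return f"transformed_{content}"

    @task
    def generate_embeddings(content):
        return f"embeddings_for_{content}"

    @task
    def load_into_db(embedding):
        print(f"Loading {embedding} into the database")

    # one mapped group per file; the four tasks inside each group run sequentially
    @task_group
    def process_file(file):
        load_into_db(generate_embeddings(transform_content(extract_content(file))))

    process_file.expand(file=fetch_files())

fetch_and_process()

Is something along these lines the right direction, or can the classic-operator version above be made to map one file per group?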