Based on the following answer to a similar question, I’m trying to create an Airflow task_group
based on the output of a previous task.
More precisely, my first task uses the SQLExecuteQueryOperator, which returns a list, and I want to map this list to a task group. However, I always end up with a TypeError: 'XComArg' object is not iterable.
I don’t know how to write a reproducible example with the SQL operator, but I can at least write some pseudocode:
from airflow.decorators import dag, task_group
from airflow.operators.dummy import DummyOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


@dag()
def my_dag():
    get_list = SQLExecuteQueryOperator(
        task_id='get_list',
        conn_id=pg_conn_id,
        database=db_name,
        sql='select column from table',
    )

    @task_group()
    def map_list_to_task(l: list):
        return list(map(my_task, l))

    def my_task(element):
        # define and return a task
        return DummyOperator()

    map_list_to_task(get_list.output)


dag = my_dag()
By simply replacing get_list.output with a real list on the last line of my_dag, it works. But I need to make it work with the result of the previous task, get_list. Any help would be much appreciated!
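For what it's worth, the difference between the two cases can be reproduced without Airflow or a database. This is only a minimal sketch, assuming get_list.output behaves like a placeholder that has no value until the task has actually run; LazyOutput is a hypothetical stand-in I made up for illustration, not Airflow's XComArg, and my_task just returns a string instead of an operator:

```python
class LazyOutput:
    """Hypothetical stand-in for a task output that only has a value at run time."""

    def __init__(self, task_id):
        self.task_id = task_id


def my_task(element):
    # Stand-in for "define and return a task": just builds a label.
    return f"task_for_{element}"


def map_list_to_task(l):
    # Runs immediately when called, i.e. while the DAG is being defined.
    return list(map(my_task, l))


# A real list can be iterated at definition time.
built = map_list_to_task(["a", "b"])

# A placeholder cannot be iterated, so map() raises TypeError.
try:
    map_list_to_task(LazyOutput("get_list"))
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(built)
print(error_message)
```

So the body of map_list_to_task seems to run while the DAG is being defined, before get_list has produced anything, which would explain why a literal list works and get_list.output does not.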