I have a task group that reads the tables from a schema and splits them into chunks.
Another task group is responsible for performing a few operations on a group of tables. Airflow tries to create the groups before it knows how many tables exist (checkTablesExist executes on startup, and the code below fails because the task context is not available at that point), and this is causing problems.
Our code looks like this:
<code>@task_group(group_id="spark-group")
def mapper(tbl_name, **kwargs):
# Task group "spark-group": declares task_1 and task_2 and chains them for tbl_name.
# **kwargs is accepted but not used in the lines shown.
@task(retry_delay=datetime.timedelta(seconds=10), trigger_rule=TriggerRule.ALL_DONE)
def task_1(table_name):
# NOTE(review): no variable named `input` is defined in this snippet, so this
# presumably returns the Python builtin `input` instead of table_name - confirm.
return input
@task(retry_delay=datetime.timedelta(seconds=10), trigger_rule=TriggerRule.ALL_DONE)
def task_2(table_name):
# NOTE(review): same `input` question as in task_1 - confirm.
return input
# task_1(tbl_name) is called here and called again on the next line; the value
# assigned to update_tbl_name_task is not used anywhere in this snippet.
update_tbl_name_task = task_1(tbl_name)
task_1(tbl_name) >> task_2(tbl_name)
dag = DAG()
with dag:
# Task group "group1": read_tablenames (a PythonOperator calling getschema) is
# ordered before create_table_chunks_task.
@task_group(group_id='group1', prefix_group_id=False, dag=dag)
def read_schema_task(**kwargs):
get_tablenames_task = PythonOperator( task_id="read_tablenames", python_callable=getschema )
@task
def create_table_chunks_task(**kwargs):
# Chunking logic
# Returns a JSON string; `data` is not defined in the lines shown.
return json.dumps(data)
get_tablenames_task >> create_table_chunks_task()
# Task group "tableOpGroup": its body defines checkTablesExist and calls it.
@task_group(group_id='tableOpGroup', prefix_group_id= False, dag= dag)
def run_operations_ontables(**kwargs):
def checkTablesExist():
# Pulls the value pushed by create_table_chunks_task and decodes it from JSON.
list_of_tables = get_current_context()["ti"].xcom_pull(task_ids='create_table_chunks_task')
list_of_tables_json = json.loads(list_of_tables)
for index, table_name in enumerate(list_of_tables_json):
logger.info(f"working on index:{index}, table_name:{table_name}")
# NOTE(review): run_schema_scan is not defined in this snippet; it is given
# the result of mapper.expand(...) as a positional argument.
run_schema_scan.expand(mapper.expand(tbl_name=table_name))
# Plain function call made directly in the task-group body, not from inside a
# task; the accompanying question reports that no context is available here.
checkTablesExist()
# group1 is ordered before tableOpGroup.
read_schema_task() >> run_operations_ontables()
</code>
<code>@task_group(group_id="spark-group")
def mapper(tbl_name, **kwargs):
# Task group "spark-group": declares task_1 and task_2 and chains them for tbl_name.
# **kwargs is accepted but not used in the lines shown.
@task(retry_delay=datetime.timedelta(seconds=10), trigger_rule=TriggerRule.ALL_DONE)
def task_1(table_name):
# NOTE(review): no variable named `input` is defined in this snippet, so this
# presumably returns the Python builtin `input` instead of table_name - confirm.
return input
@task(retry_delay=datetime.timedelta(seconds=10), trigger_rule=TriggerRule.ALL_DONE)
def task_2(table_name):
# NOTE(review): same `input` question as in task_1 - confirm.
return input
# task_1(tbl_name) is called here and called again on the next line; the value
# assigned to update_tbl_name_task is not used anywhere in this snippet.
update_tbl_name_task = task_1(tbl_name)
task_1(tbl_name) >> task_2(tbl_name)
dag = DAG()
with dag:
# Task group "group1": read_tablenames (a PythonOperator calling getschema) is
# ordered before create_table_chunks_task.
@task_group(group_id='group1', prefix_group_id=False, dag=dag)
def read_schema_task(**kwargs):
get_tablenames_task = PythonOperator( task_id="read_tablenames", python_callable=getschema )
@task
def create_table_chunks_task(**kwargs):
# Chunking logic
# Returns a JSON string; `data` is not defined in the lines shown.
return json.dumps(data)
get_tablenames_task >> create_table_chunks_task()
# Task group "tableOpGroup": its body defines checkTablesExist and calls it.
@task_group(group_id='tableOpGroup', prefix_group_id= False, dag= dag)
def run_operations_ontables(**kwargs):
def checkTablesExist():
# Pulls the value pushed by create_table_chunks_task and decodes it from JSON.
list_of_tables = get_current_context()["ti"].xcom_pull(task_ids='create_table_chunks_task')
list_of_tables_json = json.loads(list_of_tables)
for index, table_name in enumerate(list_of_tables_json):
logger.info(f"working on index:{index}, table_name:{table_name}")
# NOTE(review): run_schema_scan is not defined in this snippet; it is given
# the result of mapper.expand(...) as a positional argument.
run_schema_scan.expand(mapper.expand(tbl_name=table_name))
# Plain function call made directly in the task-group body, not from inside a
# task; the accompanying question reports that no context is available here.
checkTablesExist()
# group1 is ordered before tableOpGroup.
read_schema_task() >> run_operations_ontables()
</code>
@task_group(group_id="spark-group")
def mapper(tbl_name, **kwargs):
    # Per-table task group: runs task_1 and then task_2 for ``tbl_name``.
    # ``**kwargs`` is accepted for call compatibility but is not used here.

    @task(retry_delay=datetime.timedelta(seconds=10), trigger_rule=TriggerRule.ALL_DONE)
    def task_1(table_name):
        # Hand back the table name that was passed in. The original returned
        # ``input``, which (with no such variable in scope) is the Python
        # builtin function rather than the argument.
        return table_name

    @task(retry_delay=datetime.timedelta(seconds=10), trigger_rule=TriggerRule.ALL_DONE)
    def task_2(table_name):
        # Same correction as task_1: return the argument, not the builtin.
        return table_name

    # Instantiate task_1 exactly once and chain that instance to task_2.
    # The original called task_1(tbl_name) a second time on the dependency
    # line, which creates an additional task and leaves the first instance
    # without a link to task_2.
    update_tbl_name_task = task_1(tbl_name)
    update_tbl_name_task >> task_2(tbl_name)
# NOTE(review): DAG() is shown without arguments in this snippet; the real
# constructor arguments are presumably elided - confirm.
dag = DAG()
with dag:

    # group1: fetch the table names, then turn them into chunks.
    @task_group(group_id='group1', prefix_group_id=False, dag=dag)
    def read_schema_task(**kwargs):
        get_tablenames_task = PythonOperator(task_id="read_tablenames", python_callable=getschema)

        @task
        def create_table_chunks_task(**kwargs):
            # Chunking logic
            return json.dumps(data)

        get_tablenames_task >> create_table_chunks_task()

    # tableOpGroup: fan the per-table work out over however many entries the
    # chunking task produced. The number of entries is only known at run time,
    # so the fan-out uses dynamic task mapping instead of a Python loop that
    # runs while the DAG is being built.
    @task_group(group_id='tableOpGroup', prefix_group_id=False, dag=dag)
    def run_operations_ontables(**kwargs):

        # This is now a task, so it executes on a worker where a task context
        # exists. The original was a plain function called while the DAG was
        # being built, which is why get_current_context() had nothing to return.
        @task
        def checkTablesExist():
            list_of_tables = get_current_context()["ti"].xcom_pull(task_ids='create_table_chunks_task')
            list_of_tables_json = json.loads(list_of_tables)
            for index, table_name in enumerate(list_of_tables_json):
                logger.info(f"working on index:{index}, table_name:{table_name}")
            # Returned so that downstream mapping can expand over it; assumes
            # the decoded JSON is a list (or dict) - TODO confirm.
            return list_of_tables_json

        table_names = checkTablesExist()

        # One mapped instance of the `mapper` task group per entry.
        per_table_ops = mapper.expand(tbl_name=table_names)

        # NOTE(review): run_schema_scan is defined outside this snippet.
        # expand() accepts keyword arguments only, so the original positional
        # call run_schema_scan.expand(mapper.expand(...)) cannot be kept as
        # written. Here it is mapped over the same entries and ordered after
        # the per-table group; this assumes it takes `tbl_name` - TODO confirm.
        per_table_ops >> run_schema_scan.expand(tbl_name=table_names)

    # group1 must finish before tableOpGroup starts, so the XCom pulled by
    # checkTablesExist is already available.
    read_schema_task() >> run_operations_ontables()
New contributor
pokerman is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.