I’m trying to achieve max performance in the case I’m working on.
I’m parsing jsons and when the first three tasks:
- check_cm_files_present_task
- check_cmts_struct_files_present_task
- check_cmts_meas_files_present_task
detect any new file in any one of the three GCS locations, it should trigger the corresponding file-moving task:
- move_to_valid_cm
- move_to_valid_cmts_struct
- move_to_valid_cmts_meas
and in parallel it should also trigger dataproc cluster creation task (to save time)
So far I am unable to achieve it.
Whatever task ids I return from the branch callable, whatever trigger rules I set, and however I rearrange the dependencies, the "create_cluster_task" task won't start.
Here is a piece of code I’m using:
<code>def check_cm_files_present():
regex = re.compile(r"(.*?).gz")
blobs = bucket.list_blobs(prefix=PREFIX)
jsons = [blob for blob in blobs if regex.match(blob.name)]
return ["move_to_valid_cm","create_cluster_task"]
# DAG: poll three GCS locations every 15 minutes; when new files are found,
# move them to the "valid" area and, in parallel, create a Dataproc cluster.
with models.DAG(dag_id='job-json-to-csv',
                schedule_interval="0,15,30,45 * * * *",
                default_args=default_dag_args,
                max_active_runs=1) as dag:

    # One branch task per GCS location; each callable returns the list of
    # downstream task ids to follow.
    check_cm_files_present_task = BranchPythonOperator(
        task_id='check_cm_files_present_task',
        python_callable=check_cm_files_present,
    )  # original snippet was missing the closing parenthesis
    check_cmts_struct_files_present_task = BranchPythonOperator(
        task_id='check_cmts_struct_files_present_task',
        python_callable=check_cmts_struct_files_present,
    )
    check_cmts_meas_files_present_task = BranchPythonOperator(
        task_id='check_cmts_meas_files_present_task',
        python_callable=check_cmts_meas_files_present,
    )

    # Join task for the "nothing new" path.
    exit_task = DummyOperator(
        task_id='exit_task',  # original omitted task_id, which is required
        trigger_rule='one_success',
    )
    # Shared fan-in: fires as soon as any branch selects it, so the cluster
    # spins up in parallel with the move tasks.
    create_cluster_task = DummyOperator(
        task_id='create_cluster_task',
        trigger_rule='one_success',
    )

    # Wiring: every branch fans out to its move task, the shared cluster
    # task, and the exit join.
    check_cm_files_present_task >> move_to_valid_cm >> other_task1
    check_cm_files_present_task >> create_cluster_task
    check_cm_files_present_task >> exit_task
    check_cmts_struct_files_present_task >> move_to_valid_cmts_struct >> other_task2
    check_cmts_struct_files_present_task >> create_cluster_task
    check_cmts_struct_files_present_task >> exit_task
    check_cmts_meas_files_present_task >> move_to_valid_cmts_meas >> other_task3
    check_cmts_meas_files_present_task >> create_cluster_task
    check_cmts_meas_files_present_task >> exit_task
    create_cluster_task >> create_dataproc_cluster
<code>def check_cm_files_present():
regex = re.compile(r"(.*?).gz")
blobs = bucket.list_blobs(prefix=PREFIX)
jsons = [blob for blob in blobs if regex.match(blob.name)]
if len(jsons) > 0:
return ["move_to_valid_cm","create_cluster_task"]
else:
return ["exit_task"]
# DAG: poll three GCS locations every 15 minutes; when new files are found,
# move them to the "valid" area and (in parallel) create a Dataproc cluster,
# otherwise fall through to exit_task.
with models.DAG(dag_id='job-json-to-csv',
                schedule_interval="0,15,30,45 * * * *",
                description='',
                default_args=default_dag_args,
                max_active_runs=1) as dag:
    # One BranchPythonOperator per GCS location; each callable returns the
    # list of downstream task ids to follow.
    check_cm_files_present_task = BranchPythonOperator(
        task_id='check_cm_files_present_task',
        python_callable=check_cm_files_present,
    )
    check_cmts_struct_files_present_task = BranchPythonOperator(
        task_id='check_cmts_struct_files_present_task',
        python_callable=check_cmts_struct_files_present,
    )
    check_cmts_meas_files_present_task = BranchPythonOperator(
        task_id='check_cmts_meas_files_present_task',
        python_callable=check_cmts_meas_files_present,
    )
    # Join for the "nothing new" path; one_success lets it run as soon as a
    # single branch selects it.
    exit_task = DummyOperator(
        task_id='exit_task',
        trigger_rule='one_success'
    )
    # Shared fan-in for cluster creation.
    # NOTE(review): a BranchPythonOperator skips the immediate downstream
    # tasks it did NOT return. With three branch parents, any one branch that
    # returns ["exit_task"] can mark create_cluster_task as skipped, and that
    # skip can win over trigger_rule='one_success' — presumably why this task
    # never starts. Confirm against your Airflow version's skip semantics.
    create_cluster_task = DummyOperator(
        task_id='create_cluster_task',
        trigger_rule='one_success'
    )
    # Dependencies: every branch fans out to its move task, the shared
    # cluster task, and the exit join.
    check_cm_files_present_task >> move_to_valid_cm >> other_task1
    check_cm_files_present_task >> create_cluster_task
    check_cm_files_present_task >> exit_task
    check_cmts_struct_files_present_task >> move_to_valid_cmts_struct >> other_task2
    check_cmts_struct_files_present_task >> create_cluster_task
    check_cmts_struct_files_present_task >> exit_task
    check_cmts_meas_files_present_task >> move_to_valid_cmts_meas >> other_task3
    check_cmts_meas_files_present_task >> create_cluster_task
    check_cmts_meas_files_present_task >> exit_task
    create_cluster_task >> create_dataproc_cluster
</code>
def check_cm_files_present():
    """Branch callable: decide which task ids to follow based on new .gz files.

    Returns:
        ["move_to_valid_cm", "create_cluster_task"] when at least one new
        archive is present under PREFIX, otherwise ["exit_task"].
    """
    # Escape the dot and anchor the end of the name; the original
    # r"(.*?).gz" with re.match() also accepts names like "foo.gzip" or "abgz".
    regex = re.compile(r".*\.gz$")
    blobs = bucket.list_blobs(prefix=PREFIX)  # bucket/PREFIX are module globals
    jsons = [blob for blob in blobs if regex.match(blob.name)]
    if jsons:  # truthiness instead of len(jsons) > 0
        return ["move_to_valid_cm", "create_cluster_task"]
    return ["exit_task"]
# DAG: poll three GCS locations every 15 minutes; when new files are found,
# move them to the "valid" area and (in parallel) create a Dataproc cluster,
# otherwise fall through to exit_task.
with models.DAG(dag_id='job-json-to-csv',
                schedule_interval="0,15,30,45 * * * *",
                description='',
                default_args=default_dag_args,
                max_active_runs=1) as dag:
    # One BranchPythonOperator per GCS location; each callable returns the
    # list of downstream task ids to follow.
    check_cm_files_present_task = BranchPythonOperator(
        task_id='check_cm_files_present_task',
        python_callable=check_cm_files_present,
    )
    check_cmts_struct_files_present_task = BranchPythonOperator(
        task_id='check_cmts_struct_files_present_task',
        python_callable=check_cmts_struct_files_present,
    )
    check_cmts_meas_files_present_task = BranchPythonOperator(
        task_id='check_cmts_meas_files_present_task',
        python_callable=check_cmts_meas_files_present,
    )
    # Join for the "nothing new" path; one_success lets it run as soon as a
    # single branch selects it.
    exit_task = DummyOperator(
        task_id='exit_task',
        trigger_rule='one_success'
    )
    # Shared fan-in for cluster creation.
    # NOTE(review): a BranchPythonOperator skips the immediate downstream
    # tasks it did NOT return. With three branch parents, any one branch that
    # returns ["exit_task"] can mark create_cluster_task as skipped, and that
    # skip can win over trigger_rule='one_success' — presumably why this task
    # never starts. Confirm against your Airflow version's skip semantics.
    create_cluster_task = DummyOperator(
        task_id='create_cluster_task',
        trigger_rule='one_success'
    )
    # Dependencies: every branch fans out to its move task, the shared
    # cluster task, and the exit join.
    check_cm_files_present_task >> move_to_valid_cm >> other_task1
    check_cm_files_present_task >> create_cluster_task
    check_cm_files_present_task >> exit_task
    check_cmts_struct_files_present_task >> move_to_valid_cmts_struct >> other_task2
    check_cmts_struct_files_present_task >> create_cluster_task
    check_cmts_struct_files_present_task >> exit_task
    check_cmts_meas_files_present_task >> move_to_valid_cmts_meas >> other_task3
    check_cmts_meas_files_present_task >> create_cluster_task
    check_cmts_meas_files_present_task >> exit_task
    create_cluster_task >> create_dataproc_cluster
And here is the log from check_cm_files_present_task. It should follow the ["move_to_valid_cm", "create_cluster_task"] branch, but it never does; create_cluster_task always gets skipped.
Do you have any idea why is that happening?