One of my operators, DataprocCreateClusterOperator, never triggers; it behaves as if the default “all_success” rule were still set for it. It runs fine when it is the very first task, but I don’t want to create a cluster on every DAG run, since it might not be needed. To my surprise, giving it upstream tasks and setting its trigger_rule to “one_success” doesn’t work.
I know that the “one_success” rule works in general, because “exit_task” uses it and always runs without a problem.
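To make the expectation concrete, here is a small plain-Python model of how I understand “one_success” to behave. It is only a simplified sketch, not Airflow's actual implementation; the names `State` and `one_success_outcome` are made up for illustration.

```python
from enum import Enum


class State(Enum):
    """Hypothetical, reduced set of task states used only for this sketch."""
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"
    RUNNING = "running"


# States in which an upstream task is considered finished.
FINISHED = {State.SUCCESS, State.FAILED, State.SKIPPED}


def one_success_outcome(upstream_states):
    """Return what a one_success task is expected to do, given its upstreams.

    Assumed semantics (simplified): run as soon as any upstream succeeded,
    without waiting for the others; otherwise wait until all upstreams are
    finished, then end up skipped (all skipped) or upstream_failed.
    """
    states = list(upstream_states)
    if any(s is State.SUCCESS for s in states):
        return "run"
    if not all(s in FINISHED for s in states):
        return "wait"
    if all(s is State.SKIPPED for s in states):
        return "skipped"
    return "upstream_failed"
```

Under this model, a single successful move task should be enough for create_dataproc_cluster to run, even if the other two move tasks were skipped by their branch operators.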
Here is my code:
# Imports (assuming Airflow 2.x module paths; adjust for your version)
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.providers.google.cloud.operators import dataproc as dataproc_operator
check_cm_files_present_task = BranchPythonOperator(
task_id='check_cm_files_present_task',
python_callable=check_cm_files_present,
)
check_cmts_struct_files_present_task = BranchPythonOperator(
task_id='check_cmts_struct_files_present_task',
python_callable=check_cmts_struct_files_present,
)
check_cmts_meas_files_present_task = BranchPythonOperator(
task_id='check_cmts_meas_files_present_task',
python_callable=check_cmts_meas_files_present,
)
move_to_valid_cm = BashOperator(
task_id='move_to_valid_cm',
bash_command='''gsutil -m mv gs://A gs://B''',
dag=dag
)
move_to_valid_cmts_struct = BashOperator(
task_id='move_to_valid_cmts_struct',
bash_command='''gsutil -m mv gs://C gs://D''',
dag=dag
)
move_to_valid_cmts_meas = BashOperator(
task_id='move_to_valid_cmts_meas',
bash_command='''gsutil -m mv gs://E gs://F''',
dag=dag
)
create_dataproc_cluster = dataproc_operator.DataprocCreateClusterOperator(
task_id='create_dataproc_cluster',
cluster_name=CLUSTER_WITH_DATE,
project_id=PROJECT,
cluster_config=CLUSTER_CONFIG,
region='europe-west3',
trigger_rule='one_success'
)
exit_task = DummyOperator(
task_id='exit_task',
trigger_rule='one_success'
)
check_cm_files_present_task >> move_to_valid_cm >> run_dataproc_job_cm
check_cm_files_present_task >> exit_task
check_cmts_struct_files_present_task >> move_to_valid_cmts_struct >> run_dataproc_job_cmts_struct
check_cmts_struct_files_present_task >> exit_task
check_cmts_meas_files_present_task >> move_to_valid_cmts_meas >> run_dataproc_job_cmts_meas
check_cmts_meas_files_present_task >> exit_task
[move_to_valid_cm, move_to_valid_cmts_struct, move_to_valid_cmts_meas] >> create_dataproc_cluster >> [run_dataproc_job_cm, run_dataproc_job_cmts_struct, run_dataproc_job_cmts_meas]
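The branch callables themselves are not shown above. A BranchPythonOperator callable returns the task_id (or list of task_ids) to follow, so each of them is roughly of the following shape. This is a minimal sketch under assumptions: the helper `choose_branch`, the `file_names` argument, and the `_sketch` suffix are hypothetical, and the real functions presumably list files in the bucket rather than receive them as a parameter.

```python
def choose_branch(file_names, move_task_id, exit_task_id="exit_task"):
    """Return the task_id the branch should follow.

    Hypothetical helper: follow the move task when at least one file is
    present, otherwise go straight to the exit task.
    """
    return move_task_id if file_names else exit_task_id


def check_cm_files_present_sketch(file_names):
    """Illustrative stand-in for check_cm_files_present (not the real code)."""
    return choose_branch(file_names, "move_to_valid_cm")
```

For example, `check_cm_files_present_sketch([])` selects `exit_task`, in which case `move_to_valid_cm` is skipped and only the other two move tasks can satisfy the “one_success” rule on create_dataproc_cluster.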