We recently upgraded to Airflow 2.6.3. After this upgrade, our Airflow tasks no longer complete after the Dataflow job finishes. We are using DataflowCreateJavaJobOperator, and our pipeline calls pipeline.run().waitUntilFinish().
return DataflowCreateJavaJobOperator(
    task_id=task_id,
    jar=jar,
    dataflow_default_options=dataflow_default_options,
    options=options,
    gcp_conn_id=gcp_conn_id,
    dag=dag,
)
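For completeness, the snippet above is returned from a small factory function. A minimal self-contained sketch of it is below; the function name and signature are assumptions for illustration, and the comment about wait_until_finished only points out a parameter the Google provider exposes on this operator, not something our DAG sets.

# Minimal sketch of the factory around the snippet above; the function name
# and signature are assumptions for illustration, not our production code.
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowCreateJavaJobOperator,
)

def make_dataflow_task(task_id, jar, dataflow_default_options, options,
                       gcp_conn_id, dag):
    return DataflowCreateJavaJobOperator(
        task_id=task_id,
        jar=jar,
        dataflow_default_options=dataflow_default_options,
        options=options,
        gcp_conn_id=gcp_conn_id,
        # The provider also exposes a wait_until_finished parameter on this
        # operator; we do not set it, so the default behavior applies.
        dag=dag,
    )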
The Airflow task log shows the Dataflow job completed successfully, but the task is not marked as success in version 2.6.3:
[2024-06-11 21:15:03.710199+00:00] {beam.py:114} INFO - [INFO] 2024-06-11T21:15:01.570Z: Finished operation Write to BigQuery/BatchLoads/SinglePartitionWriteTables/GroupByKey/Read+Write to BigQuery/BatchLoads/SinglePartitionWriteTables/GroupByKey/GroupByWindow+Write to BigQuery/BatchLoads/SinglePartitionWriteTables/Values/Values/Map+Write to BigQuery/BatchLoads/SinglePartitionWriteTables/ParDo(GarbageCollectTemporaryFiles)
[2024-06-11 21:15:03.710300+00:00] {beam.py:114} INFO - [DEBUG] 2024-06-11T21:15:01.614Z: Executing success step success228
[2024-06-11 21:15:03.710485+00:00] {beam.py:114} INFO - [INFO] 2024-06-11T21:15:01.687Z: Cleaning up.
[2024-06-11 21:15:03.710550+00:00] {beam.py:114} INFO - [DEBUG] 2024-06-11T21:15:01.731Z: Starting worker pool teardown.
[2024-06-11 21:15:03.710798+00:00] {beam.py:114} INFO - [INFO] 2024-06-11T21:15:01.779Z: Stopping worker pool...
[2024-06-11 21:28:31.805338+00:00] {beam.py:114} INFO - [INFO] 2024-06-11T21:28:30.290Z: Autoscaling: Resized worker pool from 400 to 0.
[2024-06-11 21:28:31.805415+00:00] {beam.py:114} INFO - [INFO] 2024-06-11T21:28:30.336Z: Worker pool stopped.
[2024-06-11 21:28:31.805546+00:00] {beam.py:114} INFO - [DEBUG] 2024-06-11T21:28:30.364Z: Tearing down pending resources...
[2024-06-11 21:28:41.479723+00:00] {beam.py:114} INFO - [INFO] Job 2024-06-11_14_00_12-11268310877205063496 finished with status DONE.
[2024-06-11 21:28:41.488950+00:00] {beam.py:114} INFO - [DEBUG] Closing org.springframework.boot.web.servlet.context.AnnotationConfigServletWebServerApplicationContext@73700b80, started on Tue Jun 11 20:59:33 UTC 2024
[2024-06-11 21:28:41.491944+00:00] {beam.py:114} INFO - [DEBUG] Unregistering JMX-exposed beans on shutdown
[2024-06-11 21:28:41.493080+00:00] {beam.py:114} INFO - [INFO] Shutting down ExecutorService 'applicationTaskExecutor'
In version 2.4, the Airflow task log shows the Dataflow job completed successfully and the task is marked as success:
[2024-06-11, 06:27:53 UTC] {beam.py:132} INFO - [INFO] Shutting down ExecutorService 'applicationTaskExecutor'
[2024-06-11, 06:27:53 UTC] {beam.py:132} WARNING - WARNING: An illegal reflective access operation has occurred
[2024-06-11, 06:27:53 UTC] {beam.py:132} WARNING - WARNING: Illegal reflective access by org.apache.catalina.loader.WebappClassLoaderBase (file:/tmp/tmpowl7u86qSOQ_SdcOutboundFork.jar) to field java.io.ObjectStreamClass$Caches.localDescs
[2024-06-11, 06:27:53 UTC] {beam.py:132} WARNING - WARNING: Please consider reporting this to the maintainers of org.apache.catalina.loader.WebappClassLoaderBase
[2024-06-11, 06:27:53 UTC] {beam.py:132} WARNING - WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
[2024-06-11, 06:27:53 UTC] {beam.py:132} WARNING - WARNING: All illegal access operations will be denied in a future release
[2024-06-11, 06:27:53 UTC] {beam.py:155} INFO - Process exited with return code: 0
[2024-06-11, 06:27:53 UTC] {dataflow.py:425} INFO - Start waiting for done.
[2024-06-11, 06:27:53 UTC] {dataflow.py:385} INFO - Google Cloud DataFlow job trigger-dataflow-db456216e is state: JOB_STATE_DONE
[2024-06-11, 06:27:53 UTC] {taskinstance.py:1402} INFO - Marking task as SUCCESS. dag_id=test, task_id=task1, execution_date=20240610T060000, start_date=20240611T060751, end_date=20240611T062753
[2024-06-11, 06:27:54 UTC] {local_task_job.py:159} INFO - Task exited with return code 0
[2024-06-11, 06:27:54 UTC] {taskinstance.py:2626} INFO - 1 downstream tasks scheduled from follow-on schedule check
We also tried BeamRunJavaPipelineOperator with the options below (after changing pipeline.run().waitUntilFinish() to pipeline.run() in the pipeline code), but we still see the same problem:
return BeamRunJavaPipelineOperator(
    task_id=task_id,
    runner='DataflowRunner',
    jar=jar,
    default_pipeline_options=dataflow_default_options,
    pipeline_options=options,
    gcp_conn_id=gcp_conn_id,
    dataflow_config=DataflowConfiguration(
        job_name="{{ task.task_id }}",
        wait_until_finished=True,
    ),
    deferrable=deferrable,
    dag=dag,
)
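As a debugging aid (not part of the DAG), we can ask the Google provider's DataflowHook directly whether it sees the job as done. A minimal sketch, assuming DataflowHook.get_job from the provider and using the job id from the 2.6.3 log above; project_id and location are placeholders:

# Debugging sketch: query the job state through the same provider hook the
# operator uses, to confirm Airflow can observe JOB_STATE_DONE.
from airflow.providers.google.cloud.hooks.dataflow import DataflowHook

hook = DataflowHook(gcp_conn_id=gcp_conn_id)
job = hook.get_job(
    job_id="2024-06-11_14_00_12-11268310877205063496",  # job id from the log above
    project_id="my-project",   # placeholder
    location="us-central1",    # placeholder
)
print(job.get("currentState"))  # expect "JOB_STATE_DONE"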