I have an Airflow job that launches a Spark job in one task, and the next task extracts the application logs to find the Spark job's application ID. I am using xcom_push in the spark-submit task and then xcom_pull in the next task.
def extract_app_id(**kwargs):
    """Pull the spark-submit task's output from XCom and extract the YARN application id.

    Args:
        **kwargs: Airflow context; ``kwargs['ti']`` is the TaskInstance.

    Returns:
        The YARN application id (e.g. ``application_1234567890123_0042``),
        or ``None`` when the pulled XCom value contains no id. The returned
        value is itself pushed to XCom by the PythonOperator for downstream tasks.
    """
    ti = kwargs['ti']
    log = ti.xcom_pull(task_ids='submit_spark_job')
    log_str = str(log)
    logger.info("Xcom pulled str log %s", log_str)
    # BUG FIX: the original pattern r'application_d+_d+' was missing the
    # backslashes, so it looked for literal 'd' characters and never matched
    # a real YARN id of the form application_<cluster_ts>_<seq>.
    match = re.search(r'application_\d+_\d+', log_str)
    if match is None:
        # xcom_pull returns None when the upstream task pushed nothing —
        # SparkSubmitOperator in Airflow 2.0 does not push its log to XCom.
        logger.warning("No application id found in XCom value from 'submit_spark_job'")
        return None
    app_id = match.group(0)
    logger.info("Extracted application id %s", app_id)
    return app_id
# Launches the Spark job via spark-submit using the 'spark3' connection.
# NOTE(review): in Airflow 2.0 the SparkSubmitOperator's execute() returns
# nothing, so do_xcom_push=True pushes no usable value — presumably this is
# why the downstream xcom_pull sees null; verify against the provider version
# in use. The YARN application id is tracked internally by the hook
# (_yarn_application_id), not exposed through XCom.
spark_submit_task = SparkSubmitOperator(
name=f"{job_name}",
task_id="submit_spark_job",
conn_id="spark3",
# Resource settings are templated from the DAG-level spark_conf dict,
# with fallback defaults when a key is absent.
driver_memory="{{ spark_conf.get('driver_memory', '8g') }}",
executor_cores="{{ spark_conf.get('executor_cores', 4) }}",
executor_memory="{{ spark_conf.get('executor_memory', '30g') }}",
# Kerberos credentials resolved from runtime settings.
keytab=environment.get_runtime_setting("af_keytab_path"),
principal=environment.get_runtime_setting("af_principal"),
conf=conf,
java_class="Application",
application=f"{jar_path}{jar_file}",
do_xcom_push=True,
application_args=application_args,
execution_timeout=timedelta(minutes=5) # Set your desired timeout here
)
# Runs extract_app_id after the spark-submit task; its return value (if any)
# is pushed to XCom by the PythonOperator.
extract_app_id_task = PythonOperator(
task_id='extract_app_id',
python_callable=extract_app_id,
# provide_context is a no-op (and deprecated) in Airflow 2.x — the context
# kwargs are always passed to the callable.
provide_context=True,
# ALL_DONE: run even when the spark-submit task fails, so the app id can
# still be extracted for cleanup/diagnostics.
trigger_rule=TriggerRule.ALL_DONE
)
# Task ordering: submit first, then extract the application id.
spark_submit_task >> extract_app_id_task
The issue is that xcom_pull always returns null (None), even though the Spark job is successfully launched. I use Python 3 and Airflow 2.0.0.