I am having a case where I want to share a string to a BigQueryInsertJobOperator and use it as a key to take a given configuration. I try to do it as is:
Inside dag definition I am using PythonOperator to push a variable to xcom:
def capture_mode(**context):
trigger = context["dag_run"].conf["message"].strip()
mode = "case_a" if (trigger == "trigger_casea") else "case_b"
task_instance = context["task_instance"]
task_instance.xcom_push(key="mode", value=mode)
return mode
capture_mode = PythonOperator(
task_id="capture_mode",
provide_context=True,
python_callable=capture_mode,
dag=dag,
)
Afterwards I try to pull the parameter mode to use it as a parameter key for a json configuration:
copy_scoring_data = BigQueryInsertJobOperator(
task_id="copy-table",
configuration={
"copy": {
"sourceTable": {
"projectId": copy_data_config["sourceTable"]["projectId"],
"datasetId": copy_data_config["sourceTable"]["datasetId"],
"tableId": copy_data_config["sourceTable"]["tableId"][
"{{task_instance.xcom_pull(task_ids='capture_mode', key='mode')}}"
],
},
"destinationTable": {
"projectId": copy_data_config["destinationTable"]["projectId"],
"datasetId": copy_data_config["destinationTable"]["datasetId"],
"tableId": copy_data_config["destinationTable"]["tableId"][
"{{task_instance.xcom_pull(task_ids='capture_mode', key='mode')}}"
],
},
"createDisposition": "CREATE_IF_NEEDED",
"writeDisposition": "WRITE_TRUNCATE",
"useLegacySql": False,
"priority": "BATCH",
}
},
impersonation_chain=config["serviceAccountName"],
location=config["datasetsLocation"],
)
the json configuration looks like this:
"copy_scoring_data": {
"sourceTable": {
"projectId": "project_id",
"datasetId": "dataset_id",
"tableId": {
"case_a": "table_a",
"case_b": "table_b"
}
}
Here I am trying to pull the right value in order to get case_a or case_b configuration.:
{{task_instance.xcom_pull(task_ids='capture_mode', key='mode')}}
But I am getting a KeyError in the lines referencing the line above.
What am I doing wrong ?