I’m trying to execute the same query against a list of BigQuery tables as separate tasks using BigQueryInsertJobOperator. Below is my code:
<code>from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from google.api_core.retry import Retry

column_lists = '{{ var.json.bq_alerts.tables_list }}'
custom_column_lists = column_lists.split(',')

with DAG(**dag_args) as dag:
    create_cluster = DataprocCreateClusterOperator(
        task_id="sf_create_cluster",
        cluster_name="CLUSTER_NAME",
        location=REGION,
        delete_on_error=True,
        use_if_exists=True,
        retry=Retry(maximum=10.0),
        timeout=1800,
        gcp_conn_id=CONN_ID,
    )

    dynamic_tasks = []
    for table_name in custom_column_lists:
        print(f"Table name is: {table_name}")
        task_id = f"t_{table_name}"
        load_bq = BigQueryInsertJobOperator(
            task_id=task_id,
            project_id=PROJECT_ID,
            location="US",
            configuration={
                "query": {"query": f"select * from us_tables.{table_name}", "useLegacySql": False}
            },
            gcp_conn_id=CONN_ID_CLINIC,
        )
        dynamic_tasks.append(load_bq)

    create_cluster >> dynamic_tasks
</code>
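For reference, the bq_alerts Variable is stored as JSON, roughly like this (a minimal sketch of the setup; only the Variable name and key are taken from the template above):

<code># Rough sketch of how the bq_alerts Variable is defined (Admin -> Variables),
# i.e. what var.json.bq_alerts.tables_list is meant to resolve to.
from airflow.models import Variable

Variable.set("bq_alerts", {"tables_list": "table1,table2"}, serialize_json=True)
</code>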
However, I’m getting the following error:
<code>airflow.exceptions.AirflowException: The key 't_{{ var.json.bq_alerts.tables_list }}' has to be made of alphanumeric characters, dashes, dots and underscores exclusively
</code>
The tables_list value is "table1,table2". I’m unable to understand why the task_id isn’t generated dynamically as t_table1 and t_table2. Can someone please help me understand where I’m going wrong?
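For what it's worth, reproducing the split in plain Python gives a single element containing the raw template text, which seems consistent with the key shown in the error above:

<code># Minimal reproduction outside of Airflow: split() only sees the literal
# template string, so the loop gets exactly one "table name".
column_lists = '{{ var.json.bq_alerts.tables_list }}'
custom_column_lists = column_lists.split(',')
print(custom_column_lists)
# ['{{ var.json.bq_alerts.tables_list }}']
</code>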