I have a DAG like the one below, which is executed via Cloud Composer. Its purpose is to extract data from an Oracle database. I am trying to parametrize the operator to avoid code duplication. The query is loaded from a GCS bucket, and I would expect the parameters in it to be replaced at runtime, as specified in the operator.
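from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.transfers.oracle_to_gcs import OracleToGCSOperator

# default_args and my_oracle_connection are defined elsewhere in the DAG file.
with DAG(
    dag_id='my_dag_id',
    default_args=default_args
) as dag:

    def get_data_from_oracle(my_table, **context):
        # Download the SQL text from the bucket and decode it to a string.
        my_hook = GCSHook()
        query = my_hook.download_as_byte_array(
            bucket_name = 'my_bucket',
            object_name = 'my_sql.sql'
        ).decode()

        # Build the transfer operator inside the callable and run it by hand.
        my_operator = OracleToGCSOperator(
            task_id = f'extract_data_from_{my_table}',
            oracle_conn_id = my_oracle_connection,
            sql = f'{query}',
            bucket = 'my_bucket',
            filename = 'my_file.parquet',
            parameters = {
                'table_name': f'{my_table}'
            },
            export_format='PARQUET'
        )
        my_operator.execute(context)

    move_data_for_x_table = PythonOperator(
        task_id = 'move_data_for_x_table',
        python_callable = get_data_from_oracle,
        op_kwargs = {
            'my_table': 'x'
        }
    )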
The .sql file looks like this:
SELECT * FROM {{ table_name }}
When executed, the variable in the query is not replaced and Oracle throws ORA-00911: invalid character.
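To make the expectation concrete, here is a minimal sketch of what I thought would happen to the query text before it reaches Oracle. It uses only the standard library and is not how Airflow renders templates internally; `render_table_name`, `PLACEHOLDER`, and `VALID_IDENTIFIER` are hypothetical names made up for this example, and the identifier pattern assumes simple unquoted table names. Without such a step the query still contains the literal `{{` and `}}` characters, which would be consistent with the ORA-00911 error.

```python
import re

# Hypothetical pattern for the "{{ table_name }}" placeholder (spaces optional).
PLACEHOLDER = re.compile(r"\{\{\s*table_name\s*\}\}")
# Assumption: table names are simple unquoted identifiers.
VALID_IDENTIFIER = re.compile(r"[A-Za-z][A-Za-z0-9_$#]*")


def render_table_name(query: str, table_name: str) -> str:
    """Replace the placeholder with a validated table name."""
    # Reject anything that is not a plain identifier, since the value is
    # spliced directly into the SQL text.
    if not VALID_IDENTIFIER.fullmatch(table_name):
        raise ValueError(f"unsafe table name: {table_name!r}")
    return PLACEHOLDER.sub(table_name, query)


raw_query = "SELECT * FROM {{ table_name }}"
rendered_query = render_table_name(raw_query, "x")
print(rendered_query)
```

The validation is there because the table name ends up in the SQL string itself rather than being passed as a bound value, so it should not accept arbitrary input.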