I’m trying to automate a data-validation query for all SAP HANA calculation views.
To do this, I’m developing an Airflow script that connects to SAP HANA, runs HANA queries, and loads the output into an audit table.
I tried both the `dbapi` module (hdbcli) and the SAP HANA hook, but I can’t work out which module I should actually use.
Can anyone suggest an approach for connecting to SAP HANA from Airflow running on Amazon MWAA?
def record_count_check(query="select count(*) FROM table", connect=None, **kwargs):
    """Run a record-count query against SAP HANA and return the count.

    Intended as a ``PythonOperator`` callable. The Airflow task context is
    accepted through ``**kwargs`` and ignored, so the function can be called
    with or without it.

    Args:
        query: SQL statement whose first column of the first row is the value
            to return. The default is the original placeholder; ``table`` is a
            reserved word in SQL, so replace it with the real calculation view
            (for example via ``op_kwargs={"query": ...}`` on the operator).
        connect: Zero-argument callable returning a DB-API 2.0 connection.
            When omitted, an hdbcli ``dbapi`` connection to the HANA host
            below is opened.
        **kwargs: Airflow task context; unused.

    Returns:
        The first column of the first result row, or 0 when the query
        returns no rows.

    Raises:
        Any exception from connecting or executing the query propagates
        unchanged, so the Airflow task fails.
    """
    if connect is None:
        # NOTE(review): host and credentials are hard-coded, carried over from
        # the original script. Move them into an Airflow connection or a
        # secrets backend (e.g. AWS Secrets Manager on MWAA) before deploying.
        def connect():
            return dbapi.connect(
                address='10.169.111.11',
                port=30015,
                user='abc',
                password='zxererer',
            )

    conn_hana = connect()
    try:
        cursor_hana = conn_hana.cursor()
        try:
            cursor_hana.execute(query)
            result = cursor_hana.fetchone()
        finally:
            cursor_hana.close()
    finally:
        # Release the HANA session even if the query fails.
        conn_hana.close()

    # TODO: write `result` to the audit table. The original opened a Snowflake
    # connection for this purpose but never used it, so that was removed.
    return result[0] if result else 0
# Defaults applied to every task in the DAG. Only arguments accepted by the
# operators take effect here; DAG-level settings such as ``catchup`` must be
# passed to ``DAG(...)`` itself.
default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2024, 6, 6),
    # NOTE(review): this address looks mangled by a web page's e-mail
    # obfuscation; replace it with the real alert recipient.
    'email': ['[email protected]'],
    'email_on_failure': True,
    'retries': 0,
    'retry_delay': dt.timedelta(minutes=5),  # only used if retries > 0
}

# Daily (00:00) SAP HANA record-count check: start >> data_count_chk >> end.
with DAG(
    'sap_hana_check',
    default_args=default_args,
    schedule_interval="0 0 * * *",
    # ``catchup`` is a DAG argument. Inside default_args it was silently
    # ignored, so (unless catchup_by_default is disabled in the Airflow
    # config) every missed run since start_date would have been scheduled.
    catchup=False,
) as dag:
    # Operators created inside the ``with DAG`` block are attached to it
    # automatically, so ``dag=dag`` is unnecessary.
    start = DummyOperator(task_id='start')

    # Airflow 2 passes the task context to the callable automatically,
    # matched against its signature; ``provide_context`` is deprecated.
    t1 = PythonOperator(
        task_id="data_count_chk",
        python_callable=record_count_check,
    )

    end = DummyOperator(task_id='end')

    start >> t1 >> end