I am trying to connect MSSQL and Airflow.While trying to establish connection i got the following issue :[Driver Manager]Data source name not found and no default driver specified (0) (SQLDriverConnect). You can find my code here and screenshot of the error.
import pendulum
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.base_hook import BaseHook
from datetime import datetime, timedelta
import pyodbc
local_tz = pendulum.timezone("Asia/Kuala_Lumpur")
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 3, 26),
'email_on_failure': False,
'email_on_retry': False
#'retries': 1,
#'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'1.1.initial_onboard_customer',
default_args=default_args,
description='Transfer data from onboard.public.customer to report.public.onboard_customer',
schedule_interval=None, # You can specify your desired schedule interval here
catchup=False,
tags = ['Initial Data Load', 'UAT']
)
def transfer_data():
try:
# Get connection details from Airflow's connection pool
source_conn = BaseHook.get_connection('UAT_DB')
# dest_conn = BaseHook.get_connection('UAT_report_db_connection_string')
# Connect to the source database
source_conn_str = f"dbname={source_conn.schema} user={source_conn.login} password={source_conn.password} host={source_conn.host} port={source_conn.port}"
print(source_conn_str)
source_conn = pyodbc.connect(source_conn_str)
print(source_conn)
# Connect to the destination database
# dest_conn_str = f"dbname={dest_conn.schema} user={dest_conn.login} password={dest_conn.password} host={dest_conn.host} port={dest_conn.port}"
# dest_conn = psycopg2.connect(dest_conn_str)
with source_conn.cursor() as source_cur:
# Get customer details from the source database
query = "Select TOP 10 * from [dbo].[Banks];"
result = source_cur.execute(query)
print(result)
except Exception as e:
print("Error occurred while transferring data:", e)
raise
# Define the PythonOperator to execute the transfer_data function
transfer_data_task = PythonOperator(
task_id='1.1.initial_onboard_customer',
python_callable=transfer_data,
dag=dag
)
[![enter image description here][1]][1]
[1]: https://i.sstatic.net/f5jiB176.jpg
Do anyone faced similar issue ?