I got this code in my DAG for Airflow:
<code>import pandas as pd
import datetime
import io
import httpx
from airflow.decorators import dag, task
from airflow.models import Variable
from clickhouse_driver import Client
from airflow_clickhouse_plugin.hooks.clickhouse import ClickHouseHook
from airflow.models import Connection
from airflow.utils.db import create_session
default_args = {
'owner': 'd-chernovol',
'depends_on_past': False,
'retries': 2,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2024, 6, 20)
}
schedule_interval = '*/20 * * * *'
host = Variable.get('host')
database_name = Variable.get('database_name')
user_name = Variable.get('user_name')
password_for_db = Variable.get('password_for_db')
bot_token = Variable.get('bot_token')
chat_id = Variable.get('chat_id')
connections_name = 'clickhouse_default'
try:
# create Connection
clickhouse_conn = Connection(
conn_id=connections_name,
conn_type='ClickHouse',
host=host,
schema=database_name,
login=user_name,
password=password_for_db,
port=8443
)
# create connection
with create_session() as session:
session.add(clickhouse_conn)
session.commit()
# make ClickHouseHook
ch_hook = ClickHouseHook(clickhouse_conn_id=connections_name)
except:
ch_hook = ClickHouseHook(clickhouse_conn_id=connections_name)
@dag(default_args=default_args, schedule_interval=schedule_interval, catchup=False, concurrency=3)
def test_dag_4():
@task
def get_table_from_db(sql_query):
result = ch_hook.execute(sql_query)
return result
@task
def send_msg(bot_token: str, chat_id: str, message: str):
# send_message
url = f'https://api.telegram.org/bot{bot_token}/sendMessage?chat_id={chat_id}&text={message}'
client = httpx.Client()
return client.post(url)
get_table_from_db_task >> send_msg_task
st_dag = test_dag_4()
</code>
<code>import pandas as pd
import datetime
import io
import httpx
from airflow.decorators import dag, task
from airflow.models import Variable
from clickhouse_driver import Client
from airflow_clickhouse_plugin.hooks.clickhouse import ClickHouseHook
from airflow.models import Connection
from airflow.utils.db import create_session
default_args = {
'owner': 'd-chernovol',
'depends_on_past': False,
'retries': 2,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2024, 6, 20)
}
schedule_interval = '*/20 * * * *'
host = Variable.get('host')
database_name = Variable.get('database_name')
user_name = Variable.get('user_name')
password_for_db = Variable.get('password_for_db')
bot_token = Variable.get('bot_token')
chat_id = Variable.get('chat_id')
connections_name = 'clickhouse_default'
try:
# create Connection
clickhouse_conn = Connection(
conn_id=connections_name,
conn_type='ClickHouse',
host=host,
schema=database_name,
login=user_name,
password=password_for_db,
port=8443
)
# create connection
with create_session() as session:
session.add(clickhouse_conn)
session.commit()
# make ClickHouseHook
ch_hook = ClickHouseHook(clickhouse_conn_id=connections_name)
except:
ch_hook = ClickHouseHook(clickhouse_conn_id=connections_name)
@dag(default_args=default_args, schedule_interval=schedule_interval, catchup=False, concurrency=3)
def test_dag_4():
@task
def get_table_from_db(sql_query):
result = ch_hook.execute(sql_query)
return result
@task
def send_msg(bot_token: str, chat_id: str, message: str):
# send_message
url = f'https://api.telegram.org/bot{bot_token}/sendMessage?chat_id={chat_id}&text={message}'
client = httpx.Client()
return client.post(url)
get_table_from_db_task >> send_msg_task
st_dag = test_dag_4()
</code>
import pandas as pd
import datetime
import io
import httpx
from airflow.decorators import dag, task
from airflow.models import Variable
from clickhouse_driver import Client
from airflow_clickhouse_plugin.hooks.clickhouse import ClickHouseHook
from airflow.models import Connection
from airflow.utils.db import create_session
default_args = {
'owner': 'd-chernovol',
'depends_on_past': False,
'retries': 2,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2024, 6, 20)
}
schedule_interval = '*/20 * * * *'
host = Variable.get('host')
database_name = Variable.get('database_name')
user_name = Variable.get('user_name')
password_for_db = Variable.get('password_for_db')
bot_token = Variable.get('bot_token')
chat_id = Variable.get('chat_id')
connections_name = 'clickhouse_default'
try:
# create Connection
clickhouse_conn = Connection(
conn_id=connections_name,
conn_type='ClickHouse',
host=host,
schema=database_name,
login=user_name,
password=password_for_db,
port=8443
)
# create connection
with create_session() as session:
session.add(clickhouse_conn)
session.commit()
# make ClickHouseHook
ch_hook = ClickHouseHook(clickhouse_conn_id=connections_name)
except:
ch_hook = ClickHouseHook(clickhouse_conn_id=connections_name)
@dag(default_args=default_args, schedule_interval=schedule_interval, catchup=False, concurrency=3)
def test_dag_4():
@task
def get_table_from_db(sql_query):
result = ch_hook.execute(sql_query)
return result
@task
def send_msg(bot_token: str, chat_id: str, message: str):
# send_message
url = f'https://api.telegram.org/bot{bot_token}/sendMessage?chat_id={chat_id}&text={message}'
client = httpx.Client()
return client.post(url)
get_table_from_db_task >> send_msg_task
st_dag = test_dag_4()
My code is working, but get_table_from_db(sql_query) returns:
<code>class 'airflow.models.xcom_arg.PlainXComArg'>
</code>
<code>class 'airflow.models.xcom_arg.PlainXComArg'>
</code>
class 'airflow.models.xcom_arg.PlainXComArg'>
If I convert result to string I get list with values from table:
<code>[[111, 'name_1'], [222, 'name_2'], [222, 'name_3']]
</code>
<code>[[111, 'name_1'], [222, 'name_2'], [222, 'name_3']]
</code>
[[111, 'name_1'], [222, 'name_2'], [222, 'name_3']]
But I need to get table from my database as pandas.Dataframe.
How I can achieve it? Or I should write function for this?
The main problem, that I get only values without column names.