I only need to be notified when the data has changed. Can I do this with Airflow?
About the problem:
I have a DAG that updates warehouse acceptance cost data:
from airflow.decorators import dag, task

# default_args (including start_date) is defined elsewhere in the file (not shown).

@dag(
    schedule='*/5 * * * *',
    catchup=False,
    default_args=default_args,
    tags=['market', 'acceptance_rate']
)
def mp_acceptance_rate():
    @task.external_python(python='/usr/local/airflow/venv_main/bin/python')
    def get_mp_acceptance_rate():
        # Imports live inside the function because it runs in a separate Python environment.
        from src.transform import MpTables
        return MpTables().acceptance_rate()

    @task.external_python(python='/usr/local/airflow/venv_main/bin/python')
    def update_mp_acceptance_rate(data):
        from src.databases.DatabaseWorker import DatabaseWorker
        from src.databases.mp_schema import MpAcceptanceRate
        db = DatabaseWorker()
        with db.session() as session:
            session.upsert(
                MpAcceptanceRate,
                data,
                on_conflict='do_update',
                deletable=True
            )

    # Passing the first task's output (an XComArg) as an argument already makes
    # update_mp_acceptance_rate run after get_mp_acceptance_rate, so no explicit >> is needed.
    update_mp_acceptance_rate(get_mp_acceptance_rate())


mp_acceptance_rate()
I also have a query that returns the "suitable" rows:
SELECT * FROM mp.acceptance_rate mar
WHERE warehouse_id IN (
    SELECT DISTINCT id
    FROM mp.stock ms
    LEFT JOIN mp.warehouses mw
        ON ms.warehouse_name = mw.name
    WHERE date = CURRENT_DATE
      AND quantity > 0
)
  AND coefficient != -1
  AND coefficient <= 3
  AND box_type_id IN (2, 5)
ORDER BY date, warehouse_id;
I want the query results to be sent only if the new data differs from the data sent previously. For example, the first notification contains all rows of the query result, since there is no previous data. If the next run returns one or more rows that were not in the previous result, a notification should be sent, for example: "Delivery available for {delivery date} to {warehouse 1} with coefficient {coef}." If a row has disappeared since the previous result, the notification should say: "Delivery for {delivery date} to {warehouse 1} has changed coefficient or is unavailable."
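A minimal sketch of the comparison described above, in plain Python with no Airflow or database code. It assumes each query row is reduced to a hypothetical `Slot` tuple (delivery date, warehouse, box type, coefficient) whose values have already been converted to str/int/float. It also assumes the previously sent result is kept between runs as a JSON string in some store. Because whole rows are compared, a coefficient change shows up as one removed row plus one added row, which matches the "has changed coefficient or is unavailable" wording. `Slot`, `diff_rows`, `build_messages`, `load_snapshot`, `dump_snapshot` and `notify_changes` are illustrative names, not part of Airflow.

```python
import json
from typing import Iterable, List, NamedTuple, Optional, Tuple


class Slot(NamedTuple):
    """One row of the query result (hypothetical subset of its columns)."""
    delivery_date: str   # e.g. "2024-05-01"; convert date objects to str first
    warehouse: str       # warehouse_id or name, whichever the message should show
    box_type_id: int
    coefficient: float   # convert Decimal to float first (json cannot encode Decimal)


def diff_rows(previous: Iterable[Slot], current: Iterable[Slot]) -> Tuple[List[Slot], List[Slot]]:
    """Return (added, removed): rows only in `current`, rows only in `previous`."""
    prev_set, curr_set = set(previous), set(current)
    return sorted(curr_set - prev_set), sorted(prev_set - curr_set)


def build_messages(added: List[Slot], removed: List[Slot]) -> List[str]:
    """Format one notification line per added or removed row."""
    messages = [
        f"Delivery available for {r.delivery_date} to {r.warehouse} "
        f"with coefficient {r.coefficient}."
        for r in added
    ]
    messages += [
        f"Delivery for {r.delivery_date} to {r.warehouse} "
        "has changed coefficient or is unavailable."
        for r in removed
    ]
    return messages


def load_snapshot(raw: Optional[str]) -> List[Slot]:
    """Parse the previously sent rows; None or "" means nothing was sent yet."""
    if not raw:
        return []
    return [Slot(*row) for row in json.loads(raw)]


def dump_snapshot(rows: Iterable[Slot]) -> str:
    """Serialize rows as a JSON list of lists (NamedTuples encode as arrays)."""
    return json.dumps(sorted(rows))


def notify_changes(previous_raw: Optional[str], current: Iterable[Slot]) -> Tuple[List[str], str]:
    """Compare the current result with the stored one; return (messages, new state)."""
    current = list(current)
    added, removed = diff_rows(load_snapshot(previous_raw), current)
    return build_messages(added, removed), dump_snapshot(current)
```

In a DAG, `notify_changes` could run in a task after `update_mp_acceptance_rate`. The returned JSON string would be saved somewhere persistent, for example an Airflow Variable or a small table in the same database. A local file is risky if workers do not share a filesystem. Messages would be sent only when the returned list is non-empty.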