I have the code of my DAG below:
imports....
from airflow.datasets import Dataset
# First logical date the scheduler may run this DAG for
# (presumably passed to DAG(start_date=...) — confirm in dag_params).
START_DATE = datetime(2024, 4, 21)
# BigQuery dataset holding the raw SAP copy tables that the sensors observe.
SAP_COPY_DATASET = "mydataset"
# Cron schedule: daily at 08:00.
SCHEDULE_INTERVAL = "0 8 * * *"
# UI tags for filtering this DAG in the Airflow web server.
TAGS = ["SAP_ERP"]
DAG_ID = "sap_erp_import__other_tables"
# Operator defaults applied to every task created inside the DAG.
default_args = {
some args...
}
# Source tables where one observed SAP copy table drives exactly one dbt
# mart model of the same name (see the one-to-one loop in the DAG body).
DEPENDS_ON_ONE = {
    "0ART_SALES_ATTR", "0VEN_COMPC_ATTR", "0VEN_PURORG_ATTR", "0VENDOR_TEXT",
    "AUFK", "AUSP", "BSAK", "EKBE",
    "EKKN", "EKKO_Archived", "KONDN", "KOTN201",
    "MCHB", "YBIF_MSG_LOMMLOG", "YYLOMMLOGD", "ZBIF_SHOP_OV",
    "ZKBC_CDHDRPOS_01", "ZWM_PUT_STOCK_CA", "T006A", "ZBW_CAT_FILTER",
    "CABN", "CAWN", "INOB", "LINP",
    "MEAN", "RMA_VBAP", "T001L", "T042Z",
    "T300T", "TVAKT", "VBAK", "WAKH_WAKP",
    "XAP_IDLVH", "XAP_IDLVR", "YIMR_ADVICE_I", "YIMR_PACK_LOG",
    "YIMR_SERNR_STATA", "YIMR_SERNR_STAT", "YIMR_SNSTAT_CT", "YIMR_WCS_LOG",
    "YIMR_WCS_MSG_LOG", "YRMA_CT_SERVCMT", "YWG_CALENDAR", "YWG_GIFT_ITEM",
    "YWG_SET", "YWG_SET_ITEM", "YWG_SET_PRICE", "YWG_VAR_PARAM",
    "ZLO_EXT_PUP", "ZVBAP_SERVICES", "YWA_VAR_LABEL", "KONBBYPRQ",
    "KONMATGRPP", "ZLO_IMP_TASHA", "YIMR_INVWM", "ZLO_DEL_CAN_LOGH",
    "ZLO_DEL_CAN_LOGI", "ZFI_CANC_AD_TEXT", "ZFI_CANC_ADVICE", "0COSTCENTER_TEXT",
    "0GL_ACCOUNT_TEXT", "0PROFIT_CTR_TEXT",
}
# Each entry pairs a set of upstream source tables with the single dbt mart
# model that may only run once all of those sources have been observed.
DEPENDS_ON_MANY = [
    (set(source_tables), mart_model)
    for source_tables, mart_model in (
        (("VBAK_TEST_DATA_NON_UE", "VBKD"), "VBKD"),
        (("T002", "T002T"), "T002"),
    )
]
with DAG(
dag_params...
) as dag:
dummy_end = DummyOperator(task_id="end")
# one-to-one models
for model in sorted(DEPENDS_ON_ONE):
sensor = BQTable.from_str(f"{SAP_COPY_DATASET}.{model}").get_observed_sensor(
date="{{ data_interval_end | ds }}",
mut_types=[MutationType.EDIT],
obj_types=[ObjectType.TABLES],
env=BigQueryObserverEnvironment.PROD,
dag=dag
)
mart = DBTRunOperator(
task_id=model,
dag_filepath=__file__,
models=[f"+{model}"],
profile=const.PROFILE,
target=const.ENV,
)
sensor >> mart >> dummy_end
# many to one models
for sources, model in sorted(DEPENDS_ON_MANY, key=lambda x: x[1]):
mart = DBTRunOperator(
task_id=model,
dag_filepath=__file__,
models=[f"+{model}"],
profile=const.PROFILE,
target=const.ENV,
)
for source in sorted(sources):
sensor = BQTable.from_str(f"{SAP_COPY_DATASET}.{source}").get_observed_sensor(
date="{{ data_interval_end | ds }}",
mut_types=[MutationType.EDIT],
obj_types=[ObjectType.TABLES],
env=BigQueryObserverEnvironment.PROD,
dag=dag
)
sensor >> mart >> dummy_end
I would like to emit Airflow Datasets for only 4 of the tables on the DEPENDS_ON_ONE list —
KONDN, KOTN201, KONBBYPRQ and KONMATGRPP — from their mart tasks.
I was thinking I could use the outlets parameter inside the "mart" operator. Or should it be completely separate — a new task between them, or after one of them?
How can I do it using Airflow datasets?