I’m trying to create an Airflow DAG that processes an unformatted file uploaded to a GCP bucket and generates a new .csv file with specific transformations. The input file is named “CARTOLAS.YYYYMM” and the desired output is “CARTOLAS.YYYYMM.csv”, both in the same bucket.
However, after deploying the Python script, I haven’t observed any actions being taken. I’ve checked the task logs but haven’t found any useful information.
Thank you for any help or guidance.
from airflow import DAG
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
from airflow.operators.python_operator import PythonOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from datetime import datetime, timedelta
import os
col_1 = {
'000': [
[0, 3, 'iden'],
[3, 5, 'cta_region.arr'],
[5, 9, 'cta_codsuc.arr'],
[9, 15, 'cta_folio'],
[15, 16, 'cta_digito.arr'],
[16, 46, 'v_nombre.arr'],
[46, 98, 'v_calle.arr'],
[98, 118, 'v_comuna.arr'],
[118, 138, 'v_ciudad.arr'],
[138, 158, 'v_glosa_fondo.arr']
],
'111': [
[0, 3, 'iden'],
[3, 5, 'cta_region.arr'],
[5, 9, 'cta_codsuc.arr'],
[9, 15, 'cta_folio'],
[15, 16, 'cta_digito.arr'],
[16, 25, 'rut_titu'],
[25, 26, 'digito_titu.arr'],
[26, 56, 'nombre_titu.arr'],
[56, 65, 'rut_titu'],
[65, 66, 'digito_titu.arr'],
[66, 96, 'nombre_titu.arr'],
[96, 106, 'v_mesini'],
[106, 110, 'v_anoini'],
[110, 120, 'v_mester'],
[120, 124, 'v_anoter']
],
'222': [
[0, 3, 'iden'],
[3, 5, 'cta_region.arr'],
[5, 9, 'cta_codsuc.arr'],
[9, 15, 'cta_folio'],
[15, 16, 'cta_digito.arr'],
[16, 24, 'v_fecant2.arr'],
[24, 32, 'opc_fecfincar.arr'],
[32, 62, 'ceros_o_vacios_en_registro_222'],
[62, 76, 'hcu_salprocta_c_ini'],
[76, 86, 'v_valcuo_ini_00'],
[32, 62, 'valcuo_ini_01'],
[86, 101, 'hcu_salprocta_p_ini'],
[101, 116, 'ceros_o_vacios_en_registro_222'],
[116, 131, 'hcu_salretcta_p_ini'],
[131, 146, 'hcu_salprocta_p_ini'],
[146, 192, 'ceros_o_vacios_en_registro_222']
],
'333': [
[0, 3, 'iden'],
[3, 5, 'cta_region.arr'],
[5, 9, 'cta_codsuc.arr'],
[9, 15, 'cta_folio'],
[15, 16, 'cta_digito.arr'],
[16, 31, 'v_rentab1'],
[31, 46, 'v_rentab2'],
[46, 192, 'ceros_o_vacios_en_registro_333']
],
'444': [
[0, 3, 'iden'],
[3, 5, 'cta_region.arr'],
[5, 9, 'cta_codsuc.arr'],
[9, 15, 'cta_folio'],
[15, 16, 'cta_digito.arr'],
[16, 24, 'opc_fecfincar.arr'],
[24, 54, 'ceros_o_vacios_en_registro_444'],
[54, 68, 'hcu_salacucta_c_fin'],
[68, 78, 'v_valcuo_fin_00'],
[78, 93, 'hcu_salacucta_p_fin'],
[93, 108, 'ceros_o_vacios_en_registro_444'],
[108, 123, 'hcu_salretcta_p_fin'],
[123, 138, 'hcu_salprocta_p_fin'],
[138, 145, 'a_ingresos'],
[145, 160, 'a_montos_ing'],
[160, 167, 'a_egresos'],
[167, 182, 'a_montos_egr'],
[182, 192, 'ceros_o_vacios_en_registro_444']
],
'555': [
[0, 3, 'iden'],
[3, 5, 'cta_region.arr'],
[5, 9, 'cta_codsuc.arr'],
[9, 15, 'cta_folio'],
[15, 16, 'cta_digito.arr'],
[16, 26, 'v_valcuo_ini_00'],
[26, 36, 'v_valcuo_fin_00'],
[36, 43, 'v_factor_valcuo_00'],
[43, 58, 'v_factor_valcuo_01'],
[58, 192, 'ceros_o_vacios_en_registro_555']
],
'FORMATO_MOVIMIENTOS': [
[0, 3, 'iden'],
[3, 5, 'cta_region.arr'],
[5, 9, 'cta_codsuc.arr'],
[9, 15, 'cta_folio'],
[15, 16, 'cta_digito.arr'],
[16, 46, 'v_glosa_movimto.arr'],
[46, 56, 'meses+v_mesval'],
[56, 58, 'v_diaval'],
[58, 73, 'ceros_o_vacios'],
[73, 88, 'mov_valcuo'],
[88, 98, 'mov_monto_c'],
[98, 112, 'mov_fecval.arr'],
[112, 122, 'ceros_o_vacios'],
[122, 123, 'muestra']
]
}
# Function to process the file
def procesar_archivo(input_path, output_path):
iguales = False
posicion_data = -1
formato_nombre = ''
aux_cabecera = ''
with open(input_path, encoding="Latin-1") as fichero, open(output_path, 'w', encoding="Latin-1") as file_csv:
prueba = 0
aux_sin_registro = ''
for linea in fichero:
if linea[0:3] == '000':
if prueba == 1:
file_csv.write(aux_sin_registro + "~" * 14 + 'n')
iguales = False
posicion_data += 1
aux_cabecera = ''
aux_sin_registro = ''
prueba = 0
if iguales:
formato = col_1['FORMATO_MOVIMIENTOS']
formato_nombre = 'FORMATO_MOVIMIENTOS'
else:
formato = col_1[linea[0:3]]
formato_nombre = linea[0:3]
total_aux = len(formato)
posicion_aux = 0
linea_aux = ''
for posicion in formato:
linea_aux += str(linea[posicion[0]:posicion[1]]).strip()
if posicion_aux < total_aux:
linea_aux += '~'
posicion_aux += 1
if iguales:
file_csv.write(aux_cabecera + linea_aux + 'n')
prueba += 1
else:
aux_cabecera += linea_aux
if linea[0:3] == '555':
iguales = True
prueba += 1
aux_sin_registro = aux_cabecera
if prueba == 1:
file_csv.write(aux_sin_registro + "~" * 14 + 'n')
# Function to download file from GCS
def download_file_from_gcs(bucket_name, object_name, local_path):
hook = GCSHook()
hook.download(bucket_name, object_name, local_path)
# Function to upload file to GCS
def upload_file_to_gcs(bucket_name, object_name, local_path):
hook = GCSHook()
hook.upload(bucket_name, object_name, local_path)
# Define the default arguments for the DAG
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# Define the DAG
dag = DAG(
'dag_cartolas_gcs_to_csv',
default_args=default_args,
description='Detects a new file in GCS, processes it, and uploads the result back to GCS',
schedule_interval=None,
catchup=False,
)
# Task to list objects in GCS
list_objects = GCSListObjectsOperator(
task_id='list_objects',
bucket='cartolas-ahorros-test',
prefix='CARTOLAS.',
dag=dag,
)
# Task to download the file from GCS
def download_task(**kwargs):
bucket_name = 'cartolas-ahorros-test'
ti = kwargs['ti']
objects = ti.xcom_pull(task_ids='list_objects')
if not objects:
raise ValueError("No objects found with the prefix 'CARTOLAS.'")
object_name = objects[0] # Take the first object in the list
local_path = '/tmp/' + object_name
download_file_from_gcs(bucket_name, object_name, local_path)
return local_path
download_file = PythonOperator(
task_id='download_file',
python_callable=download_task,
provide_context=True,
dag=dag,
)
# Task to process the file
def process_task(**kwargs):
ti = kwargs['ti']
input_path = ti.xcom_pull(task_ids='download_file')
output_path = input_path + '.csv'
procesar_archivo(input_path, output_path)
return output_path
process_file = PythonOperator(
task_id='process_file',
python_callable=process_task,
provide_context=True,
dag=dag,
)
# Task to upload the processed file to GCS
def upload_task(**kwargs):
ti = kwargs['ti']
output_path = ti.xcom_pull(task_ids='process_file')
bucket_name = 'cartolas-ahorros-test'
object_name = os.path.basename(output_path)
upload_file_to_gcs(bucket_name, object_name, output_path)
upload_file = PythonOperator(
task_id='upload_file',
python_callable=upload_task,
provide_context=True,
dag=dag,
)
# Set task dependencies
list_objects >> download_file >> process_file >> upload_file ```