I’m using Apache Airflow to automate the process of loading data into a PostgreSQL database. My workflow consists of multiple tasks where I fetch customer and order data via API, insert or update customer records, and then insert order data into the database.
Everything works fine when I execute the tasks sequentially, but when I try to parallelize them, I encounter the following error during the insert operations:
[2024-12-26T17:24:59.503+0000] {db_insert_customer.py:60} ERROR - Failed to insert/update customer 4507****0 - leticia.****@hotmail.com: current transaction is aborted, commands ignored until end of transaction block
Traceback (most recent call last):
  File "/opt/airflow/airflow_files/tasks/data_load/db_insert_customer.py", line 57, in db_insert_customer
    cursor.execute(query, (row['name'], row['email'], row['cpf'], row['phone']))
psycopg2.errors.InFailedSqlTransaction: current transaction is aborted, commands ignored until end of transaction block
Description of the Workflow:
- I retrieve data for a given week.
- I insert customer data into the database (using an UPDATE if the customer already exists, to avoid duplication).
- After that, I fetch order data and insert it into the orders table.
- These operations are done in parallel for each week, as each week’s data is independent.
However, when the tasks run in parallel, the transaction is sometimes aborted and the affected rows are never inserted into the database.
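For reference, the error in the traceback is what PostgreSQL reports for every statement issued after an earlier statement in the same transaction has failed, until that transaction is rolled back. The sketch below shows one way a per-row loop could contain such failures with a savepoint. It is not the code from db_insert_customer.py, which is not shown here: the function name `upsert_customers`, the `customers` table, and a unique constraint on `cpf` are assumptions for illustration, and `conn` is assumed to be a psycopg2 connection that is used by one task only.

```python
import logging

logger = logging.getLogger(__name__)

# Assumed schema: customers(name, email, cpf, phone) with UNIQUE (cpf).
UPSERT_SQL = """
    INSERT INTO customers (name, email, cpf, phone)
    VALUES (%s, %s, %s, %s)
    ON CONFLICT (cpf) DO UPDATE
    SET name = EXCLUDED.name,
        email = EXCLUDED.email,
        phone = EXCLUDED.phone
"""


def upsert_customers(conn, rows):
    """Upsert each row inside its own savepoint; return the rows that failed."""
    failed = []
    cursor = conn.cursor()
    try:
        for row in rows:
            cursor.execute("SAVEPOINT customer_row")
            try:
                cursor.execute(
                    UPSERT_SQL,
                    (row["name"], row["email"], row["cpf"], row["phone"]),
                )
            except Exception:
                # Undo only this row so the surrounding transaction stays usable.
                cursor.execute("ROLLBACK TO SAVEPOINT customer_row")
                logger.exception("Failed to upsert customer %s", row["cpf"])
                failed.append(row)
            else:
                cursor.execute("RELEASE SAVEPOINT customer_row")
        conn.commit()
    except Exception:
        # Anything unexpected: discard the whole transaction and re-raise.
        conn.rollback()
        raise
    finally:
        cursor.close()
    return failed
```

With a single `INSERT ... ON CONFLICT` statement, two parallel tasks that touch the same customer should no longer race between a separate existence check and the insert, and a failing row is rolled back on its own instead of poisoning the statements that follow it.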
Questions:
- What could be causing the transaction to be aborted when I parallelize the tasks?
- How can I resolve this issue and ensure data is correctly inserted into the database, even with parallel execution?
- Should I consider adding a step to save data into a CSV file before inserting it into the database to avoid these transaction issues? Or would this add unnecessary complexity?
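On the CSV question, one variant that avoids intermediate files is to stream the rows as CSV from memory into a temporary staging table with COPY and then merge them in a single statement. The following is a rough sketch under assumptions, not a verified recommendation: the names `bulk_upsert_customers`, `rows_to_csv`, and `customers_staging`, the `customers` table with text-like columns, and a unique constraint on `cpf` are all hypothetical, and `conn` is assumed to be a psycopg2 connection (whose cursors provide `copy_expert`).

```python
import csv
import io

COLUMNS = ("name", "email", "cpf", "phone")

# Temporary table that disappears when the transaction ends.
CREATE_STAGING_SQL = """
    CREATE TEMP TABLE customers_staging
    (name text, email text, cpf text, phone text)
    ON COMMIT DROP
"""

COPY_SQL = (
    "COPY customers_staging (name, email, cpf, phone) "
    "FROM STDIN WITH (FORMAT csv)"
)

# DISTINCT ON keeps one (arbitrary) row per cpf, because ON CONFLICT DO UPDATE
# cannot touch the same target row twice within one statement.
MERGE_SQL = """
    INSERT INTO customers (name, email, cpf, phone)
    SELECT DISTINCT ON (cpf) name, email, cpf, phone
    FROM customers_staging
    ON CONFLICT (cpf) DO UPDATE
    SET name = EXCLUDED.name,
        email = EXCLUDED.email,
        phone = EXCLUDED.phone
"""


def rows_to_csv(rows):
    """Serialize the rows to an in-memory CSV buffer, rewound for reading."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    for row in rows:
        writer.writerow([row[column] for column in COLUMNS])
    buffer.seek(0)
    return buffer


def bulk_upsert_customers(conn, rows):
    """Load all rows through a staging table and merge them in one statement."""
    cursor = conn.cursor()
    try:
        cursor.execute(CREATE_STAGING_SQL)
        cursor.copy_expert(COPY_SQL, rows_to_csv(rows))
        cursor.execute(MERGE_SQL)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cursor.close()
```

The trade-off is that a single bad row fails the whole batch for that week, so this fits best when each week's data is either loaded completely or retried as a unit.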
Additional Information:
- I’m using PostgreSQL as the database.
- I’m using psycopg2 to interact with the database.
- The customer_id column is defined as serial; I'm not sure whether that causes problems with parallel transactions.