I have two AWS Lambda functions, each running in a Docker container, that I trigger from Airflow one at a time. At the moment I trigger the data pipeline from my local MWAA environment for testing. I want each Lambda to be invoked exactly once per DAG run.
For some reason, when either Lambda function runs for longer than about one minute, a second concurrent invocation of it starts, and sometimes two extra invocations run concurrently with the original. This is not what I want; I want no concurrency at all.
I can't figure out why this is happening, and I've tried a lot of different things without success.
Below is my DAG, followed by my Lambda configuration in Terraform:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.amazon.aws.operators.lambda_function import LambdaInvokeFunctionOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'start_date': datetime(2023, 7, 1),
    'retries': 0,
}

with DAG(
    'data_pipeline',
    default_args=default_args,
    description="Pipeline for getting data articles and processing them",
    schedule_interval=None,
    catchup=False,
    max_active_runs=1
) as data_dag:
    start_task = EmptyOperator(
        task_id="Data-Airflow-Start"
    )

    fetch_data = LambdaInvokeFunctionOperator(
        task_id='fetch_data_jsons',
        function_name='send_data_article_to_bucket',
        aws_conn_id='aws_default',
        log_type='Tail',
        invocation_type='RequestResponse',
        dag=data_dag
    )

    fix_unready_articles = LambdaInvokeFunctionOperator(
        task_id='fix_unready_articles',
        function_name='fix_incomplete_articles',
        aws_conn_id='aws_default',
        log_type='Tail',
        invocation_type='RequestResponse',
        dag=data_dag
    )

    end_task = EmptyOperator(
        task_id="Data-Airflow-End"
    )

    start_task >> fetch_data >> fix_unready_articles >> end_task
Here is my Lambda Terraform configuration (IAM resources omitted):
resource "aws_lambda_function" "lambda_function" {
function_name = var.lambda_function_name
role = aws_iam_role.lambda_execution_role.arn
package_type = local._package_type
image_uri = var.lambda_images_uri
timeout = 900 # max 15min timeout
memory_size = 2048
environment {
variables = var.lambda_env_vars
}
}
One thing that partially works: when I add reserved_concurrent_executions = 1 to my Lambda resource in Terraform, it does prevent the concurrent executions, but it then causes an exception in the DAG, so Airflow marks the task as failed even though the Lambda actually succeeds.
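The change is just this one extra field on the resource shown above (a sketch, everything else left unchanged):

resource "aws_lambda_function" "lambda_function" {
  # ... same configuration as above ...
  reserved_concurrent_executions = 1 # allow only one concurrent execution of this function
}

With that field set, the DAG logs for the Lambda's run look like this exception (which is why Airflow thinks the task failed):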
[2024-07-18, 11:11:55 UTC] {{taskinstance.py:2698}} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 428, in _execute_task
result = execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/operators/lambda_function.py", line 215, in execute
response = self.hook.invoke_lambda(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/lambda_function.py", line 82, in invoke_lambda
return self.conn.invoke(**trim_none_values(invoke_args))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/client.py", line 553, in _api_call
return self._make_api_call(operation_name, kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/client.py", line 1009, in _make_api_call
raise error_class(parsed_response, operation_name)
botocore.errorfactory.TooManyRequestsException: An error occurred (TooManyRequestsException) when calling the Invoke operation (reached max retries: 4): Rate Exceeded.
[2024-07-18, 11:11:55 UTC] {{taskinstance.py:1138}} INFO - Marking task as FAILED. dag_id=inoreader_pipeline, task_id=fix_unready_articles, execution_date=20240718T111044, start_date=20240718T111045, end_date=20240718T111155
[2024-07-18, 11:11:55 UTC] {{standard_task_runner.py:107}} ERROR - Failed to execute job 196 for task fix_unready_articles (An error occurred (TooManyRequestsException) when calling the Invoke operation (reached max retries: 4): Rate Exceeded.; 2435)
[2024-07-18, 11:11:55 UTC] {{local_task_job_runner.py:234}} INFO - Task exited with return code 1
[2024-07-18, 11:11:55 UTC] {{taskinstance.py:3280}} INFO - 0 downstream tasks scheduled from follow-on schedule check
I even tried a Lambda whose handler does nothing but busy-loop for two minutes, and for some reason Airflow still triggers three concurrent executions of it.
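The test function was essentially just this (simplified; the handler body and exact duration are only how I set up the test):

import time

def lambda_handler(event, context):
    # Burn roughly two minutes of wall-clock time with no external calls,
    # so the invocation comfortably exceeds the one-minute mark.
    deadline = time.time() + 120
    while time.time() < deadline:
        pass  # busy loop, nothing else
    return {"statusCode": 200}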
When I test each Lambda in the AWS console it runs only once, as I want, and when I trigger it from EventBridge it also runs only once. This leads me to think it's an Airflow issue.
So, to summarize: I want to prevent concurrent executions entirely, and I want Airflow to mark the task as a success when the Lambda succeeds.
Any help would be greatly appreciated.