I'm using an S3 event to trigger a Lambda function; it extracts the bucket_name and key_name from the event and starts a Step Functions state machine. The state machine first validates the file format via a Lambda function: if the file is in good format it runs a Glue job for the ETL step, otherwise it moves the file to the archive folder. At the end of each workflow it publishes an SNS message.
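For reference, a minimal sketch of such a trigger Lambda (the state machine ARN is masked like the other ARNs here, and the event parsing assumes the standard S3 put-event structure):

import json
import urllib.parse
import boto3

sfn = boto3.client('stepfunctions')

def lambda_handler(event, context):
    # Pull the bucket and key out of the S3 event record
    record = event['Records'][0]['s3']
    bucket_name = record['bucket']['name']
    key_name = urllib.parse.unquote_plus(record['object']['key'])
    file_name = key_name.split('/')[-1]

    # Start the state machine with the fields the workflow expects
    response = sfn.start_execution(
        stateMachineArn='****',  # placeholder, masked
        input=json.dumps({
            'bucket_name': bucket_name,
            'key_name': key_name,
            'file_name': file_name
        })
    )
    return {'executionArn': response['executionArn']}

The state machine definition: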
{
  "Comment": "A description of my state machine",
  "StartAt": "validate input csv",
  "States": {
    "validate input csv": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "****"
      },
      "ResultPath": "$.validation_result",
      "Next": "Choice"
    },
    "Choice": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.validation_result.Payload.Validation",
          "StringEquals": "SUCCESS",
          "Next": "Glue StartJobRun"
        },
        {
          "Variable": "$.validation_result.Payload.Validation",
          "StringEquals": "FAILED",
          "Next": "move-file"
        }
      ]
    },
    "move-file": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "****"
      },
      "Next": "Error-alert"
    },
"Glue StartJobRun": {
"Type": "Task",
"Resource": "arn:aws:states:::glue:startJobRun",
"Parameters": {
"JobName": "gluest",
"bucket_name.$": "$.bucket_name",
"folder_name.$": "$.key_name",
"file_name.$": "$.file_name"
},
"Next": "archived-file"
},
"archived-file": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "****",
"Message": {
"MESSAGE": "FILE IS ARCHIVED"
}
},
"End": true
},
"Error-alert": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "*****",
"Message": {
"message": "File not good mate !!"
}
},
"End": true
}
}
}
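The Choice state compares $.validation_result.Payload.Validation, so the validation Lambda only needs to return a payload whose Validation field is either SUCCESS or FAILED. A minimal sketch (the header check and EXPECTED_COLUMNS are illustrative placeholders, not the real validation logic):

import csv
import boto3

s3 = boto3.client('s3')

# Placeholder subset of columns to check; illustrative only
EXPECTED_COLUMNS = ['game_id', 'club_id', 'own_goals']

def lambda_handler(event, context):
    bucket_name = event['bucket_name']
    key_name = event['key_name']

    # Read only the first line of the object to inspect the header
    obj = s3.get_object(Bucket=bucket_name, Key=key_name)
    first_line = next(obj['Body'].iter_lines()).decode('utf-8')
    header = next(csv.reader([first_line]))

    status = 'SUCCESS' if all(col in header for col in EXPECTED_COLUMNS) else 'FAILED'

    # The Choice state reads $.validation_result.Payload.Validation
    return {
        'Validation': status,
        'bucket_name': bucket_name,
        'key_name': key_name,
        'file_name': event.get('file_name')
    }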
Please find the Glue job below. I'm also having difficulty getting output logs out of my Glue job even though I set up the IAM roles correctly. For now I need my code to be good.
import os
import boto3
import pandas as pd
import redshift_connector


def lambda_handler(event, context):
    print(event)

    # Redshift connection settings
    host = 'redshift-arn'
    port = 5439
    database = 'dev'
    user = 'admin'
    password = '*******'

    # Initialize the Redshift connection
    conn = redshift_connector.connect(
        host=host,
        port=port,
        database=database,
        user=user,
        password=password
    )
    cursor = conn.cursor()

    # Read the values passed in from the state machine
    bucket_name = event['bucket_name']
    key_name = event['key_name']
    file_name = event['file_name']

    source_folder, source_file_name = os.path.split(key_name)
    print(source_folder)

    # Collect the arguments in the result returned to the workflow
    result = {
        'bucket_name': bucket_name,
        'file_name': file_name,
        'folder': source_folder
    }

    # Load the CSV file into a pandas DataFrame
    csv_file_path = f's3://{bucket_name}/{source_folder}/{file_name}'
    print(csv_file_path)
    df = pd.read_csv(csv_file_path)

    # Define the Redshift table name and column order
    table_name = 'club_games'  # Change this to your actual Redshift table name
    columns = ['game_id', 'club_id', 'own_goals', 'own_position', 'own_manager_name',
               'opponent_id', 'opponent_goals', 'opponent_position',
               'opponent_manager_name', 'hosting', 'is_win']

    # Create the Redshift table (if not exists)
    cursor.execute(
        f"CREATE TABLE IF NOT EXISTS {table_name} ("
        "game_id INT, club_id INT, own_goals INT, own_position INT, "
        "own_manager_name VARCHAR(50), opponent_id INT, opponent_goals INT, "
        "opponent_position INT, opponent_manager_name VARCHAR(50), "
        "hosting VARCHAR(20), is_win INT)"
    )

    # Insert the DataFrame rows into the Redshift table
    for _, row in df[columns].iterrows():
        values = ', '.join(
            f"'{value}'" if isinstance(value, str) else str(value) for value in row
        )
        insert_sql = f"INSERT INTO {table_name} VALUES ({values})"
        cursor.execute(insert_sql)
    conn.commit()

    # Archive the processed CSV file
    s3 = boto3.client('s3')
    archive_key = 'archive/' + file_name
    object_key = source_folder + '/' + file_name

    # Copy the file to the archive folder, then delete the original
    s3.copy_object(
        Bucket=bucket_name,
        CopySource={'Bucket': bucket_name, 'Key': object_key},
        Key=archive_key
    )
    s3.delete_object(Bucket=bucket_name, Key=object_key)

    # Close the Redshift connection
    conn.close()
    return result
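If this script runs as an actual Glue job rather than a Lambda handler, the values from the startJobRun state arrive as job arguments on sys.argv instead of an event. A minimal sketch of reading them (assuming the --bucket_name, --folder_name and --file_name keys from the Arguments map in the state machine above):

import sys
from awsglue.utils import getResolvedOptions

# Sketch only: the argument names assume the Arguments map shown earlier
args = getResolvedOptions(sys.argv, ['bucket_name', 'folder_name', 'file_name'])

bucket_name = args['bucket_name']
key_name = args['folder_name']
file_name = args['file_name']
print(bucket_name, key_name, file_name)  # print output goes to the job's CloudWatch logs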