I have a CSV file in an AWS S3 bucket that I would like to query with AWS Athena, which means I need to create a table schema for it in the AWS Glue Data Catalog. I have tried two approaches. The first was a Glue crawler with a custom CSV classifier that declared datatypes for the columns. The second was a visual ETL job with a custom Python transform that typecasts some of the CSV columns to the datatypes I want. With the first method, the float and integer columns aren't preserved properly: some of the data comes through missing, or queries against the table fail with a "string can't be converted to real" error. With the second method, the Glue Data Catalog doesn't render a schema for the data at all. I'm not sure what I'm doing wrong.
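For reference, the classifier for the first method was created along these lines (a minimal boto3 sketch; the classifier name and the custom datatype list are illustrative stand-ins for my actual settings):

import boto3

glue = boto3.client("glue")

# Custom CSV classifier: header row is present, fields are comma-separated and
# double-quoted, and custom datatypes are enabled so the crawler can type the
# numeric columns instead of defaulting everything to string
glue.create_classifier(
    CsvClassifier={
        "Name": "ebay-csv-classifier",  # placeholder name
        "Delimiter": ",",
        "QuoteSymbol": '"',
        "ContainsHeader": "PRESENT",
        "CustomDatatypeConfigured": True,
        "CustomDatatypes": ["FLOAT", "INT", "STRING"],
    }
)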
Example Data:
url,brand,color,type,material,item_height,item_depth,item_width,item_weight,condition,price,seller,numsold,rating
https://www.ebay.com/itm/235324388478?_trkparms=5079%3A5000006518,Cuisinart,Silver,Blends,Stainless Steel,15,-1,-1,-1,"Certified - Refurbished
Certified - Refurbished",149.99,Cuisinart,42,-1
[Screenshot: CSV crawler configuration for the first method]
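The crawler itself is pointed at the bucket and uses that classifier; roughly (the crawler name and role ARN are placeholders):

import boto3

glue = boto3.client("glue")

# Crawler over the S3 bucket, writing its table into the default database and
# using the custom CSV classifier defined above
glue.create_crawler(
    Name="ebay-csv-crawler",
    Role="arn:aws:iam::123456789012:role/GlueCrawlerRole",
    DatabaseName="default",
    Targets={"S3Targets": [{"Path": "s3://ecommerce-deals"}]},
    Classifiers=["ebay-csv-classifier"],
)
glue.start_crawler(Name="ebay-csv-crawler")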
My Python script for the second method:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrameCollection
from awsglue.dynamicframe import DynamicFrame
# Script generated for node Custom Transform
def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
    from pyspark.sql.functions import col, regexp_extract
    from pyspark.sql.types import FloatType, IntegerType

    # Pull the single frame out of the incoming collection
    dynf = dfc.select(list(dfc.keys())[0])
    df = dynf.toDF()

    # Match an optionally signed number; the data uses -1 as a placeholder for
    # missing values, so the minus sign has to be captured too
    pattern = r"-?[\d.]+"
    columns_to_transform = ['price', 'rating', 'item_width', 'item_weight', 'item_height', 'item_depth', 'numsold']
    for col_name in columns_to_transform:
        # Check if column exists in the DataFrame
        if col_name in df.columns:
            # Extract the numeric part and cast: numsold to int, the rest to float
            df = df.withColumn(col_name, regexp_extract(col(col_name), pattern, 0).cast(IntegerType() if col_name == 'numsold' else FloatType()))
    res = DynamicFrame.fromDF(df, glueContext, 'changed')
    return DynamicFrameCollection({'Custom_Transform_0': res}, glueContext)
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Script generated for node Amazon S3
AmazonS3_node1713841179360 = glueContext.create_dynamic_frame.from_options(format_options={"quoteChar": '"', "withHeader": True, "separator": ",", "optimizePerformance": False}, connection_type="s3", format="csv", connection_options={"paths": ["s3://ecommerce-deals"]}, transformation_ctx="AmazonS3_node1713841179360")
# Script generated for node Custom Transform
CustomTransform_node1713841197747 = MyTransform(glueContext, DynamicFrameCollection({"AmazonS3_node1713841179360": AmazonS3_node1713841179360}, glueContext))
# Script generated for node Select From Collection
SelectFromCollection_node1713842428985 = SelectFromCollection.apply(dfc=CustomTransform_node1713841197747, key=list(CustomTransform_node1713841197747.keys())[0], transformation_ctx="SelectFromCollection_node1713842428985")
# Script generated for node AWS Glue Data Catalog
AWSGlueDataCatalog_node1713841379233 = glueContext.write_dynamic_frame.from_catalog(frame=SelectFromCollection_node1713842428985, database="default", table_name="ebay_csv", additional_options={"enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE"}, transformation_ctx="AWSGlueDataCatalog_node1713841379233")
job.commit()
[Screenshot: Visual ETL job]
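After the job runs, this is how I check whether the catalog table ended up with a schema (a small boto3 sketch; the database and table names match the job's sink above):

import boto3

glue = boto3.client("glue")

# Print the column list of the catalog table the ETL job writes to
table = glue.get_table(DatabaseName="default", Name="ebay_csv")["Table"]
for column in table["StorageDescriptor"].get("Columns", []):
    print(column["Name"], column["Type"])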