I have a CSV file in an AWS S3 bucket that I'd like to query in AWS Athena. To do that, I need to create a table schema for it in the AWS Glue Data Catalog. I've tried two different approaches. The first was a Glue crawler with a custom CSV classifier that defines datatypes for the columns. The second was a visual ETL job with a custom Python transform that typecasts some of the CSV columns to the datatypes I want. With the first method, the float and integer columns weren't preserved properly: some of the data came through missing, or Athena threw an error that a string can't be converted to a real value. With the second method, the Glue Data Catalog doesn't even end up with a schema for the data. I'm not sure what I'm doing wrong.
Example Data:
url,brand,color,type,material,item_height,item_depth,item_width,item_weight,condition,price,seller,numsold,rating
https://www.ebay.com/itm/235324388478?_trkparms=5079%3A5000006518,Cuisinart,Silver,Blends,Stainless Steel,15,-1,-1,-1,"Certified - Refurbished
Certified - Refurbished",149.99,Cuisinart,42,-1
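
The catalog table I'm trying to end up with would look roughly like this if created directly with boto3 (a sketch for reference, not something I've run: the database, table name, and S3 path come from my job script below, but the OpenCSVSerDe settings are my assumption):

import boto3

glue = boto3.client("glue")

# Sketch of the target Glue Catalog table; the column types are what
# I want Athena to see. The serde choice here is an assumption.
glue.create_table(
    DatabaseName="default",
    TableInput={
        "Name": "ebay_csv",
        "TableType": "EXTERNAL_TABLE",
        "StorageDescriptor": {
            "Columns": [
                {"Name": "url", "Type": "string"},
                {"Name": "brand", "Type": "string"},
                {"Name": "color", "Type": "string"},
                {"Name": "type", "Type": "string"},
                {"Name": "material", "Type": "string"},
                {"Name": "item_height", "Type": "float"},
                {"Name": "item_depth", "Type": "float"},
                {"Name": "item_width", "Type": "float"},
                {"Name": "item_weight", "Type": "float"},
                {"Name": "condition", "Type": "string"},
                {"Name": "price", "Type": "float"},
                {"Name": "seller", "Type": "string"},
                {"Name": "numsold", "Type": "int"},
                {"Name": "rating", "Type": "float"},
            ],
            "Location": "s3://ecommerce-deals/",
            "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
            "SerdeInfo": {
                "SerializationLibrary": "org.apache.hadoop.hive.serde2.OpenCSVSerde",
                "Parameters": {"separatorChar": ",", "quoteChar": "\""},
            },
        },
        "Parameters": {"classification": "csv", "skip.header.line.count": "1"},
    },
)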
[Image: CSV crawler for the first method]
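
Expressed as a boto3 call, the classifier configuration from that screenshot amounts to roughly this (a sketch; the classifier name is hypothetical, and the header/datatype lists mirror the columns in the example data):

import boto3

glue = boto3.client("glue")

# Sketch of the custom CSV classifier from the first method.
# "ebay-csv-classifier" is a made-up name; the custom datatypes are
# applied positionally to the header columns.
glue.create_classifier(
    CsvClassifier={
        "Name": "ebay-csv-classifier",  # hypothetical name
        "Delimiter": ",",
        "QuoteSymbol": "\"",
        "ContainsHeader": "PRESENT",
        "Header": [
            "url", "brand", "color", "type", "material",
            "item_height", "item_depth", "item_width", "item_weight",
            "condition", "price", "seller", "numsold", "rating",
        ],
        "CustomDatatypeConfigured": True,
        "CustomDatatypes": [
            "STRING", "STRING", "STRING", "STRING", "STRING",
            "FLOAT", "FLOAT", "FLOAT", "FLOAT",
            "STRING", "FLOAT", "STRING", "INT", "FLOAT",
        ],
    }
)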
# My Python script for the second method:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrameCollection
from awsglue.dynamicframe import DynamicFrame

# Script generated for node Custom Transform
def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
    dynf = dfc.select(list(dfc.keys())[0])
    df = dynf.toDF()
    from pyspark.sql.functions import col, regexp_extract
    from pyspark.sql.types import FloatType, IntegerType
    from awsglue.dynamicframe import DynamicFrame
    # Match digits and decimal points, e.g. "149.99"
    pattern = r"[\d.]+"
    columns_to_transform = ['price', 'rating', 'item_width', 'item_weight', 'item_height', 'item_depth', 'numsold']
    for col_name in columns_to_transform:
        # Check if column exists in the DataFrame
        if col_name in df.columns:
            # Apply regex pattern to extract numerical values and convert to appropriate types
            df = df.withColumn(col_name, regexp_extract(col(col_name), pattern, 0).cast(FloatType() if col_name != 'numsold' else IntegerType()))
    res = DynamicFrame.fromDF(df, glueContext, 'changed')
    return DynamicFrameCollection({'Custom_Transform_0': res}, glueContext)

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Script generated for node Amazon S3
AmazonS3_node1713841179360 = glueContext.create_dynamic_frame.from_options(format_options={"quoteChar": "\"", "withHeader": True, "separator": ",", "optimizePerformance": False}, connection_type="s3", format="csv", connection_options={"paths": ["s3://ecommerce-deals"]}, transformation_ctx="AmazonS3_node1713841179360")

# Script generated for node Custom Transform
CustomTransform_node1713841197747 = MyTransform(glueContext, DynamicFrameCollection({"AmazonS3_node1713841179360": AmazonS3_node1713841179360}, glueContext))

# Script generated for node Select From Collection
SelectFromCollection_node1713842428985 = SelectFromCollection.apply(dfc=CustomTransform_node1713841197747, key=list(CustomTransform_node1713841197747.keys())[0], transformation_ctx="SelectFromCollection_node1713842428985")

# Script generated for node AWS Glue Data Catalog
AWSGlueDataCatalog_node1713841379233 = glueContext.write_dynamic_frame.from_catalog(frame=SelectFromCollection_node1713842428985, database="default", table_name="ebay_csv", additional_options={"enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE"}, transformation_ctx="AWSGlueDataCatalog_node1713841379233")

job.commit()
[Image: Visual ETL job]
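
For what it's worth, the cast logic in the custom transform can be checked outside Glue with a plain local Spark session. A minimal sketch using the sample row above (only the columns the transform touches):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_extract
from pyspark.sql.types import FloatType, IntegerType

spark = SparkSession.builder.master("local[1]").appName("cast-check").getOrCreate()

# One row of the example data, restricted to the transformed columns.
df = spark.createDataFrame(
    [("15", "-1", "-1", "-1", "149.99", "42", "-1")],
    ["item_height", "item_depth", "item_width", "item_weight", "price", "numsold", "rating"],
)

pattern = r"[\d.]+"
for col_name in df.columns:
    df = df.withColumn(
        col_name,
        regexp_extract(col(col_name), pattern, 0).cast(
            IntegerType() if col_name == "numsold" else FloatType()
        ),
    )

# Expect 15.0, 149.99, 42, etc. Note that the pattern has no "-",
# so the sentinel value -1 comes out as 1.0: the sign is dropped.
# A value with no digits at all extracts "" and casts to null.
df.show()
df.printSchema()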