I’m new to data engineering and I’m using Python, PySpark, and Pandas to create a data frame. I’ve been blocked on this for a very long time and can’t get my head around it. It’s a simple problem, but I’m stuck.
Here is my code snippet, which works fine without any loop iteration (assuming there is only one primary key) and generates a dataframe at the end.
primary_keys = [primary_key]  # only one primary key in this version, so a one-element list
df = pd.DataFrame({'primary_key': primary_keys, mapped_column[0]: value})
# Define the schema for the Spark DataFrame
schema = T.StructType([
    T.StructField("primary_key", T.IntegerType(), True),   # integer primary key
    T.StructField(mapped_column[0], T.StringType(), True)  # string value column
])
self.logger.info("$$$$$$$$$$$$$$$ Creating DF $$$$$$$$$$$$$$$$$$$$$")
# Create the Spark DataFrame from the Pandas DataFrame
spark_df = spark.createDataFrame(df, schema)
# (Optional) Verify the Spark DataFrame
spark_df.printSchema()
spark_df.show()
However, my requirement is to build primary_keys
in a loop, where each primary key is generated with a dynamic value on every iteration. I tried the following, but it keeps only the last object, and I ended up with the same values (for both primary_key and the mapped column) in the final data frame.
primary_keys = []
# Iterate over the list and access values directly
for row in column_values:
    # ... some logic to generate the primary key for this iteration
    primary_keys.append(primary_key)

df = pd.DataFrame({'primary_key': primary_keys, mapped_column[0]: value})
self.logger.info("$$$$$$$$$$$$$$$ For Loop Completed $$$$$$$$$$$$$$$$$$$$$")
# Define the schema for the Spark DataFrame
schema = T.StructType([
    T.StructField("primary_key", T.IntegerType(), True),   # integer primary key
    T.StructField(mapped_column[0], T.StringType(), True)  # string value column
])
self.logger.info("$$$$$$$$$$$$$$$ Creating DF $$$$$$$$$$$$$$$$$$$$$")
# Create the Spark DataFrame from the Pandas DataFrame
spark_df = spark.createDataFrame(df, schema)
# (Optional) Verify the Spark DataFrame
spark_df.printSchema()
spark_df.show()
I suspect the problem is with the df
object in this statement: df = pd.DataFrame({'primary_key': primary_keys, mapped_column[0]: value}).
It seems to replace the data frame rather than append to the existing one.
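To make the problem concrete, here is a minimal standalone sketch of the behaviour I am seeing; column_values, the key-generation step, and value below are made-up placeholders for my real logic:

import pandas as pd

column_values = [10, 20, 30]  # placeholder input rows

primary_keys = []
for row in column_values:
    primary_key = row * 100     # placeholder for my real key-generation logic
    value = f"value_{row}"      # 'value' is a scalar that gets overwritten every iteration
    primary_keys.append(primary_key)

# Only the last 'value' survives the loop, so pandas broadcasts that single scalar
# across every row: primary_keys grows correctly, but the mapped column repeats
# 'value_30' on every row.
df = pd.DataFrame({'primary_key': primary_keys, 'mapped_col': value})
print(df)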
I would really appreciate it if someone could assist me here. Thank you.