I’m having difficulty writing a very large file (over 12 million rows of data) to a Databricks table.
from pyspark.sql import Row
testDF = spark.read.csv(path='path', schema=generic_struct, sep='|', header='false')
current_to = None
part_rows = []
prt_seq = 0
for idx, row in enumerate(testDF.rdd.toLocalIterator()):
    if row['RECORD_TYPE'] == '03':
        # Convert the Row object to a dictionary to make it mutable and assign the TO value.
        rowDict = row.asDict()
        prt_seq += 1
        rowDict['TO_NUM'] = current_to
        rowDict['PRT_SEQ'] = prt_seq
        # Create a new Row object using kwargs unpacking and add it to the list.
        part_rows.append(Row(**rowDict))
partDF = spark.createDataFrame(part_rows, generic_struct).selectExpr(list of column headers)
missingPartDF = spark.sql("""
Irrelevant SQL
"""
)
partDF = partDF.union(missingPartDF)
partDF.write.csv('path/database/table', header=True, sep=',', escape='"', mode='overwrite')
When I try to run the above code, I get the following error:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 194614:0 was 1155776614 bytes, which exceeds max allowed: spark.rpc.message.maxSize (268435456 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.
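For reference, the error points at spark.rpc.message.maxSize. As far as I can tell that setting is read when the context starts, so on Databricks it would have to go into the cluster's Spark config rather than a notebook cell; a rough sketch of what the error seems to be asking for (value in MiB):

# Rough sketch of raising the setting named in the error (value is in MiB).
# My understanding is that it is read at startup, so on Databricks it
# belongs in the cluster's Spark config rather than being set mid-notebook.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.rpc.message.maxSize", "512")
    .getOrCreate()
)

Bumping that limit doesn't feel like a real fix, though, since the underlying problem seems to be how much data is being pushed through the driver.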
As a result, I tried chunking the data before writing it to the table, using the function below, but the script keeps timing out:
from itertools import islice

def chunks(data, SIZE=1000000):
    it = iter(data)
    for i in range(0, len(data), SIZE):
        yield {k: data[k] for k in islice(it, SIZE)}
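Roughly, the intended usage was to build a DataFrame per chunk and append each one; a simplified sketch (the write path is a placeholder, and part_rows is the list built in the loop above):

# Simplified sketch of the intended usage; the path is a placeholder.
row_dict = dict(enumerate(part_rows))  # key the collected rows by index
for i, chunk in enumerate(chunks(row_dict)):
    chunkDF = spark.createDataFrame(list(chunk.values()), generic_struct)
    chunkDF.write.csv('path/database/table', header=True, sep=',',
                      escape='"', mode='append' if i else 'overwrite')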
Is there a more efficient way to handle a file of this size?