I’m looking for a way to process a PySpark DataFrame in chunks: regardless of the number of rows in it (5, 2,000,000, etc.), I want to loop over it and “pop” a chunk of x rows at a time (or fewer, if fewer remain) until the whole DataFrame has been processed. It’s essentially a “pop” operation, but taking multiple rows at a time.
The code needs to be agnostic of the structure of the data, which can change from run to run. It’s also important for me to keep the original order of the rows, so they must not get shuffled along the way.
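For illustration, a toy input might look like the following (assuming an existing SparkSession called spark; the real schema is different on every run). With a chunk size of 2, I would expect to get chunks of 2, 2 and 1 rows, in the original order:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy input only - the real data has an arbitrary, changing schema
inputDf = spark.createDataFrame(
    [(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e")],
    ["id", "value"],
)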
I have tried working with limit and subtract to achieve this effect, as follows (given the original data set is loaded into inputDf):
chunkSize = 100
keepGoing = True
while keepGoing:
    unprocessedRows = inputDf.count()
    if unprocessedRows >= chunkSize:
        chunkDf = inputDf.limit(chunkSize)
        inputDf = inputDf.subtract(chunkDf)
    elif unprocessedRows > 0:
        keepGoing = False
        chunkDf = inputDf
    else:
        break
    # process chunk...
    chunkDf.show()
However, while subtract indeed removed the correct number of rows from inputDf, it did not remove the exact same rows that were extracted into chunkDf. This resulted in inconsistencies: some rows were present in multiple chunks, while others were not present in any chunk at all. I’m not sure why that is, but I’ve read that subtract can behave this way in some scenarios. The same goes for exceptAll.
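For what it’s worth, this is roughly how I noticed the duplicated and missing rows. It’s only a sketch: chunkDfs is a list I append each chunkDf to inside the loop, and originalDf is a reference to the full input kept before the loop starts; the names are just illustrative.

from functools import reduce
from pyspark.sql import DataFrame, functions as F

# chunkDfs is populated inside the loop with chunkDfs.append(chunkDf)
allChunksDf = reduce(DataFrame.unionByName, chunkDfs)

# Rows that ended up in more than one chunk
allChunksDf.groupBy(*allChunksDf.columns).count().filter(F.col("count") > 1).show()

# A count mismatch points at rows that never made it into any chunk
print(allChunksDf.count(), originalDf.count())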
I also tried a similar approach, using tail to retain the rows that were not extracted by limit, but again the result was similarly inconsistent:
chunkSize = 100
keepGoing = True
while keepGoing:
    unprocessedRows = inputDf.count()
    if unprocessedRows >= chunkSize:
        chunkDf = inputDf.limit(chunkSize)
        if unprocessedRows > chunkSize:
            # tail() returns a list of Rows, so convert back into a DataFrame
            inputDf = spark.createDataFrame(
                inputDf.tail(unprocessedRows - chunkSize), inputDf.schema
            )
        else:
            keepGoing = False
    elif unprocessedRows > 0:
        keepGoing = False
        chunkDf = inputDf
    else:
        break
    # process chunk...
    chunkDf.show()
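The only other direction I can think of is to attach an explicit index to every row up front and then slice the DataFrame by index ranges, roughly as sketched below. Here row_idx is just a name I made up, and I’m not certain that zipWithIndex is guaranteed to follow the original row order in every situation, so I don’t know whether this is a sound approach:

from pyspark.sql import functions as F
from pyspark.sql.types import LongType, StructField, StructType

chunkSize = 100

# Number the rows once, so each chunk is defined by a fixed index range
# instead of repeatedly re-evaluating limit()/subtract()
indexedSchema = StructType(inputDf.schema.fields + [StructField("row_idx", LongType(), False)])
indexedDf = spark.createDataFrame(
    inputDf.rdd.zipWithIndex().map(lambda pair: pair[0] + (pair[1],)),
    indexedSchema,
)
indexedDf.cache()

totalRows = indexedDf.count()
for start in range(0, totalRows, chunkSize):
    chunkDf = (
        indexedDf
        .where((F.col("row_idx") >= start) & (F.col("row_idx") < start + chunkSize))
        .orderBy("row_idx")
        .drop("row_idx")
    )
    # process chunk...
    chunkDf.show()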
Can anyone advise how I could get the desired outcome while ensuring that data is not missed or duplicated in the process?