I am fetching data from the database and, using the ResultSet, building
org.apache.spark.sql.Row objects (required to dump into a Parquet file using Spark SQL).
Since the number of records in the database is large, I am doing this in batches of 2L (200,000) rows.
I have used coalesce(1) to make sure there is only 1 partition and each write goes to a single file.
But what I am getting is multiple Parquet files, one per batch of 2L rows (even though the partition count is 1).
I need only a single Parquet file, because with multiple files my read performance is slow: it creates many small files of about 2000 KB each,
while the combined file size is less than 100 MB.
I tried this at the end to combine everything into a single file, but it may reduce my write performance:
// Read the batch files back and rewrite them as a single file
Dataset<Row> batchDf = spark.read().parquet(parquetFileName);
batchDf.repartition(1).write().mode("overwrite").parquet(parquetFileName);
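To make the idea concrete, this is roughly the compaction step I had in mind, assuming the per-batch appends go to a separate staging directory first (stagingPath and finalPath below are just placeholder names) so that the read and the final overwrite never point at the same location:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CompactParquet {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("compact-parquet")
                .getOrCreate();

        // Hypothetical paths: the per-batch appends would go to stagingPath,
        // and only the compacted single file ends up at finalPath.
        String stagingPath = "/tmp/parquet_staging";
        String finalPath = "/data/parquet_final";

        // Read everything that was appended batch by batch...
        Dataset<Row> staged = spark.read().parquet(stagingPath);

        // ...and rewrite it as one partition -> one part file in the final location.
        staged.repartition(1)
              .write()
              .mode("overwrite")
              .parquet(finalPath);

        spark.stop();
    }
}

This would keep the batched writes as they are and only add one extra read/rewrite pass at the very end.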
Original Code:
while (rs.next()) {
    Object[] fieldValues = new Object[columnCount];
    for (int i = 1; i <= columnCount; i++) {
        Object val = rs.getObject(i);
        if (val != null) {
            fieldValues[i - 1] = val;
        }
    }
    rows.add(RowFactory.create(fieldValues));

    // Write the batch to Parquet when it reaches the specified batch size
    if (rows.size() >= batchSize) {
        // Create a DataFrame from the current batch of rows
        Dataset<Row> batchDf = spark.createDataFrame(rows, schema);
        // Write the batch to Parquet
        batchDf.coalesce(1).write().mode("append").parquet(parquetFileName);
        doesDataExist = true;
        // Clear the batch for the next set of rows
        rows.clear();
    }
}

// Write any remaining rows (less than batch size) to Parquet
if (!rows.isEmpty()) {
    Dataset<Row> remainingDf = spark.createDataFrame(rows, schema);
    remainingDf.coalesce(1).write().mode("append").parquet(parquetFileName);
    doesDataExist = true;
}