I am trying to add a new row to each partition in my Spark job. I am using the following code to achieve this:
StructType rowType = new StructType();
rowType.add(DataTypes.createStructField("value", DataTypes.StringType, true));
sourceDataset = sourceDataset.mapPartitions(new MapPartitionsFunction<String, String>() {
    List<String> rows = new ArrayList<>();

    @Override
    public Iterator<String> call(Iterator<String> rowIterator) throws Exception {
        int counter = 0;
        while (rowIterator.hasNext()) {
            String dataRow = rowIterator.next();
            rows.add(dataRow);
            counter++;
        }
        JsonObject jsonObject = func.get();
        String[] values = new String[1];
        values[0] = jsonObject.toString();
        rows.add(values[0]);
        return rows.iterator();
    }
}, RowEncoder.apply(rowType));
And I am getting the following exception:
org.apache.spark.sql.AnalysisException: Try to map struct<> to Tuple1, but failed as the number of fields does not line up
In the original dataset we have a “value” column of String type, which holds JSON in string format.
I have tried changing the MapPartitionsFunction<String, String> to MapPartitionsFunction<Row, Row>, but I am still getting the same exception.
Thanks
Sateesh
StructType is an immutable class: add does not modify the StructType it is called on, it returns a new one. Your code:
StructType rowType = new StructType();
rowType.add(DataTypes.createStructField("value", DataTypes.StringType, true));
throws away the new struct type with “value” added, leaving rowType empty. That empty schema is exactly what the exception is telling you: struct<> has zero fields while the function's output has one, so the number of fields does not line up. It also explains why switching to MapPartitionsFunction<Row, Row> made no difference. Try this instead:
StructType rowType = new StructType()
    .add(DataTypes.createStructField("value", DataTypes.StringType, true));
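With the schema fixed, the element types of the mapPartitions call also have to match the Row encoder. Below is a minimal sketch of the whole call, assuming sourceDataset is a Dataset<Row> and that func and JsonObject come from your surrounding code; the rows list is also moved inside call() so each invocation starts with a fresh list instead of reusing a field on the anonymous class.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Schema with a single nullable string column named "value".
StructType rowType = new StructType()
    .add(DataTypes.createStructField("value", DataTypes.StringType, true));

sourceDataset = sourceDataset.mapPartitions(
    (MapPartitionsFunction<Row, Row>) rowIterator -> {
        // Copy the partition's rows, then append one extra row at the end.
        List<Row> rows = new ArrayList<>();
        while (rowIterator.hasNext()) {
            rows.add(rowIterator.next());
        }
        // func.get() supplies the extra JSON object, as in the question;
        // JsonObject and func are assumed to be in scope here.
        JsonObject jsonObject = func.get();
        rows.add(RowFactory.create(jsonObject.toString()));
        return rows.iterator();
    },
    RowEncoder.apply(rowType));

The cast to MapPartitionsFunction<Row, Row> keeps the Java overload of mapPartitions unambiguous, and RowFactory.create wraps the JSON string in a one-field Row that matches the schema. On newer Spark versions the same encoder is available as Encoders.row(rowType).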