I have a Java Spark application where I need to create a Dataset object. The tricky part is that the schema has multiple nested layers, which causes issues when I use the createDataFrame(...)
method. The (dummy) schema for my dataset is defined as follows:
StructType objectEntities = new StructType(new StructField[]{
        new StructField("id", DataTypes.StringType, true, Metadata.empty()),
        new StructField("type", DataTypes.StringType, true, Metadata.empty())
});
StructType subjectEntities = new StructType(new StructField[]{
        new StructField("id", DataTypes.StringType, true, Metadata.empty()),
        new StructField("type", DataTypes.StringType, true, Metadata.empty())
});

StructType innerLevelStruct = new StructType();
innerLevelStruct = innerLevelStruct.add("f1", DataTypes.StringType, false);
innerLevelStruct = innerLevelStruct.add("f2", DataTypes.StringType, false);
innerLevelStruct = innerLevelStruct.add("f3", DataTypes.StringType, false);
innerLevelStruct = innerLevelStruct.add("f4", DataTypes.StringType, false);
innerLevelStruct = innerLevelStruct.add("f5", objectEntities, true);
innerLevelStruct = innerLevelStruct.add("f6", subjectEntities, true);

StructType outerLayerStruct = new StructType();
outerLayerStruct = outerLayerStruct.add("id", DataTypes.StringType, false);
outerLayerStruct = outerLayerStruct.add("type", DataTypes.StringType, false);
outerLayerStruct = outerLayerStruct.add("state", DataTypes.StringType, false);
outerLayerStruct = outerLayerStruct.add("items", innerLevelStruct, false);

StructType entityDetailsSchema = new StructType();
entityDetailsSchema = entityDetailsSchema.add("id", DataTypes.StringType, false);
entityDetailsSchema = entityDetailsSchema.add("cols", outerLayerStruct, false);
With the schema defined, I thought I could define the data like so:
List<Row> entries = new ArrayList<>();
entries.add(RowFactory.create("id", List.of(
        "testId", "testType", "testState",
        List.of(
                "f1Val", "f2Val", "f3Val", "f4Val", List.of("a", "b"), List.of("c", "v")
        )
)));
Dataset<Row> output = sparkSession.createDataFrame(entries, entityDetailsSchema);
where sparkSession is a standard Spark session defined as follows:
sparkSession = SparkSession.builder()
.master("local[1]")
.appName("TestSpark")
.getOrCreate();
Normally, that approach works for datasets with a simple, single-layered schema. However, when I run my app with the schema shown above, I get:
java.lang.IllegalArgumentException: The value ([testId, testType, testState, [f1Val, f2Val, f3Val, f4Val, [a, b], [c, v]]]) of the type (java.util.ImmutableCollections.ListN) cannot be converted to struct<id:string,type:string,state:string,innerLevelStruct:struct<f1:string,f2:string,f3:string,f4:string,objectEntities:struct<id:string,type:string>,subjectEntities:struct<id:string,type:string>>>
I suspect this is because I use List.of(...). The error goes away if I replace List.of(...) with nested RowFactory.create(...) calls (roughly as sketched below, after the second exception), but that is not a suitable solution, because it creates problems when I try to convert the dataset to a POJO via an encoder, like so:
output.as(Encoders.bean(MyPojoClass.class))
        .foreachPartition(partition -> {
            // do work
        });
This gives me an exception stating that it expects an array instead of a struct:
org.apache.spark.sql.AnalysisException: [UNSUPPORTED_DESERIALIZER.DATA_TYPE_MISMATCH] The deserializer is not supported: need a(n) "ARRAY" field but got "STRUCT<...>
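For reference, the nested RowFactory.create(...) variant that makes the first error go away (and then runs into the encoder error above) looks roughly like this; it is only a sketch, with one Row per struct layer and placeholder values:

// Build each struct layer as its own Row, innermost first
Row objectEntity = RowFactory.create("a", "b");    // matches objectEntities: id, type
Row subjectEntity = RowFactory.create("c", "v");   // matches subjectEntities: id, type
Row items = RowFactory.create("f1Val", "f2Val", "f3Val", "f4Val", objectEntity, subjectEntity);
Row cols = RowFactory.create("testId", "testType", "testState", items);

List<Row> entries = new ArrayList<>();
entries.add(RowFactory.create("id", cols));
Dataset<Row> output = sparkSession.createDataFrame(entries, entityDetailsSchema);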
So my questions are: how can I create a Dataset using the schema above? How do I define a Row that populates each layer with data? Is RowFactory even capable of that? From what I have read, it is a simple, flat factory method, so I am not sure it is even the right tool to use here.