I have a DataFrame of Row, filled with data, with a schema such as StructType(StructField(Foo,String,true), StructField(Bar,String,true), StructField(XYZ,String,true)). The actual source schema has 22 columns.

I want to extend that DataFrame to the structure StructType(StructField(Foo,String,true), StructField(NewColumn,String,true), StructField(Bar,String,true), StructField(XYZ,String,true)). The actual target schema has 282 columns.

All the columns of the initial schema are still there, but one (or a couple of hundred) extra columns are inserted in between them. The order matters because the downstream consumer app reads a headerless input file and relies on element position (otherwise I would just call withColumn() in a loop). I am only completing the missing columns that were not filled in my case, before preparing the data for export.
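To make the ordering requirement concrete, here is a minimal sketch of the two schemas using the placeholder column names above (the real source has 22 columns and the real target 282):

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Source schema: the columns already present in the DataFrame.
StructType sourceSchema = new StructType()
        .add("Foo", DataTypes.StringType, true)
        .add("Bar", DataTypes.StringType, true)
        .add("XYZ", DataTypes.StringType, true);

// Target schema: the same columns in the same relative order,
// with the new column(s) inserted in between.
StructType targetSchema = new StructType()
        .add("Foo", DataTypes.StringType, true)
        .add("NewColumn", DataTypes.StringType, true)
        .add("Bar", DataTypes.StringType, true)
        .add("XYZ", DataTypes.StringType, true);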
Here is what I tried:
ExpressionEncoder<Row> encoder = RowEncoder.apply(schema); // schema is a StructType instance holding my target schema
Dataset<Row> ds = spark.createDataset(dataset.rdd(), encoder); // dataset is the source Dataset<Row> with the 22-column schema
ds.show(5);
Both the source and target schemas contain only String fields, and both are Datasets, so I can rule out data type conversion issues.
Upon triggering the deferred execution (here, the show() call), I get the following exception, which I don't understand:
Caused by: java.lang.ArrayIndexOutOfBoundsException: 22
at org.apache.spark.sql.catalyst.expressions.GenericRow.get(rows.scala:174)
at org.apache.spark.sql.Row$class.isNullAt(Row.scala:193)
at org.apache.spark.sql.catalyst.expressions.GenericRow.isNullAt(rows.scala:166)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_11$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:256)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
I can see that the value 22 is the number of fields in my initial schema, but what am I missing? Is it because createDataset requires the number of columns in each row to match the number of columns declared in the schema? Will it not create the missing ones?
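In case it clarifies what I am after: if the answer is yes, the workaround I am considering is to pad each source Row with nulls at the positions of the new columns before applying the target encoder. This is only a rough, untested sketch (dataset and schema are the same variables as above; sourceFields, targetPos and padded are names I made up):

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;

// For each column of the source schema, find its position in the target schema.
String[] sourceFields = dataset.schema().fieldNames();
int[] targetPos = new int[sourceFields.length];
for (int i = 0; i < sourceFields.length; i++) {
    targetPos[i] = schema.fieldIndex(sourceFields[i]);
}

// Copy every existing value to its target position; the new columns stay null.
Dataset<Row> padded = dataset.map((MapFunction<Row, Row>) row -> {
    Object[] values = new Object[schema.fields().length];
    for (int i = 0; i < targetPos.length; i++) {
        values[targetPos[i]] = row.get(i);
    }
    return RowFactory.create(values);
}, RowEncoder.apply(schema));

If that approach is sound, padded should carry the full target schema and could then be exported as planned. Is something like this required, or should createDataset have handled it?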