I’m trying to group rows by identifier and apply some filtering to the resulting array, all within the same DataFrame.
What I was doing before was extracting all the ids with df.select("id").distinct()
and then iterating over them in a Scala for-loop. When the number of ids is high this doesn’t scale, and I’m hoping the following approach can at least distribute the load to the executors more efficiently.
Taking this DataFrame:
import org.apache.spark.sql.{DataFrame, Encoder, Encoders}
import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq(
("id1", "subgroup1", "pid1", 1),
("id1", "subgroup1", "pid2", 9),
("id1", "subgroup8", "pid1", 10),
("id2", "subgroup2", "pid1", 4),
("id2", "subgroup2", "pid2", 8),
("id2", "subgroup2", "pid1", 4),
).toDF("id", "subgroup", "pid", "counter")
// .show()
+---+---------+----+-------+
|id |subgroup |pid |counter|
+---+---------+----+-------+
|id1|subgroup1|pid1|1 |
|id1|subgroup1|pid2|9 |
|id1|subgroup8|pid1|10 |
|id2|subgroup2|pid1|4 |
|id2|subgroup2|pid2|8 |
|id2|subgroup2|pid1|4 |
+---+---------+----+-------+
I’m first performing a groupBy by id, collecting all the records into a list-typed column. Item is a simple case class.
case class Item(id: String, subgroup: String, pid: String, counter: Long) extends Serializable
implicit val itemEncoder: Encoder[Item] = Encoders.product[Item]
val curatedDf = df
.withColumn("rowStruct", struct("id", "subgroup", "pid", "counter").as(itemEncoder))
.groupBy(col("id"))
.agg(
collect_list("rowStruct").as("rowList")
)
// .show()
+---+----------------------------------------------------------------------------------+
|id |rowList |
+---+----------------------------------------------------------------------------------+
|id1|[{id1, subgroup1, pid1, 1}, {id1, subgroup1, pid2, 9}, {id1, subgroup8, pid1, 10}]|
|id2|[{id2, subgroup2, pid1, 4}, {id2, subgroup2, pid2, 8}, {id2, subgroup2, pid1, 4}] |
+---+----------------------------------------------------------------------------------+
Now, here’s where I get the Schema for type org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] is not supported exception.
curatedDf
.withColumn("filteredList", udf[DataFrame, Array[Item]]((items: Array[Item]) => {
    // TODO: all the logic goes here: build new DataFrames, count over them, and produce yet another set of Rows that can be parsed into a DataFrame as the result.
}).apply(col("rowList")))
Any clue on how to work around this?
I was also thinking about using foreach over curatedDf, but I really need the filtered result as a column in the DataFrame so I can later count over it and do more with it…
Per the error, this isn’t possible. A UDF can only accept and return plain objects per row; those objects can be arrays/seqs, but you cannot use DataFrames inside UDFs at all, because the SparkSession does not exist on the executors. It may appear to work locally but will fail when run against a non-local relation.
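Here is a minimal sketch of the usual workaround, assuming your per-group logic can be expressed as plain Scala over the collected items. The counter > 3 threshold and the filteredList/filteredCount column names below are illustrative placeholders, not your actual logic:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, size, udf}

// An array<struct> column arrives in a Scala UDF as Seq[Row]; returning a Seq of
// case class instances is encoded back into an array<struct> column.
val filterItems = udf { (rows: Seq[Row]) =>
  rows
    .map(r => Item(
      r.getAs[String]("id"),
      r.getAs[String]("subgroup"),
      r.getAs[String]("pid"),
      r.getAs[Int]("counter").toLong)) // counter was built from an Int column above
    .filter(_.counter > 3) // placeholder for whatever filtering you actually need
}

val result = curatedDf
  .withColumn("filteredList", filterItems(col("rowList")))
  .withColumn("filteredCount", size(col("filteredList")))

If you’re on Spark 2.4+ and the filtering is simple enough, you can also skip the UDF entirely with the built-in higher-order function, e.g. expr("filter(rowList, x -> x.counter > 3)"), and still count with size as above.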