We are upgrading our GCP Dataproc cluster to the 2.2-debian12 image, which ships with:
Spark version 3.5.0
Scala version 2.12.18
With these versions, one major change is that the udf method taking an explicit return-type parameter has been deprecated since Spark 3.0.0. How can we return a struct type from a UDF? We have several UDFs that return IntegerType and StructType.
With Spark 3.4 and Scala 2.12.16 on Dataproc image 2.0, we were able to run our jobs by setting the property below:
--properties=spark.sql.legacy.allowUntypedScalaUDF=true
Since migrating to Spark 3.5 and Scala 2.12.18 on the Dataproc 2.2 image, we are getting the error message below:
exception: "AnalysisException: [UNTYPED_SCALA_UDF] You're using untyped Scala UDF,
which does not have the input type information. Spark may blindly pass null to the Scala closure with primitive-type argument,
and the closure will see the default value of the Java type for the null argument, e.g. udf((x: Int) => x, IntegerType),
the result is 0 for null input. To get rid of this error, you could:
1. use typed Scala UDF APIs (without return type parameter), e.g. udf((x: Int) => x).
2. use Java UDF APIs, e.g. udf(new UDF1[String, Integer] { override def call(s: String): Integer = s.length() }, IntegerType), if input types are all non primitive.
3. set "spark.sql.legacy.allowUntypedScalaUDF" to "true" and use this API with caution."
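For context on option 1 from the error message: the typed Scala UDF API can return a struct by returning a case class, and Spark derives the StructType schema from the case class fields (an Option field becomes a nullable column). A minimal sketch, where the case class name and fields are illustrative assumptions, not from our codebase:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.functions.udf

// Hypothetical result type: Spark encodes a case class as a struct,
// deriving the field names and types of the schema automatically.
case class ParsedTimestamp(t: Timestamp, tl: Timestamp, tlon: Timestamp, o: Option[Int])

// Typed UDF: no explicit return-type parameter; input and output types
// are captured from the function signature via TypeTags.
val parseTs = udf((raw: String) => {
  val ts = Timestamp.valueOf(raw)
  ParsedTimestamp(ts, ts, ts, Some(0))
})
```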
The code below now triggers this error. How can we correct this method so it runs without errors while keeping the current UDF structure (returning a struct)?
val timestampParserLocal: UserDefinedFunction = udf((in: Any, pattern: String, defaultTimezone: String, dropSubSeconds: Boolean, optionalTZ: Boolean, fixInferTimestampsBackwardsCompatibility: Boolean) => {
  val (t, tl, tb, offset) = in match {
    case ts: Timestamp =>
      if (fixInferTimestampsBackwardsCompatibility) {
        val format1 = new SimpleDateFormat(pattern)
        format1.setLenient(false)
        val value = format1.format(ts)
        parseWithLocal(value, pattern, defaultTimezone)
      } else {
        (ts, ts, getAsLondon(ts), 0) // TimeZone.getTimeZone(defaultTimezone).getOffset(ts.getTime))
      }
    case value: String => parseWithLocal(adjTsString(value, dropSubSeconds, optionalTZ), pattern, defaultTimezone)
    case value: Long => parseWithLocal(value.toString, pattern, defaultTimezone)
    case _ =>
      throw new RuntimeException(s"Unexpected datatype, in=${in}, class=${in.getClass.getName}")
  }
  Row(t, tl, tb, offset)
}, StructType(Seq(
  StructField("t", TimestampType, nullable = false),
  StructField("tl", TimestampType, nullable = true),
  StructField("tlon", TimestampType, nullable = true),
  StructField("o", IntegerType, nullable = true)
)))
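One possible way to migrate the UDF above to the typed API, assuming the existing helpers (parseWithLocal, adjTsString) keep their current signatures: return a case class instead of a Row plus an explicit StructType, and replace the Any input with a concrete String input, casting the source column to string at the call site so the Timestamp/String/Long dispatch happens before the UDF. This is a sketch under those assumptions, not a drop-in replacement (the case class name is hypothetical, and the helpers are our own code, not shown here):

```scala
import java.sql.Timestamp
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical struct result; the field names t, tl, tlon, o map to the
// former StructField names, and Option[Int] yields a nullable column.
case class TsResult(t: Timestamp, tl: Timestamp, tlon: Timestamp, o: Option[Int])

// Typed variant: the Any parameter is replaced by String, so callers would
// cast first, e.g. timestampParserTyped(col("ts").cast("string"), ...).
val timestampParserTyped = udf((in: String, pattern: String, defaultTimezone: String,
                                dropSubSeconds: Boolean, optionalTZ: Boolean) => {
  val (t, tl, tb, offset) =
    parseWithLocal(adjTsString(in, dropSubSeconds, optionalTZ), pattern, defaultTimezone)
  TsResult(t, tl, tb, Some(offset))
})
```

The trade-off is that the Timestamp fast path (fixInferTimestampsBackwardsCompatibility) would need to be handled separately, since a typed UDF cannot take Any; alternatively, option 3 from the error message (spark.sql.legacy.allowUntypedScalaUDF=true) still exists in Spark 3.5 and keeps the code unchanged.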