I’m running a Spark job on AWS EMR 6.14 (based on Spark 3.4.1). I wanted to try the push-based shuffle introduced in Spark 3.2.0, but the job fails with a Kryo exception as soon as I enable it.
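For reference, these are the settings I’m toggling, shown here in SparkConf form with the same values as the --conf flags in the failing spark-submit command at the end of this post (per the Spark docs, push-based shuffle also requires spark.shuffle.service.enabled=true, which I believe EMR sets by default):

SparkConf pushShuffleConf = new SparkConf()
        // client side: opt the application into push-based shuffle
        .set("spark.shuffle.push.enabled", "true")
        // server side: merged-shuffle file manager used by the YARN shuffle service
        .set("spark.shuffle.push.server.mergedShuffleFileManagerImpl",
                "org.apache.spark.network.shuffle.RemoteBlockPushResolver");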
My job code:
package com.analytics.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import com.analytics.archive.AnalyticsEventWrapperRowMapper;
import com.analytics.AnalyticsEventWrapper;
import com.analytics.IntermediateEventWrapper;
import com.analytics.spark.AnalyticsSparkConfBuilder;

public class ProcessAnalyticsEventJob {

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = AnalyticsSparkConfBuilder.newBuilder("ProcessAnalyticsEventJob")
                .setClusterUrl("yarn")
                .enableKryo(AnalyticsEventWrapper.class, IntermediateEventWrapper.class)
                .enableSnappy()
                .build();

        SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();

        MapFunction<Row, AnalyticsEventWrapper> mapAsAnalyticsEventWrapper = AnalyticsEventWrapperRowMapper::map;
        Dataset<AnalyticsEventWrapper> inputDataset = spark.read()
                .parquet("s3://bucket/path/to/events")
                .map(mapAsAnalyticsEventWrapper, Encoders.kryo(AnalyticsEventWrapper.class));

        // rest of the job (groupBy aggregation and write output)
    }
}
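AnalyticsSparkConfBuilder is an in-house helper, so I don’t control its internals, but here is a minimal sketch of what I assume enableKryo and enableSnappy boil down to (the settings below are my assumption, not the actual implementation):

// Hypothetical equivalent of the builder calls above (assumption, not the real code).
SparkConf sparkConf = new SparkConf()
        .setAppName("ProcessAnalyticsEventJob")
        .setMaster("yarn")
        // enableKryo(...): switch the serializer and register our classes,
        // which ends up in spark.kryo.classesToRegister
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .registerKryoClasses(new Class<?>[] {
                AnalyticsEventWrapper.class,
                IntermediateEventWrapper.class
        })
        // enableSnappy(): compression codec for shuffle and spill files
        .set("spark.io.compression.codec", "snappy");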
The failure happens during the executor creation phase:
24/04/24 15:36:22 ERROR YarnCoarseGrainedExecutorBackend: Executor self-exiting due to : Unable to create executor due to Failed to register classes with Kryo
org.apache.spark.SparkException: Failed to register classes with Kryo
at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:186) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:241) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:174) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
at org.apache.spark.serializer.KryoSerializer$$anon$1.create(KryoSerializer.scala:105) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.borrow(KryoPoolQueueImpl.java:48) ~[kryo-shaded-4.0.2.jar:?]
at org.apache.spark.serializer.KryoSerializer$PoolWrapper.borrow(KryoSerializer.scala:112) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:352) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
at org.apache.spark.serializer.KryoSerializerInstance.getAutoReset(KryoSerializer.scala:452) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
at org.apache.spark.serializer.KryoSerializer.supportsRelocationOfSerializedObjects$lzycompute(KryoSerializer.scala:259) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
at org.apache.spark.serializer.KryoSerializer.supportsRelocationOfSerializedObjects(KryoSerializer.scala:255) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
at org.apache.spark.util.Utils$.serializerIsSupported$lzycompute$1(Utils.scala:2721) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
at org.apache.spark.util.Utils$.serializerIsSupported$1(Utils.scala:2716) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
at org.apache.spark.util.Utils$.isPushBasedShuffleEnabled(Utils.scala:2730) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:554) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
at org.apache.spark.executor.Executor.<init>(Executor.scala:143) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:190) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_402]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_402]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_402]
Caused by: java.lang.ClassNotFoundException: com.analytics.AnalyticsEventWrapper
at java.net.URLClassLoader.findClass(URLClassLoader.java:387) ~[?:1.8.0_402]
at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_402]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) ~[?:1.8.0_402]
at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_402]
at java.lang.Class.forName0(Native Method) ~[?:1.8.0_402]
at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_402]
at org.apache.spark.util.Utils$.classForName(Utils.scala:228) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$6(KryoSerializer.scala:177) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) ~[scala-library-2.12.15.jar:?]
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) ~[scala-library-2.12.15.jar:?]
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:176) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
... 24 more
24/04/24 15:36:22 INFO YarnCoarseGrainedExecutorBackend: Driver commanded a shutdown
24/04/24 15:36:22 ERROR Utils: Uncaught exception in thread shutdown-hook-0
From the stack trace, the registration fails inside BlockManager.initialize while Spark evaluates Utils.isPushBasedShuffleEnabled, which suggests the executor tries to resolve the registered classes before the application jar is on its classpath. The job is launched on EMR with spark-submit; all application code is packaged in application.jar, and I verified that the classes are present in it.
# works
spark-submit --deploy-mode cluster --verbose \
  --class com.analytics.spark.ProcessAnalyticsEventJob \
  /home/hadoop/application.jar

# fails with the Kryo exception
spark-submit --deploy-mode cluster --verbose \
  --conf "spark.shuffle.push.enabled=true" \
  --conf "spark.shuffle.push.server.mergedShuffleFileManagerImpl=org.apache.spark.network.shuffle.RemoteBlockPushResolver" \
  --class com.analytics.spark.ProcessAnalyticsEventJob \
  /home/hadoop/application.jar
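My understanding is that the builder’s Kryo registration ends up in spark.kryo.classesToRegister, which every executor resolves when it builds its first Kryo instance; assuming that mapping is right, the explicit string-based equivalent of enableKryo would be:

--conf "spark.kryo.classesToRegister=com.analytics.AnalyticsEventWrapper,com.analytics.IntermediateEventWrapper"

Any idea why the classes resolve fine without push-based shuffle but fail as soon as it is enabled?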