I’m running into a problem while trying to run a Spark job in a Dockerized environment. I have a Docker Compose setup with a Spark cluster based on the bitnami/spark images. However, when I execute my Spark code inside a JupyterLab container, every action that collects results to the driver (collect(), show(), count(), etc.) fails.
Docker Compose Configuration:
services:
  jupyterlab:
    image: jupyter/all-spark-notebook
    ports:
      - "8888:8888" # JupyterLab
    volumes:
      - ./data:/home/jovyan/data # Mount a local directory to the container for persistent storage
    environment:
      - JUPYTER_ENABLE_LAB=yes
  spark-master:
    image: bitnami/spark:latest
    ports:
      - "8080:8080" # Spark Web UI
      - "7077:7077" # Spark master
      - "4040:4040"
    environment:
      - SPARK_LOCAL_IP=spark-master
      - SPARK_MODE=master
  spark-worker:
    image: bitnami/spark:latest
    ports:
      - "8081:8081" # Spark Web UI
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_CORES=2
      - SPARK_WORKER_MEMORY=1G
      - SPARK_DRIVER_MEMORY=1G
      - SPARK_EXECUTOR_MEMORY=1G
    volumes:
      - ./data:/home/jovyan/data
    depends_on:
      - spark-master
The code I try to run in a Jupyter notebook (the SparkSession is created successfully and the application appears in the Spark UI):
import logging
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType, StringType
from pyspark.sql.functions import from_json, col
import pandas as pd
import numpy as np
import pyspark.pandas as ps
SPARK_MASTER = "spark://spark-master:7077"
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s:%(funcName)s:%(levelname)s:%(message)s')
logger = logging.getLogger("spark_structured_streaming")

try:
    spark_session = (SparkSession.builder
                     .master(SPARK_MASTER)
                     .appName("SparkTest")
                     .getOrCreate())
    spark_session.sparkContext.setLogLevel("DEBUG")
    logger.info("Spark session created successfully")
except Exception:
    logger.exception("Couldn't create the spark session")
psdf = ps.DataFrame(
    {'a': [1, 2, 3, 4, 5, 6],
     'b': [100, 200, 300, 400, 500, 600],
     'c': ["one", "two", "three", "four", "five", "six"]},
    index=[10, 20, 30, 40, 50, 60])
psdf
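Even a plain Spark DataFrame action, with no pandas API on Spark involved, hits the same problem. A minimal check against the session created above:

# A trivial action that forces a job on the executors; it fails just like displaying psdf
spark_session.range(5).count()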
In both cases, the action fails with the following error:
Caused by: java.io.InvalidClassException: org.apache.spark.rdd.RDD; local class incompatible: stream classdesc serialVersionUID = 823754013007382808, local class serialVersionUID = 3516924559342767982
at java.base/java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:597)
at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2051)
at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1898)
at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2051)
at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1898)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2224)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:509)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:467)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:86)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
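The InvalidClassException with mismatched serialVersionUID values makes me suspect the driver (the notebook's PySpark) and the bitnami executors are running different Spark versions. Below is a minimal sketch for reading the driver-side versions, assuming spark_session from above is available, which can be compared against the version shown on the Spark master web UI at http://localhost:8080:

import pyspark

# Version of the PySpark package installed in the JupyterLab (driver) container
print("pyspark package:", pyspark.__version__)

# Version of the Spark runtime the driver-side session is running on
print("driver Spark runtime:", spark_session.version)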
Any insights on how to resolve this issue would be greatly appreciated.