I have a Docker container running Spark Connect (both the Apache and Bitnami images work).
<code>services:
  spark-connect:
    image: apache/spark:latest
    container_name: spark-connect
    environment:
      - SPARK_NO_DAEMONIZE=yes
    ports:
      - '4040:4040'
      - "15002:15002"
    command: /opt/spark/sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.1 --conf spark.driver.extraJavaOptions="-Divy.cache.dir=/tmp -Divy.home=/tmp"
    networks:
      - arr-network

  spark-connect-bitnami:
    image: docker.io/bitnami/spark:latest
    container_name: spark-connect-bitnami
    ports:
      - '4041:4040'
      - "15003:15002"
    command: start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.1
    networks:
      - arr-network

  jupyter:
    build:
      context: .
      dockerfile: Dockerfile_jupyter
    container_name: jupyter
    volumes:
      - ./jupyter:/home/jovyan/arr
      - ${HOME}/.config:/home/jovyan/.config
      # - ../arr-hive:/home/spark/spark-warehouse
    ports:
      - "8888:8888"
    command: start-notebook.py --ip='*' --NotebookApp.token='' --NotebookApp.password=''
    networks:
      - arr-network

networks:
  arr-network:
    driver: bridge
</code>
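For completeness, bringing the stack up and checking that the Connect servers respond looks roughly like this (a sketch: the service names and ports come from the compose file above, and the curl checks against the Spark UIs are just a convenience, not part of my setup):
<code># build and start everything, then watch the Connect servers come up
docker compose up -d --build
docker compose logs -f spark-connect           # wait until the Connect server reports it is listening on 15002
docker compose logs -f spark-connect-bitnami

# the driver UIs are mapped to the host, so a quick reachability check:
curl -s -o /dev/null -w "%{http_code}\n" http://localhost:4040   # apache/spark
curl -s -o /dev/null -w "%{http_code}\n" http://localhost:4041   # bitnami/spark
</code>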
Dockerfile for Jupyter:
<code>FROM quay.io/jupyter/base-notebook

RUN mamba install --yes pandas pyspark[connect] grpcio grpcio-status black jupyterlab_code_formatter && \
    mamba clean --all -f -y && \
    fix-permissions "${CONDA_DIR}" && \
    fix-permissions "/home/${NB_USER}"

USER root

ARG spark_uid=185
RUN groupadd --system --gid=${spark_uid} spark && \
    useradd --system --uid=${spark_uid} --gid=spark spark --create-home && \
    usermod -a -G users spark && \
    mkdir -m 775 /home/spark/spark-warehouse && \
    chown spark /home/spark/spark-warehouse && \
    chgrp users /home/spark/spark-warehouse && \
    echo "spark ALL=(ALL) NOPASSWD: ALL" > /etc/sudoers && \
    chmod 0440 /etc/sudoers

USER spark
</code>
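The ARG spark_uid=185 is meant to line the notebook-side spark user up with the uid the Spark images use. A rough way to verify that alignment from the host (a sketch; container names are the ones from the compose file):
<code># which uid does each Connect server run as, and who owns the warehouse directory in the Jupyter image?
docker exec spark-connect id                               # apache/spark server
docker exec spark-connect-bitnami id                       # bitnami/spark server
docker exec jupyter id spark                               # spark user created in Dockerfile_jupyter
docker exec jupyter ls -ld /home/spark/spark-warehouse     # expect owner spark, group users, mode 775
</code>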
Connecting from Jupyter works (against either the Apache or the Bitnami server):
<code>from pyspark.sql import SparkSession
from pathlib import Path

# directory owned by spark (uid: 185)
save_dir = Path.home().joinpath("../spark/spark-warehouse")
with open(save_dir.joinpath("touch_this"), "w") as f:
    f.write("touched")

spark = (
    SparkSession.builder.appName("arr")
    .remote("sc://spark-connect-bitnami:15002")
    .config("spark.sql.warehouse.dir", save_dir)
    .enableHiveSupport()
    .getOrCreate()
)

columns = ["id", "name"]
data = [(1, "Sarah"), (2, "Maria")]
df = spark.createDataFrame(data).toDF(*columns)
df.show()
</code>
write() always fails:
<code>df.write.save(f'{save_dir.joinpath("test.parquet")}')
</code>
<code>---------------------------------------------------------------------------
SparkConnectGrpcException Traceback (most recent call last)
Cell In[2], line 1
----> 1 df.write.save(f'{save_dir.joinpath("test.parquet")}')
File /opt/conda/lib/python3.11/site-packages/pyspark/sql/connect/readwriter.py:601, in DataFrameWriter.save(self, path, format, mode, partitionBy, **options)
599 self.format(format)
600 self._write.path = path
--> 601 self._spark.client.execute_command(self._write.command(self._spark.client))
File /opt/conda/lib/python3.11/site-packages/pyspark/sql/connect/client/core.py:982, in SparkConnectClient.execute_command(self, command)
980 req.user_context.user_id = self._user_id
981 req.plan.command.CopyFrom(command)
--> 982 data, _, _, _, properties = self._execute_and_fetch(req)
983 if data is not None:
984 return (data.to_pandas(), properties)
File /opt/conda/lib/python3.11/site-packages/pyspark/sql/connect/client/core.py:1283, in SparkConnectClient._execute_and_fetch(self, req, self_destruct)
1280 schema: Optional[StructType] = None
1281 properties: Dict[str, Any] = {}
-> 1283 for response in self._execute_and_fetch_as_iterator(req):
1284 if isinstance(response, StructType):
1285 schema = response
File /opt/conda/lib/python3.11/site-packages/pyspark/sql/connect/client/core.py:1264, in SparkConnectClient._execute_and_fetch_as_iterator(self, req)
1262 yield from handle_response(b)
1263 except Exception as error:
-> 1264 self._handle_error(error)
File /opt/conda/lib/python3.11/site-packages/pyspark/sql/connect/client/core.py:1503, in SparkConnectClient._handle_error(self, error)
1490 """
1491 Handle errors that occur during RPC calls.
1492
(...)
1500 Throws the appropriate internal Python exception.
1501 """
1502 if isinstance(error, grpc.RpcError):
-> 1503 self._handle_rpc_error(error)
1504 elif isinstance(error, ValueError):
1505 if "Cannot invoke RPC" in str(error) and "closed" in str(error):
File /opt/conda/lib/python3.11/site-packages/pyspark/sql/connect/client/core.py:1539, in SparkConnectClient._handle_rpc_error(self, rpc_error)
1537 info = error_details_pb2.ErrorInfo()
1538 d.Unpack(info)
-> 1539 raise convert_exception(info, status.message) from None
1541 raise SparkConnectGrpcException(status.message) from None
1542 else:
SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 16) (6d6e48933499 executor driver): java.io.IOException: Mkdirs failed to create file:/home/spark/spark-warehouse/test.parquet/_temporary/0/_temporary/attempt_20240806134346555678479421024381_0007_m_000000_16 (exists=false, cwd=file:/opt/bitnami/spark)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:515)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:347)
at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:314)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:484)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:422)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:411)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:36)
at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$$anon$1.newInstance(ParquetUtils.scala:490)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:161)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:146)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:389)
at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
at or...
</code>
I have tried multiple things:
- using the same UID in the spark-connect container and the Jupyter container
- setting ACLs on the directory structure being used
- setting write access on the directory (a sketch of these attempts follows below)
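Roughly, the ACL / write-access attempts looked like this (a sketch, run as root inside the container that holds /home/spark/spark-warehouse; the exact commands varied, and setfacl may need to be installed first):
<code># open up the warehouse directory for uid 185 and the users group (sketch of what was tried)
chmod -R 775 /home/spark/spark-warehouse
chgrp -R users /home/spark/spark-warehouse
setfacl -R -m u:185:rwx,g:users:rwx /home/spark/spark-warehouse
setfacl -R -d -m u:185:rwx,g:users:rwx /home/spark/spark-warehouse   # default ACLs so new files inherit the access
</code>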
This should be simple. Ultimately I want to use Spark as an ETL layer for a golang application, but I can't even get it working from Python.
What configuration is required to make persistence work?