I am running Spark jobs on Kubernetes. The username is the same for all users, so I was attempting to set the SPARK_USER
environment variable so that I could distinguish my jobs from others' on the Spark History Server. However, I found that setting this environment variable breaks writing to S3.
Here is a simple example:
import os
from pyspark import SparkConf
from pyspark.sql import SparkSession
os.environ["SPARK_USER"] = "hutch3232"
conf = SparkConf()
conf.setAppName('hutch3232 pyspark')
conf.set('spark.hadoop.fs.s3a.endpoint', '<my endpoint>')
conf.set('spark.hadoop.fs.s3a.access.key', '<my access key>')
conf.set('spark.hadoop.fs.s3a.secret.key', '<my secret key>')
spark = SparkSession.builder.config(conf=conf).getOrCreate()
df = spark.read.parquet("s3a://my_parquet_read")
df.write.parquet("s3a://my_parquet_write")
No errors are thrown, but when I navigate to that S3 path the only thing there is a _SUCCESS file. If I stop Spark and re-run the above, this time without the line
os.environ["SPARK_USER"] = "hutch3232"
it works perfectly, so the problem really does seem to come down to setting this environment variable. I also tested writing CSV and the same thing happens there. Writing locally instead of to S3 does work.
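For what it's worth, here is a small check I can run in the same session to see where the variable actually ends up, i.e. what user the driver and an executor each report (a rough sketch; the UserGroupInformation lookup goes through PySpark's internal _jvm gateway, and the SparkSession built above is assumed to still be available as spark):

import os

sc = spark.sparkContext

# What the driver's Python process sees
print("driver SPARK_USER env:", os.environ.get("SPARK_USER"))
print("driver sparkUser():", sc.sparkUser())

# Hadoop's notion of the current user on the driver (via the internal _jvm gateway)
ugi = sc._jvm.org.apache.hadoop.security.UserGroupInformation.getCurrentUser()
print("driver Hadoop user:", ugi.getShortUserName())

# What an executor process sees for the same environment variable
def executor_env(_):
    import os
    yield os.environ.get("SPARK_USER", "<unset>")

print("executor SPARK_USER env:",
      sc.parallelize([0], numSlices=1).mapPartitions(executor_env).collect())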
Here are all my other Spark settings:
spark.hadoop.fs.s3a.metadatastore.impl org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore
spark.hadoop.fs.defaultFS file:///
spark.sql.parquet.binaryAsString true
spark.hadoop.fs.s3a.experimental.input.fadvise normal
spark.hadoop.parquet.enable.summary-metadata false
spark.sql.parquet.mergeSchema false
spark.sql.parquet.filterPushdown true
spark.sql.parquet.compression.codec snappy
spark.speculation false
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
spark.hadoop.fs.s3a.committer.name directory
spark.hadoop.fs.s3a.committer.staging.conflict-mode append
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
spark.kubernetes.driver.volumes.persistentVolumeClaim.datasets.options.claimName datasets
spark.hadoop.fs.s3a.readahead.range 256K
spark.hadoop.fs.s3a.committer.tmp.path file:///mnt/data/scratch/scratch/tmp
spark.hadoop.fs.s3a.committer.staging.tmp.path /mnt/data/scratch/scratch/staging
spark.hadoop.fs.s3a.buffer.dir /mnt/data/scratch/scratch/buffer
spark.kubernetes.driver.volumes.persistentVolumeClaim.datasets.mount.path /mnt/data/scratch/scratch
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max 2047m
spark.kryoserializer.buffer 256m
spark.hadoop.fs.s3a.multipart.size 256m
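For completeness: the variable is only set in the driver's Python process via os.environ. If it is relevant, it could presumably also be passed to the driver and executor pods through Spark configuration, e.g. (untested, and I have not verified whether it changes the behaviour):

conf.set('spark.executorEnv.SPARK_USER', 'hutch3232')           # env var for executor pods
conf.set('spark.kubernetes.driverEnv.SPARK_USER', 'hutch3232')  # env var for the driver pod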