I’m working with a PySpark DataFrame in Azure Databricks, and I’m trying to count how many unique (distinct) values a particular column has.
Cluster: 14.3 LTS (includes Apache Spark 3.5.0, Scala 2.12)
pandas 1.5.3
pyspark 3.3.0
I can .display() and .count() the df without problems, but when I try any of these three commands:
df.select("PB_countProducts").distinct().count()
df.collect()
df.show()
I get this error:
SparkException: Job aborted due to stage failure: Task 6 in stage 57.0 failed 4 times, most recent failure: Lost task 6.3 in stage 57.0 (TID 421) (10.43.64.12 executor 0): org.apache.spark.SparkException: Failed to create session
at com.databricks.spark.api.python.IsolatedPythonWorkerFactory.createRawIsolatedWorker(IsolatedPythonWorkerFactory.scala:233)
at com.databricks.spark.api.python.IsolatedPythonWorkerFactory.create(IsolatedPythonWorkerFactory.scala:293)
at org.apache.spark.SparkEnv.createIsolatedPythonWorker(SparkEnv.scala:300)
at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:325)
at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:228)
at org.apache.spark.sql.execution.python.BasePythonUDFRunner.compute(PythonUDFRunner.scala:59)
at org.apache.spark.sql.execution.python.BatchEvalPythonEvaluatorFactory.evaluate(BatchEvalPythonExec.scala:80)
at org.apache.spark.sql.execution.python.EvalPythonEvaluatorFactory$EvalPythonPartitionEvaluator.eval(EvalPythonEvaluatorFactory.scala:114)
at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:77)
at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2$adapted(EvalPythonExec.scala:76)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:920)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:920)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:88)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:87)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:58)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:201)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:186)
at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:151)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:103)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:108)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:107)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:145)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:958)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:961)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:853)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult: RawInitSandboxError: [IndexOutOfBoundsException] readerIndex(8) + length(2067) exceeds writerIndex(2048): PooledUnsafeDirectByteBuf(ridx: 8, widx: 2048, cap: 2048)
at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:51)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:462)
at com.databricks.spark.api.python.IsolatedPythonWorkerFactory.createRawIsolatedWorker(IsolatedPythonWorkerFactory.scala:230)
... 54 more
Caused by: java.lang.RuntimeException: RawInitSandboxError: [IndexOutOfBoundsException] readerIndex(8) + length(2067) exceeds writerIndex(2048): PooledUnsafeDirectByteBuf(ridx: 8, widx: 2048, cap: 2048)
at daemon.safespark.client.SandboxApiClient.rawInitSandbox(SandboxApiClient.scala:272)
at com.databricks.spark.safespark.ApiAdapter.rawInitSandbox(ApiAdapter.scala:123)
at com.databricks.spark.safespark.udf.DispatcherImpl.$anonfun$createRawConnection$3(DispatcherImpl.scala:142)
at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:573)
at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:669)
at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:687)
at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:426)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:216)
at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:424)
at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:418)
at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:27)
at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:472)
at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:455)
at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:27)
at com.databricks.logging.UsageLogging.recordOperationWithResultTags(UsageLogging.scala:664)
at com.databricks.logging.UsageLogging.recordOperationWithResultTags$(UsageLogging.scala:582)
at com.databricks.spark.util.PublicDBLogging.recordOperationWithResultTags(DatabricksSparkUsageLogger.scala:27)
at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:573)
at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:542)
at com.databricks.spark.util.PublicDBLogging.recordOperation(DatabricksSparkUsageLogger.scala:27)
at com.databricks.spark.util.PublicDBLogging.recordOperation0(DatabricksSparkUsageLogger.scala:68)
at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:150)
at com.databricks.spark.util.UsageLogger.recordOperation(UsageLogger.scala:68)
at com.databricks.spark.util.UsageLogger.recordOperation$(UsageLogger.scala:55)
at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:109)
at com.databricks.spark.util.UsageLogging.recordOperation(UsageLogger.scala:429)
at com.databricks.spark.util.UsageLogging.recordOperation$(UsageLogger.scala:408)
at com.databricks.spark.safespark.udf.DispatcherImpl.recordOperation(DispatcherImpl.scala:67)
at com.databricks.spark.safespark.udf.DispatcherImpl.recordDispatcherOperation(DispatcherImpl.scala:443)
at com.databricks.spark.safespark.udf.DispatcherImpl.$anonfun$createRawConnection$2(DispatcherImpl.scala:130)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at org.apache.spark.util.ThreadUtils$$anon$1.execute(ThreadUtils.scala:105)
at scala.concurrent.impl.ExecutionContextImpl$$anon$4.execute(ExecutionContextImpl.scala:138)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
at scala.concurrent.impl.Promise$KeptPromise$Kept.onComplete(Promise.scala:372)
at scala.concurrent.impl.Promise$KeptPromise$Kept.onComplete$(Promise.scala:371)
at scala.concurrent.impl.Promise$KeptPromise$Successful.onComplete(Promise.scala:379)
at scala.concurrent.impl.Promise.transform(Promise.scala:33)
at scala.concurrent.impl.Promise.transform$(Promise.scala:31)
at scala.concurrent.impl.Promise$KeptPromise$Successful.transform(Promise.scala:379)
at scala.concurrent.Future.map(Future.scala:292)
at scala.concurrent.Future.map$(Future.scala:292)
at scala.concurrent.impl.Promise$KeptPromise$Successful.map(Promise.scala:379)
at scala.concurrent.Future$.apply(Future.scala:659)
at com.databricks.spark.safespark.udf.DispatcherImpl.createRawConnection(DispatcherImpl.scala:169)
at com.databricks.spark.safespark.Dispatcher.createRawConnection(Dispatcher.scala:156)
at com.databricks.spark.api.python.IsolatedPythonWorkerFactory.createRawIsolatedWorker(IsolatedPythonWorkerFactory.scala:228)
... 54 more
Driver stacktrace:
JVM stacktrace:
org.apache.spark.SparkException
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3908)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3830)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3817)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3817)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1695)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1680)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1680)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:4154)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4066)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4054)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:54)
Caused by: org.apache.spark.SparkException: Failed to create session
I tried to construct a df_fake with the same columns and values so you could reproduce the problem, but when I create it from data and a schema, all of these functions work.
Here’s what I did to create the df_fake:
# Imports used below (already imported earlier in the notebook)
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Create (or reuse) a Spark session
spark = SparkSession.builder.appName("CreateDeltaTable").getOrCreate()
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

data = [("3bf293f7-f093-4943-9e1f-1e9eb313c9d6", 1),
        ("f8342388-3b17-4043-8a5a-73c56335181f", 1),
        ("1a6d1a34-4291-4e24-b00e-45600c04bc55", 2)]
columns = ["RequestId", "PB_countProducts"]
df_fake = spark.createDataFrame(data, columns)
df_fake = df_fake.withColumn("PB_countProducts", F.col("PB_countProducts").cast("integer"))
df_fake.display()
When I display my problematic df (called df) and my fake df (df_fake), they look identical:
df.display()
df_fake.display()
When I print their dtypes, they look identical:
print(df.dtypes)
print(df_fake.dtypes)
[('RequestId', 'string'), ('PB_countProducts', 'int')]
[('RequestId', 'string'), ('PB_countProducts', 'int')]
When I print their schemas, they look identical:
# Note: printSchema() prints the tree itself and returns None,
# which is why each output below ends with "None"
print(df.printSchema())
print(df_fake.printSchema())
root
 |-- RequestId: string (nullable = true)
 |-- PB_countProducts: integer (nullable = true)
None
root
 |-- RequestId: string (nullable = true)
 |-- PB_countProducts: integer (nullable = true)
None
But when I try .collect() (or any of the commands mentioned above), I get the error only for df, while df_fake works fine:
print(df_fake.collect())
[Row(RequestId='3bf293f7-f093-4943-9e1f-1e9eb313c9d6', PB_countProducts=1), Row(RequestId='f8342388-3b17-4043-8a5a-73c56335181f', PB_countProducts=1), Row(RequestId='1a6d1a34-4291-4e24-b00e-45600c04bc55', PB_countProducts=2)]
print(df.collect()) generates the error above.
Any idea what I can do? ChatGPT wasn't very helpful, and neither was the Databricks Assistant.