I am trying to read Delta files using PySpark on my local machine, but I am facing an issue.
When I run the following code, I do not get any error such as "idWithoutTopologyInfo is null":
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Spark Application Starting").getOrCreate()
print("Started Spark Context: ", spark.sparkContext)
print("spark_packages :- ", spark.sparkContext.getConf().get("spark.jars.packages"))
my_df = spark.sql("select 1 as id")
my_df.show()  # show() prints the DataFrame itself and returns None, so no need to wrap it in print()
spark.stop()
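As a side note, it is worth confirming at this point which Spark version is actually running, since the Delta package has to be compatible with it later on (a minimal check I added, not part of the original script):

# Sanity check (my addition): run while the session is still active,
# before spark.stop(). The Delta package version must be compatible
# with the Spark version printed here.
print("Spark version:", spark.version)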
Now, to work with Delta files, I installed delta-spark using pip. I was then also able to create a Delta table by configuring the Spark session with the Delta jar package, as follows:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
import pyspark.sql.functions as F

spark_app_builder = (SparkSession.builder
    .appName("Employee Analysis")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:3.2.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(spark_app_builder).getOrCreate()

csv_df = spark.read.format("csv").option("header", True).option("inferSchema", True).load(r"""..8AMemployees.csv""")
csv_df = (csv_df
    .withColumnsRenamed({"First Name": "FirstName",
                         "Start Date": "StartDate",
                         "Last Login Time": "LastLoginTime",
                         "Bonus %": "Bonus_Percent",
                         "Senior Management": "SeniorManagement"})
    .withColumn("StartDate", F.to_date(F.col("StartDate"), "M/d/yyyy"))
)

delta_table_path = r"..8AMemployees.delta"
print(delta_table_path)
csv_df.write.mode("overwrite").format("delta").save(delta_table_path)  # all file formats, write modes, CRUD operations, ACID
print("Saved to Delta File")
spark.stop()
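For reference, reading the Delta table back uses the same Delta-enabled session (a minimal sketch of my own; it assumes the session is still active, i.e. it runs before spark.stop(), and reuses the placeholder path from above):

# Read the Delta table back (sketch; run before spark.stop()).
read_df = spark.read.format("delta").load(delta_table_path)
read_df.show()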
But after I restarted my PC and tried to run the same code, it throws an error like this:
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/01 23:00:54 ERROR Inbox: Ignoring error
java.lang.NullPointerException: Cannot invoke "org.apache.spark.storage.BlockManagerId.executorId()" because "idWithoutTopologyInfo" is null
at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$register(BlockManagerMasterEndpoint.scala:677)
at org.apache.spark.storage.BlockManagerMasterEndpoint$$anonfun$receiveAndReply$1.applyOrElse(BlockManagerMasterEndpoint.scala:133)
at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:103)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
24/09/01 23:00:54 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:295)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.Heartbeater$$anon$1.run(Heartbeater.scala:46)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.spark.storage.BlockManagerId.executorId()" because "idWithoutTopologyInfo" is null
I added .master("local[*]"), but it still did not resolve the issue:
spark_app_builder = (SparkSession.builder.master("local[*]")
.appName("Employee Analysis")
.config("spark.jars.packages", "io.delta:delta-core_2.12:3.2.0")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(spark_app_builder).getOrCreate()
If this is a case of local execution, setting the following before spark-submit resolved a similar issue for me:
export SPARK_LOCAL_HOSTNAME=localhost
The issue seems to happen when a VPN is in play and the local IP is resolved incorrectly.
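If you launch Spark from a Python script instead of spark-submit, the same workaround can be applied programmatically (a sketch of my own; the key point is setting the variable before the JVM starts, and spark.driver.bindAddress is an alternative knob for the same problem):

import os
# Must be set before any SparkSession/JVM is created in this process.
os.environ["SPARK_LOCAL_HOSTNAME"] = "localhost"

from pyspark.sql import SparkSession
spark = (SparkSession.builder
    .master("local[*]")
    .appName("Employee Analysis")
    # Alternative to the env var: bind the driver explicitly.
    .config("spark.driver.bindAddress", "127.0.0.1")
    .getOrCreate())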
In my case it was caused by the wrong Spark version, as in:
.config("spark.jars.packages", "io.delta:delta-core_2.12:3.5.2")
spark_version = 3.5.2
After that it worked fine.
But some days later the issue came back.
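One more thing worth checking here (my own note, based on Delta Lake's published packaging): Delta 3.x is released on Maven as io.delta:delta-spark_2.12, not delta-core, and each Delta line targets a specific Spark line (for example, Delta 3.2.x is built against Spark 3.5.x). Since configure_spark_with_delta_pip injects the Maven coordinates matching the pip-installed delta-spark package, the manual spark.jars.packages setting can be dropped entirely; a sketch:

import pyspark
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# The pip-installed delta-spark package must be compatible with this version.
print("pyspark version:", pyspark.__version__)

builder = (SparkSession.builder
    .master("local[*]")
    .appName("Employee Analysis")
    # No spark.jars.packages here: configure_spark_with_delta_pip adds the
    # io.delta:delta-spark coordinates matching the installed pip package.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"))

spark = configure_spark_with_delta_pip(builder).getOrCreate()
print("Resolved packages:", spark.sparkContext.getConf().get("spark.jars.packages"))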