I encountered an issue while trying to store JSON data as a Delta Lake table using PySpark and Delta Lake.
Here’s my code:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta import *

delta_version = "2.4.0"

# Build a Spark session with the Delta Lake extension and catalog configured
spark = SparkSession.builder \
    .appName("JSONToDeltaLake") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", f"io.delta:delta-core_2.12:{delta_version}") \
    .getOrCreate()

# Example JSON data: an array of records
json_data = """
[
    {
        "name": "John Doe",
        "age": 30,
        "city": "New York"
    },
    {
        "name": "Jane Smith",
        "age": 25,
        "city": "Los Angeles"
    }
]
"""

# Write the JSON data to a local file
json_path = "example_data.json"
with open(json_path, "w") as file:
    file.write(json_data)

# Explicit schema for the JSON records
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True)
])

# Read the JSON file with the schema
try:
    df = spark.read.schema(schema).json(json_path)
except Exception as e:
    print(f"Error reading JSON file: {e}")
    spark.stop()
    exit(1)

df.printSchema()
df.show()

# Write the DataFrame out as a Delta table
delta_path = "example_delta_table"
df.write.format("delta").mode("overwrite").save(delta_path)

# Read the Delta table back and display it
delta_table = DeltaTable.forPath(spark, delta_path)
delta_df = delta_table.toDF()
delta_df.show()

spark.stop()
This code generates example JSON data, saves it to a file, reads the JSON back with PySpark, and then writes it out as a Delta Lake table.
However, when I run it, only null values end up in the Delta table. Here is the full console output:
& C:/Users/no2si/AppData/Local/Programs/Python/Python311/python.exe c:/Users/no2si/Documents/MarketReSearch/TodayhomeScrape/deltalpp.py
:: loading settings :: url = jar:file:/C:/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: C:\Users\no2si\.ivy2\cache
The jars for the packages stored in: C:\Users\no2si\.ivy2\jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-58d527be-f279-42cc-a057-5a43146af2cd;1.0
confs: [default]
found io.delta#delta-core_2.12;2.4.0 in central
found io.delta#delta-storage;2.4.0 in central
found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 173ms :: artifacts dl 8ms
:: modules in use:
io.delta#delta-core_2.12;2.4.0 from central in [default]
io.delta#delta-storage;2.4.0 from central in [default]
org.antlr#antlr4-runtime;4.9.3 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 3 | 0 | 0 | 0 || 3 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-58d527be-f279-42cc-a057-5a43146af2cd
confs: [default]
0 artifacts copied, 3 already retrieved (0kB/10ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- city: string (nullable = true)
+----+----+----+
|name| age|city|
+----+----+----+
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
+----+----+----+
24/06/07 13:58:49 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+----+----+----+
|name| age|city|
+----+----+----+
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
+----+----+----+
24/06/07 13:58:56 WARN SparkEnv: Exception while deleting Spark temp dir: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323\userFiles-6cf29488-da35-4e58-8d65-1156838ddd7c
java.io.IOException: Failed to delete: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323\userFiles-6cf29488-da35-4e58-8d65-1156838ddd7c\org.antlr_antlr4-runtime-4.9.3.jar
at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:151)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:134)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:94)
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1231)
at org.apache.spark.SparkEnv.stop(SparkEnv.scala:108)
at org.apache.spark.SparkContext.$anonfun$stop$25(SparkContext.scala:2175)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1509)
at org.apache.spark.SparkContext.stop(SparkContext.scala:2175)
at org.apache.spark.SparkContext.stop(SparkContext.scala:2081)
at org.apache.spark.api.java.JavaSparkContext.stop(JavaSparkContext.scala:550)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:834)
24/06/07 13:58:57 ERROR ShutdownHookManager: Exception while deleting Spark temp dir: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323
java.io.IOException: Failed to delete: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323\userFiles-6cf29488-da35-4e58-8d65-1156838ddd7c\org.antlr_antlr4-runtime-4.9.3.jar
at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:151)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:134)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:134)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:94)
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1231)
at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$4(ShutdownHookManager.scala:65)
at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$4$adapted(ShutdownHookManager.scala:62)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$2(ShutdownHookManager.scala:62)
at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
PS C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape> 24/06/07 13:58:57 ERROR ShutdownHookManager: Exception while deleting Spark temp dir: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323\userFiles-6cf29488-da35-4e58-8d65-1156838ddd7c
java.io.IOException: Failed to delete: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323\userFiles-6cf29488-da35-4e58-8d65-1156838ddd7c\org.antlr_antlr4-runtime-4.9.3.jar
at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:151)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:134)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:94)
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1231)
at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$4(ShutdownHookManager.scala:65)
at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$4$adapted(ShutdownHookManager.scala:62)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$2(ShutdownHookManager.scala:62)
at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
SUCCESS: The process with PID 7444 (child process of PID 14792) has been terminated.
SUCCESS: The process with PID 14792 (child process of PID 13788) has been terminated.
SUCCESS: The process with PID 13788 (child process of PID 1864) has been terminated.
What should I change to store the JSON data in the Delta Lake table correctly?
I would also appreciate any advice on the causes of, and fixes for, the warning and error messages that appear in the logs.
Thank you.
I have already tried to verify that the JSON file was being written correctly and that the storage path was valid.
To check this, I added code after saving the file that prints its contents (see the sketch below), which confirmed that the JSON data is written to the file as expected.
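For reference, the check I added looked roughly like this (a minimal sketch reusing the json_path variable from the script above; the exact print statements are just illustrative):

import os

# Sanity check: confirm the JSON file exists at the expected path and print its raw contents
print("JSON file path:", os.path.abspath(json_path))
with open(json_path, "r") as file:
    print(file.read())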