I am new to PySpark and trying to get familiar with the Parquet/Delta ecosystem.
I am trying to write a script that does the following:
1. Read a CSV file into a Spark dataframe.
2. Save it as a Parquet file.
3. Read the saved Parquet file back into a Spark dataframe.
4. Save it in Delta format (file/folder).
5. Create a DeltaTable object from the Delta files saved above.
6. Update/append to the table.
I can get through step 3, but step 4 errors out.
Script
<code>from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta import *
from delta.tables import *
from pyspark.sql.types import *
from pyspark import SparkFiles
from pyspark.context import SparkContext
spark = (
SparkSession.builder.master("local[*]")
.config("spark.jars.packages", "io.delta:delta-core_2.12:2.1.0")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.getOrCreate()
)
csv_filename = "dataset/BankChurners.csv"
# read csv to spark dataframe
df_csv = spark.read.format("csv").option("header","true").option("inferSchema","true").load(csv_filename)
# save to parquet file
df_csv.write.parquet("output/BankChurners_spark.parquet", mode="overwrite")
# read parquet file to spark dataframe
df_parq = spark.read.parquet("output/BankChurners_spark.parquet")
# save to delta format file
df_parq.write.mode(saveMode="overwrite").format("delta").saveAsTable("output/BankChurners_delta_table")
# read delta table
deltaTable = DeltaTable.forPath(spark, "output/BankChurners_delta_table")
# add a record and save delta table
# did not find any example / syntax for adding a record
</code>
Output
<code> 24/05/19 00:13:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
File "/Users/foobar/workspace/practice/parquet/question.py", line 30, in <module>
df_parq.write.mode(saveMode="overwrite").format("delta").saveAsTable("output/BankChurners_delta_table")
File "/Users/foobar/Library/Python/3.9/lib/python/site-packages/pyspark/sql/readwriter.py", line 1586, in saveAsTable
self._jwrite.saveAsTable(name)
File "/Users/foobar/Library/Python/3.9/lib/python/site-packages/py4j/java_gateway.py", line 1322, in __call__
return_value = get_return_value(
File "/Users/foobar/Library/Python/3.9/lib/python/site-packages/pyspark/errors/exceptions/captured.py", line 185, in deco
raise converted from None
pyspark.errors.exceptions.captured.ParseException:
[PARSE_SYNTAX_ERROR] Syntax error at or near '/'.(line 1, pos 6)
== SQL ==
output/BankChurners_delta_table
------^^^
</code>
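From the error, my guess is that saveAsTable() expects a table identifier rather than a filesystem path (the parser seems to trip over the / in the name). If that is right, one of the following should probably work instead, but I have not verified either of them:
<code>
# Guess 1 (untested): write the dataframe to a path in Delta format, then wrap it with DeltaTable.forPath
df_parq.write.format("delta").mode("overwrite").save("output/BankChurners_delta_table")
deltaTable = DeltaTable.forPath(spark, "output/BankChurners_delta_table")

# Guess 2 (untested): register a managed table under a plain name with no slashes
df_parq.write.format("delta").mode("overwrite").saveAsTable("BankChurners_delta_table")
</code>
I also noticed that the Delta Lake quickstart configures spark.sql.catalog.spark_catalog with org.apache.spark.sql.delta.catalog.DeltaCatalog, which my builder does not set; I am not sure whether that matters for saveAsTable.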
Questions:
- How can I create the Delta table? (I have sketched a couple of untested guesses right after the error output above.)
- For reading files (CSV or Parquet), I noticed two different methods. Which one is preferred?
  I. df_csv = spark.read.format("csv").option("header","true").option("inferSchema","true").load(csv_filename)
  II. df_csv = spark.read.csv(csv_filename, header=True, inferSchema=True)
- Is this the correct way to create an object for an existing Delta table?
  deltaTable = DeltaTable.forPath(spark, "/path/to/table")
- If yes to the above, how do I append a record to the table so that it creates a new table version? (My untested guess is sketched right after this list.)
- Are there modules imported above that I don't actually need?
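For the append question, the closest I have come up with is writing a one-row dataframe with mode("append") to the same Delta location and then checking the history for a new version. This is only a guess; the one-row dataframe below just reuses an existing row because I don't want to type out the full BankChurners schema:
<code>
# Guess (untested): take one existing row as a stand-in for a "new" record
new_row = df_parq.limit(1)

# append it to the same Delta location; as far as I understand this should create a new table version
new_row.write.format("delta").mode("append").save("output/BankChurners_delta_table")

# re-read the table and check whether the version history grew
deltaTable = DeltaTable.forPath(spark, "output/BankChurners_delta_table")
deltaTable.history().show()
</code>
Is that the right approach, or is there a proper append/insert API on DeltaTable itself?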
Feel free to comment on anything else you find incorrect.
Thanks