I'm trying to read a large Zstandard file (~30 GB compressed) in Databricks with Spark. It's a collection of chess games (.pgn.zst) in the following PGN format:
[Event "Rated Bullet tournament https://lichess.org/tournament/yc1WW2Ox"]
[Site "https://lichess.org/PpwPOZMq"]
[Date "2017.04.01"]
[Round "-"]
[White "Abbot"]
[Black "Costello"]
[Result "0-1"]
[UTCDate "2017.04.01"]
[UTCTime "11:32:01"]
[WhiteElo "2100"]
[BlackElo "2000"]
[WhiteRatingDiff "-4"]
[BlackRatingDiff "+1"]
[WhiteTitle "FM"]
[ECO "B30"]
[Opening "Sicilian Defense: Old Sicilian"]
[TimeControl "300+0"]
[Termination "Time forfeit"]
1. e4 { [%eval 0.17] [%clk 0:00:30] } 1... c5 { [%eval 0.19] [%clk 0:00:30] }
2. Nf3 { [%eval 0.25] [%clk 0:00:29] } 2... Nc6 { [%eval 0.33] [%clk 0:00:30] }
3. Bc4 { [%eval -0.13] [%clk 0:00:28] } 3... e6 { [%eval -0.04] [%clk 0:00:30] }
4. c3 { [%eval -0.4] [%clk 0:00:27] } 4... b5? { [%eval 1.18] [%clk 0:00:30] }
5. Bb3?! { [%eval 0.21] [%clk 0:00:26] } 5... c4 { [%eval 0.32] [%clk 0:00:29] }
6. Bc2 { [%eval 0.2] [%clk 0:00:25] } 6... a5 { [%eval 0.6] [%clk 0:00:29] }
7. d4 { [%eval 0.29] [%clk 0:00:23] } 7... cxd3 { [%eval 0.6] [%clk 0:00:27] }
8. Qxd3 { [%eval 0.12] [%clk 0:00:22] } 8... Nf6 { [%eval 0.52] [%clk 0:00:26] }
9. e5 { [%eval 0.39] [%clk 0:00:21] } 9... Nd5 { [%eval 0.45] [%clk 0:00:25] }
10. Bg5?! { [%eval -0.44] [%clk 0:00:18] } 10... Qc7 { [%eval -0.12] [%clk 0:00:23] }
11. Nbd2?? { [%eval -3.15] [%clk 0:00:14] } 11... h6 { [%eval -2.99] [%clk 0:00:23] }
12. Bh4 { [%eval -3.0] [%clk 0:00:11] } 12... Ba6? { [%eval -0.12] [%clk 0:00:23] }
13. b3?? { [%eval -4.14] [%clk 0:00:02] } 13... Nf4? { [%eval -2.73] [%clk 0:00:21] } 0-1
To load the file, I am using spark.read.text. From my understanding, .zst files are not splittable, so the whole file ends up being read into a single partition, which causes massive spills. I also want to apply a pivot transformation to collapse each game into a single record. I believe that because I have to go line by line, grouping rows into games, this step also runs inefficiently on a single partition.
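For reference, a quick check like the one below (just a sketch, using the same file_name as in the code further down) should confirm that the read produces a single partition:
# sanity check: a single non-splittable .zst file comes back as one input partition
print(spark.read.text(file_name).rdd.getNumPartitions())  # expected: 1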
I'm wondering if there is a better way to do this in Spark, and if not, what would be a more appropriate tool for a job like this?
Below is my code with a sample df for clarification
from pyspark.sql.functions import col, when, regexp_extract, monotonically_increasing_id, sum, first
from pyspark.sql.window import Window

df = spark.read.text(file_name).filter(
    (col('value') != '') &
    (~col('value').like('%UTCTime%')) &
    (~col('value').like('%Result%'))
)
value |
---|
[Event “Rated Blitz game”] |
[White “btr18am”] |
[Black “Kozionov_sergey”] |
… |
[Opening “Queen’s Gambit Declined: Exchange Variation, Positional Variation”] |
1. d4 { [%clk 0:03:00] } d5 { [%clk 0:03:00] } 2. c4 { [%clk 0:03:01] } e6 { [%clk 0:03:01] } … 27. Nxf7 { [%clk 0:01:28] } 1-0 |
[Event “Rated UltraBullet tournament https://lichess.org/tournament/5Emn5TK6”] |
[White “rickyrich”] |
[Black “seanysean”] |
… |
[Opening “Alekhine Defense”] |
1. e4 { [%clk 0:00:15] } Nf6 { [%clk 0:00:15] } 2. e5 { [%clk 0:00:14] } g6 { [%clk 0:00:15] } … 22. Rxe1 { [%clk 0:00:00] } Bc3+ { [%clk 0:00:11] } 0-1 |
# Rename the "value" column to "Line"
df = df.withColumnRenamed("value", "Line")
# Extract the "Key" and "Value" columns based on the structure of the "Line" column
df = df.withColumn('Key', when(col('Line').startswith('1.'), 'Moves')
                   .otherwise(regexp_extract(col('Line'), r'\[(.*?)\s', 1))) \
       .withColumn('Value', when(col('Line').startswith('1.'), col('Line'))
                   .otherwise(regexp_extract(col('Line'), r'"(.*)"', 1)))
# Add a column to identify the start of a game
df = df.withColumn("StartOfGame", when(col("Line").startswith("[Event"), 1).otherwise(0))
# Define a window specification for calculating the cumulative sum
windowSpec = Window.orderBy(monotonically_increasing_id())
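# Note: this window has no partitionBy, so Spark pulls all rows into a single partition to compute it,
# which is why I believe this step runs on one task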
# Calculate the cumulative sum of "StartOfGame" to create "GameID"
df = df.withColumn("GameID", sum("StartOfGame").over(windowSpec))
Line | Key | Value | StartOfGame | GameID |
---|---|---|---|---|
[Event “Rated Blitz game”] | Event | Rated Blitz game | 1 | 1 |
[White “btr18am”] | White | btr18am | 0 | 1 |
[Black “Kozionov_sergey”] | Black | Kozionov_sergey | 0 | 1 |
… | … | … | 0 | 1 |
[Opening “Queen’s Gambit Declined: Exchange Variation, Positional Variation”] | Opening | Queen’s Gambit Declined: Exchange Variation, Positional Variation | 0 | 1 |
1. d4 { [%clk 0:03:00] } d5 { [%clk 0:03:00] } 2. c4 { [%clk 0:03:01] } e6 { [%clk 0:03:01] } … 27. Nxf7 { [%clk 0:01:28] } 1-0 | Moves | 1. d4 { [%clk 0:03:00] } d5 { [%clk 0:03:00] } 2. c4 { [%clk 0:03:01] } e6 { [%clk 0:03:01] } … 27. Nxf7 { [%clk 0:01:28] } 1-0 | 0 | 1 |
[Event “Rated UltraBullet tournament https://lichess.org/tournament/5Emn5TK6”] | Event | Rated UltraBullet tournament https://lichess.org/tournament/5Emn5TK6 | 1 | 2 |
[White “rickyrich”] | White | rickyrich | 0 | 2 |
[Black “seanysean”] | Black | seanysean | 0 | 2 |
… | … | … | 0 | 2 |
[Opening “Alekhine Defense”] | Opening | Alekhine Defense | 0 | 2 |
1. e4 { [%clk 0:00:15] } Nf6 { [%clk 0:00:15] } 2. e5 { [%clk 0:00:14] } g6 { [%clk 0:00:15] } … 22. Rxe1 { [%clk 0:00:00] } Bc3+ { [%clk 0:00:11] } 0-1 | Moves | 1. e4 { [%clk 0:00:15] } Nf6 { [%clk 0:00:15] } 2. e5 { [%clk 0:00:14] } g6 { [%clk 0:00:15] } … 22. Rxe1 { [%clk 0:00:00] } Bc3+ { [%clk 0:00:11] } 0-1 | 0 | 2 |
# Define the list of columns for pivoting
col_list = [
'UTCDate', 'Event', 'TimeControl', 'Opening', 'ECO' , 'Site',
'Termination', 'Moves',
'White', 'WhiteTitle', 'WhiteElo', 'WhiteRatingDiff',
'Black', 'BlackTitle', 'BlackElo', 'BlackRatingDiff',
]
# Pivot the DataFrame based on "GameID" and the specified columns
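# (passing col_list explicitly avoids the extra pass Spark would otherwise run to discover the pivot values)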
df = df.groupBy("GameID").pivot("Key", col_list).agg(first("Value"))
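# Keep only games that include engine evaluations, then write out partitioned by date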
df = df.filter(col('Moves').contains('%eval'))
df.write.partitionBy('UTCDate').mode("overwrite").parquet(silver_file_path)
GameID | Event | White | Black | … | Opening | Moves |
---|---|---|---|---|---|---|
1 | Rated Blitz game | btr18am | Kozionov_sergey | … | Queen’s Gambit Declined: Exchange Variation, Positional Variation | 1. d4 { [%clk 0:03:00] } d5 { [%clk 0:03:00] } 2. c4 { [%clk 0:03:01] } e6 { [%clk 0:03:01] } … 27. Nxf7 { [%clk 0:01:28] } 1-0 |
2 | Rated UltraBullet tournament https://lichess.org/tournament/5Emn5TK6 | rickyrich | seanysean | … | Alekhine Defense | 1. e4 { [%clk 0:00:15] } Nf6 { [%clk 0:00:15] } 2. e5 { [%clk 0:00:14] } g6 { [%clk 0:00:15] } … 22. Rxe1 { [%clk 0:00:00] } Bc3+ { [%clk 0:00:11] } 0-1 |
I have tried using a bigger cluster with more memory. While it does reduce spills, I'm not sure this is a more 'efficient' way of doing it, since it also increases the number of cores in the cluster, which ultimately go unused.