I have a large partitioned fact table and a small dimension table.
The partition column of the large table is the key column of the dimension table.
I would like to use the small table to reduce the number of partitions I read from the large table.
There is no filter condition; the only requirement is that a matching small-table record exists for each large-table partition.
Both an INNER JOIN and a LEFT SEMI JOIN are acceptable here.
My problem is that I am not able to get dynamic partition pruning to kick in.
No matter what I do, all partitions of the large table are read.
How do I hint to the query optimizer that it should use dynamic partition pruning?
Before I go into the details of my minimal working example, another observation:
When I research dynamic partition pruning, the examples almost always include a filter condition on the dimension table.
When I set a non-selective filter condition on the small table in my minimal working example, dynamic partition pruning kicks in.
Before I translate this to my production use case, I need to better understand what happens under the hood.
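For reference, this is the shape of the textbook example I keep finding: a selective filter on the dimension table, followed by an equi-join on the partition column. The table and column names below are made up for illustration, and spark is the SparkSession from the code further down:
# Typical dynamic-partition-pruning example (hypothetical tables: "sales"
# partitioned by "date_id", plus a small "dates" dimension table).
sales = spark.table("sales")
dates = spark.table("dates")
res = sales.join(
    dates.filter(dates["year"] == 2023),   # selective filter on the dimension
    sales["date_id"] == dates["date_id"],
    how="inner",
)
res.explain()  # the sales scan should show a dynamicpruningexpression(...)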
Minimal working example
Code
import time

import requests
import pyspark.sql as ps

spark = ps.SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Create a new schema with every script run.
ts = time.monotonic_ns()
schema_name = f"foo_{ts}"
spark.sql(f"CREATE DATABASE {schema_name}")

# Create the large partitioned fact table.
records = [
    {"fact_id": i + 1, "fact_partition": i + 1}
    for i in range(1000)
]
spark.createDataFrame(
    records,
).write.partitionBy(
    "fact_partition",
).saveAsTable(
    f"{schema_name}.fact_table",
)
fact_table = spark.table(f"{schema_name}.fact_table")

# Create the small dimension table.
records = [{"dim_key": 1, "dim_val": 1}]
spark.createDataFrame(
    records,
).write.saveAsTable(
    f"{schema_name}.dim_table",
)
dim_table = spark.table(f"{schema_name}.dim_table")

# Transform the dataframe, and trigger an action.
res = fact_table.join(
    dim_table,
    fact_table["fact_partition"] == dim_table["dim_key"],
    how="left_semi",
)
res.collect()

# Look up rows read in the query explanation.
res.explain("cost")

# Look up rows read in the Spark stage metrics.
apps = requests.get("http://localhost:4040/api/v1/applications").json()
stages = requests.get(
    f"http://localhost:4040/api/v1/applications/{apps[0]['id']}/stages"
).json()
for stage_it in sorted(stages, key=lambda x: x["stageId"]):
    print(
        "{}\t{}\t{}".format(
            stage_it["inputRecords"],
            stage_it["name"],
            stage_it["details"].split("\n")[0],
        ),
    )
Execution
$ python3 foo.py
== Optimized Logical Plan ==
Join LeftSemi, (fact_partition#11L = dim_key#20L), Statistics(sizeInBytes=481.4 KiB)
:- Filter isnotnull(fact_partition#11L), Statistics(sizeInBytes=481.4 KiB)
: +- Relation foo_37205255539283.fact_table[fact_id#10L,fact_partition#11L] parquet, Statistics(sizeInBytes=481.4 KiB)
+- Project [dim_key#20L], Statistics(sizeInBytes=759.0 B)
+- Filter isnotnull(dim_key#20L), Statistics(sizeInBytes=1139.0 B)
+- Relation foo_37205255539283.dim_table[dim_key#20L,dim_val#21L] parquet, Statistics(sizeInBytes=1139.0 B)
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
*(2) BroadcastHashJoin [fact_partition#11L], [dim_key#20L], LeftSemi, BuildRight, false
:- *(2) ColumnarToRow
: +- FileScan parquet foo_37205255539283.fact_table[fact_id#10L,fact_partition#11L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1000 paths)[file:/..., PartitionFilters: [isnotnull(fact_partition#11L)], PushedFilters: [], ReadSchema: struct<fact_id:bigint>
+- BroadcastQueryStage 0
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=57]
+- *(1) Filter isnotnull(dim_key#20L)
+- *(1) ColumnarToRow
+- FileScan parquet foo_37205255539283.dim_table[dim_key#20L] Batched: true, DataFilters: [isnotnull(dim_key#20L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/hansyu/Scratch/Cell/compass-spark-ops/spark-warehouse/foo_3..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_key)], ReadSchema: struct<dim_key:bigint>
+- == Initial Plan ==
BroadcastHashJoin [fact_partition#11L], [dim_key#20L], LeftSemi, BuildRight, false
:- FileScan parquet foo_37205255539283.fact_table[fact_id#10L,fact_partition#11L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1000 paths)[file:/..., PartitionFilters: [isnotnull(fact_partition#11L)], PushedFilters: [], ReadSchema: struct<fact_id:bigint>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=44]
+- Filter isnotnull(dim_key#20L)
+- FileScan parquet foo_37205255539283.dim_table[dim_key#20L] Batched: true, DataFilters: [isnotnull(dim_key#20L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_key)], ReadSchema: struct<dim_key:bigint>
0 saveAsTable at NativeMethodAccessorImpl.java:0 org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
0 saveAsTable at NativeMethodAccessorImpl.java:0 org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
0 saveAsTable at NativeMethodAccessorImpl.java:0 org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
0 saveAsTable at NativeMethodAccessorImpl.java:0 org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
0 collect at foo.py:46 org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3725)
1 collect at foo.py:46 org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:191)
1000 collect at foo.py:46 org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3725)
Comments
Both the query explanation and the Spark metrics tell me that all 1000 partitions of fact_table are read.
How do I get the query optimizer to (i) pick up the key of the small table (dim_key = 1), and (ii) prune the partitions of the large table down to fact_partition = 1?
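For comparison, the only way I currently know to get pruning is to do it statically myself: collect the dimension keys to the driver and push them into the fact-table scan as literal values. A sketch of that workaround, using the tables from the example above:
# Manual, static pruning: collect the dimension keys and filter the fact
# table with literals, so the partition filter is known at planning time.
keys = [row["dim_key"] for row in dim_table.select("dim_key").collect()]
res_static = fact_table.filter(fact_table["fact_partition"].isin(keys))
res_static.explain()  # the scan should now show a static IN partition filter
This is exactly what I would like dynamic partition pruning to do for me at runtime.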
What did I try?
Non-selective filter statement on the small table (not on the dimension key / partition column)
# Transform the dataframe, and trigger an action.
res = fact_table.join(
    dim_table.filter(dim_table["dim_val"] > 0),
    fact_table["fact_partition"] == dim_table["dim_key"],
    how="left_semi",
)
res.collect()
$ python3 foo.py
== Optimized Logical Plan ==
Join LeftSemi, (fact_partition#11L = dim_key#20L), Statistics(sizeInBytes=481.4 KiB)
:- Filter (isnotnull(fact_partition#11L) AND dynamicpruning#30 [fact_partition#11L]), Statistics(sizeInBytes=481.4 KiB)
: : +- Project [dim_key#20L], Statistics(sizeInBytes=759.0 B)
: : +- Filter ((isnotnull(dim_val#21L) AND (dim_val#21L > 0)) AND isnotnull(dim_key#20L)), Statistics(sizeInBytes=1139.0 B)
: : +- Relation foo_38072275409858.dim_table[dim_key#20L,dim_val#21L] parquet, Statistics(sizeInBytes=1139.0 B)
: +- Relation foo_38072275409858.fact_table[fact_id#10L,fact_partition#11L] parquet, Statistics(sizeInBytes=481.4 KiB)
+- Project [dim_key#20L], Statistics(sizeInBytes=759.0 B)
+- Filter ((isnotnull(dim_val#21L) AND (dim_val#21L > 0)) AND isnotnull(dim_key#20L)), Statistics(sizeInBytes=1139.0 B)
+- Relation foo_38072275409858.dim_table[dim_key#20L,dim_val#21L] parquet, Statistics(sizeInBytes=1139.0 B)
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
*(2) BroadcastHashJoin [fact_partition#11L], [dim_key#20L], LeftSemi, BuildRight, false
:- *(2) ColumnarToRow
: +- FileScan parquet foo_38072275409858.fact_table[fact_id#10L,fact_partition#11L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1000 paths)[file:/..., PartitionFilters: [isnotnull(fact_partition#11L), dynamicpruningexpression(fact_partition#11L IN dynamicpruning#30)], PushedFilters: [], ReadSchema: struct<fact_id:bigint>
: +- SubqueryBroadcast dynamicpruning#30, 0, [dim_key#20L], [id=#96]
: +- AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
BroadcastQueryStage 1
+- ReusedExchange [dim_key#20L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=68]
+- == Initial Plan ==
BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=88]
+- Project [dim_key#20L]
+- Filter ((isnotnull(dim_val#21L) AND (dim_val#21L > 0)) AND isnotnull(dim_key#20L))
+- FileScan parquet foo_38072275409858.dim_table[dim_key#20L,dim_val#21L] Batched: true, DataFilters: [isnotnull(dim_val#21L), (dim_val#21L > 0), isnotnull(dim_key#20L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_val), GreaterThan(dim_val,0), IsNotNull(dim_key)], ReadSchema: struct<dim_key:bigint,dim_val:bigint>
+- BroadcastQueryStage 0
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=68]
+- *(1) Project [dim_key#20L]
+- *(1) Filter ((isnotnull(dim_val#21L) AND (dim_val#21L > 0)) AND isnotnull(dim_key#20L))
+- *(1) ColumnarToRow
+- FileScan parquet foo_38072275409858.dim_table[dim_key#20L,dim_val#21L] Batched: true, DataFilters: [isnotnull(dim_val#21L), (dim_val#21L > 0), isnotnull(dim_key#20L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_val), GreaterThan(dim_val,0), IsNotNull(dim_key)], ReadSchema: struct<dim_key:bigint,dim_val:bigint>
+- == Initial Plan ==
BroadcastHashJoin [fact_partition#11L], [dim_key#20L], LeftSemi, BuildRight, false
:- FileScan parquet foo_38072275409858.fact_table[fact_id#10L,fact_partition#11L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1000 paths)[file:/..., PartitionFilters: [isnotnull(fact_partition#11L), dynamicpruningexpression(fact_partition#11L IN dynamicpruning#30)], PushedFilters: [], ReadSchema: struct<fact_id:bigint>
: +- SubqueryAdaptiveBroadcast dynamicpruning#30, 0, true, Project [dim_key#20L], [dim_key#20L]
: +- AdaptiveSparkPlan isFinalPlan=false
: +- Project [dim_key#20L]
: +- Filter ((isnotnull(dim_val#21L) AND (dim_val#21L > 0)) AND isnotnull(dim_key#20L))
: +- FileScan parquet foo_38072275409858.dim_table[dim_key#20L,dim_val#21L] Batched: true, DataFilters: [isnotnull(dim_val#21L), (dim_val#21L > 0), isnotnull(dim_key#20L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_val), GreaterThan(dim_val,0), IsNotNull(dim_key)], ReadSchema: struct<dim_key:bigint,dim_val:bigint>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=51]
+- Project [dim_key#20L]
+- Filter ((isnotnull(dim_val#21L) AND (dim_val#21L > 0)) AND isnotnull(dim_key#20L))
+- FileScan parquet foo_38072275409858.dim_table[dim_key#20L,dim_val#21L] Batched: true, DataFilters: [isnotnull(dim_val#21L), (dim_val#21L > 0), isnotnull(dim_key#20L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_val), GreaterThan(dim_val,0), IsNotNull(dim_key)], ReadSchema: struct<dim_key:bigint,dim_val:bigint>
0 saveAsTable at NativeMethodAccessorImpl.java:0 org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
0 saveAsTable at NativeMethodAccessorImpl.java:0 org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
0 saveAsTable at NativeMethodAccessorImpl.java:0 org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
0 saveAsTable at NativeMethodAccessorImpl.java:0 org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
0 collect at foo.py:46 org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3725)
1 collect at foo.py:46 org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:191)
1 collect at foo.py:46 org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3725)
Dynamic partition pruning kicks in!
Non-selective filter statement on the small table (on the dimension key / partition column)
# Transform the dataframe, and trigger an action.
res = fact_table.join(
    dim_table.filter(dim_table["dim_key"] > 0),
    fact_table["fact_partition"] == dim_table["dim_key"],
    how="left_semi",
)
res.collect()
$ python3 foo.py
== Optimized Logical Plan ==
Join LeftSemi, (fact_partition#11L = dim_key#20L), Statistics(sizeInBytes=481.4 KiB)
:- Filter (((fact_partition#11L > 0) AND isnotnull(fact_partition#11L)) AND dynamicpruning#30 [fact_partition#11L]), Statistics(sizeInBytes=481.4 KiB)
: : +- Project [dim_key#20L], Statistics(sizeInBytes=759.0 B)
: : +- Filter (isnotnull(dim_key#20L) AND (dim_key#20L > 0)), Statistics(sizeInBytes=1139.0 B)
: : +- Relation foo_38346293807150.dim_table[dim_key#20L,dim_val#21L] parquet, Statistics(sizeInBytes=1139.0 B)
: +- Relation foo_38346293807150.fact_table[fact_id#10L,fact_partition#11L] parquet, Statistics(sizeInBytes=481.4 KiB)
+- Project [dim_key#20L], Statistics(sizeInBytes=759.0 B)
+- Filter (isnotnull(dim_key#20L) AND (dim_key#20L > 0)), Statistics(sizeInBytes=1139.0 B)
+- Relation foo_38346293807150.dim_table[dim_key#20L,dim_val#21L] parquet, Statistics(sizeInBytes=1139.0 B)
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
*(2) BroadcastHashJoin [fact_partition#11L], [dim_key#20L], LeftSemi, BuildRight, false
:- *(2) ColumnarToRow
: +- FileScan parquet foo_38346293807150.fact_table[fact_id#10L,fact_partition#11L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1000 paths)[file:/..., PartitionFilters: [(fact_partition#11L > 0), isnotnull(fact_partition#11L), dynamicpruningexpression(fact_partition..., PushedFilters: [], ReadSchema: struct<fact_id:bigint>
: +- SubqueryBroadcast dynamicpruning#30, 0, [dim_key#20L], [id=#89]
: +- AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
BroadcastQueryStage 1
+- ReusedExchange [dim_key#20L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=65]
+- == Initial Plan ==
BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=83]
+- Filter (isnotnull(dim_key#20L) AND (dim_key#20L > 0))
+- FileScan parquet foo_38346293807150.dim_table[dim_key#20L] Batched: true, DataFilters: [isnotnull(dim_key#20L), (dim_key#20L > 0)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_key), GreaterThan(dim_key,0)], ReadSchema: struct<dim_key:bigint>
+- BroadcastQueryStage 0
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=65]
+- *(1) Filter (isnotnull(dim_key#20L) AND (dim_key#20L > 0))
+- *(1) ColumnarToRow
+- FileScan parquet foo_38346293807150.dim_table[dim_key#20L] Batched: true, DataFilters: [isnotnull(dim_key#20L), (dim_key#20L > 0)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_key), GreaterThan(dim_key,0)], ReadSchema: struct<dim_key:bigint>
+- == Initial Plan ==
BroadcastHashJoin [fact_partition#11L], [dim_key#20L], LeftSemi, BuildRight, false
:- FileScan parquet foo_38346293807150.fact_table[fact_id#10L,fact_partition#11L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1000 paths)[file:/..., PartitionFilters: [(fact_partition#11L > 0), isnotnull(fact_partition#11L), dynamicpruningexpression(fact_partition..., PushedFilters: [], ReadSchema: struct<fact_id:bigint>
: +- SubqueryAdaptiveBroadcast dynamicpruning#30, 0, true, Project [dim_key#20L], [dim_key#20L]
: +- AdaptiveSparkPlan isFinalPlan=false
: +- Filter (isnotnull(dim_key#20L) AND (dim_key#20L > 0))
: +- FileScan parquet foo_38346293807150.dim_table[dim_key#20L] Batched: true, DataFilters: [isnotnull(dim_key#20L), (dim_key#20L > 0)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_key), GreaterThan(dim_key,0)], ReadSchema: struct<dim_key:bigint>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=52]
+- Filter (isnotnull(dim_key#20L) AND (dim_key#20L > 0))
+- FileScan parquet foo_38346293807150.dim_table[dim_key#20L] Batched: true, DataFilters: [isnotnull(dim_key#20L), (dim_key#20L > 0)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_key), GreaterThan(dim_key,0)], ReadSchema: struct<dim_key:bigint>
0 saveAsTable at NativeMethodAccessorImpl.java:0 org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
0 saveAsTable at NativeMethodAccessorImpl.java:0 org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
0 saveAsTable at NativeMethodAccessorImpl.java:0 org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
0 saveAsTable at NativeMethodAccessorImpl.java:0 org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
0 collect at foo.py:46 org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3725)
1 collect at foo.py:46 org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:191)
1 collect at foo.py:46 org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3725)
Dynamic partition pruning kicks in!
Compute statistics
# Create the large partitioned fact table.
records = [
    {"fact_id": i + 1, "fact_partition": i + 1}
    for i in range(1000)
]
spark.createDataFrame(
    records,
).write.partitionBy(
    "fact_partition",
).saveAsTable(
    f"{schema_name}.fact_table",
)
spark.sql(f"ANALYZE TABLE {schema_name}.fact_table COMPUTE STATISTICS FOR ALL COLUMNS")
fact_table = spark.table(f"{schema_name}.fact_table")

# Create the small dimension table.
records = [{"dim_key": 1, "dim_val": 1}]
spark.createDataFrame(
    records,
).write.saveAsTable(
    f"{schema_name}.dim_table",
)
spark.sql(f"ANALYZE TABLE {schema_name}.dim_table COMPUTE STATISTICS FOR ALL COLUMNS")
dim_table = spark.table(f"{schema_name}.dim_table")
No effect.
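A way to confirm the statistics were actually recorded is to look them up in the catalog (sketch; the exact output format depends on the Spark version):
# Table-level statistics should appear under "Statistics" ...
spark.sql(f"DESCRIBE EXTENDED {schema_name}.fact_table").show(truncate=False)
# ... and column-level statistics should appear for the partition column.
spark.sql(
    f"DESCRIBE EXTENDED {schema_name}.fact_table fact_partition"
).show(truncate=False)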
Spark configuration
I also tried enabling a number of cost-based optimizer settings:
spark = ps.SparkSession.builder.config(
    key="spark.sql.cbo.enabled",
    value=True,
).config(
    key="spark.sql.cbo.planStats.enabled",
    value=True,
).config(
    key="spark.sql.statistics.histogram.enabled",
    value=True,
).getOrCreate()
No effect.
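For completeness, these are the dynamic-partition-pruning settings I am aware of in Spark 3.x; the defaults in the comments are what I believe them to be, and as far as I can tell they are all unchanged in my setup:
# Dynamic-partition-pruning settings (Spark 3.x) and their defaults,
# to the best of my knowledge.
spark.conf.get("spark.sql.optimizer.dynamicPartitionPruning.enabled")              # 'true'
spark.conf.get("spark.sql.optimizer.dynamicPartitionPruning.useStats")             # 'true'
spark.conf.get("spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio")  # '0.5'
spark.conf.get("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly")   # 'true'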