Dynamic partition pruning between large and small table without extra filter conditions

I have a large partitioned fact table and a small dimension table.
The partition column of the large table is the key column of the dimension table.
I would like to use the small table to reduce the number of partitions I read from the large table.
The only condition is that each large table partition I read must have a corresponding record in the small table.
Both ‘INNER JOIN’ and ‘LEFT SEMI JOIN’ are acceptable here.

My problem is that I am not able to get dynamic partition pruning to kick in.
No matter what I do, all partitions of the large table are read.
How do I suggest to the query analyzer to use dynamic partition pruning?

Before I go into the details of my minimal working example, another observation:
When I research dynamic partition pruning, nearly all examples include a filter condition on the dimension table.
When I set a non-selective filter condition on the small table in my minimal working example, dynamic partition pruning kicks in.
Before I translate this to my production use case, I need to better understand what happens under the hood.

Minimal working example

Code

import time

import requests
import pyspark.sql as ps

spark = ps.SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Create a new schema with every script run.
ts = time.monotonic_ns()
schema_name = f"foo_{ts}"
spark.sql(f"CREATE DATABASE {schema_name}")

# Create large partitioned facts table.
records = [
    {"fact_id": i + 1, "fact_partition": i + 1}
    for i in range(1000)
]
spark.createDataFrame(
    [
        {"fact_id": i + 1, "fact_partition": i + 1}
        for i in range(1000)
    ],
).write.partitionBy(
    "fact_partition",
).saveAsTable(
    f"{schema_name}.fact_table",
)
fact_table = spark.table(f"{schema_name}.fact_table")

# Create small dimension table.
records = [{"dim_key": 1, "dim_val": 1}]
spark.createDataFrame(
    [{"dim_key": 1, "dim_val": 1}],
).write.saveAsTable(
    f"{schema_name}.dim_table",
)
dim_table = spark.table(f"{schema_name}.dim_table")

# Transform dataframe, and trigger action.
res = fact_table.join(
    dim_table,
    fact_table["fact_partition"] == dim_table["dim_key"],
    how = "left_semi",
)
res.collect()

# Look up rows read in explanation.
res.explain("cost")

# Look up rows read in metrics.
apps = requests.get("http://localhost:4040/api/v1/applications").json()
stages = requests.get(f"http://localhost:4040/api/v1/applications/{apps[0]['id']}/stages").json()

for stage_it in sorted(stages, key=lambda x: x["stageId"]):
    print(
        "{}\t{}\t{}".format(
            stage_it['inputRecords'],
            stage_it['name'],
            stage_it['details'].split('\n')[0],
        ),
    )

Execution

>>> python3 foo.py
== Optimized Logical Plan ==
Join LeftSemi, (fact_partition#11L = dim_key#20L), Statistics(sizeInBytes=481.4 KiB)
:- Filter isnotnull(fact_partition#11L), Statistics(sizeInBytes=481.4 KiB)
:  +- Relation foo_37205255539283.fact_table[fact_id#10L,fact_partition#11L] parquet, Statistics(sizeInBytes=481.4 KiB)
+- Project [dim_key#20L], Statistics(sizeInBytes=759.0 B)
   +- Filter isnotnull(dim_key#20L), Statistics(sizeInBytes=1139.0 B)
      +- Relation foo_37205255539283.dim_table[dim_key#20L,dim_val#21L] parquet, Statistics(sizeInBytes=1139.0 B)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) BroadcastHashJoin [fact_partition#11L], [dim_key#20L], LeftSemi, BuildRight, false
   :- *(2) ColumnarToRow
   :  +- FileScan parquet foo_37205255539283.fact_table[fact_id#10L,fact_partition#11L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1000 paths)[file:/..., PartitionFilters: [isnotnull(fact_partition#11L)], PushedFilters: [], ReadSchema: struct<fact_id:bigint>
   +- BroadcastQueryStage 0
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=57]
         +- *(1) Filter isnotnull(dim_key#20L)
            +- *(1) ColumnarToRow
               +- FileScan parquet foo_37205255539283.dim_table[dim_key#20L] Batched: true, DataFilters: [isnotnull(dim_key#20L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/hansyu/Scratch/Cell/compass-spark-ops/spark-warehouse/foo_3..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_key)], ReadSchema: struct<dim_key:bigint>
+- == Initial Plan ==
   BroadcastHashJoin [fact_partition#11L], [dim_key#20L], LeftSemi, BuildRight, false
   :- FileScan parquet foo_37205255539283.fact_table[fact_id#10L,fact_partition#11L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1000 paths)[file:/..., PartitionFilters: [isnotnull(fact_partition#11L)], PushedFilters: [], ReadSchema: struct<fact_id:bigint>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=44]
      +- Filter isnotnull(dim_key#20L)
         +- FileScan parquet foo_37205255539283.dim_table[dim_key#20L] Batched: true, DataFilters: [isnotnull(dim_key#20L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_key)], ReadSchema: struct<dim_key:bigint>


0       saveAsTable at NativeMethodAccessorImpl.java:0  org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
0       saveAsTable at NativeMethodAccessorImpl.java:0  org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
0       saveAsTable at NativeMethodAccessorImpl.java:0  org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
0       saveAsTable at NativeMethodAccessorImpl.java:0  org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
0       collect at foo.py:46    org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3725)
1       collect at foo.py:46    org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:191)
1000    collect at foo.py:46    org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3725)

Comments

Both the query explanation and the Spark metrics tell me that all 1000 partitions in fact_table are read.
How do I get the query analyzer to (i) take the key columns of the small table (dim_key = 1), and (ii) prune the partitions of the large table (fact_partition = 1)?
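
For reference, the outcome I am after can be reproduced by hand. The sketch below is not dynamic partition pruning: it collects the dimension keys on the driver and turns them into a static filter on the partition column, which assumes the key set is small enough to collect. The helper name `prune_by_dim_keys` is my own and only for illustration.

```python
def prune_by_dim_keys(fact_df, dim_df, fact_col="fact_partition", dim_col="dim_key"):
    """Keep only the rows of fact_df whose fact_col value appears in dim_df[dim_col]."""
    # Collect the distinct dimension keys on the driver (assumes there are few of them).
    keys = [row[0] for row in dim_df.select(dim_col).distinct().collect()]
    # A literal IN-list on the partition column should show up under PartitionFilters.
    return fact_df.filter(fact_df[fact_col].isin(keys))
```

With the tables from the example, `prune_by_dim_keys(fact_table, dim_table).collect()` would stand in for the join. I would still prefer to have the optimizer do this for me.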

What did I try?

Non-selective filter statement on small table (not dimension key/partition column)

# Transform dataframe, and trigger action.
res = fact_table.join(
    dim_table.filter(dim_table["dim_val"] > 0),
    fact_table["fact_partition"] == dim_table["dim_key"],
    how = "left_semi",
)
res.collect()

>>> python3 foo.py
== Optimized Logical Plan ==
Join LeftSemi, (fact_partition#11L = dim_key#20L), Statistics(sizeInBytes=481.4 KiB)
:- Filter (isnotnull(fact_partition#11L) AND dynamicpruning#30 [fact_partition#11L]), Statistics(sizeInBytes=481.4 KiB)
:  :  +- Project [dim_key#20L], Statistics(sizeInBytes=759.0 B)
:  :     +- Filter ((isnotnull(dim_val#21L) AND (dim_val#21L > 0)) AND isnotnull(dim_key#20L)), Statistics(sizeInBytes=1139.0 B)
:  :        +- Relation foo_38072275409858.dim_table[dim_key#20L,dim_val#21L] parquet, Statistics(sizeInBytes=1139.0 B)
:  +- Relation foo_38072275409858.fact_table[fact_id#10L,fact_partition#11L] parquet, Statistics(sizeInBytes=481.4 KiB)
+- Project [dim_key#20L], Statistics(sizeInBytes=759.0 B)
   +- Filter ((isnotnull(dim_val#21L) AND (dim_val#21L > 0)) AND isnotnull(dim_key#20L)), Statistics(sizeInBytes=1139.0 B)
      +- Relation foo_38072275409858.dim_table[dim_key#20L,dim_val#21L] parquet, Statistics(sizeInBytes=1139.0 B)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) BroadcastHashJoin [fact_partition#11L], [dim_key#20L], LeftSemi, BuildRight, false
   :- *(2) ColumnarToRow
   :  +- FileScan parquet foo_38072275409858.fact_table[fact_id#10L,fact_partition#11L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1000 paths)[file:/..., PartitionFilters: [isnotnull(fact_partition#11L), dynamicpruningexpression(fact_partition#11L IN dynamicpruning#30)], PushedFilters: [], ReadSchema: struct<fact_id:bigint>
   :        +- SubqueryBroadcast dynamicpruning#30, 0, [dim_key#20L], [id=#96]
   :           +- AdaptiveSparkPlan isFinalPlan=true
               +- == Final Plan ==
                  BroadcastQueryStage 1
                  +- ReusedExchange [dim_key#20L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=68]
               +- == Initial Plan ==
                  BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=88]
                  +- Project [dim_key#20L]
                     +- Filter ((isnotnull(dim_val#21L) AND (dim_val#21L > 0)) AND isnotnull(dim_key#20L))
                        +- FileScan parquet foo_38072275409858.dim_table[dim_key#20L,dim_val#21L] Batched: true, DataFilters: [isnotnull(dim_val#21L), (dim_val#21L > 0), isnotnull(dim_key#20L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_val), GreaterThan(dim_val,0), IsNotNull(dim_key)], ReadSchema: struct<dim_key:bigint,dim_val:bigint>
   +- BroadcastQueryStage 0
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=68]
         +- *(1) Project [dim_key#20L]
            +- *(1) Filter ((isnotnull(dim_val#21L) AND (dim_val#21L > 0)) AND isnotnull(dim_key#20L))
               +- *(1) ColumnarToRow
                  +- FileScan parquet foo_38072275409858.dim_table[dim_key#20L,dim_val#21L] Batched: true, DataFilters: [isnotnull(dim_val#21L), (dim_val#21L > 0), isnotnull(dim_key#20L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_val), GreaterThan(dim_val,0), IsNotNull(dim_key)], ReadSchema: struct<dim_key:bigint,dim_val:bigint>
+- == Initial Plan ==
   BroadcastHashJoin [fact_partition#11L], [dim_key#20L], LeftSemi, BuildRight, false
   :- FileScan parquet foo_38072275409858.fact_table[fact_id#10L,fact_partition#11L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1000 paths)[file:/..., PartitionFilters: [isnotnull(fact_partition#11L), dynamicpruningexpression(fact_partition#11L IN dynamicpruning#30)], PushedFilters: [], ReadSchema: struct<fact_id:bigint>
   :     +- SubqueryAdaptiveBroadcast dynamicpruning#30, 0, true, Project [dim_key#20L], [dim_key#20L]
   :        +- AdaptiveSparkPlan isFinalPlan=false
   :           +- Project [dim_key#20L]
   :              +- Filter ((isnotnull(dim_val#21L) AND (dim_val#21L > 0)) AND isnotnull(dim_key#20L))
   :                 +- FileScan parquet foo_38072275409858.dim_table[dim_key#20L,dim_val#21L] Batched: true, DataFilters: [isnotnull(dim_val#21L), (dim_val#21L > 0), isnotnull(dim_key#20L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_val), GreaterThan(dim_val,0), IsNotNull(dim_key)], ReadSchema: struct<dim_key:bigint,dim_val:bigint>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=51]
      +- Project [dim_key#20L]
         +- Filter ((isnotnull(dim_val#21L) AND (dim_val#21L > 0)) AND isnotnull(dim_key#20L))
            +- FileScan parquet foo_38072275409858.dim_table[dim_key#20L,dim_val#21L] Batched: true, DataFilters: [isnotnull(dim_val#21L), (dim_val#21L > 0), isnotnull(dim_key#20L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_val), GreaterThan(dim_val,0), IsNotNull(dim_key)], ReadSchema: struct<dim_key:bigint,dim_val:bigint>


0       saveAsTable at NativeMethodAccessorImpl.java:0  org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
0       saveAsTable at NativeMethodAccessorImpl.java:0  org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
0       saveAsTable at NativeMethodAccessorImpl.java:0  org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
0       saveAsTable at NativeMethodAccessorImpl.java:0  org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
0       collect at foo.py:46    org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3725)
1       collect at foo.py:46    org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:191)
1       collect at foo.py:46    org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3725)

Dynamic partition pruning kicks in!
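
To compare runs without reading the whole plan each time, the check can be scripted. This is a minimal sketch that looks for the `dynamicpruningexpression` marker seen in the plan above; it assumes `explain` prints through Python's standard output so that the text can be captured. The helper names `explain_text` and `uses_dynamic_pruning` are my own.

```python
import contextlib
import io


def explain_text(df, mode="cost"):
    """Capture what df.explain(mode=...) prints and return it as a string."""
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        df.explain(mode=mode)
    return buf.getvalue()


def uses_dynamic_pruning(plan_text):
    """Return True if the plan text contains a dynamic pruning expression."""
    return "dynamicpruningexpression" in plan_text.lower()
```

For example, `uses_dynamic_pruning(explain_text(res))` after `res.collect()`.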

Non-selective filter statement on small table (dimension key/partition column)

# Transform dataframe, and trigger action.
res = fact_table.join(
    dim_table.filter(dim_table["dim_key"] > 0),
    fact_table["fact_partition"] == dim_table["dim_key"],
    how = "left_semi",
)
res.collect()

>>> python3 foo.py
== Optimized Logical Plan ==
Join LeftSemi, (fact_partition#11L = dim_key#20L), Statistics(sizeInBytes=481.4 KiB)
:- Filter (((fact_partition#11L > 0) AND isnotnull(fact_partition#11L)) AND dynamicpruning#30 [fact_partition#11L]), Statistics(sizeInBytes=481.4 KiB)
:  :  +- Project [dim_key#20L], Statistics(sizeInBytes=759.0 B)
:  :     +- Filter (isnotnull(dim_key#20L) AND (dim_key#20L > 0)), Statistics(sizeInBytes=1139.0 B)
:  :        +- Relation foo_38346293807150.dim_table[dim_key#20L,dim_val#21L] parquet, Statistics(sizeInBytes=1139.0 B)
:  +- Relation foo_38346293807150.fact_table[fact_id#10L,fact_partition#11L] parquet, Statistics(sizeInBytes=481.4 KiB)
+- Project [dim_key#20L], Statistics(sizeInBytes=759.0 B)
   +- Filter (isnotnull(dim_key#20L) AND (dim_key#20L > 0)), Statistics(sizeInBytes=1139.0 B)
      +- Relation foo_38346293807150.dim_table[dim_key#20L,dim_val#21L] parquet, Statistics(sizeInBytes=1139.0 B)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) BroadcastHashJoin [fact_partition#11L], [dim_key#20L], LeftSemi, BuildRight, false
   :- *(2) ColumnarToRow
   :  +- FileScan parquet foo_38346293807150.fact_table[fact_id#10L,fact_partition#11L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1000 paths)[file:/..., PartitionFilters: [(fact_partition#11L > 0), isnotnull(fact_partition#11L), dynamicpruningexpression(fact_partition..., PushedFilters: [], ReadSchema: struct<fact_id:bigint>
   :        +- SubqueryBroadcast dynamicpruning#30, 0, [dim_key#20L], [id=#89]
   :           +- AdaptiveSparkPlan isFinalPlan=true
               +- == Final Plan ==
                  BroadcastQueryStage 1
                  +- ReusedExchange [dim_key#20L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=65]
               +- == Initial Plan ==
                  BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=83]
                  +- Filter (isnotnull(dim_key#20L) AND (dim_key#20L > 0))
                     +- FileScan parquet foo_38346293807150.dim_table[dim_key#20L] Batched: true, DataFilters: [isnotnull(dim_key#20L), (dim_key#20L > 0)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_key), GreaterThan(dim_key,0)], ReadSchema: struct<dim_key:bigint>
   +- BroadcastQueryStage 0
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=65]
         +- *(1) Filter (isnotnull(dim_key#20L) AND (dim_key#20L > 0))
            +- *(1) ColumnarToRow
               +- FileScan parquet foo_38346293807150.dim_table[dim_key#20L] Batched: true, DataFilters: [isnotnull(dim_key#20L), (dim_key#20L > 0)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_key), GreaterThan(dim_key,0)], ReadSchema: struct<dim_key:bigint>
+- == Initial Plan ==
   BroadcastHashJoin [fact_partition#11L], [dim_key#20L], LeftSemi, BuildRight, false
   :- FileScan parquet foo_38346293807150.fact_table[fact_id#10L,fact_partition#11L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1000 paths)[file:/..., PartitionFilters: [(fact_partition#11L > 0), isnotnull(fact_partition#11L), dynamicpruningexpression(fact_partition..., PushedFilters: [], ReadSchema: struct<fact_id:bigint>
   :     +- SubqueryAdaptiveBroadcast dynamicpruning#30, 0, true, Project [dim_key#20L], [dim_key#20L]
   :        +- AdaptiveSparkPlan isFinalPlan=false
   :           +- Filter (isnotnull(dim_key#20L) AND (dim_key#20L > 0))
   :              +- FileScan parquet foo_38346293807150.dim_table[dim_key#20L] Batched: true, DataFilters: [isnotnull(dim_key#20L), (dim_key#20L > 0)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_key), GreaterThan(dim_key,0)], ReadSchema: struct<dim_key:bigint>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=52]
      +- Filter (isnotnull(dim_key#20L) AND (dim_key#20L > 0))
         +- FileScan parquet foo_38346293807150.dim_table[dim_key#20L] Batched: true, DataFilters: [isnotnull(dim_key#20L), (dim_key#20L > 0)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_key), GreaterThan(dim_key,0)], ReadSchema: struct<dim_key:bigint>


0       saveAsTable at NativeMethodAccessorImpl.java:0  org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
0       saveAsTable at NativeMethodAccessorImpl.java:0  org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
0       saveAsTable at NativeMethodAccessorImpl.java:0  org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
0       saveAsTable at NativeMethodAccessorImpl.java:0  org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
0       collect at foo.py:46    org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3725)
1       collect at foo.py:46    org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:191)
1       collect at foo.py:46    org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3725)

Dynamic partition pruning kicks in!

Compute statistics

# Create large partitioned facts table.
records = [
    {"fact_id": i + 1, "fact_partition": i + 1}
    for i in range(1000)
]
spark.createDataFrame(
    [
        {"fact_id": i + 1, "fact_partition": i + 1}
        for i in range(1000)
    ],
).write.partitionBy(
    "fact_partition",
).saveAsTable(
    f"{schema_name}.fact_table",
)
spark.sql(f"ANALYZE TABLE {schema_name}.fact_table COMPUTE STATISTICS FOR ALL COLUMNS")
fact_table = spark.table(f"{schema_name}.fact_table")

# Create small dimension table.
records = [{"dim_key": 1, "dim_val": 1}]
spark.createDataFrame(
    [{"dim_key": 1, "dim_val": 1}],
).write.saveAsTable(
    f"{schema_name}.dim_table",
)
spark.sql(f"ANALYZE TABLE {schema_name}.dim_table COMPUTE STATISTICS FOR ALL COLUMNS")
dim_table = spark.table(f"{schema_name}.dim_table")

No effect.

Spark configuration

I tried a number of cost-based optimization settings:

spark = ps.SparkSession.builder.config(
    key = "spark.sql.cbo.enabled",
    value = True,
).config(
    key = "spark.sql.cbo.planStats.enabled",
    value = True,
).config(
    key = "spark.sql.statistics.histogram.enabled",
    value = True,
).getOrCreate()

No effect.
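
The settings above belong to the cost-based optimizer. As far as I understand, dynamic partition pruning has its own settings under `spark.sql.optimizer.dynamicPartitionPruning`. Below is a small sketch for printing them; the key names are my assumption for Spark 3.x (some are internal) and should be checked against the version in use. The helper name `dpp_settings` is my own.

```python
# Assumed Spark 3.x key names; verify them against the Spark version in use.
DPP_CONF_KEYS = (
    "spark.sql.optimizer.dynamicPartitionPruning.enabled",
    "spark.sql.optimizer.dynamicPartitionPruning.useStats",
    "spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio",
    "spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly",
)


def dpp_settings(conf_get):
    """Look up each key via conf_get(key, default), e.g. spark.conf.get."""
    return {key: conf_get(key, "<unset>") for key in DPP_CONF_KEYS}
```

Calling `dpp_settings(spark.conf.get)` returns a dictionary that can be printed next to the plan of each run.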

New contributor

Hans Yu is a new contributor to this site. Take care in asking for clarification, commenting, and answering.