I am just trying to use Rate with Structured Streaming, so as to write to multiple tables names per MicroBatch. I.e. just refreshing multiple sinks logic in prep for some certification, in pyspark.
No errors, but there is no persistence occuring. A while since I looked; must be something basic.
Coding as follows on Databricks Community Edition, no Hive Catalog. Basic stuff.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, lit
spark = SparkSession.builder
.appName("SimulateKAFKAandMultipleSinks")
.getOrCreate()
rate_stream = spark.readStream
.format("rate")
.option("rowsPerSecond", 1)
.load()
message_stream = rate_stream.select(
(rate_stream["value"] + 1000).alias("message_id"),
rate_stream["timestamp"].alias("event_time"),
(concat(lit("T"), rate_stream["value"] % 5)).alias("table_id")
)
def append_to_parquet(df, table_id):
table_path = f"/mnt/parquet/{table_id}"
df.write
.format("parquet")
.mode("append")
.option("path", table_path)
.save()
def process_batch(df, batch_id):
partitioned_df = df.repartition("table_id")
def process_partition(iterator):
for partition in iterator:
table_id_value = partition[0]['table_id']
print(f"Writing partition for table_id: {table_id_value}")
partition_df = partition.filter(col("table_id") == table_id_value)
append_to_parquet(partition_df, table_id_value)
partitioned_df.rdd.mapPartitions(process_partition)
query = message_stream.writeStream
.foreachBatch(process_batch)
.outputMode("append")
.option("checkpointLocation", "/mnt/parquet/checkpoints/")
.start()
query.awaitTermination()