Spark version: 3.5.1, running in local mode.
I failed to combine watermark + groupBy + count with a join.
My first program uses the order: watermark -> join -> groupBy.
package com.test.bigdata.example

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.SparkSession

/**
 * Demonstrates the supported pattern: watermark -> stream-stream join -> windowed aggregation.
 *
 * Two rate sources simulate impressions and clicks. Both sides carry a
 * 60-second watermark, are joined on ad id with a time-interval condition,
 * and the joined stream is aggregated per 60-second click window.
 */
object WatermarkJoinAgg {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Watermark + join + agg")
      .getOrCreate()

    // Rate sources emit (timestamp, value) rows; a single partition keeps the
    // example deterministic and easy to read on the console sink.
    val testStream1 = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .option("numPartitions", "1")
      .load()
    val testStream2 = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .option("numPartitions", "1")
      .load()

    // Shift impression ids by 10 so only some clicks find a matching impression.
    val impressions = testStream1
      .select(
        (col("value") + 10).as("impressionAdId"),
        col("timestamp").as("impressionTime"))
    val clicks = testStream2
      .select(col("value").as("clickAdId"), col("timestamp").as("clickTime"))

    // Watermarks bound how long join state is retained on each side.
    val impressionsWithWatermark =
      impressions.withWatermark("impressionTime", "60 seconds")
    val clicksWithWatermark = clicks.withWatermark("clickTime", "60 seconds")

    // Left-outer time-interval join: a click matches an impression only if it
    // occurs within 60 seconds after it.
    // NOTE(review): unmatched impressions come out with null clickTime/clickAdId,
    // so they land in a null-window/null-id group in the aggregation below.
    val joined = impressionsWithWatermark.join(
      clicksWithWatermark,
      expr("""
        clickAdId = impressionAdId AND
        impressionTime <= clickTime AND
        clickTime <= impressionTime + interval 60 seconds
      """),
      "leftOuter")

    // Count joined rows per 60-second click window and ad id.
    val adCounts =
      joined.groupBy(window(col("clickTime"), "60 seconds"), col("clickAdId")).count()

    // Append mode emits a window's counts only after the watermark passes its end,
    // so the first batches on the console are expected to be empty.
    val query = adCounts.writeStream
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .format("console")
      .option("truncate", false)
      .start()

    query.awaitTermination()
  }
}
It works as expected.
Then I tried the other order: withWatermark -> groupBy + aggregation -> join.
package com.test.bigdata.example

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr, window}
import org.apache.spark.sql.streaming.Trigger

/**
 * Demonstrates the pattern: watermark -> windowed aggregation on each stream ->
 * stream-stream equi-join on the window column.
 *
 * This chains THREE stateful operators (two aggregations + one join), which is
 * only supported in append mode on Spark 3.5+.
 *
 * NOTE(review): in append mode each windowed aggregation emits a window only
 * after its watermark passes the window end (60s window + 60s watermark delay
 * here), and the downstream join's output is further gated by the watermark
 * propagated through both aggregations. So the console sink is EXPECTED to
 * show empty batches for the first several minutes of wall-clock runtime —
 * let the query run well past ~3-4 minutes before concluding it produces
 * nothing. TODO: confirm against the "multiple stateful operators" section of
 * the Structured Streaming guide for 3.5.x.
 */
object WatermarkAggJoin {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Watermark + agg + join")
      .getOrCreate()

    // Rate sources emit (timestamp, value) rows; one partition for simplicity.
    val testStream1 = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .option("numPartitions", "1")
      .load()
    val testStream2 = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .option("numPartitions", "1")
      .load()

    val impressions = testStream1
      .select(
        (col("value") + 15).as("impressionAdId"),
        col("timestamp").as("impressionTime"))
    val clicks = testStream2
      .select(col("value").as("clickAdId"), col("timestamp").as("clickTime"))

    // Watermarks drive both when windows close and when join state is evicted.
    val impressionsWithWatermark =
      impressions.withWatermark("impressionTime", "60 seconds")
    val clicksWithWatermark = clicks.withWatermark("clickTime", "60 seconds")

    // window(...) already names its output column "window", so no alias is
    // needed (the original `.alias("window")` was redundant).
    val clicksWindow = clicksWithWatermark
      .groupBy(window(col("clickTime"), "60 seconds"))
      .count()
    val impressionsWindow = impressionsWithWatermark
      .groupBy(window(col("impressionTime"), "60 seconds"))
      .count()

    // Equi-join the two aggregated streams on the shared window struct,
    // matching the example in the Structured Streaming programming guide.
    val result = clicksWindow.join(impressionsWindow, "window", "inner")

    val query = result.writeStream
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .format("console")
      .option("truncate", false)
      .start()

    query.awaitTermination()
  }
}
I get an empty result every time:
+------+-----+-----+
|window|count|count|
+------+-----+-----+
+------+-----+-----+
This follows the sample code from the Spark Structured Streaming programming guide:
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
Is there anything I missed?