When I run the following code:
Dataset<Row> aggStreamA = df
    .withWatermark("dateTime", "2 days")
    .groupBy(
        window(col("dateTime"), windowDuration, slideDuration),
        col(colA)
    )
    .agg(count("*").alias("count_a"));

Dataset<Row> aggStreamB = df
    .withWatermark("dateTime", "2 days")
    .groupBy(
        window(col("dateTime"), windowDuration, slideDuration),
        col(colB)
    )
    .agg(count("*").alias("count_b"));

Dataset<Row> joinedStream = aggStreamA
    .join(
        aggStreamB,
        aggStreamA.col("window.start").equalTo(aggStreamB.col("window.start"))
            .and(aggStreamA.col("window.end").equalTo(aggStreamB.col("window.end")))
            .and(aggStreamA.col(colA).equalTo(aggStreamB.col(colB))),
        "leftOuter"
    );

StreamingQuery query = joinedStream
    .writeStream()
    .outputMode("append")
    .format("console")
    .start();

query.awaitTermination();
I get an error:
Stream-stream LeftOuter join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition
To work around this, I added a second watermark on the window start of aggStreamB and a time-range condition to the join:
Dataset<Row> aggStreamB = df
    .withWatermark("dateTime", "2 days")
    .groupBy(
        window(col("dateTime"), windowDuration, slideDuration),
        col(colB)
    )
    .agg(count("*").alias("count_b"))
    .withColumn("start", col("window.start"))
    .withWatermark("start", "0 second");

Dataset<Row> joinedStream = aggStreamA.as("a")
    .join(
        aggStreamB.as("b"),
        aggStreamA.col("window.start").equalTo(aggStreamB.col("window.start"))
            .and(aggStreamA.col("window.end").equalTo(aggStreamB.col("window.end")))
            .and(aggStreamA.col(colA).equalTo(aggStreamB.col(colB)))
            .and(expr("a.start >= b.start and a.start <= b.start + interval 2 days")),
        "leftOuter"
    );

StreamingQuery query = joinedStream
    .writeStream()
    .outputMode("append")
    .format("console")
    .start();

query.awaitTermination();
But there seems to be a problem with the second watermark. When I run the following code:
Dataset<Row> aggStreamA = df
    .withWatermark("dateTime", "2 days")
    .groupBy(
        window(col("dateTime"), windowDuration, slideDuration),
        col(colA)
    )
    .agg(count("*").alias("count_a"))
    .withColumn("start", col("window.start"));

StreamingQuery query = aggStreamA
    .writeStream()
    .outputMode("append")
    .format("console")
    .start();

query.awaitTermination();
It returns results, but the following code, which only adds the second watermark on "start", returns nothing:
Dataset<Row> aggStreamA = df
    .withWatermark("dateTime", "2 days")
    .groupBy(
        window(col("dateTime"), windowDuration, slideDuration),
        col(colA)
    )
    .agg(count("*").alias("count_a"))
    .withColumn("start", col("window.start"))
    .withWatermark("start", "0 second");

StreamingQuery query = aggStreamA
    .writeStream()
    .outputMode("append")
    .format("console")
    .start();

query.awaitTermination();
Any ideas?
Is it possible to set a second watermark on a stream after an aggregation?
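For context, this is roughly how I understand a single watermark to gate output in append mode. It is a plain-Java sketch with no Spark involved; the class WatermarkModel and its methods are hypothetical names made up for illustration, and the model ignores details such as the watermark only advancing between micro-batches. It is not meant to explain the behaviour of the second watermark above, only to state my assumptions.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical model of event-time watermarking; this is NOT Spark code.
final class WatermarkModel {

    // Assumed rule: watermark = latest event time seen so far minus the allowed delay.
    static Instant watermark(Instant maxEventTime, Duration delay) {
        return maxEventTime.minus(delay);
    }

    // Assumed rule: in append mode a window is emitted only once the
    // watermark has reached or passed the end of that window.
    static boolean isWindowFinal(Instant windowEnd, Instant watermark) {
        return !watermark.isBefore(windowEnd);
    }

    public static void main(String[] args) {
        Duration delay = Duration.ofDays(2); // same "2 days" as in withWatermark above
        Instant windowEnd = Instant.parse("2024-01-02T00:00:00Z");

        // Latest event is only one day past the window end: watermark is still behind it.
        Instant early = Instant.parse("2024-01-03T00:00:00Z");
        System.out.println(isWindowFinal(windowEnd, watermark(early, delay))); // prints false

        // Latest event is more than two days past the window end: window can be emitted.
        Instant late = Instant.parse("2024-01-04T00:00:01Z");
        System.out.println(isWindowFinal(windowEnd, watermark(late, delay))); // prints true
    }
}
```

Under this model, with a "2 days" delay, a window shows up in append mode only after an event arrives that is at least two days later than the window's end.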