Converting code from legacy Spark Streaming (DStreams) to Structured Streaming, with these requirements:
- Micro-batches triggered every 10 minutes
- Sliding window of 4 hours with 2 hours of overlap (i.e., the window slides every 2 hours)
Legacy Spark Streaming code:
import json

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# create the Spark context
sc = SparkContext(appName=appName)
kafkaParams = {"metadata.broker.list": broker,
               "zookeeper.connect": zookeeper,
               "group.id": appName}
# micro-batch interval of 10 minutes
stream = StreamingContext(sc, 10 * 60)
kafka_stream = KafkaUtils.createDirectStream(stream,
                                             [topic],
                                             kafkaParams,
                                             fromOffsets=fromOffsets)
# parse each record's value as JSON
parsed = kafka_stream.map(lambda kv: json.loads(kv[1]))
# sliding window: 4-hour window length, 2-hour slide interval
parsed = parsed.window(4 * 60 * 60, 2 * 60 * 60)
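The windowed DStream would then typically be consumed batch-by-batch with plain map/reduce code rather than a built-in aggregation; a minimal sketch of that consumption (process_window is a hypothetical placeholder, not part of the original job):

# each windowed RDD covers the last 4 hours of data and is emitted every 2 hours
parsed.foreachRDD(lambda rdd: process_window(rdd))  # process_window: hypothetical map/reduce logic

stream.start()
stream.awaitTermination()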
Tried converting to Structured Streaming:
df = (spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", broker)
      .option("subscribe", topic)
      .load())

# trigger every 10 minutes; Kafka offsets are tracked in the checkpoint
(df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(offset AS STRING)")
   .writeStream
   .outputMode("append")
   .option("checkpointLocation", "tempCheckpoint")
   .option("truncate", "false")
   .trigger(processingTime="10 minutes")
   .foreachBatch(parseDf)  # parseDf is defined below
   .start()
   .awaitTermination())
from pyspark.sql.functions import col, from_json

def parseDf(df, epoch_id):
    print("Parsing DataFrame")
    df = (df.select(col("offset"), from_json(col("value").cast("STRING"), schema).alias("values"))
            .withColumn("sys", col("values")["sys"])
            .withColumn("tag", col("values")["tag"])
            .withColumn("t", col("values")["t"])
            .withColumn("v", col("values")["v"])
            .select(col("sys"), col("tag"), col("t"), col("v")))
    # group the data by window and compute results per system
    # note: inside foreachBatch, df is a static micro-batch DataFrame, so withWatermark is
    # a no-op here, and "t" must be a timestamp column for event-time windowing to apply
    windowedDF = df.withWatermark("t", "5 minutes")
    rdd = windowedDF.rdd
    ## map reduce functions
How can I implement a sliding window in the Structured Streaming approach without using any aggregate function?
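For reference, one pattern that can approximate a long sliding window without aggregation is to persist each micro-batch from foreachBatch and re-read the last 4 hours of data on every slide. A minimal sketch, assuming the micro-batch has already been parsed as in parseDf above and that "t" is a timestamp column; window_store_path and run_window_logic are hypothetical placeholders:

import datetime as dt
from pyspark.sql.functions import col, lit

window_store_path = "/tmp/window_store"  # hypothetical storage location for parsed micro-batches

def parse_and_window(batch_df, epoch_id):
    # persist the 10-minute micro-batch (already parsed to sys/tag/t/v columns)
    batch_df.write.mode("append").parquet(window_store_path)

    # every 12th batch (roughly 2 hours with a 10-minute trigger), rebuild the 4-hour window
    if epoch_id % 12 == 0:
        cutoff = dt.datetime.utcnow() - dt.timedelta(hours=4)
        window_df = (spark.read.parquet(window_store_path)
                          .filter(col("t") >= lit(cutoff)))
        run_window_logic(window_df)  # hypothetical stand-in for the map/reduce functions

The slide here is driven by counting trigger batches rather than by event time, so under the stated assumptions it is only an approximation of the legacy 4-hour/2-hour window.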