I am writing a Spark Structured Streaming job and found something unexpected.
Here is the sample code:
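<code>import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

/* in main func */
import spark.implicits._ // encoders needed by groupByKey / flatMapGroupsWithState
val res = ds
  .groupByKey(c => c.PartitionKey)
  .flatMapGroupsWithState[State, MyOutput](
    OutputMode.Append(), GroupStateTimeout.ProcessingTimeTimeout())(GroupFunc)
.......

def GroupFunc(
    partitionKey: String,
    iterator: Iterator[Input],
    groupState: GroupState[State]): Iterator[MyOutput] = {
  // On a timeout call, hasTimedOut is true and the iterator is empty
  if (groupState.hasTimedOut) {
    groupState.remove()
  }
  if (!iterator.hasNext) {
    return Iterator.empty[MyOutput] // was Seq.empty[BotInfo], which does not match the return type
  }
  val existGroupState = groupState.getOption.getOrElse(State())
  /* do something and create a newState */
  groupState.update(newState)
  // The processing-time timeout is reset on every call, so set it again each time
  groupState.setTimeoutDuration("10 minutes")
  Iterator(myOutput)
}
</code>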
What I found is that in some microbatches the groupState gets removed, even though I am pretty sure the group does receive input in those microbatches and its state should not have timed out.
I am really curious why this happens.
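To check my own reasoning, I wrote a toy model (plain Scala, no Spark) of how I read the documented ProcessingTimeTimeout rules: a key that receives data in a batch gets a fresh timeout of batch time + duration, and only keys with no data in that batch whose timeout is already in the past are called with hasTimedOut = true. `TimeoutModel`, `Entry` and `runBatch` are my own hypothetical names, and the strict `<` comparison against the batch time is my assumption, not something I verified in Spark's source.

```scala
// Toy model (NOT Spark code) of my reading of the ProcessingTimeTimeout docs.
// All names here are hypothetical.
object TimeoutModel {
  // Per-key state: an accumulated value plus the absolute timeout timestamp (ms), if set.
  final case class Entry(value: Int, timeoutAtMs: Option[Long])

  /** Runs one microbatch at processing time `batchTimeMs`.
    * Returns the updated store and the keys that were invoked with hasTimedOut = true.
    */
  def runBatch(
      store: Map[String, Entry],
      batchTimeMs: Long,
      input: Map[String, Seq[Int]],
      timeoutMs: Long): (Map[String, Entry], Set[String]) = {
    // 1. Keys WITH new data: update state and set a fresh timeout relative to
    //    the batch time, like GroupFunc does with setTimeoutDuration.
    val afterData = input.foldLeft(store) { case (acc, (key, values)) =>
      val old = acc.get(key).map(_.value).getOrElse(0)
      acc.updated(key, Entry(old + values.sum, Some(batchTimeMs + timeoutMs)))
    }
    // 2. Keys WITHOUT new data whose timeout is earlier than the batch time are
    //    invoked with an empty iterator and hasTimedOut = true; GroupFunc removes them.
    val timedOut: Set[String] = afterData.collect {
      case (key, Entry(_, Some(t))) if !input.contains(key) && t < batchTimeMs => key
    }.toSet
    (afterData -- timedOut, timedOut)
  }
}
```

If this model is right, a group that really receives input in a microbatch should never reach GroupFunc with hasTimedOut = true in that same microbatch, which is why the removals I see surprise me.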
My understanding is that at the end of each microbatch, the groupState is persisted to HDFS (the checkpoint path) and reloaded in the next microbatch. Is this how state persistence works?
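To make that concrete, this is the mental model I have in mind, as a toy Scala sketch (not Spark's implementation; `CheckpointModel`, `Checkpoint`, `Put`, `Remove` and `runBatch` are hypothetical names): each microbatch loads the version committed by the previous batch, applies its puts and removals, and commits a new version as a delta to the checkpoint.

```scala
// Toy model (NOT Spark's implementation) of versioned state checkpointing
// as I understand it. All names are hypothetical.
object CheckpointModel {
  sealed trait Change
  final case class Put(key: String, value: String) extends Change
  final case class Remove(key: String) extends Change

  // Stands in for the checkpoint directory: element i holds the delta of version i + 1.
  final class Checkpoint {
    private var deltas = Vector.empty[Seq[Change]]

    def latestVersion: Long = deltas.size.toLong

    // Writes the delta for the next version and returns that version number.
    def commit(delta: Seq[Change]): Long = {
      deltas = deltas :+ delta
      latestVersion
    }

    // Rebuilds the state of `version` by replaying deltas 1..version in order.
    def load(version: Long): Map[String, String] =
      deltas.take(version.toInt).foldLeft(Map.empty[String, String]) { (state, delta) =>
        delta.foldLeft(state) {
          case (s, Put(k, v)) => s.updated(k, v)
          case (s, Remove(k)) => s - k
        }
      }
  }

  // One microbatch: load the previous version, compute changes from it, commit a new version.
  def runBatch(cp: Checkpoint, computeChanges: Map[String, String] => Seq[Change]): Long = {
    val previous = cp.load(cp.latestVersion)
    cp.commit(computeChanges(previous))
  }
}
```

As far as I know, the default HDFS-backed state store also keeps recent versions in executor memory and periodically writes full snapshots instead of always replaying every delta; this model skips both.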
I hope someone can help with this, thanks.