So I have an extremely simple streaming pipeline set up using Apache Flink, streaming data from Kinesis. I don’t think it really matters, but I am using AWS’s Managed Flink environment. That said, here is my implementation.
Each individual event payload from the stream
class Event {
String deviceId;
String metricName;
Double value;
// added for debugging: when parsing the record, the deserializer sets this field to the current system time
String receivedTimeStamp;
// getters (e.g. getDeviceId(), used by keyBy below) omitted for brevity
}
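Because the pipeline keys by Event::getDeviceId, the class needs at least that getter. Below is a minimal sketch of Event written as a Flink POJO, assuming the usual POJO rules (public class, public no-argument constructor, private fields exposed through getters and setters). The three-argument constructor that stamps receivedTimeStamp with Instant.now() is a hypothetical stand-in for the deserializer behavior described in the comment, and the ISO-8601 string format is an assumption.

```java
import java.time.Instant;

// Sketch: Event shaped as a Flink POJO (public class, public no-arg
// constructor, private fields with getters and setters).
public class Event {
    private String deviceId;
    private String metricName;
    private Double value;
    // Debug-only: wall-clock time at which the record was parsed.
    private String receivedTimeStamp;

    // Flink needs a public no-argument constructor to treat this as a POJO.
    public Event() {}

    // Hypothetical convenience constructor mimicking the deserializer:
    // stamps receivedTimeStamp with the current system time (ISO-8601, UTC).
    public Event(String deviceId, String metricName, Double value) {
        this.deviceId = deviceId;
        this.metricName = metricName;
        this.value = value;
        this.receivedTimeStamp = Instant.now().toString();
    }

    public String getDeviceId() { return deviceId; }
    public void setDeviceId(String deviceId) { this.deviceId = deviceId; }
    public String getMetricName() { return metricName; }
    public void setMetricName(String metricName) { this.metricName = metricName; }
    public Double getValue() { return value; }
    public void setValue(Double value) { this.value = value; }
    public String getReceivedTimeStamp() { return receivedTimeStamp; }
    public void setReceivedTimeStamp(String receivedTimeStamp) { this.receivedTimeStamp = receivedTimeStamp; }
}
```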
Streaming Pipeline
final DataStream<Event> stream = // create streaming kinesis source
// setting up an extremely simple pipeline
stream.keyBy(Event::getDeviceId)
.window(SlidingProcessingTimeWindows.of(
/*Window length*/ Time.hours(2),
/*Slide length*/ Time.hours(1)))
.process(new EventsWindowFunction());
List<TransformedEvent>: the OUT type of my ProcessWindowFunction
public class TransformedEvent {
private String deviceId;
private Double value;
private String metricName;
// added for debugging: copied from the receivedTimeStamp of the corresponding Event in Iterable<Event> iterable
String receivedTimeStamp;
}
ProcessWindowFunction
@Override
public void process(
final String s,
final Context context,
final Iterable<Event> iterable,
final Collector<List<TransformedEvent>> collector) throws Exception {
// using this for logging
final String deviceId = Iterables.get(iterable, 0).getDeviceId();
final List<TransformedEvent> output = new ArrayList<>();
StreamSupport.stream(iterable.spliterator(), false)
.map(/* Transform raw Event to TransformedEvent */)
.forEach(output::add);
// added logging here to print the entire output for the deviceId (keying by deviceId)
collector.collect(output);
}
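The body of the window function can be reduced to plain Java to check it in isolation. This sketch assumes Java 16+ records as trimmed stand-ins for Event and TransformedEvent, and a hypothetical transform() because the real mapping is elided above. It takes the deviceId from the key passed to process (the first parameter, s), which for a stream keyed by deviceId already is the deviceId, rather than reading the first element with Iterables.get. In the real ProcessWindowFunction, context.window().getStart() and context.window().getEnd() return the window bounds in epoch milliseconds, which is useful to include in the log line.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

final class WindowBodySketch {

    // Trimmed, hypothetical stand-ins for the question's types.
    record Event(String deviceId, String metricName, double value, String receivedTimeStamp) {}
    record TransformedEvent(String deviceId, double value, String metricName, String receivedTimeStamp) {}

    // Hypothetical mapping: the real transformation is elided in the question.
    static TransformedEvent transform(Event e) {
        return new TransformedEvent(e.deviceId(), e.value(), e.metricName(), e.receivedTimeStamp());
    }

    // Mirrors process(key, context, elements, out): the key is already the
    // deviceId, so it is used for logging instead of the first element.
    static List<TransformedEvent> buildOutput(String key, Iterable<Event> elements) {
        List<TransformedEvent> output = StreamSupport.stream(elements.spliterator(), false)
                .map(WindowBodySketch::transform)
                .collect(Collectors.toList());
        System.out.println("deviceId=" + key + ", events=" + output.size());
        return output;
    }
}
```

Collecting with Collectors.toList() replaces the separate ArrayList plus forEach(output::add); the resulting list is what would be passed to collector.collect.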
Here is what I am noticing with the above implementation:
For, let’s say, deviceId ABC, I am seeing the following output:
Output at 6 AM (window from 4-6 AM)
List<TransformedEvent>
- deviceId: ABC
  value: 0.79
  metricName: MetricOne
  receivedTimeStamp: 5:07 AM
- deviceId: ABC
  value: 0.12
  metricName: MetricTwo
  receivedTimeStamp: 4:15 AM
- deviceId:
Output at 7 AM (window from 5-7 AM)
- Nothing for deviceId: ABC
My Question
- According to the documentation on sliding windows, shouldn’t MetricOne for deviceId ABC also appear in the 5-7 AM window? As seen above, it currently shows up only in the 4-6 AM window and not in the subsequent window (i.e. 5-7 AM). But I also noticed that the 4-6 AM window correctly included events from the previous window (i.e. 3-5 AM), so I’m confused. I am almost certain I’m overlooking some caveat, so I’m looking for any pointers whatsoever. Also note that the device may not have emitted any event after the last one, which was received at 5:07 AM.
- Also, as David Anderson always says, if you are not seeing the expected results, switch to using “wall-clock time/ingestion time”. I believe SlidingProcessingTimeWindows should already be using ingestion time automatically? I suspect I am misunderstanding something at this specific point, but that is only my speculation.
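To make the expectation in the first question concrete, here is a plain-Java re-creation (no Flink dependency) of how, as I understand it, SlidingProcessingTimeWindows assigns windows; it is modeled on the assignWindows and TimeWindow.getWindowStartWithOffset logic in the Flink source, and the class and method names are made up for illustration. Two points it shows: the timestamp used for assignment is the window operator’s clock when the element arrives (context.getCurrentProcessingTime() in Flink’s WindowAssigner), not the receivedTimeStamp field, and with a 2-hour size and 1-hour slide each element should land in size / slide = 2 windows.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch re-creating sliding processing-time window assignment.
final class SlidingWindowSketch {

    // Start of the latest window containing `timestamp`, aligned to
    // multiples of `slide` (epoch-aligned when offset == 0).
    static long windowStart(long timestamp, long offset, long slide) {
        long remainder = (timestamp - offset) % slide;
        return remainder < 0 ? timestamp - (remainder + slide) : timestamp - remainder;
    }

    // Every [start, start + size) window containing `processingTime`, where
    // processingTime is the operator's clock when the element is processed.
    static List<long[]> assign(long processingTime, long size, long slide, long offset) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = windowStart(processingTime, offset, slide);
        for (long start = lastStart; start > processingTime - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        long t = 5 * hour + 7 * 60_000L; // 05:07 UTC on the epoch day
        for (long[] w : assign(t, 2 * hour, hour, 0)) {
            System.out.printf("[%d:00, %d:00)%n", w[0] / hour, w[1] / hour);
        }
    }
}
```

Running main prints [5:00, 7:00) followed by [4:00, 6:00), i.e. by this logic an element processed at 05:07 belongs to both the 5-7 AM and the 4-6 AM windows.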