I’m trying to create my own windowing scheme through the use of unkeyed ProcessFunctions. I’m using a source and would like to use watermarks. My current watermark strategy is as follows:
this.watermarkStrategy = WatermarkStrategy
        .<EventBasic>forMonotonousTimestamps()
        .withTimestampAssigner((element, recordTimestamp) -> element.value.timeStamp);
I’ve created my source as follows:
DataStream<EventBasic> mainStream = env.readTextFile(csvFilePath)
        .map(new MapFunction<String, EventBasic>() {
            @Override
            public EventBasic map(String line) throws Exception {
                String[] parts = line.split(",");
                if (parts.length == 3) {
                    String key = parts[0];
                    int valueInt = Integer.parseInt(parts[1]);
                    long valueTimeStamp = Long.parseLong(parts[2]);
                    return new EventBasic(key, valueInt, valueTimeStamp);
                } else {
                    return null;
                }
            }
        })
        .setParallelism(3)
        .assignTimestampsAndWatermarks(watermarkStrategy)
        .name("source");
This source reads a CSV file that has the following format:
key,val,timestamp
A,0,500
C,1,500
A,2,500
A,3,500
A,4,500
B,5,500
A,6,500
H,7,500
...
a,100,1500
The timestamps increase monotonically.
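For readers trying to reproduce this, here is a minimal plain-Java sketch of what EventBasic might look like. The shape is an assumption inferred only from the constructor call `new EventBasic(key, valueInt, valueTimeStamp)` and the access `element.value.timeStamp`; the nested class name `EventValue`, the field name `valueInt`, and the helper `parse` are hypothetical and not taken from the real code.

```java
// Hypothetical holder for the numeric value and its event timestamp.
final class EventValue {
    final int valueInt;
    final long timeStamp; // assumed to be event time in milliseconds

    EventValue(int valueInt, long timeStamp) {
        this.valueInt = valueInt;
        this.timeStamp = timeStamp;
    }
}

// Assumed shape of EventBasic: a key plus a nested value object,
// so that element.value.timeStamp resolves as in the timestamp assigner.
final class EventBasic {
    final String key;
    final EventValue value;

    EventBasic(String key, int valueInt, long timeStamp) {
        this.key = key;
        this.value = new EventValue(valueInt, timeStamp);
    }

    // Mirrors the parsing done in the map function: three comma-separated
    // fields, otherwise null. Non-numeric fields throw NumberFormatException.
    static EventBasic parse(String line) {
        String[] parts = line.split(",");
        if (parts.length != 3) {
            return null;
        }
        return new EventBasic(parts[0], Integer.parseInt(parts[1]), Long.parseLong(parts[2]));
    }
}
```

With this shape, `EventBasic.parse("A,0,500").value.timeStamp` is the value that the timestamp assigner would hand to the watermark generator.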
When observing the stream immediately after the source (I created a dummy ProcessFunction to check that my timestamps were working), I see the value -9223372036854775808 (Long.MIN_VALUE), which stays constant. This suggests that the watermark generator never emits a new watermark.
I’ve also tried the following watermark strategy, which led to the same output:
this.watermarkStrategy = WatermarkStrategy
        .<EventBasic>forBoundedOutOfOrderness(Duration.ofMillis(500))
        .withTimestampAssigner((element, recordTimestamp) -> element.value.timeStamp);
I don’t know what my issue could be. I’ve looked everywhere, but nothing I try seems to change the result.
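To make the symptom concrete, here is a minimal plain-Java sketch (no Flink dependency) of how a periodic bounded-out-of-orderness watermark generator is generally understood to behave. It assumes the constant value observed is the current watermark. The class `PeriodicWatermarkSketch` and the helper `observe` are hypothetical names; this is a simplified model, not Flink's actual implementation. It illustrates one possible explanation, not a confirmed diagnosis: both built-in strategies emit watermarks periodically (every 200 ms by default, as far as the documentation indicates), so a small file could be fully read before the first periodic emit, leaving every element to see Long.MIN_VALUE.

```java
// Simplified model of a periodic watermark generator (hypothetical class).
final class PeriodicWatermarkSketch {
    private final long maxOutOfOrderness;
    private long maxTimestamp;
    // What a downstream operator sees until the first periodic emit happens.
    private long currentWatermark = Long.MIN_VALUE;

    PeriodicWatermarkSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Start value chosen so the first computed watermark cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
    }

    // Called per element: only remembers the largest timestamp seen so far.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Called by a timer, not per element: this is the only place where
    // the watermark actually advances.
    void onPeriodicEmit() {
        currentWatermark = maxTimestamp - maxOutOfOrderness - 1;
    }

    long currentWatermark() {
        return currentWatermark;
    }

    // Returns the watermark visible after each element. emitEvery is the
    // number of elements between periodic emits; 0 means the timer never
    // fires while the input is being read.
    static long[] observe(long[] timestamps, long outOfOrdernessMillis, int emitEvery) {
        PeriodicWatermarkSketch gen = new PeriodicWatermarkSketch(outOfOrdernessMillis);
        long[] seen = new long[timestamps.length];
        for (int i = 0; i < timestamps.length; i++) {
            gen.onEvent(timestamps[i]);
            if (emitEvery > 0 && (i + 1) % emitEvery == 0) {
                gen.onPeriodicEmit();
            }
            seen[i] = gen.currentWatermark();
        }
        return seen;
    }
}
```

In this model, `observe(new long[] {500, 500, 1500}, 500, 0)` reports Long.MIN_VALUE for every element, which matches the symptom, while the same input with `emitEvery = 1` shows the watermark advancing. If this is what is happening in the real job, a shorter auto-watermark interval or a larger or slower input would be expected to make the watermark move.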