Suppose we have a DataStream<String> named ds, and that it is possible to attach a MapState to each String element as it is passed downstream, like this:
ds.keyBy(s -> s.hashCode() % 10)
  .process(new KeyedProcessFunction<Integer, String, Tuple2<String, MapState<String, String>>>() {
      // Keyed state handle, registered in open() under the name "demo"
      private transient MapState<String, String> map;

      @Override
      public void open(Configuration cfg) {
          map = getRuntimeContext().getMapState(
              new MapStateDescriptor<>("demo", String.class, String.class));
      }

      @Override
      public void processElement(String in, Context ctx,
              Collector<Tuple2<String, MapState<String, String>>> out) throws Exception {
          if (map.get("key") == null) {
              map.put("key", "val");
          }
          // Emit the state handle itself alongside the element
          out.collect(new Tuple2<>(in, map));
      }
  })
  .keyBy(t -> t.f0.hashCode() % 10)
  .process(/* the "demo" MapState can be accessed here */)
The question is: is it safe for the downstream operator to access the MapState that the upstream operator passed along?