I am trying to handle exceptions after flatMap (marked with the comment // Error handler after flatMap). That handler handles only one exception, and after that I get:
[ERROR] (boundedElastic-5) Operator called default onErrorDropped - java.lang.Exception: ER
java.lang.Exception: ER
    AtomicReference<Consumer<String>> onEventListener = new AtomicReference<>();
    var t1 = new Thread(() -> {
        for (int i = 0; i < 5; i++) {
            onEventListener.get().accept(Integer.valueOf(i).toString());
        }
    });
    var t2 = new Thread(() -> {
        for (int i = 0; i < 5; i++) {
            onEventListener.get().accept(Integer.valueOf(i).toString());
        }
    });
    var f = Flux.<String>create(emitter -> {
                Consumer<String> al = emitter::next;
                onEventListener.set(al);
                System.out.println("Generator initialized!");
            }, FluxSink.OverflowStrategy.BUFFER)
            .groupBy(els -> els)
            .flatMap(els -> els.publishOn(Schedulers.boundedElastic())
                    .concatMap(el -> {
                        System.out.println(Thread.currentThread().getName() + " : " + el);
                        return Mono.error(new Exception());
                    })
                    .onErrorResume(ex -> {
                        System.out.println(Thread.currentThread().getName() + " INSIDE: " + ex);
                        return Mono.error(new Exception("ER"));
                    }))
            // Error handler after flatMap
            .onErrorResume(ex -> {
                System.out.println(Thread.currentThread().getName() + " AFTER: " + ex);
                return Mono.error(new Exception("ER"));
            });
    f.subscribe(s -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println(Thread.currentThread().getName() + " Event handled: " + s);
    }, e -> {
    }, () -> {
    });
    t1.start();
    t2.start();
    System.out.println(Thread.currentThread().getName() + " Hello World!");
    Thread.sleep(10000);
    System.out.println("END");
I can handle the exception inside flatMap by returning Mono.empty(), but I want to handle exceptions after flatMap.
The workarounds I know of are:
- Returning Mono.empty() inside flatMap
- Using flatMapDelayError instead of flatMap
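For reference, the two workarounds listed above look roughly like this. It is a simplified sketch rather than the real code: it assumes a synchronous Flux.range source in place of the listener-based Flux.create, and process is a hypothetical helper that fails for even inputs. The concurrency and prefetch arguments are simply Reactor's default buffer sizes.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.concurrent.Queues;

class FlatMapErrorWorkarounds {

    // Hypothetical stand-in for the failing step: errors for even inputs.
    static Mono<String> process(int i) {
        return i % 2 == 0
                ? Mono.error(new IllegalStateException("ER " + i))
                : Mono.just("ok " + i);
    }

    // Workaround 1: recover inside flatMap, so the outer sequence never sees an error.
    static List<String> swallowInside() {
        return Flux.range(0, 5)
                .flatMap(i -> process(i).onErrorResume(ex -> Mono.empty()))
                .collectList()
                .block();
    }

    // Workaround 2: flatMapDelayError lets every inner publisher terminate first,
    // then signals the collected failure(s) downstream as a single error,
    // which a handler placed after the operator can observe.
    static List<String> delayError() {
        return Flux.range(0, 5)
                .flatMapDelayError(i -> process(i),
                        Queues.SMALL_BUFFER_SIZE, Queues.XS_BUFFER_SIZE)
                .onErrorResume(ex -> Flux.just("handled after flatMap"))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(swallowInside());
        System.out.println(delayError());
    }
}
```

In both variants the handler runs without anything being reported through onErrorDropped. The second one still reaches the downstream handler only once, after all inner publishers have terminated.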
Is there any way to use flatMap and handle the exceptions after it?
Output:
Generator initialized!
main Hello World!
boundedElastic-2 : 1
boundedElastic-5 : 4
boundedElastic-3 : 2
boundedElastic-4 : 3
boundedElastic-1 : 0
boundedElastic-1 INSIDE: java.lang.Exception
boundedElastic-5 INSIDE: java.lang.Exception
boundedElastic-4 INSIDE: java.lang.Exception
boundedElastic-2 INSIDE: java.lang.Exception
boundedElastic-3 INSIDE: java.lang.Exception
boundedElastic-1 AFTER: java.lang.Exception:
[ERROR] (boundedElastic-5) Operator called default onErrorDropped - java.lang.Exception: ER
java.lang.Exception: ER
[ERROR] (boundedElastic-4) Operator called default onErrorDropped - java.lang.Exception: ER
java.lang.Exception: ER
[ERROR] (boundedElastic-3) Operator called default onErrorDropped - java.lang.Exception: ER
java.lang.Exception: ER
[ERROR] (boundedElastic-2) Operator called default onErrorDropped - java.lang.Exception: ER
java.lang.Exception: ER
END