I have several processes that depend on each other and need to run in parallel. Each one only needs to run once, and because they are expensive they also need to be cancellable: if I set a timeout at the end of the chain, it should interrupt all ongoing processes.
Here’s an example:
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

class Example {
    // Simulates an expensive call: sleeps 1-2 seconds and reports start, finish, or interruption.
    public static String execute(String service) {
        System.out.println("starting " + service + ", " + Thread.currentThread().getName());
        try {
            Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 2000));
        } catch (InterruptedException e) {
            System.out.println("interrupted " + service);
            return "";
        }
        System.out.println("finishing " + service);
        return "";
    }

    public static void main(String[] args) throws InterruptedException {
        Scheduler scheduler = Schedulers.newBoundedElastic(100, 10, "scheduler");
        var start = Mono.just("starting input").subscribeOn(scheduler);
        var service1 = start.map(i -> execute("service1"));
        var service2 = start.map(i -> execute("service2"));
        var service3 = start.map(i -> execute("service3"));
        var service4 = start.map(i -> execute("service4"));
        // service1 is referenced by both aggregator1 and aggregator2
        var aggregator1 = Mono.zip(start, service1, (i, k) -> execute("aggregator 1"));
        var aggregator2 = Mono.zip(start, service1, service2).map(tuple -> execute("aggregator2"));
        var aggregator3 = Mono.zip(start, service3, (i, r) -> execute("aggregator 3"));
        var last = Mono.zip(start, aggregator1, service4, aggregator2, aggregator3).map(values -> execute("last"));
        System.out.println("blocking");
        try {
            last.timeout(Duration.ofMillis(1000)).block();
        } catch (Exception e) {
            // the timeout is expected here, so the exception is deliberately ignored
        }
        System.out.println("timeout completed");
        // keep the JVM alive long enough to see whether anything is still running
        Thread.sleep(10_000);
        System.out.println("final result");
        scheduler.dispose();
    }
}
When I run this I get:
blocking
starting service1, scheduler-4
starting service4, scheduler-2
starting service2, scheduler-5
starting service1, scheduler-3
starting service3, scheduler-7
interrupted service1
interrupted service1
interrupted service4
interrupted service3
interrupted service2
timeout completed
final result
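Note the two "starting service1" lines. My understanding is that every Mono here is cold, so each zip that references service1 subscribes to it separately and re-runs its body. Here is a minimal sketch of that assumption, reduced to a counter. The class name ColdResubscribe and the helper runs() are made up for illustration, and Mono.fromCallable stands in for start.map(...):

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class ColdResubscribe {
    // Counts how often the body of a cold Mono runs when two branches of a zip use it.
    static int runs() {
        AtomicInteger calls = new AtomicInteger();
        Mono<String> service1 = Mono.fromCallable(() -> {
            calls.incrementAndGet();
            return "service1";
        });
        // Stand-ins for aggregator1 and aggregator2, which both reference service1.
        Mono<String> aggregator1 = service1.map(s -> "aggregator1");
        Mono<String> aggregator2 = service1.map(s -> "aggregator2");
        Mono.zip(aggregator1, aggregator2).block();
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("service1 ran " + runs() + " times"); // prints "service1 ran 2 times"
    }
}
```

If that reading is right, the duplicate comes from the two subscriptions rather than from the scheduler.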
So service1 is interrupted, but it is also called twice. I can stop it from being called twice by adding .cache() to it, but then the output becomes:
blocking
starting service4, scheduler-2
starting service1, scheduler-1
starting service2, scheduler-4
starting service3, scheduler-5
interrupted service2
interrupted service3
interrupted service4
timeout completed
finishing service1
final result
Now service1 has run a single time but was not interrupted when the timeout happened.
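A stripped-down sketch that seems to reproduce this, assuming the only relevant difference is the .cache() operator. The class name CacheCancel and the helper upstreamCancelled() are hypothetical, Mono.never() stands in for a long-running service, and disposing the subscription stands in for what the timeout does when it fires:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;

public class CacheCancel {
    // Returns true if cancelling the downstream subscriber also cancels the source.
    static boolean upstreamCancelled(boolean cached) {
        AtomicBoolean cancelled = new AtomicBoolean();
        // A source that never completes, so it is still "running" when we cancel.
        Mono<String> source = Mono.<String>never()
                .doOnCancel(() -> cancelled.set(true));
        Mono<String> service = cached ? source.cache() : source;
        Disposable subscription = service.subscribe();
        subscription.dispose(); // cancel from downstream
        return cancelled.get();
    }

    public static void main(String[] args) {
        System.out.println("plain source cancelled:  " + upstreamCancelled(false));
        System.out.println("cached source cancelled: " + upstreamCancelled(true));
    }
}
```

As far as I can tell, the plain Mono sees the cancel signal while the cached one does not, which would explain why service1 keeps running to "finishing service1" after the timeout.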
Is there a way to achieve the same chain of events with both single execution and cancellability?