So I have an application running on:
- spring-boot-starter-parent:3.3.5
- spring-statemachine-core:4.0.0
I use this application mainly to run a state machine once, from start to finish, to perform a DB migration.
The state machine steps through its states and, at several points, starts a large number of CompletableFutures in order to run tasks in parallel.
Somehow this seems to occupy all of the available threads/CPUs.
Here’s some example code:
State-Machine config:
@EnableStateMachine
@Configuration
public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<State, Event> {

    @Override
    public void configure(StateMachineConfigurationConfigurer<State, Event> config) throws Exception {
        config
            .withConfiguration()
                .autoStartup(true)
                .transitionConflictPolicy(TransitionConflictPolicy.CHILD)
                .regionExecutionPolicy(RegionExecutionPolicy.PARALLEL);
    }

    @Override
    public void configure(StateMachineStateConfigurer<State, Event> states) throws Exception {
        states
            .withStates()
                .initial(State.INITIAL)
                .fork(State.PREPARE_MIGRATION)
                .join(State.READY_FOR_MIGRATION)
                .stateExit(State.TRANSFORMATION, transformationAction)
                .stateExit(State.MIGRATE_TABLES, migrateTableAction)
                .stateExit(State.COPY_DATA, dataCopyAction)
                .fork(State.COPY_SECONDARY_ELEMENTS)
                .join(State.SECONDARY_ELEMENTS_MIGRATED)
                .stateExit(State.VALIDATION, validationAction)
                .end(State.FINISHED)
        ...
    }
    ...
}
Action:
@Override
public Map<String, Long> copyData(...) {
    final List<CompletableFuture<Statistic>> futures = new ArrayList<>(tableConversions.size());
    for (Table table : tables) {
        ...
        final CompletableFuture<Statistic> copy = copier.copy(tableEntry, pk, columns);
        futures.add(copy);
    }
    // Block until every table copy has completed
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    return ...;
}
DataCopier:
@Async
public CompletableFuture<Statistic> copy(...) {
    ...
    final List<CompletableFuture<Void>> writeFutures = new ArrayList<>();
    for (long offset = 0; offset < count; offset += readBatchLength) {
        // Create a read future (asynchronous)
        final CompletableFuture<List<Row>> readSourceFuture = ...;
        // After read, trigger write operations in parallel
        final CompletableFuture<Void> writeAfterReadFuture =
            readSourceFuture.thenComposeAsync(
                sourceRows -> writeTarget(sourceRows, table, columns, writeBatchLength));
        writeFutures.add(writeAfterReadFuture);
    }
    // Combine all read and write futures into one final future
    return CompletableFuture.allOf(writeFutures.toArray(new CompletableFuture[0]))
        .thenApply(v -> new Statistic(table.source().name(), count));
}
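For reference, `thenComposeAsync` called without an executor argument runs its function on CompletableFuture's default asynchronous executor, which is normally `ForkJoinPool.commonPool()`. The two-argument overload takes an explicit `Executor` instead. Below is a minimal, plain-JDK sketch of that overload. The class name, the `copy-pool-` thread prefix, and the dummy rows are all made up for illustration and are not part of my application.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ExplicitExecutorDemo {

    /**
     * Runs a read-then-write chain on a dedicated pool and returns the name
     * of the thread that executed the write stage.
     */
    public static String writeStageThreadName() {
        AtomicInteger counter = new AtomicInteger();
        // Hypothetical dedicated pool with recognizable thread names
        ExecutorService copyPool = Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r, "copy-pool-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        });
        try {
            // "Read" stage, submitted to the dedicated pool
            CompletableFuture<List<String>> read =
                    CompletableFuture.supplyAsync(() -> List.of("row-1", "row-2"), copyPool);
            // "Write" stage: the second argument keeps it off the common pool
            CompletableFuture<String> write = read.thenComposeAsync(
                    rows -> CompletableFuture.completedFuture(Thread.currentThread().getName()),
                    copyPool);
            return write.join();
        } finally {
            copyPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("write stage ran on: " + writeStageThreadName());
    }
}
```

With this overload the write stage reports a `copy-pool-` thread rather than a common-pool worker. Whether this alone would change the behaviour I describe below is something I have not verified.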
Scheduled Job:
@Scheduled(cron = "0 * * * * *")
public void readStats() {
    if (log.isDebugEnabled()) {
        log.debug("reading live statistics");
    }
}
At first I thought I had misconfigured it, because the job never ran. Then I discovered that once the state machine has finished and all the CompletableFutures are done, the job suddenly shows up in the logs…
So that means the execution of the scheduled job is blocked by all the “threads” I start with CompletableFuture on the ForkJoinPool.commonPool(). It also seems that Spring Statemachine does its own thing on its own pool (as the logs look different):
How can I reserve one thread or CPU on my machine for the scheduled job? Or how can I put the job into the pool so that it bypasses the FIFO ordering?
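To make "reserving a thread" concrete, here is a plain-JDK sketch of the behaviour I am after: a periodic job on its own single-thread scheduler keeps firing even while a separate worker pool is completely saturated. This is only an illustration under assumptions. The class name, pool sizes, and timings are invented, and it does not use Spring's scheduling at all.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DedicatedSchedulerDemo {

    /** Returns true if the periodic job fired while every worker thread was still blocked. */
    public static boolean heartbeatRunsWhileWorkersBusy() throws InterruptedException {
        // Stand-in for the pool that runs the migration tasks
        ExecutorService workers = Executors.newFixedThreadPool(2);
        // The "reserved" thread: it only ever runs the periodic job
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch heartbeat = new CountDownLatch(1);
        try {
            // Saturate the worker pool: 4 blocking tasks for 2 threads
            for (int i = 0; i < 4; i++) {
                workers.submit(() -> {
                    release.await();
                    return null;
                });
            }
            // The periodic job runs on its own thread, independent of the workers
            scheduler.scheduleAtFixedRate(heartbeat::countDown, 10, 10, TimeUnit.MILLISECONDS);
            return heartbeat.await(2, TimeUnit.SECONDS);
        } finally {
            release.countDown();      // unblock the workers
            workers.shutdown();
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("heartbeat ran while workers busy: " + heartbeatRunsWhileWorkersBusy());
    }
}
```

What I am looking for is the Spring equivalent of this separation. I am aware of the `spring.task.scheduling.pool.size` property, but I do not know whether it is the right knob here.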