We have a use case to:
divide a stream into chunks whose size does not exceed a threshold, and process those chunks in parallel.
Our current implementation is shown below, where counter is a single variable shared by all threads in the ForkJoinPool. The problem is that some chunks exceed the threshold size, and we suspect the shared counter is the cause.
AtomicLong counter = new AtomicLong(0L);
StreamEx.of(stream)
    .parallel(FORK_JOIN_POOL)
    .groupRuns((prev, next) -> counter.incrementAndGet() % batchSize != 0)
    .forEach(entities -> {
        // business logic.
    });
Therefore we tried changing the condition as shown below, but some chunks still exceed batchSize. Any leads would be helpful. TIA!
ThreadLocal<AtomicLong> threadLocalCounter = ThreadLocal.withInitial(() -> new AtomicLong(0L));
StreamEx.of(stream)
    .parallel(FORK_JOIN_POOL)
    .groupRuns((prev, next) -> {
        AtomicLong counter = threadLocalCounter.get();
        return counter.incrementAndGet() % batchSize != 0;
    })
    .forEach(entities -> {
        // business logic.
    });
You can just zip it with integers and get honest partitioning: each chunk boundary then depends only on the element's position, not on which thread evaluates the predicate.
StreamEx.of(stream)
    .zipWith(Stream.iterate(0, a -> a + 1))  // pair each element with its index
    .map(x -> x)                             // EntryStream -> StreamEx, so groupRuns is available
    .parallel()
    .groupRuns((prev, next) -> next.getValue() % batchSize != 0)
    .forEach(entries -> {
        var entities = entries.stream().map(x -> x.getKey()).toList();
        // business logic.
    });
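The same idea works with the plain JDK if you don't want a library: group each element by index / batchSize, so the chunk an element lands in is a pure function of its position and can never exceed batchSize. A sketch under that assumption; IndexChunking and chunk are hypothetical names:

```java
import java.util.List;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class IndexChunking {
    // Group elements by index / batchSize: chunk membership is determined by
    // position alone, independent of how the work is scheduled.
    static <T> List<List<T>> chunk(List<T> source, int batchSize) {
        return List.copyOf(IntStream.range(0, source.size())
                .boxed()
                .collect(Collectors.groupingBy(
                        i -> i / batchSize,   // chunk number
                        TreeMap::new,         // keep chunks in source order
                        Collectors.mapping(source::get, Collectors.toList())))
                .values());
    }

    public static void main(String[] args) {
        List<Integer> data = IntStream.rangeClosed(1, 10).boxed().toList();
        chunk(data, 4).parallelStream().forEach(entities -> {
            // business logic; every chunk has at most 4 elements
            System.out.println(entities);
        });
    }
}
```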
If you don’t mind trying another library: abacus-common. It lets you easily split the stream by batch size and specify the number of threads for parallel execution:
StreamEx.of(elements)
    .split(batchSize)
    .parallel(threadNumber)
    .forEach(batchElements -> {
        // TODO
    });
(I’m the developer of this library)