I am trying to create a reactive producer/consumer for a ConcurrentLinkedQueue using Project Reactor.
A producer adds elements to the ConcurrentLinkedQueue at its own pace.
The consumer should continuously listen to the queue. Whenever an element is available, it should take it and keep collecting until either the number of elements collected so far reaches maxSize or a maxTime Duration elapses, and then emit the collected List all at once. The consumer should not complete, stop, or block when the queue is empty.
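To make the "maxSize OR maxTime" rule concrete, here is a minimal plain-Java sketch of the batching semantics I am after. It is not Reactor code and not a solution to the problem. It assumes a BlockingQueue instead of a ConcurrentLinkedQueue so that a timed poll is available, and it starts the time window when the call begins. BatchSketch and nextBatch are hypothetical names used only for illustration.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of Reactor.
final class BatchSketch {

    /**
     * Collects elements until maxSize is reached or maxTime has elapsed,
     * whichever comes first. Returns an empty list if nothing arrived in time.
     */
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxSize, Duration maxTime) {
        List<T> batch = new ArrayList<>(maxSize);
        long deadline = System.nanoTime() + maxTime.toNanos();
        while (batch.size() < maxSize) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // time window elapsed: return what has been collected
            }
            try {
                // Waits at most `remaining` nanoseconds and returns null on timeout,
                // so an empty queue never blocks the caller indefinitely.
                T element = queue.poll(remaining, TimeUnit.NANOSECONDS);
                if (element == null) {
                    break; // timed out without a new element
                }
                batch.add(element);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                break;
            }
        }
        return batch;
    }
}
```

Calling nextBatch in a loop would give a consumer that neither completes nor blocks forever on an empty queue: an empty window simply yields an empty list and the loop continues.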
I have applied a bufferTimeout for the consumer so that the Flux emits the elements each time the buffer reaches a maximum size OR the maxTime Duration elapses.
consumer
Flux.<SpecificRecord>generate(sink -> {
        SpecificRecord specificRecord = <queue>.poll(); // or, <queue>.take() for LinkedBlockingQueue
        if (specificRecord != null) {
            sink.next(specificRecord);
        } else {
            sink.complete();
        }
    })
    .filter(Objects::nonNull)
    .parallel()
    .runOn(Schedulers.parallel())
    .sequential()
    .bufferTimeout(5, Duration.ofSeconds(10))
    .doOnError(throwable -> log.error("Error while sending event to File with error: [{}]", throwable.getMessage()))
    .repeat()
    .subscribeOn(Schedulers.newSingle("queue-event-processor"))
    .subscribe(specificRecords -> {
        log.info("Sending events to S3 with size: [{}]", specificRecords.size());
        // s3FixedSizeRollingPolicy.convertToParquetAndUpload(specificRecords, FulfilmentAuditEventKey.getClassSchema(), FulfilmentAuditEvent.getClassSchema());
    });
The consumer above stops as soon as there are no elements in the queue. I tried using LinkedBlockingQueue as well, but that does not work either: with LinkedBlockingQueue the subscribe callback is never executed at all, most probably because it keeps blocking at the take() call.
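The two empty-queue behaviours can be seen in isolation with plain Java, independent of Reactor. This is only a sketch: EmptyQueueSketch and its method names are hypothetical, and it shows what poll() and take() do on an empty queue, not how the pipeline above reacts to them.

```java
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class for illustration only.
final class EmptyQueueSketch {

    /** poll() on an empty ConcurrentLinkedQueue returns null immediately. */
    static boolean pollReturnsNullWhenEmpty() {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        return queue.poll() == null;
    }

    /**
     * Returns true if a thread calling take() on an empty LinkedBlockingQueue
     * is still waiting after waitMillis (must be greater than 0).
     */
    static boolean takeBlocksWhenEmpty(long waitMillis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread taker = new Thread(() -> {
            try {
                queue.take(); // waits until an element becomes available
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        taker.setDaemon(true);
        taker.start();
        try {
            taker.join(waitMillis); // give take() a chance to return
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean stillBlocked = taker.isAlive();
        taker.interrupt(); // release the waiting thread
        return stillBlocked;
    }
}
```

In the generator above, the null returned by poll() is exactly the case that reaches sink.complete(), while a take() on an empty queue holds the calling thread until a producer adds an element.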