I have the following consumer:
<code>public Disposable startConsumer() {
    this.disposable = Flux.create(senderConfig.getAdapter(), OverflowStrategy.ERROR).share()
        .onBackpressureBuffer(properties.getBackpressureBufferSize(), BufferOverflowStrategy.ERROR)
        .filter(Objects::nonNull)
        .doOnNext(specificRecord -> metricsService.recordCounter(METRIC_SINK_EVENT_PROCESSOR, TAG_SOURCE, senderConfig.getName(), TAG_OUTCOME, SUCCESS))
        .parallel()
        .runOn(Schedulers.parallel())
        .sequential()
        .bufferTimeout(properties.getBufferMaxSize(), properties.getBufferMaxTimeout())
        .onErrorContinue((e, o) -> {
            log.error("Error while consuming the event from sink. Event: [{}]. Error: [{}]", o, e.getMessage());
            metricsService.recordCounter(METRIC_SINK_EVENT_PROCESSOR, TAG_OUTCOME, ERROR, TAG_ERROR, e.getClass().getSimpleName(), TAG_SOURCE, senderConfig.getName());
        })
        .filter(specificRecords -> !specificRecords.isEmpty())
        .doOnDiscard(SpecificRecord.class, specificRecord -> log.error("Dropped audit event: [{}]", specificRecord))
        .publishOn(Schedulers.newBoundedElastic(DEFAULT_BOUNDED_ELASTIC_SIZE, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "request-sink-event-processor"))
        .doOnNext(records -> schedulerHookEnabled.compareAndSet(false, true))
        .subscribe(records -> {
                log.info("Serialising [{}] specific records to AVRO for the source: [{}]", records.size(), senderConfig.getName());
                converterService.serializeToAvro(senderConfig, records, configService.getLock());
            },
            error -> log.error("onError: [{}]", error.getMessage()),
            this::processActiveFile
        );
    return disposable;
}
</code>
Expectation:
Every time bufferTimeout fires (10,000 records or the 1-minute timeout), it should emit the list of records to subscribe(), and the code inside subscribe() should run on a different thread each time.
Actual:
The subscribe() callback always runs on the same thread, request-sink-event-processor-1, which is slowing the consumer down.
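For context on why this might happen: publishOn routes all downstream signals for a given subscription through a single worker of the supplied Scheduler, so successive batches would all land on one thread regardless of the bounded elastic pool size. A minimal sketch of that single-worker behaviour using only the JDK (plain ExecutorService, no Reactor; class and method names are just for illustration):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleWorkerDemo {

    // Runs five "batches" through a single-worker executor, the stdlib
    // analogue of publishOn handing every onNext to one Scheduler worker.
    static int distinctThreadsUsed() throws Exception {
        ExecutorService singleWorker = Executors.newSingleThreadExecutor();
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        try {
            for (int i = 0; i < 5; i++) {
                // .get() mirrors the serialized, one-at-a-time delivery of onNext
                singleWorker.submit(
                        () -> threadNames.add(Thread.currentThread().getName()))
                    .get();
            }
        } finally {
            singleWorker.shutdown();
        }
        return threadNames.size();
    }

    public static void main(String[] args) throws Exception {
        // All five batches land on the same thread, matching the observed
        // pinning to request-sink-event-processor-1.
        System.out.println("distinct threads: " + distinctThreadsUsed());
    }
}
```

If the goal is a different thread per batch, the serialization inside subscribe() itself would need to be dispatched per element (e.g. via flatMap with its own scheduling) rather than relying on publishOn alone, but that depends on whether serializeToAvro is safe to run concurrently.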