Consumer
import java.util.function.Consumer;
import lombok.Getter;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.FluxSink;

@Getter
public class EventSinkAdapter implements Consumer<FluxSink<String>> {

    private FluxSink<String> sink;

    // Called by Flux.create(...) on subscription; captures the sink for later emissions.
    @Override
    public void accept(FluxSink<String> sink) {
        this.sink = sink;
    }

    // Pushes an event into the captured sink.
    public void send(String event) {
        this.sink.next(event);
    }

    // Redundant override: simply delegates to the default Consumer#andThen.
    @NotNull
    @Override
    public Consumer<FluxSink<String>> andThen(@NotNull Consumer<? super FluxSink<String>> after) {
        return Consumer.super.andThen(after);
    }
}
Publisher
A service class that creates and stores the EventSinkAdapter object and calls something like adapter.send("string-" + counter++); (a minimal sketch follows).
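For reference, a minimal sketch of such a publisher service, assuming the same adapter instance is the one later passed to Flux.create(...); the class and method names here are illustrative, not from the original code:

public class EventPublisherService {

    private final EventSinkAdapter adapter;
    private int counter = 0;

    public EventPublisherService(EventSinkAdapter adapter) {
        this.adapter = adapter;
    }

    // Hypothetical trigger; the original only states that the service calls send(...).
    public void publish() {
        adapter.send("string-" + counter++);
    }
}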
Processor
Flux.create(adapter)   // adapter is the shared EventSinkAdapter instance
        .share()
        .filter(Objects::nonNull)
        .doOnNext(specificRecord -> metricsService.recordCounter(METRIC_SINK_EVENT_PROCESSOR,
                TAG_SOURCE, senderConfig.getName(), TAG_OUTCOME, SUCCESS))
        .parallel()
        .runOn(Schedulers.parallel())
        .sequential()
        .onBackpressureBuffer(properties.getBackpressureBufferSize(), BufferOverflowStrategy.DROP_OLDEST)
        .bufferTimeout(properties.getMaxSize(), properties.getMaxTimeout())
        .onErrorContinue((e, o) -> {
            log.error("Error while consuming the event from sink. Event: [{}]. Error: [{}]", o, e.getMessage());
            metricsService.recordCounter(METRIC_SINK_EVENT_PROCESSOR, TAG_OUTCOME, ERROR,
                    TAG_ERROR, e.getClass().getSimpleName(), TAG_SOURCE, senderConfig.getName());
        })
        .filter(specificRecords -> !specificRecords.isEmpty())
        .doOnDiscard(SpecificRecord.class, specificRecord -> log.error("Dropped audit event: [{}]", specificRecord))
        .publishOn(Schedulers.newBoundedElastic(DEFAULT_BOUNDED_ELASTIC_SIZE, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
                "audit-sink-event-processor"))
        .subscribe(specificRecords -> {
            log.info("Converting events to parquet and uploading to S3 with size: [{}]", specificRecords.size());
            // metricsService.recordGauge(METRIC_SINK_EVENT_PROCESSOR_ROLLOVER, specificRecords, TAG_SOURCE, senderConfig.getName());
            converterService.convertToParquetAndUpload(specificRecords, FulfilmentAuditEvent.getClassSchema(),
                    properties.getCompressionCodecName(), properties.getFileName(), senderConfig.getName());
        });
Now this is leading to a memory leak, most likely because the Flux is not discarding the elements that have already been consumed. Is there any way to discard elements from the Flux once they have been consumed?