I have developed a Spring WebFlux application using Java 17 and Spring Boot 3.2.
I have implemented an API that scrolls an Elasticsearch index in batches of 200 records and inserts each batch into Redis. However, Elasticsearch reads the data faster than Redis can persist it, so unacknowledged writes pile up and the application eventually fails with an out-of-memory error.
I need to change the implementation so that the Redis save for a batch completes before the next batch is scrolled from Elasticsearch (a sketch of what I have in mind follows the current implementation below).

Current implementation:
Mono<String> scrollIdMono = Mono.fromFuture(searchResponse).flatMap(searchRes -> {
    List<EVENT> collect = searchRes.hits().hits().stream().map(Hit::source)
            .collect(Collectors.toList());
    collect.forEach(esCache -> {
        Detail detail = new Detail(esCache.getId(),
                LocalDateTime.parse(esCache.getDate()));
        List<Detail> details = new ArrayList<>();
        details.add(detail);
        // fire-and-forget: subscribe() returns immediately, the write is never awaited
        reactiveRedisOperations.set(esCache.getKey(), details).subscribe();
    });
    totalValue = searchRes.hits().total().value();
    processedValue = searchRes.hits().hits().size();
    return Mono.just(searchRes.scrollId());
});
return scrollIdMono.cache().flatMap(scrollId -> {
    // imperative loop driven by a mutable flag inside a reactive pipeline
    while (startTheScroll) {
        ScrollRequest scrollRequest = new ScrollRequest.Builder().scrollId(scrollId)
                .scroll(time == null ? getScrollTime() : time).build();
        CompletableFuture<ScrollResponse<EVENT>> scrollResponse = null;
        try {
            scrollResponse = elasticsearchAsyncClient.scroll(scrollRequest, EVENT.class);
        } catch (ElasticsearchException e) {
            // scrollResponse stays null here, so Mono.fromFuture below throws an NPE
            log.error("setRedisCacheThroughScroll(): scrollRequest: {}, ex: {}", scrollRequest, e.getMessage());
        }
        Mono<List<EVENT>> eventMono = Mono.fromFuture(scrollResponse).cache().flatMap(scrollRes -> {
            // a Mono never emits null, so no scrollRes null check is needed here
            if (scrollRes.hits() != null && !scrollRes.hits().hits().isEmpty()) {
                List<EVENT> collect = scrollRes.hits().hits().stream().map(Hit::source)
                        .collect(Collectors.toList());
                // mutable field updated from an async callback
                processedValue += scrollRes.hits().hits().size();
                return Mono.just(collect);
            } else {
                // flag flips asynchronously, after the while loop has already re-checked it
                startTheScroll = false;
                return Mono.empty();
            }
        }).doOnTerminate(() -> {
            log.info("{} of {} items processed", processedValue, totalValue);
            if (totalValue == processedValue) {
                ClearScrollRequest clearScrollRequest = new ClearScrollRequest.Builder().scrollId(scrollId)
                        .build();
                CompletableFuture<ClearScrollResponse> clearScrollResponse = null;
                try {
                    clearScrollResponse = elasticsearchAsyncClient.clearScroll(clearScrollRequest);
                } catch (ElasticsearchException e) {
                    // same null-future problem as the scroll call above
                    log.error("setRedisCacheThroughScroll(): clearScrollRequest: {}, ex: {}",
                            clearScrollRequest, e.getMessage());
                }
                Mono.fromFuture(clearScrollResponse).flatMap(clearScrollRes -> {
                    log.info("setRedisCacheThroughScroll(): clearScrollRes succeeded: {}", clearScrollRes.succeeded());
                    return Mono.just("crawlId_cache_completed with totalValue: " + totalValue
                            + ", processedValue: " + processedValue);
                }).subscribe(); // another fire-and-forget subscription
            }
        });
        Flux<EVENT> eventFlux = eventMono.cache().flatMapMany(Flux::fromIterable);
        Flux<Boolean> result = eventFlux.concatMap(esCache -> {
            Detail detail = new Detail(esCache.getCrawlId(),
                    LocalDateTime.parse(esCache.getCrawledDate()));
            List<Detail> details = new ArrayList<>();
            details.add(detail);
            return reactiveRedisOperations.set(esCache.getKey(), details);
        });
        // fire-and-forget again: the while loop never waits for these writes,
        // so scroll pages keep arriving while pending Redis writes pile up in memory
        result.subscribe();
        finalResult.add(result);
    }
    return Mono.just(finalResult);
});
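
What I think I need is to drive the scroll recursively, so that one page's Redis writes complete before the next scroll request is ever sent. Below is a minimal sketch of what I have in mind using Reactor's expand operator. It reuses EVENT, Detail, reactiveRedisOperations, elasticsearchAsyncClient, and log from the code above; the method names cacheIndexToRedis, saveBatch, and clearScroll, the "1m" scroll keep-alive, and the use of ResponseBody<EVENT> as the common supertype of SearchResponse/ScrollResponse are my own assumptions, not tested code. Is this the right direction, or is there a more idiomatic way to get this backpressure?

// Minimal sketch, not production code. Assumes elasticsearch-java 8.x, where
// SearchResponse<T> and ScrollResponse<T> both extend ResponseBody<T>.
import co.elastic.clients.elasticsearch.core.ClearScrollResponse;
import co.elastic.clients.elasticsearch.core.ScrollRequest;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch.core.search.ResponseBody;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.LocalDateTime;
import java.util.List;

public Mono<Long> cacheIndexToRedis(SearchRequest searchRequest) {
    return Mono.<ResponseBody<EVENT>>fromFuture(
                    () -> elasticsearchAsyncClient.search(searchRequest, EVENT.class))
            .expand(page -> {
                if (page.hits().hits().isEmpty()) {
                    // no more documents: release the server-side scroll context and stop
                    return clearScroll(page.scrollId()).then(Mono.<ResponseBody<EVENT>>empty());
                }
                // persist the current page, THEN request the next one; the lazy
                // fromFuture supplier runs only after saveBatch has completed
                return saveBatch(page)
                        .then(Mono.fromFuture(() -> elasticsearchAsyncClient.scroll(
                                new ScrollRequest.Builder()
                                        .scrollId(page.scrollId())
                                        .scroll(t -> t.time("1m"))
                                        .build(),
                                EVENT.class)));
            })
            .map(page -> (long) page.hits().hits().size())
            .reduce(0L, Long::sum)
            .doOnNext(total -> log.info("cached {} documents to redis", total));
}

// completes only after every set() in the batch has been acknowledged by Redis
private Mono<Void> saveBatch(ResponseBody<EVENT> page) {
    return Flux.fromIterable(page.hits().hits())
            .map(Hit::source)
            .concatMap(esCache -> reactiveRedisOperations.set(
                    esCache.getKey(),
                    List.of(new Detail(esCache.getCrawlId(),
                            LocalDateTime.parse(esCache.getCrawledDate())))))
            .then();
}

private Mono<Boolean> clearScroll(String scrollId) {
    return Mono.fromFuture(() -> elasticsearchAsyncClient.clearScroll(c -> c.scrollId(scrollId)))
            .map(ClearScrollResponse::succeeded);
}

My understanding is that concatMap in saveBatch also keeps the writes within a batch sequential; if that is too slow, I could presumably switch it to flatMap with a small concurrency limit and still not fetch the next page until the whole batch is done.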