I'm stuck on some sample code I wrote. It looks like one element of the stream is never fully processed somewhere in the pipeline, so `onComplete` is never called. Here is most of my code, with comments:
```java
private static final int HOW_MANY_DOCS = 500;
private final int numThreads = 4;
private final Scheduler scheduler = Schedulers.newParallel("xml-parser-scheduler", numThreads);
private final boolean runIt = true;

@EventListener(ApplicationReadyEvent.class)
public void init() {
    if (runIt) {
        Instant start = Instant.now();
        // this is the addition that prevents onComplete: if I start from parsing(0L) instead,
        // I get the correct number of results and onComplete is called
        wikiDocumentRepository.findMaxId()
                .switchIfEmpty(Mono.just(0L))
                .flatMapMany(this::parsing)
                .doOnError(error -> System.err.println("Error: " + error))
                .doOnComplete(() -> log.info("Parsing completed"))
                .doOnComplete(() -> log.info("Time elapsed: {}", Instant.now().getEpochSecond() - start.getEpochSecond()))
                .doOnComplete(() -> log.info("Processed documents threads: {}", processedDocumentsThreads))
                .subscribe();
    }
}
```
So if I remove this part, `wikiDocumentRepository.findMaxId()...flatMapMany(this::parsing)`, and replace it with `parsing(0L)`, it works and `onComplete` is called.
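A side note on the `init()` method: the "Time elapsed" log subtracts `getEpochSecond()` values, which truncates both instants to whole seconds, so a run of about half a second can be reported as 1 (or as 0). Below is a minimal stdlib sketch comparing that calculation with `java.time.Duration`; the class name and the timestamps are illustrative only, not taken from my actual runs.

```java
import java.time.Duration;
import java.time.Instant;

class ElapsedTime {
    // Same calculation as in init(): each Instant is truncated to its epoch
    // second before subtracting, so sub-second precision is lost.
    static long elapsedSecondsTruncated(Instant start, Instant end) {
        return end.getEpochSecond() - start.getEpochSecond();
    }

    // Millisecond-resolution alternative based on java.time.Duration.
    static long elapsedMillis(Instant start, Instant end) {
        return Duration.between(start, end).toMillis();
    }

    public static void main(String[] args) {
        // Illustrative timestamps that straddle a second boundary.
        Instant start = Instant.parse("2024-05-28T07:35:28.990Z");
        Instant end = Instant.parse("2024-05-28T07:35:29.481Z");
        System.out.println(elapsedSecondsTruncated(start, end)); // 1
        System.out.println(elapsedMillis(start, end));           // 491
    }
}
```

In the logging call this would become something like `Duration.between(start, Instant.now()).toMillis()`.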
The `parsing` method just reads a sample Wikipedia dump to get some data to play with:
```java
private Flux<Void> parsing(Long id) {
    XmlStreamingParser parser = new XmlStreamingParser();
    File xmlFile = new File("enwiki-20240520-pages-articles-multistream.xml");
    return parser.parseXmlFile(xmlFile)
            .filter(element -> isNull(element.getRedirect()) && StringUtils.hasText(element.getRevision().getText())) // && element.getId() > id)
            .take(HOW_MANY_DOCS)
            .publishOn(scheduler)
            .doOnNext(entity -> {
                long counted = counterXmlDocuments.incrementAndGet();
                log.info("xml entity id: {} {}", entity.getId(), counted);
                processedDocumentsIds.add(entity.getId());
            })
            .flatMap(this::storeDocument, numThreads);
}
```
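Because the `&& element.getId() > id` clause in `parsing()` is commented out, the id returned by `findMaxId()` currently has no effect on which pages are selected. The stdlib sketch below shows the intended filter-then-take logic with that clause re-enabled, using `java.util.stream` in place of Reactor. `Page` is a hypothetical stand-in for `WikiPageElement` (only the fields the filter reads), and `!isBlank()` approximates `StringUtils.hasText`. It assumes Java 16+ for records.

```java
import java.util.List;
import java.util.stream.Collectors;

class ResumeFilter {
    // Hypothetical stand-in for WikiPageElement: only the fields the filter reads.
    record Page(long id, String redirect, String text) {}

    // Mirrors the filter in parsing() with the commented-out id check re-enabled:
    // skip redirects, skip pages without text, skip ids at or below lastStoredId.
    static boolean shouldStore(Page p, long lastStoredId) {
        boolean hasText = p.text() != null && !p.text().isBlank(); // like StringUtils.hasText
        return p.redirect() == null && hasText && p.id() > lastStoredId;
    }

    // filter(...) followed by take(limit), expressed with java.util.stream.
    static List<Long> selectIds(List<Page> pages, long lastStoredId, int limit) {
        return pages.stream()
                .filter(p -> shouldStore(p, lastStoredId))
                .limit(limit)
                .map(Page::id)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Page> pages = List.of(
                new Page(12, null, "text"),
                new Page(13, "Other page", "text"), // redirect: skipped
                new Page(2023, null, "   "),        // blank text: skipped
                new Page(2032, null, "text"),
                new Page(2037, null, "text"));
        System.out.println(selectIds(pages, 0L, 500));  // [12, 2032, 2037]
        System.out.println(selectIds(pages, 2023L, 1)); // [2032]
    }
}
```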
And `storeDocument` just saves the document to MongoDB:
```java
private Mono<Void> storeDocument(WikiPageElement element) {
    String id = element.getId().toString();
    return wikiDocumentRepository.findById(id)
            .publishOn(scheduler)
            .switchIfEmpty(wikiDocumentRepository.save(new WikiDocument(id, element.getTitle(), element.getRevision().getText())))
            .doOnNext(entity -> {
                long counted = counterMongoDocuments.incrementAndGet();
                log.info("mongo entity id: {} {}", entity.getId(), counted);
                processedDocumentsIds.remove(Long.valueOf(entity.getId()));
                log.info("ids processed: {}", processedDocumentsIds.size());
            })
            .then();
}
```
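On the bookkeeping in `storeDocument`: ids are added to `processedDocumentsIds` as they come out of the XML parser and removed via `Long.valueOf(entity.getId())`. Assuming the set holds `Long` ids while `WikiDocument` stores its id as a `String` (the declarations are not shown above), the conversion is required, because removing the raw `String` silently returns `false`. The set is also touched from all four `xml-parser-scheduler` threads, so this sketch assumes a thread-safe set. `InFlightIds` and its methods are hypothetical names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class InFlightIds {
    // Assumption: ids are tracked as Long, and the set is shared by the
    // scheduler threads, so it has to be thread-safe.
    private final Set<Long> inFlight = ConcurrentHashMap.newKeySet();

    // Called when a page leaves the XML parser.
    void started(long xmlId) {
        inFlight.add(xmlId);
    }

    // Called when the Mongo document comes back. Its id is a String, so it is
    // converted first; inFlight.remove(mongoId) would never match a Long.
    boolean finished(String mongoId) {
        return inFlight.remove(Long.valueOf(mongoId));
    }

    // Snapshot of the ids that have been parsed but not yet stored.
    Set<Long> pending() {
        return Set.copyOf(inFlight);
    }

    public static void main(String[] args) {
        InFlightIds ids = new InFlightIds();
        ids.started(12L);
        ids.started(2063L);
        System.out.println(ids.finished("2063"));      // true
        System.out.println(ids.inFlight.remove("12")); // false: a String never equals a Long
        System.out.println(ids.pending());             // [12]
    }
}
```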
My logs end here, without `onComplete` ever being called:
```
2024-05-28T09:32:26.450+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-2] c.e.d.service.WikiDocumentService : ids not processed: [12, 2023]
2024-05-28T09:32:26.450+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-1] c.e.d.service.WikiDocumentService : xml entity id: 2032 493
2024-05-28T09:32:26.450+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-1] c.e.d.service.WikiDocumentService : xml entity id: 2037 494
2024-05-28T09:32:26.450+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-3] c.e.d.service.WikiDocumentService : mongo entity id: 2023 491 xml-parser-scheduler-3
2024-05-28T09:32:26.450+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-3] c.e.d.service.WikiDocumentService : ids not processed: [12, 2032, 2037]
2024-05-28T09:32:26.450+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-1] c.e.d.service.WikiDocumentService : xml entity id: 2038 495
2024-05-28T09:32:26.451+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-1] c.e.d.service.WikiDocumentService : mongo entity id: 2032 492 xml-parser-scheduler-1
2024-05-28T09:32:26.451+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-1] c.e.d.service.WikiDocumentService : ids not processed: [12, 2037, 2038]
2024-05-28T09:32:26.451+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-1] c.e.d.service.WikiDocumentService : xml entity id: 2039 496
2024-05-28T09:32:26.451+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-4] c.e.d.service.WikiDocumentService : mongo entity id: 2037 493 xml-parser-scheduler-4
2024-05-28T09:32:26.451+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-4] c.e.d.service.WikiDocumentService : ids not processed: [12, 2038, 2039]
2024-05-28T09:32:26.451+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-2] c.e.d.service.WikiDocumentService : mongo entity id: 2038 494 xml-parser-scheduler-2
2024-05-28T09:32:26.451+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-2] c.e.d.service.WikiDocumentService : ids not processed: [12, 2039]
2024-05-28T09:32:26.451+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-1] c.e.d.service.WikiDocumentService : xml entity id: 2041 497
2024-05-28T09:32:26.451+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-1] c.e.d.service.WikiDocumentService : xml entity id: 2052 498
2024-05-28T09:32:26.451+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-3] c.e.d.service.WikiDocumentService : mongo entity id: 2039 495 xml-parser-scheduler-3
2024-05-28T09:32:26.451+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-3] c.e.d.service.WikiDocumentService : ids not processed: [12, 2041, 2052]
2024-05-28T09:32:26.452+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-1] c.e.d.service.WikiDocumentService : xml entity id: 2061 499
2024-05-28T09:32:26.452+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-1] c.e.d.service.WikiDocumentService : mongo entity id: 2052 496 xml-parser-scheduler-1
2024-05-28T09:32:26.452+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-1] c.e.d.service.WikiDocumentService : ids not processed: [12, 2041, 2061]
2024-05-28T09:32:26.452+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-1] c.e.d.service.WikiDocumentService : xml entity id: 2063 500
2024-05-28T09:32:26.452+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-4] c.e.d.service.WikiDocumentService : mongo entity id: 2041 497 xml-parser-scheduler-4
2024-05-28T09:32:26.452+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-4] c.e.d.service.WikiDocumentService : ids not processed: [12, 2061, 2063]
2024-05-28T09:32:26.452+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-2] c.e.d.service.WikiDocumentService : mongo entity id: 2063 498 xml-parser-scheduler-2
2024-05-28T09:32:26.452+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-2] c.e.d.service.WikiDocumentService : ids not processed: [12, 2061]
2024-05-28T09:32:26.452+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-3] c.e.d.service.WikiDocumentService : mongo entity id: 2061 499 xml-parser-scheduler-3
2024-05-28T09:32:26.452+02:00 INFO 48273 --- [demoreactive] [ser-scheduler-3] c.e.d.service.WikiDocumentService : ids not processed: [12]
```
And `12` is the first parsed page; it stays in the set the whole time.
If I switch my `init` method back to:
```java
parsing(0L)
        .doOnError(error -> System.err.println("Error: " + error))
        .doOnComplete(() -> log.info("Parsing completed"))
        .doOnComplete(() -> log.info("Time elapsed: {}", Instant.now().getEpochSecond() - start.getEpochSecond()))
        .doOnComplete(() -> log.info("Processed documents threads: {}", processedDocumentsThreads))
        .subscribe();
```
everything is processed correctly: all 500 documents go through and `onComplete` is called:
```
2024-05-28T09:35:29.480+02:00 INFO 48299 --- [demoreactive] [ser-scheduler-3] c.e.d.service.WikiDocumentService : ids not processed: [2061, 2063]
2024-05-28T09:35:29.480+02:00 INFO 48299 --- [demoreactive] [ser-scheduler-4] c.e.d.service.WikiDocumentService : mongo entity id: 2061 499 xml-parser-scheduler-4
2024-05-28T09:35:29.480+02:00 INFO 48299 --- [demoreactive] [ser-scheduler-4] c.e.d.service.WikiDocumentService : ids not processed: [2063]
2024-05-28T09:35:29.481+02:00 INFO 48299 --- [demoreactive] [ser-scheduler-1] c.e.d.service.WikiDocumentService : mongo entity id: 2063 500 xml-parser-scheduler-1
2024-05-28T09:35:29.481+02:00 INFO 48299 --- [demoreactive] [ser-scheduler-1] c.e.d.service.WikiDocumentService : ids not processed: []
2024-05-28T09:35:29.481+02:00 INFO 48299 --- [demoreactive] [ser-scheduler-1] c.e.d.service.WikiDocumentService : Parsing completed
2024-05-28T09:35:29.481+02:00 INFO 48299 --- [demoreactive] [ser-scheduler-1] c.e.d.service.WikiDocumentService : Time elapsed: 1
2024-05-28T09:35:29.481+02:00 INFO 48299 --- [demoreactive] [ser-scheduler-1] c.e.d.service.WikiDocumentService : Processed documents threads: [xml-parser-scheduler-1, xml-parser-scheduler-2, xml-parser-scheduler-3, xml-parser-scheduler-4]
```
Can anyone help me figure out what I'm doing wrong?