I am sending a lot of events to hazelcast. I am trying to count the number of received events using hazelcast jet, but the count at the end is incorrect.
My code is like this.
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(Sources.mapJournal("source",
JournalInitialPosition.START_FROM_CURRENT))
.withIngestionTimestamps()
.map(o -> new AbstractMap.SimpleEntry<>("bla", new BigDecimal(1)))
.writeTo(Sinks.mapWithEntryProcessor(1, "target", AbstractMap.SimpleEntry::getKey,
entry -> entry1 -> {
BigDecimal value = entry.getValue();
BigDecimal value1 = ((BigDecimal) entry1.getValue());
if (value1 == null) {
entry1.setValue(value);
} else {
entry1.setValue(value.add(value1));
}
return entry1;
}));
JobConfig cfg = new JobConfig()
.setName("consumer")
.addClass(JobHolderClass.class)
.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);
hazelcastInstance.getJet().newJobIfAbsent(pipeline, cfg);
I have configured an event journal on the source map, however even if the capacity is very large, I still get the wrong count in the target map.
How can I configure the jet, so that it locks the target map?