To migrate documents between different MongoDB collections, I read items via MongoCursorItemReader based on a filter and then write them using an ItemWriter, as shown below. To prevent duplicate key errors, I first remove the existing items that match the same filter (query).
@Override
public void write(Chunk<? extends Document> chunk) {
    // Collect the _id values of the documents in the current chunk
    final List<ObjectId> ids = chunk.getItems().stream()
            .map(document -> document.getObjectId("_id"))
            .collect(Collectors.toList());

    // Delete any existing documents with those ids, then insert the chunk
    final Query deleteQuery = new Query(Criteria.where("_id").in(ids));
    mongoTemplate.remove(deleteQuery, collectionName);
    mongoTemplate.insert(chunk.getItems(), collectionName);

    totalWrittenItems += chunk.size();
}
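For context, the reader is set up roughly like the sketch below; the collection name and the filter are placeholders rather than my exact configuration.

@Bean
public MongoCursorItemReader<Document> migrationItemReader(MongoTemplate mongoTemplate) {
    return new MongoCursorItemReaderBuilder<Document>()
            .name("migrationItemReader")
            .template(mongoTemplate)
            .collection("sourceCollection")           // source collection of the migration (placeholder)
            .targetType(Document.class)
            .jsonQuery("{ \"migrated\": false }")     // placeholder for the actual filter
            .sorts(Map.of("_id", Sort.Direction.ASC))
            .build();
}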
It works perfectly when the number of read items is less than the batch size. If not, say the batch size is 1000 and there are 1500 filtered items: first it deletes all 1500 records and inserts 1000 of them. Then, after reading the next batch (the last 500 records), it deletes the first 1000 inserted records, since they also match the query, and only inserts the last batch (so 500 records end up inserted in total instead of 1500).
At first I thought about updating the records instead, but that would be too slow, and deleting the items before inserting seemed much better. Is there a proper way to handle a scenario like this, e.g. inserting data with Spring Batch while preventing such conflicts?
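For reference, the update-based alternative I had in mind is a bulk upsert/replace per chunk, roughly like the sketch below (it assumes Spring Data's BulkOperations with replaceOne and an upsert option; I expect the per-document replaces to be the slow part):

@Override
public void write(Chunk<? extends Document> chunk) {
    // Replace each document by _id and insert it if it does not exist yet (upsert),
    // so no prior delete is needed.
    final BulkOperations bulkOps =
            mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, collectionName);
    for (Document document : chunk.getItems()) {
        final Query byId = new Query(Criteria.where("_id").is(document.getObjectId("_id")));
        bulkOps.replaceOne(byId, document, FindAndReplaceOptions.options().upsert());
    }
    bulkOps.execute();
    totalWrittenItems += chunk.size();
}

Compared with delete-then-insert, this only touches the documents in the current chunk, but it performs one replace per document instead of a single bulk insert.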