I have the following Spring Batch configuration with its related `ItemReader` and `ItemWriter`, and I am trying to read and copy filtered data per collection in MongoDB.
In this implementation, I retrieve `collectionName` – `query` parameter pairs as a map and loop through it to insert into each collection based on the corresponding query.
However, although I can read the related data via the `ItemReader`, I could not synchronise the read and write operations for each collection.
I need to read each collection and write only that collection's data (or read multiple collections based on their queries and then write all of them at once). How can I achieve this?
/**
 * Builds the chunk-oriented {@code insertData} step.
 *
 * <p>The step wires a {@code MigrationItemReader} created from the source template,
 * a {@code MigrationItemProcessor}, a {@code MigrationWriter} created from the target
 * template and a {@code MigrationStepListener}.
 *
 * @param jobRepository            repository passed to the {@code StepBuilder}
 * @param sourceTransactionManager transaction manager passed to the chunk configuration
 * @param sourceMongoTemplate      template handed to the reader
 * @param targetMongoTemplate      template handed to the writer
 * @return the fully configured step
 */
@Bean
public Step insertData(JobRepository jobRepository,
        PlatformTransactionManager sourceTransactionManager,
        MongoTemplate sourceMongoTemplate,
        MongoTemplate targetMongoTemplate) {
    // Create the collaborators up front so the builder chain below reads as pure wiring.
    final MigrationItemReader reader = new MigrationItemReader(sourceMongoTemplate);
    final MigrationItemProcessor processor = new MigrationItemProcessor();
    final MigrationWriter writer = new MigrationWriter(targetMongoTemplate);
    final MigrationStepListener stepListener = new MigrationStepListener();

    return new StepBuilder("insertData", jobRepository)
            .<Document, Document>chunk(DEFAULT_CHUNK_SIZE, sourceTransactionManager)
            .startLimit(DEFAULT_LIMIT_SIZE)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .listener(stepListener)
            .build();
}
@Component
public class MigrationWriter implements ItemWriter<Document>, StepExecutionListener {
private Map<String, String> jobParametersMap;
// code omitted for brevity
@Override
public void write(Chunk<? extends Document> itemChunk) {
// Iterate over the map and configure the reader accordingly
for (Map.Entry<String, String> entry : jobParametersMap.entrySet()) {
final List<Document> items = (List<Document>) itemChunk.getItems();
mongoTemplate.insert(items, entry.getKey());
}
}
/**
 * Cursor item reader producing {@code Document} items. The collection and the query
 * are taken from the job parameters right before the step starts.
 */
@Component
public class MigrationItemReader extends MongoCursorItemReader<Document> {
    private Map<String, String> jobParametersMap;
    // code omitted for brevity

    /**
     * Configures this reader from the {@code collectionName} and {@code query} job
     * parameters of the given step execution.
     *
     * <p>Only a single collection/query pair is applied here; reading several
     * collections (at once, or one by one before each insertion) is not handled.
     *
     * @param stepExecution execution whose job parameters are read
     */
    @BeforeStep
    public void beforeStep(StepExecution stepExecution) {
        final JobParameters params = stepExecution.getJobParameters();
        final String collectionName = params.getString("collectionName");
        final String queryText = params.getString("query");

        setCollection(collectionName);
        setQuery(new BasicQuery(queryText));
    }

    /**
     * Applies the static reader settings: name, target type, template, batch size,
     * limit, ascending sort and an empty default query.
     *
     * <p>NOTE(review): not called from any code visible here; presumably invoked by the
     * omitted constructor - confirm.
     *
     * @param mongoTemplate template this reader reads through
     */
    private void initializeReader(MongoTemplate mongoTemplate) {
        setName("migrationDataReader");
        setTemplate(mongoTemplate);
        setTargetType(Document.class);
        setLimit(DEFAULT_LIMIT_SIZE);
        setBatchSize(DEFAULT_CHUNK_SIZE);
        // NOTE(review): sorts on a field literally named "id"; confirm this is the
        // intended key for the documents being read.
        setSort(Map.of("id", Sort.Direction.ASC));
        // Empty default query; beforeStep sets the query again from the job parameters.
        setQuery(new Query());
    }
}