After the job completes successfully, say for the first 10 chunks/pages, I insert more rows and run the batch again, expecting it to pick up at the next chunk. Instead it starts from the first chunk again (chunk 0).
By contrast, when the job is aborted by manually setting the job and step execution status to FAILED in the database (as suggested in other answers here), it does restart at the last successful record and resumes from the last chunk as expected.
Does this not also work when the job is completed? I thought the reader would read the START_AFTER_VALUE entry from the ExecutionContext, which should hold the last successfully written record, but it only seems to pick that value up after an aborted run; when the previous run finished with status COMPLETED it always starts from 0.
The expected behaviour was for the JdbcPagingItemReader to always resume from START_AFTER_VALUE, which does work, but only after aborted jobs, not completed ones.
So am I missing a configuration here? I debugged the ExecutionContext and it does persist the correct value before completion, but when I restart the job after a successful run, the open method inside JdbcPagingItemReader reads the value as 0. My guess is that a new execution context is created which does not carry over the state of the previous completed job, but I'm not sure.
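For reference, a listener along these lines could confirm what the step persists after each run (a throwaway debugging sketch, not part of my config below; the "jdbc-item-reader.start.after" key is my assumption based on the reader name plus its start.after restart key):

@Bean
public StepExecutionListener startAfterLogger() {
    // Debugging only: would be registered on the step with .listener(...)
    return new StepExecutionListener() {
        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            // Assumed key format: "<reader name>.start.after"
            Object startAfter = stepExecution.getExecutionContext().get("jdbc-item-reader.start.after");
            logger.info("Persisted start.after value: " + startAfter);
            return stepExecution.getExitStatus();
        }
    };
}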
Here are my configs:
@Bean("orderProcessingJob")
public Job orderProcessingJob(JobRepository jobRepository,
PlatformTransactionManager platformTransactionManager,
DataSource dataSource,PagingQueryProvider pagingQueryProvider) {
return new JobBuilder("order-processing-job",jobRepository)
.start(processOrder(jobRepository,platformTransactionManager,
dataSource,pagingQueryProvider))
.build();
}
@Bean
public Step processOrder(JobRepository jobRepository,
PlatformTransactionManager platformTransactionManager,
DataSource dataSource,
PagingQueryProvider pagingQueryProvider){
return new StepBuilder("order-processing-step",jobRepository)
.allowStartIfComplete(true)
.<Order, Order> chunk(5,platformTransactionManager)
.reader(jdbcOrderItemReader(dataSource,pagingQueryProvider))
.processor(itemProcessor())
.writer(flatFileItemWriter())
.build();
}
@Bean
public JdbcPagingItemReader<Order> jdbcOrderItemReader(DataSource dataSource,
PagingQueryProvider pagingQueryProvider) {
return new JdbcPagingItemReaderBuilder<Order>()
.saveState(true)
.name("jdbc-item-reader")
.dataSource(dataSource)
.queryProvider(pagingQueryProvider)
.pageSize(5)
.rowMapper((resultSet, rowNum) -> {
int id = resultSet.getInt("id");
String name = resultSet.getString("name");
String description = resultSet.getString("description");
return new Order(id, name, description);
})
.build();
}
@Bean
public SqlPagingQueryProviderFactoryBean pagingQueryProviderFactoryBean(DataSource dataSource){
SqlPagingQueryProviderFactoryBean factoryBean = new SqlPagingQueryProviderFactoryBean();
factoryBean.setDataSource(dataSource);
factoryBean.setSelectClause("select *");
factoryBean.setFromClause("from orders");
factoryBean.setSortKey("id");
return factoryBean;
}
@Bean
public ItemProcessor<Order,Order> itemProcessor(){
return item -> {
logger.info("Order: {" + item.name().toLowerCase(Locale.ROOT) + "} is being processed!");
Thread.sleep(1000); // simulating real processing time
return item;
};
}
@Bean
public FlatFileItemWriter<Order> flatFileItemWriter(){
return new FlatFileItemWriterBuilder<Order>()
.name("orders-item-writer")
.append(true)
.saveState(true)
.delimited()
.names("id","name","description")
.resource(new FileSystemResource("new_orders.csv"))
.build();
}
What I am trying to do is: after the job completes successfully, restart it later, either manually or via a scheduler, so that it continues reading from any new records, starting from the position I expected to be saved in the ExecutionContext via the JdbcPagingItemReader's START_AFTER_VALUE.
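For context, the scheduled re-run I have in mind would look roughly like this (a simplified sketch, not my actual code; the @Scheduled wiring and the "run.ts" parameter are placeholders):

@Component
public class OrderJobScheduler {

    private final JobLauncher jobLauncher;
    private final Job orderProcessingJob;

    public OrderJobScheduler(JobLauncher jobLauncher, Job orderProcessingJob) {
        this.jobLauncher = jobLauncher;
        this.orderProcessingJob = orderProcessingJob;
    }

    @Scheduled(fixedDelay = 60_000) // placeholder interval
    public void relaunch() throws Exception {
        // Unique timestamp parameter so each scheduled run is accepted by the launcher;
        // this is where I expected the reader to continue from START_AFTER_VALUE.
        JobParameters params = new JobParametersBuilder()
                .addLong("run.ts", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(orderProcessingJob, params);
    }
}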