I am working on a Spring Boot application where I need to schedule a Spring Batch job based on a country code parameter (uk for the United Kingdom, us for the USA). Each country has a different scheduling interval: every 30 minutes for the UK and every 50 minutes for the USA.
I have a SchedulerConfig class responsible for scheduling the job and a BatchConfig class defining the Spring Batch job components (Job, Step, etc.). Here’s a simplified version of my current setup:
@Slf4j
@Configuration
@EnableScheduling
@RequiredArgsConstructor
public class SchedulerConfig {

    private final JobLauncher jobLauncher;
    private final Job job;

    // TODO: Find a way to pass a parameter and schedule the execution based on that parameter (uk or us)
    public void scheduleJob() {
        log.info("Starting scheduled spring batch job");
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("startAt", System.currentTimeMillis())
                .toJobParameters();
        try {
            jobLauncher.run(job, jobParameters);
        } catch (JobExecutionAlreadyRunningException
                 | JobRestartException
                 | JobInstanceAlreadyCompleteException
                 | JobParametersInvalidException e) {
            throw new RuntimeException(e);
        }
    }
}
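What I have in mind (and I'm not sure it is idiomatic) is to duplicate the scheduling method with two @Scheduled annotations and pass the country code as a job parameter. A rough sketch of that idea follows; the "country" parameter name and the runJob helper are placeholders I made up, and the timeUnit attribute assumes Spring Framework 5.3+:

    @Scheduled(fixedRate = 30, timeUnit = TimeUnit.MINUTES)
    public void scheduleUkJob() {
        runJob("uk");
    }

    @Scheduled(fixedRate = 50, timeUnit = TimeUnit.MINUTES)
    public void scheduleUsJob() {
        runJob("us");
    }

    private void runJob(String countryCode) {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("country", countryCode) // could be read later via @Value("#{jobParameters['country']}")
                .addLong("startAt", System.currentTimeMillis())
                .toJobParameters();
        try {
            jobLauncher.run(job, jobParameters);
        } catch (JobExecutionException e) { // common superclass of the four launch exceptions
            throw new RuntimeException(e);
        }
    }

This would compile, but it duplicates the launch logic per country, which is what makes me suspect there is a cleaner way.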
@Configuration
public class CustomerReader {

    @Value("${file.input}")
    private String fileInput;

    // implementation of the ItemReader (CSV files are a kind of flat file, hence FlatFileItemReader)
    public FlatFileItemReader<Customer> itemReader() {
        return new FlatFileItemReaderBuilder<Customer>()
                .resource(new ClassPathResource(fileInput))
                .name("csv-reader")
                .linesToSkip(1) // skip the first line of the CSV because it contains the headers
                .lineMapper(lineMapper())
                .build();
    }

    // the LineMapper tells the reader how to turn each CSV line into a Customer
    private LineMapper<Customer> lineMapper() {
        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        // specify how the columns are separated
        tokenizer.setDelimiter(",");
        // specify the fields
        tokenizer.setNames("id", "firstName", "lastName", "email", "gender", "contactNo", "country", "dob");
        tokenizer.setStrict(false);
        BeanWrapperFieldSetMapper<Customer> mapper = new BeanWrapperFieldSetMapper<>();
        mapper.setTargetType(Customer.class);
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(mapper);
        return lineMapper;
    }
}
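For context, the Customer entity mirrors the CSV columns. The sketch below is simplified; my real class uses Lombok, and I'm assuming plain String fields here for brevity:

@Entity
@Data
public class Customer {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id; // reset to null in the processor so the database generates it

    private String firstName;
    private String lastName;
    private String email;
    private String gender;
    private String contactNo;
    private String country;
    private String dob;
}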
// registered as a @Bean in BatchConfig, so no stereotype annotation is needed here
public class CustomerProcessor implements ItemProcessor<Customer, Customer> {

    // This method is executed once for each record provided as input
    @Override
    public Customer process(Customer item) throws Exception {
        // Long.parseLong never returns null (it throws NumberFormatException on bad
        // input), so check for a missing contact number explicitly; records with a
        // non-numeric contact number throw and are handled by the skip policy
        if (item.getContactNo() == null) {
            return null; // returning null filters the record out of the chunk
        }
        Long.parseLong(item.getContactNo().toString());
        item.setId(null); // force an insert instead of an update
        return item;
    }
}
@RequiredArgsConstructor
@Configuration
public class CustomerWriter {

    private final CustomerRepository customerRepository;

    // implementation of the ItemWriter
    @Bean
    public RepositoryItemWriter<Customer> itemWriter() {
        return new RepositoryItemWriterBuilder<Customer>()
                .repository(customerRepository)
                .methodName("save") // use the "save" method of customerRepository
                .build();
    }
}
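CustomerRepository itself is just a standard Spring Data JPA repository (assuming Long ids, matching the entity sketch above):

public interface CustomerRepository extends JpaRepository<Customer, Long> {
}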
@RequiredArgsConstructor
@Configuration
public class BatchConfig {

    private final CustomerReader customerReader;
    private final CustomerWriter customerWriter;
    private final SkipListener<Customer, Customer> skipListener;

    // register the ItemProcessor as a bean
    @Bean
    public CustomerProcessor processor() {
        return new CustomerProcessor();
    }

    @Bean
    public Step step(JobRepository repository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("csv-step", repository)
                .<Customer, Customer>chunk(1000, transactionManager)
                .reader(customerReader.itemReader())
                .processor(processor())
                .writer(customerWriter.itemWriter())
                .faultTolerant()
                // .skipLimit(100)
                // .skip(Exception.class)
                .skipPolicy(skipPolicy()) // use our custom skip policy, which does not require a skip limit
                .taskExecutor(taskExecutor()) // make this a multi-threaded step
                .listener(skipListener)
                .build();
    }

    // a TaskExecutor lets the step process chunks in parallel threads
    private TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutorBuilder()
                .concurrencyLimit(10) // cap the number of concurrent threads at 10
                .build();
    }

    // define the Job that runs the Step above
    @Bean(name = "csvJob")
    public Job job(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new JobBuilder("csv-job", jobRepository)
                .flow(step(jobRepository, transactionManager))
                .end()
                .build();
    }

    @Bean
    public SkipPolicy skipPolicy() {
        return new ExceptionSkipPolicy();
    }
}
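For completeness, ExceptionSkipPolicy is my own SkipPolicy; it simply skips every failed item without enforcing a limit, roughly like this (shouldSkip takes a long skip count in Spring Batch 5):

public class ExceptionSkipPolicy implements SkipPolicy {

    @Override
    public boolean shouldSkip(Throwable throwable, long skipCount) {
        // skip any failed item, regardless of how many have been skipped so far
        return true;
    }
}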
Is there a better approach to handling scheduling based on dynamic parameters such as country codes in a Spring Boot application?
Any help or suggestions would be greatly appreciated! Thanks in advance!