I have multiple jobs that do the same thing: receive an XLSX via a controller -> process it -> write it to a Kafka topic.
Each model/XLSX has its own Kafka topic.
I’m not putting the controller here for simplicity.
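Roughly, it just saves the uploaded file and launches the corresponding job with the file path as a job parameter. A simplified sketch (not the exact code; the endpoint, class and parameter names here are only illustrative):

// Simplified sketch – not the actual controller; names and parameters are illustrative.
@RestController
@RequestMapping("/import")
public class UnidadeInstituicaoImportController {

    private final JobLauncher jobLauncher;
    private final Job unidadeInstituicaoJob;

    public UnidadeInstituicaoImportController(JobLauncher jobLauncher, Job unidadeInstituicaoJob) {
        this.jobLauncher = jobLauncher;
        this.unidadeInstituicaoJob = unidadeInstituicaoJob;
    }

    @PostMapping("/unidade-instituicao")
    public ResponseEntity<String> importXlsx(@RequestParam("file") MultipartFile file) throws Exception {
        // store the upload in a temp file so the batch reader can open it later
        Path tempFile = Files.createTempFile("unidade-instituicao-", ".xlsx");
        file.transferTo(tempFile);

        JobParameters params = new JobParametersBuilder()
                .addString("fileName", tempFile.toString())
                .addLong("timestamp", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(unidadeInstituicaoJob, params);

        return ResponseEntity.accepted().body("Job started");
    }
}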
I have an abstract class, AbstractJobConfig, that provides the common configuration for my jobs:
@Configuration
public abstract class AbstractJobConfig<T> {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    protected abstract RowMapper<T> getRowMapper();

    protected abstract String setDefaultTopic();

    protected abstract Converter<T, String> getKeyMapper();

    public ItemReader<T> itemReader() {
        return new AbstractItemReader<T>() {
            @Override
            protected RowMapper<T> createRowMapper() {
                return getRowMapper();
            }
        };
    }

    public ProducerFactory<String, T> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    public KafkaItemWriter<String, T> kafkaItemWriter() {
        var kafkaItemWriter = new KafkaItemWriter<String, T>();
        var kafkaTemplate = new KafkaTemplate<>(producerFactory());
        kafkaTemplate.setDefaultTopic(setDefaultTopic());
        kafkaItemWriter.setKafkaTemplate(kafkaTemplate);
        kafkaItemWriter.setItemKeyMapper(getKeyMapper());
        kafkaItemWriter.setDelete(Boolean.FALSE);
        return kafkaItemWriter;
    }

    public ItemProcessor<T, T> getItemProcessor() {
        return t -> t;
    }

    public JobParametersValidator jobParametersValidator() {
        return new EmployeeJobParametersValidator();
    }

    public JobParametersValidator compositeJobParametersValidator() {
        CompositeJobParametersValidator bean = new CompositeJobParametersValidator();
        bean.setValidators(Collections.singletonList(jobParametersValidator()));
        return bean;
    }
}
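EmployeeJobParametersValidator (not shown above) is just a plain JobParametersValidator; simplified, it looks something like this (the parameter it checks here is only an illustrative example, not the actual implementation):

// Simplified sketch – the real validator checks our own parameters;
// the "fileName" parameter here is only illustrative.
public class EmployeeJobParametersValidator implements JobParametersValidator {

    @Override
    public void validate(JobParameters parameters) throws JobParametersInvalidException {
        if (parameters == null || parameters.getString("fileName") == null) {
            throw new JobParametersInvalidException("The 'fileName' job parameter is required");
        }
    }
}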
Here is an example of a job config, UnidadeInstituicaoJobConfig, that extends that abstract class:
@Configuration
public class UnidadeInstituicaoJobConfig extends AbstractJobConfig<UnidadeInstituicao> {

    @Override
    protected RowMapper<UnidadeInstituicao> getRowMapper() {
        return new UnidadeInstituicaoItemRowMapper();
    }

    @Override
    protected String setDefaultTopic() {
        return "unidade-instituicao-topic";
    }

    public ItemReader<UnidadeInstituicao> unidadeInstituicaoItemReader() {
        return new AbstractItemReader<UnidadeInstituicao>() {
            @Override
            protected RowMapper<UnidadeInstituicao> createRowMapper() {
                return new UnidadeInstituicaoItemRowMapper();
            }
        };
    }

    @Override
    protected Converter<UnidadeInstituicao, String> getKeyMapper() {
        return unidadeInstituicao -> String.valueOf(unidadeInstituicao.getIdUnidadeInstituicao());
    }
}
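The other configs (UsuarioJobConfig, TransportadoraValorJobConfig, TipoTerminalJobConfig) follow the same pattern, only swapping the row mapper, the topic and the key. Simplified, something like this (class and field names in this sketch are illustrative, not the exact code):

// Illustrative sketch only – the real class differs in names/fields.
@Configuration
public class UsuarioJobConfig extends AbstractJobConfig<Usuario> {

    @Override
    protected RowMapper<Usuario> getRowMapper() {
        return new UsuarioItemRowMapper();
    }

    @Override
    protected String setDefaultTopic() {
        return "usuario-topic";
    }

    @Override
    protected Converter<Usuario, String> getKeyMapper() {
        return usuario -> String.valueOf(usuario.getIdUsuario());
    }
}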
Inside my BatchConfiguration, I define the Job and the Step:
@Configuration
public class BatchConfiguration {

    private final EntityManagerFactory emf;
    private final UsuarioJobConfig usuarioJobConfig;
    private final UnidadeInstituicaoJobConfig unidadeInstituicaoJobConfig;
    private final TransportadoraValorJobConfig transportadoraValorJobConfig;
    private final TipoTerminalJobConfig tipoTerminalJobConfig;

    public BatchConfiguration(EntityManagerFactory emf,
                              UsuarioJobConfig usuarioJobConfig,
                              UnidadeInstituicaoJobConfig unidadeInstituicaoJobConfig,
                              TransportadoraValorJobConfig transportadoraValorJobConfig,
                              TipoTerminalJobConfig tipoTerminalJobConfig) {
        this.emf = emf;
        this.usuarioJobConfig = usuarioJobConfig;
        this.unidadeInstituicaoJobConfig = unidadeInstituicaoJobConfig;
        this.transportadoraValorJobConfig = transportadoraValorJobConfig;
        this.tipoTerminalJobConfig = tipoTerminalJobConfig;
    }

    @Bean
    public Job unidadeInstituicaoJob(JobCompletionListener listener, Step unidadeInstituicaoStep, JobRepository jobRepository) {
        return new JobBuilder("unidadeInstituicaoJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(unidadeInstituicaoStep)
                .end()
                .validator(unidadeInstituicaoJobConfig.compositeJobParametersValidator())
                .build();
    }

    @Bean
    public Step unidadeInstituicaoStep(JobRepository jobRepository, JpaTransactionManager transactionManager, TaskExecutor taskExecutor) throws Exception {
        return new StepBuilder("unidadeInstituicaoStep", jobRepository)
                .<UnidadeInstituicao, UnidadeInstituicao>chunk(50, transactionManager)
                .reader(unidadeInstituicaoJobConfig.unidadeInstituicaoItemReader())
                .processor(unidadeInstituicaoJobConfig.getItemProcessor())
                .writer(unidadeInstituicaoJobConfig.kafkaItemWriter())
                .build();
    }
............
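The JpaTransactionManager and TaskExecutor that the step method asks for are ordinary beans defined elsewhere in the context, roughly like this (simplified sketch, defaults assumed):

// Simplified – the actual bean definitions may differ.
@Bean
public JpaTransactionManager transactionManager(EntityManagerFactory emf) {
    return new JpaTransactionManager(emf);
}

@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("batch-");
}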
I think the problem lies in AbstractJobConfig, but I can't tell what's wrong.