I am batch processing messages using kafka listener and I want them to be validated before processing. Here’s how I have configured my kafka listener
@KafkaListener(id = "listener.id",
topics = "kafka.events",
idIsGroup = true,
batch = "true",
concurrency = "1")
public void onMessage(@Valid @Payload ConsumerRecords<String, KafkaEvent> consumerRecords) {
...
}
And my event message looks as below – fields are annotated with required constraints against which they need to be validated:
@JsonIgnoreProperties(ignoreUnknown = true)
public class KafkaEvent {
@NotNull
@Size(min = 1, max = 64)
private String eventId;
@NotNull @PositiveOrZero
@Digits(integer = 20, fraction = 6)
private BigDecimal amount;
....
}
As per the documentation https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/validation.html, I have also configured a validator:
@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {
@Autowired
private LocalValidatorFactoryBean validator;
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(this.validator);
}
}
But fields are not getting validated if I follow the above approach. So, I had to change my approach, create an error de-serializer to handle the deserialization failures and a custom validator which I am setting in de-serializer to validate the messages. As I understand from the documentation, we do need an error de-serializer to handle deserialization failures but do we really need a custom validator to validate the messages in batch? Shouldn’t messages in the batch be validated by spring itself after deserialization, if we have correctly placed the @Valid @Payload annotations.
Approach 2:
Configuration
consumer:
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
...
properties:
spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
spring.deserializer.validator.class: com.shinky.myservice.validator.CustomValidator
Validator
import jakarta.validation.Validation;
import jakarta.validation.Validator;
import org.springframework.validation.beanvalidation.SpringValidatorAdapter;
public class CustomValidator extends SpringValidatorAdapter {
public CustomValidator() {
super(validator());
}
public static Validator validator() {
return Validation.byDefaultProvider()
.configure()
.buildValidatorFactory().getValidator();
}
}
I wanted to understand, if I have missed something on implementation because of which validation is not working using @Payload and @Valid annotations or if this is intentional for any valid reason?