My use case:
- When a new address is added, I send the user domain entity (including its list of addresses) to Kafka so the consumer can cache the data in Redis.
The problem: when I send user data with one or more addresses, the message reaches the topic but is immediately moved to the dead-letter topic (Successful dead-letter publication: cache-user-0@88 to cache-user.DLT-0@60).
When I send user data with a null or empty address list, everything works smoothly.
Maybe the nested Address class can't be deserialized?
Can anyone help me solve this? I have been stuck on it for a long time.
Thanks in advance.
My user domain entity:
@Getter
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class User {
    private Long id;
    private String fullName;
    private Email email;
    private List<Address> addresses;
    private PhoneNumber phoneNumber;
}
My address domain entity:
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Address {
    private Long id;
    private String province;
    private String district;
    private String ward;
    private String homeAddress;
    private AddressType type;
    private Long userId;
}
My consumer:
@KafkaListener(topics = { CACHE_USER_TOPIC }, groupId = KafkaConfiguration.GROUP_ID)
@KafkaHandler(isDefault = true)
public void cacheUser(ConsumerRecord<String, Object> record) {
    User inputData = this.objectMapper.convertValue(record.value(), User.class);
    this.userRedisCaching.cache(inputData);
}
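The dead-letter publication in the log comes from a DeadLetterPublishingRecoverer in my KafkaConfiguration. The wiring is roughly the sketch below (simplified; the back-off values here are illustrative, and Spring Boot picks the CommonErrorHandler bean up for the auto-configured listener container factory):

// Part of KafkaConfiguration (simplified sketch): records whose processing or
// deserialization fails are retried briefly and then published to <topic>.DLT.
@Bean
public CommonErrorHandler errorHandler(KafkaTemplate<Object, Object> kafkaTemplate) {
    return new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate),
            new FixedBackOff(0L, 2L));
}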
My use case impl:
@Override
@Transactional("transactionManager")
public UserAddressOutputModel createAddress(Long userId, CreateUserAddressInputModel inputModel) {
    Optional<User> matchedUser = this.loadUserPort.loadUser(userId);
    if (matchedUser.isEmpty()) {
        throw new UserNotFoundException();
    }
    if (matchedUser.get().getAddresses().size() >= MAX_ADDRESSES) {
        throw new ReachTheMaximumAddressesPerUser();
    }
    Address address = Address.builder()
            .province(inputModel.getProvince())
            .district(inputModel.getDistrict())
            .ward(inputModel.getWard())
            .homeAddress(inputModel.getHomeAddress())
            .type(new AddressType(inputModel.getType()))
            .userId(matchedUser.get().getId())
            .build();
    Address newAddress = this.createUserAddressPort.createUserAddress(address);
    matchedUser.get().addNewAddress(newAddress);
    // User user = User.builder().addresses(Collections.emptyList()).build();
    this.sendEventToMessageQueuePort.cacheUser(matchedUser.get());
    return UserAddressOutputModel.convertFromDomain(newAddress);
}
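The event is published through a plain KafkaTemplate behind sendEventToMessageQueuePort. A simplified sketch of that adapter (interface and class names are reconstructed from the port name, Kafka transaction handling omitted):

// Outbound adapter behind sendEventToMessageQueuePort (names are illustrative).
// The JsonSerializer from the producer config serializes the User, including
// its addresses list, to JSON.
@Component
@RequiredArgsConstructor
public class UserEventKafkaAdapter implements SendEventToMessageQueuePort {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Override
    public void cacheUser(User user) {
        this.kafkaTemplate.send(CACHE_USER_TOPIC, String.valueOf(user.getId()), user);
    }
}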
My application.properties:
spring.kafka.bootstrap-servers=localhost:9092
# Producer Configuration
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
# Consumer Configuration
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=exam-outline-pj
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
# kafka transaction
spring.kafka.producer.transaction-id-prefix=tx-
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.isolation-level=READ_COMMITTED
spring.kafka.listener.ack-mode=RECORD
# kafka debug
logging.level.org.springframework.kafka=DEBUG
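To see the exact rejection reason, I plan to dump the kafka_dlt-exception-* headers that DeadLetterPublishingRecoverer adds to the DLT record, using a throwaway listener like this (sketch; the group id is arbitrary and I assume the default container factory can read the DLT record):

// Temporary listener on the dead-letter topic: print every header so the
// original deserialization/listener exception becomes visible in the logs.
@KafkaListener(topics = "cache-user.DLT", groupId = "dlt-debug")
public void dumpDltHeaders(ConsumerRecord<?, ?> record) {
    record.headers().forEach(header ->
            System.out.println(header.key() + " = "
                    + new String(header.value(), StandardCharsets.UTF_8)));
}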
I would like to resolve this problem and understand the root cause.