I have an EventHubConsumer that consumes data from Azure EventHub. So I have Event Grid Topic that is consuming from external Producer. The event grid topic after consuming from Producer it pushes data to EventHub and I am then consuming from the EventHub in my spring boot application. The consumer is working and I am able to consume events/me ssages and I save them to a database tracker table. The issue I am seeing is that I see duplicates are been processed and saved. The producer is not sending duplicates.
So this is the architecture I have:
Producer: Event Grid Topic: One EventHub consumer (EventHub subscription to event grid): One partition: One consumer group: Spring boot application
I debugged my application and I see that its not my service method processAzureEventHubMessage that is causing the issue because I see that it processes a 1 message at a time and finishes it by saving it to database, and I see “Checkpoint was updated successfully” after each execution of the method when it finishes processing a single message. I don’t know if it’s not checkpointing correctly, and I don’t know why the same message comes again from eventHub to my application, but I set up all my config correctly and I am not sure If something is missing in my config or my code.
Please help if you see any bug in the code that might cause the duplication issue.
My Consumer method:
@Bean
public Consumer<Message<String>> consume(){
return message ->{
Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
message.getPayload(),
message.getHeaders().get(AzureHeaders.PARTITION_KEY),
message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
message.getHeaders().get(EventHubsHeaders.OFFSET),
message.getHeaders().get(EventHubsHeaders.ENQUEUED)
);
try{
processAzureEventHubMessage(message);
checkpointer.success().block();
log.inf("Checkpoit was updated successfully", message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER));
}catch(Exception e){
log.error("Error processing event", e.getMessage());
}
};
}
My Service method that processes the message do some mapping and save it to a Database table
public Response processAzureEventHubMessage(Message<String> message){
Response reponse = new Response();
try{
Data data = eventConverter.convertToObject(message.getPayload());
Request request = new Request();
String player_id = data.getEvent().getPaymentEvent().getPlayerId().trim();
String status = data.getEvent().getPlayerEvent().getType().trim().toUpperCase();
// below is just some rest of logic that process the data and send it to database
}
}
return response;
}
My config yaml file for the eventHub Consumer:
spring:
cloud:
azure:
eventhubs:
connection-string: ......
processor:
checkpointer-store:
account-name: checkpointerstorage
container-name: messages-container
stream:
bindings:
consume-in-o:
destination: my-eventhub
group: my-consumer-group
eventhubs:
bindings:
consume-in-0:
consumer:
checkpoint:
mode: MANUAL
poller:
initial-delay: 50
fixed delay: 1000
function:
destination: consume
Jawad Timzit is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.