I am including a library in my Spring Boot service. The library is pulled in as a Gradle dependency, and my service uses it to connect to Azure Event Hubs and consume messages.
The library includes an EventHubsProcessorConfiguration configuration class that defines the consume() method as a @Bean:
@Data
@Slf4j
@Configuration
public class EventHubsProcessorConfiguration {

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders()
                    .get(AzureHeaders.CHECKPOINTER);
            log.info(
                    "Message received: partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
            final String payload = message.getPayload();
            assert checkpointer != null;
            checkpointer.success()
                    .doOnSuccess(success -> log.info(
                            "Message '{}' successfully checkpointed processed message ",
                            payload))
                    .doOnError(error -> log.error(
                            "Error checkpointing successfully processed message. Partition key: {}, Sequence number: {}, Offset: {}, Enqueued time: {}",
                            message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                            message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                            message.getHeaders().get(EventHubsHeaders.OFFSET),
                            message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME), error))
                    .onErrorComplete()
                    .block();
        };
    }
}
I want to override the behavior of the consume() bean in my Spring Boot application. To do this, I include the library as a dependency in my build.gradle:
build.gradle
implementation 'com.my.enterprise:enterprise-eventhubs-processor'
I enabled bean definition overriding in my application.yaml:
spring:
  main:
    allow-bean-definition-overriding: true
I also created a CustomProcessorConfiguration class that extends the library's EventHubsProcessorConfiguration and redefines the consume() bean for use in my Spring Boot service:
@Slf4j
@Configuration
public class CustomProcessorConfiguration extends EventHubsProcessorConfiguration {

    final CosmosService cosmosService;

    public CustomProcessorConfiguration(CosmosService cosmosService) {
        this.cosmosService = cosmosService;
    }

    @Override
    @Bean
    @Primary
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(AzureHeaders.CHECKPOINTER);
            log.info(
                    "Message received: partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
            final String payload = message.getPayload();
            assert checkpointer != null;
            try {
                log.info("Begin Processing message '{}'", payload);
                cosmosService.processMessage(payload, Objects.requireNonNull(message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)).toString());
                log.info("End Processing message '{}'", payload);
                checkpointer.success()
                        .doOnSuccess(success -> log.info(
                                "Message '{}' successfully checkpointed processed message ",
                                payload))
                        .doOnError(error -> log.info(
                                "Error checkpointing successfully processed message. Partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                                message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                                message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                                message.getHeaders().get(EventHubsHeaders.OFFSET),
                                message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME),
                                error))
                        .onErrorComplete()
                        .block();
            } catch (Exception e) {
                log.info("Failed to process Message '{}' ", payload);
                checkpointer.failure()
                        .doOnSuccess(success -> log.info(
                                "Failed to process Message '{}' ",
                                payload))
                        .doOnError(error -> log.info(
                                "Error checkpointing successfully processed message. Partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                                message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                                message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                                message.getHeaders().get(EventHubsHeaders.OFFSET),
                                message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME),
                                error))
                        .onErrorComplete()
                        .block();
            }
        };
    }
}
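For context, my mental model of allow-bean-definition-overriding is that bean definitions are keyed by bean name (here "consume" for both classes), and that when overriding is allowed, whichever definition is registered last replaces the earlier one. The plain-Java sketch below is only a toy illustration of that assumption. ToyBeanRegistry and its methods are hypothetical names I made up for this post; they are not part of Spring's API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Toy stand-in for a bean registry (hypothetical, NOT Spring's API). */
class ToyBeanRegistry {
    // One definition per bean name; the Supplier plays the role of the @Bean factory method.
    private final Map<String, Supplier<String>> definitions = new LinkedHashMap<>();
    private final boolean allowOverriding;

    ToyBeanRegistry(boolean allowOverriding) {
        this.allowOverriding = allowOverriding;
    }

    /** Registers a definition; with overriding allowed, a later one replaces an earlier one of the same name. */
    void register(String beanName, Supplier<String> factory) {
        if (definitions.containsKey(beanName) && !allowOverriding) {
            throw new IllegalStateException("Duplicate bean name: " + beanName);
        }
        definitions.put(beanName, factory);
    }

    /** Returns the bean produced by the single surviving definition for this name. */
    String resolve(String beanName) {
        Supplier<String> factory = definitions.get(beanName);
        if (factory == null) {
            throw new IllegalArgumentException("No bean named: " + beanName);
        }
        return factory.get();
    }

    public static void main(String[] args) {
        // Custom configuration registered after the library's: the custom bean survives.
        ToyBeanRegistry customLast = new ToyBeanRegistry(true);
        customLast.register("consume", () -> "library");
        customLast.register("consume", () -> "custom");
        System.out.println(customLast.resolve("consume")); // prints "custom"

        // Library configuration registered after mine: the library bean survives.
        ToyBeanRegistry libraryLast = new ToyBeanRegistry(true);
        libraryLast.register("consume", () -> "custom");
        libraryLast.register("consume", () -> "library");
        System.out.println(libraryLast.resolve("consume")); // prints "library"
    }
}
```

If this model is right, only one definition survives per name, so @Primary would never get a chance to pick between two candidates, and the outcome would depend on the order in which the two configuration classes are registered.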
I am not sure why, but when the service is deployed in Azure AKS, the consume() bean that actually runs appears to be the one from the library's EventHubsProcessorConfiguration and not the one defined in my CustomProcessorConfiguration class.
I'm not sure what I'm doing wrong here. I'd like to override the bean provided by my library in my CustomProcessorConfiguration and could use some advice. Thanks.