I use spring boot amqp and the x-consistent-hash RabbitMQ plugin. I need
-
multiple queues bound to the same x-consistent-hash exchange
-
separate listener for each queue to keep partial ordering
However, I see in the logs
<code>ERROR 4828 --- [ main] o.s.a.r.l.SimpleMessageListenerContainer : Consumer failed to start in 60000 milliseconds; does the task executor have enough threads to support the container concurrency?
</code>
<code>ERROR 4828 --- [ main] o.s.a.r.l.SimpleMessageListenerContainer : Consumer failed to start in 60000 milliseconds; does the task executor have enough threads to support the container concurrency?
</code>
ERROR 4828 --- [ main] o.s.a.r.l.SimpleMessageListenerContainer : Consumer failed to start in 60000 milliseconds; does the task executor have enough threads to support the container concurrency?
The app configuration is stuck and the app is not starting.
The configuration is following
<code>@Configuration
public class RabbitMqConfig {
@Bean
public ConnectionFactory connectionFactory() {
com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
cf.setHost("localhost");
cf.setPort(5672);
cf.setUsername("***");
cf.setPassword("***");
cf.setVirtualHost("/");
cf.setAutomaticRecoveryEnabled(false);
return new PooledChannelConnectionFactory(cf);
}
@Bean
public RabbitTemplate feedRabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
@Bean
public RabbitAdmin feedRabbitAdmin(RabbitTemplate feedRabbitTemplate) {
return new RabbitAdmin(feedRabbitTemplate);
}
@Bean
public Exchange myExchange(RabbitAdmin rabbitAdmin) {
Map<String, Object> arguments = Map.of("hash-header", "hash-on");
Exchange exchange = new CustomExchange("my.exchange", "x-consistent-hash", true, false, arguments);
exchange.setAdminsThatShouldDeclare(rabbitAdmin);
return exchange;
}
@Bean("myQueues")
public List<Queue> myQueues(RabbitAdmin rabbitAdmin) {
List<Queue> queues = new ArrayList<>();
for (int i = 1; i <= 20; i++) {
String queueName = String.join(".", "my.queue", Integer.toString(i));
Queue queue = QueueBuilder.durable(queueName)
.singleActiveConsumer()
.build();
queue.setAdminsThatShouldDeclare(rabbitAdmin);
rabbitAdmin.declareQueue(queue);
queues.add(queue);
}
return queues;
}
@Bean
public List<Binding> myBindings(RabbitAdmin rabbitAdmin,
@Qualifier("myQueues") List<Queue> queues,
Exchange exchange) {
List<Binding> bindings = new ArrayList<>();
for (Queue queue : queues) {
Binding binding = BindingBuilder.bind(queue)
.to(exchange)
.with("1")
.noargs();
binding.setAdminsThatShouldDeclare(rabbitAdmin);
rabbitAdmin.declareBinding(binding);
bindings.add(binding);
}
return bindings;
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("rabbitmq-listener-");
executor.initialize();
return executor;
}
@Bean
public List<SimpleMessageListenerContainer> feedListenerContainers(RabbitAdmin rabbitAdmin,
@Qualifier("myQueues") List<Queue> queues,
@Qualifier("jsonMessageConverter") Jackson2JsonMessageConverter converter,
MyMessageHandler myMessageHandler,
TaskExecutor taskExecutor) {
ConnectionFactory connectionFactory = rabbitAdmin.getRabbitTemplate().getConnectionFactory();
List<SimpleMessageListenerContainer> containers = new ArrayList<>();
for (Queue queue : queues) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setAmqpAdmin(rabbitAdmin);
container.setMessageListener(new MyMessageListenerAdapter(converter, myMessageHandler));
container.setTaskExecutor(taskExecutor);
container.addQueues(queue);
containers.add(container);
container.start();
}
return containers;
}
@Bean("jsonMessageConverter")
public Jackson2JsonMessageConverter jackson2JsonMessageConverter(ObjectMapper objectMapper) {
return new Jackson2JsonMessageConverter(objectMapper);
}
@Bean
public ObjectMapper objectMapper() {
JavaTimeModule module = new JavaTimeModule();
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(module);
return objectMapper;
}
</code>
<code>@Configuration
public class RabbitMqConfig {
@Bean
public ConnectionFactory connectionFactory() {
com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
cf.setHost("localhost");
cf.setPort(5672);
cf.setUsername("***");
cf.setPassword("***");
cf.setVirtualHost("/");
cf.setAutomaticRecoveryEnabled(false);
return new PooledChannelConnectionFactory(cf);
}
@Bean
public RabbitTemplate feedRabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
@Bean
public RabbitAdmin feedRabbitAdmin(RabbitTemplate feedRabbitTemplate) {
return new RabbitAdmin(feedRabbitTemplate);
}
@Bean
public Exchange myExchange(RabbitAdmin rabbitAdmin) {
Map<String, Object> arguments = Map.of("hash-header", "hash-on");
Exchange exchange = new CustomExchange("my.exchange", "x-consistent-hash", true, false, arguments);
exchange.setAdminsThatShouldDeclare(rabbitAdmin);
return exchange;
}
@Bean("myQueues")
public List<Queue> myQueues(RabbitAdmin rabbitAdmin) {
List<Queue> queues = new ArrayList<>();
for (int i = 1; i <= 20; i++) {
String queueName = String.join(".", "my.queue", Integer.toString(i));
Queue queue = QueueBuilder.durable(queueName)
.singleActiveConsumer()
.build();
queue.setAdminsThatShouldDeclare(rabbitAdmin);
rabbitAdmin.declareQueue(queue);
queues.add(queue);
}
return queues;
}
@Bean
public List<Binding> myBindings(RabbitAdmin rabbitAdmin,
@Qualifier("myQueues") List<Queue> queues,
Exchange exchange) {
List<Binding> bindings = new ArrayList<>();
for (Queue queue : queues) {
Binding binding = BindingBuilder.bind(queue)
.to(exchange)
.with("1")
.noargs();
binding.setAdminsThatShouldDeclare(rabbitAdmin);
rabbitAdmin.declareBinding(binding);
bindings.add(binding);
}
return bindings;
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("rabbitmq-listener-");
executor.initialize();
return executor;
}
@Bean
public List<SimpleMessageListenerContainer> feedListenerContainers(RabbitAdmin rabbitAdmin,
@Qualifier("myQueues") List<Queue> queues,
@Qualifier("jsonMessageConverter") Jackson2JsonMessageConverter converter,
MyMessageHandler myMessageHandler,
TaskExecutor taskExecutor) {
ConnectionFactory connectionFactory = rabbitAdmin.getRabbitTemplate().getConnectionFactory();
List<SimpleMessageListenerContainer> containers = new ArrayList<>();
for (Queue queue : queues) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setAmqpAdmin(rabbitAdmin);
container.setMessageListener(new MyMessageListenerAdapter(converter, myMessageHandler));
container.setTaskExecutor(taskExecutor);
container.addQueues(queue);
containers.add(container);
container.start();
}
return containers;
}
@Bean("jsonMessageConverter")
public Jackson2JsonMessageConverter jackson2JsonMessageConverter(ObjectMapper objectMapper) {
return new Jackson2JsonMessageConverter(objectMapper);
}
@Bean
public ObjectMapper objectMapper() {
JavaTimeModule module = new JavaTimeModule();
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(module);
return objectMapper;
}
</code>
@Configuration
public class RabbitMqConfig {
@Bean
public ConnectionFactory connectionFactory() {
com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
cf.setHost("localhost");
cf.setPort(5672);
cf.setUsername("***");
cf.setPassword("***");
cf.setVirtualHost("/");
cf.setAutomaticRecoveryEnabled(false);
return new PooledChannelConnectionFactory(cf);
}
@Bean
public RabbitTemplate feedRabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
@Bean
public RabbitAdmin feedRabbitAdmin(RabbitTemplate feedRabbitTemplate) {
return new RabbitAdmin(feedRabbitTemplate);
}
@Bean
public Exchange myExchange(RabbitAdmin rabbitAdmin) {
Map<String, Object> arguments = Map.of("hash-header", "hash-on");
Exchange exchange = new CustomExchange("my.exchange", "x-consistent-hash", true, false, arguments);
exchange.setAdminsThatShouldDeclare(rabbitAdmin);
return exchange;
}
@Bean("myQueues")
public List<Queue> myQueues(RabbitAdmin rabbitAdmin) {
List<Queue> queues = new ArrayList<>();
for (int i = 1; i <= 20; i++) {
String queueName = String.join(".", "my.queue", Integer.toString(i));
Queue queue = QueueBuilder.durable(queueName)
.singleActiveConsumer()
.build();
queue.setAdminsThatShouldDeclare(rabbitAdmin);
rabbitAdmin.declareQueue(queue);
queues.add(queue);
}
return queues;
}
@Bean
public List<Binding> myBindings(RabbitAdmin rabbitAdmin,
@Qualifier("myQueues") List<Queue> queues,
Exchange exchange) {
List<Binding> bindings = new ArrayList<>();
for (Queue queue : queues) {
Binding binding = BindingBuilder.bind(queue)
.to(exchange)
.with("1")
.noargs();
binding.setAdminsThatShouldDeclare(rabbitAdmin);
rabbitAdmin.declareBinding(binding);
bindings.add(binding);
}
return bindings;
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("rabbitmq-listener-");
executor.initialize();
return executor;
}
@Bean
public List<SimpleMessageListenerContainer> feedListenerContainers(RabbitAdmin rabbitAdmin,
@Qualifier("myQueues") List<Queue> queues,
@Qualifier("jsonMessageConverter") Jackson2JsonMessageConverter converter,
MyMessageHandler myMessageHandler,
TaskExecutor taskExecutor) {
ConnectionFactory connectionFactory = rabbitAdmin.getRabbitTemplate().getConnectionFactory();
List<SimpleMessageListenerContainer> containers = new ArrayList<>();
for (Queue queue : queues) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setAmqpAdmin(rabbitAdmin);
container.setMessageListener(new MyMessageListenerAdapter(converter, myMessageHandler));
container.setTaskExecutor(taskExecutor);
container.addQueues(queue);
containers.add(container);
container.start();
}
return containers;
}
@Bean("jsonMessageConverter")
public Jackson2JsonMessageConverter jackson2JsonMessageConverter(ObjectMapper objectMapper) {
return new Jackson2JsonMessageConverter(objectMapper);
}
@Bean
public ObjectMapper objectMapper() {
JavaTimeModule module = new JavaTimeModule();
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(module);
return objectMapper;
}
What is wrong and how can it be fixed?
I tried configuring queues, bindings, and listeners separately (one bean – one object) and removing manual declaration/starting, but it did not help. I also tried using different spring boot versions, but no affect.