/**
 * Creates the shared MQTT v5 client manager bean.
 *
 * <p>The connection options are populated from {@code mqttProperties}
 * (server URL, user name, password encoded as UTF-8 bytes), automatic
 * reconnect is switched on, and the manager is given in-memory persistence.
 *
 * @return the configured client manager
 * @throws NullPointerException if {@code MyUtils.getClientId()} yields {@code null}
 */
@Bean
public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager() {
    final MqttConnectionOptions options = new MqttConnectionOptions();

    final String[] serverUris = {mqttProperties.getUrl()};
    options.setServerURIs(serverUris);

    // NOTE(review): confirm the time units expected by these two setters;
    // the values are passed through exactly as configured here.
    options.setConnectionTimeout(30);
    options.setMaxReconnectDelay(30);

    options.setUserName(mqttProperties.getUsername());
    final byte[] passwordBytes =
            mqttProperties.getPassword().getBytes(StandardCharsets.UTF_8);
    options.setPassword(passwordBytes);

    options.setAutomaticReconnect(true);

    // Fail fast when no client id is available.
    final Mqttv5ClientManager manager =
            new Mqttv5ClientManager(options, Objects.requireNonNull(MyUtils.getClientId()));
    manager.setPersistence(new MemoryPersistence());
    return manager;
}
/**
 * Defines the inbound integration flow: an MQTT v5 message-driven adapter
 * subscribed to {@code $share/group1/test} whose messages are passed to
 * {@code mqttMessageHandler}.
 *
 * <p>The adapter is set up for {@code byte[]} payloads with a
 * {@link ByteArrayMessageConverter}, and manual acknowledgments are enabled.
 *
 * @param clientManager the client manager the adapter is built on
 * @return the assembled integration flow
 */
@Bean
public IntegrationFlow mqttInFlow(
        final ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
    final Mqttv5PahoMessageDrivenChannelAdapter inboundAdapter =
            new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "$share/group1/test");

    inboundAdapter.setPayloadType(byte[].class);
    inboundAdapter.setManualAcks(true);
    inboundAdapter.setMessageConverter(new ByteArrayMessageConverter());

    return IntegrationFlow
            .from(inboundAdapter)
            .handle(mqttMessageHandler)
            .get();
}
/**
 * Terminal handler for inbound MQTT messages: logs the received payload and
 * produces no reply.
 *
 * <p>NOTE(review): the inbound adapter in {@code mqttInFlow} enables manual
 * acks, but this handler never acknowledges the message. Confirm whether an
 * explicit acknowledgment is required here.
 */
public class MqttMessageHandler implements GenericHandler<byte[]> {

    /**
     * Logs the received payload.
     *
     * @param payload the raw message payload
     * @param headers the headers of the received message (not used)
     * @return always {@code null}, so no reply message is produced
     */
    @Override
    public Object handle(final byte[] payload, final MessageHeaders headers) {
        log.info("Received message: {}", payload);
        // The method is declared to return Object, so it must return a value;
        // null signals that there is nothing to send downstream.
        return null;
    }
}
Given:
The code above subscribes to the shared topic “$share/group1/test”.
Expectation:
Messages published to “test” should be delivered to the handle method of MqttMessageHandler.
Note: The same setup works for a non-shared topic subscription.
Similar Question: Shared Subscription with Spring Integration Mqttv5 not Receiving Messages