I am new to Java and to the Spring framework as a whole. I am trying to develop a Spring Boot application to be used in an MQTT context.
The application consists of two microservices: the first retrieves data from a broker and saves it to a database, and the second exposes endpoints that let users retrieve the data and perform operations on it.
The problem is in the first microservice, called DataRetriever, which has to read weather messages (in JSON format) from a topic (weather-data) on a Mosquitto broker, parse them as they arrive, and save the JSON fields into the proper tables in Postgres.
Basically, I can't find a proper way of feeding the message payloads from the topic to the ObjectMapper.
The entire logic I've built is based on two files:
MqttBeans, where I handle the connection logic going in and out of the broker channels. The code is this:
package com.example.demo.configs;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
/**
* Setting up MQTT Client connection configurations
* the Client Factory
* the Channels: Inbound + Outbound
* the Message Handler
*/
@Configuration
public class MqttBeans {

    // Client Factory (factory configs)
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        // Options Settings
        options.setServerURIs(new String[] {"tcp://localhost:1883"});
        //options.setUserName("postgres");
        //String password = "admin123";
        //options.setPassword(password.toCharArray());
        options.setCleanSession(true);
        factory.setConnectionOptions(options);
        return factory;
    }

    // Inbound Ch (Subscribing)
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("serverIn", mqttClientFactory(), "#");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(2);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    // Msg Handler
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                // Topic
                String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString(); // retrieving the topic from the message header
                if (topic.equals("weather-data")) {
                    System.out.println("Here's the topic: " + topic); // printing out the topic
                }
                // Payload
                String payload = message.getPayload().toString();
                System.out.println("Here's the payload: " + payload); // printing out any msg that comes in the ch
            }
        };
    }

    // Outbound Ch (Publishing)
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    // Msg Handler
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("serverOut", mqttClientFactory());
        messageHandler.setAsync(true); // so that the client will always be up and listening
        messageHandler.setDefaultTopic("weather-data");
        messageHandler.setDefaultRetained(false);
        return messageHandler;
    }
}
StartupUtility, which should take care of parsing the messages published to the broker topic, mapping them (with the ObjectMapper) to my CityEntity, and creating and populating the tables in Postgres (with the data read while parsing the JSON fields).
The problem is visible in the code of StartupUtility:
package com.example.demo.startup;
import com.example.demo.entities.CityEntity;
import com.example.demo.repos.CityWeatherRepo;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.java.Log;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;
// Implementing CommandLineRunnerInterface to be init when proj is created
@Component
@Log
public class StartupUtility implements CommandLineRunner {

    // Passing the json as a value to test the parsing logic
    @Value("${demo.json.string}") private String json;

    // Wiring the Inbound ch
    @Autowired
    private MessageChannel mqttInputChannel;

    // Wiring the Repo
    @Autowired private CityWeatherRepo repo;

    @Override
    public void run(String... args) throws Exception {
        // Init Obj Mapper instance
        ObjectMapper mapper = new ObjectMapper();
        // Avoiding failure in case of unrecognized fields during json mapping, a workaround may be the 'Mixin' features of the Jackson pckg
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        // Getting the json + Converting into the City Class
        CityEntity value = mapper.readValue(json, CityEntity.class); // replace json with payload // input: json in string format, output: OpenAPI/CityEntity (class)
        // Saving
        CityEntity save = repo.save(value);
        // Checking the Saving process
        log.info(" Entity info " + save.toString());
    }
}
The JSON I am parsing is hardcoded in a dedicated property, demo.json.string, in the application.properties file (I did this to check whether at least the parsing logic is correct, and it is), whereas I want to parse the JSON messages that are published to the topic (weather-data), like this:
[image: topic messages]
My idea was to inject the channel into StartupUtility and pass the payload (?) as a parameter, but it doesn't work.
Does anyone know how to address this, or have any beginner-friendly suggestions for structuring the code differently from a logic point of view?
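One direction that might be worth exploring is to do the parsing where the messages actually arrive (the handler on mqttInputChannel) instead of in a CommandLineRunner, which runs only once at startup. The following is a minimal plain-Java sketch of that idea, not Spring code: TopicPayloadHandler is a hypothetical helper name, and the parser and saver are passed in as functions so the sketch stays independent of Jackson and of the repository.

```java
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical helper (not part of Spring or Paho): for every incoming message,
// check the topic, parse the payload, and hand the result to a saver.
final class TopicPayloadHandler<T> {
    private final String expectedTopic;
    private final Function<String, T> parser; // assumption: JSON string -> entity
    private final Consumer<T> saver;          // assumption: e.g. a repository save

    TopicPayloadHandler(String expectedTopic, Function<String, T> parser, Consumer<T> saver) {
        this.expectedTopic = Objects.requireNonNull(expectedTopic);
        this.parser = Objects.requireNonNull(parser);
        this.saver = Objects.requireNonNull(saver);
    }

    // Returns true when the payload was parsed and passed to the saver,
    // false when the message came from a different topic and was ignored.
    boolean handle(String topic, String payload) {
        if (!expectedTopic.equals(topic)) {
            return false;
        }
        T entity = parser.apply(payload);
        saver.accept(entity);
        return true;
    }
}
```

In the setup above, this would roughly correspond to moving the ObjectMapper and repo calls into the existing handler() bean: the parser would be something like payload -> mapper.readValue(payload, CityEntity.class) and the saver repo::save. Since readValue throws a checked exception, that lambda would need to catch and wrap it.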