i have the following node MQTT:
//#include "project-conf.h"
#include "os/dev/leds.h"
#include "contiki.h"
#include "net/routing/routing.h"
#include <time.h>
#include "mqtt.h"
#include "net/ipv6/uip.h"
#include "net/ipv6/uip-icmp6.h"
#include "net/ipv6/sicslowpan.h"
#include "sys/etimer.h"
#include "sys/ctimer.h"
#include "lib/sensors.h"
#include "dev/button-hal.h"
#include "os/sys/log.h"
#include "mqtt-node.h" // .h all'interno della stessa directory del .c
#include <string.h>
#include <strings.h>
#include "os/dev/leds.h"
/*---------------------------------------------------------------------------*/
#define LOG_MODULE "mqtt-node"
#ifdef MQTT_CLIENT_CONF_LOG_LEVEL
#define LOG_LEVEL MQTT_CLIENT_CONF_LOG_LEVEL
#else
#define LOG_LEVEL LOG_LEVEL_DBG
#endif
/*---------------------------------------------------------------------------*/
/* MQTT broker address. */
#define MQTT_CLIENT_BROKER_IP_ADDR "fd00::1"
static const char *broker_ip = MQTT_CLIENT_BROKER_IP_ADDR;
// Defaukt config values
#define DEFAULT_BROKER_PORT 1883
// We assume that the broker does not require authentication
/*---------------------------------------------------------------------------*/
// Defaukt config values
// We assume that the broker does not require authentication
/*---------------------------------------------------------------------------*/
/* Various states */
static uint8_t state;
#define STATE_INIT 0
#define STATE_NET_OK 1
#define STATE_CONNECTING 2
#define STATE_CONNECTED 3
#define STATE_SUBSCRIBED 4
#define STATE_DISCONNECTED 5
PROCESS(mqtt_client_example, "mqtt_node_example");
AUTOSTART_PROCESSES(&mqtt_client_example);
// Maximum TCP segment size for outgoing segments of our socket
#define MAX_TCP_SEGMENT_SIZE 32
#define CONFIG_IP_ADDR_STR_LEN 64
// Buffers for Client ID and Topics. Make sure they are large enough to hold the entire respective string
#define BUFFER_SIZE 64
//global variables to store body values
static int trestbps =0;
static int fbs =0;
static int restecg =0;
static int thalach =0;
// Global variables to store sensor values
static int xla = 0;
static int yla = 0;
static int zla = 0;
static int xaa = 0;
static int yaa = 0;
static int zaa = 0;
static int movements=0; //Initially set to 0 to generate movements values
static char client_id[BUFFER_SIZE];
static char pub_mov_topic[BUFFER_SIZE];
static char pub_bod_topic[BUFFER_SIZE];
// Periodic timer to check the state of the MQTT client
//Ad ogni secondo devono essere prese le misure del paziente
#define PUBLISHING_INTERVAL (1 * CLOCK_SECOND)
static struct etimer periodic_timer;
// The main MQTT buffers. We will need to increase if we start publishing more data.
#define APP_BUFFER_SIZE 512
static char app_buffer_mov[APP_BUFFER_SIZE];
static char app_buffer_bod[APP_BUFFER_SIZE];
// pointer to connection
static struct mqtt_connection conn;
//connection status
mqtt_status_t status;
static char broker_address[CONFIG_IP_ADDR_STR_LEN];
static button_hal_button_t *btn; //Pointer to the button
// check that the network is operational
static bool have_connectivity(void) {
if(uip_ds6_get_global(ADDR_PREFERRED) == NULL || uip_ds6_defrt_choose() == NULL) {
return false;
}
return true;
}
static void mqtt_event(struct mqtt_connection *m, mqtt_event_t event, void *data){
printf("Event occursn");
switch(event) {
case MQTT_EVENT_CONNECTED: {
printf("Application has a MQTT connectionn");
leds_on(1);
state = STATE_CONNECTED;
break;
}
case MQTT_EVENT_DISCONNECTED: {
printf("MQTT Disconnect. Reason %un", *((mqtt_event_t *)data));
leds_off(15);
state = STATE_DISCONNECTED;
process_poll(&mqtt_client_example);
break;
}
case MQTT_EVENT_PUBLISH: {
printf("Publishing..");
break;
}
case MQTT_EVENT_PUBACK: {
printf("Publishing complete.n");
break;
}
default:
printf("Application got a unhandled MQTT event: %in", event);
break;
}
}
/*
// Funzione per generare un numero casuale tra min e max
double random_double(double min, double max) {
return min + (double)rand() / RAND_MAX * (max - min);
}
// Funzione per generare le componenti dell'accelerazione
void generate_acceleration_components(int mov, double *xla, double *yla, double *zla, double *xaa, double *yaa, double *zaa) {
if (mov == 1) {
*xla = random_double(1.0, 2.0);
*yla = random_double(1.0, 2.0);
*zla = random_double(1.0, 2.0);
*xaa = random_double(50.0, 200.0);
*yaa = random_double(50.0, 200.0);
*zaa = random_double(50.0, 200.0);
} else {
*xla = random_double(6.0, 9.0);
*yla = random_double(6.0, 9.0);
*zla = random_double(6.0, 9.0);
*xaa = random_double(600.0, 900.0);
*yaa = random_double(600.0, 900.0);
*zaa = random_double(600.0, 900.0);
}
}
*/
double random_double(double min, double max) {
return min + ((double)rand() / ((double)RAND_MAX + 1)) * (max - min);
}
// Funzione per generare le componenti dell'accelerazione
void generate_acceleration_components(int mov, int *xla, int *yla, int *zla, int *xaa, int *yaa, int *zaa) {
if (mov == 1) {
*xla = (int)random_double(1.0, 4.0);
*yla = (int)random_double(1.0, 4.0);
*zla = (int)random_double(1.0, 4.0);
*xaa = (int)random_double(50.0, 200.0);
*yaa = (int)random_double(50.0, 200.0);
*zaa = (int)random_double(50.0, 200.0);
} else {
*xla = (int)random_double(6.0, 9.0);
*yla = (int)random_double(6.0, 9.0);
*zla = (int)random_double(6.0, 9.0);
*xaa = (int)random_double(600.0, 900.0);
*yaa = (int)random_double(600.0, 900.0);
*zaa = (int)random_double(600.0, 900.0);
}
}
void generate() {
srand(time(NULL)); // Inizializzazione del generatore di numeri casuali
// Genera le componenti dell'accelerazione in base al valore di movements
generate_acceleration_components(movements, &xla, &yla, &zla, &xaa, &yaa, &zaa);
}
int generate_random_body(int min, int max) {
return min + rand() % (max - min + 1);
}
//genera valori randomici per i dati presi dal cuore ecc
void generate_values_body(int mov, int *fbs, int *restecg, int *trestbps, int *thalach) {
if (mov == 1) {
*fbs = 1;
*restecg = 1;
*trestbps = generate_random_body(150, 300);
*thalach = generate_random_body(150, 300);
} else if (mov == 0) {
*fbs = 0;
*restecg = 0;
*trestbps = generate_random_body(100, 130);
*thalach = generate_random_body(100, 130);
} else {
printf("X deve essere 0 o 1n");
exit(1);
}
}
void generate_body(){
generate_values_body(movements, &fbs, &restecg, &trestbps, &thalach);
}
static void sensor_sample(){
if(state==STATE_INIT){
if(have_connectivity()==true)
state = STATE_NET_OK;
}
if(state == STATE_NET_OK){
// Connect to MQTT server
printf("Connecting!n");
memcpy(broker_address, broker_ip, strlen(broker_ip));
mqtt_connect(&conn, broker_address, DEFAULT_BROKER_PORT,
PUBLISHING_INTERVAL, MQTT_CLEAN_SESSION_ON);
state = STATE_CONNECTING;
}
if(state == STATE_CONNECTED){
// Publish something
//Due topic:
//patient_movement -> valori di movimenti xyz per accelerazione lineare ed angolare
//patient_values -> valori rilevati internamente
//PUBLISH MOVEMENT
generate(); //Creation of the movement values
sprintf(pub_mov_topic, "%s", "patient_movement");
//app_buffer_mov per l'app buffer dei movements values
//app_buffer_bod per l'app buffer dei body
//Movement app buffer
sprintf(app_buffer_mov, "{"app":"MedicalMonitoring"n"Patient_ID":1,n"xla": %d,n"yla": %d,n"zla": %d,n"xaa": %d,n"yaa": %d,n"zaa": %d,n"MAC": "%s"}", (int)xla,(int)yla,(int)zla,(int)xaa,(int)yaa,(int)zaa, client_id);
printf("Publishing: %s Topic:%sn", app_buffer_mov, pub_mov_topic);
//PUBLISH BODY
generate_body();
sprintf(pub_bod_topic, "%s", "patient_values");
//Body app buffer
sprintf(app_buffer_bod, "{"app":"MedicalMonitoring"n"Patient_ID":1,n"trestbps": %d,n"fbs": %d,n"restecg": %d,n"thalach": %d,n"MAC": "%s"}", trestbps,fbs,restecg,thalach, client_id);
printf("Publishing: %s Topic: %sn", app_buffer_bod, pub_bod_topic);
//MQTT publishing
mqtt_publish(&conn, NULL, pub_mov_topic, (uint8_t *)app_buffer_mov, strlen(app_buffer_mov), MQTT_QOS_LEVEL_0, MQTT_RETAIN_OFF);
mqtt_publish(&conn, NULL, pub_bod_topic, (uint8_t *)app_buffer_bod, strlen(app_buffer_bod), MQTT_QOS_LEVEL_0, MQTT_RETAIN_OFF);
} else if ( state == STATE_DISCONNECTED ){
LOG_ERR("Disconnected form MQTT broker. Trying to reconnect..n");
// Recover from error
state = STATE_INIT;
}
}
static void setXone(){
if(movements ==0){
movements=1;
printf("movements set to: %dn", movements);
leds_off(14);
leds_on(1 << movements);
}
}
static void setXzero(){
if(movements==1){
movements=0;
printf("movements set to: %dn", movements);
leds_off(14);
leds_on(1 << movements);
}
}
PROCESS_THREAD(mqtt_client_example, ev, data){
PROCESS_BEGIN();
printf("MQTT Client Processn");
// Initialize the ClientID as MAC address
snprintf(client_id, BUFFER_SIZE, "%02x%02x%02x%02x%02x%02x",
linkaddr_node_addr.u8[0], linkaddr_node_addr.u8[1],
linkaddr_node_addr.u8[2], linkaddr_node_addr.u8[5],
linkaddr_node_addr.u8[6], linkaddr_node_addr.u8[7]);
printf("MAC ADDRESS: %sn", client_id);
// Broker registration
mqtt_register(&conn, &mqtt_client_example, client_id, mqtt_event, MAX_TCP_SEGMENT_SIZE);
state = STATE_INIT;
// Initialize periodic timer to check the status
etimer_set(&periodic_timer, PUBLISHING_INTERVAL);
btn = button_hal_get_by_index(0); //Return the button of index0 since it's the only one button
/* Main loop */
while(1) {
PROCESS_YIELD();
if(ev == button_hal_press_event) {
setXone();
}
if(ev == button_hal_periodic_event) {
btn = (button_hal_button_t *)data;
if(btn->press_duration_seconds > 2) {
setXzero();
}
}
if((ev == PROCESS_EVENT_TIMER && data == &periodic_timer) || ev == PROCESS_EVENT_POLL){
sensor_sample();
etimer_set(&periodic_timer, PUBLISHING_INTERVAL);
}
}
PROCESS_END();
}
I then use, in Ubuntu terminal, the following commands
1st terminal:** mosquitto -v**
to launch mosquitto
2nd terminal: sudo mosquitto_sub -v -h localhost -p 1883 -t “patient_movement” -t “patient_values”
to subscribe to the 2 topics of the mqtt node.
But when i do the node cannot connect to Mosquitto, it returns a connection error and doesn’t connect.
IF i use the command line:
sudo mosquitto_sub -v -h localhost -p 1883 -t ‘#’
to subscribe to ALL topics then the node starts working.
However i need to connect mosquitto to those specific topics because i have a Java app that will search for those 2 topics, so if i cannot connect with mosquitto i’ll cannot do it either with my app.
All help is aprreciated!
I tried these 2 commands:
mosquitto_sub -v -h localhost -p 1883 -t ‘#’
sudo mosquitto_sub -v -h localhost -p 1883 -t “patient_movement” -t “patient_values”
but i need to subscribe only to the 2 topics specified, not to all topics, but when i use those 2 topic names the node won’t connect to mosquitto and i don’t know why