I am starting learning golang and as example for training I choose dummy application to getting messages from mqtt broker (as subscriber to topic) and passing it to queue in RabbitMQ instance. Right now message is rigit.
Separately both clients are working but together not at all.
package main
import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
rabbitmq "github.com/streadway/amqp"
"os"
"os/signal"
"syscall"
)
var channel rabbitmq.Channel
var err01 error
func onMessageReceived(client mqtt.Client, message mqtt.Message) {
err01 = channel.Publish(
"", // exchange
"testing", // key
false, // mandatory
false, // immediate
rabbitmq.Publishing{
ContentType: "text/plain",
Body: []byte("Test Message"),
},
)
if err01 != nil {
panic(err01)
}
fmt.Printf("Received message: %s from topic: %sn", message.Payload(), message.Topic())
}
func main() {
////////////////////////////////////////////////////////////////
connection, err := rabbitmq.Dial("amqp://user:password@loclahost:5672/")
if err != nil {
panic(err)
}
defer connection.Close()
fmt.Println("Successfully connected to RabbitMQ instance")
channel, err := connection.Channel()
if err != nil {
panic(err)
}
defer channel.Close()
queue, err := channel.QueueDeclare(
"testing", // name
false, // durable
false, // auto delete
false, // exclusive
false, // no wait
nil, // args
)
if err != nil {
panic(err)
}
fmt.Println("Queue status:", queue)
err01 = channel.Publish(
"", // exchange
"testing", // key
false, // mandatory
false, // immediate
rabbitmq.Publishing{
ContentType: "text/plain",
Body: []byte("Test Message"),
},
)
if err01 != nil {
panic(err01)
}
////////////////////////////////////////////////////////////////
// mqtt topic
topic := "topic/test"
var broker = "192.168.0.104"
var port = 1883
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
opts.SetClientID("go_mqtt_client")
opts.SetUsername("mqttbroker")
opts.SetPassword("LoopEdge.123")
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(fmt.Sprintf("Error connecting to MQTT broker:", token.Error()))
}
if token := client.Subscribe(topic, 0, onMessageReceived); token.Wait() && token.Error() != nil {
panic(fmt.Sprintf("Error subscribing to topic:", token.Error()))
}
fmt.Println("Subscribed to topic:", topic)
// Wait for a signal to exit the program gracefully
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
client.Unsubscribe(topic)
client.Disconnect(250)
}
I received following errors.
I thing that something is wrong with channel variable, but not sure.
Successfully connected to RabbitMQ instance
Queue status: {testing 0 0}
Subscribed to topic: topic/test
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0xd0 pc=0x62a61d]
goroutine 23 [running]:
github.com/streadway/amqp.(*Channel).sendOpen(0x8d3220, {0x730790, 0xc0000a4360})
/home/go/pkg/mod/github.com/streadway/[email protected]/channel.go:227 +0x15d
github.com/streadway/amqp.(*Channel).send(0x674f80?, {0x730790?, 0xc0000a4360?})
/home/go/pkg/mod/github.com/streadway/[email protected]/channel.go:161 +0x35
github.com/streadway/amqp.(*Channel).Publish(0x8d3220, {0x0, 0x0}, {0x6b7133, _}, _, _, {0x0, {0x6b7ca0, 0xa}, ...})
/home/go/pkg/mod/github.com/streadway/[email protected]/channel.go:1334 +0x397
main.onMessageReceived({0x0?, 0x0?}, {0x7320d0, 0xc0000b81e0})
/home/GolangProjects/MqttClient/main.go:17 +0xf5
github.com/eclipse/paho%2emqtt%2egolang.(*router).matchAndDispatch.func2()
/home/go/pkg/mod/github.com/eclipse/[email protected]/router.go:218 +0x8f7
created by github.com/eclipse/paho%2emqtt%2egolang.(*router).matchAndDispatch in goroutine 20
/home/go/pkg/mod/github.com/eclipse/[email protected]/router.go:173 +0x211
exit status 2
Thank you for hints!