I am new to Pulsar and created a Golang application that tests the basic functionalities of Pulsar, as follows. It exposes a produce and consume endpoint which produce and consume message respectively. For Pulsar I followed Apache Pulsar website’s Quick start that deploys a standalone mini Pulsar cluster.
package main
import (
"context"
"fmt"
"log"
"net/http"
"time"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/apache/pulsar-client-go/pulsaradmin"
)
var client pulsar.Client
func produceHandler(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "Admin setup...")
cfg := &pulsaradmin.Config{
WebServiceURL: "http://pulsar-broker.apisix.svc.cluster.local:8080",
BKWebServiceURL: "http://pulsar-bookie.apisix.svc.cluster.local:8080",
}
admin, err := pulsaradmin.NewClient(cfg)
if err != nil {
panic(err)
}
// Create a producer and send message
fmt.Fprintln(w, "producing message...")
option := pulsar.ProducerOptions{
Topic: "persistent://tenant/pulsar/test",
}
producer, err := client.CreateProducer(option)
if err != nil {
log.Fatal(err)
}
fmt.Fprintln(w, producer)
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte("hello"),
})
if err != nil {
fmt.Println("Failed to publish message", err)
}
fmt.Fprintln(w, "published")
producer.Close()
}
func consumeHandler(w http.ResponseWriter, r *http.Request) {
// Create a consumer and get message
fmt.Fprintln(w, "consuming message...")
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "persistent://tenant/pulsar/test",
SubscriptionName: "sub",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Fprintln(w, "Received message msgId: %#v -- content: '%sn", msg.ID(), string(msg.Payload()))
consumer.Ack(msg)
if err := consumer.Unsubscribe(); err != nil {
log.Fatal(err)
}
consumer.Close()
}
func main() {
// Initialize a Go client
var error error
client, error = pulsar.NewClient(pulsar.ClientOptions{
URL: "http://pulsar-proxy.apisix.svc.cluster.local:6650",
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
if error != nil {
log.Fatalf("Could not instantiate Pulsar client: %v", error)
}
defer client.Close()
fmt.Println("Started server on :8081")
http.HandleFunc("/produce", produceHandler)
http.HandleFunc("/consume", consumeHandler)
http.ListenAndServe(":8081", nil)
}
I deployed it within the same cluster as Pulsar and is able to connect to it, but as the application goes to client.CreateProducer(option)
, it got stuck there and produces an error after a while
2024/07/18 17:45:49 Get "http://pulsar-proxy.apisix.svc.cluster.local:6650/admin/v2/persistent/tenant/pulsar/test/partitions?checkAllowAutoCreation=true": EOF
exit status 1
I thought that the topic associated with producer was somehow incorrect and tried different combinations of it (removing the “persistent://” prefix, adding the “partition” suffix as is produced when listing the topic names), but none of them works. As authentication is by default not enabled it shouldn’t be an issue. I also couldn’t use admin api to create tenant/namespace/topic but am able to list them, which is also strange. Any idea on what could be reflected by the error message?