I'm building a URL shortener that shortens a URL, redirects the shortened URL to the original one, and calculates metrics for each shortened URL. My idea is to use Kafka for this: every request for a short URL produces a message containing the request metadata to a Kafka topic, and a consumer extracts aggregates from those messages and stores/updates them in the database (MySQL).
Each Kafka message carries the request data, and the message key is the short URL that received the request.
The aggregates I need to store are: total request count, request count per region, and request count per continent.
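To make the three aggregates concrete, here is a minimal in-memory sketch of the counters keyed by short URL. The `RequestData` type below is a hypothetical stand-in for `models.RequestData` (the `Region` and `Continent` field names are assumptions), and `Aggregator`, `Aggregate`, `Add`, and `Get` are illustrative names, not part of any library.

```go
package main

// RequestData is a hypothetical stand-in for models.RequestData;
// the real struct and its field names may differ.
type RequestData struct {
	Region    string `json:"region"`
	Continent string `json:"continent"`
}

// Aggregate holds the running counters for one short URL.
type Aggregate struct {
	Total        int64
	PerRegion    map[string]int64
	PerContinent map[string]int64
}

// Aggregator accumulates counters in memory, keyed by short URL.
// It is not safe for concurrent use; guard it with a mutex if needed.
type Aggregator struct {
	byURL map[string]*Aggregate
}

// NewAggregator returns an empty Aggregator.
func NewAggregator() *Aggregator {
	return &Aggregator{byURL: make(map[string]*Aggregate)}
}

// Add records one request for the given short URL.
func (a *Aggregator) Add(shortURL string, r RequestData) {
	agg, ok := a.byURL[shortURL]
	if !ok {
		agg = &Aggregate{
			PerRegion:    make(map[string]int64),
			PerContinent: make(map[string]int64),
		}
		a.byURL[shortURL] = agg
	}
	agg.Total++
	agg.PerRegion[r.Region]++
	agg.PerContinent[r.Continent]++
}

// Get returns the counters for a short URL, or nil if none were recorded.
func (a *Aggregator) Get(shortURL string) *Aggregate {
	return a.byURL[shortURL]
}
```

In a consumer loop, `Add(string(msg.Key), requestData)` would be called once per decoded message.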
I have set up the consumer and the producer (both work fine) using confluent-kafka-go. Here are snippets of each:
Consumer:
func InitializeConsumer() error {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": os.Getenv("KAFKA_SERVER"),
		"security.protocol": os.Getenv("KAFKA_PROTOCOL"),
		"sasl.mechanisms":   os.Getenv("KAFKA_SASL_MECHANISM"),
		"sasl.username":     os.Getenv("KAFKA_USERNAME"),
		"sasl.password":     os.Getenv("KAFKA_PASSWORD"),
		"group.id":          KafkaGroupId,
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		return err
	}
	ConsumerClient = c
	topic := KafkaTopic
	err = c.SubscribeTopics([]string{topic}, nil)
	if err != nil {
		return err
	}
	// Block forever, reading one message at a time.
	for {
		msg, err := c.ReadMessage(-1)
		if err == nil {
			var requestData models.RequestData
			err = json.Unmarshal(msg.Value, &requestData)
			if err != nil {
				log.Printf("Error decoding message: %v\n", err)
				continue
			}
			log.Printf("Received Request: %+v\n", requestData)
			// The message key is the short URL that was requested.
			log.Printf("Request url key: %+v\n", string(msg.Key))
		} else {
			log.Printf("Error: %v\n", err)
		}
	}
}
Producer:
func ProduceMessage(key string, msg models.RequestData) error {
	value, err := json.Marshal(msg)
	if err != nil {
		log.Printf("error marshalling message :%s", err)
		return err
	}
	topic := KafkaTopic
	err = ProducerClient.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          value,
		Key:            []byte(key),
	}, nil)
	if err != nil {
		return err
	}
	return nil
}
I tried to use Kafka Streams, but there seems to be no Kafka Streams library for Go in Confluent's client.
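One possible direction without Kafka Streams is to count in the consumer and periodically flush the deltas to MySQL with an upsert. The sketch below shows that for the per-region counter only, under stated assumptions: the table `url_region_counts`, its columns, and a unique key on `(short_url, region)` are hypothetical, as are the names `Batch`, `Add`, and `Flush`. The `exec` callback is a stand-in for a thin wrapper around `db.Exec` from `database/sql`, so the sketch itself needs no driver.

```go
package main

// upsertRegion adds a delta to the stored count, creating the row if it
// does not exist yet. Table and column names are hypothetical; the statement
// assumes a UNIQUE or PRIMARY key on (short_url, region).
const upsertRegion = `INSERT INTO url_region_counts (short_url, region, requests)
VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE requests = requests + ?`

// Batch holds not-yet-persisted deltas: short URL -> region -> count.
type Batch map[string]map[string]int64

// Add records one request in the batch.
func (b Batch) Add(shortURL, region string) {
	regions, ok := b[shortURL]
	if !ok {
		regions = make(map[string]int64)
		b[shortURL] = regions
	}
	regions[region]++
}

// Flush writes every delta through exec and removes each entry once it has
// been written, so a retry after an error only resends what is still pending.
func Flush(exec func(query string, args ...any) error, b Batch) error {
	for shortURL, regions := range b {
		for region, n := range regions {
			// n is passed twice: once for the insert, once for the increment.
			if err := exec(upsertRegion, shortURL, region, n, n); err != nil {
				return err
			}
			delete(regions, region) // deleting during range is safe in Go
		}
		delete(b, shortURL)
	}
	return nil
}
```

With a real `*sql.DB`, the callback could be `func(q string, a ...any) error { _, err := db.Exec(q, a...); return err }`, and `Flush` could run every N messages or on a timer. Note that if offsets are committed separately from the database write, a crash between the two may count a batch twice or drop it, so the ordering of flush and commit is a design decision this sketch leaves open.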