I purposely force the consumer to fail consuming the message (5th
) in the handler. When I restart the consumer, I don’t see any attempt to consume failed message although I didn’t call session.MarkMessage(msg, "")
at the end for that message perviously. Technically the message is lost now (not visible to the consumer).
Is this a configuration issue in Sarama or something else please? Code is mostly from the https://github.com/IBM/sarama/blob/main/examples/consumergroup/main.go).
Using: Kafka v3.3.2 and Sarama v1.43.2
package kafka
import (
"context"
"errors"
"log"
"time"
"github.com/IBM/sarama"
)
type Kafka struct {
brokers []string
config *sarama.Config
}
func New(brokers []string) Kafka {
cfg := sarama.NewConfig()
cfg.Consumer.Return.Errors = true
cfg.Consumer.Retry.Backoff = time.Second * 3
cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
return Kafka{
brokers: brokers,
config: cfg,
}
}
func (k Kafka) ConsumerGroup(ctx context.Context, topic string) error {
group, err := sarama.NewConsumerGroup(k.brokers, topic + "-group", k.config)
if err != nil {
return err
}
defer group.Close()
go func() {
for err := range group.Errors() {
log.Printf("error in consumer group: %s: %sn", err)
}
}()
for {
err := group.Consume(ctx, []string{topic}, handler{})
switch {
case errors.Is(err, sarama.ErrClosedConsumerGroup):
return nil
case err != nil:
return err
case ctx.Err() != nil:
return ctx.Err()
}
}
}
type handler struct {}
func (handler) Setup(sarama.ConsumerGroupSession) error { return nil }
func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for {
select {
case <-session.Context().Done():
return nil
case msg, ok := <-claim.Messages():
if !ok {
return nil
}
// Purposely simulating error.
if string(msg.Key) == "5th" {
log.Println("failed to consume message '5th'")
continue
}
log.Println("successfully consumed message")
session.MarkMessage(msg, "")
}
}
}