- I have kafka cluster with 3 brokers inside kubernetes cluster.
- Empty topic with 1 partition created manualy via kafka ui app
- .NET consumer config (nuget package Confluent.Kafka Version=”2.5.1″)
var config = new ConsumerConfig
{
BootstrapServers = _streamProviderOptions.BootstrapServers,
GroupId = $"consumer-{_topicName}",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false,
EnablePartitionEof = true,
ClientId = $"consumer-client-{_topicName}",
IsolationLevel = IsolationLevel.ReadCommitted
};
consumer.Assign(_topicPartition);
- Consume code (inside hangfire background job)
ConsumeResult<string, TestModel>? testModel = null;
while (!cancellationToken.IsCancellationRequested)
{
var consumeResult = consumer.Consume(cancellationToken);
if (consumeResult.IsPartitionEOF)
{
if (testModel is not null)
{
var prevLastOffset = new TopicPartitionOffset(_topicPartition, new Offset(testModel .Offset - 1));
consumer.Commit([prevLastOffset]);
}
break;
}
if (consumeResult.Message is not null)
{
testModel = consumeResult;
}
}
When I run the code var consumeResult = consumer.Consume(cancellationToken);
hangs and does not return a result, despite topic is created and all brokers available.
Used different replica factors from 1 to 3
Logs from kafka:
%7|1723651842.028|FETCH|consumer-client-my-topic#consumer-2| [thrd:kafka-0.kafka.confluent.svc.cluster.local:9071/0]: kafka-0.kafka.confluent.svc.cluster.local:9071/0: Topic my-topic [0] MessageSet size 0, error "Broker: Unknown topic id", MaxOffset -1, LSO -1, Ver 2/2
%7|1723651842.028|FETCHERR|consumer-client-my-topic#consumer-2| [thrd:kafka-0.kafka.confluent.svc.cluster.local:9071/0]: kafka-0.kafka.confluent.svc.cluster.local:9071/0: my-topic [0]: Fetch failed at offset 0 (leader epoch 1): UNKNOWN_TOPIC_ID
%7|1723651842.028|BROKERUA|consumer-client-my-topic#consumer-2| [thrd:kafka-0.kafka.confluent.svc.cluster.local:9071/0]: my-topic [0]: broker unavailable: fetch: Broker: Unknown topic id
Expected result – returned ConsumeResult where consumeResult.IsPartitionEOF = true and consumeResult.Message is null
Locally from docker compose with single broker everything works fine.