According to the documentation, fetch.max.bytes and max.partition.fetch.bytes should limit the maximum size of a message consumed by a Kafka client.
I set max.poll.records to 1 because the documentation states that the first record batch of a fetch is not limited in size. I set fetch.max.bytes and max.partition.fetch.bytes to 1, and I use the default broker and topic properties.
When large messages are produced, they are still consumed successfully, contrary to my reading of the documentation. I would expect the broker to throw an exception or the oversized message to be skipped; instead the consumer receives and processes the large messages.
This is the consumer code I ran:
package io.conduktor.demos.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemo {

    private static final Logger log = LoggerFactory.getLogger(ConsumerDemo.class);

    public static void main(String[] args) {
        log.info("I am a Kafka Consumer");

        String bootstrapServers = "127.0.0.1:9092";
        String groupId = "my-fifth-application";
        String topic = "demo_java";

        // create consumer configs
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
        properties.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1");
        properties.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "1");

        // create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        try {
            // subscribe consumer to our topic(s)
            consumer.subscribe(Arrays.asList(topic));

            // poll for new data
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    log.info("Key: " + record.key() + ", Value: " + record.value());
                    log.info("Partition: " + record.partition() + ", Offset: " + record.offset());
                }
            }
        } catch (WakeupException e) {
            log.info("Wake up exception!");
            // we ignore this as this is an expected exception when closing a consumer
        } catch (Exception e) {
            log.error("Unexpected exception", e);
        } finally {
            consumer.close(); // this will also commit the offsets if need be
            log.info("The consumer is now gracefully closed.");
        }
    }
}
Then I send a big message with kafka-console-producer.
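For reference, here is a minimal sketch of how an equivalent large message could be produced programmatically instead of with kafka-console-producer (the class name and the ~500 KB payload size are my own assumptions; the topic and bootstrap server match the consumer code above):

package io.conduktor.demos.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class LargeMessageProducerDemo {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // build a ~500 KB payload, far above the 1-byte fetch limits set on the consumer
        // (payload size is an assumption for illustration)
        String largeValue = "x".repeat(500_000);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            // send one large record to the same topic the consumer subscribes to
            producer.send(new ProducerRecord<>("demo_java", "big-key", largeValue));
            producer.flush();
        }
    }
}

With either producer, the record is visibly consumed and logged by the consumer above.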