I want to distribute kafka messages with the same key equally among 2 partitions. I have
the following code.
<code>import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
public class MessageBalancePartitioner implements Partitioner{
private int partition=0;
@Override
public void configure(Map<String, ?> arg0) {
// TODO Auto-generated method stub
}
@Override
public void close() {
// TODO Auto-generated method stub
}
@Override
public int partition(String topic, Object key, byte[] keyBytes,Object value, byte[] valueInBytes, Cluster cluster) {
if(partition==0) {
partition=1;
}
else {
partition=0;
}
return partition;
}
}
</code>
<code>import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
public class MessageBalancePartitioner implements Partitioner{
private int partition=0;
@Override
public void configure(Map<String, ?> arg0) {
// TODO Auto-generated method stub
}
@Override
public void close() {
// TODO Auto-generated method stub
}
@Override
public int partition(String topic, Object key, byte[] keyBytes,Object value, byte[] valueInBytes, Cluster cluster) {
if(partition==0) {
partition=1;
}
else {
partition=0;
}
return partition;
}
}
</code>
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
public class MessageBalancePartitioner implements Partitioner{
private int partition=0;
@Override
public void configure(Map<String, ?> arg0) {
// TODO Auto-generated method stub
}
@Override
public void close() {
// TODO Auto-generated method stub
}
@Override
public int partition(String topic, Object key, byte[] keyBytes,Object value, byte[] valueInBytes, Cluster cluster) {
if(partition==0) {
partition=1;
}
else {
partition=0;
}
return partition;
}
}
Sender code
<code>import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
public class SenderWithMessageBalancePartitioner {
public static void main(String[] args) {
Properties props=new Properties();
//kafka server url
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MessageBalancePartitioner.class.getName());
KafkaProducer<String, String> producer=new KafkaProducer<>(props);
String topic="x-topic";
for(int i=1;i<=5;i++) {
ProducerRecord<String, String> record=new ProducerRecord<>(topic,
"first-key","This is test message "+i
);
producer.send(record,new Callback() {
@Override
public void onCompletion(RecordMetadata rmd, Exception ex) {
// TODO Auto-generated method stub
if(ex==null) {
System.out.println("message published at partition: "+rmd.partition());
}
}
});
}
producer.close();
System.out.println("messages sent");
}
}
</code>
<code>import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
public class SenderWithMessageBalancePartitioner {
public static void main(String[] args) {
Properties props=new Properties();
//kafka server url
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MessageBalancePartitioner.class.getName());
KafkaProducer<String, String> producer=new KafkaProducer<>(props);
String topic="x-topic";
for(int i=1;i<=5;i++) {
ProducerRecord<String, String> record=new ProducerRecord<>(topic,
"first-key","This is test message "+i
);
producer.send(record,new Callback() {
@Override
public void onCompletion(RecordMetadata rmd, Exception ex) {
// TODO Auto-generated method stub
if(ex==null) {
System.out.println("message published at partition: "+rmd.partition());
}
}
});
}
producer.close();
System.out.println("messages sent");
}
}
</code>
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
public class SenderWithMessageBalancePartitioner {
public static void main(String[] args) {
Properties props=new Properties();
//kafka server url
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MessageBalancePartitioner.class.getName());
KafkaProducer<String, String> producer=new KafkaProducer<>(props);
String topic="x-topic";
for(int i=1;i<=5;i++) {
ProducerRecord<String, String> record=new ProducerRecord<>(topic,
"first-key","This is test message "+i
);
producer.send(record,new Callback() {
@Override
public void onCompletion(RecordMetadata rmd, Exception ex) {
// TODO Auto-generated method stub
if(ex==null) {
System.out.println("message published at partition: "+rmd.partition());
}
}
});
}
producer.close();
System.out.println("messages sent");
}
}
It is not working as expected. It publishes the message always to partition 0.
x-topic has been created with 2 partions.
What is the problem with this? Thank you.
The custom partitioner is not honoured by the message sender.
New contributor
P Subramanian is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.