I am trying to build a simple kafka streams application that just needs to process a record (Assume it takes around 1-2 seconds) and then write it to an output topic, I have come up with the following, while the CompleteableFuture processes the records as expected (I am using this as to process the records concurrently as all the records are independent of each other). However, I unable to see anything on the output topic.
I also want to know if this is a good practice to use kafka streams this way or not, here’s the code I’ve written so far
package org.example;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.processor.*;
import org.apache.kafka.common.serialization.Serdes;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
public class Main {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-processor-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
String inputTopic = "input-topic";
String outputTopic = "output-topic";
Topology topology = new Topology();
topology.addSource("kafkaStreamSourceName", inputTopic)
.addProcessor("PrintProcessor", PrintProcessor::new, "kafkaStreamSourceName")
.addSink("Sink", outputTopic, Serdes.String().serializer(), Serdes.String().serializer(), "PrintProcessor");
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
static class PrintProcessor implements Processor<String, String> {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(String key, String value) {
System.out.println("start processing "+key+" : "+value);
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Processed record - Key: " + key + ", Value: " + value);
});
future.thenAccept((Void) -> {
context.forward(key, value);
});
}
@Override
public void close() {
}
}
}
Thanks in Advance