As part of learning kafka streams I am trying to write a streaming application that sums up the amount in bank transactions. The bank transactions are read from a topic as json objects. I get a compilation error for the below code
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
ObjectNode initialBalance = JsonNodeFactory.instance.objectNode();
initialBalance.put("balance", 0);
initialBalance.put("count", 0);
initialBalance.put("time", Instant.ofEpochMilli(0).toString());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, JsonNode> source = builder.stream("bank-transactions", Consumed.with(Serdes.String(), jsonSerde));
KTable<String, JsonNode> resultTable = source.groupByKey(Grouped.with(Serdes.String(), jsonSerde))
.aggregate(
() -> initialBalance,
(name, transaction, balance) -> calculateBalance(transaction, balance),
jsonSerde,
"balance-agg"
);
resultTable.toStream().to("bank-balance", Produced.with(Serdes.String(), jsonSerde));
The calculateBalance function is as follows:
private static JsonNode calculateBalance(JsonNode transaction, JsonNode balance) {
ObjectNode newBalance = JsonNodeFactory.instance.objectNode();
newBalance.put("count", balance.get("count").asInt() + 1);
newBalance.put("balance", balance.get("balance").asInt() + transaction.get("amount").asInt());
Instant balanceTime = Instant.parse(balance.get("time").asText());
Instant transactionTime = Instant.parse(transaction.get("time").asText());
newBalance.put("time", balanceTime.isBefore(transactionTime) ? transactionTime.toString() : balanceTime.toString());
return newBalance;
}
I get the following compiler error.
:42:17
java: no suitable method found for aggregate(()->initialBalance,(name,tran[…]ance),org.apache.kafka.common.serialization.Serde<com.fasterxml.jackson.databind.JsonNode>)
method org.apache.kafka.streams.kstream.KGroupedStream.aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator<? super java.lang.String,? super com.fasterxml.jackson.databind.JsonNode,VR>) is not applicable
(cannot infer type-variable(s) VR
(actual and formal argument lists differ in length))
method org.apache.kafka.streams.kstream.KGroupedStream.aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator<? super java.lang.String,? super com.fasterxml.jackson.databind.JsonNode,VR>,org.apache.kafka.streams.kstream.Materialized<java.lang.String,VR,org.apache.kafka.streams.state.KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>>) is not applicable
(cannot infer type-variable(s) VR
(argument mismatch; org.apache.kafka.common.serialization.Serde<com.fasterxml.jackson.databind.JsonNode> cannot be converted to org.apache.kafka.streams.kstream.Materialized<java.lang.String,VR,org.apache.kafka.streams.state.KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>>))
method org.apache.kafka.streams.kstream.KGroupedStream.aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator<? super java.lang.String,? super com.fasterxml.jackson.databind.JsonNode,VR>,org.apache.kafka.streams.kstream.Named,org.apache.kafka.streams.kstream.Materialized<java.lang.String,VR,org.apache.kafka.streams.state.KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>>) is not applicable
(cannot infer type-variable(s) VR
(actual and formal argument lists differ in length))
What am I doing wrong?