`So I have a KStream that that gets deserialized into a POJO like so –
public class Payment {
public String user_id;
public String state_code;
public String exchange_id;
}
And below is Global Ktable record looks like and which has translations for state_code
public class State{
public String field_name
public String source_value;
}
I want to join kstream with this global Ktable based on the value from Payment.State_code = State.field_name and there are no common keys in both.
I’m able to join them but not with values
Wrapper class –
public class PaymentStateWrapper{
public String Payment
public String State;
}
KStream<String, Payment> paymentStream =
builder.stream(
INCOMING_TOPIC,
Consumed.with(Serdes.String(), paymentStreamSerde)
);
GlobalKTable<String, State> stateStore =
builder.globalTable(
KTABLE_TOPIC,
Consumed.with(Serdes.String(), stateSerde)
);
KStream<String, PaymentStateWrapper> stream = paymentStream.leftJoin(
stateStore,
(Key, Value) -> Value.state_code,
(paymentValue, StateValue) -> new PaymentStateJoiner()
);`
thecoder is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.