I have a Flink application with:
**parallelism 3 on the KafkaSource;**
**parallelism 9 on the other operators;**
**two Kafka sources with 3 partitions each.**
I have a use case where I'm running into problems:

Two topics with 3 partitions each, and all messages are distributed across partitions by the message ID (messages with the same ID land on the same partition in both topics; for example, messages with ID = 1 are on partition 1 in both topics).

When I consume the messages and use keyBy(ID), everything works fine with our stateful operations.

Now I take a snapshot.

Let's assume that, keeping the same configuration, I add a new source with 10 partitions. Here, the same message (ID = 1) ends up on partition 7.
When I restart the application from the snapshot, Flink detects new data on the new source and processes it. The problem shows up at the keyBy: sometimes the state created before the snapshot is missing, so I lose the stateful information for that message.
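To make the partition shift concrete: Kafka's default partitioner assigns a keyed record to `hash(key) % num_partitions` (the real implementation uses murmur2 on the serialized key bytes). The toy hash below is only an illustration of the principle, not Kafka's actual function, so the exact partition numbers differ from a real cluster; the point is that changing the partition count changes where the same ID lands.

```python
# Illustration only: Kafka's DefaultPartitioner really uses murmur2 on the
# serialized key bytes. This toy hash just demonstrates that
# partition = hash(key) % num_partitions changes when num_partitions changes.

def toy_hash(key: str) -> int:
    # Deterministic stand-in for a real hash function.
    return sum(key.encode())

def partition_for(key: str, num_partitions: int) -> int:
    return toy_hash(key) % num_partitions

print(partition_for("1", 3))   # with 3 partitions: partition 1
print(partition_for("1", 10))  # with 10 partitions: a different partition
```

So the same ID moving to a different partition when a topic has a different partition count is expected producer-side behaviour; it is independent of how Flink's keyBy routes records to keyed state.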
Is this behaviour correct, given that I'm using keyBy(ID)?
Thanks in advance for any explanation.