We are running a stateful application based on the Spring Cloud Stream Kafka Streams binder. It does some stateful processing and uses the internal RocksDB state store, replicated to a changelog topic in Kafka. The application worked fine when deployed in a PCF environment using local disk for the state store files.
But we are migrating the app to Kubernetes as a StatefulSet deployment, using AWS EFS for the state store directory. We have a storage class defined and use a PV/PVC to mount it into the deployed pod.
The app seems to work fine for some time, but after a while (no defined frequency yet; anywhere from 2 days to 2 weeks) it stops working. Looking at the logs I see multiple issues, but the core issue may be that one of the RocksDB manifest files is missing, resulting in the exception below:
Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now. org.rocksdb.RocksDBException: While opening a file for sequentially reading: /mnt/data/appid/0_0/rocksdb/topic-STATE-STORE-0000000001/MANIFEST-000015: No such file or directory
at org.rocksdb.RocksDB.open(Native Method) ~[rocksdbjni-6.19.3.jar:]
at org.rocksdb.RocksDB.open(RocksDB.java:306) ~[rocksdbjni-6.19.3.jar:]
at org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:75) ~[kafka-streams-3.0.0.jar:]
... 18 more
Wrapped by: org.apache.kafka.streams.errors.ProcessorStateException: Error opening store topic-STATE-STORE-0000000001 at location /mnt/data/appid/0_0/rocksdb/topic-STATE-STORE-0000000001
at org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:87) ~[kafka-streams-3.0.0.jar:]
at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:183) ~[kafka-streams-3.0.0.jar:]
at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:250) ~[kafka-streams-3.0.0.jar:]
at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55) ~[kafka-streams-3.0.0.jar:]
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56) ~[kafka-streams-3.0.0.jar:]
at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55) ~[kafka-streams-3.0.0.jar:]
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:75) ~[kafka-streams-3.0.0.jar:]
at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55) ~[kafka-streams-3.0.0.jar:]
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$1(MeteredKeyValueStore.java:126) ~[kafka-streams-3.0.0.jar:]
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:769) ~[kafka-streams-3.0.0.jar:]
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:126) ~[kafka-streams-3.0.0.jar:]
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:201) ~[kafka-streams-3.0.0.jar:]
at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:97) ~[kafka-streams-3.0.0.jar:]
at org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:93) ~[kafka-streams-3.0.0.jar:]
at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:436) ~[kafka-streams-3.0.0.jar:]
at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:849) ~[kafka-streams-3.0.0.jar:]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:731) ~[kafka-streams-3.0.0.jar:]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583) ~[kafka-streams-3.0.0.jar:]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555) [kafka-streams-3.0.0.jar:]
A few other exceptions show up after this point; we do have multiple instances configured:
Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now. org.rocksdb.RocksDBException: While renaming a file to /mnt/data/rto-examiner-v3/0_2/rocksdb/cf_rto_aggregated-STATE-STORE-0000000001/CURRENT: /mnt/data/rto-examiner-v3/0_2/rocksdb/cf_rto_aggregated-STATE-STORE-0000000001/000001.dbtmp: No such file or directory at org.rocksdb.RocksDB.open(Native Method) ~[rocksdbjni-6.19.3.jar:] at org.rocksdb.RocksDB.open(RocksDB.java:306)
Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now. org.rocksdb.RocksDBException: While lock file: /mnt/data/rto-examiner-v3/0_4/rocksdb/cf_rto_aggregated-STATE-STORE-0000000001/LOCK: Resource temporarily unavailable at org.rocksdb.RocksDB.open(Native Method) ~[rocksdbjni-6.19.3.jar:] at org.rocksdb.RocksDB.open(RocksDB.java:306) ~[rocksdbjni-6.19.3.jar:] at org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB
All the above exceptions occur around the same time. Once those errors are seen, the streams client shuts down and all the consumers get stuck in a perpetual loop of trying to join the consumer group but never succeeding. At times one instance out of 6 does join, but only briefly, and it keeps restarting, with logs similar to what we find in this link https://github.com/confluentinc/parallel-consumer/issues/646
We are on Spring Boot 2.6.x and Kafka Streams 3.0. Kubernetes is AWS EKS and the mounted file system is AWS EFS (SC/PV/PVC).
I do see a few reported issues surrounding missing manifest files but don't see any solution for them. Any pointers on how to fix this, or how to recover from it as a workaround? The only way we currently recover is to change the consumer group name, but that means I have to manage offsets manually on every such change, which is cumbersome. Something programmatic to catch the RocksDBException and recreate the state, or clean up the state and let the app rebuild it, would help. Our bean processors are defined using the Spring functional topology.
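For reference, here is a minimal sketch of the kind of programmatic recovery I have in mind (class and method names here are mine, not from any library): a helper that checks whether an exception's cause chain bottoms out in RocksDB, and that wipes the corrupted task state directory so Kafka Streams restores it from the changelog topic on the next rebalance. This assumes the instance is no longer touching the directory when the wipe runs.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;

public class RocksDbRecovery {

    /** Walks the cause chain looking for a RocksDB-level failure. */
    public static boolean causedByRocksDb(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c.getClass().getName().startsWith("org.rocksdb.")) {
                return true;
            }
        }
        return false;
    }

    /**
     * Recursively deletes a corrupted state directory (e.g.
     * /mnt/data/appid/0_0) so Kafka Streams rebuilds the store
     * from the changelog topic when the task is reassigned.
     */
    public static void wipeStateDir(Path stateDir) throws IOException {
        if (!Files.exists(stateDir)) {
            return;
        }
        try (var paths = Files.walk(stateDir)) {
            // delete children before parents
            paths.sorted(Comparator.reverseOrder())
                 .forEach(p -> p.toFile().delete());
        }
    }
}
```

This could be hooked in via `KafkaStreams.setUncaughtExceptionHandler(...)` (available in Kafka Streams 3.0), returning `REPLACE_THREAD` for RocksDB-caused failures instead of the default `SHUTDOWN_CLIENT`; with the binder, the handler would be set on the `StreamsBuilderFactoryBean` if your spring-kafka version exposes that. I have not verified this heals the missing-MANIFEST case on EFS, so treat it as a starting point, not a proven fix.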