I have a Flink 1.15 application running on AWS KDA (Managed Apache FLink). The application is being subjected to Snapshot (savepoint) on update. I got this exception (at the bottom of the message) I think because I moved a wrapped class from a ValueState (and therefore subjected to savepoint).
threadName (7eedaf4db54f14ae5b70ce55bfe409f0) switched from INITIALIZING to FAILED with failure cause: java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_5856e2d12487eeccb905e376fa806891_(10/32) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
... 11 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:395)
at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:484)
at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
... 13 more
Caused by: java.io.IOException: Could not find class 'old.class.path' in classpath.
at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:775)
at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:750)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.readTypeClass(KryoSerializerSnapshotData.java:186)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.createFrom(KryoSerializerSnapshotData.java:69)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.readSnapshot(KryoSerializerSnapshot.java:81)
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175)
at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:178)
at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.readSnapshot(CompositeTypeSerializerSnapshot.java:171)
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175)
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:174)
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:145)
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:77)
at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:237)
at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:184)
at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation.readMetaData(FullSnapshotRestoreOperation.java:194)
at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation.restoreKeyGroupsInStateHandle(FullSnapshotRestoreOperation.java:171)
at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation.access$100(FullSnapshotRestoreOperation.java:113)
at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$1.next(FullSnapshotRestoreOperation.java:158)
at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$1.next(FullSnapshotRestoreOperation.java:140)
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:102)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:315)
... 18 more
Caused by: java.lang.ClassNotFoundException: old.class.path
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:68)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:52)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:177)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:773)
... 38 more
I did a class shift of the type ‘old.class.path.MyClass’ to ‘new.class.path.MyClass’.
In another Class I have:
public class Operator extends KeyedProcessFunction<String, OperatorInput, OperatorOutput> {
private ValueState<MyClass> myClassState;
public void open(Configuration parameters) {
ValueStateDescriptor<MyClass> myClassCacheDescriptor = new ValueStateDescriptor<>("State", MyClass.class);
myClassCacheDescriptor .enableTimeToLive(ttlConfig);
this.myClassState= getRuntimeContext().getState(myClassCacheDescriptor);
}
}
How can this migration be done while keeping the state submitted to Savepoint?