[hotfix] Restore KeySerializer only once

上级 9a64d50f
...@@ -362,6 +362,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { ...@@ -362,6 +362,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
int numRegisteredKvStates = 0; int numRegisteredKvStates = 0;
stateTables.clear(); stateTables.clear();
boolean keySerializerRestored = false;
for (KeyedStateHandle keyedStateHandle : state) { for (KeyedStateHandle keyedStateHandle : state) {
if (keyedStateHandle == null) { if (keyedStateHandle == null) {
...@@ -386,20 +388,24 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { ...@@ -386,20 +388,24 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
serializationProxy.read(inView); serializationProxy.read(inView);
// check for key serializer compatibility; this also reconfigures the if (!keySerializerRestored) {
// key serializer to be compatible, if it is required and is possible // check for key serializer compatibility; this also reconfigures the
if (StateMigrationUtil.resolveCompatibilityResult( // key serializer to be compatible, if it is required and is possible
if (StateMigrationUtil.resolveCompatibilityResult(
serializationProxy.getKeySerializer(), serializationProxy.getKeySerializer(),
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class, TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
serializationProxy.getKeySerializerConfigSnapshot(), serializationProxy.getKeySerializerConfigSnapshot(),
(TypeSerializer) keySerializer) (TypeSerializer) keySerializer)
.isRequiresMigration()) { .isRequiresMigration()) {
// TODO replace with state migration; note that key hash codes need to remain the same after migration // TODO replace with state migration; note that key hash codes need to remain the same after migration
throw new RuntimeException("The new key serializer is not compatible to read previous keys. " + throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
"Aborting now since state migration is currently not available"); "Aborting now since state migration is currently not available");
} }
keySerializerRestored = true;
}
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos = List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
serializationProxy.getStateMetaInfoSnapshots(); serializationProxy.getStateMetaInfoSnapshots();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册