提交 29174f89 编写于 作者: T Tzu-Li (Gordon) Tai

[hotfix] Rename keySerializerConfigSnapshot to keySerializerSnapshot in...

[hotfix] Rename keySerializerConfigSnapshot to keySerializerSnapshot in KeyedBackendSerializationProxy

Remove "config" from the field names and getter name to reflect the new
abstraction TypeSerializerSnapshot.
上级 e03cea54
......@@ -64,7 +64,7 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab
// TODO the keySerializer field should be removed, once all serializers have the restoreSerializer() method implemented
private TypeSerializer<K> keySerializer;
private TypeSerializerSnapshot<K> keySerializerConfigSnapshot;
private TypeSerializerSnapshot<K> keySerializerSnapshot;
private List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
......@@ -82,7 +82,7 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab
this.usingKeyGroupCompression = compression;
this.keySerializer = Preconditions.checkNotNull(keySerializer);
this.keySerializerConfigSnapshot = Preconditions.checkNotNull(keySerializer.snapshotConfiguration());
this.keySerializerSnapshot = Preconditions.checkNotNull(keySerializer.snapshotConfiguration());
Preconditions.checkNotNull(stateMetaInfoSnapshots);
Preconditions.checkArgument(stateMetaInfoSnapshots.size() <= Short.MAX_VALUE);
......@@ -93,8 +93,8 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab
return stateMetaInfoSnapshots;
}
public TypeSerializerSnapshot<K> getKeySerializerConfigSnapshot() {
return keySerializerConfigSnapshot;
public TypeSerializerSnapshot<K> getKeySerializerSnapshot() {
return keySerializerSnapshot;
}
public boolean isUsingKeyGroupCompression() {
......@@ -118,7 +118,7 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab
// write the compression format used to write each key-group
out.writeBoolean(usingKeyGroupCompression);
TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(out, keySerializerConfigSnapshot, keySerializer);
TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(out, keySerializerSnapshot, keySerializer);
// write individual registered keyed state metainfos
out.writeShort(stateMetaInfoSnapshots.size());
......@@ -142,14 +142,14 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab
// only starting from version 3, we have the key serializer and its config snapshot written
if (readVersion >= 6) {
this.keySerializerConfigSnapshot = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
this.keySerializerSnapshot = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
in, userCodeClassLoader, null);
} else if (readVersion >= 3) {
Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> keySerializerAndConfig =
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader).get(0);
this.keySerializerConfigSnapshot = (TypeSerializerSnapshot<K>) keySerializerAndConfig.f1;
this.keySerializerSnapshot = (TypeSerializerSnapshot<K>) keySerializerAndConfig.f1;
} else {
this.keySerializerConfigSnapshot = new BackwardsCompatibleSerializerSnapshot<>(
this.keySerializerSnapshot = new BackwardsCompatibleSerializerSnapshot<>(
TypeSerializerSerializationUtil.tryReadSerializer(in, userCodeClassLoader, true));
}
this.keySerializer = null;
......
......@@ -162,7 +162,7 @@ public abstract class StateSerializerProvider<T> {
// if we are not yet registered with a new serializer,
// we can just use the restore serializer to read / write the state.
return previousSchemaSerializer();
};
}
/**
* Gets the serializer that recognizes the previous serialization schema of the state.
......
......@@ -395,7 +395,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// check for key serializer compatibility; this also reconfigures the
// key serializer to be compatible, if it is required and is possible
TypeSerializerSchemaCompatibility<K> keySerializerSchemaCompat =
checkKeySerializerSchemaCompatibility(serializationProxy.getKeySerializerConfigSnapshot());
checkKeySerializerSchemaCompatibility(serializationProxy.getKeySerializerSnapshot());
if (keySerializerSchemaCompat.isCompatibleAfterMigration() || keySerializerSchemaCompat.isIncompatible()) {
throw new StateMigrationException("The new key serializer must be compatible.");
}
......
......@@ -75,7 +75,7 @@ public class SerializationProxiesTest {
}
Assert.assertTrue(serializationProxy.isUsingKeyGroupCompression());
Assert.assertTrue(serializationProxy.getKeySerializerConfigSnapshot() instanceof IntSerializer.IntSerializerSnapshot);
Assert.assertTrue(serializationProxy.getKeySerializerSnapshot() instanceof IntSerializer.IntSerializerSnapshot);
assertEqualStateMetaInfoSnapshotsLists(stateMetaInfoList, serializationProxy.getStateMetaInfoSnapshots());
}
......
......@@ -740,7 +740,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// check for key serializer compatibility; this also reconfigures the
// key serializer to be compatible, if it is required and is possible
TypeSerializerSchemaCompatibility<K> keySerializerSchemaCompat =
rocksDBKeyedStateBackend.checkKeySerializerSchemaCompatibility(serializationProxy.getKeySerializerConfigSnapshot());
rocksDBKeyedStateBackend.checkKeySerializerSchemaCompatibility(serializationProxy.getKeySerializerSnapshot());
if (keySerializerSchemaCompat.isCompatibleAfterMigration() || keySerializerSchemaCompat.isIncompatible()) {
throw new StateMigrationException("The new key serializer must be compatible.");
}
......@@ -1291,7 +1291,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// check for key serializer compatibility; this also reconfigures the
// key serializer to be compatible, if it is required and is possible
TypeSerializerSchemaCompatibility<T> keySerializerSchemaCompat =
stateBackend.checkKeySerializerSchemaCompatibility(serializationProxy.getKeySerializerConfigSnapshot());
stateBackend.checkKeySerializerSchemaCompatibility(serializationProxy.getKeySerializerSnapshot());
if (keySerializerSchemaCompat.isCompatibleAfterMigration() || keySerializerSchemaCompat.isIncompatible()) {
throw new StateMigrationException("The new key serializer must be compatible.");
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册