提交 5e6664df 编写于 作者: T Tzu-Li (Gordon) Tai

[FLINK-11073] [core] Let MapSerializerSnapshot be a CompositeTypeSerializerSnapshot

上级 faf093d3
...@@ -202,6 +202,6 @@ public final class MapSerializer<K, V> extends TypeSerializer<Map<K, V>> { ...@@ -202,6 +202,6 @@ public final class MapSerializer<K, V> extends TypeSerializer<Map<K, V>> {
@Override @Override
public TypeSerializerSnapshot<Map<K, V>> snapshotConfiguration() { public TypeSerializerSnapshot<Map<K, V>> snapshotConfiguration() {
return new MapSerializerSnapshot<>(keySerializer, valueSerializer); return new MapSerializerSnapshot<>(this);
} }
} }
...@@ -50,8 +50,7 @@ public final class MapSerializerConfigSnapshot<K, V> extends CompositeTypeSerial ...@@ -50,8 +50,7 @@ public final class MapSerializerConfigSnapshot<K, V> extends CompositeTypeSerial
// redirect the compatibility check to the new MapSerializerConfigSnapshot // redirect the compatibility check to the new MapSerializerConfigSnapshot
MapSerializer<K, V> mapSerializer = (MapSerializer<K, V>) newSerializer; MapSerializer<K, V> mapSerializer = (MapSerializer<K, V>) newSerializer;
MapSerializerSnapshot<K, V> mapSerializerSnapshot = MapSerializerSnapshot<K, V> mapSerializerSnapshot = new MapSerializerSnapshot<>(mapSerializer);
new MapSerializerSnapshot<>(mapSerializer.getKeySerializer(), mapSerializer.getValueSerializer());
return mapSerializerSnapshot.resolveSchemaCompatibility(newSerializer); return mapSerializerSnapshot.resolveSchemaCompatibility(newSerializer);
} }
else { else {
......
...@@ -18,78 +18,50 @@ ...@@ -18,78 +18,50 @@
package org.apache.flink.api.common.typeutils.base; package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.util.Map; import java.util.Map;
import static org.apache.flink.util.Preconditions.checkState;
/** /**
* Snapshot class for the {@link MapSerializer}. * Snapshot class for the {@link MapSerializer}.
*/ */
public class MapSerializerSnapshot<K, V> implements TypeSerializerSnapshot<Map<K, V>> { public class MapSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot<Map<K, V>, MapSerializer> {
private static final int CURRENT_VERSION = 1; private static final int CURRENT_VERSION = 1;
private CompositeSerializerSnapshot nestedKeyValueSerializerSnapshot;
/** /**
* Constructor for read instantiation. * Constructor for read instantiation.
*/ */
public MapSerializerSnapshot() {} public MapSerializerSnapshot() {
super(MapSerializer.class);
}
/** /**
* Constructor to create the snapshot for writing. * Constructor to create the snapshot for writing.
*/ */
public MapSerializerSnapshot(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer) { public MapSerializerSnapshot(MapSerializer<K, V> mapSerializer) {
Preconditions.checkNotNull(keySerializer); super(mapSerializer);
Preconditions.checkNotNull(valueSerializer);
this.nestedKeyValueSerializerSnapshot = new CompositeSerializerSnapshot(keySerializer, valueSerializer);
} }
@Override @Override
public int getCurrentVersion() { public int getCurrentOuterSnapshotVersion() {
return CURRENT_VERSION; return CURRENT_VERSION;
} }
@Override @Override
public TypeSerializer<Map<K, V>> restoreSerializer() { protected MapSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
return new MapSerializer<>( @SuppressWarnings("unchecked")
nestedKeyValueSerializerSnapshot.getRestoreSerializer(0), TypeSerializer<K> keySerializer = (TypeSerializer<K>) nestedSerializers[0];
nestedKeyValueSerializerSnapshot.getRestoreSerializer(1));
}
@Override @SuppressWarnings("unchecked")
public TypeSerializerSchemaCompatibility<Map<K, V>> resolveSchemaCompatibility(TypeSerializer<Map<K, V>> newSerializer) { TypeSerializer<V> valueSerializer = (TypeSerializer<V>) nestedSerializers[1];
checkState(nestedKeyValueSerializerSnapshot != null);
if (newSerializer instanceof MapSerializer) { return new MapSerializer<>(keySerializer, valueSerializer);
MapSerializer<K, V> serializer = (MapSerializer<K, V>) newSerializer;
return nestedKeyValueSerializerSnapshot.resolveCompatibilityWithNested(
TypeSerializerSchemaCompatibility.compatibleAsIs(),
serializer.getKeySerializer(),
serializer.getValueSerializer());
}
else {
return TypeSerializerSchemaCompatibility.incompatible();
}
}
@Override
public void writeSnapshot(DataOutputView out) throws IOException {
nestedKeyValueSerializerSnapshot.writeCompositeSnapshot(out);
} }
@Override @Override
public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException { protected TypeSerializer<?>[] getNestedSerializers(MapSerializer outerSerializer) {
this.nestedKeyValueSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader); return new TypeSerializer<?>[] { outerSerializer.getKeySerializer(), outerSerializer.getValueSerializer() };
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册