From 5e6664dfea9668c178047ebfc782278a176afaaf Mon Sep 17 00:00:00 2001 From: "Tzu-Li (Gordon) Tai" Date: Wed, 5 Dec 2018 16:29:52 +0800 Subject: [PATCH] [FLINK-11073] [core] Let MapSerializerSnapshot be a CompositeTypeSerializerSnapshot --- .../common/typeutils/base/MapSerializer.java | 2 +- .../base/MapSerializerConfigSnapshot.java | 3 +- .../typeutils/base/MapSerializerSnapshot.java | 60 +++++-------------- 3 files changed, 18 insertions(+), 47 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java index dd3b81bade7..bedaf693608 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java @@ -202,6 +202,6 @@ public final class MapSerializer extends TypeSerializer> { @Override public TypeSerializerSnapshot> snapshotConfiguration() { - return new MapSerializerSnapshot<>(keySerializer, valueSerializer); + return new MapSerializerSnapshot<>(this); } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java index 000924f0245..2b78b527f7e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java @@ -50,8 +50,7 @@ public final class MapSerializerConfigSnapshot extends CompositeTypeSerial // redirect the compatibility check to the new MapSerializerConfigSnapshot MapSerializer mapSerializer = (MapSerializer) newSerializer; - MapSerializerSnapshot mapSerializerSnapshot = - new MapSerializerSnapshot<>(mapSerializer.getKeySerializer(), mapSerializer.getValueSerializer()); + MapSerializerSnapshot mapSerializerSnapshot = new MapSerializerSnapshot<>(mapSerializer); return mapSerializerSnapshot.resolveSchemaCompatibility(newSerializer); } else { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java index be2e4b0cbe6..a6db0ef74e6 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java @@ -18,78 +18,50 @@ package org.apache.flink.api.common.typeutils.base; -import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.util.Preconditions; -import java.io.IOException; import java.util.Map; -import static org.apache.flink.util.Preconditions.checkState; - /** * Snapshot class for the {@link MapSerializer}. */ -public class MapSerializerSnapshot implements TypeSerializerSnapshot> { +public class MapSerializerSnapshot extends CompositeTypeSerializerSnapshot, MapSerializer> { private static final int CURRENT_VERSION = 1; - private CompositeSerializerSnapshot nestedKeyValueSerializerSnapshot; - /** * Constructor for read instantiation. */ - public MapSerializerSnapshot() {} + public MapSerializerSnapshot() { + super(MapSerializer.class); + } /** * Constructor to create the snapshot for writing. */ - public MapSerializerSnapshot(TypeSerializer keySerializer, TypeSerializer valueSerializer) { - Preconditions.checkNotNull(keySerializer); - Preconditions.checkNotNull(valueSerializer); - this.nestedKeyValueSerializerSnapshot = new CompositeSerializerSnapshot(keySerializer, valueSerializer); + public MapSerializerSnapshot(MapSerializer mapSerializer) { + super(mapSerializer); } @Override - public int getCurrentVersion() { + public int getCurrentOuterSnapshotVersion() { return CURRENT_VERSION; } @Override - public TypeSerializer> restoreSerializer() { - return new MapSerializer<>( - nestedKeyValueSerializerSnapshot.getRestoreSerializer(0), - nestedKeyValueSerializerSnapshot.getRestoreSerializer(1)); - } + protected MapSerializer createOuterSerializerWithNestedSerializers(TypeSerializer[] nestedSerializers) { + @SuppressWarnings("unchecked") + TypeSerializer keySerializer = (TypeSerializer) nestedSerializers[0]; - @Override - public TypeSerializerSchemaCompatibility> resolveSchemaCompatibility(TypeSerializer> newSerializer) { - checkState(nestedKeyValueSerializerSnapshot != null); + @SuppressWarnings("unchecked") + TypeSerializer valueSerializer = (TypeSerializer) nestedSerializers[1]; - if (newSerializer instanceof MapSerializer) { - MapSerializer serializer = (MapSerializer) newSerializer; - - return nestedKeyValueSerializerSnapshot.resolveCompatibilityWithNested( - TypeSerializerSchemaCompatibility.compatibleAsIs(), - serializer.getKeySerializer(), - serializer.getValueSerializer()); - } - else { - return TypeSerializerSchemaCompatibility.incompatible(); - } - } - - @Override - public void writeSnapshot(DataOutputView out) throws IOException { - nestedKeyValueSerializerSnapshot.writeCompositeSnapshot(out); + return new MapSerializer<>(keySerializer, valueSerializer); } @Override - public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException { - this.nestedKeyValueSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader); + protected TypeSerializer[] getNestedSerializers(MapSerializer outerSerializer) { + return new TypeSerializer[] { outerSerializer.getKeySerializer(), outerSerializer.getValueSerializer() }; } } -- GitLab