From 301be5552290788864676b131e09741f769ef471 Mon Sep 17 00:00:00 2001 From: "Tzu-Li (Gordon) Tai" Date: Mon, 7 Jan 2019 12:50:27 +0100 Subject: [PATCH] [FLINK-11073] [core] Replace EitherSerializerSnapshot with new JavaEitherSerializerSnapshot --- .../typeutils/runtime/EitherSerializer.java | 4 +- .../runtime/EitherSerializerSnapshot.java | 12 ++-- .../runtime/JavaEitherSerializerSnapshot.java | 61 +++++++++++++++++++ ...teTypeSerializerSnapshotMigrationTest.java | 4 +- 4 files changed, 72 insertions(+), 9 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java index 3d4e8e92762..01286400694 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java @@ -205,7 +205,7 @@ public class EitherSerializer extends TypeSerializer> { // ------------------------------------------------------------------------ @Override - public EitherSerializerSnapshot snapshotConfiguration() { - return new EitherSerializerSnapshot<>(leftSerializer, rightSerializer); + public JavaEitherSerializerSnapshot snapshotConfiguration() { + return new JavaEitherSerializerSnapshot<>(this); } } diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java index 016fd0430f7..3b7a8e785ef 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java @@ -35,8 +35,12 @@ import static org.apache.flink.util.Preconditions.checkState; /** * Configuration snapshot for the {@link EitherSerializer}. + * + * @deprecated this snapshot class is no longer used by any serializers. + * Instead, {@link JavaEitherSerializerSnapshot} is used. */ @Internal +@Deprecated public final class EitherSerializerSnapshot implements TypeSerializerSnapshot> { private static final int CURRENT_VERSION = 2; @@ -110,12 +114,10 @@ public final class EitherSerializerSnapshot implements TypeSerializerSnaps checkState(nestedSnapshot != null); if (newSerializer instanceof EitherSerializer) { + // delegate compatibility check to the new snapshot class EitherSerializer serializer = (EitherSerializer) newSerializer; - - return nestedSnapshot.resolveCompatibilityWithNested( - TypeSerializerSchemaCompatibility.compatibleAsIs(), - serializer.getLeftSerializer(), - serializer.getRightSerializer()); + JavaEitherSerializerSnapshot newSnapshot = new JavaEitherSerializerSnapshot<>(serializer); + return newSnapshot.resolveSchemaCompatibility(serializer); } else { return TypeSerializerSchemaCompatibility.incompatible(); diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java new file mode 100644 index 00000000000..503634599f8 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.types.Either; + +/** + * Snapshot class for the {@link EitherSerializer}. + */ +public class JavaEitherSerializerSnapshot extends CompositeTypeSerializerSnapshot, EitherSerializer> { + + private static final int CURRENT_VERSION = 1; + + /** + * Constructor for read instantiation. + */ + @SuppressWarnings("unused") + public JavaEitherSerializerSnapshot() { + super(EitherSerializer.class); + } + + /** + * Constructor to create the snapshot for writing. + */ + public JavaEitherSerializerSnapshot(EitherSerializer eitherSerializer) { + super(eitherSerializer); + } + + @Override + protected int getCurrentOuterSnapshotVersion() { + return CURRENT_VERSION; + } + + @Override + protected EitherSerializer createOuterSerializerWithNestedSerializers(TypeSerializer[] nestedSerializers) { + return new EitherSerializer<>(nestedSerializers[0], nestedSerializers[1]); + } + + @Override + protected TypeSerializer[] getNestedSerializers(EitherSerializer outerSerializer) { + return new TypeSerializer[]{ outerSerializer.getLeftSerializer(), outerSerializer.getRightSerializer() }; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java index c6b49a4839e..62135d7e550 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java @@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeutils.base.GenericArraySerializerSnapshot import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.typeutils.runtime.EitherSerializer; -import org.apache.flink.api.java.typeutils.runtime.EitherSerializerSnapshot; +import org.apache.flink.api.java.typeutils.runtime.JavaEitherSerializerSnapshot; import org.apache.flink.types.Either; import org.junit.runner.RunWith; @@ -48,7 +48,7 @@ public class CompositeTypeSerializerSnapshotMigrationTest extends TypeSerializer // Either - final TestSpecification> either = TestSpecification.>builder("1.6-either", EitherSerializer.class, EitherSerializerSnapshot.class) + final TestSpecification> either = TestSpecification.>builder("1.6-either", EitherSerializer.class, JavaEitherSerializerSnapshot.class) .withSerializerProvider(() -> new EitherSerializer<>(StringSerializer.INSTANCE, IntSerializer.INSTANCE)) .withSnapshotDataLocation("flink-1.6-either-type-serializer-snapshot") .withTestData("flink-1.6-either-type-serializer-data", 10); -- GitLab