From 9d8a2e2607f949f755d1fb5934d55c164f536d27 Mon Sep 17 00:00:00 2001 From: Igal Shilman Date: Sat, 9 Feb 2019 21:11:06 +0100 Subject: [PATCH] [FLINK-11323] Migrate WritableSerializer to use new serialization compatibility abstractions --- .../typeutils/runtime/WritableSerializer.java | 46 ++++++++- .../WritableSerializerMigrationTest.java | 88 ++++++++++++++++++ .../flink-1.6-writeable-serializer-data | Bin 0 -> 150 bytes .../flink-1.6-writeable-serializer-snapshot | Bin 0 -> 506 bytes .../flink-1.7-writeable-serializer-data | Bin 0 -> 150 bytes .../flink-1.7-writeable-serializer-snapshot | Bin 0 -> 494 bytes 6 files changed, 132 insertions(+), 2 deletions(-) create mode 100644 flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerMigrationTest.java create mode 100644 flink-connectors/flink-hadoop-compatibility/src/test/resources/flink-1.6-writeable-serializer-data create mode 100644 flink-connectors/flink-hadoop-compatibility/src/test/resources/flink-1.6-writeable-serializer-snapshot create mode 100644 flink-connectors/flink-hadoop-compatibility/src/test/resources/flink-1.7-writeable-serializer-data create mode 100644 flink-connectors/flink-hadoop-compatibility/src/test/resources/flink-1.7-writeable-serializer-snapshot diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java index d6d8e2786a0..4f7b13e329c 100644 --- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java +++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java @@ -21,8 +21,11 @@ package org.apache.flink.api.java.typeutils.runtime; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.GenericTypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.InstantiationUtil; @@ -157,8 +160,8 @@ public final class WritableSerializer extends TypeSerializer // -------------------------------------------------------------------------------------------- @Override - public WritableSerializerConfigSnapshot snapshotConfiguration() { - return new WritableSerializerConfigSnapshot<>(typeClass); + public TypeSerializerSnapshot snapshotConfiguration() { + return new WritableSerializerSnapshot<>(typeClass); } @Override @@ -176,6 +179,7 @@ public final class WritableSerializer extends TypeSerializer * The config snapshot for this serializer. * @param */ + @Deprecated public static final class WritableSerializerConfigSnapshot extends GenericTypeSerializerConfigSnapshot { @@ -192,5 +196,43 @@ public final class WritableSerializer extends TypeSerializer public int getVersion() { return VERSION; } + + @Override + public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(TypeSerializer newSerializer) { + return new WritableSerializerSnapshot<>(getTypeClass()) + .resolveSchemaCompatibility(newSerializer); + } } + + /** + * {@link WritableSerializer} snapshot class. + */ + public static final class WritableSerializerSnapshot + extends GenericTypeSerializerSnapshot { + + @SuppressWarnings("unused") + public WritableSerializerSnapshot() { + } + + WritableSerializerSnapshot(Class typeClass) { + super(typeClass); + } + + @Override + protected TypeSerializer createSerializer(Class typeClass) { + return new WritableSerializer<>(typeClass); + } + + @SuppressWarnings("unchecked") + @Override + protected Class getTypeClass(WritableSerializer serializer) { + return serializer.typeClass; + } + + @Override + protected Class serializerClass() { + return WritableSerializer.class; + } + } + } diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerMigrationTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerMigrationTest.java new file mode 100644 index 00000000000..8dc7e89d392 --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerMigrationTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase; +import org.apache.flink.api.java.typeutils.runtime.WritableSerializer.WritableSerializerSnapshot; +import org.apache.flink.api.java.typeutils.runtime.WritableSerializerMigrationTest.WritableName; +import org.apache.flink.testutils.migration.MigrationVersion; + +import org.apache.hadoop.io.Writable; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Collection; + +/** + * State migration test for {@link RowSerializer}. + */ +@RunWith(Parameterized.class) +public class WritableSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase { + + public WritableSerializerMigrationTest(TestSpecification testSpecification) { + super(testSpecification); + } + + @SuppressWarnings("unchecked") + @Parameterized.Parameters(name = "Test Specification = {0}") + public static Collection> testSpecifications() { + + TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7); + + testSpecifications.add( + "writeable-serializer", + WritableSerializer.class, + WritableSerializerSnapshot.class, + () -> new WritableSerializer<>(WritableName.class)); + + return testSpecifications.get(); + } + + /** + * A dummy class that is used in this test. + */ + public static final class WritableName implements Writable { + + public static final long serialVersionUID = 1L; + + private String name; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(name); + } + + @Override + public void readFields(DataInput in) throws IOException { + name = in.readUTF(); + } + } + +} diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/resources/flink-1.6-writeable-serializer-data b/flink-connectors/flink-hadoop-compatibility/src/test/resources/flink-1.6-writeable-serializer-data new file mode 100644 index 0000000000000000000000000000000000000000..c6bd30c5b935999e6494b6958726356b0421229a GIT binary patch literal 150 zcmYk!!4bef2tv_qAiao+O8<@Jl(@$?pW!@H47!r4%=3}-wpx}ny&SN8DNN#L8QQc2 Qh^sm*_o~8D*El583kFadc>n+a literal 0 HcmV?d00001 diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/resources/flink-1.6-writeable-serializer-snapshot b/flink-connectors/flink-hadoop-compatibility/src/test/resources/flink-1.6-writeable-serializer-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..268534f97651c13209b6d3b9d5eb03ee474cc0f7 GIT binary patch literal 506 zcmcJKJqiLb5Jo40g{3F35$s}PC)n7oHVWFwC~+OlCSej~^-P{j@D66z-(m|vOp(kC z%zOC&KtOp!Czh93#$L3}SEzf2f#p=9!_`K~wI;KR4aqGYP==#Z`#{=D-kU&~k~5}& zORXk~2{oM*-^n0qAQEU48$*rW2WTb*qpQ)R-E!2=8G4nnmCZ71K6KNFsZv1h4h-{3 x$k#s}QOX#|nyK=k{Y*5*Og*v`PwwI+VDX#kMSq7GTC=8fV$k`G4W;C7d2YC!spbFx literal 0 HcmV?d00001 diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/resources/flink-1.7-writeable-serializer-data b/flink-connectors/flink-hadoop-compatibility/src/test/resources/flink-1.7-writeable-serializer-data new file mode 100644 index 0000000000000000000000000000000000000000..e857bedb027c1b8a2c27e2d5470c6b4c484daf8d GIT binary patch literal 150 zcmYky!2y6U3_?*CaF@_R682wAFPa`7{Oj(FKyruexIR+vxQC?#hF(qmR8(VPt)e(D Q(S5wi^1%Y!6F1X%0N9Bd`2YX_ literal 0 HcmV?d00001 diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/resources/flink-1.7-writeable-serializer-snapshot b/flink-connectors/flink-hadoop-compatibility/src/test/resources/flink-1.7-writeable-serializer-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..b54029bc135ea8adc4f37a7ab5ebbe200758fed5 GIT binary patch literal 494 zcmcJLu?hk)42E-pgQHL2;@n%@1Q%CzIYBpxQqO3wEonVYpUKDYbp#*5Rz(FJ1Tln! zWca^70e~9J4CewZE>==373obFNUX6$;dfSUJt-%6t34G`Oqe{*mCPg)D%wkKzpwg6 zFDcJ7TDLNOpB>Ktkc4a