未验证 提交 9d8a2e26 编写于 作者: I Igal Shilman 提交者: Tzu-Li (Gordon) Tai

[FLINK-11323] Migrate WritableSerializer to use new serialization compatibility abstractions

上级 22a7ecca
......@@ -21,8 +21,11 @@ package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.GenericTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.InstantiationUtil;
......@@ -157,8 +160,8 @@ public final class WritableSerializer<T extends Writable> extends TypeSerializer
// --------------------------------------------------------------------------------------------
@Override
public WritableSerializerConfigSnapshot<T> snapshotConfiguration() {
return new WritableSerializerConfigSnapshot<>(typeClass);
public TypeSerializerSnapshot<T> snapshotConfiguration() {
return new WritableSerializerSnapshot<>(typeClass);
}
@Override
......@@ -176,6 +179,7 @@ public final class WritableSerializer<T extends Writable> extends TypeSerializer
* The config snapshot for this serializer.
* @param <T>
*/
@Deprecated
public static final class WritableSerializerConfigSnapshot<T extends Writable>
extends GenericTypeSerializerConfigSnapshot<T> {
......@@ -192,5 +196,43 @@ public final class WritableSerializer<T extends Writable> extends TypeSerializer
public int getVersion() {
return VERSION;
}
@Override
public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
return new WritableSerializerSnapshot<>(getTypeClass())
.resolveSchemaCompatibility(newSerializer);
}
}
/**
* {@link WritableSerializer} snapshot class.
*/
public static final class WritableSerializerSnapshot<T extends Writable>
extends GenericTypeSerializerSnapshot<T, WritableSerializer> {
@SuppressWarnings("unused")
public WritableSerializerSnapshot() {
}
WritableSerializerSnapshot(Class<T> typeClass) {
super(typeClass);
}
@Override
protected TypeSerializer<T> createSerializer(Class<T> typeClass) {
return new WritableSerializer<>(typeClass);
}
@SuppressWarnings("unchecked")
@Override
protected Class<T> getTypeClass(WritableSerializer serializer) {
return serializer.typeClass;
}
@Override
protected Class<?> serializerClass() {
return WritableSerializer.class;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
import org.apache.flink.api.java.typeutils.runtime.WritableSerializer.WritableSerializerSnapshot;
import org.apache.flink.api.java.typeutils.runtime.WritableSerializerMigrationTest.WritableName;
import org.apache.flink.testutils.migration.MigrationVersion;
import org.apache.hadoop.io.Writable;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
/**
* State migration test for {@link RowSerializer}.
*/
@RunWith(Parameterized.class)
public class WritableSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase<WritableName> {
public WritableSerializerMigrationTest(TestSpecification<WritableName> testSpecification) {
super(testSpecification);
}
@SuppressWarnings("unchecked")
@Parameterized.Parameters(name = "Test Specification = {0}")
public static Collection<TestSpecification<?>> testSpecifications() {
TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
testSpecifications.add(
"writeable-serializer",
WritableSerializer.class,
WritableSerializerSnapshot.class,
() -> new WritableSerializer<>(WritableName.class));
return testSpecifications.get();
}
/**
* A dummy class that is used in this test.
*/
public static final class WritableName implements Writable {
public static final long serialVersionUID = 1L;
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(name);
}
@Override
public void readFields(DataInput in) throws IOException {
name = in.readUTF();
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册