提交 e791b1a2 编写于 作者: T Tzu-Li (Gordon) Tai

[FLINK-11073] [core] Replace GenericArraySerializerConfigSnapshot with new...

[FLINK-11073] [core] Replace GenericArraySerializerConfigSnapshot with new GenericArraySerializerSnapshot
上级 68fab127
......@@ -206,7 +206,7 @@ public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
// --------------------------------------------------------------------------------------------
@Override
public GenericArraySerializerConfigSnapshot<C> snapshotConfiguration() {
return new GenericArraySerializerConfigSnapshot<>(this);
public GenericArraySerializerSnapshot<C> snapshotConfiguration() {
return new GenericArraySerializerSnapshot<>(this);
}
}
......@@ -38,8 +38,12 @@ import static org.apache.flink.util.Preconditions.checkState;
* Point-in-time configuration of a {@link GenericArraySerializer}.
*
* @param <C> The component type.
*
* @deprecated this is deprecated and no longer used by the {@link GenericArraySerializer}.
* It has been replaced by {@link GenericArraySerializerSnapshot}.
*/
@Internal
@Deprecated
public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerializerSnapshot<C[]> {
private static final int CURRENT_VERSION = 2;
......@@ -118,18 +122,12 @@ public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerial
@Override
public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility(TypeSerializer<C[]> newSerializer) {
checkState(componentClass != null && nestedSnapshot != null);
if (newSerializer instanceof GenericArraySerializer) {
GenericArraySerializer<C> serializer = (GenericArraySerializer<C>) newSerializer;
TypeSerializerSchemaCompatibility<C> compat = serializer.getComponentClass() == componentClass ?
TypeSerializerSchemaCompatibility.compatibleAsIs() :
TypeSerializerSchemaCompatibility.incompatible();
return nestedSnapshot.resolveCompatibilityWithNested(
compat, serializer.getComponentSerializer());
}
else {
// delegate to the new snapshot class
GenericArraySerializer<C> castedNewSerializer = (GenericArraySerializer<C>) newSerializer;
GenericArraySerializerSnapshot<C> newSnapshot = new GenericArraySerializerSnapshot<>(castedNewSerializer);
return newSnapshot.resolveSchemaCompatibility(castedNewSerializer);
} else {
return TypeSerializerSchemaCompatibility.incompatible();
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.InstantiationUtil;
import java.io.IOException;
/**
* Point-in-time configuration of a {@link GenericArraySerializer}.
*
* @param <C> The component type.
*/
public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerializerSnapshot<C[], GenericArraySerializer> {
private static final int CURRENT_VERSION = 1;
private Class<C> componentClass;
/**
* Constructor to be used for read instantiation.
*/
public GenericArraySerializerSnapshot() {
super(GenericArraySerializer.class);
}
/**
* Constructor to be used for writing the snapshot.
*/
public GenericArraySerializerSnapshot(GenericArraySerializer<C> genericArraySerializer) {
super(genericArraySerializer);
this.componentClass = genericArraySerializer.getComponentClass();
}
@Override
protected int getCurrentOuterSnapshotVersion() {
return CURRENT_VERSION;
}
@Override
protected void writeOuterSnapshot(DataOutputView out) throws IOException {
out.writeUTF(componentClass.getName());
}
@Override
protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader);
}
@Override
protected GenericArraySerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
@SuppressWarnings("unchecked")
TypeSerializer<C> componentSerializer = (TypeSerializer<C>) nestedSerializers[0];
return new GenericArraySerializer<>(componentClass, componentSerializer);
}
@Override
protected TypeSerializer<?>[] getNestedSerializers(GenericArraySerializer outerSerializer) {
return new TypeSerializer<?>[] { outerSerializer.getComponentSerializer() };
}
}
......@@ -19,7 +19,7 @@
package org.apache.flink.api.common.typeutils;
import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
import org.apache.flink.api.common.typeutils.base.GenericArraySerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.base.GenericArraySerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
......@@ -55,7 +55,7 @@ public class CompositeTypeSerializerSnapshotMigrationTest extends TypeSerializer
// GenericArray<String>
final TestSpecification<String[]> array = TestSpecification.<String[]>builder("1.6-generic-array", GenericArraySerializer.class, GenericArraySerializerConfigSnapshot.class)
final TestSpecification<String[]> array = TestSpecification.<String[]>builder("1.6-generic-array", GenericArraySerializer.class, GenericArraySerializerSnapshot.class)
.withSerializerProvider(() -> new GenericArraySerializer<>(String.class, StringSerializer.INSTANCE))
.withSnapshotDataLocation("flink-1.6-array-type-serializer-snapshot")
.withTestData("flink-1.6-array-type-serializer-data", 10);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册