提交 63c04a51 编写于 作者: T Tzu-Li (Gordon) Tai

[FLINK-6178] [core] Introduce TypeDeserializer interface for CompatibilityResult

Previously, the CompatibilityResult class accepts a full-blown
TypeSerializer for its convert deserializer, which will actually only
ever be used for deserialization.

This commit narrows down the interface by introducing a new
TypeDeserializer interface that contains only the read methods.

This closes #3834.
This closes #3804.
上级 8aa5e057
......@@ -38,7 +38,7 @@ public final class CompatibilityResult<T> {
*
* <p>This is only relevant if migration is required.
*/
private final TypeSerializer<T> convertDeserializer;
private final TypeDeserializer<T> convertDeserializer;
/**
* Returns a strategy that signals that the new serializer is compatible and no migration is required.
......@@ -61,16 +61,16 @@ public final class CompatibilityResult<T> {
*
* @return a result that signals migration is necessary, possibly providing a convert deserializer.
*/
public static <T> CompatibilityResult<T> requiresMigration(TypeSerializer<T> convertDeserializer) {
public static <T> CompatibilityResult<T> requiresMigration(TypeDeserializer<T> convertDeserializer) {
return new CompatibilityResult<>(true, convertDeserializer);
}
private CompatibilityResult(boolean requiresMigration, TypeSerializer<T> convertDeserializer) {
private CompatibilityResult(boolean requiresMigration, TypeDeserializer<T> convertDeserializer) {
this.requiresMigration = requiresMigration;
this.convertDeserializer = convertDeserializer;
}
public TypeSerializer<T> getConvertDeserializer() {
public TypeDeserializer<T> getConvertDeserializer() {
return convertDeserializer;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils;
import org.apache.flink.core.memory.DataInputView;
import java.io.IOException;
/**
* This interface describes the methods that are required for a data type to be read by the Flink runtime.
* Specifically, this interface contains the deserialization methods. In contrast, the {@link TypeSerializer}
* interface contains the complete set of methods for both serialization and deserialization.
*
* <p>The methods in this class are assumed to be stateless, such that it is effectively thread safe. Stateful
* implementations of the methods may lead to unpredictable side effects and will compromise both stability and
* correctness of the program.
*
* @param <T> The data type that the deserializer deserializes.
*/
public interface TypeDeserializer<T> {
/**
* Creates a deep copy of this deserializer if it is necessary, i.e. if it is stateful. This
* can return itself if the serializer is not stateful.
*
* We need this because deserializers might be used in several threads. Stateless deserializers
* are inherently thread-safe while stateful deserializers might not be thread-safe.
*/
TypeSerializer<T> duplicate();
/**
* De-serializes a record from the given source input view.
*
* @param source The input view from which to read the data.
* @return The deserialized element.
*
* @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the
* input view, which may have an underlying I/O channel from which it reads.
*/
T deserialize(DataInputView source) throws IOException;
/**
* De-serializes a record from the given source input view into the given reuse record instance if mutable.
*
* @param reuse The record instance into which to de-serialize the data.
* @param source The input view from which to read the data.
* @return The deserialized element.
*
* @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the
* input view, which may have an underlying I/O channel from which it reads.
*/
T deserialize(T reuse, DataInputView source) throws IOException;
/**
* Gets the length of the data type, if it is a fix length data type.
*
* @return The length of the data type, or <code>-1</code> for variable length data types.
*/
int getLength();
/**
* Returns true if the given object can be equaled with this object. If not, it returns false.
*
* @param obj Object which wants to take part in the equality relation
* @return true if obj can be equaled with this, otherwise false
*/
boolean canEqual(Object obj);
boolean equals(Object obj);
int hashCode();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
/**
* A utility class that wraps a {@link TypeDeserializer} as a {@link TypeSerializer}.
*
* <p>Methods related to deserialization are directly forwarded to the wrapped deserializer,
* while serialization methods are masked and not intended for use.
*
* @param <T> The data type that the deserializer deserializes.
*/
public final class TypeDeserializerAdapter<T> extends TypeSerializer<T> {
private static final long serialVersionUID = 1L;
/** The actual wrapped deserializer instance */
private final TypeDeserializer<T> deserializer;
/**
* Creates a {@link TypeSerializer} that wraps a {@link TypeDeserializer}.
*
* @param deserializer the actual deserializer to wrap.
*/
public TypeDeserializerAdapter(TypeDeserializer<T> deserializer) {
this.deserializer = Preconditions.checkNotNull(deserializer);
}
// --------------------------------------------------------------------------------------------
// Forwarded deserialization related methods
// --------------------------------------------------------------------------------------------
public T deserialize(DataInputView source) throws IOException {
return deserializer.deserialize(source);
}
public T deserialize(T reuse, DataInputView source) throws IOException {
return deserializer.deserialize(reuse, source);
}
public TypeSerializer<T> duplicate() {
return deserializer.duplicate();
}
public int getLength() {
return deserializer.getLength();
}
public boolean equals(Object obj) {
return deserializer.equals(obj);
}
public boolean canEqual(Object obj) {
return deserializer.canEqual(obj);
}
public int hashCode() {
return deserializer.hashCode();
}
// --------------------------------------------------------------------------------------------
// Irrelevant methods not intended for use
// --------------------------------------------------------------------------------------------
public boolean isImmutableType() {
throw new UnsupportedOperationException(
"This is a TypeDeserializerAdapter used only for deserialization; this method should not be used.");
}
public T createInstance() {
throw new UnsupportedOperationException(
"This is a TypeDeserializerAdapter used only for deserialization; this method should not be used.");
}
public T copy(T from) {
throw new UnsupportedOperationException(
"This is a TypeDeserializerAdapter used only for deserialization; this method should not be used.");
}
public T copy(T from, T reuse) {
throw new UnsupportedOperationException(
"This is a TypeDeserializerAdapter used only for deserialization; this method should not be used.");
}
public void serialize(T record, DataOutputView target) throws IOException {
throw new UnsupportedOperationException(
"This is a TypeDeserializerAdapter used only for deserialization; this method should not be used.");
}
public void copy(DataInputView source, DataOutputView target) throws IOException {
throw new UnsupportedOperationException(
"This is a TypeDeserializerAdapter used only for deserialization; this method should not be used.");
}
public TypeSerializerConfigSnapshot snapshotConfiguration() {
throw new UnsupportedOperationException(
"This is a TypeDeserializerAdapter used only for deserialization; this method should not be used.");
}
public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
throw new UnsupportedOperationException(
"This is a TypeDeserializerAdapter used only for deserialization; this method should not be used.");
}
}
......@@ -36,7 +36,7 @@ import java.io.Serializable;
* @param <T> The data type that the serializer serializes.
*/
@PublicEvolving
public abstract class TypeSerializer<T> implements Serializable {
public abstract class TypeSerializer<T> implements TypeDeserializer<T>, Serializable {
private static final long serialVersionUID = 1L;
......@@ -197,7 +197,7 @@ public abstract class TypeSerializer<T> implements Serializable {
* has been reconfigured to be compatible, to continue reading previous data, and that the
* serialization schema remains the same. No migration needs to be performed.</li>
*
* <li>{@link CompatibilityResult#requiresMigration(TypeSerializer)}: this signals Flink that
* <li>{@link CompatibilityResult#requiresMigration(TypeDeserializer)}: this signals Flink that
* migration needs to be performed, because this serializer is not compatible, or cannot be reconfigured to be
* compatible, for previous data. Furthermore, in the case that the preceding serializer cannot be found or
* restored to read the previous data to perform the migration, the provided convert deserializer can be
......
......@@ -23,6 +23,7 @@ import java.lang.reflect.Array;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;
......@@ -213,7 +214,7 @@ public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
return CompatibilityResult.requiresMigration(
new GenericArraySerializer<>(
componentClass,
compatResult.getConvertDeserializer()));
new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
}
}
}
......
......@@ -20,6 +20,7 @@ package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;
......@@ -188,7 +189,7 @@ public final class ListSerializer<T> extends TypeSerializer<List<T>> {
return CompatibilityResult.compatible();
} else if (compatResult.getConvertDeserializer() != null) {
return CompatibilityResult.requiresMigration(
new ListSerializer<>(compatResult.getConvertDeserializer()));
new ListSerializer<>(new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
}
}
......
......@@ -20,6 +20,7 @@ package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;
......@@ -220,8 +221,8 @@ public final class MapSerializer<K, V> extends TypeSerializer<Map<K, V>> {
} else if (keyCompatResult.getConvertDeserializer() != null && valueCompatResult.getConvertDeserializer() != null) {
return CompatibilityResult.requiresMigration(
new MapSerializer<>(
keyCompatResult.getConvertDeserializer(),
valueCompatResult.getConvertDeserializer()));
new TypeDeserializerAdapter<>(keyCompatResult.getConvertDeserializer()),
new TypeDeserializerAdapter<>(valueCompatResult.getConvertDeserializer())));
}
}
......
......@@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;
......@@ -212,8 +213,8 @@ public class EitherSerializer<L, R> extends TypeSerializer<Either<L, R>> {
if (leftCompatResult.getConvertDeserializer() != null && rightCompatResult.getConvertDeserializer() != null) {
return CompatibilityResult.requiresMigration(
new EitherSerializer<>(
leftCompatResult.getConvertDeserializer(),
rightCompatResult.getConvertDeserializer()));
new TypeDeserializerAdapter<>(leftCompatResult.getConvertDeserializer()),
new TypeDeserializerAdapter<>(rightCompatResult.getConvertDeserializer())));
}
}
}
......
......@@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
......@@ -276,7 +277,8 @@ public final class RowSerializer extends TypeSerializer<Row> {
// one of the field serializers cannot provide a fallback deserializer
return CompatibilityResult.requiresMigration(null);
} else {
convertDeserializers[i] = compatResult.getConvertDeserializer();
convertDeserializers[i] =
new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer());
}
}
}
......
......@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
......@@ -516,7 +517,8 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
return CompatibilityResult.compatible();
} else if (compatResult.getConvertDeserializer() != null) {
return CompatibilityResult.requiresMigration(
new PriorityQueueSerializer<>(compatResult.getConvertDeserializer(), factory));
new PriorityQueueSerializer<>(
new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()), factory));
}
}
......
......@@ -96,7 +96,8 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali
if (compatResult.requiresMigration()) {
if (compatResult.getConvertDeserializer != null) {
CompatibilityResult.requiresMigration(
new CRowSerializer(compatResult.getConvertDeserializer)
new CRowSerializer(
new TypeDeserializerAdapter(compatResult.getConvertDeserializer))
)
} else {
CompatibilityResult.requiresMigration(null)
......
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
......@@ -155,7 +156,7 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
return CompatibilityResult.compatible();
} else if (compatResult.getConvertDeserializer() != null) {
return CompatibilityResult.requiresMigration(
new ArrayListSerializer<>(compatResult.getConvertDeserializer()));
new ArrayListSerializer<>(new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
}
}
......
......@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.base.MapSerializerConfigSnapshot;
......@@ -221,8 +222,8 @@ public final class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, V>>
} else if (keyCompatResult.getConvertDeserializer() != null && valueCompatResult.getConvertDeserializer() != null) {
return CompatibilityResult.requiresMigration(
new HashMapSerializer<>(
keyCompatResult.getConvertDeserializer(),
valueCompatResult.getConvertDeserializer()));
new TypeDeserializerAdapter<>(keyCompatResult.getConvertDeserializer()),
new TypeDeserializerAdapter<>(valueCompatResult.getConvertDeserializer())));
}
}
......
......@@ -18,7 +18,7 @@
package org.apache.flink.api.scala.typeutils
import org.apache.flink.annotation.Internal
import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerializer, TypeSerializerConfigSnapshot}
import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeDeserializerAdapter, TypeSerializer, TypeSerializerConfigSnapshot}
import org.apache.flink.api.java.typeutils.runtime.EitherSerializerConfigSnapshot
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
......@@ -133,8 +133,8 @@ class EitherSerializer[A, B, T <: Either[A, B]](
CompatibilityResult.requiresMigration(
new EitherSerializer[A, B, T](
leftCompatResult.getConvertDeserializer,
rightCompatResult.getConvertDeserializer
new TypeDeserializerAdapter(leftCompatResult.getConvertDeserializer),
new TypeDeserializerAdapter(rightCompatResult.getConvertDeserializer)
)
)
......
......@@ -114,7 +114,8 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A])
if (compatResult.requiresMigration()) {
if (compatResult.getConvertDeserializer != null) {
CompatibilityResult.requiresMigration(
new OptionSerializer[A](compatResult.getConvertDeserializer))
new OptionSerializer[A](
new TypeDeserializerAdapter(compatResult.getConvertDeserializer)))
} else {
CompatibilityResult.requiresMigration(null)
}
......
......@@ -24,6 +24,7 @@ import java.io.IOException;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;
......@@ -228,7 +229,8 @@ public class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<Stream
return CompatibilityResult.compatible();
} else if (compatResult.getConvertDeserializer() != null) {
return CompatibilityResult.requiresMigration(
new MultiplexingStreamRecordSerializer<>(compatResult.getConvertDeserializer()));
new MultiplexingStreamRecordSerializer<>(
new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
}
}
......
......@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;
......@@ -167,7 +168,8 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
return CompatibilityResult.requiresMigration(null);
} else if (compatResult.getConvertDeserializer() != null) {
return CompatibilityResult.requiresMigration(
new StreamRecordSerializer<>(compatResult.getConvertDeserializer()));
new StreamRecordSerializer<>(
new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
}
}
......
......@@ -24,6 +24,7 @@ import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;
......@@ -289,7 +290,8 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
return CompatibilityResult.compatible();
} else if (compatResult.getConvertDeserializer() != null) {
return CompatibilityResult.requiresMigration(
new StreamElementSerializer<>(compatResult.getConvertDeserializer()));
new StreamElementSerializer<>(
new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册