提交 65e87045 编写于 作者: A Aljoscha Krettek 提交者: Stephan Ewen

[FLINK-7420] [avro] Abstract all Avro interaction behind AvroUtils

Before, we would try and dynamically load Avro-related classes in several
places. Now, we only reflectively instantiate the right AvroUtils and
all other operations are methods on this.

The default AvroUtils throws exceptions with a helpful message for most
operations.
上级 29249b2e
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
import java.util.LinkedHashMap;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.hasSuperclass;
/**
* Utility methods for dealing with Avro types. This has a default implementation for the case that
* Avro is not present on the classpath and an actual implementation in flink-avro that is
* dynamically loaded when present.
*/
public abstract class AvroUtils {

	protected static final String AVRO_SPECIFIC_RECORD_BASE = "org.apache.avro.specific.SpecificRecordBase";

	protected static final String AVRO_GENERIC_RECORD = "org.apache.avro.generic.GenericData$Record";

	/** Fully qualified name of the flink-avro implementation of this class, loaded reflectively. */
	private static final String AVRO_KRYO_UTILS = "org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils";

	private static final String AVRO_GENERIC_DATA_ARRAY = "org.apache.avro.generic.GenericData$Array";

	/**
	 * Loads the utility class from <code>flink-avro</code> and adds Avro-specific serializers. This
	 * method will throw an exception if we see an Avro type but flink-avro is not in the classpath.
	 *
	 * @param reg the execution config on which serializers are registered
	 * @param type the type to inspect for Avro record superclasses
	 */
	public abstract void addAvroSerializersIfRequired(ExecutionConfig reg, Class<?> type);

	/**
	 * Registers a special Serializer for GenericData.Array.
	 *
	 * @param kryoRegistrations registration map to add the GenericData.Array entry to
	 */
	public abstract void addAvroGenericDataArrayRegistration(
			LinkedHashMap<String, KryoRegistration> kryoRegistrations);

	/**
	 * Creates an {@code AvroSerializer} if flink-avro is present, otherwise throws an exception.
	 *
	 * @param type the class the serializer should handle
	 * @param <T> the serialized type
	 * @return a serializer for {@code type}
	 */
	public abstract <T> TypeSerializer<T> createAvroSerializer(Class<T> type);

	/**
	 * Creates an {@code AvroTypeInfo} if flink-avro is present, otherwise throws an exception.
	 *
	 * @param type the class the type information should describe
	 * @param <T> the described type
	 * @return type information for {@code type}
	 */
	public abstract <T> TypeInformation<T> createAvroTypeInfo(Class<T> type);

	/**
	 * Returns either the default {@link AvroUtils} which throw an exception in cases where Avro
	 * would be needed or loads the specific utils for Avro from flink-avro.
	 */
	public static AvroUtils getAvroUtils() {
		// try and load the special AvroUtils from the flink-avro package
		Class<?> clazz;
		try {
			clazz = Class.forName(AVRO_KRYO_UTILS, false, AvroUtils.class.getClassLoader());
		} catch (ClassNotFoundException e) {
			// cannot find the utils, return the default implementation
			return new DefaultAvroUtils();
		}

		try {
			return (AvroUtils) clazz.getConstructor().newInstance();
		} catch (Exception e) {
			// keep the original exception as the cause so reflective failures stay diagnosable
			throw new RuntimeException("Could not instantiate " + AVRO_KRYO_UTILS + ".", e);
		}
	}

	/**
	 * Fallback implementation used when flink-avro is not on the classpath. Methods that would
	 * actually need Avro throw an exception pointing the user at the missing dependency; the
	 * Kryo registration method installs dummy placeholders to keep savepoint compatibility.
	 */
	private static class DefaultAvroUtils extends AvroUtils {

		@Override
		public void addAvroSerializersIfRequired(ExecutionConfig reg, Class<?> type) {
			// only fail if the given type actually is an Avro record type
			if (hasSuperclass(type, AVRO_SPECIFIC_RECORD_BASE) || hasSuperclass(
					type,
					AVRO_GENERIC_RECORD)) {
				throw new RuntimeException("Could not load class '" + AVRO_KRYO_UTILS + "'. " +
						"You may be missing the 'flink-avro' dependency.");
			}
		}

		@Override
		public void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
			// register dummies under the Avro class name for savepoint backwards compatibility
			kryoRegistrations.put(AVRO_GENERIC_DATA_ARRAY,
					new KryoRegistration(Serializers.DummyAvroRegisteredClass.class, (Class) Serializers.DummyAvroKryoSerializerClass.class));
		}

		@Override
		public <T> TypeSerializer<T> createAvroSerializer(Class<T> type) {
			throw new RuntimeException("Could not load the AvroSerializer class. " +
					"You may be missing the 'flink-avro' dependency.");
		}

		@Override
		public <T> TypeInformation<T> createAvroTypeInfo(Class<T> type) {
			throw new RuntimeException("Could not load the AvroTypeInfo class. " +
					"You may be missing the 'flink-avro' dependency.");
		}
	}
}
......@@ -31,9 +31,7 @@ import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
......@@ -303,27 +301,12 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
@PublicEvolving
@SuppressWarnings("unchecked")
public TypeSerializer<T> createSerializer(ExecutionConfig config) {
if(config.isForceKryoEnabled()) {
if (config.isForceKryoEnabled()) {
return new KryoSerializer<>(getTypeClass(), config);
}
if(config.isForceAvroEnabled()) {
Class<?> clazz;
try {
clazz = Class.forName("org.apache.flink.formats.avro.typeutils.AvroSerializer");
} catch (ClassNotFoundException e) {
throw new RuntimeException("Could not load the AvroSerializer class. " +
"You may be missing the 'flink-avro' dependency.");
}
try {
Constructor<?> constructor = clazz.getConstructor(Class.class);
return (TypeSerializer<T>) constructor.newInstance(getTypeClass());
} catch (NoSuchMethodException | IllegalAccessException | InstantiationException e) {
throw new RuntimeException("Incompatible versions of the Avro classes found.");
} catch (InvocationTargetException e) {
throw new RuntimeException("Cannot create AvroSerializer.", e.getTargetException());
}
if (config.isForceAvroEnabled()) {
return AvroUtils.getAvroUtils().createAvroSerializer(getTypeClass());
}
TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[fields.length];
......
......@@ -117,8 +117,6 @@ public class TypeExtractor {
private static final String AVRO_SPECIFIC_RECORD_BASE_CLASS = "org.apache.avro.specific.SpecificRecordBase";
private static final String AVRO_TYPEINFO_CLASS = "org.apache.flink.formats.avro.typeutils.AvroTypeInfo";
private static final Logger LOG = LoggerFactory.getLogger(TypeExtractor.class);
public static final int[] NO_INDEX = new int[] {};
......@@ -1794,7 +1792,7 @@ public class TypeExtractor {
// special case for POJOs generated by Avro.
if (hasSuperclass(clazz, AVRO_SPECIFIC_RECORD_BASE_CLASS)) {
return createAvroTypeInfo(clazz);
return AvroUtils.getAvroUtils().createAvroTypeInfo(clazz);
}
if (Modifier.isInterface(clazz.getModifiers())) {
......@@ -2175,33 +2173,4 @@ public class TypeExtractor {
// ignore
}
}
// ------------------------------------------------------------------------
// Utilities to handle Avro's 'SpecificRecord' type via reflection
// ------------------------------------------------------------------------
private static <T> TypeInformation<T> createAvroTypeInfo(Class<T> clazz) {
Class<?> typeInfoClass;
try {
typeInfoClass = Class.forName(AVRO_TYPEINFO_CLASS, false, TypeExtractor.class.getClassLoader());
}
catch (ClassNotFoundException e) {
throw new RuntimeException("Could not load the TypeInformation for the class '"
+ AVRO_TYPEINFO_CLASS + "'. You may be missing the 'flink-avro' dependency.");
}
try {
Constructor<?> constr = typeInfoClass.getConstructor(Class.class);
@SuppressWarnings("unchecked")
TypeInformation<T> typeInfo = (TypeInformation<T>) constr.newInstance(clazz);
return typeInfo;
}
catch (NoSuchMethodException | IllegalAccessException | InstantiationException e) {
throw new RuntimeException("Incompatible versions of the Avro classes found.");
}
catch (InvocationTargetException e) {
throw new RuntimeException("Cannot create AvroTypeInfo.", e.getTargetException());
}
}
}
......@@ -29,6 +29,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.java.typeutils.AvroUtils;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
......@@ -477,7 +478,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
}
// add Avro support if flink-avro is available; a dummy otherwise
Serializers.addAvroGenericDataArrayRegistration(kryoRegistrations);
AvroUtils.getAvroUtils().addAvroGenericDataArrayRegistration(kryoRegistrations);
return kryoRegistrations;
}
......
......@@ -22,10 +22,10 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.typeutils.AvroUtils;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractionUtils;
import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
......@@ -36,19 +36,14 @@ import com.esotericsoftware.kryo.serializers.CollectionSerializer;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.hasSuperclass;
/**
* Class containing utilities for the serializers of the Flink Runtime.
*
......@@ -61,14 +56,6 @@ import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.hasSupercl
@Internal
public class Serializers {
private static final String AVRO_SPECIFIC_RECORD_BASE = "org.apache.avro.specific.SpecificRecordBase";
private static final String AVRO_GENERIC_RECORD = "org.apache.avro.generic.GenericData$Record";
private static final String AVRO_KRYO_UTILS = "org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils";
private static final String AVRO_GENERIC_DATA_ARRAY = "org.apache.avro.generic.GenericData$Array";
public static void recursivelyRegisterType(TypeInformation<?> typeInfo, ExecutionConfig config, Set<Class<?>> alreadySeen) {
if (typeInfo instanceof GenericTypeInfo) {
GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) typeInfo;
......@@ -104,9 +91,7 @@ public class Serializers {
else {
config.registerKryoType(type);
// add serializers for Avro type if necessary
if (hasSuperclass(type, AVRO_SPECIFIC_RECORD_BASE) || hasSuperclass(type, AVRO_GENERIC_RECORD)) {
addAvroSerializers(config, type);
}
AvroUtils.getAvroUtils().addAvroSerializersIfRequired(config, type);
Field[] fields = type.getDeclaredFields();
for (Field field : fields) {
......@@ -161,43 +146,19 @@ public class Serializers {
}
/**
* Loads the utility class from <code>flink-avro</code> and adds Avro-specific serializers.
* This is used in case we don't have Avro on the classpath. Flink versions before 1.4
* always registered special Serializers for Kryo but starting with Flink 1.4 we don't have
* Avro on the classpath by default anymore. We still have to retain the same registered
* Serializers for backwards compatibility of savepoints.
*/
private static void addAvroSerializers(ExecutionConfig reg, Class<?> type) {
Class<?> clazz;
try {
clazz = Class.forName(AVRO_KRYO_UTILS, false, Serializers.class.getClassLoader());
}
catch (ClassNotFoundException e) {
throw new RuntimeException("Could not load class '" + AVRO_KRYO_UTILS + "'. " +
"You may be missing the 'flink-avro' dependency.");
}
try {
clazz.getDeclaredMethod("addAvroSerializers", ExecutionConfig.class, Class.class).invoke(null, reg, type);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException("Could not access method in 'flink-avro' dependency.", e);
}
}
@SuppressWarnings("unchecked")
public static void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
try {
Class<?> clazz = Class.forName(AVRO_GENERIC_DATA_ARRAY, false, Serializers.class.getClassLoader());
kryoRegistrations.put(
AVRO_GENERIC_DATA_ARRAY,
new KryoRegistration(
clazz,
new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList())));
}
catch (ClassNotFoundException e) {
kryoRegistrations.put(AVRO_GENERIC_DATA_ARRAY,
new KryoRegistration(DummyAvroRegisteredClass.class, (Class) DummyAvroKryoSerializerClass.class));
}
}
public static class DummyAvroRegisteredClass {}
/**
* This is used in case we don't have Avro on the classpath. Flink versions before 1.4
* always registered special Serializers for Kryo but starting with Flink 1.4 we don't have
* Avro on the classpath by default anymore. We still have to retain the same registered
* Serializers for backwards compatibility of savepoints.
*/
public static class DummyAvroKryoSerializerClass<T> extends Serializer<T> {
@Override
public void write(Kryo kryo, Output output, Object o) {
......
......@@ -19,7 +19,13 @@
package org.apache.flink.formats.avro.utils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.AvroUtils;
import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
import org.apache.flink.formats.avro.typeutils.AvroSerializer;
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
......@@ -29,22 +35,50 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import java.io.Serializable;
import java.util.LinkedHashMap;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.hasSuperclass;
/**
* Utilities for integrating Avro serializers in Kryo.
*/
public class AvroKryoSerializerUtils {
public class AvroKryoSerializerUtils extends AvroUtils {
@Override
public void addAvroSerializersIfRequired(ExecutionConfig reg, Class<?> type) {
if (hasSuperclass(type, AVRO_SPECIFIC_RECORD_BASE) || hasSuperclass(type, AVRO_GENERIC_RECORD)) {
// Avro POJOs contain java.util.List which have GenericData.Array as their runtime type
// because Kryo is not able to serialize them properly, we use this serializer for them
reg.registerTypeWithKryoSerializer(GenericData.Array.class, Serializers.SpecificInstanceCollectionSerializerForArrayList.class);
// We register this serializer for users who want to use untyped Avro records (GenericData.Record).
// Kryo is able to serialize everything in there, except for the Schema.
// This serializer is very slow, but using the GenericData.Records of Kryo is in general a bad idea.
// we add the serializer as a default serializer because Avro is using a private sub-type at runtime.
reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class);
}
}
@Override
public void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
kryoRegistrations.put(
GenericData.Array.class.getName(),
new KryoRegistration(
GenericData.Array.class,
new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList())));
}
public static void addAvroSerializers(ExecutionConfig reg, Class<?> type) {
// Avro POJOs contain java.util.List which have GenericData.Array as their runtime type
// because Kryo is not able to serialize them properly, we use this serializer for them
reg.registerTypeWithKryoSerializer(GenericData.Array.class, Serializers.SpecificInstanceCollectionSerializerForArrayList.class);
@Override
public <T> TypeSerializer<T> createAvroSerializer(Class<T> type) {
return new AvroSerializer<>(type);
}
// We register this serializer for users who want to use untyped Avro records (GenericData.Record).
// Kryo is able to serialize everything in there, except for the Schema.
// This serializer is very slow, but using the GenericData.Records of Kryo is in general a bad idea.
// we add the serializer as a default serializer because Avro is using a private sub-type at runtime.
reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class);
@Override
@SuppressWarnings({"rawtypes", "unchecked"})
public <T> TypeInformation<T> createAvroTypeInfo(Class<T> type) {
// we have to be raw here because we cannot have "<T extends SpecificRecordBase>" in
// the interface of AvroUtils
return new AvroTypeInfo(type);
}
/**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册