提交 f73a12e7 编写于 作者: T twalthr

[FLINK-3046] Integrate the Either Java type with the TypeExtractor

This closes #1393.
上级 e69d1452
......@@ -37,7 +37,7 @@ public class EitherTypeInfo<L, R> extends TypeInformation<Either<L, R>> {
private final TypeInformation<R> rightType;
public EitherTypeInfo(TypeInformation<L> leftType,TypeInformation<R> rightType) {
public EitherTypeInfo(TypeInformation<L> leftType, TypeInformation<R> rightType) {
this.leftType = leftType;
this.rightType = rightType;
}
......@@ -108,4 +108,14 @@ public class EitherTypeInfo<L, R> extends TypeInformation<Either<L, R>> {
return obj instanceof EitherTypeInfo;
}
// --------------------------------------------------------------------------------------------
public TypeInformation<L> getLeftType() {
return leftType;
}
public TypeInformation<R> getRightType() {
return rightType;
}
}
......@@ -420,65 +420,53 @@ public class TypeExtractor {
}
typeHierarchy.add(curT);
ParameterizedType tupleChild = (ParameterizedType) curT;
Type[] subtypes = new Type[tupleChild.getActualTypeArguments().length];
// materialize possible type variables
for (int i = 0; i < subtypes.length; i++) {
// materialize immediate TypeVariables
if (tupleChild.getActualTypeArguments()[i] instanceof TypeVariable<?>) {
subtypes[i] = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) tupleChild.getActualTypeArguments()[i]);
// create the type information for the subtypes
TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type);
// type needs to be treated a pojo due to additional fields
if (subTypesInfo == null) {
if (t instanceof ParameterizedType) {
return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type);
}
// class or parameterized type
else {
subtypes[i] = tupleChild.getActualTypeArguments()[i];
return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), null, in1Type, in2Type);
}
}
// return tuple info
return new TupleTypeInfo(typeToClass(t), subTypesInfo);
TypeInformation<?>[] tupleSubTypes = new TypeInformation<?>[subtypes.length];
for (int i = 0; i < subtypes.length; i++) {
ArrayList<Type> subTypeHierarchy = new ArrayList<Type>(typeHierarchy);
subTypeHierarchy.add(subtypes[i]);
// sub type could not be determined with materializing
// try to derive the type info of the TypeVariable from the immediate base child input as a last attempt
if (subtypes[i] instanceof TypeVariable<?>) {
tupleSubTypes[i] = createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], subTypeHierarchy, in1Type, in2Type);
// variable could not be determined
if (tupleSubTypes[i] == null) {
throw new InvalidTypesException("Type of TypeVariable '" + ((TypeVariable<?>) subtypes[i]).getName() + "' in '"
+ ((TypeVariable<?>) subtypes[i]).getGenericDeclaration()
+ "' could not be determined. This is most likely a type erasure problem. "
+ "The type extraction currently supports types with generic variables only in cases where "
+ "all variables in the return type can be deduced from the input type(s).");
}
} else {
tupleSubTypes[i] = createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtypes[i], in1Type, in2Type);
}
}
// check if type is a subclass of Either
else if (isClassType(t) && Either.class.isAssignableFrom(typeToClass(t))) {
Type curT = t;
// go up the hierarchy until we reach Either (with or without generics)
// collect the types while moving up for a later top-down
while (!(isClassType(curT) && typeToClass(curT).equals(Either.class))) {
typeHierarchy.add(curT);
curT = typeToClass(curT).getGenericSuperclass();
}
Class<?> tAsClass = null;
if (isClassType(t)) {
tAsClass = typeToClass(t);
}
Preconditions.checkNotNull(tAsClass, "t has a unexpected type");
// check if the class we assumed to be a Tuple so far is actually a pojo because it contains additional fields.
// check for additional fields.
int fieldCount = countFieldsInClass(tAsClass);
if(fieldCount != tupleSubTypes.length) {
// the class is not a real tuple because it contains additional fields. treat as a pojo
// check if Either has generics
if (curT instanceof Class<?>) {
throw new InvalidTypesException("Either needs to be parameterized by using generics.");
}
typeHierarchy.add(curT);
// create the type information for the subtypes
TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type);
// type needs to be treated a pojo due to additional fields
if (subTypesInfo == null) {
if (t instanceof ParameterizedType) {
return (TypeInformation<OUT>) analyzePojo(tAsClass, new ArrayList<Type>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type);
return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type);
}
else {
return (TypeInformation<OUT>) analyzePojo(tAsClass, new ArrayList<Type>(typeHierarchy), null, in1Type, in2Type);
return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), null, in1Type, in2Type);
}
}
return new TupleTypeInfo(tAsClass, tupleSubTypes);
// return either info
return (TypeInformation<OUT>) new EitherTypeInfo(subTypesInfo[0], subTypesInfo[1]);
}
// type depends on another type
// e.g. class MyMapper<E> extends MapFunction<String, E>
......@@ -675,6 +663,71 @@ public class TypeExtractor {
}
return info;
}
/**
* Creates the TypeInformation for all elements of a type that expects a certain number of
* subtypes (e.g. TupleXX or Either).
*
* @param originalType most concrete subclass
* @param definingType type that defines the number of subtypes (e.g. Tuple2 -> 2 subtypes)
* @param typeHierarchy necessary for type inference
* @param in1Type necessary for type inference
* @param in2Type necessary for type inference
* @return array containing TypeInformation of sub types or null if definingType contains
* more subtypes (fields) that defined
*/
private <IN1, IN2> TypeInformation<?>[] createSubTypesInfo(Type originalType, ParameterizedType definingType,
ArrayList<Type> typeHierarchy, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
Type[] subtypes = new Type[definingType.getActualTypeArguments().length];
// materialize possible type variables
for (int i = 0; i < subtypes.length; i++) {
// materialize immediate TypeVariables
if (definingType.getActualTypeArguments()[i] instanceof TypeVariable<?>) {
subtypes[i] = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) definingType.getActualTypeArguments()[i]);
}
// class or parameterized type
else {
subtypes[i] = definingType.getActualTypeArguments()[i];
}
}
TypeInformation<?>[] subTypesInfo = new TypeInformation<?>[subtypes.length];
for (int i = 0; i < subtypes.length; i++) {
ArrayList<Type> subTypeHierarchy = new ArrayList<Type>(typeHierarchy);
subTypeHierarchy.add(subtypes[i]);
// sub type could not be determined with materializing
// try to derive the type info of the TypeVariable from the immediate base child input as a last attempt
if (subtypes[i] instanceof TypeVariable<?>) {
subTypesInfo[i] = createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], subTypeHierarchy, in1Type, in2Type);
// variable could not be determined
if (subTypesInfo[i] == null) {
throw new InvalidTypesException("Type of TypeVariable '" + ((TypeVariable<?>) subtypes[i]).getName() + "' in '"
+ ((TypeVariable<?>) subtypes[i]).getGenericDeclaration()
+ "' could not be determined. This is most likely a type erasure problem. "
+ "The type extraction currently supports types with generic variables only in cases where "
+ "all variables in the return type can be deduced from the input type(s).");
}
} else {
subTypesInfo[i] = createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtypes[i], in1Type, in2Type);
}
}
Class<?> originalTypeAsClass = null;
if (isClassType(originalType)) {
originalTypeAsClass = typeToClass(originalType);
}
Preconditions.checkNotNull(originalTypeAsClass, "originalType has an unexpected type");
// check if the class we assumed to conform to the defining type so far is actually a pojo because the
// original type contains additional fields.
// check for additional fields.
int fieldCount = countFieldsInClass(originalTypeAsClass);
if(fieldCount > subTypesInfo.length) {
return null;
}
return subTypesInfo;
}
// --------------------------------------------------------------------------------------------
// Extract type parameters
......@@ -830,9 +883,32 @@ public class TypeExtractor {
}
for (int i = 0; i < subTypes.length; i++) {
validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[i], ((TupleTypeInfo<?>) typeInfo).getTypeAt(i));
validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[i], tti.getTypeAt(i));
}
}
// check for Either
else if (typeInfo instanceof EitherTypeInfo) {
// check if Either at all
if (!(isClassType(type) && Either.class.isAssignableFrom(typeToClass(type)))) {
throw new InvalidTypesException("Either type expected.");
}
// go up the hierarchy until we reach Either (with or without generics)
while (!(isClassType(type) && typeToClass(type).equals(Either.class))) {
typeHierarchy.add(type);
type = typeToClass(type).getGenericSuperclass();
}
// check if Either has generics
if (type instanceof Class<?>) {
throw new InvalidTypesException("Parameterized Either type expected.");
}
EitherTypeInfo<?, ?> eti = (EitherTypeInfo<?, ?>) typeInfo;
Type[] subTypes = ((ParameterizedType) type).getActualTypeArguments();
validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[0], eti.getLeftType());
validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[1], eti.getRightType());
}
// check for Writable
else if (typeInfo instanceof WritableTypeInfo<?>) {
// check if writable at all
......@@ -1224,7 +1300,7 @@ public class TypeExtractor {
if(Writable.class.isAssignableFrom(clazz) && !Writable.class.equals(clazz)) {
return (TypeInformation<OUT>) WritableTypeInfo.getWritableTypeInfo((Class<? extends Writable>) clazz);
}
// check for basic types
TypeInformation<OUT> basicTypeInfo = BasicTypeInfo.getInfoFor(clazz);
if (basicTypeInfo != null) {
......@@ -1245,6 +1321,11 @@ public class TypeExtractor {
throw new InvalidTypesException("Type information extraction for tuples (except Tuple0) cannot be done based on the class.");
}
// check for subclasses of Either
if (Either.class.isAssignableFrom(clazz)) {
throw new InvalidTypesException("Type information extraction for Either cannot be done based on the class.");
}
// check for Enums
if(Enum.class.isAssignableFrom(clazz)) {
return new EnumTypeInfo(clazz);
......@@ -1558,7 +1639,20 @@ public class TypeExtractor {
infos[i] = privateGetForObject(field);
}
return new TupleTypeInfo(value.getClass(), infos);
} else {
}
// we can not extract the types from an Either object since it only contains type information
// of one type, but from Either classes
else if (value instanceof Either) {
try {
return (TypeInformation<X>) privateCreateTypeInfo(value.getClass());
}
catch (InvalidTypesException e) {
throw new InvalidTypesException("Automatic type extraction is not possible on an Either type "
+ "as it does not contain information about both possible types. "
+ "Please specify the types directly.");
}
}
else {
return privateGetForClass((Class<X>) value.getClass(), new ArrayList<Type>());
}
}
......
......@@ -46,6 +46,8 @@ import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple9;
import org.apache.flink.api.java.typeutils.Either;
import org.apache.flink.api.java.typeutils.EitherTypeInfo;
import org.apache.flink.api.java.typeutils.EnumTypeInfo;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
......@@ -67,8 +69,6 @@ import org.apache.hadoop.io.Writable;
import org.junit.Assert;
import org.junit.Test;
import javax.xml.bind.TypeConstraintException;
public class TypeExtractorTest {
......@@ -1829,4 +1829,81 @@ public class TypeExtractorTest {
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction)function, BasicTypeInfo.INT_TYPE_INFO);
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
}
public static class Either1<T> extends Either<String, T> {
@Override
public String left() throws IllegalStateException {
return null;
}
@Override
public T right() throws IllegalStateException {
return null;
}
}
public static class Either2 extends Either1<Tuple1<Integer>> {
// nothing to do here
}
public static class EitherMapper<T> implements MapFunction<T, Either1<T>> {
@Override
public Either1<T> map(T value) throws Exception {
return null;
}
}
public static class EitherMapper2 implements MapFunction<String, Either2> {
@Override
public Either2 map(String value) throws Exception {
return null;
}
}
public static class EitherMapper3 implements MapFunction<Either2, Either2> {
@Override
public Either2 map(Either2 value) throws Exception {
return null;
}
}
@Test
public void testEither() {
MapFunction<?, ?> function = new MapFunction<Either<String, Boolean>, Either<String, Boolean>>() {
@Override
public Either<String, Boolean> map(Either<String, Boolean> value) throws Exception {
return null;
}
};
TypeInformation<?> expected = new EitherTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO);
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) function, expected);
Assert.assertEquals(expected, ti);
}
@Test
public void testEitherHierarchy() {
MapFunction<?, ?> function = new EitherMapper<Boolean>();
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) function, BasicTypeInfo.BOOLEAN_TYPE_INFO);
TypeInformation<?> expected = new EitherTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO);
Assert.assertEquals(expected, ti);
function = new EitherMapper2();
ti = TypeExtractor.getMapReturnTypes((MapFunction) function, BasicTypeInfo.STRING_TYPE_INFO);
expected = new EitherTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, new TupleTypeInfo(BasicTypeInfo.INT_TYPE_INFO));
Assert.assertEquals(expected, ti);
function = new EitherMapper3();
ti = TypeExtractor.getMapReturnTypes((MapFunction) function, expected);
Assert.assertEquals(expected, ti);
Either<String, Tuple1<Integer>> either = new Either2();
ti = TypeExtractor.getForObject(either);
Assert.assertEquals(expected, ti);
}
@Test(expected=InvalidTypesException.class)
public void testEitherFromObjectException() {
Either<String, Tuple1<Integer>> either = Either.left("test");
TypeExtractor.getForObject(either);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册