提交 5546a1ef 编写于 作者: T twalthr 提交者: Fabian Hueske

[FLINK-2447] [java api] TypeExtractor returns wrong type info when a Tuple has...

[FLINK-2447] [java api] TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type

This closes #986
上级 941ac6df
......@@ -26,9 +26,7 @@ import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.api.common.functions.CoGroupFunction;
......@@ -66,15 +64,33 @@ import com.google.common.base.Preconditions;
* functions.
*/
public class TypeExtractor {
/*
* NOTE: Most methods of the TypeExtractor work with a so-called "typeHierarchy".
* The type hierarchy describes all types (Classes, ParameterizedTypes, TypeVariables etc. ) and intermediate
* types from a given type of a function or type (e.g. MyMapper, Tuple2) until a current type
* (depends on the method, e.g. MyPojoFieldType).
*
* Thus, it fully qualifies types until tuple/POJO field level.
*
* A typical typeHierarchy could look like:
*
* UDF: MyMapFunction.class
* top-level UDF: MyMapFunctionBase.class
* RichMapFunction: RichMapFunction.class
* MapFunction: MapFunction.class
* Function's OUT: Tuple1<MyPojo>
* user-defined POJO: MyPojo.class
* user-defined top-level POJO: MyPojoBase.class
* POJO field: Tuple1<String>
* Field type: String.class
*
*/
private static final Logger LOG = LoggerFactory.getLogger(TypeExtractor.class);
// We need this to detect recursive types and not get caught
// in an endless recursion
private Set<Class<?>> alreadySeen;
protected TypeExtractor() {
alreadySeen = new HashSet<Class<?>>();
// only create instances for special use cases
}
// --------------------------------------------------------------------------------------------
......@@ -416,10 +432,12 @@ public class TypeExtractor {
TypeInformation<?>[] tupleSubTypes = new TypeInformation<?>[subtypes.length];
for (int i = 0; i < subtypes.length; i++) {
ArrayList<Type> subTypeHierarchy = new ArrayList<Type>(typeHierarchy);
subTypeHierarchy.add(subtypes[i]);
// sub type could not be determined with materializing
// try to derive the type info of the TypeVariable from the immediate base child input as a last attempt
if (subtypes[i] instanceof TypeVariable<?>) {
tupleSubTypes[i] = createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], typeHierarchy, in1Type, in2Type);
tupleSubTypes[i] = createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], subTypeHierarchy, in1Type, in2Type);
// variable could not be determined
if (tupleSubTypes[i] == null) {
......@@ -430,7 +448,7 @@ public class TypeExtractor {
+ "all variables in the return type can be deduced from the input type(s).");
}
} else {
tupleSubTypes[i] = createTypeInfoWithTypeHierarchy(new ArrayList<Type>(typeHierarchy), subtypes[i], in1Type, in2Type);
tupleSubTypes[i] = createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtypes[i], in1Type, in2Type);
}
}
......@@ -912,6 +930,19 @@ public class TypeExtractor {
// --------------------------------------------------------------------------------------------
// Utility methods
// --------------------------------------------------------------------------------------------
/**
* @return number of items with equal type or same raw type
*/
private static int countTypeInHierarchy(ArrayList<Type> typeHierarchy, Type type) {
int count = 0;
for (Type t : typeHierarchy) {
if (t == type || (isClassType(type) && t == typeToClass(type))) {
count++;
}
}
return count;
}
/**
* @param curT : start type
......@@ -1183,12 +1214,10 @@ public class TypeExtractor {
return (TypeInformation<OUT>) new AvroTypeInfo(clazz);
}
if (alreadySeen.contains(clazz)) {
if (countTypeInHierarchy(typeHierarchy, clazz) > 1) {
return new GenericTypeInfo<OUT>(clazz);
}
alreadySeen.add(clazz);
if (Modifier.isInterface(clazz.getModifiers())) {
// Interface has no members and is therefore not handled as POJO
return new GenericTypeInfo<OUT>(clazz);
......
......@@ -810,4 +810,67 @@ public class PojoTypeExtractionTest {
+ ">"));
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
}
public static class RecursivePojo1 {
public RecursivePojo1 field;
}
public static class RecursivePojo2 {
public Tuple1<RecursivePojo2> field;
}
public static class RecursivePojo3 {
public NestedPojo field;
}
public static class NestedPojo {
public RecursivePojo3 field;
}
@Test
public void testRecursivePojo1() {
TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo1.class);
Assert.assertTrue(ti instanceof PojoTypeInfo);
Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) ti).getPojoFieldAt(0).type.getClass());
}
@Test
public void testRecursivePojo2() {
TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo2.class);
Assert.assertTrue(ti instanceof PojoTypeInfo);
PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0);
Assert.assertTrue(pf.type instanceof TupleTypeInfo);
Assert.assertEquals(GenericTypeInfo.class, ((TupleTypeInfo) pf.type).getTypeAt(0).getClass());
}
@Test
public void testRecursivePojo3() {
TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo3.class);
Assert.assertTrue(ti instanceof PojoTypeInfo);
PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0);
Assert.assertTrue(pf.type instanceof PojoTypeInfo);
Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) pf.type).getPojoFieldAt(0).type.getClass());
}
public static class FooBarPojo {
public int foo, bar;
public FooBarPojo() {}
}
public static class DuplicateMapper implements MapFunction<FooBarPojo, Tuple2<FooBarPojo, FooBarPojo>> {
@Override
public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo value) throws Exception {
return null;
}
}
@Test
public void testDualUseOfPojo() {
MapFunction<?, ?> function = new DuplicateMapper();
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeExtractor.createTypeInfo(FooBarPojo.class));
Assert.assertTrue(ti instanceof TupleTypeInfo);
TupleTypeInfo<?> tti = ((TupleTypeInfo) ti);
Assert.assertTrue(tti.getTypeAt(0) instanceof PojoTypeInfo);
Assert.assertTrue(tti.getTypeAt(1) instanceof PojoTypeInfo);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册