提交 1c1c83b7 编写于 作者: T twalthr 提交者: mbalassi

Fix invalid type hierarchy creation by Pojo logic

上级 5f780070
......@@ -232,29 +232,6 @@ public class TypeExtractor {
// get info from hierarchy
return (TypeInformation<OUT>) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
}
/**
* @param curT : start type
* @return Type The immediate child of the top class
*/
private Type recursivelyGetTypeHierarchy(ArrayList<Type> typeHierarchy, Type curT, Class<?> stopAtClass) {
while (!(curT instanceof ParameterizedType && ((Class<?>) ((ParameterizedType) curT).getRawType()).equals(
stopAtClass))
&& !(curT instanceof Class<?> && ((Class<?>) curT).equals(stopAtClass))) {
typeHierarchy.add(curT);
// parameterized type
if (curT instanceof ParameterizedType) {
curT = ((Class<?>) ((ParameterizedType) curT).getRawType()).getGenericSuperclass();
}
// class
else {
curT = ((Class<?>) curT).getGenericSuperclass();
}
}
return curT;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfoWithTypeHierarchy(ArrayList<Type> typeHierarchy, Type t,
......@@ -330,7 +307,7 @@ public class TypeExtractor {
int fieldCount = countFieldsInClass(tAsClass);
if(fieldCount != tupleSubTypes.length) {
// the class is not a real tuple because it contains additional fields. treat as a pojo
return (TypeInformation<OUT>) analyzePojo(tAsClass, new ArrayList<Type>(), null); // the typeHierarchy here should be sufficient, even though it stops at the Tuple.class.
return (TypeInformation<OUT>) analyzePojo(tAsClass, new ArrayList<Type>(typeHierarchy), null); // the typeHierarchy here should be sufficient, even though it stops at the Tuple.class.
}
return new TupleTypeInfo(tAsClass, tupleSubTypes);
......@@ -396,23 +373,11 @@ public class TypeExtractor {
}
// no tuple, no TypeVariable, no generic type
else if (t instanceof Class) {
return privateGetForClass((Class<OUT>) t, new ArrayList<Type>());
return privateGetForClass((Class<OUT>) t, typeHierarchy);
}
throw new InvalidTypesException("Type Information could not be created.");
}
private int countFieldsInClass(Class<?> clazz) {
int fieldCount = 0;
for(Field field : clazz.getFields()) { // get all fields
if( !Modifier.isStatic(field.getModifiers()) &&
!Modifier.isTransient(field.getModifiers())
) {
fieldCount++;
}
}
return fieldCount;
}
private <IN1, IN2> TypeInformation<?> createTypeInfoFromInputs(TypeVariable<?> returnTypeVar, ArrayList<Type> returnTypeHierarchy,
TypeInformation<IN1> in1TypeInfo, TypeInformation<IN2> in2TypeInfo) {
......@@ -427,6 +392,11 @@ public class TypeExtractor {
returnTypeVar = (TypeVariable<?>) matReturnTypeVar;
}
// no input information exists
if (in1TypeInfo == null && in2TypeInfo == null) {
return null;
}
// create a new type hierarchy for the input
ArrayList<Type> inputTypeHierarchy = new ArrayList<Type>();
// copy the function part of the type hierarchy
......@@ -753,6 +723,34 @@ public class TypeExtractor {
// Utility methods
// --------------------------------------------------------------------------------------------
/**
* @param curT : start type
* @return Type The immediate child of the top class
*/
private Type getTypeHierarchy(ArrayList<Type> typeHierarchy, Type curT, Class<?> stopAtClass) {
// skip first one
if (typeHierarchy.size() > 0 && typeHierarchy.get(0) == curT && isClassType(curT)) {
curT = typeToClass(curT).getGenericSuperclass();
}
while (!(isClassType(curT) && typeToClass(curT).equals(stopAtClass))) {
typeHierarchy.add(curT);
curT = typeToClass(curT).getGenericSuperclass();
}
return curT;
}
private int countFieldsInClass(Class<?> clazz) {
int fieldCount = 0;
for(Field field : clazz.getFields()) { // get all fields
if( !Modifier.isStatic(field.getModifiers()) &&
!Modifier.isTransient(field.getModifiers())
) {
fieldCount++;
}
}
return fieldCount;
}
private static Type removeGenericWrapper(Type t) {
if(t instanceof ParameterizedType &&
(Collector.class.isAssignableFrom(typeToClass(t))
......@@ -954,7 +952,7 @@ public class TypeExtractor {
return new GenericTypeInfo<X>(clazz);
}
try {
TypeInformation<X> pojoType = analyzePojo(clazz, typeHierarchy, clazzTypeHint);
TypeInformation<X> pojoType = analyzePojo(clazz, new ArrayList<Type>(typeHierarchy), clazzTypeHint);
if (pojoType != null) {
return pojoType;
}
......@@ -1032,12 +1030,12 @@ public class TypeExtractor {
}
private <X> TypeInformation<X> analyzePojo(Class<X> clazz, ArrayList<Type> typeHierarchy, ParameterizedType clazzTypeHint) {
// try to create Type hierarchy, if the incoming one is empty.
if(typeHierarchy.size() == 0) {
recursivelyGetTypeHierarchy(typeHierarchy, clazz, Object.class);
// try to create Type hierarchy, if the incoming only contains the most bottom one or none.
if(typeHierarchy.size() <= 1) {
getTypeHierarchy(typeHierarchy, clazz, Object.class);
}
if(clazzTypeHint != null) {
recursivelyGetTypeHierarchy(typeHierarchy, clazzTypeHint, Object.class);
getTypeHierarchy(typeHierarchy, clazzTypeHint, Object.class);
}
List<Field> fields = getAllDeclaredFields(clazz);
......@@ -1049,8 +1047,9 @@ public class TypeExtractor {
return null;
}
try {
typeHierarchy.add(fieldType);
pojoFields.add(new PojoField(field, createTypeInfoWithTypeHierarchy(typeHierarchy, fieldType, null, null) ));
ArrayList<Type> fieldTypeHierarchy = new ArrayList<Type>(typeHierarchy);
fieldTypeHierarchy.add(fieldType);
pojoFields.add(new PojoField(field, createTypeInfoWithTypeHierarchy(fieldTypeHierarchy, fieldType, null, null) ));
} catch (InvalidTypesException e) {
//pojoFields.add(new PojoField(field, new GenericTypeInfo( Object.class ))); // we need kryo to properly serialize this
throw new InvalidTypesException("Flink is currently unable to serialize this type: "+fieldType+""
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册