diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 67b2a51303f5f16a357e04f2b4b5ecf8d2e0ca84..3bceac5dcd8d98c4fb40279dc83232f25cecd49c 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -232,29 +232,6 @@ public class TypeExtractor { // get info from hierarchy return (TypeInformation) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type); } - - - /** - * @param curT : start type - * @return Type The immediate child of the top class - */ - private Type recursivelyGetTypeHierarchy(ArrayList typeHierarchy, Type curT, Class stopAtClass) { - while (!(curT instanceof ParameterizedType && ((Class) ((ParameterizedType) curT).getRawType()).equals( - stopAtClass)) - && !(curT instanceof Class && ((Class) curT).equals(stopAtClass))) { - typeHierarchy.add(curT); - - // parameterized type - if (curT instanceof ParameterizedType) { - curT = ((Class) ((ParameterizedType) curT).getRawType()).getGenericSuperclass(); - } - // class - else { - curT = ((Class) curT).getGenericSuperclass(); - } - } - return curT; - } @SuppressWarnings({ "unchecked", "rawtypes" }) private TypeInformation createTypeInfoWithTypeHierarchy(ArrayList typeHierarchy, Type t, @@ -330,7 +307,7 @@ public class TypeExtractor { int fieldCount = countFieldsInClass(tAsClass); if(fieldCount != tupleSubTypes.length) { // the class is not a real tuple because it contains additional fields. treat as a pojo - return (TypeInformation) analyzePojo(tAsClass, new ArrayList(), null); // the typeHierarchy here should be sufficient, even though it stops at the Tuple.class. + return (TypeInformation) analyzePojo(tAsClass, new ArrayList(typeHierarchy), null); // the typeHierarchy here should be sufficient, even though it stops at the Tuple.class. } return new TupleTypeInfo(tAsClass, tupleSubTypes); @@ -396,23 +373,11 @@ public class TypeExtractor { } // no tuple, no TypeVariable, no generic type else if (t instanceof Class) { - return privateGetForClass((Class) t, new ArrayList()); + return privateGetForClass((Class) t, typeHierarchy); } throw new InvalidTypesException("Type Information could not be created."); } - - private int countFieldsInClass(Class clazz) { - int fieldCount = 0; - for(Field field : clazz.getFields()) { // get all fields - if( !Modifier.isStatic(field.getModifiers()) && - !Modifier.isTransient(field.getModifiers()) - ) { - fieldCount++; - } - } - return fieldCount; - } private TypeInformation createTypeInfoFromInputs(TypeVariable returnTypeVar, ArrayList returnTypeHierarchy, TypeInformation in1TypeInfo, TypeInformation in2TypeInfo) { @@ -427,6 +392,11 @@ public class TypeExtractor { returnTypeVar = (TypeVariable) matReturnTypeVar; } + // no input information exists + if (in1TypeInfo == null && in2TypeInfo == null) { + return null; + } + // create a new type hierarchy for the input ArrayList inputTypeHierarchy = new ArrayList(); // copy the function part of the type hierarchy @@ -753,6 +723,34 @@ public class TypeExtractor { // Utility methods // -------------------------------------------------------------------------------------------- + /** + * @param curT : start type + * @return Type The immediate child of the top class + */ + private Type getTypeHierarchy(ArrayList typeHierarchy, Type curT, Class stopAtClass) { + // skip first one + if (typeHierarchy.size() > 0 && typeHierarchy.get(0) == curT && isClassType(curT)) { + curT = typeToClass(curT).getGenericSuperclass(); + } + while (!(isClassType(curT) && typeToClass(curT).equals(stopAtClass))) { + typeHierarchy.add(curT); + curT = typeToClass(curT).getGenericSuperclass(); + } + return curT; + } + + private int countFieldsInClass(Class clazz) { + int fieldCount = 0; + for(Field field : clazz.getFields()) { // get all fields + if( !Modifier.isStatic(field.getModifiers()) && + !Modifier.isTransient(field.getModifiers()) + ) { + fieldCount++; + } + } + return fieldCount; + } + private static Type removeGenericWrapper(Type t) { if(t instanceof ParameterizedType && (Collector.class.isAssignableFrom(typeToClass(t)) @@ -954,7 +952,7 @@ public class TypeExtractor { return new GenericTypeInfo(clazz); } try { - TypeInformation pojoType = analyzePojo(clazz, typeHierarchy, clazzTypeHint); + TypeInformation pojoType = analyzePojo(clazz, new ArrayList(typeHierarchy), clazzTypeHint); if (pojoType != null) { return pojoType; } @@ -1032,12 +1030,12 @@ public class TypeExtractor { } private TypeInformation analyzePojo(Class clazz, ArrayList typeHierarchy, ParameterizedType clazzTypeHint) { - // try to create Type hierarchy, if the incoming one is empty. - if(typeHierarchy.size() == 0) { - recursivelyGetTypeHierarchy(typeHierarchy, clazz, Object.class); + // try to create Type hierarchy, if the incoming only contains the most bottom one or none. + if(typeHierarchy.size() <= 1) { + getTypeHierarchy(typeHierarchy, clazz, Object.class); } if(clazzTypeHint != null) { - recursivelyGetTypeHierarchy(typeHierarchy, clazzTypeHint, Object.class); + getTypeHierarchy(typeHierarchy, clazzTypeHint, Object.class); } List fields = getAllDeclaredFields(clazz); @@ -1049,8 +1047,9 @@ public class TypeExtractor { return null; } try { - typeHierarchy.add(fieldType); - pojoFields.add(new PojoField(field, createTypeInfoWithTypeHierarchy(typeHierarchy, fieldType, null, null) )); + ArrayList fieldTypeHierarchy = new ArrayList(typeHierarchy); + fieldTypeHierarchy.add(fieldType); + pojoFields.add(new PojoField(field, createTypeInfoWithTypeHierarchy(fieldTypeHierarchy, fieldType, null, null) )); } catch (InvalidTypesException e) { //pojoFields.add(new PojoField(field, new GenericTypeInfo( Object.class ))); // we need kryo to properly serialize this throw new InvalidTypesException("Flink is currently unable to serialize this type: "+fieldType+""