提交 434e88fd 编写于 作者: T twalthr 提交者: Stephan Ewen

[FLINK-3566] [FLINK-3563] [core] TypeExtraction input type validation fixes

This closes #1759
上级 b0befc4c
......@@ -890,8 +890,8 @@ public class TypeExtractor {
}
if (!(type instanceof TypeVariable<?>)) {
// check for basic type
if (typeInfo.isBasicType()) {
// check for Java Basic Types
if (typeInfo instanceof BasicTypeInfo) {
TypeInformation<?> actual;
// check if basic type at all
......@@ -904,8 +904,8 @@ public class TypeExtractor {
}
}
// check for tuple
else if (typeInfo.isTupleType()) {
// check for Java Tuples
else if (typeInfo instanceof TupleTypeInfo) {
// check if tuple at all
if (!(isClassType(type) && Tuple.class.isAssignableFrom(typeToClass(type)))) {
throw new InvalidTypesException("Tuple type expected.");
......@@ -1079,9 +1079,9 @@ public class TypeExtractor {
// check for generic object
else if (typeInfo instanceof GenericTypeInfo<?>) {
Class<?> clazz = null;
if (!(isClassType(type) && ((GenericTypeInfo<?>) typeInfo).getTypeClass() == (clazz = typeToClass(type)))) {
throw new InvalidTypesException("Generic object type '"
+ ((GenericTypeInfo<?>) typeInfo).getTypeClass().getCanonicalName() + "' expected but was '"
if (!(isClassType(type) && (clazz = typeToClass(type)).isAssignableFrom(((GenericTypeInfo<?>) typeInfo).getTypeClass()))) {
throw new InvalidTypesException("Generic type '"
+ ((GenericTypeInfo<?>) typeInfo).getTypeClass().getCanonicalName() + "' or a subclass of it expected but was '"
+ clazz.getCanonicalName() + "'.");
}
}
......
......@@ -22,8 +22,11 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
......@@ -38,6 +41,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple0;
......@@ -1904,4 +1908,112 @@ public class TypeExtractorTest {
Either<String, Tuple1<Integer>> either = Either.Left("test");
TypeExtractor.getForObject(either);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testGenericTypeWithSubclassInput() {
Map<String, Object> inputMap = new HashMap<>();
inputMap.put("a", "b");
TypeInformation<?> inputType = TypeExtractor.createTypeInfo(inputMap.getClass());
MapFunction<?, ?> function = new MapFunction<Map<String, Object>,Map<String, Object>>(){
@Override
public Map<String, Object> map(Map<String, Object> stringObjectMap) throws Exception {
return stringObjectMap;
}
};
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) inputType);
TypeInformation<?> expected = TypeExtractor.createTypeInfo(Map.class);
Assert.assertEquals(expected, ti);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test(expected=InvalidTypesException.class)
public void testGenericTypeWithSuperclassInput() {
TypeInformation<?> inputType = TypeExtractor.createTypeInfo(Map.class);
MapFunction<?, ?> function = new MapFunction<HashMap<String, Object>,Map<String, Object>>(){
@Override
public Map<String, Object> map(HashMap<String, Object> stringObjectMap) throws Exception {
return stringObjectMap;
}
};
TypeExtractor.getMapReturnTypes(function, (TypeInformation) inputType);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testInputWithCustomTypeInfo() {
TypeInformation<?> customTypeInfo = new TypeInformation<Object>() {
@Override
public boolean isBasicType() {
return true;
}
@Override
public boolean isTupleType() {
return true;
}
@Override
public int getArity() {
return 0;
}
@Override
public int getTotalFields() {
return 0;
}
@Override
public Class getTypeClass() {
return null;
}
@Override
public boolean isKeyType() {
return false;
}
@Override
public TypeSerializer<Object> createSerializer(ExecutionConfig config) {
return null;
}
@Override
public String toString() {
return null;
}
@Override
public boolean equals(Object obj) {
return false;
}
@Override
public int hashCode() {
return 0;
}
@Override
public boolean canEqual(Object obj) {
return false;
}
};
MapFunction<?, ?> function = new MapFunction<Tuple1<String>, Tuple1<Object>>() {
@Override
public Tuple1<Object> map(Tuple1<String> value) throws Exception {
return null;
}
};
TypeExtractor.getMapReturnTypes(function,
(TypeInformation) new TupleTypeInfo<Tuple1<Object>>(customTypeInfo));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册