提交 a2f9aaba 编写于 作者: S Stephan Ewen

[FLINK-4316] [core] [hadoop compatibility] Make flink-core independent of Hadoop

This commit moves all 'Writable' related code to the 'flink-hadoop-compatibility' project
and uses reflection in 'flink-core' to instantiate WritableTypeInfo when needed.

This closes #2338
上级 2ab6d462
......@@ -55,17 +55,19 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-tests_2.10</artifactId>
<artifactId>flink-test-utils_2.10</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.10</artifactId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>
......
......@@ -18,8 +18,8 @@
package org.apache.flink.api.java.typeutils;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.AtomicType;
......@@ -28,11 +28,10 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.WritableComparator;
import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
import org.apache.hadoop.io.Writable;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Type information for data types that extend Hadoop's {@link Writable} interface. The Writable
......
......@@ -18,8 +18,7 @@
package org.apache.flink.api.java.typeutils.runtime;
import java.io.IOException;
import com.esotericsoftware.kryo.Kryo;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
......@@ -27,10 +26,10 @@ import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.types.NormalizableKey;
import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.io.Writable;
import com.esotericsoftware.kryo.Kryo;
import org.objenesis.strategy.StdInstantiatorStrategy;
import java.io.IOException;
public class WritableComparator<T extends Writable & Comparable<T>> extends TypeComparator<T> {
private static final long serialVersionUID = 1L;
......
......@@ -19,14 +19,13 @@
package org.apache.flink.api.java.typeutils.runtime;
import com.esotericsoftware.kryo.Kryo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import com.esotericsoftware.kryo.Kryo;
import org.objenesis.strategy.StdInstantiatorStrategy;
import java.io.IOException;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparator;
import org.junit.Test;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import static org.junit.Assert.*;
@SuppressWarnings("serial")
public class WritableExtractionTest {
@Test
public void testDetectWritable() {
// writable interface itself must not be writable
assertFalse(TypeExtractor.isHadoopWritable(Writable.class));
// various forms of extension
assertTrue(TypeExtractor.isHadoopWritable(DirectWritable.class));
assertTrue(TypeExtractor.isHadoopWritable(ViaInterfaceExtension.class));
assertTrue(TypeExtractor.isHadoopWritable(ViaAbstractClassExtension.class));
// some non-writables
assertFalse(TypeExtractor.isHadoopWritable(String.class));
assertFalse(TypeExtractor.isHadoopWritable(List.class));
assertFalse(TypeExtractor.isHadoopWritable(WritableComparator.class));
}
@Test
public void testCreateWritableInfo() {
TypeInformation<DirectWritable> info1 =
TypeExtractor.createHadoopWritableTypeInfo(DirectWritable.class);
assertEquals(DirectWritable.class, info1.getTypeClass());
TypeInformation<ViaInterfaceExtension> info2 =
TypeExtractor.createHadoopWritableTypeInfo(ViaInterfaceExtension.class);
assertEquals(ViaInterfaceExtension.class, info2.getTypeClass());
TypeInformation<ViaAbstractClassExtension> info3 =
TypeExtractor.createHadoopWritableTypeInfo(ViaAbstractClassExtension.class);
assertEquals(ViaAbstractClassExtension.class, info3.getTypeClass());
}
@Test
public void testValidateTypeInfo() {
// validate unrelated type info
TypeExtractor.validateIfWritable(BasicTypeInfo.STRING_TYPE_INFO, String.class);
// validate writable type info correctly
TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
DirectWritable.class), DirectWritable.class);
TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
ViaInterfaceExtension.class), ViaInterfaceExtension.class);
TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
ViaAbstractClassExtension.class), ViaAbstractClassExtension.class);
// incorrect case: not writable at all
try {
TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
DirectWritable.class), String.class);
fail("should have failed with an exception");
} catch (InvalidTypesException e) {
// expected
}
// incorrect case: wrong writable
try {
TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
ViaInterfaceExtension.class), DirectWritable.class);
fail("should have failed with an exception");
} catch (InvalidTypesException e) {
// expected
}
}
@Test
public void testExtractFromFunction() {
RichMapFunction<DirectWritable, DirectWritable> function = new RichMapFunction<DirectWritable, DirectWritable>() {
@Override
public DirectWritable map(DirectWritable value) throws Exception {
return null;
}
};
TypeInformation<DirectWritable> outType =
TypeExtractor.getMapReturnTypes(function, new WritableTypeInfo<>(DirectWritable.class));
assertTrue(outType instanceof WritableTypeInfo);
assertEquals(DirectWritable.class, outType.getTypeClass());
}
@Test
public void testExtractAsPartOfPojo() {
PojoTypeInfo<PojoWithWritable> pojoInfo =
(PojoTypeInfo<PojoWithWritable>) TypeExtractor.getForClass(PojoWithWritable.class);
boolean foundWritable = false;
for (int i = 0; i < pojoInfo.getArity(); i++) {
PojoField field = pojoInfo.getPojoFieldAt(i);
String name = field.getField().getName();
if (name.equals("hadoopCitizen")) {
if (foundWritable) {
fail("already seen");
}
foundWritable = true;
assertEquals(new WritableTypeInfo<>(DirectWritable.class), field.getTypeInformation());
assertEquals(DirectWritable.class, field.getTypeInformation().getTypeClass());
}
}
assertTrue("missed the writable type", foundWritable);
}
@Test
public void testInputValidationError() {
RichMapFunction<Writable, String> function = new RichMapFunction<Writable, String>() {
@Override
public String map(Writable value) throws Exception {
return null;
}
};
@SuppressWarnings("unchecked")
TypeInformation<Writable> inType =
(TypeInformation<Writable>) (TypeInformation<?>) new WritableTypeInfo<>(DirectWritable.class);
try {
TypeExtractor.getMapReturnTypes(function, inType);
fail("exception expected");
}
catch (InvalidTypesException e) {
// right
}
}
// ------------------------------------------------------------------------
// test type classes
// ------------------------------------------------------------------------
public interface ExtendedWritable extends Writable {}
public static abstract class AbstractWritable implements Writable {}
public static class DirectWritable implements Writable {
@Override
public void write(DataOutput dataOutput) throws IOException {}
@Override
public void readFields(DataInput dataInput) throws IOException {}
}
public static class ViaInterfaceExtension implements ExtendedWritable {
@Override
public void write(DataOutput dataOutput) throws IOException {}
@Override
public void readFields(DataInput dataInput) throws IOException {}
}
public static class ViaAbstractClassExtension extends AbstractWritable {
@Override
public void write(DataOutput dataOutput) throws IOException {}
@Override
public void readFields(DataInput dataInput) throws IOException {}
}
public static class PojoWithWritable {
public String str;
public DirectWritable hadoopCitizen;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.Writable;
import org.junit.Assert;
import org.junit.Test;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class WritableInfoParserTest {
@Test
public void testWritableType() {
TypeInformation<?> ti = TypeInfoParser.parse(
"Writable<org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyWritable>");
Assert.assertTrue(ti instanceof WritableTypeInfo<?>);
Assert.assertEquals(MyWritable.class, ((WritableTypeInfo<?>) ti).getTypeClass());
}
@Test
public void testPojoWithWritableType() {
TypeInformation<?> ti = TypeInfoParser.parse(
"org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyPojo<"
+ "basic=Integer,"
+ "tuple=Tuple2<String, Integer>,"
+ "hadoopCitizen=Writable<org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyWritable>,"
+ "array=String[]"
+ ">");
Assert.assertTrue(ti instanceof PojoTypeInfo);
PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
Assert.assertEquals("array", pti.getPojoFieldAt(0).getField().getName());
Assert.assertTrue(pti.getPojoFieldAt(0).getTypeInformation() instanceof BasicArrayTypeInfo);
Assert.assertEquals("basic", pti.getPojoFieldAt(1).getField().getName());
Assert.assertTrue(pti.getPojoFieldAt(1).getTypeInformation() instanceof BasicTypeInfo);
Assert.assertEquals("hadoopCitizen", pti.getPojoFieldAt(2).getField().getName());
Assert.assertTrue(pti.getPojoFieldAt(2).getTypeInformation() instanceof WritableTypeInfo);
Assert.assertEquals("tuple", pti.getPojoFieldAt(3).getField().getName());
Assert.assertTrue(pti.getPojoFieldAt(3).getTypeInformation() instanceof TupleTypeInfo);
}
// ------------------------------------------------------------------------
// Test types
// ------------------------------------------------------------------------
public static class MyWritable implements Writable {
@Override
public void write(DataOutput out) throws IOException {}
@Override
public void readFields(DataInput in) throws IOException {}
}
public static class MyPojo {
public Integer basic;
public Tuple2<String, Integer> tuple;
public MyWritable hadoopCitizen;
public String[] array;
}
}
......@@ -26,35 +26,11 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
public class WritableTypeInfoTest extends TestLogger {
public static class TestClass implements Writable {
@Override
public void write(DataOutput dataOutput) throws IOException {
}
@Override
public void readFields(DataInput dataInput) throws IOException {
}
}
public static class AlternateClass implements Writable {
@Override
public void write(DataOutput dataOutput) throws IOException {
}
@Override
public void readFields(DataInput dataInput) throws IOException {
}
}
@Test
public void testWritableTypeInfoEquality() {
WritableTypeInfo<TestClass> tpeInfo1 = new WritableTypeInfo<>(TestClass.class);
......@@ -71,4 +47,26 @@ public class WritableTypeInfoTest extends TestLogger {
assertNotEquals(tpeInfo1, tpeInfo2);
}
// ------------------------------------------------------------------------
// test types
// ------------------------------------------------------------------------
public static class TestClass implements Writable {
@Override
public void write(DataOutput dataOutput) throws IOException {}
@Override
public void readFields(DataInput dataInput) throws IOException {}
}
public static class AlternateClass implements Writable {
@Override
public void write(DataOutput dataOutput) throws IOException {}
@Override
public void readFields(DataInput dataInput) throws IOException {}
}
}
......@@ -18,12 +18,12 @@
package org.apache.flink.api.java.typeutils.runtime;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class StringArrayWritable implements Writable, Comparable<StringArrayWritable> {
private String[] array = new String[0];
......
......@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeutils.ComparatorTestBase;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
public class WritableComparatorTest extends ComparatorTestBase<StringArrayWritable> {
public class WritableComparatorTest extends ComparatorTestBase<StringArrayWritable> {
StringArrayWritable[] data = new StringArrayWritable[]{
new StringArrayWritable(new String[]{}),
......
......@@ -42,6 +42,12 @@ under the License.
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.10</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hcatalog-core</artifactId>
......
......@@ -70,13 +70,12 @@ under the License.
</exclusions>
</dependency>
<!-- Hadoop is only needed here for serialization interoperability with the Writable type -->
<!-- The common collections are needed for some hash tables used in the collection execution -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>${shading-artifact.name}</artifactId>
<version>${project.version}</version>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</dependency>
<!-- test dependencies -->
<dependency>
......@@ -117,6 +116,7 @@ under the License.
<parameter>
<excludes combine.children="append">
<exclude>org.apache.flink.api.common.ExecutionConfig#CONFIG_KEY</exclude>
<exclude>org.apache.flink.api.java.typeutils.WritableTypeInfo</exclude>
</excludes>
</parameter>
</configuration>
......
......@@ -21,12 +21,14 @@ package org.apache.flink.api.java.typeutils;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import org.apache.avro.specific.SpecificRecordBase;
......@@ -62,8 +64,6 @@ import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.types.Either;
import org.apache.flink.types.Value;
import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -97,7 +97,12 @@ public class TypeExtractor {
* Field type: String.class
*
*/
/** The name of the class representing Hadoop's writable */
private static final String HADOOP_WRITABLE_CLASS = "org.apache.hadoop.io.Writable";
private static final String HADOOP_WRITABLE_TYPEINFO_CLASS = "org.apache.flink.api.java.typeutils.WritableTypeInfo";
private static final Logger LOG = LoggerFactory.getLogger(TypeExtractor.class);
protected TypeExtractor() {
......@@ -1119,21 +1124,6 @@ public class TypeExtractor {
validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[0], eti.getLeftType());
validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[1], eti.getRightType());
}
// check for Writable
else if (typeInfo instanceof WritableTypeInfo<?>) {
// check if writable at all
if (!(type instanceof Class<?> && Writable.class.isAssignableFrom((Class<?>) type))) {
throw new InvalidTypesException("Writable type expected.");
}
// check writable type contents
Class<?> clazz;
if (((WritableTypeInfo<?>) typeInfo).getTypeClass() != (clazz = (Class<?>) type)) {
throw new InvalidTypesException("Writable type '"
+ ((WritableTypeInfo<?>) typeInfo).getTypeClass().getCanonicalName() + "' expected but was '"
+ clazz.getCanonicalName() + "'.");
}
}
// check for primitive array
else if (typeInfo instanceof PrimitiveArrayTypeInfo) {
Type component;
......@@ -1237,6 +1227,10 @@ public class TypeExtractor {
+ clazz.getCanonicalName() + "'.");
}
}
// check for Writable
else {
validateIfWritable(typeInfo, type);
}
} else {
type = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) type);
if (!(type instanceof TypeVariable)) {
......@@ -1546,8 +1540,8 @@ public class TypeExtractor {
}
// check for writable types
if(Writable.class.isAssignableFrom(clazz) && !Writable.class.equals(clazz)) {
return (TypeInformation<OUT>) WritableTypeInfo.getWritableTypeInfo((Class<? extends Writable>) clazz);
if (isHadoopWritable(clazz)) {
return createHadoopWritableTypeInfo(clazz);
}
// check for basic types
......@@ -1904,4 +1898,93 @@ public class TypeExtractor {
return privateGetForClass((Class<X>) value.getClass(), new ArrayList<Type>());
}
}
// ------------------------------------------------------------------------
// Utilities to handle Hadoop's 'Writable' type via reflection
// ------------------------------------------------------------------------
// visible for testing
static boolean isHadoopWritable(Class<?> typeClass) {
// check if this is directly the writable interface
if (typeClass.getName().equals(HADOOP_WRITABLE_CLASS)) {
return false;
}
final HashSet<Class<?>> alreadySeen = new HashSet<>();
alreadySeen.add(typeClass);
return hasHadoopWritableInterface(typeClass, alreadySeen);
}
private static boolean hasHadoopWritableInterface(Class<?> clazz, HashSet<Class<?>> alreadySeen) {
Class<?>[] interfaces = clazz.getInterfaces();
for (Class<?> c : interfaces) {
if (c.getName().equals("org.apache.hadoop.io.Writable")) {
return true;
}
else if (alreadySeen.add(c) && hasHadoopWritableInterface(c, alreadySeen)) {
return true;
}
}
Class<?> superclass = clazz.getSuperclass();
return superclass != null && alreadySeen.add(superclass) && hasHadoopWritableInterface(superclass, alreadySeen);
}
// visible for testing
public static <T> TypeInformation<T> createHadoopWritableTypeInfo(Class<T> clazz) {
checkNotNull(clazz);
Class<?> typeInfoClass;
try {
typeInfoClass = Class.forName(HADOOP_WRITABLE_TYPEINFO_CLASS, false, TypeExtractor.class.getClassLoader());
}
catch (ClassNotFoundException e) {
throw new RuntimeException("Could not load the TypeInformation for the class '"
+ HADOOP_WRITABLE_CLASS + "'. You may be missing the 'flink-hadoop-compatibility' dependency.");
}
try {
Constructor<?> constr = typeInfoClass.getConstructor(Class.class);
@SuppressWarnings("unchecked")
TypeInformation<T> typeInfo = (TypeInformation<T>) constr.newInstance(clazz);
return typeInfo;
}
catch (NoSuchMethodException | IllegalAccessException | InstantiationException e) {
throw new RuntimeException("Incompatible versions of the Hadoop Compatibility classes found.");
}
catch (InvocationTargetException e) {
throw new RuntimeException("Cannot create Hadoop Writable Type info", e.getTargetException());
}
}
// visible for testing
static void validateIfWritable(TypeInformation<?> typeInfo, Type type) {
try {
// try to load the writable type info
Class<?> writableTypeInfoClass = Class
.forName(HADOOP_WRITABLE_TYPEINFO_CLASS, false, typeInfo.getClass().getClassLoader());
if (writableTypeInfoClass.isAssignableFrom(typeInfo.getClass())) {
// this is actually a writable type info
// check if the type is a writable
if (!(type instanceof Class && isHadoopWritable((Class<?>) type))) {
throw new InvalidTypesException(HADOOP_WRITABLE_CLASS + " type expected");
}
// check writable type contents
Class<?> clazz = (Class<?>) type;
if (typeInfo.getTypeClass() != clazz) {
throw new InvalidTypesException("Writable type '"
+ typeInfo.getTypeClass().getCanonicalName() + "' expected but was '"
+ clazz.getCanonicalName() + "'.");
}
}
}
catch (ClassNotFoundException e) {
// class not present at all, so cannot be that type info
// ignore
}
}
}
......@@ -168,7 +168,7 @@ public class TypeInfoParser {
String fullyQualifiedName = writableMatcher.group(3);
sb.delete(0, className.length() + 1 + fullyQualifiedName.length() + 1);
Class<?> clazz = loadClass(fullyQualifiedName);
returnType = WritableTypeInfo.getWritableTypeInfo((Class) clazz);
returnType = TypeExtractor.createHadoopWritableTypeInfo(clazz);
}
// enum types
else if (enumMatcher.find()) {
......
......@@ -33,7 +33,7 @@ import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TypeInfoParserTest.MyWritable;
import org.apache.flink.api.java.typeutils.TypeInfoParserTest.MyValue;
import org.junit.Assert;
import org.junit.Test;
......@@ -79,7 +79,7 @@ public class PojoTypeExtractionTest {
public float someFloat; // BasicType
public Tuple3<Long, Long, String> word; //Tuple Type with three basic types
public Object nothing; // generic type
public MyWritable hadoopCitizen; // writableType
public MyValue valueType; // writableType
public List<String> collection;
}
......@@ -219,18 +219,18 @@ public class PojoTypeExtractionTest {
List<FlatFieldDescriptor> ffd = new ArrayList<FlatFieldDescriptor>();
String[] fields = {"count",
"complex.date",
"complex.hadoopCitizen",
"complex.collection",
"complex.nothing",
"complex.someFloat",
"complex.someNumberWithÜnicödeNäme",
"complex.valueType",
"complex.word.f0",
"complex.word.f1",
"complex.word.f2"};
int[] positions = {9,
1,
2,
0,
2,
3,
4,
5,
......@@ -284,16 +284,16 @@ public class PojoTypeExtractionTest {
Assert.assertEquals(Date.class, ffdE.getType().getTypeClass());
}
if(pos == 2) {
Assert.assertEquals(MyWritable.class, ffdE.getType().getTypeClass());
Assert.assertEquals(Object.class, ffdE.getType().getTypeClass());
}
if(pos == 3) {
Assert.assertEquals(Object.class, ffdE.getType().getTypeClass());
Assert.assertEquals(Float.class, ffdE.getType().getTypeClass());
}
if(pos == 4) {
Assert.assertEquals(Float.class, ffdE.getType().getTypeClass());
Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass());
}
if(pos == 5) {
Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass());
Assert.assertEquals(MyValue.class, ffdE.getType().getTypeClass());
}
if(pos == 6) {
Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
......@@ -374,13 +374,13 @@ public class PojoTypeExtractionTest {
objectSeen = true;
Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.getTypeInformation());
Assert.assertEquals(Object.class, field.getTypeInformation().getTypeClass());
} else if(name.equals("hadoopCitizen")) {
} else if(name.equals("valueType")) {
if(writableSeen) {
Assert.fail("already seen");
}
writableSeen = true;
Assert.assertEquals(new WritableTypeInfo<MyWritable>(MyWritable.class), field.getTypeInformation());
Assert.assertEquals(MyWritable.class, field.getTypeInformation().getTypeClass());
Assert.assertEquals(new ValueTypeInfo<>(MyValue.class), field.getTypeInformation());
Assert.assertEquals(MyValue.class, field.getTypeInformation().getTypeClass());
} else if(name.equals("collection")) {
if(collectionSeen) {
Assert.fail("already seen");
......@@ -447,7 +447,7 @@ public class PojoTypeExtractionTest {
strArraySeen = true;
Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, field.getTypeInformation());
Assert.assertEquals(String[].class, field.getTypeInformation().getTypeClass());
} else if(Arrays.asList("date", "someNumberWithÜnicödeNäme", "someFloat", "word", "nothing", "hadoopCitizen", "collection").contains(name)) {
} else if(Arrays.asList("date", "someNumberWithÜnicödeNäme", "someFloat", "word", "nothing", "valueType", "collection").contains(name)) {
// ignore these, they are inherited from the ComplexNestedClass
}
else {
......
......@@ -18,9 +18,6 @@
package org.apache.flink.api.java.typeutils;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Date;
......@@ -64,8 +61,6 @@ import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
import org.apache.hadoop.io.Writable;
import org.junit.Assert;
import org.junit.Test;
......@@ -99,38 +94,6 @@ public class TypeExtractorTest {
// use getForObject()
Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, TypeExtractor.getForObject(true));
}
public static class MyWritable implements Writable {
@Override
public void write(DataOutput out) throws IOException {
}
@Override
public void readFields(DataInput in) throws IOException {
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testWritableType() {
RichMapFunction<?, ?> function = new RichMapFunction<MyWritable, MyWritable>() {
private static final long serialVersionUID = 1L;
@Override
public MyWritable map(MyWritable value) throws Exception {
return null;
}
};
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) new WritableTypeInfo<MyWritable>(MyWritable.class));
Assert.assertTrue(ti instanceof WritableTypeInfo<?>);
Assert.assertEquals(MyWritable.class, ((WritableTypeInfo<?>) ti).getTypeClass());
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
......@@ -1407,22 +1370,6 @@ public class TypeExtractorTest {
} catch (InvalidTypesException e) {
// right
}
RichMapFunction<?, ?> function4 = new RichMapFunction<Writable, String>() {
private static final long serialVersionUID = 1L;
@Override
public String map(Writable value) throws Exception {
return null;
}
};
try {
TypeExtractor.getMapReturnTypes(function4, (TypeInformation) new WritableTypeInfo<MyWritable>(MyWritable.class));
Assert.fail("exception expected");
} catch (InvalidTypesException e) {
// right
}
}
public static class DummyFlatMapFunction<A,B,C,D> extends RichFlatMapFunction<Tuple2<A,B>, Tuple2<C,D>> {
......
......@@ -18,10 +18,9 @@
package org.apache.flink.api.java.typeutils;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.Value;
import org.junit.Assert;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
......@@ -40,9 +39,11 @@ import org.apache.flink.types.MapValue;
import org.apache.flink.types.NullValue;
import org.apache.flink.types.ShortValue;
import org.apache.flink.types.StringValue;
import org.apache.hadoop.io.Writable;
import org.junit.Test;
import java.io.IOException;
public class TypeInfoParserTest {
@Test
......@@ -109,7 +110,9 @@ public class TypeInfoParserTest {
Assert.assertEquals(clazz, vti.getTypeClass());
}
@Test
@SuppressWarnings("AssertEqualsBetweenInconvertibleTypes")
public void testBasicArrays() {
Assert.assertEquals(BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO, TypeInfoParser.parse("Integer[]"));
Assert.assertEquals(BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO, TypeInfoParser.parse("Double[]"));
......@@ -166,11 +169,21 @@ public class TypeInfoParserTest {
Assert.assertTrue(ti instanceof GenericTypeInfo);
Assert.assertEquals(Class.class, ((GenericTypeInfo<?>) ti).getTypeClass());
}
public static class MyValue implements Value {
private static final long serialVersionUID = 8607223484689147046L;
@Override
public void write(DataOutputView out) throws IOException {}
@Override
public void read(DataInputView in) throws IOException {}
}
public static class MyPojo {
public Integer basic;
public Tuple2<String, Integer> tuple;
public MyWritable hadoopCitizen;
public Value valueType;
public String[] array;
}
......@@ -180,7 +193,7 @@ public class TypeInfoParserTest {
"org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyPojo<"
+ "basic=Integer,"
+ "tuple=Tuple2<String, Integer>,"
+ "hadoopCitizen=Writable<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyWritable>,"
+ "valueType=org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyValue,"
+ "array=String[]"
+ ">");
Assert.assertTrue(ti instanceof PojoTypeInfo);
......@@ -189,10 +202,12 @@ public class TypeInfoParserTest {
Assert.assertTrue(pti.getPojoFieldAt(0).getTypeInformation() instanceof BasicArrayTypeInfo);
Assert.assertEquals("basic", pti.getPojoFieldAt(1).getField().getName());
Assert.assertTrue(pti.getPojoFieldAt(1).getTypeInformation() instanceof BasicTypeInfo);
Assert.assertEquals("hadoopCitizen", pti.getPojoFieldAt(2).getField().getName());
Assert.assertTrue(pti.getPojoFieldAt(2).getTypeInformation() instanceof WritableTypeInfo);
Assert.assertEquals("tuple", pti.getPojoFieldAt(3).getField().getName());
Assert.assertTrue(pti.getPojoFieldAt(3).getTypeInformation() instanceof TupleTypeInfo);
Assert.assertEquals("tuple", pti.getPojoFieldAt(2).getField().getName());
Assert.assertTrue(pti.getPojoFieldAt(2).getTypeInformation() instanceof TupleTypeInfo);
Assert.assertEquals("valueType", pti.getPojoFieldAt(3).getField().getName());
// this currently fails but should not
// Assert.assertTrue(pti.getPojoFieldAt(3).getTypeInformation() instanceof ValueTypeInfo);
}
@Test
......@@ -209,28 +224,7 @@ public class TypeInfoParserTest {
Assert.assertEquals("basic", pti.getPojoFieldAt(0).getField().getName());
Assert.assertTrue(pti.getPojoFieldAt(0).getTypeInformation() instanceof BasicTypeInfo);
}
public static class MyWritable implements Writable {
@Override
public void write(DataOutput out) throws IOException {
}
@Override
public void readFields(DataInput in) throws IOException {
}
}
@Test
public void testWritableType() {
TypeInformation<?> ti = TypeInfoParser.parse("Writable<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyWritable>");
Assert.assertTrue(ti instanceof WritableTypeInfo<?>);
Assert.assertEquals(MyWritable.class, ((WritableTypeInfo<?>) ti).getTypeClass());
}
@Test
public void testObjectArrays() {
TypeInformation<?> ti = TypeInfoParser.parse("java.lang.Class[]");
......@@ -327,11 +321,15 @@ public class TypeInfoParserTest {
ti = TypeInfoParser.parse("IntValue[][][]");
Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<ValueType<IntValue>>>>", ti.toString());
// writable types
ti = TypeInfoParser.parse("Writable<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyWritable>[][][]");
Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<"
+ "WritableType<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyWritable>"
+ ">>>", ti.toString());
// value types
ti = TypeInfoParser.parse("org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyValue[][][]");
// this fails because value types are parsed in a wrong way
// Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<"
// + "ValueType<TypeInfoParserTest$MyValue>"
// + ">>>", ti.toString());
// enum types
ti = TypeInfoParser.parse("Enum<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>[][][]");
......
......@@ -291,7 +291,7 @@ private[flink] trait TypeInformationGen[C <: Context] {
desc: UDTDescriptor): c.Expr[TypeInformation[T]] = {
val tpeClazz = c.Expr[Class[T]](Literal(Constant(desc.tpe)))
reify {
new WritableTypeInfo[T](tpeClazz.splice)
TypeExtractor.createHadoopWritableTypeInfo[T](tpeClazz.splice)
}
}
......
......@@ -78,6 +78,13 @@ under the License.
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.10</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
......
......@@ -131,6 +131,13 @@ under the License.
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.10</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-optimizer_2.10</artifactId>
......
......@@ -233,6 +233,13 @@ under the License.
<version>2.4</version>
</dependency>
<!--- commons collections needs to be pinned to this critical security fix version -->
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.2</version>
</dependency>
<!-- common-beanutils-bean-collections is used by flink-shaded-hadoop2 -->
<dependency>
<groupId>commons-beanutils</groupId>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册