提交 aca6fbcd 编写于 作者: R Robert Metzger 提交者: Robert Metzger

Added expression keys to distinct and partition operator and addressed some of the comments

上级 926f835a
......@@ -433,6 +433,20 @@ public abstract class DataSet<T> {
return new DistinctOperator<T>(this, new Keys.ExpressionKeys<T>(fields, getType(), true));
}
/**
 * Returns a distinct set of a {@link DataSet} using field expression keys.
 * <p/>
 * The field expressions specify the fields of Tuples or Pojos on which the decision is made if two
 * elements are distinct or not.
 * <p/>
 * The expressions are wrapped in a {@code Keys.ExpressionKeys} built from {@code fields} and the type
 * of this DataSet.
 *
 * @param fields One or more field expressions on which the distinction of the DataSet is decided.
 * @return A DistinctOperator that represents the distinct DataSet.
 */
public DistinctOperator<T> distinct(String... fields) {
return new DistinctOperator<T>(this, new Keys.ExpressionKeys<T>(fields, getType()));
}
/**
* Returns a distinct set of a {@link Tuple} {@link DataSet} using all fields of the tuple.
* <p/>
......@@ -866,6 +880,18 @@ public abstract class DataSet<T> {
return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.ExpressionKeys<T>(fields, getType(), false));
}
/**
 * Hash-partitions a DataSet on the specified key fields.
 * <p>
 * <b>Important:</b> This operation shuffles the whole DataSet over the network and can take a
 * significant amount of time.
 * <p>
 * The expressions are wrapped in a {@code Keys.ExpressionKeys} built from {@code fields} and the type
 * of this DataSet, and passed to a {@code PartitionOperator} using {@code PartitionMethod.HASH}.
 *
 * @param fields The field expressions on which the DataSet is hash-partitioned.
 * @return The partitioned DataSet.
 */
public PartitionOperator<T> partitionByHash(String... fields) {
return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.ExpressionKeys<T>(fields, getType()));
}
/**
* Partitions a DataSet using the specified KeySelector.
* <p>
......
......@@ -26,6 +26,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
......@@ -67,8 +68,8 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
// ExpressionKeys can only be applied on composite types
if (keys instanceof Keys.ExpressionKeys && !input.getType().isTupleType()) {
throw new InvalidProgramException("Distinction on field positions is only possible on tuple data types.");
if (keys instanceof Keys.ExpressionKeys && !(input.getType() instanceof CompositeType)) {
throw new InvalidProgramException("Distinction on field positions is only possible on composite type DataSets.");
}
this.keys = keys;
......
......@@ -25,6 +25,7 @@ import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
......@@ -50,8 +51,8 @@ public class PartitionOperator<T> extends SingleInputUdfOperator<T, T, Partition
throw new UnsupportedOperationException("Range Partitioning not yet supported");
}
if(pKeys instanceof Keys.ExpressionKeys<?> && !input.getType().isTupleType()) {
throw new IllegalArgumentException("Hash Partitioning with key fields only possible on Tuple DataSets");
if(pKeys instanceof Keys.ExpressionKeys<?> && !(input.getType() instanceof CompositeType) ) {
throw new IllegalArgumentException("Hash Partitioning with key fields only possible on Composite-type DataSets");
}
this.pMethod = pMethod;
......
......@@ -399,8 +399,7 @@ public class TypeExtractor {
}
// objects with generics are treated as raw type
else if (t instanceof ParameterizedType) {
return privateGetForClass((Class<OUT>) ((ParameterizedType) t).getRawType(), new ArrayList<Type>()); // pass new type hierarchies here because
// while creating the TH here, we assumed a tuple type.
return privateGetForClass((Class<OUT>) ((ParameterizedType) t).getRawType(), typeHierarchy);
}
// no tuple, no TypeVariable, no generic type
else if (t instanceof Class) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.type.extractor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.PojoField;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.TypeInfoParserTest.MyWritable;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import com.google.common.collect.HashMultiset;
/**
 * Tests for the extraction of Pojo type information by the {@link TypeExtractor}.
 * <p>
 * A Pojo is a bean-style class with getters, setters and an empty constructor,
 * OR a class with all fields public (for every private field, there has to be a public getter/setter).
 * Everything else is a generic type (that can't be used for field selection).
 */
public class PojoTypeExtractionTest {

	// ------------------------------------------------------------------------
	//  Test types
	// ------------------------------------------------------------------------

	/** Pojo combining a public nested Pojo field with a private field exposed via getter/setter. */
	public static class WC { // is a pojo
		public ComplexNestedClass complex; // is a pojo
		private int count; // is a BasicType

		public WC() {
		}

		public int getCount() {
			return count;
		}

		public void setCount(int c) {
			this.count = c;
		}
	}

	/** Pojo with six extractable fields; the static and the transient field are expected to be skipped. */
	public static class ComplexNestedClass { // pojo
		public static int ignoreStaticField;
		public transient int ignoreTransientField;
		public Date date; // generic type
		public Integer someNumber; // BasicType
		public float someFloat; // BasicType
		public Tuple3<Long, Long, String> word; // Tuple Type with three basic types
		public Object nothing; // generic type
		public MyWritable hadoopCitizen; // writableType
	}

	/** All-public Pojo that inherits the fields of {@link ComplexNestedClass} and adds three of its own. */
	public static class AllPublic extends ComplexNestedClass {
		public ArrayList<String> somethingFancy; // generic type
		public HashMultiset<Integer> fancyIds; // generic type
		public String[] fancyArray; // basic array type (String[]), see checkAllPublicAsserts
	}

	/** Binds the type variables of {@link PojoWithGenerics} to concrete types and adds one field. */
	public static class ParentSettingGenerics extends PojoWithGenerics<Integer, Long> {
		public String field3;
	}

	/** Pojo whose field types are given by type variables. */
	public static class PojoWithGenerics<T1, T2> {
		public int key;
		public T1 field1;
		public T2 field2;
	}

	/** Top of a two-level generic hierarchy; binds the remaining type variable to a tuple. */
	public static class ComplexHierarchyTop extends ComplexHierarchy<Tuple1<String>> {}

	/** Intermediate level: binds the first type variable and passes the second one through. */
	public static class ComplexHierarchy<T> extends PojoWithGenerics<FromTuple,T> {}

	/** Extends from Tuple and adds a field; the tests expect this to be analyzed as a Pojo, not a tuple. */
	public static class FromTuple extends Tuple3<String, String, Long> {
		private static final long serialVersionUID = 1L;
		public int special;
	}

	/** Not a valid Pojo: the private field has a getter but no setter. */
	public static class IncorrectPojo {
		private int isPrivate;

		public int getIsPrivate() {
			return isPrivate;
		}
		// setter is missing (intentional)
	}

	/** Correct Pojo: one public field, one private field with getter and setter. */
	public static class BeanStylePojo {
		public String abc;
		private int field;

		public int getField() {
			return this.field;
		}

		public void setField(int f) {
			this.field = f;
		}
	}

	/** Not a valid Pojo: there is no empty constructor. */
	public static class WrongCtorPojo {
		public int a;

		public WrongCtorPojo(int a) {
			this.a = a;
		}
	}

	/** In this test, the location of the getters and setters is mixed across the type hierarchy. */
	public static class TypedPojoGetterSetterCheck extends GenericPojoGetterSetterCheck<String> {
		public void setPackageProtected(String in) {
			this.packageProtected = in;
		}
	}

	/** Generic parent that holds the field and its getter; the setter lives in the subclass. */
	public static class GenericPojoGetterSetterCheck<T> {
		T packageProtected;

		public T getPackageProtected() {
			return packageProtected;
		}
	}

	// ------------------------------------------------------------------------
	//  Expectations shared by the WC checks
	// ------------------------------------------------------------------------

	/**
	 * Expected type class of every flat field of {@link WC}, indexed by flat field position.
	 * Positions 0-7 are the flattened fields of {@code complex}, position 8 is {@code count}.
	 */
	private static final Class<?>[] WC_FLAT_FIELD_TYPES = {
		Date.class,       // 0: complex.date
		MyWritable.class, // 1: complex.hadoopCitizen
		Object.class,     // 2: complex.nothing
		Float.class,      // 3: complex.someFloat
		Integer.class,    // 4: complex.someNumber
		Long.class,       // 5: complex.word.f0
		Long.class,       // 6: complex.word.f1
		String.class,     // 7: complex.word.f2
		Integer.class     // 8: count
	};

	/** Asserts that the descriptor's type class matches the expectation for its flat position. */
	private static void assertFlatFieldType(FlatFieldDescriptor descriptor) {
		Assert.assertEquals(WC_FLAT_FIELD_TYPES[descriptor.getPosition()], descriptor.getType().getTypeClass());
	}

	// ------------------------------------------------------------------------
	//  Tests
	// ------------------------------------------------------------------------

	/** Classes violating the Pojo rules must be treated as generic types. */
	@Test
	public void testIncorrectPojos() {
		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(IncorrectPojo.class);
		Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>);

		typeForClass = TypeExtractor.createTypeInfo(WrongCtorPojo.class);
		Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>);
	}

	/** Classes following the Pojo rules must be recognized as Pojo types. */
	@Test
	public void testCorrectPojos() {
		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(BeanStylePojo.class);
		Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);

		typeForClass = TypeExtractor.createTypeInfo(TypedPojoGetterSetterCheck.class);
		Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
	}

	/** Runs the {@link WC} checks on type information created from the class and from an instance. */
	@Test
	public void testPojoWC() {
		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(WC.class);
		checkWCPojoAsserts(typeForClass);

		WC t = new WC();
		t.complex = new ComplexNestedClass();
		TypeInformation<?> typeForObject = TypeExtractor.getForObject(t);
		checkWCPojoAsserts(typeForObject);
	}

	/**
	 * Verifies the type information extracted for {@link WC}: overall shape, resolution of
	 * field expressions to flat positions, and the nested {@link ComplexNestedClass} type.
	 *
	 * @param typeInfo the type information to check
	 */
	private void checkWCPojoAsserts(TypeInformation<?> typeInfo) {
		Assert.assertFalse(typeInfo.isBasicType());
		Assert.assertFalse(typeInfo.isTupleType());
		Assert.assertEquals(9, typeInfo.getTotalFields());
		Assert.assertTrue(typeInfo instanceof PojoTypeInfo);
		PojoTypeInfo<?> pojoType = (PojoTypeInfo<?>) typeInfo;

		checkWCFieldExpressions(pojoType);
		checkComplexNestedClassType(pojoType.getTypeAt(0)); // ComplexNestedClass complex

		TypeInformation<?> typeAtOne = pojoType.getTypeAt(1); // int count
		Assert.assertTrue(typeAtOne instanceof BasicTypeInfo);

		// expected value first, so that failure messages read correctly
		Assert.assertEquals(WC.class, typeInfo.getTypeClass());
		Assert.assertEquals(2, typeInfo.getArity());
	}

	/** Checks that single-field and wildcard expressions on {@link WC} resolve to the expected flat fields. */
	private void checkWCFieldExpressions(PojoTypeInfo<?> pojoType) {
		List<FlatFieldDescriptor> ffd = new ArrayList<FlatFieldDescriptor>();

		String[] fields = {"count","complex.date", "complex.hadoopCitizen", "complex.nothing",
							"complex.someFloat", "complex.someNumber", "complex.word.f0",
							"complex.word.f1", "complex.word.f2"};
		int[] positions = {8,0,1,2,
							3,4,5,
							6,7};
		Assert.assertEquals(fields.length, positions.length);
		for (int i = 0; i < fields.length; i++) {
			pojoType.getKey(fields[i], 0, ffd);
			Assert.assertEquals("Too many keys returned", 1, ffd.size());
			Assert.assertEquals("position of field "+fields[i]+" wrong", positions[i], ffd.get(0).getPosition());
			ffd.clear();
		}

		// wildcard on the nested tuple: must return positions 5,6,7
		pojoType.getKey("complex.word.*", 0, ffd);
		Assert.assertEquals(3, ffd.size());
		for (FlatFieldDescriptor descriptor : ffd) {
			final int pos = descriptor.getPosition();
			Assert.assertTrue(pos <= 7);
			Assert.assertTrue(5 <= pos);
			assertFlatFieldType(descriptor);
		}
		ffd.clear();

		// wildcard on the nested pojo: must return positions 0-7
		pojoType.getKey("complex.*", 0, ffd);
		Assert.assertEquals(8, ffd.size());
		for (FlatFieldDescriptor descriptor : ffd) {
			final int pos = descriptor.getPosition();
			Assert.assertTrue(pos <= 7);
			Assert.assertTrue(0 <= pos);
			assertFlatFieldType(descriptor);
		}
		ffd.clear();

		// wildcard on the whole type: must return positions 0-8; only the type of "count" is checked here
		pojoType.getKey("*", 0, ffd);
		Assert.assertEquals(9, ffd.size());
		for (FlatFieldDescriptor descriptor : ffd) {
			final int pos = descriptor.getPosition();
			Assert.assertTrue(pos <= 8);
			Assert.assertTrue(0 <= pos);
			if (pos == 8) {
				assertFlatFieldType(descriptor);
			}
		}
	}

	/** Checks that the nested type is a Pojo exposing exactly the six expected fields, each exactly once. */
	private void checkComplexNestedClassType(TypeInformation<?> typeComplexNested) {
		Assert.assertTrue(typeComplexNested instanceof PojoTypeInfo);
		Assert.assertEquals(6, typeComplexNested.getArity());
		Assert.assertEquals(8, typeComplexNested.getTotalFields());
		PojoTypeInfo<?> pojoTypeComplexNested = (PojoTypeInfo<?>) typeComplexNested;

		boolean dateSeen = false, intSeen = false, floatSeen = false,
				tupleSeen = false, objectSeen = false, writableSeen = false;
		for (int i = 0; i < pojoTypeComplexNested.getArity(); i++) {
			PojoField field = pojoTypeComplexNested.getPojoFieldAt(i);
			String name = field.field.getName();
			if (name.equals("date")) {
				Assert.assertFalse("already seen", dateSeen);
				dateSeen = true;
				Assert.assertEquals(new GenericTypeInfo<Date>(Date.class), field.type);
				Assert.assertEquals(Date.class, field.type.getTypeClass());
			} else if (name.equals("someNumber")) {
				Assert.assertFalse("already seen", intSeen);
				intSeen = true;
				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
				Assert.assertEquals(Integer.class, field.type.getTypeClass());
			} else if (name.equals("someFloat")) {
				Assert.assertFalse("already seen", floatSeen);
				floatSeen = true;
				Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, field.type);
				Assert.assertEquals(Float.class, field.type.getTypeClass());
			} else if (name.equals("word")) {
				Assert.assertFalse("already seen", tupleSeen);
				tupleSeen = true;
				Assert.assertTrue(field.type instanceof TupleTypeInfo<?>);
				Assert.assertEquals(Tuple3.class, field.type.getTypeClass());
				// do some more advanced checks on the tuple
				TupleTypeInfo<?> tupleTypeFromComplexNested = (TupleTypeInfo<?>) field.type;
				Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(0));
				Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(1));
				Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(2));
			} else if (name.equals("nothing")) {
				Assert.assertFalse("already seen", objectSeen);
				objectSeen = true;
				Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.type);
				Assert.assertEquals(Object.class, field.type.getTypeClass());
			} else if (name.equals("hadoopCitizen")) {
				Assert.assertFalse("already seen", writableSeen);
				writableSeen = true;
				Assert.assertEquals(new WritableTypeInfo<MyWritable>(MyWritable.class), field.type);
				Assert.assertEquals(MyWritable.class, field.type.getTypeClass());
			} else {
				Assert.fail("field "+field+" is not expected");
			}
		}
		Assert.assertTrue("Field was not present", dateSeen);
		Assert.assertTrue("Field was not present", intSeen);
		Assert.assertTrue("Field was not present", floatSeen);
		Assert.assertTrue("Field was not present", tupleSeen);
		Assert.assertTrue("Field was not present", objectSeen);
		Assert.assertTrue("Field was not present", writableSeen);
	}

	/** Runs the {@link AllPublic} checks on type information created from the class and from an instance. */
	@Test
	public void testPojoAllPublic() {
		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(AllPublic.class);
		checkAllPublicAsserts(typeForClass);

		TypeInformation<?> typeForObject = TypeExtractor.getForObject(new AllPublic());
		checkAllPublicAsserts(typeForObject);
	}

	/**
	 * Verifies the type information extracted for {@link AllPublic}: arity, total field count and
	 * the types of the three fields declared in addition to the inherited ones.
	 *
	 * @param typeInformation the type information to check
	 */
	private void checkAllPublicAsserts(TypeInformation<?> typeInformation) {
		Assert.assertTrue(typeInformation instanceof PojoTypeInfo);
		Assert.assertEquals(9, typeInformation.getArity());
		Assert.assertEquals(11, typeInformation.getTotalFields());

		// fields inherited from ComplexNestedClass; they are covered by the WC checks and only skipped here
		final List<String> inheritedFields =
				Arrays.asList("date", "someNumber", "someFloat", "word", "nothing", "hadoopCitizen");

		// check if the three additional fields are identified correctly
		boolean arrayListSeen = false, multisetSeen = false, strArraySeen = false;
		PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeInformation;
		for (int i = 0; i < pojoTypeForClass.getArity(); i++) {
			PojoField field = pojoTypeForClass.getPojoFieldAt(i);
			String name = field.field.getName();
			if (name.equals("somethingFancy")) {
				Assert.assertFalse("already seen", arrayListSeen);
				arrayListSeen = true;
				Assert.assertTrue(field.type instanceof GenericTypeInfo);
				Assert.assertEquals(ArrayList.class, field.type.getTypeClass());
			} else if (name.equals("fancyIds")) {
				Assert.assertFalse("already seen", multisetSeen);
				multisetSeen = true;
				Assert.assertTrue(field.type instanceof GenericTypeInfo);
				Assert.assertEquals(HashMultiset.class, field.type.getTypeClass());
			} else if (name.equals("fancyArray")) {
				Assert.assertFalse("already seen", strArraySeen);
				strArraySeen = true;
				Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, field.type);
				Assert.assertEquals(String[].class, field.type.getTypeClass());
			} else if (!inheritedFields.contains(name)) {
				Assert.fail("field "+field+" is not expected");
			}
		}
		Assert.assertTrue("Field was not present", arrayListSeen);
		Assert.assertTrue("Field was not present", multisetSeen);
		Assert.assertTrue("Field was not present", strArraySeen);
	}

	/** Runs the {@link FromTuple} checks on type information created from the class and from an instance. */
	@Test
	public void testPojoExtendingTuple() {
		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(FromTuple.class);
		checkFromTuplePojo(typeForClass);

		FromTuple ft = new FromTuple();
		ft.f0 = ""; ft.f1 = ""; ft.f2 = 0L;
		TypeInformation<?> typeForObject = TypeExtractor.getForObject(ft);
		checkFromTuplePojo(typeForObject);
	}

	/**
	 * Verifies that a class extending a tuple is extracted as a Pojo with four fields of the expected types.
	 *
	 * @param typeInformation the type information to check
	 */
	private void checkFromTuplePojo(TypeInformation<?> typeInformation) {
		Assert.assertTrue(typeInformation instanceof PojoTypeInfo<?>);
		Assert.assertEquals(4, typeInformation.getTotalFields());
		PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeInformation;
		for (int i = 0; i < pojoTypeForClass.getArity(); i++) {
			PojoField field = pojoTypeForClass.getPojoFieldAt(i);
			String name = field.field.getName();
			if (name.equals("special")) {
				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
			} else if (name.equals("f0") || name.equals("f1")) {
				Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type);
			} else if (name.equals("f2")) {
				Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.type);
			} else {
				Assert.fail("unexpected field");
			}
		}
	}

	/** Type variables bound by a subclass must be resolved to their concrete types. */
	@Test
	public void testPojoWithGenerics() {
		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(ParentSettingGenerics.class);
		Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
		PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass;
		for (int i = 0; i < pojoTypeForClass.getArity(); i++) {
			PojoField field = pojoTypeForClass.getPojoFieldAt(i);
			String name = field.field.getName();
			if (name.equals("field1")) {
				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
			} else if (name.equals("field2")) {
				Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.type);
			} else if (name.equals("field3")) {
				Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type);
			} else if (name.equals("key")) {
				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
			} else {
				Assert.fail("Unexpected field "+field);
			}
		}
	}

	/**
	 * Test if the TypeExtractor is accepting untyped generics,
	 * making them GenericTypes
	 */
	@Test
	@Ignore // kryo needed.
	public void testPojoWithGenericsSomeFieldsGeneric() {
		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(PojoWithGenerics.class);
		Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
		PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass;
		for (int i = 0; i < pojoTypeForClass.getArity(); i++) {
			PojoField field = pojoTypeForClass.getPojoFieldAt(i);
			String name = field.field.getName();
			if (name.equals("field1")) {
				Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.type);
			} else if (name.equals("field2")) {
				Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.type);
			} else if (name.equals("key")) {
				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
			} else {
				Assert.fail("Unexpected field "+field);
			}
		}
	}

	/** Type variables bound across several levels of the class hierarchy must be resolved. */
	@Test
	public void testPojoWithComplexHierarchy() {
		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(ComplexHierarchyTop.class);
		Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
		PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass;
		for (int i = 0; i < pojoTypeForClass.getArity(); i++) {
			PojoField field = pojoTypeForClass.getPojoFieldAt(i);
			String name = field.field.getName();
			if (name.equals("field1")) {
				Assert.assertTrue(field.type instanceof PojoTypeInfo<?>); // From tuple is pojo (not tuple type!)
			} else if (name.equals("field2")) {
				Assert.assertTrue(field.type instanceof TupleTypeInfo<?>);
				// assertEquals instead of assertTrue(x.equals(y)) so that a failure reports both values
				Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ((TupleTypeInfo<?>) field.type).getTypeAt(0));
			} else if (name.equals("key")) {
				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
			} else {
				Assert.fail("Unexpected field "+field);
			}
		}
	}
}
......@@ -22,8 +22,6 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import org.apache.flink.api.common.functions.InvalidTypesException;
......@@ -46,9 +44,7 @@ import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple9;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.PojoField;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
......@@ -67,7 +63,6 @@ import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import com.google.common.collect.HashMultiset;
public class TypeExtractorTest {
......@@ -350,449 +345,7 @@ public class TypeExtractorTest {
Assert.assertEquals(ti2.getTypeClass(), CustomType.class);
}
//
// Pojo Type tests
// A Pojo is a bean-style class with getters, setters and empty ctor
// OR a class with all fields public (or for every private field, there has to be a public getter/setter)
// everything else is a generic type (that can't be used for field selection)
//
// test with correct pojo types
public static class WC { // is a pojo
public ComplexNestedClass complex; // is a pojo
private int count; // is a BasicType
public WC() {
}
public int getCount() {
return count;
}
public void setCount(int c) {
this.count = c;
}
}
public static class ComplexNestedClass { // pojo
public static int ignoreStaticField;
public transient int ignoreTransientField;
public Date date; // generic type
public Integer someNumber; // BasicType
public float someFloat; // BasicType
public Tuple3<Long, Long, String> word; //Tuple Type with three basic types
public Object nothing; // generic type
public MyWritable hadoopCitizen; // writableType
}
// all public test
public static class AllPublic extends ComplexNestedClass {
public ArrayList<String> somethingFancy; // generic type
public HashMultiset<Integer> fancyIds; // generic type
public String[] fancyArray; // generic type
}
public static class ParentSettingGenerics extends PojoWithGenerics<Integer, Long> {
public String field3;
}
public static class PojoWithGenerics<T1, T2> {
public int key;
public T1 field1;
public T2 field2;
}
public static class ComplexHierarchyTop extends ComplexHierarchy<Tuple1<String>> {}
public static class ComplexHierarchy<T> extends PojoWithGenerics<FromTuple,T> {}
// extends from Tuple and adds a field
public static class FromTuple extends Tuple3<String, String, Long> {
private static final long serialVersionUID = 1L;
public int special;
}
public static class IncorrectPojo {
private int isPrivate;
public int getIsPrivate() {
return isPrivate;
}
// setter is missing (intentional)
}
// correct pojo
public static class BeanStylePojo {
public String abc;
private int field;
public int getField() {
return this.field;
}
public void setField(int f) {
this.field = f;
}
}
public static class WrongCtorPojo {
public int a;
public WrongCtorPojo(int a) {
this.a = a;
}
}
// in this test, the location of the getters and setters is mixed across the type hierarchy.
public static class TypedPojoGetterSetterCheck extends GenericPojoGetterSetterCheck<String> {
public void setPackageProtected(String in) {
this.packageProtected = in;
}
}
public static class GenericPojoGetterSetterCheck<T> {
T packageProtected;
public T getPackageProtected() {
return packageProtected;
}
}
@Test
public void testIncorrectPojos() {
TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(IncorrectPojo.class);
Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>);
typeForClass = TypeExtractor.createTypeInfo(WrongCtorPojo.class);
Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>);
}
@Test
public void testCorrectPojos() {
TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(BeanStylePojo.class);
Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
typeForClass = TypeExtractor.createTypeInfo(TypedPojoGetterSetterCheck.class);
Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
}
@Test
public void testPojoWC() {
TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(WC.class);
checkWCPojoAsserts(typeForClass);
WC t = new WC();
t.complex = new ComplexNestedClass();
TypeInformation<?> typeForObject = TypeExtractor.getForObject(t);
checkWCPojoAsserts(typeForObject);
}
private void checkWCPojoAsserts(TypeInformation<?> typeInfo) {
Assert.assertFalse(typeInfo.isBasicType());
Assert.assertFalse(typeInfo.isTupleType());
Assert.assertEquals(9, typeInfo.getTotalFields());
Assert.assertTrue(typeInfo instanceof PojoTypeInfo);
PojoTypeInfo<?> pojoType = (PojoTypeInfo<?>) typeInfo;
List<FlatFieldDescriptor> ffd = new ArrayList<FlatFieldDescriptor>();
String[] fields = {"count","complex.date", "complex.hadoopCitizen", "complex.nothing",
"complex.someFloat", "complex.someNumber", "complex.word.f0",
"complex.word.f1", "complex.word.f2"};
int[] positions = {8,0,1,2,
3,4,5,
6,7};
Assert.assertEquals(fields.length, positions.length);
for(int i = 0; i < fields.length; i++) {
pojoType.getKey(fields[i], 0, ffd);
Assert.assertEquals("Too many keys returned", 1, ffd.size());
Assert.assertEquals("position of field "+fields[i]+" wrong", positions[i], ffd.get(0).getPosition());
ffd.clear();
}
pojoType.getKey("complex.word.*", 0, ffd);
Assert.assertEquals(3, ffd.size());
// check if it returns 5,6,7
for(FlatFieldDescriptor ffdE : ffd) {
final int pos = ffdE.getPosition();
Assert.assertTrue(pos <= 7 );
Assert.assertTrue(5 <= pos );
if(pos == 5) {
Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
}
if(pos == 6) {
Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
}
if(pos == 7) {
Assert.assertEquals(String.class, ffdE.getType().getTypeClass());
}
}
ffd.clear();
pojoType.getKey("complex.*", 0, ffd);
Assert.assertEquals(8, ffd.size());
// check if it returns 0-7
for(FlatFieldDescriptor ffdE : ffd) {
final int pos = ffdE.getPosition();
Assert.assertTrue(ffdE.getPosition() <= 7 );
Assert.assertTrue(0 <= ffdE.getPosition() );
if(pos == 0) {
Assert.assertEquals(Date.class, ffdE.getType().getTypeClass());
}
if(pos == 1) {
Assert.assertEquals(MyWritable.class, ffdE.getType().getTypeClass());
}
if(pos == 2) {
Assert.assertEquals(Object.class, ffdE.getType().getTypeClass());
}
if(pos == 3) {
Assert.assertEquals(Float.class, ffdE.getType().getTypeClass());
}
if(pos == 4) {
Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass());
}
if(pos == 5) {
Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
}
if(pos == 6) {
Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
}
if(pos == 7) {
Assert.assertEquals(String.class, ffdE.getType().getTypeClass());
}
}
ffd.clear();
pojoType.getKey("*", 0, ffd);
Assert.assertEquals(9, ffd.size());
// check if it returns 0-8
for(FlatFieldDescriptor ffdE : ffd) {
Assert.assertTrue(ffdE.getPosition() <= 8 );
Assert.assertTrue(0 <= ffdE.getPosition() );
if(ffdE.getPosition() == 8) {
Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass());
}
}
ffd.clear();
TypeInformation<?> typeComplexNested = pojoType.getTypeAt(0); // ComplexNestedClass complex
Assert.assertTrue(typeComplexNested instanceof PojoTypeInfo);
Assert.assertEquals(6, typeComplexNested.getArity());
Assert.assertEquals(8, typeComplexNested.getTotalFields());
PojoTypeInfo<?> pojoTypeComplexNested = (PojoTypeInfo<?>) typeComplexNested;
boolean dateSeen = false, intSeen = false, floatSeen = false,
tupleSeen = false, objectSeen = false, writableSeen = false;
for(int i = 0; i < pojoTypeComplexNested.getArity(); i++) {
PojoField field = pojoTypeComplexNested.getPojoFieldAt(i);
String name = field.field.getName();
if(name.equals("date")) {
if(dateSeen) {
Assert.fail("already seen");
}
dateSeen = true;
Assert.assertEquals(new GenericTypeInfo<Date>(Date.class), field.type);
Assert.assertEquals(Date.class, field.type.getTypeClass());
} else if(name.equals("someNumber")) {
if(intSeen) {
Assert.fail("already seen");
}
intSeen = true;
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
Assert.assertEquals(Integer.class, field.type.getTypeClass());
} else if(name.equals("someFloat")) {
if(floatSeen) {
Assert.fail("already seen");
}
floatSeen = true;
Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, field.type);
Assert.assertEquals(Float.class, field.type.getTypeClass());
} else if(name.equals("word")) {
if(tupleSeen) {
Assert.fail("already seen");
}
tupleSeen = true;
Assert.assertTrue(field.type instanceof TupleTypeInfo<?>);
Assert.assertEquals(Tuple3.class, field.type.getTypeClass());
// do some more advanced checks on the tuple
TupleTypeInfo<?> tupleTypeFromComplexNested = (TupleTypeInfo<?>) field.type;
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(0));
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(1));
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(2));
} else if(name.equals("nothing")) {
if(objectSeen) {
Assert.fail("already seen");
}
objectSeen = true;
Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.type);
Assert.assertEquals(Object.class, field.type.getTypeClass());
} else if(name.equals("hadoopCitizen")) {
if(writableSeen) {
Assert.fail("already seen");
}
writableSeen = true;
Assert.assertEquals(new WritableTypeInfo<MyWritable>(MyWritable.class), field.type);
Assert.assertEquals(MyWritable.class, field.type.getTypeClass());
} else {
Assert.fail("field "+field+" is not expected");
}
}
Assert.assertTrue("Field was not present", dateSeen);
Assert.assertTrue("Field was not present", intSeen);
Assert.assertTrue("Field was not present", floatSeen);
Assert.assertTrue("Field was not present", tupleSeen);
Assert.assertTrue("Field was not present", objectSeen);
Assert.assertTrue("Field was not present", writableSeen);
TypeInformation<?> typeAtOne = pojoType.getTypeAt(1); // int count
Assert.assertTrue(typeAtOne instanceof BasicTypeInfo);
Assert.assertEquals(typeInfo.getTypeClass(), WC.class);
Assert.assertEquals(typeInfo.getArity(), 2);
}
@Test
public void testPojoAllPublic() {
TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(AllPublic.class);
checkAllPublicAsserts(typeForClass);
TypeInformation<?> typeForObject = TypeExtractor.getForObject(new AllPublic() );
checkAllPublicAsserts(typeForObject);
}
/**
 * Shared assertions for the type information of {@code AllPublic}: it must be a
 * {@code PojoTypeInfo} with arity 9 and 11 total fields, and the fields
 * {@code somethingFancy}, {@code fancyIds} and {@code fancyArray} must each be
 * reported once with the expected type.
 *
 * @param typeInformation the extracted type information to verify
 */
private void checkAllPublicAsserts(TypeInformation<?> typeInformation) {
	Assert.assertTrue(typeInformation instanceof PojoTypeInfo);
	Assert.assertEquals(9, typeInformation.getArity());
	Assert.assertEquals(11, typeInformation.getTotalFields());

	final PojoTypeInfo<?> pojoInfo = (PojoTypeInfo<?>) typeInformation;

	// one flag per additional field, so that duplicates and omissions are both detected
	boolean sawSomethingFancy = false;
	boolean sawFancyIds = false;
	boolean sawFancyArray = false;

	for (int pos = 0; pos < pojoInfo.getArity(); pos++) {
		final PojoField field = pojoInfo.getPojoFieldAt(pos);
		final String name = field.field.getName();

		if (name.equals("somethingFancy")) {
			Assert.assertFalse("already seen", sawSomethingFancy);
			sawSomethingFancy = true;
			Assert.assertTrue(field.type instanceof GenericTypeInfo);
			Assert.assertEquals(ArrayList.class, field.type.getTypeClass());
		} else if (name.equals("fancyIds")) {
			Assert.assertFalse("already seen", sawFancyIds);
			sawFancyIds = true;
			Assert.assertTrue(field.type instanceof GenericTypeInfo);
			Assert.assertEquals(HashMultiset.class, field.type.getTypeClass());
		} else if (name.equals("fancyArray")) {
			Assert.assertFalse("already seen", sawFancyArray);
			sawFancyArray = true;
			Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, field.type);
			Assert.assertEquals(String[].class, field.type.getTypeClass());
		} else if (!Arrays.asList("date", "someNumber", "someFloat", "word", "nothing", "hadoopCitizen").contains(name)) {
			// the names listed above are inherited from the ComplexNestedClass and are
			// skipped here; any other field name is an error
			Assert.fail("field "+field+" is not expected");
		}
	}

	Assert.assertTrue("Field was not present", sawSomethingFancy);
	Assert.assertTrue("Field was not present", sawFancyIds);
	Assert.assertTrue("Field was not present", sawFancyArray);
}
/**
 * Runs the shared {@code FromTuple} assertions on the type information extracted
 * from the class and on the type information extracted from a populated instance.
 */
@Test
public void testPojoExtendingTuple() {
	checkFromTuplePojo(TypeExtractor.createTypeInfo(FromTuple.class));

	// fill f0, f1 and f2 before the type is extracted from the instance
	final FromTuple instance = new FromTuple();
	instance.f0 = "";
	instance.f1 = "";
	instance.f2 = 0L;
	checkFromTuplePojo(TypeExtractor.getForObject(instance));
}
/**
 * Shared assertions for the type information of {@code FromTuple}: it must be a
 * {@code PojoTypeInfo} with 4 total fields, where {@code special} is an int,
 * {@code f0} and {@code f1} are strings and {@code f2} is a long.
 *
 * @param typeInformation the extracted type information to verify
 */
private void checkFromTuplePojo(TypeInformation<?> typeInformation) {
	Assert.assertTrue(typeInformation instanceof PojoTypeInfo<?>);
	Assert.assertEquals(4, typeInformation.getTotalFields());

	final PojoTypeInfo<?> pojoInfo = (PojoTypeInfo<?>) typeInformation;
	for (int pos = 0; pos < pojoInfo.getArity(); pos++) {
		final PojoField field = pojoInfo.getPojoFieldAt(pos);
		final String name = field.field.getName();

		// choose the expected type by field name, then compare once below
		final TypeInformation<?> expected;
		if (name.equals("special")) {
			expected = BasicTypeInfo.INT_TYPE_INFO;
		} else if (name.equals("f0") || name.equals("f1")) {
			expected = BasicTypeInfo.STRING_TYPE_INFO;
		} else if (name.equals("f2")) {
			expected = BasicTypeInfo.LONG_TYPE_INFO;
		} else {
			Assert.fail("unexpected field");
			continue; // never reached: fail() always throws
		}
		Assert.assertEquals(expected, field.type);
	}
}
/**
 * Checks the field types extracted for {@code ParentSettingGenerics}:
 * {@code field1} and {@code key} are ints, {@code field2} is a long and
 * {@code field3} is a string.
 */
@Test
public void testPojoWithGenerics() {
	final TypeInformation<?> extracted = TypeExtractor.createTypeInfo(ParentSettingGenerics.class);
	Assert.assertTrue(extracted instanceof PojoTypeInfo<?>);

	final PojoTypeInfo<?> pojoInfo = (PojoTypeInfo<?>) extracted;
	for (int pos = 0; pos < pojoInfo.getArity(); pos++) {
		final PojoField field = pojoInfo.getPojoFieldAt(pos);
		final String name = field.field.getName();

		// choose the expected type by field name, then compare once below
		final TypeInformation<?> expected;
		if (name.equals("field1") || name.equals("key")) {
			expected = BasicTypeInfo.INT_TYPE_INFO;
		} else if (name.equals("field2")) {
			expected = BasicTypeInfo.LONG_TYPE_INFO;
		} else if (name.equals("field3")) {
			expected = BasicTypeInfo.STRING_TYPE_INFO;
		} else {
			Assert.fail("Unexpected field "+field);
			continue; // never reached: fail() always throws
		}
		Assert.assertEquals(expected, field.type);
	}
}
/**
 * Test if the TypeExtractor is accepting untyped generics,
 * making them GenericTypes: {@code field1} and {@code field2} of
 * {@code PojoWithGenerics} are expected as generic {@code Object} types,
 * {@code key} as an int.
 */
@Test
@Ignore // kryo needed.
public void testPojoWithGenericsSomeFieldsGeneric() {
	final TypeInformation<?> extracted = TypeExtractor.createTypeInfo(PojoWithGenerics.class);
	Assert.assertTrue(extracted instanceof PojoTypeInfo<?>);

	final PojoTypeInfo<?> pojoInfo = (PojoTypeInfo<?>) extracted;
	for (int pos = 0; pos < pojoInfo.getArity(); pos++) {
		final PojoField field = pojoInfo.getPojoFieldAt(pos);
		final String name = field.field.getName();

		// choose the expected type by field name, then compare once below
		final TypeInformation<?> expected;
		if (name.equals("field1") || name.equals("field2")) {
			expected = new GenericTypeInfo<Object>(Object.class);
		} else if (name.equals("key")) {
			expected = BasicTypeInfo.INT_TYPE_INFO;
		} else {
			Assert.fail("Unexpected field "+field);
			continue; // never reached: fail() always throws
		}
		Assert.assertEquals(expected, field.type);
	}
}
/**
 * Checks the field types extracted for {@code ComplexHierarchyTop}:
 * {@code field1} is a POJO type, {@code field2} is a tuple type whose first
 * element is a string, and {@code key} is an int.
 */
@Test
public void testPojoWithComplexHierarchy() {
	final TypeInformation<?> extracted = TypeExtractor.createTypeInfo(ComplexHierarchyTop.class);
	Assert.assertTrue(extracted instanceof PojoTypeInfo<?>);

	final PojoTypeInfo<?> pojoInfo = (PojoTypeInfo<?>) extracted;
	for (int pos = 0; pos < pojoInfo.getArity(); pos++) {
		final PojoField field = pojoInfo.getPojoFieldAt(pos);
		final String name = field.field.getName();

		if (name.equals("field1")) {
			// From tuple is pojo (not tuple type!)
			Assert.assertTrue(field.type instanceof PojoTypeInfo<?>);
		} else if (name.equals("field2")) {
			Assert.assertTrue(field.type instanceof TupleTypeInfo<?>);
			final TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) field.type;
			Assert.assertTrue(tupleInfo.getTypeAt(0).equals(BasicTypeInfo.STRING_TYPE_INFO));
		} else if (name.equals("key")) {
			Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
		} else {
			Assert.fail("Unexpected field "+field);
		}
	}
}
// End of Pojo type tests
public static class CustomType {
public String myField1;
......@@ -1691,20 +1244,21 @@ public class TypeExtractorTest {
public T myField;
}
/** Non-generic subclass that fixes the type argument of {@code MyObject} to {@code String}. */
public static class InType extends MyObject<String> {}
/**
 * Asserts that the map return type extracted for a function returning
 * {@code MyObject<String>} is a {@code PojoTypeInfo}. The test carries {@code @Ignore}.
 *
 * NOTE(review): this span shows two versions of the function declaration, of the
 * {@code map} signature and of the {@code getMapReturnTypes} call next to each other;
 * it looks like the removed and added lines of a change rendered without markers.
 * Confirm which lines are current before relying on this method.
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
@Ignore
public void testParamertizedCustomObject() {
RichMapFunction<?, ?> function = new RichMapFunction<MyObject<String>, MyObject<String>>() {
RichMapFunction<?, ?> function = new RichMapFunction<InType, MyObject<String>>() {
private static final long serialVersionUID = 1L;
@Override
public MyObject<String> map(MyObject<String> value) throws Exception {
public MyObject<String> map(InType value) throws Exception {
return null;
}
};
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("org.apache.flink.api.java.type.extractor.TypeExtractorTest$MyObject"));
// variant that derives the input type information from InType.class instead of parsing a class name
TypeInformation<?> inType = TypeExtractor.createTypeInfo(InType.class);
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) inType);
Assert.assertTrue(ti instanceof PojoTypeInfo);
}
......
......@@ -15,13 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java;
package org.apache.flink.api.java.typeutils.runtime;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.lang.reflect.Field;
public class SerializationSpeedTest {
public class FieldAccessMinibenchmark {
static Field wordDescField;
static Field wordField;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils.runtime;
import java.util.Arrays;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.ComparatorTestBase;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.junit.Assert;
import org.junit.Ignore;
/**
 * Runs the {@link ComparatorTestBase} checks against the comparator that the composite
 * type of {@link PojoContainingTuple} creates for the key expression {@code "theTuple.*"}.
 */
@Ignore // TODO
public class PojoComparatorTest extends ComparatorTestBase<PojoContainingTuple> {

	/** Type information extracted for the POJO under test. */
	TypeInformation<PojoContainingTuple> type = TypeExtractor.getForClass(PojoContainingTuple.class);

	/** Test records, listed in ascending order of their {@code theTuple} values. */
	PojoContainingTuple[] data = new PojoContainingTuple[]{
		new PojoContainingTuple(1, 1L, 1L),
		new PojoContainingTuple(2, 2L, 2L),
		new PojoContainingTuple(8519, 85190L, 85190L),
		new PojoContainingTuple(-51498, 85191L, 85191L),
	};

	/**
	 * Creates a comparator over all fields selected by {@code "theTuple.*"}.
	 *
	 * @param ascending sort direction applied to every key field
	 * @return the comparator created by the composite type
	 */
	@Override
	protected TypeComparator<PojoContainingTuple> createComparator(boolean ascending) {
		Assert.assertTrue(type instanceof CompositeType);
		CompositeType<PojoContainingTuple> cType = (CompositeType<PojoContainingTuple>) type;
		ExpressionKeys<PojoContainingTuple> keys = new ExpressionKeys<PojoContainingTuple>(new String[] {"theTuple.*"}, cType);

		// Apply the requested direction to every key field; filling with a constant
		// 'true' would silently hand back an ascending comparator for descending requests.
		boolean[] orders = new boolean[keys.getNumberOfKeyFields()];
		Arrays.fill(orders, ascending);
		return cType.createComparator(keys.computeLogicalKeyPositions(), orders, 0);
	}

	/** Creates the serializer from the extracted type information. */
	@Override
	protected TypeSerializer<PojoContainingTuple> createSerializer() {
		return type.createSerializer();
	}

	/** Returns the test records in ascending order. */
	@Override
	protected PojoContainingTuple[] getSortedTestData() {
		return data;
	}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.api.java.tuple.Tuple2;
/**
 * This file belongs to the PojoComparatorTest test.
 * <p>
 * Test POJO with an int, a String and a nested {@code Tuple2<Long, Long>} field.
 * Equality and hash code are based on {@code someInt} and {@code theTuple} only;
 * {@code someString} does not take part in either.
 */
public class PojoContainingTuple {
	public int someInt;
	public String someString = "abc";
	public Tuple2<Long, Long> theTuple;

	/** Creates an instance whose {@code theTuple} is left {@code null}. */
	public PojoContainingTuple() {}

	/**
	 * @param i value for {@code someInt}
	 * @param l1 first element of {@code theTuple}
	 * @param l2 second element of {@code theTuple}
	 */
	public PojoContainingTuple(int i, long l1, long l2) {
		someInt = i;
		theTuple = new Tuple2<Long, Long>(l1, l2);
	}

	@Override
	public boolean equals(Object obj) {
		if(obj instanceof PojoContainingTuple) {
			PojoContainingTuple other = (PojoContainingTuple) obj;
			if (someInt != other.someInt) {
				return false;
			}
			// theTuple is null after the no-arg constructor, so compare null-safely
			return theTuple == null ? other.theTuple == null : theTuple.equals(other.theTuple);
		}
		return false;
	}

	/** Hash code over the same fields that {@link #equals(Object)} compares. */
	@Override
	public int hashCode() {
		return 31 * someInt + (theTuple == null ? 0 : theTuple.hashCode());
	}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
/**
 * Variant of {@link AbstractGenericTypeSerializerTest} whose serializers are created by
 * the type information that {@link TypeExtractor} returns for the class under test.
 */
public class PojoGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest {

	/**
	 * @param type class to create a serializer for
	 * @return the serializer created by the extracted type information of {@code type}
	 */
	@Override
	protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
		final TypeInformation<T> extracted = TypeExtractor.getForClass(type);
		final TypeSerializer<T> serializer = extracted.createSerializer();
		return serializer;
	}
}
......@@ -25,6 +25,7 @@ import java.util.LinkedList;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
......@@ -33,6 +34,7 @@ import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
......@@ -42,7 +44,7 @@ import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class DistinctITCase extends JavaProgramTestBase {
private static int NUM_PROGRAMS = 6;
private static int NUM_PROGRAMS = 8;
private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
......@@ -237,6 +239,49 @@ public class DistinctITCase extends JavaProgramTestBase {
"5,2\n" +
"5,3\n";
}
case 7: {
/*
* check correctness of distinct on tuples with field expressions
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
DataSet<Tuple1<Integer>> reduceDs = ds.union(ds)
.distinct("f0").project(0).types(Integer.class);
reduceDs.writeAsCsv(resultPath);
env.execute();
// return expected result
return "1\n" +
"2\n";
}
case 8: {
/*
* check correctness of distinct on Pojos
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
DataSet<Integer> reduceDs = ds.distinct("nestedPojo.longNumber").map(new MapFunction<CollectionDataSets.POJO, Integer>() {
@Override
public Integer map(POJO value) throws Exception {
return (int) value.nestedPojo.longNumber;
}
});
reduceDs.writeAsText(resultPath);
env.execute();
// return expected result
return "10000\n20000\n30000\n";
}
default:
throw new IllegalArgumentException("Invalid program id");
}
......
......@@ -36,6 +36,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.util.Collector;
import org.junit.runner.RunWith;
......@@ -45,7 +46,7 @@ import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class PartitionITCase extends JavaProgramTestBase {
private static int NUM_PROGRAMS = 3;
private static int NUM_PROGRAMS = 4;
private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
......@@ -224,6 +225,29 @@ public class PartitionITCase extends JavaProgramTestBase {
"5\n" +
"6\n";
}
case 4: {
/*
* Test hash partition with key expression
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(3);
DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
DataSet<Long> uniqLongs = ds
.partitionByHash("nestedPojo.longNumber").setParallelism(4)
.mapPartition(new UniqueNestedPojoLongMapper());
uniqLongs.writeAsText(resultPath);
env.execute();
// return expected result
return "10000\n" +
"20000\n" +
"30000\n";
}
default:
throw new IllegalArgumentException("Invalid program id");
......@@ -246,6 +270,21 @@ public class PartitionITCase extends JavaProgramTestBase {
}
}
/**
 * Emits each distinct {@code nestedPojo.longNumber} value found among the records
 * of one partition exactly once.
 */
public static class UniqueNestedPojoLongMapper implements MapPartitionFunction<POJO, Long> {
	private static final long serialVersionUID = 1L;

	@Override
	public void mapPartition(Iterable<POJO> records, Collector<Long> out) throws Exception {
		// collect the distinct values first, emit afterwards
		final HashSet<Long> distinctLongs = new HashSet<Long>();
		for (POJO pojo : records) {
			distinctLongs.add(pojo.nestedPojo.longNumber);
		}
		for (Long value : distinctLongs) {
			out.collect(value);
		}
	}
}
public static class PartitionIndexMapper extends RichMapFunction<Long, Tuple2<Integer, Integer>> {
private static final long serialVersionUID = 1L;
......
......@@ -278,6 +278,19 @@ public class CollectionDataSets {
return env.fromCollection(data);
}
/**
 * Creates a DataSet of POJOs that contains duplicates: five equal "First" records,
 * one "Second" record and two equal "Third" records, in that order.
 *
 * @param env environment used to create the DataSet
 * @return the DataSet built from the eight records
 */
public static DataSet<POJO> getDuplicatePojoDataSet(ExecutionEnvironment env) {
	final List<POJO> pojos = new ArrayList<POJO>();

	// 5x "First"
	for (int copy = 0; copy < 5; copy++) {
		pojos.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
	}

	// 1x "Second"
	pojos.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L));

	// 2x "Third"
	for (int copy = 0; copy < 2; copy++) {
		pojos.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L));
	}

	return env.fromCollection(pojos);
}
public static class POJO {
public int number;
public String str;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册