提交 fd0be2ff 编写于 作者: R Robert Metzger

Various fixes

- improved error messages
- fixed the composite-vs-atomic bug that Aljoscha found
- Comparator test for Pojo Comparator enabled
- TODO removed
- string-based key expressions for defining group-sort fields
- support for specifying "select all" using * and now also _ (for scala fans)
- throw an exception if the user's class hierarchy contains multiple fields with the same name
上级 6b493fb0
......@@ -85,7 +85,7 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
&& logicalKeyField <= logicalField + (localFieldType.getTotalFields() - 1) ) // check if logical field + lookahead could contain our key
) {
// we found a compositeType that is containing the logicalKeyField we are looking for --> create comparator
addCompareField(localFieldId, ((CompositeType<?>) localFieldType).createComparator(new int[] {logicalKeyField}, orders, logicalField));
addCompareField(localFieldId, ((CompositeType<?>) localFieldType).createComparator(new int[] {logicalKeyField}, new boolean[] {orders[logicalKeyFieldIndex]}, logicalField));
}
// maintain logicalField
......
......@@ -35,7 +35,7 @@ import org.apache.flink.util.Collector;
public class PojoExample {
/**
* This is the POJO (Plain Old Java Object) that is bein used
* This is the POJO (Plain Old Java Object) that is being used
* for all the operations.
* As long as all fields are public or have a getter/setter, the system can handle them
*/
......
......@@ -139,8 +139,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
return po;
}
else if (grouper.getKeys() instanceof Keys.ExpressionKeys) { //was field position key
//TODO ask stephan
else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {
int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions();
UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
......@@ -167,19 +166,6 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
return po;
}
// else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {
//
// int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions();
// UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
// GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>> po =
// new GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
//
// po.setCombinable(combinable);
// po.setInput(input);
// po.setDegreeOfParallelism(this.getParallelism());
//
// return po;
// }
else {
throw new UnsupportedOperationException("Unrecognized key type.");
}
......
......@@ -160,6 +160,7 @@ public abstract class Keys<T> {
public static class ExpressionKeys<T> extends Keys<T> {
public static final String SELECT_ALL_CHAR = "*";
public static final String SELECT_ALL_CHAR_SCALA = "_";
/**
* Flattened fields representing keys fields
......@@ -245,7 +246,9 @@ public abstract class Keys<T> {
*/
public ExpressionKeys(String[] expressionsIn, TypeInformation<T> type) {
if(!(type instanceof CompositeType<?>)) {
throw new IllegalArgumentException("Type "+type+" is not a composite type. Key expressions are not supported.");
throw new IllegalArgumentException("Type "+type+" is not a composite type. "
+ "Key expressions are only supported on POJO types and Tuples. "
+ "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
}
CompositeType<T> cType = (CompositeType<T>) type;
......@@ -259,7 +262,7 @@ public abstract class Keys<T> {
List<FlatFieldDescriptor> keys = new ArrayList<FlatFieldDescriptor>(); // use separate list to do a size check
cType.getKey(expressions[i], 0, keys);
if(keys.size() == 0) {
throw new IllegalArgumentException("Unable to extract key from expression "+expressions[i]+" on key "+cType);
throw new IllegalArgumentException("Unable to extract key from expression '"+expressions[i]+"' on key "+cType);
}
keyFields.addAll(keys);
}
......
......@@ -19,15 +19,20 @@
package org.apache.flink.api.java.operators;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.functions.FirstReducer;
import java.util.Arrays;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import com.google.common.base.Preconditions;
/**
* SortedGrouping is an intermediate step for a transformation on a grouped and sorted DataSet.<br/>
......@@ -43,6 +48,9 @@ public class SortedGrouping<T> extends Grouping<T> {
private int[] groupSortKeyPositions;
private Order[] groupSortOrders ;
/*
* int sorting keys for tuples
*/
public SortedGrouping(DataSet<T> set, Keys<T> keys, int field, Order order) {
super(set, keys);
......@@ -52,10 +60,27 @@ public class SortedGrouping<T> extends Grouping<T> {
if (field >= dataSet.getType().getArity()) {
throw new IllegalArgumentException("Order key out of tuple bounds.");
}
// use int-based expression key to properly resolve nested tuples for grouping
ExpressionKeys<T> ek = new ExpressionKeys<T>(new int[]{field}, dataSet.getType());
this.groupSortKeyPositions = ek.computeLogicalKeyPositions();
this.groupSortOrders = new Order[groupSortKeyPositions.length];
Arrays.fill(this.groupSortOrders, order);
}
/*
* String sorting for Pojos and tuples
*/
public SortedGrouping(DataSet<T> set, Keys<T> keys, String field, Order order) {
super(set, keys);
this.groupSortKeyPositions = new int[]{field};
this.groupSortOrders = new Order[]{order};
if (!(dataSet.getType() instanceof CompositeType)) {
throw new InvalidProgramException("Specifying order keys via field positions is only valid for composite data types (pojo / tuple / case class)");
}
// resolve String-field to int using the expression keys
ExpressionKeys<T> ek = new ExpressionKeys<T>(new String[]{field}, dataSet.getType());
this.groupSortKeyPositions = ek.computeLogicalKeyPositions();
this.groupSortOrders = new Order[groupSortKeyPositions.length];
Arrays.fill(this.groupSortOrders, order); // if field == "*"
}
protected int[] getGroupSortKeyPositions() {
......@@ -108,7 +133,7 @@ public class SortedGrouping<T> extends Grouping<T> {
/**
* Sorts {@link org.apache.flink.api.java.tuple.Tuple} elements within a group on the specified field in the specified {@link Order}.</br>
* <b>Note: Only groups of Tuple elements can be sorted.</b><br/>
* <b>Note: Only groups of Tuple or Pojo elements can be sorted.</b><br/>
* Groups can be sorted by multiple fields by chaining {@link #sortGroup(int, Order)} calls.
*
* @param field The Tuple field on which the group is sorted.
......@@ -120,22 +145,52 @@ public class SortedGrouping<T> extends Grouping<T> {
*/
public SortedGrouping<T> sortGroup(int field, Order order) {
int pos;
if (!dataSet.getType().isTupleType()) {
throw new InvalidProgramException("Specifying order keys via field positions is only valid for tuple data types");
}
if (field >= dataSet.getType().getArity()) {
throw new IllegalArgumentException("Order key out of tuple bounds.");
}
ExpressionKeys<T> ek = new ExpressionKeys<T>(new int[]{field}, dataSet.getType());
addSortGroupInternal(ek, order);
return this;
}
private void addSortGroupInternal(ExpressionKeys<T> ek, Order order) {
Preconditions.checkArgument(order != null, "Order can not be null");
int[] additionalKeyPositions = ek.computeLogicalKeyPositions();
int newLength = this.groupSortKeyPositions.length + 1;
int newLength = this.groupSortKeyPositions.length + additionalKeyPositions.length;
this.groupSortKeyPositions = Arrays.copyOf(this.groupSortKeyPositions, newLength);
this.groupSortOrders = Arrays.copyOf(this.groupSortOrders, newLength);
pos = newLength - 1;
int pos = newLength - additionalKeyPositions.length;
int off = newLength - additionalKeyPositions.length;
for(;pos < newLength; pos++) {
this.groupSortKeyPositions[pos] = additionalKeyPositions[pos - off];
this.groupSortOrders[pos] = order; // use the same order
}
}
/**
* Sorts {@link org.apache.flink.api.java.tuple.Tuple} or POJO elements within a group on the specified field in the specified {@link Order}.</br>
* <b>Note: Only groups of Tuple or Pojo elements can be sorted.</b><br/>
* Groups can be sorted by multiple fields by chaining {@link #sortGroup(String, Order)} calls.
*
* @param field The Tuple or Pojo field on which the group is sorted.
* @param order The Order in which the specified field is sorted.
* @return A SortedGrouping with specified order of group element.
*
* @see org.apache.flink.api.java.tuple.Tuple
* @see Order
*/
public SortedGrouping<T> sortGroup(String field, Order order) {
this.groupSortKeyPositions[pos] = field;
this.groupSortOrders[pos] = order;
if (! (dataSet.getType() instanceof CompositeType)) {
throw new InvalidProgramException("Specifying order keys via field positions is only valid for composite data types (pojo / tuple / case class)");
}
ExpressionKeys<T> ek = new ExpressionKeys<T>(new String[]{field}, dataSet.getType());
addSortGroupInternal(ek, order);
return this;
}
}
......@@ -196,7 +196,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
/**
* Sorts {@link org.apache.flink.api.java.tuple.Tuple} elements within a group on the specified field in the specified {@link Order}.</br>
* <b>Note: Only groups of Tuple elements can be sorted.</b><br/>
* <b>Note: Only groups of Tuple elements and Pojos can be sorted.</b><br/>
* Groups can be sorted by multiple fields by chaining {@link #sortGroup(int, Order)} calls.
*
* @param field The Tuple field on which the group is sorted.
......@@ -210,4 +210,19 @@ public class UnsortedGrouping<T> extends Grouping<T> {
return new SortedGrouping<T>(this.dataSet, this.keys, field, order);
}
/**
* Sorts Pojos within a group on the specified field in the specified {@link Order}.</br>
* <b>Note: Only groups of Tuple elements and Pojos can be sorted.</b><br/>
* Groups can be sorted by multiple fields by chaining {@link #sortGroup(String, Order)} calls.
*
* @param field The Tuple or Pojo field on which the group is sorted.
* @param order The Order in which the specified field is sorted.
* @return A SortedGrouping with specified order of group element.
*
* @see Order
*/
public SortedGrouping<T> sortGroup(String field, Order order) {
return new SortedGrouping<T>(this.dataSet, this.keys, field, order);
}
}
......@@ -118,7 +118,7 @@ public class PojoTypeInfo<T> extends CompositeType<T>{
@Override
public void getKey(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
// handle 'select all' first
if(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR)) {
if(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) || fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
int keyPosition = 0;
for(PojoField field : fields) {
if(field.type instanceof AtomicType) {
......@@ -145,6 +145,10 @@ public class PojoTypeInfo<T> extends CompositeType<T>{
fieldId += fields[i].type.getTotalFields()-1;
}
if (fields[i].field.getName().equals(fieldExpression)) {
if(fields[i].type instanceof CompositeType) {
throw new IllegalArgumentException("The specified field '"+fieldExpression+"' is refering to a composite type.\n"
+ "Either select all elements in this type with the '"+ExpressionKeys.SELECT_ALL_CHAR+"' operator or specify a field in the sub-type");
}
result.add(new FlatFieldDescriptor(offset + fieldId, fields[i].type));
return;
}
......@@ -158,13 +162,13 @@ public class PojoTypeInfo<T> extends CompositeType<T>{
for (int i = 0; i < fields.length; i++) {
if (fields[i].field.getName().equals(firstField)) {
if (!(fields[i].type instanceof CompositeType<?>)) {
throw new RuntimeException("Field "+fields[i].type+" is not composite type");
throw new RuntimeException("Field "+fields[i].type+" (specified by '"+fieldExpression+"') is not a composite type");
}
CompositeType<?> cType = (CompositeType<?>) fields[i].type;
cType.getKey(rest, offset + fieldId, result); // recurse
return;
}
fieldId++;
fieldId += fields[i].type.getTotalFields();
}
throw new RuntimeException("Unable to find field "+fieldExpression+" in type "+this+" (looking for '"+firstField+"')");
}
......
......@@ -95,7 +95,7 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
@Override
public void getKey(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
// handle 'select all'
if(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR)) {
if(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) || fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
int keyPosition = 0;
for(TypeInformation<?> type : types) {
if(type instanceof AtomicType) {
......@@ -157,7 +157,10 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
for(int i = 0; i < pos; i++) {
offset += types[i].getTotalFields() - 1; // this adds only something to offset if its a composite type.
}
if(types[pos] instanceof CompositeType) {
throw new IllegalArgumentException("The specified field '"+fieldExpression+"' is refering to a composite type.\n"
+ "Either select all elements in this type with the '"+ExpressionKeys.SELECT_ALL_CHAR+"' operator or specify a field in the sub-type");
}
result.add(new FlatFieldDescriptor(offset + pos, types[pos]));
}
......
......@@ -337,7 +337,7 @@ public class TypeExtractor {
int fieldCount = countFieldsInClass(tAsClass);
if(fieldCount != tupleSubTypes.length) {
// the class is not a real tuple because it contains additional fields. treat as a pojo
return (TypeInformation<OUT>) analyzePojo(tAsClass, new ArrayList<Type>() ); // the typeHierarchy here should be sufficient, even though it stops at the Tuple.class.
return (TypeInformation<OUT>) analyzePojo(tAsClass, new ArrayList<Type>(), null); // the typeHierarchy here should be sufficient, even though it stops at the Tuple.class.
}
return new TupleTypeInfo(tAsClass, tupleSubTypes);
......@@ -399,7 +399,7 @@ public class TypeExtractor {
}
// objects with generics are treated as raw type
else if (t instanceof ParameterizedType) { //TODO
return privateGetForClass((Class<OUT>) ((ParameterizedType) t).getRawType(), typeHierarchy);
return privateGetForClass((Class<OUT>) ((ParameterizedType) t).getRawType(), typeHierarchy, (ParameterizedType) t);
}
// no tuple, no TypeVariable, no generic type
else if (t instanceof Class) {
......@@ -859,8 +859,11 @@ public class TypeExtractor {
return new TypeExtractor().privateGetForClass(clazz, new ArrayList<Type>());
}
@SuppressWarnings("unchecked")
private <X> TypeInformation<X> privateGetForClass(Class<X> clazz, ArrayList<Type> typeHierarchy) {
return privateGetForClass(clazz, typeHierarchy, null);
}
@SuppressWarnings("unchecked")
private <X> TypeInformation<X> privateGetForClass(Class<X> clazz, ArrayList<Type> typeHierarchy, ParameterizedType clazzTypeHint) {
Validate.notNull(clazz);
// check for abstract classes or interfaces
......@@ -931,7 +934,7 @@ public class TypeExtractor {
// this is a type generated by Avro. GenericTypeInfo is able to handle this case because its using Avro.
return new GenericTypeInfo<X>(clazz);
}
TypeInformation<X> pojoType = analyzePojo(clazz, typeHierarchy);
TypeInformation<X> pojoType = analyzePojo(clazz, typeHierarchy, clazzTypeHint);
if (pojoType != null) {
return pojoType;
}
......@@ -968,7 +971,7 @@ public class TypeExtractor {
// check for getter
if( // The name should be "get<FieldName>" or "<fieldName>" (for scala).
(m.getName().toLowerCase().contains("get"+fieldNameLow) || m.getName().toLowerCase().contains(fieldNameLow)) &&
(m.getName().toLowerCase().equals("get"+fieldNameLow) || m.getName().toLowerCase().equals(fieldNameLow)) &&
// no arguments for the getter
m.getParameterTypes().length == 0 &&
// return type is same as field type (or the generic variant of it)
......@@ -980,7 +983,7 @@ public class TypeExtractor {
hasGetter = true;
}
// check for setters (<FieldName>_$eq for scala)
if((m.getName().toLowerCase().contains("set"+fieldNameLow) || m.getName().toLowerCase().contains(fieldNameLow+"_$eq")) &&
if((m.getName().toLowerCase().equals("set"+fieldNameLow) || m.getName().toLowerCase().equals(fieldNameLow+"_$eq")) &&
m.getParameterTypes().length == 1 && // one parameter of the field's type
( m.getParameterTypes()[0].equals( fieldType ) || (fieldTypeGeneric != null && m.getGenericParameterTypes()[0].equals(fieldTypeGeneric) ) )&&
// return type is void.
......@@ -1006,13 +1009,16 @@ public class TypeExtractor {
}
}
private <X> TypeInformation<X> analyzePojo(Class<X> clazz, ArrayList<Type> typeHierarchy) {
private <X> TypeInformation<X> analyzePojo(Class<X> clazz, ArrayList<Type> typeHierarchy, ParameterizedType clazzTypeHint) {
// try to create Type hierarchy, if the incoming one is empty.
if(typeHierarchy.size() == 0) {
recursivelyGetTypeHierarchy(typeHierarchy, clazz, Object.class);
}
if(clazzTypeHint != null) {
recursivelyGetTypeHierarchy(typeHierarchy, clazzTypeHint, Object.class);
}
List<Field> fields = removeNonObjectFields(getAllDeclaredFields(clazz));
List<Field> fields = getAllDeclaredFields(clazz);
List<PojoField> pojoFields = new ArrayList<PojoField>();
for (Field field : fields) {
Type fieldType = field.getGenericType();
......@@ -1057,32 +1063,40 @@ public class TypeExtractor {
return pojoType;
}
// recursively determine all declared fields
private static List<Field> getAllDeclaredFields(Class<?> clazz) {
/**
* recursively determine all declared fields
* This is required because getFields() is not returning
*/
public static List<Field> getAllDeclaredFields(Class<?> clazz) {
List<Field> result = new ArrayList<Field>();
while (clazz != null) {
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
if(Modifier.isTransient(field.getModifiers()) || Modifier.isStatic(field.getModifiers())) {
continue; // we have no use for transient or static fields
}
if(hasFieldWithSameName(field.getName(), result)) {
throw new RuntimeException("The field "+field+" is already contained in the hierarchy of the class "+clazz+"."
+ "Please use unique field names through your classes hierarchy");
}
result.add(field);
}
clazz = clazz.getSuperclass();
}
return result;
}
/**
* Remove transient and static fields from a list of fields.
*/
private static List<Field> removeNonObjectFields(List<Field> fields) {
List<Field> result = new ArrayList<Field>();
for(Field field: fields) {
if (!Modifier.isTransient(field.getModifiers()) && !Modifier.isStatic(field.getModifiers())) {
result.add(field);
private static boolean hasFieldWithSameName(String name, List<Field> fields) {
for(Field field : fields) {
if(name.equals(field.getName())) {
return true;
}
}
return result;
return false;
}
// recursively determine all declared methods
private static List<Method> getAllDeclaredMethods(Class<?> clazz) {
List<Method> result = new ArrayList<Method>();
......@@ -1111,7 +1125,7 @@ public class TypeExtractor {
int numFields = t.getArity();
if(numFields != countFieldsInClass(value.getClass())) {
// not a tuple since it has more fields.
return analyzePojo((Class<X>) value.getClass(), new ArrayList<Type>()); // we immediately call analyze Pojo here, because
return analyzePojo((Class<X>) value.getClass(), new ArrayList<Type>(), null); // we immediately call analyze Pojo here, because
// there is currently no other type that can handle such a class.
}
......
......@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.operators;
import java.lang.reflect.InvocationTargetException;
......@@ -24,10 +23,13 @@ import java.util.Arrays;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.api.java.type.extractor.PojoTypeExtractionTest.ComplexNestedClass;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -93,7 +95,7 @@ public class KeysTest {
}
@Test
public void testInvalid() throws Throwable {
public void testInvalidTuple() throws Throwable {
TupleTypeInfo<Tuple3<String, Tuple3<String, String, String>, String>> typeInfo = new TupleTypeInfo<Tuple3<String,Tuple3<String,String,String>,String>>(
BasicTypeInfo.STRING_TYPE_INFO,
new TupleTypeInfo<Tuple3<String, String, String>>(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO),
......@@ -101,7 +103,12 @@ public class KeysTest {
ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, String>> fpk;
String[][] tests = new String[][] {
new String[] {"f11"},new String[] {"f-35"}, new String[] {"f0.f33"}, new String[] {"f1.f33"}
new String[] {"f0.f1"}, // nesting into unnested
new String[] {"f11"},
new String[] {"f-35"},
new String[] {"f0.f33"},
new String[] {"f1.f33"},
new String[] {"f1"} // select full tuple without saying "f1.*"
};
for(int i = 0; i < tests.length; i++) {
Throwable e = null;
......@@ -115,6 +122,28 @@ public class KeysTest {
}
}
@Test
public void testInvalidPojo() throws Throwable {
TypeInformation<ComplexNestedClass> ti = TypeExtractor.getForClass(ComplexNestedClass.class);
ExpressionKeys<ComplexNestedClass> ek;
String[][] tests = new String[][] {
new String[] {"nonexistent"},
new String[] {"date.abc"}, // nesting into unnested
new String[] {"word"} // select full tuple without saying "f1.*"
};
for(int i = 0; i < tests.length; i++) {
Throwable e = null;
try {
ek = new ExpressionKeys<ComplexNestedClass>(tests[i], ti);
} catch(Throwable t) {
// System.err.println("Message: "+t.getMessage()); t.printStackTrace();
e = t;
}
Assert.assertNotNull(e);
}
}
@Test
public void testTupleKeyExpansion() {
TupleTypeInfo<Tuple3<String, Tuple3<String, String, String>, String>> typeInfo = new TupleTypeInfo<Tuple3<String,Tuple3<String,String,String>,String>>(
......@@ -144,6 +173,10 @@ public class KeysTest {
fpk = new ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, String>>(new String[] {"*"}, typeInfo);
Assert.assertArrayEquals(new int[] {0,1,2,3,4}, fpk.computeLogicalKeyPositions());
// scala style "select all"
fpk = new ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, String>>(new String[] {"_"}, typeInfo);
Assert.assertArrayEquals(new int[] {0,1,2,3,4}, fpk.computeLogicalKeyPositions());
// this was a bug:
fpk = new ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, String>>(new String[] {"f2"}, typeInfo);
Assert.assertArrayEquals(new int[] {4}, fpk.computeLogicalKeyPositions());
......@@ -177,10 +210,45 @@ public class KeysTest {
complexFpk = new ExpressionKeys<Tuple3<String, Tuple3<Tuple3<String, String, String>, String, String>, String>>(new String[] {"*"}, complexTypeInfo);
Assert.assertArrayEquals(new int[] {0,1,2,3,4,5,6}, complexFpk.computeLogicalKeyPositions());
// scala style select all
complexFpk = new ExpressionKeys<Tuple3<String, Tuple3<Tuple3<String, String, String>, String, String>, String>>(new String[] {"_"}, complexTypeInfo);
Assert.assertArrayEquals(new int[] {0,1,2,3,4,5,6}, complexFpk.computeLogicalKeyPositions());
complexFpk = new ExpressionKeys<Tuple3<String, Tuple3<Tuple3<String, String, String>, String, String>, String>>(new String[] {"f1.f0.*"}, complexTypeInfo);
Assert.assertArrayEquals(new int[] {1,2,3}, complexFpk.computeLogicalKeyPositions());
complexFpk = new ExpressionKeys<Tuple3<String, Tuple3<Tuple3<String, String, String>, String, String>, String>>(new String[] {"f2"}, complexTypeInfo);
Assert.assertArrayEquals(new int[] {6}, complexFpk.computeLogicalKeyPositions());
}
public static class Pojo1 {
public String a;
public String b;
}
public static class Pojo2 {
public String a2;
public String b2;
}
public static class PojoWithMultiplePojos {
public Pojo1 p1;
public Pojo2 p2;
public Integer i0;
}
@Test
public void testPojoKeys() {
TypeInformation<PojoWithMultiplePojos> ti = TypeExtractor.getForClass(PojoWithMultiplePojos.class);
ExpressionKeys<PojoWithMultiplePojos> ek;
ek = new ExpressionKeys<PojoWithMultiplePojos>(new String[]{"*"}, ti);
Assert.assertArrayEquals(new int[] {0,1,2,3,4}, ek.computeLogicalKeyPositions());
ek = new ExpressionKeys<PojoWithMultiplePojos>(new String[]{"p1.*"}, ti);
Assert.assertArrayEquals(new int[] {1,2}, ek.computeLogicalKeyPositions());
ek = new ExpressionKeys<PojoWithMultiplePojos>(new String[]{"p2.*"}, ti);
Assert.assertArrayEquals(new int[] {3,4}, ek.computeLogicalKeyPositions());
ek = new ExpressionKeys<PojoWithMultiplePojos>(new String[]{"i0"}, ti);
Assert.assertArrayEquals(new int[] {0}, ek.computeLogicalKeyPositions());
}
}
......@@ -50,6 +50,14 @@ import com.google.common.collect.HashMultiset;
*/
public class PojoTypeExtractionTest {
public static class HasDuplicateField extends WC {
private int count; // duplicate
}
@Test(expected=RuntimeException.class)
public void testDuplicateFieldException() {
TypeExtractor.createTypeInfo(HasDuplicateField.class);
}
// test with correct pojo types
public static class WC { // is a pojo
......@@ -210,6 +218,10 @@ public class PojoTypeExtractionTest {
}
ffd.clear();
// scala style full tuple selection for pojos
pojoType.getKey("complex.word._", 0, ffd);
Assert.assertEquals(3, ffd.size());
ffd.clear();
pojoType.getKey("complex.*", 0, ffd);
Assert.assertEquals(8, ffd.size());
......
......@@ -1247,7 +1247,6 @@ public class TypeExtractorTest {
public static class InType extends MyObject<String> {}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
// @Ignore
public void testParamertizedCustomObject() {
RichMapFunction<?, ?> function = new RichMapFunction<InType, MyObject<String>>() {
private static final long serialVersionUID = 1L;
......
......@@ -28,10 +28,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.junit.Assert;
import org.junit.Ignore;
//@Ignore // TODO
public class PojoComparatorTest extends ComparatorTestBase<PojoContainingTuple> {
TypeInformation<PojoContainingTuple> type = TypeExtractor.getForClass(PojoContainingTuple.class);
......@@ -39,7 +37,7 @@ public class PojoComparatorTest extends ComparatorTestBase<PojoContainingTuple>
new PojoContainingTuple(1, 1L, 1L),
new PojoContainingTuple(2, 2L, 2L),
new PojoContainingTuple(8519, 85190L, 85190L),
new PojoContainingTuple(-51498, 85191L, 85191L),
new PojoContainingTuple(8520, 85191L, 85191L),
};
@Override
......@@ -48,7 +46,7 @@ public class PojoComparatorTest extends ComparatorTestBase<PojoContainingTuple>
CompositeType<PojoContainingTuple> cType = (CompositeType<PojoContainingTuple>) type;
ExpressionKeys<PojoContainingTuple> keys = new ExpressionKeys<PojoContainingTuple>(new String[] {"theTuple.*"}, cType);
boolean[] orders = new boolean[keys.getNumberOfKeyFields()];
Arrays.fill(orders, true);
Arrays.fill(orders, ascending);
return cType.createComparator(keys.computeLogicalKeyPositions(), orders, 0);
}
......
......@@ -18,7 +18,7 @@
package org.apache.flink.test.exampleScalaPrograms;
import org.apache.flink.examples.java.graph.EnumTrianglesOpt;
import org.apache.flink.examples.scala.graph.EnumTrianglesOpt;
import org.apache.flink.test.testdata.EnumTriangleData;
import org.apache.flink.test.util.JavaProgramTestBase;
......
......@@ -24,7 +24,7 @@ import java.util.Collection;
import java.util.LinkedList;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.examples.java.graph.PageRankBasic;
import org.apache.flink.examples.scala.graph.PageRankBasic;
import org.apache.flink.test.testdata.PageRankData;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.junit.runner.RunWith;
......
......@@ -53,7 +53,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
@RunWith(Parameterized.class)
public class GroupReduceITCase extends JavaProgramTestBase {
private static int NUM_PROGRAMS = 19;
private static int NUM_PROGRAMS = 26;
private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
......@@ -598,7 +598,175 @@ public class GroupReduceITCase extends JavaProgramTestBase {
// return expected result
return "3\n1\n";
}
}
case 20: {
/*
* Test string-based definition on group sort, based on test:
* check correctness of groupReduce with descending group sort
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(1);
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
groupBy(1).sortGroup("f2", Order.DESCENDING).reduceGroup(new Tuple3SortedGroupReduce());
reduceDs.writeAsCsv(resultPath);
env.execute();
// return expected result
return "1,1,Hi\n" +
"5,2,Hello world-Hello\n" +
"15,3,Luke Skywalker-I am fine.-Hello world, how are you?\n" +
"34,4,Comment#4-Comment#3-Comment#2-Comment#1\n" +
"65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n" +
"111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
}
case 21: {
/*
* Test int-based definition on group sort, for (full) nested Tuple
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(1);
DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
DataSet<String> reduceDs = ds.groupBy("f1").sortGroup(0, Order.DESCENDING).reduceGroup(new NestedTupleReducer());
reduceDs.writeAsText(resultPath);
env.execute();
// return expected result
return "a--(1,1)-(1,2)-(1,3)-\n" +
"b--(2,2)-\n"+
"c--(3,3)-(3,6)-(3,9)-\n";
}
case 22: {
/*
* Test int-based definition on group sort, for (partial) nested Tuple ASC
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(1);
DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
// f0.f0 is first integer
DataSet<String> reduceDs = ds.groupBy("f1").sortGroup("f0.f0", Order.ASCENDING).reduceGroup(new NestedTupleReducer());
reduceDs.writeAsText(resultPath);
env.execute();
// return expected result
return "a--(1,3)-(1,2)-(2,1)-\n" +
"b--(2,2)-\n"+
"c--(3,3)-(3,6)-(4,9)-\n";
}
case 23: {
/*
* Test string-based definition on group sort, for (partial) nested Tuple DESC
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(1);
DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
// f0.f0 is first integer
DataSet<String> reduceDs = ds.groupBy("f1").sortGroup("f0.f0", Order.DESCENDING).reduceGroup(new NestedTupleReducer());
reduceDs.writeAsText(resultPath);
env.execute();
// return expected result
return "a--(2,1)-(1,3)-(1,2)-\n" +
"b--(2,2)-\n"+
"c--(4,9)-(3,3)-(3,6)-\n";
}
case 24: {
/*
* Test string-based definition on group sort, for two grouping keys
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(1);
DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
// f0.f0 is first integer
DataSet<String> reduceDs = ds.groupBy("f1").sortGroup("f0.f0", Order.DESCENDING).sortGroup("f0.f1", Order.DESCENDING).reduceGroup(new NestedTupleReducer());
reduceDs.writeAsText(resultPath);
env.execute();
// return expected result
return "a--(2,1)-(1,3)-(1,2)-\n" +
"b--(2,2)-\n"+
"c--(4,9)-(3,6)-(3,3)-\n";
}
case 25: {
/*
* Test string-based definition on group sort, for two grouping keys with Pojos
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(1);
DataSet<PojoContainingTupleAndWritable> ds = CollectionDataSets.getGroupSortedPojoContainingTupleAndWritable(env);
// f0.f0 is first integer
DataSet<String> reduceDs = ds.groupBy("hadoopFan").sortGroup("theTuple.f0", Order.DESCENDING).sortGroup("theTuple.f1", Order.DESCENDING)
.reduceGroup(new GroupReduceFunction<CollectionDataSets.PojoContainingTupleAndWritable, String>() {
@Override
public void reduce(
Iterable<PojoContainingTupleAndWritable> values,
Collector<String> out) throws Exception {
boolean once = false;
StringBuilder concat = new StringBuilder();
for(PojoContainingTupleAndWritable value : values) {
if(!once) {
concat.append(value.hadoopFan.get());
concat.append("---");
once = true;
}
concat.append(value.theTuple);
concat.append("-");
}
out.collect(concat.toString());
}
});
reduceDs.writeAsText(resultPath);
env.execute();
// return expected result
return "1---(10,100)-\n" +
"2---(30,600)-(30,400)-(30,200)-(20,201)-(20,200)-\n";
}
case 26: {
/*
* Test grouping with pojo containing multiple pojos (was a bug)
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(1);
DataSet<PojoContainingTupleAndWritable> ds = CollectionDataSets.getPojoWithMultiplePojos(env);
// f0.f0 is first integer
DataSet<String> reduceDs = ds.groupBy("hadoopFan").sortGroup("theTuple.f0", Order.DESCENDING).sortGroup("theTuple.f1", Order.DESCENDING)
.reduceGroup(new GroupReduceFunction<CollectionDataSets.PojoContainingTupleAndWritable, String>() {
@Override
public void reduce(
Iterable<PojoContainingTupleAndWritable> values,
Collector<String> out) throws Exception {
boolean once = false;
StringBuilder concat = new StringBuilder();
for(PojoContainingTupleAndWritable value : values) {
if(!once) {
concat.append(value.hadoopFan.get());
concat.append("---");
once = true;
}
concat.append(value.theTuple);
concat.append("-");
}
out.collect(concat.toString());
}
});
reduceDs.writeAsText(resultPath);
env.execute();
// return expected result
return "1---(10,100)-\n" +
"2---(30,600)-(30,400)-(30,200)-(20,201)-(20,200)-\n";
}
default: {
throw new IllegalArgumentException("Invalid program id");
}
......@@ -607,6 +775,27 @@ public class GroupReduceITCase extends JavaProgramTestBase {
}
/**
 * Concatenates each group into a single string of the form
 * "{@code <f1>--<f0>-<f0>-...}", where the group label is taken from the
 * first element's f1 field and each nested tuple f0 is appended in
 * iteration order (i.e. in the group-sort order produced upstream).
 */
public static class NestedTupleReducer implements GroupReduceFunction<Tuple2<Tuple2<Integer,Integer>,String>, String> {
	@Override
	public void reduce(
			Iterable<Tuple2<Tuple2<Integer, Integer>, String>> values,
			Collector<String> out)
			throws Exception {
		StringBuilder result = new StringBuilder();
		boolean labelWritten = false;
		for (Tuple2<Tuple2<Integer, Integer>, String> tuple : values) {
			if (!labelWritten) {
				// first element of the group supplies the group label
				result.append(tuple.f1).append("--");
				labelWritten = true;
			}
			// the nested tuple carries the group-sorted payload
			result.append(tuple.f0).append("-");
		}
		out.collect(result.toString());
	}
}
public static class Tuple3GroupReduce implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
private static final long serialVersionUID = 1L;
......
......@@ -155,6 +155,26 @@ public class CollectionDataSets {
return env.fromCollection(data, type);
}
/**
 * Returns a small nested-tuple data set used by the group-sort tests.
 * The outer f1 String is the grouping key ("a", "b", "c"); the inner
 * Tuple2&lt;Integer, Integer&gt; is the payload the tests sort within each group.
 * Type information is supplied explicitly so the nested tuple structure
 * is preserved.
 */
public static DataSet<Tuple2<Tuple2<Integer, Integer>, String>> getGroupSortedNestedTupleDataSet(ExecutionEnvironment env) {
	// explicit type info for the nested (Integer, Integer) tuple
	TupleTypeInfo<Tuple2<Integer, Integer>> nestedType =
			new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
	TupleTypeInfo<Tuple2<Tuple2<Integer, Integer>, String>> type =
			new TupleTypeInfo<Tuple2<Tuple2<Integer, Integer>, String>>(nestedType, BasicTypeInfo.STRING_TYPE_INFO);

	List<Tuple2<Tuple2<Integer, Integer>, String>> elements =
			new ArrayList<Tuple2<Tuple2<Integer, Integer>, String>>();
	elements.add(new Tuple2<Tuple2<Integer, Integer>, String>(new Tuple2<Integer, Integer>(1, 3), "a"));
	elements.add(new Tuple2<Tuple2<Integer, Integer>, String>(new Tuple2<Integer, Integer>(1, 2), "a"));
	elements.add(new Tuple2<Tuple2<Integer, Integer>, String>(new Tuple2<Integer, Integer>(2, 1), "a"));
	elements.add(new Tuple2<Tuple2<Integer, Integer>, String>(new Tuple2<Integer, Integer>(2, 2), "b"));
	elements.add(new Tuple2<Tuple2<Integer, Integer>, String>(new Tuple2<Integer, Integer>(3, 3), "c"));
	elements.add(new Tuple2<Tuple2<Integer, Integer>, String>(new Tuple2<Integer, Integer>(3, 6), "c"));
	elements.add(new Tuple2<Tuple2<Integer, Integer>, String>(new Tuple2<Integer, Integer>(4, 9), "c"));

	return env.fromCollection(elements, type);
}
public static DataSet<String> getStringDataSet(ExecutionEnvironment env) {
List<String> data = new ArrayList<String>();
......@@ -418,5 +438,37 @@ public class CollectionDataSets {
data.add(new Tuple3<Integer,CrazyNested, POJO>(2, new CrazyNested("two", "duo", 2L), new POJO(1, "First",10, 100, 1000L, "One", 10000L) )); // 1x
return env.fromCollection(data);
}
/**
 * Minimal POJO with two public String fields, nested inside
 * {@link PojoWithMultiplePojos} to test key expressions on nested POJOs.
 */
public static class Pojo1 {
	public String a;
	public String b;
}
/**
 * Second minimal POJO, nested inside {@link PojoWithMultiplePojos};
 * its field names differ from {@link Pojo1} so key expressions on the
 * two nested types can be distinguished.
 */
public static class Pojo2 {
	public String a2;
	public String b2;
}
/**
 * POJO containing two nested POJOs plus an Integer field, used to test
 * grouping on a POJO that contains multiple POJOs.
 * Requires the public no-arg constructor so the runtime can instantiate it.
 */
public static class PojoWithMultiplePojos {
	public Pojo1 p1;
	public Pojo2 p2;
	public Integer i0;

	public PojoWithMultiplePojos() {}

	/**
	 * @param a  value for p1.a
	 * @param b  value for p1.b
	 * @param a1 value for p2.a2
	 * @param b1 value for p2.b2
	 * @param i0 value for the top-level Integer field
	 */
	public PojoWithMultiplePojos(String a, String b, String a1, String b1, Integer i0) {
		p1 = new Pojo1();
		p1.a = a;
		p1.b = b;
		p2 = new Pojo2();
		p2.a2 = a1;
		// BUGFIX: was "p2.a2 = b1", which overwrote a2 and left b2 null
		p2.b2 = b1;
		this.i0 = i0;
	}
}
/**
 * Returns a three-element data set of {@link PojoWithMultiplePojos},
 * used to test grouping on a POJO that contains multiple nested POJOs.
 */
public static DataSet<PojoWithMultiplePojos> getPojoWithMultiplePojos(ExecutionEnvironment env) {
	// fixed fixture: three elements, presized list
	List<PojoWithMultiplePojos> pojos = new ArrayList<PojoWithMultiplePojos>(3);
	pojos.add(new PojoWithMultiplePojos("a", "aa", "b", "bb", 1));
	pojos.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
	pojos.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3));
	return env.fromCollection(pojos);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册