提交 23289d6e 编写于 作者: S Stephan Ewen

[FLINK-1039] Fix pojo expression keys for group reduce

上级 32d168f4
......@@ -197,8 +197,8 @@ public abstract class Keys<T> {
public ExpressionKeys(String[] expressions, TypeInformation<T> type) {
if (!(type instanceof PojoTypeInfo<?>)) {
throw new UnsupportedOperationException("Key expressions can only be used on POJOs." + " " +
"A POCO must have a default constructor without arguments and not have readObject" +
" and/or writeObject methods. Also, it can only have nested POJOs or primitive (also boxed)" +
"A POJO must have a default constructor without arguments and not have readObject" +
" and/or writeObject methods. A current restriction is that it can only have nested POJOs or primitive (also boxed)" +
" fields.");
}
PojoTypeInfo<?> pojoType = (PojoTypeInfo<?>) type;
......@@ -212,7 +212,6 @@ public abstract class Keys<T> {
" type " + type.toString() + ".");
}
}
}
@Override
......
......@@ -166,6 +166,19 @@ public class ReduceGroupOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
return po;
}
else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {
int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions();
UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>> po =
new GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
po.setCombinable(combinable);
po.setInput(input);
po.setDegreeOfParallelism(this.getParallelism());
return po;
}
else {
throw new UnsupportedOperationException("Unrecognized key type.");
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册