提交 ad1d9362 编写于 作者: S Stephan Ewen

[FLINK-2071] [java api] Fix serializability issue with projectsion function.

Improve type safety.
Minor cleanups in ProjectOperator.
上级 6220f34b
...@@ -82,7 +82,7 @@ public class ProjectOperator<IN, OUT extends Tuple> ...@@ -82,7 +82,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
* Continues a Project transformation on a {@link Tuple} {@link DataSet}.<br/> * Continues a Project transformation on a {@link Tuple} {@link DataSet}.<br/>
* <b>Note: Only Tuple DataSets can be projected using field indexes.</b></br> * <b>Note: Only Tuple DataSets can be projected using field indexes.</b></br>
* The transformation projects each Tuple of the DataSet onto a (sub)set of fields.</br> * The transformation projects each Tuple of the DataSet onto a (sub)set of fields.</br>
* Additional fields can be added to the projection by calling {@link ProjectOperator#project(int[])}. * Additional fields can be added to the projection by calling this method repeatedly.
* *
* <b>Note: With the current implementation, the Project transformation looses type information.</b> * <b>Note: With the current implementation, the Project transformation looses type information.</b>
* *
...@@ -94,8 +94,7 @@ public class ProjectOperator<IN, OUT extends Tuple> ...@@ -94,8 +94,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
* @see DataSet * @see DataSet
* @see ProjectOperator * @see ProjectOperator
*/ */
@SuppressWarnings("hiding") public <R extends Tuple> ProjectOperator<?, R> project(int... fieldIndexes) {
public <OUT extends Tuple> ProjectOperator<?, OUT> project(int... fieldIndexes) {
proj.acceptAdditionalIndexes(fieldIndexes); proj.acceptAdditionalIndexes(fieldIndexes);
return proj.projectTupleX(); return proj.projectTupleX();
...@@ -103,10 +102,10 @@ public class ProjectOperator<IN, OUT extends Tuple> ...@@ -103,10 +102,10 @@ public class ProjectOperator<IN, OUT extends Tuple>
/** /**
* Deprecated method only kept for compatibility. * Deprecated method only kept for compatibility.
*/ */
@SuppressWarnings({ "unchecked", "hiding" }) @SuppressWarnings("unchecked")
@Deprecated @Deprecated
public <OUT extends Tuple> ProjectOperator<IN, OUT> types(Class<?>... types) { public <R extends Tuple> ProjectOperator<IN, R> types(Class<?>... types) {
TupleTypeInfo<OUT> typeInfo = (TupleTypeInfo<OUT>)this.getResultType(); TupleTypeInfo<R> typeInfo = (TupleTypeInfo<R>)this.getResultType();
if(types.length != typeInfo.getArity()) { if(types.length != typeInfo.getArity()) {
throw new InvalidProgramException("Provided types do not match projection."); throw new InvalidProgramException("Provided types do not match projection.");
...@@ -117,12 +116,12 @@ public class ProjectOperator<IN, OUT extends Tuple> ...@@ -117,12 +116,12 @@ public class ProjectOperator<IN, OUT extends Tuple>
throw new InvalidProgramException("Provided type "+typeClass.getSimpleName()+" at position "+i+" does not match projection"); throw new InvalidProgramException("Provided type "+typeClass.getSimpleName()+" at position "+i+" does not match projection");
} }
} }
return (ProjectOperator<IN, OUT>) this; return (ProjectOperator<IN, R>) this;
} }
public static class Projection<T> { public static class Projection<T> {
private final DataSet<T> ds; private final DataSet<T> ds;
private int[] fieldIndexes; private int[] fieldIndexes;
public Projection(DataSet<T> ds, int[] fieldIndexes) { public Projection(DataSet<T> ds, int[] fieldIndexes) {
...@@ -138,9 +137,9 @@ public class ProjectOperator<IN, OUT extends Tuple> ...@@ -138,9 +137,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
"project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields."); "project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields.");
} }
int maxFieldIndex = ((TupleTypeInfo<?>)ds.getType()).getArity(); int maxFieldIndex = ds.getType().getArity();
for(int i=0; i<fieldIndexes.length; i++) { for (int fieldIndexe : fieldIndexes) {
Preconditions.checkElementIndex(fieldIndexes[i], maxFieldIndex); Preconditions.checkElementIndex(fieldIndexe, maxFieldIndex);
} }
this.ds = ds; this.ds = ds;
...@@ -160,8 +159,8 @@ public class ProjectOperator<IN, OUT extends Tuple> ...@@ -160,8 +159,8 @@ public class ProjectOperator<IN, OUT extends Tuple>
this.fieldIndexes = Arrays.copyOf(this.fieldIndexes, this.fieldIndexes.length + additionalIndexes.length); this.fieldIndexes = Arrays.copyOf(this.fieldIndexes, this.fieldIndexes.length + additionalIndexes.length);
int maxFieldIndex = ((TupleTypeInfo<?>)ds.getType()).getArity(); int maxFieldIndex = ds.getType().getArity();
for(int i=0; i<additionalIndexes.length; i++) { for (int i = 0; i < additionalIndexes.length; i++) {
Preconditions.checkElementIndex(additionalIndexes[i], maxFieldIndex); Preconditions.checkElementIndex(additionalIndexes[i], maxFieldIndex);
this.fieldIndexes[offset + i] = additionalIndexes[i]; this.fieldIndexes[offset + i] = additionalIndexes[i];
...@@ -186,7 +185,7 @@ public class ProjectOperator<IN, OUT extends Tuple> ...@@ -186,7 +185,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <OUT extends Tuple> ProjectOperator<T, OUT> projectTupleX() { public <OUT extends Tuple> ProjectOperator<T, OUT> projectTupleX() {
ProjectOperator<T, OUT> projOperator = null; ProjectOperator<T, OUT> projOperator;
switch (fieldIndexes.length) { switch (fieldIndexes.length) {
case 1: projOperator = (ProjectOperator<T, OUT>) projectTuple1(); break; case 1: projOperator = (ProjectOperator<T, OUT>) projectTuple1(); break;
......
...@@ -28,32 +28,47 @@ import org.apache.flink.api.java.tuple.Tuple; ...@@ -28,32 +28,47 @@ import org.apache.flink.api.java.tuple.Tuple;
public class PlanProjectOperator<T, R extends Tuple> extends MapOperatorBase<T, R, MapFunction<T, R>> { public class PlanProjectOperator<T, R extends Tuple> extends MapOperatorBase<T, R, MapFunction<T, R>> {
public PlanProjectOperator(int[] fields, String name, TypeInformation<T> inType, TypeInformation<R> outType, ExecutionConfig executionConfig) { public PlanProjectOperator(int[] fields, String name,
super(new MapProjector<T, R>(fields, outType.createSerializer(executionConfig).createInstance()), new UnaryOperatorInformation<T, R>(inType, outType), name); TypeInformation<T> inType, TypeInformation<R> outType,
ExecutionConfig executionConfig)
{
super(PlanProjectOperator.<T, R, Tuple>createTypedProjector(fields), new UnaryOperatorInformation<T, R>(inType, outType), name);
}
@SuppressWarnings("unchecked")
private static <T, R extends Tuple, X extends Tuple> MapFunction<T, R> createTypedProjector(int[] fields) {
return (MapFunction<T, R>) new MapProjector<X, R>(fields);
} }
public static final class MapProjector<T, R extends Tuple>
extends AbstractRichFunction public static final class MapProjector<T extends Tuple, R extends Tuple>
implements MapFunction<T, R> extends AbstractRichFunction implements MapFunction<T, R>
{ {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private final int[] fields; private final int[] fields;
private final R outTuple; private final Tuple outTuple;
private MapProjector(int[] fields, R outTupleInstance) { private MapProjector(int[] fields) {
this.fields = fields; this.fields = fields;
this.outTuple = outTupleInstance; try {
this.outTuple = Tuple.getTupleClass(fields.length).newInstance();
}
catch (Exception e) {
// this should never happen
throw new RuntimeException(e);
}
} }
// TODO We should use code generation for this. // TODO We should use code generation for this.
@SuppressWarnings("unchecked")
@Override @Override
public R map(T inTuple) throws Exception { public R map(Tuple inTuple) throws Exception {
for (int i = 0; i < fields.length; i++) {
for(int i=0; i<fields.length; i++) { outTuple.setField(inTuple.getField(fields[i]), i);
outTuple.setField(((Tuple)inTuple).getField(fields[i]), i);
} }
return outTuple;
return (R) outTuple;
} }
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册