diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java index 9b7d5674d4a269bf80c2569a5aca3276eac401a7..55e182fa477dbda78e46bbe6fbbc9acbfad68e74 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java @@ -82,7 +82,7 @@ public class ProjectOperator * Continues a Project transformation on a {@link Tuple} {@link DataSet}.
* Note: Only Tuple DataSets can be projected using field indexes.
* The transformation projects each Tuple of the DataSet onto a (sub)set of fields.
- * Additional fields can be added to the projection by calling {@link ProjectOperator#project(int[])}. + * Additional fields can be added to the projection by calling this method repeatedly. * * Note: With the current implementation, the Project transformation looses type information. * @@ -94,8 +94,7 @@ public class ProjectOperator * @see DataSet * @see ProjectOperator */ - @SuppressWarnings("hiding") - public ProjectOperator project(int... fieldIndexes) { + public ProjectOperator project(int... fieldIndexes) { proj.acceptAdditionalIndexes(fieldIndexes); return proj.projectTupleX(); @@ -103,10 +102,10 @@ public class ProjectOperator /** * Deprecated method only kept for compatibility. */ - @SuppressWarnings({ "unchecked", "hiding" }) + @SuppressWarnings("unchecked") @Deprecated - public ProjectOperator types(Class... types) { - TupleTypeInfo typeInfo = (TupleTypeInfo)this.getResultType(); + public ProjectOperator types(Class... types) { + TupleTypeInfo typeInfo = (TupleTypeInfo)this.getResultType(); if(types.length != typeInfo.getArity()) { throw new InvalidProgramException("Provided types do not match projection."); @@ -117,12 +116,12 @@ public class ProjectOperator throw new InvalidProgramException("Provided type "+typeClass.getSimpleName()+" at position "+i+" does not match projection"); } } - return (ProjectOperator) this; + return (ProjectOperator) this; } public static class Projection { - private final DataSet ds; + private final DataSet ds; private int[] fieldIndexes; public Projection(DataSet ds, int[] fieldIndexes) { @@ -138,9 +137,9 @@ public class ProjectOperator "project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields."); } - int maxFieldIndex = ((TupleTypeInfo)ds.getType()).getArity(); - for(int i=0; i this.fieldIndexes = Arrays.copyOf(this.fieldIndexes, this.fieldIndexes.length + additionalIndexes.length); - int maxFieldIndex = ((TupleTypeInfo)ds.getType()).getArity(); - for(int i=0; i */ @SuppressWarnings("unchecked") public ProjectOperator projectTupleX() { - ProjectOperator projOperator = null; + ProjectOperator projOperator; switch (fieldIndexes.length) { case 1: projOperator = (ProjectOperator) projectTuple1(); break; diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java index 959b9297e45ba310b7103739ef24ded18d2c660a..101b89ba7bfebdea8965c6ade71f73e4b92f1070 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java @@ -28,32 +28,47 @@ import org.apache.flink.api.java.tuple.Tuple; public class PlanProjectOperator extends MapOperatorBase> { - public PlanProjectOperator(int[] fields, String name, TypeInformation inType, TypeInformation outType, ExecutionConfig executionConfig) { - super(new MapProjector(fields, outType.createSerializer(executionConfig).createInstance()), new UnaryOperatorInformation(inType, outType), name); + public PlanProjectOperator(int[] fields, String name, + TypeInformation inType, TypeInformation outType, + ExecutionConfig executionConfig) + { + super(PlanProjectOperator.createTypedProjector(fields), new UnaryOperatorInformation(inType, outType), name); + } + + @SuppressWarnings("unchecked") + private static MapFunction createTypedProjector(int[] fields) { + return (MapFunction) new MapProjector(fields); } - public static final class MapProjector - extends AbstractRichFunction - implements MapFunction + + public static final class MapProjector + extends AbstractRichFunction implements MapFunction { private static final long serialVersionUID = 1L; private final int[] fields; - private final R outTuple; + private final Tuple outTuple; - private MapProjector(int[] fields, R outTupleInstance) { + private MapProjector(int[] fields) { this.fields = fields; - this.outTuple = outTupleInstance; + try { + this.outTuple = Tuple.getTupleClass(fields.length).newInstance(); + } + catch (Exception e) { + // this should never happen + throw new RuntimeException(e); + } } // TODO We should use code generation for this. + @SuppressWarnings("unchecked") @Override - public R map(T inTuple) throws Exception { - - for(int i=0; i