From 6bbe2a05ab4937a5a989aa2ed7451c1126d9faaa Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Fri, 22 Aug 2014 18:17:48 +0200 Subject: [PATCH] Perform TypeExtraction outside of Java Operators Before, FlatMapOperator, GroupReduceOperator, MapOperator, and MapPartitionOperator performed the Type extraction themselves while the other Operators had TypeInformation parameters. Now the are all unified, which makes it possible to use them from the Scala API. Also Key extraction for selector functions is moved outside of Keys.java --- .../org/apache/flink/api/java/DataSet.java | 23 +++++++++++++------ .../api/java/operators/CoGroupOperator.java | 6 +++-- .../api/java/operators/FlatMapOperator.java | 6 ++--- .../java/operators/GroupReduceOperator.java | 9 ++++---- .../api/java/operators/JoinOperator.java | 6 +++-- .../apache/flink/api/java/operators/Keys.java | 11 ++++----- .../flink/api/java/operators/MapOperator.java | 6 ++--- .../java/operators/MapPartitionOperator.java | 6 ++--- .../api/java/operators/SortedGrouping.java | 6 ++++- .../api/java/operators/UnsortedGrouping.java | 6 ++++- 10 files changed, 52 insertions(+), 33 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index de86eee8bad..f960abdb40a 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -66,6 +66,8 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.types.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; + /** @@ -145,7 +147,10 @@ public abstract class DataSet { if (FunctionUtils.isLambdaFunction(mapper)) { throw new UnsupportedLambdaExpressionException(); } - return new MapOperator(this, mapper); + + TypeInformation resultType = TypeExtractor.getMapReturnTypes(mapper, this.getType()); + + return new MapOperator(this, resultType, mapper); } @@ -172,7 +177,8 @@ public abstract class DataSet { if (mapPartition == null) { throw new NullPointerException("MapPartition function must not be null."); } - return new MapPartitionOperator(this, mapPartition); + TypeInformation resultType = TypeExtractor.getMapPartitionReturnTypes(mapPartition, this.getType()); + return new MapPartitionOperator(this, resultType, mapPartition); } /** @@ -194,7 +200,8 @@ public abstract class DataSet { if (FunctionUtils.isLambdaFunction(flatMapper)) { throw new UnsupportedLambdaExpressionException(); } - return new FlatMapOperator(this, flatMapper); + TypeInformation resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, this.getType()); + return new FlatMapOperator(this, resultType, flatMapper); } /** @@ -340,7 +347,8 @@ public abstract class DataSet { if (FunctionUtils.isLambdaFunction(reducer)) { throw new UnsupportedLambdaExpressionException(); } - return new GroupReduceOperator(this, reducer); + TypeInformation resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getType()); + return new GroupReduceOperator(this, resultType, reducer); } /** @@ -400,7 +408,8 @@ public abstract class DataSet { * @return A DistinctOperator that represents the distinct DataSet. */ public DistinctOperator distinct(KeySelector keyExtractor) { - return new DistinctOperator(this, new Keys.SelectorFunctionKeys(keyExtractor, getType())); + TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type); + return new DistinctOperator(this, new Keys.SelectorFunctionKeys(keyExtractor, getType(), keyType)); } /** @@ -456,9 +465,9 @@ public abstract class DataSet { * @see org.apache.flink.api.java.operators.GroupReduceOperator * @see DataSet */ - public UnsortedGrouping groupBy(KeySelector keyExtractor) { - return new UnsortedGrouping(this, new Keys.SelectorFunctionKeys(keyExtractor, getType())); + TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type); + return new UnsortedGrouping(this, new Keys.SelectorFunctionKeys(keyExtractor, getType(), keyType)); } /** diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java index 899fc0936c2..91ed28d2914 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java @@ -367,7 +367,8 @@ public class CoGroupOperator extends TwoInputUdfOperator CoGroupOperatorSetsPredicate where(KeySelector keyExtractor) { - return new CoGroupOperatorSetsPredicate(new Keys.SelectorFunctionKeys(keyExtractor, input1.getType())); + TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, input1.getType()); + return new CoGroupOperatorSetsPredicate(new Keys.SelectorFunctionKeys(keyExtractor, input1.getType(), keyType)); } // ---------------------------------------------------------------------------------------- @@ -431,7 +432,8 @@ public class CoGroupOperator extends TwoInputUdfOperator CoGroupOperatorWithoutFunction equalTo(KeySelector keyExtractor) { - return createCoGroupOperator(new Keys.SelectorFunctionKeys(keyExtractor, input2.getType())); + TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, input2.getType()); + return createCoGroupOperator(new Keys.SelectorFunctionKeys(keyExtractor, input2.getType(), keyType)); } /** diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java index 8e531d4df69..4d6d30a3bd6 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java @@ -22,9 +22,9 @@ import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.FlatMapOperatorBase; -import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.DataSet; +import org.apache.flink.types.TypeInformation; /** * This operator represents the application of a "flatMap" function on a data set, and the @@ -38,8 +38,8 @@ public class FlatMapOperator extends SingleInputUdfOperator function; - public FlatMapOperator(DataSet input, FlatMapFunction function) { - super(input, TypeExtractor.getFlatMapReturnTypes(function, input.getType())); + public FlatMapOperator(DataSet input, TypeInformation resultType, FlatMapFunction function) { + super(input, resultType); this.function = function; extractSemanticAnnotationsFromUdf(function.getClass()); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java index 7b0ad4dbc3b..9de8bd366b5 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java @@ -32,7 +32,6 @@ import org.apache.flink.api.java.operators.translation.KeyExtractingMapper; import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.types.TypeInformation; import org.apache.flink.api.java.DataSet; @@ -58,8 +57,8 @@ public class GroupReduceOperator extends SingleInputUdfOperator input, GroupReduceFunction function) { - super(input, TypeExtractor.getGroupReduceReturnTypes(function, input.getType())); + public GroupReduceOperator(DataSet input, TypeInformation resultType, GroupReduceFunction function) { + super(input, resultType); this.function = function; this.grouper = null; @@ -73,8 +72,8 @@ public class GroupReduceOperator extends SingleInputUdfOperator input, GroupReduceFunction function) { - super(input != null ? input.getDataSet() : null, TypeExtractor.getGroupReduceReturnTypes(function, input.getDataSet().getType())); + public GroupReduceOperator(Grouping input, TypeInformation resultType, GroupReduceFunction function) { + super(input != null ? input.getDataSet() : null, resultType); this.function = function; this.grouper = input; diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java index 1ca2ec92368..15447d81514 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java @@ -757,7 +757,8 @@ public abstract class JoinOperator extends TwoInputUdfOperator JoinOperatorSetsPredicate where(KeySelector keySelector) { - return new JoinOperatorSetsPredicate(new Keys.SelectorFunctionKeys(keySelector, input1.getType())); + TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType()); + return new JoinOperatorSetsPredicate(new Keys.SelectorFunctionKeys(keySelector, input1.getType(), keyType)); } // ---------------------------------------------------------------------------------------- @@ -829,7 +830,8 @@ public abstract class JoinOperator extends TwoInputUdfOperator DefaultJoin equalTo(KeySelector keySelector) { - return createJoinOperator(new Keys.SelectorFunctionKeys(keySelector, input2.getType())); + TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType()); + return createJoinOperator(new Keys.SelectorFunctionKeys(keySelector, input2.getType(), keyType)); } protected DefaultJoin createJoinOperator(Keys keys2) { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java index 8019cc82122..9ac2b2b0bf5 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java @@ -126,19 +126,18 @@ public abstract class Keys { private final KeySelector keyExtractor; private final TypeInformation keyType; - - public SelectorFunctionKeys(KeySelector keyExtractor, TypeInformation type) { + + public SelectorFunctionKeys(KeySelector keyExtractor, TypeInformation inputType, TypeInformation keyType) { if (keyExtractor == null) { throw new NullPointerException("Key extractor must not be null."); } - + this.keyExtractor = keyExtractor; - this.keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type); - + this.keyType = keyType; + if (!this.keyType.isKeyType()) { throw new IllegalArgumentException("Invalid type of KeySelector keys"); } - } public TypeInformation getKeyType() { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java index eccdeeca09f..b27201dadf8 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java @@ -22,9 +22,9 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.MapOperatorBase; -import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.DataSet; +import org.apache.flink.types.TypeInformation; /** * This operator represents the application of a "map" function on a data set, and the @@ -40,9 +40,9 @@ public class MapOperator extends SingleInputUdfOperator function; - public MapOperator(DataSet input, MapFunction function) { + public MapOperator(DataSet input, TypeInformation resultType, MapFunction function) { - super(input, TypeExtractor.getMapReturnTypes(function, input.getType())); + super(input, resultType); this.function = function; extractSemanticAnnotationsFromUdf(function.getClass()); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java index caf55f93c7c..5b999bf96cd 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java @@ -22,8 +22,8 @@ import org.apache.flink.api.common.functions.MapPartitionFunction; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase; -import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.DataSet; +import org.apache.flink.types.TypeInformation; /** * This operator represents the application of a "mapPartition" function on a data set, and the @@ -39,8 +39,8 @@ public class MapPartitionOperator extends SingleInputUdfOperator function; - public MapPartitionOperator(DataSet input, MapPartitionFunction function) { - super(input, TypeExtractor.getMapPartitionReturnTypes(function, input.getType())); + public MapPartitionOperator(DataSet input, TypeInformation resultType, MapPartitionFunction function) { + super(input, resultType); this.function = function; extractSemanticAnnotationsFromUdf(function.getClass()); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java index 767f75adf27..1647055efe3 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java @@ -27,6 +27,9 @@ import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException; +import org.apache.flink.types.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; + /** * SortedGrouping is an intermediate step for a transformation on a grouped and sorted DataSet.
@@ -85,7 +88,8 @@ public class SortedGrouping extends Grouping { if (FunctionUtils.isLambdaFunction(reducer)) { throw new UnsupportedLambdaExpressionException(); } - return new GroupReduceOperator(this, reducer); + TypeInformation resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getDataSet().getType()); + return new GroupReduceOperator(this, resultType, reducer); } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java index 702f14937cd..fdc86deaaac 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java @@ -29,6 +29,8 @@ import org.apache.flink.api.java.functions.SelectByMaxFunction; import org.apache.flink.api.java.functions.SelectByMinFunction; import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException; import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.types.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; public class UnsortedGrouping extends Grouping { @@ -133,7 +135,9 @@ public class UnsortedGrouping extends Grouping { if (FunctionUtils.isLambdaFunction(reducer)) { throw new UnsupportedLambdaExpressionException(); } - return new GroupReduceOperator(this, reducer); + TypeInformation resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getDataSet().getType()); + + return new GroupReduceOperator(this, resultType, reducer); } /** -- GitLab