diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index 14f8fb1b2dac6a766f9bd7ac199f705415d0e0a8..c3555603518a190a073f9944eb8ea6184f78e116 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -46,6 +46,7 @@ import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
+import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.common.operators.base.BulkIterationBase.PartialSolutionPlaceHolder;
 import org.apache.flink.api.common.operators.base.DeltaIterationBase.SolutionSetPlaceHolder;
@@ -69,6 +70,7 @@ import org.apache.flink.compiler.dag.MapPartitionNode;
 import org.apache.flink.compiler.dag.MatchNode;
 import org.apache.flink.compiler.dag.OptimizerNode;
 import org.apache.flink.compiler.dag.PactConnection;
+import org.apache.flink.compiler.dag.PartitionNode;
 import org.apache.flink.compiler.dag.ReduceNode;
 import org.apache.flink.compiler.dag.SinkJoiner;
 import org.apache.flink.compiler.dag.SolutionSetNode;
@@ -708,6 +710,9 @@ public class PactCompiler {
 			else if (c instanceof Union){
 				n = new BinaryUnionNode((Union) c);
 			}
+			else if (c instanceof PartitionOperatorBase) {
+				n = new PartitionNode((PartitionOperatorBase<?>) c);
+			}
 			else if (c instanceof PartialSolutionPlaceHolder) {
 				if (this.parent == null) {
 					throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations.");
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
index 19d232ff1cceaeddf18ee1a47aae3b540a29f484..99a9b124ee87ffdf2e9a32f3ddad14a67f816ae5 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
@@ -113,6 +113,9 @@ public abstract class CostEstimator {
 		case BROADCAST:
 			addBroadcastCost(channel, channel.getReplicationFactor(), costs);
 			break;
+		case PARTITION_FORCED_REBALANCE:
+			addRandomPartitioningCost(channel, costs);
+			break;
 		default:
 			throw new CompilerException("Unknown shipping strategy for input: " + channel.getShipStrategy());
 		}
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PartitionNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PartitionNode.java
new file mode 100644
index 0000000000000000000000000000000000000000..ccd48c5e68b972c362bad35c0293761129258e94
--- /dev/null
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PartitionNode.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.compiler.dag;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
+import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.compiler.DataStatistics;
+import org.apache.flink.compiler.dataproperties.GlobalProperties;
+import org.apache.flink.compiler.dataproperties.LocalProperties;
+import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.compiler.dataproperties.RequestedLocalProperties;
+import org.apache.flink.compiler.operators.OperatorDescriptorSingle;
+import org.apache.flink.compiler.plan.Channel;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+/**
+ * The optimizer's internal representation of a Partition operator node.
+ */
+public class PartitionNode extends SingleInputNode {
+
+	public PartitionNode(PartitionOperatorBase<?> operator) {
+		super(operator);
+	}
+
+	@Override
+	public PartitionOperatorBase<?> getPactContract() {
+		return (PartitionOperatorBase<?>) super.getPactContract();
+	}
+
+	@Override
+	public String getName() {
+		return "Partition";
+	}
+
+	@Override
+	protected List<OperatorDescriptorSingle> getPossibleProperties() {
+		return Collections.singletonList(new PartitionDescriptor(this.getPactContract().getPartitionMethod(), this.keys));
+	}
+
+	@Override
+	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+		// partitioning does not change the number of records
+		this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
+	}
+
+	@Override
+	public boolean isFieldConstant(int input, int fieldNumber) {
+		// Partition does not change any data
+		return true;
+	}
+
+
+	public static class PartitionDescriptor extends OperatorDescriptorSingle {
+
+		private final PartitionMethod pMethod;
+		private final FieldSet pKeys;
+
+		public PartitionDescriptor(PartitionMethod pMethod, FieldSet pKeys) {
+			this.pMethod = pMethod;
+			this.pKeys = pKeys;
+		}
+
+		@Override
+		public DriverStrategy getStrategy() {
+			return DriverStrategy.UNARY_NO_OP;
+		}
+
+		@Override
+		public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+			return new SingleInputPlanNode(node, "Partition", in, DriverStrategy.UNARY_NO_OP);
+		}
+
+		@Override
+		protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
+			RequestedGlobalProperties rgps = new RequestedGlobalProperties();
+
+			switch (this.pMethod) {
+			case HASH:
+				rgps.setHashPartitioned(pKeys.toFieldList());
+				break;
+			case REBALANCE:
+				rgps.setForceRebalancing();
+				break;
+			case RANGE:
+				throw new UnsupportedOperationException("Not yet supported");
+			default:
+				throw new IllegalArgumentException("Invalid partition method");
+			}
+
+			return Collections.singletonList(rgps);
+		}
+
+		@Override
+		protected List<RequestedLocalProperties> createPossibleLocalProperties() {
+			// partitioning does not require any local property.
+			return Collections.singletonList(new RequestedLocalProperties());
+		}
+
+		@Override
+		public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
+			// the partition node is a no-op operation; all global properties are preserved.
+			return gProps;
+		}
+
+		@Override
+		public LocalProperties computeLocalProperties(LocalProperties lProps) {
+			// the partition node is a no-op operation; all local properties are preserved.
+			return lProps;
+		}
+	}
+
+}
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
index 1641d9d767bd989d0b6ef96c4aac91e915f96a8f..f3d9c2d5fecb488a9ff91516757bd377f069a4d1 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
@@ -97,6 +97,12 @@ public class GlobalProperties implements Cloneable {
 		this.ordering = null;
 	}
 
+	public void setForcedRebalanced() {
+		this.partitioning = PartitioningProperty.FORCED_REBALANCED;
+		this.partitioningFields = null;
+		this.ordering = null;
+	}
+
 	public void addUniqueFieldCombination(FieldSet fields) {
 		if (this.uniqueFieldCombinations == null) {
 			this.uniqueFieldCombinations = new HashSet<FieldSet>();
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java
index 62002a268d555a9f8ba47acb4095fa6007ed56ca..f73f4917eba28c4ea98232bfa7260d340a465815 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java
@@ -47,8 +47,13 @@ public enum PartitioningProperty {
 	/**
 	 * Constant indicating full replication of the data to each parallel instance.
 	 */
-	FULL_REPLICATION;
+	FULL_REPLICATION,
 
+	/**
+	 * Constant indicating a forced even rebalancing.
+	 */
+	FORCED_REBALANCED;
+
 	/**
 	 * Checks, if this property represents in fact a partitioning. That is,
 	 * whether this property is not equal to PartitionProperty.FULL_REPLICATION.
@@ -57,7 +62,7 @@ public enum PartitioningProperty {
 	 *         false otherwise.
 	 */
 	public boolean isPartitioned() {
-		return this != FULL_REPLICATION;
+		return this != FULL_REPLICATION && this != FORCED_REBALANCED;
 	}
 
 	/**
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
index 721bc8408d859c0c1cf6700adb554e6bbe7dc5c5..dcf0afaa41480b912e21d6c6f9a7474e331b3960 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
@@ -106,6 +106,12 @@ public final class RequestedGlobalProperties implements Cloneable {
 		this.ordering = null;
 	}
 
+	public void setForceRebalancing() {
+		this.partitioning = PartitioningProperty.FORCED_REBALANCED;
+		this.partitioningFields = null;
+		this.ordering = null;
+	}
+
 	/**
 	 * Gets the partitioning property.
 	 *
@@ -211,6 +217,8 @@ public final class RequestedGlobalProperties implements Cloneable {
 		} else if (this.partitioning == PartitioningProperty.RANGE_PARTITIONED) {
 			return props.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED
 				&& props.matchesOrderedPartitioning(this.ordering);
+		} else if (this.partitioning == PartitioningProperty.FORCED_REBALANCED) {
+			return props.getPartitioning() == PartitioningProperty.FORCED_REBALANCED;
 		} else {
 			throw new CompilerException("Bug in properties matching logic.");
 		}
@@ -253,6 +261,9 @@ public final class RequestedGlobalProperties implements Cloneable {
 					channel.setDataDistribution(this.dataDistribution);
 				}
 				break;
+			case FORCED_REBALANCED:
+				channel.setShipStrategy(ShipStrategyType.PARTITION_FORCED_REBALANCE);
+				break;
 			default:
 				throw new CompilerException();
 		}
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java
index bce09b804b9d7978aca1fa78974ed3810206d892..45daceb7e3ab2805f0c8d60934768889f074a590 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java
@@ -67,13 +67,37 @@ public abstract class OperatorDescriptorSingle implements AbstractOperatorDescri
 		return this.localProps;
 	}
 
+	/**
+	 * Returns a list of global properties that are required by this operator descriptor.
+	 *
+	 * @return A list of global properties that are required by this operator descriptor.
+	 */
 	protected abstract List<RequestedGlobalProperties> createPossibleGlobalProperties();
 
+	/**
+	 * Returns a list of local properties that are required by this operator descriptor.
+	 *
+	 * @return A list of local properties that are required by this operator descriptor.
+	 */
 	protected abstract List<RequestedLocalProperties> createPossibleLocalProperties();
 
 	public abstract SingleInputPlanNode instantiate(Channel in, SingleInputNode node);
 
+	/**
+	 * Returns the global properties which are present after the operator was applied to the
+	 * provided global properties.
+	 *
+	 * @param in The global properties on which the operator is applied.
+	 * @return The global properties which are valid after the operator has been applied.
+	 */
 	public abstract GlobalProperties computeGlobalProperties(GlobalProperties in);
 
+	/**
+	 * Returns the local properties which are present after the operator was applied to the
+	 * provided local properties.
+	 *
+	 * @param in The local properties on which the operator is applied.
+	 * @return The local properties which are valid after the operator has been applied.
+	 */
 	public abstract LocalProperties computeLocalProperties(LocalProperties in);
 }
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/Channel.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/Channel.java
index cb67c047702c4733139f5a5f76f3a8f3373ad1a3..fec9c807e6d32c10197a6fe105f32bcb9daf0a78 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/Channel.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/Channel.java
@@ -378,6 +378,9 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 			case PARTITION_RANDOM:
 				this.globalProps.reset();
 				break;
+			case PARTITION_FORCED_REBALANCE:
+				this.globalProps.setForcedRebalanced();
+				break;
 			case NONE:
 				throw new CompilerException("Cannot produce GlobalProperties before ship strategy is set.");
 		}
@@ -410,6 +413,7 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 			case PARTITION_HASH:
 			case PARTITION_RANGE:
 			case PARTITION_RANDOM:
+			case PARTITION_FORCED_REBALANCE:
 				this.localProps = new LocalProperties();
 				break;
 			case FORWARD:
@@ -417,6 +421,8 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 				break;
 			case NONE:
 				throw new CompilerException("ShipStrategy has not yet been set.");
+			default:
+				throw new CompilerException("Unknown ShipStrategy.");
 		}
 	}
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plandump/PlanJSONDumpGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plandump/PlanJSONDumpGenerator.java
index 106384941bd6375995ef013bf7634f5c62e2287e..00e2bc21e9dccaccd92131ff7370605341ab9452 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plandump/PlanJSONDumpGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plandump/PlanJSONDumpGenerator.java
@@ -327,6 +327,9 @@ public class PlanJSONDumpGenerator {
 			case PARTITION_RANDOM:
 				shipStrategy = "Redistribute";
 				break;
+			case PARTITION_FORCED_REBALANCE:
+				shipStrategy = "Rebalance";
+				break;
 			default:
 				throw new CompilerException("Unknown ship strategy '" + conn.getShipStrategy().name()
 					+ "' in JSON generator.");
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index 5dcbba9f2b7bf62c002069c9dd493a1ac74486a9..78ec67161421a045692f1e1b9a0b015d6630614c 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -1023,6 +1023,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 			case BROADCAST:
 			case PARTITION_HASH:
 			case PARTITION_RANGE:
+			case PARTITION_FORCED_REBALANCE:
 				distributionPattern = DistributionPattern.BIPARTITE;
 				break;
 			default:
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java
index d922a01acedb87c4c5edbd77125ae617fb6173d8..f5558ef3b572ef73487707e9f0662595df04ac1d 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.compiler.util;
 
+import org.apache.flink.api.common.functions.util.NoOpFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputOperator;
 import org.apache.flink.api.common.operators.RecordOperator;
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java
index 3f051affc6e35dd8ba7520ec97c9ba0984b0054c..bafa249d40ceab235a63b713fd5112918585f386 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.compiler.util;
 
+import org.apache.flink.api.common.functions.util.NoOpFunction;
 import org.apache.flink.api.common.operators.RecordOperator;
 import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/NoOpFunction.java
similarity index 94%
rename from flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpFunction.java
rename to flink-core/src/main/java/org/apache/flink/api/common/functions/util/NoOpFunction.java
index 39d2a0578a174aebfdd652c0720f082030229d7a..1cc3ebde595d40e9df9df0e9f6ab94f0ebadb17c 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/NoOpFunction.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.compiler.util;
+package org.apache.flink.api.common.functions.util;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
new file mode 100644
index 0000000000000000000000000000000000000000..1f17db09091abb4b785ae13e3a6e4d7643055634
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+import org.apache.flink.api.common.functions.util.NoOpFunction;
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
+
+/**
+ * Base operator for explicit repartitioning of a data set (hash, range, or forced rebalance).
+ *
+ * @param <IN> The input and result type.
+ */
+public class PartitionOperatorBase<IN> extends SingleInputOperator<IN, IN, NoOpFunction> {
+
+	private final PartitionMethod partitionMethod;
+
+	public PartitionOperatorBase(UnaryOperatorInformation<IN, IN> operatorInfo, PartitionMethod pMethod, int[] keys, String name) {
+		super(new UserCodeObjectWrapper<NoOpFunction>(new NoOpFunction()), operatorInfo, keys, name);
+		this.partitionMethod = pMethod;
+	}
+
+	public PartitionOperatorBase(UnaryOperatorInformation<IN, IN> operatorInfo, PartitionMethod pMethod, String name) {
+		super(new UserCodeObjectWrapper<NoOpFunction>(new NoOpFunction()), operatorInfo, name);
+		this.partitionMethod = pMethod;
+	}
+
+	public PartitionMethod getPartitionMethod() {
+		return this.partitionMethod;
+	}
+
+	public static enum PartitionMethod {
+		REBALANCE,
+		HASH,
+		RANGE;
+	}
+
+}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 122abf39bf8c66f01dba7f4d19cac7e227247066..ff487a269c705ce7e83f9ab0906741f2d31570bc 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.functions.FormattingMapper;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -56,6 +57,7 @@ import org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.operators.MapOperator;
 import org.apache.flink.api.java.operators.MapPartitionOperator;
+import org.apache.flink.api.java.operators.PartitionedDataSet;
 import org.apache.flink.api.java.operators.ProjectOperator.Projection;
 import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.ReduceOperator;
@@ -845,6 +847,49 @@ public abstract class DataSet<T> {
 		return new UnionOperator<T>(this, other);
 	}
 
+	// --------------------------------------------------------------------------------------------
+	//  Partitioning
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Hash-partitions a DataSet on the specified key fields.
+	 * <p>
+	 * <b>Important:</b> This operation shuffles the whole DataSet over the network and can take a significant amount of time.
+	 *
+	 * @param fields The field indexes on which the DataSet is hash-partitioned.
+	 * @return The partitioned DataSet.
+	 */
+	public PartitionedDataSet<T> partitionByHash(int... fields) {
+		return new PartitionedDataSet<T>(this, PartitionMethod.HASH, new Keys.FieldPositionKeys<T>(fields, getType(), false));
+	}
+
+	/**
+	 * Partitions a DataSet using the specified KeySelector.
+	 * <p>
+	 * <b>Important:</b> This operation shuffles the whole DataSet over the network and can take a significant amount of time.
+	 *
+	 * @param keyExtractor The KeySelector with which the DataSet is hash-partitioned.
+	 * @return The partitioned DataSet.
+	 *
+	 * @see KeySelector
+	 */
+	public <K extends Comparable<K>> PartitionedDataSet<T> partitionByHash(KeySelector<T, K> keyExtractor) {
+		final TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type);
+		return new PartitionedDataSet<T>(this, PartitionMethod.HASH, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, this.getType(), keyType));
+	}
+
+	/**
+	 * Enforces a rebalancing of the DataSet, i.e., the DataSet is evenly distributed over all parallel instances of the
+	 * following task. This can help to improve performance in case of heavy data skew and compute-intensive operations.
+	 * <p>
+	 * <b>Important:</b> This operation shuffles the whole DataSet over the network and can take a significant amount of time.
+	 *
+	 * @return The rebalanced DataSet.
+	 */
+	public PartitionedDataSet<T> rebalance() {
+		return new PartitionedDataSet<T>(this, PartitionMethod.REBALANCE);
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Top-K
 	// --------------------------------------------------------------------------------------------
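For orientation, a minimal end-to-end usage sketch of the API added above. The sample data, the partition-wise function, and the output path are invented for the example; only partitionByHash(), rebalance(), and the PartitionedDataSet methods come from this patch:

    import org.apache.flink.api.common.functions.MapPartitionFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class PartitionExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Tuple2<String, Integer>> input = env.fromElements(
                    new Tuple2<String, Integer>("a", 1),
                    new Tuple2<String, Integer>("b", 2),
                    new Tuple2<String, Integer>("a", 3));

            // hash-partition on the first tuple field, then apply a partition-wise function
            input.partitionByHash(0)
                    .mapPartition(new CountPerPartition())
                    .writeAsText("file:///tmp/partition-counts"); // hypothetical output path

            env.execute();
        }

        // emits the number of elements in each parallel partition
        public static class CountPerPartition implements MapPartitionFunction<Tuple2<String, Integer>, Long> {
            @Override
            public void mapPartition(Iterable<Tuple2<String, Integer>> values, Collector<Long> out) {
                long cnt = 0;
                for (Tuple2<String, Integer> t : values) {
                    cnt++;
                }
                out.collect(cnt);
            }
        }
    }

rebalance() is used the same way, e.g. input.rebalance().map(...). Note that both methods return a PartitionedDataSet rather than a DataSet: only map(), mapPartition(), flatMap(), and filter() can follow directly, and the actual partition operator is injected when that following operator is translated, as the operator changes below show.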
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
index e9394381d7ee396a101e718d7dbcd8da4b4b5b8c..727820aa1a9ea95237a8578d17a527bac26b2074 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.java.operators.translation.PlanFilterOperator;
-
 import org.apache.flink.api.java.DataSet;
 
 /**
@@ -35,7 +34,8 @@ public class FilterOperator<T> extends SingleInputUdfOperator<T, T, FilterOperator<T>> {
 
 	protected final FilterFunction<T> function;
-
+	protected PartitionedDataSet<T> partitionedDataSet;
+
 	public FilterOperator(DataSet<T> input, FilterFunction<T> function) {
 		super(input, input.getType());
@@ -43,9 +43,19 @@ public class FilterOperator<T> extends SingleInputUdfOperator<T, T, FilterOperator<T>> {
+
+	public FilterOperator(PartitionedDataSet<T> input, FilterFunction<T> function) {
+		this(input.getDataSet(), function);
+		this.partitionedDataSet = input;
+	}
 
 	@Override
 	protected org.apache.flink.api.common.operators.base.FilterOperatorBase<T, FlatMapFunction<T, T>> translateToDataFlow(Operator<T> input) {
+		// inject partition operator if necessary
+		if (this.partitionedDataSet != null) {
+			input = this.partitionedDataSet.translateToDataFlow(input, this.getParallelism());
+		}
+
 		String name = getName() != null ? getName() : function.getClass().getName();
 		// create operator
 		PlanFilterOperator<T> po = new PlanFilterOperator<T>(function, name, getInputType());
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
index ff9076b2caf8ca2bd4f971e2759d5a6990652792..0dc401ec3c78f62028f22ddd0f55e6e846c89d12 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
@@ -36,6 +36,7 @@ public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, FlatMapOperator<IN, OUT>> {
 
 	protected final FlatMapFunction<IN, OUT> function;
+	protected PartitionedDataSet<IN> partitionedDataSet;
 
 	public FlatMapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatMapFunction<IN, OUT> function) {
 		super(input, resultType);
@@ -43,10 +44,20 @@ public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, FlatMapOperator<IN, OUT>> {
+
+	public FlatMapOperator(PartitionedDataSet<IN> input, TypeInformation<OUT> resultType, FlatMapFunction<IN, OUT> function) {
+		this(input.getDataSet(), resultType, function);
+		this.partitionedDataSet = input;
+	}
 
 	@Override
 	protected org.apache.flink.api.common.operators.base.FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
+		// inject partition operator if necessary
+		if (this.partitionedDataSet != null) {
+			input = this.partitionedDataSet.translateToDataFlow(input, this.getParallelism());
+		}
+
 		String name = getName() != null ? getName() : function.getClass().getName();
 		// create operator
 		FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> po = new FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
index 2db0b3b8e8fa5af1ae7b053a2ac6b7eb108d19aa..e45cc96afaa8614b778ef560a61793751b07d5ed 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
@@ -38,6 +38,8 @@ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOperator<IN, OUT>> {
 
 	protected final MapFunction<IN, OUT> function;
+	protected PartitionedDataSet<IN> partitionedDataSet;
+
 
 	public MapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, MapFunction<IN, OUT> function) {
@@ -46,10 +48,20 @@ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOperator<IN, OUT>> {
+
+	public MapOperator(PartitionedDataSet<IN> input, TypeInformation<OUT> resultType, MapFunction<IN, OUT> function) {
+		this(input.getDataSet(), resultType, function);
+		this.partitionedDataSet = input;
+	}
 
 	@Override
 	protected org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
+		// inject partition operator if necessary
+		if (this.partitionedDataSet != null) {
+			input = this.partitionedDataSet.translateToDataFlow(input, this.getParallelism());
+		}
+
 		String name = getName() != null ? getName() : function.getClass().getName();
 		// create operator
 		MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> po = new MapOperatorBase<IN, OUT, MapFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
index fc0503631199c01fab0cb79a61c00d0ab5adf0fb..9c896f9644382cbbe61da9fef6a010f0db5a7f55 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
@@ -38,6 +38,7 @@ public class MapPartitionOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapPartitionOperator<IN, OUT>> {
 
 	protected final MapPartitionFunction<IN, OUT> function;
+	protected PartitionedDataSet<IN> partitionedDataSet;
 
 	public MapPartitionOperator(DataSet<IN> input, TypeInformation<OUT> resultType, MapPartitionFunction<IN, OUT> function) {
 		super(input, resultType);
@@ -45,10 +46,20 @@ public class MapPartitionOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapPartitionOperator<IN, OUT>> {
+
+	public MapPartitionOperator(PartitionedDataSet<IN> input, TypeInformation<OUT> resultType, MapPartitionFunction<IN, OUT> function) {
+		this(input.getDataSet(), resultType, function);
+		this.partitionedDataSet = input;
+	}
 
 	@Override
 	protected MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
+		// inject partition operator if necessary
+		if (this.partitionedDataSet != null) {
+			input = this.partitionedDataSet.translateToDataFlow(input, this.getParallelism());
+		}
+
 		String name = getName() != null ? getName() : function.getClass().getName();
 		// create operator
 		MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>> po = new MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
index 7030be59ddb8a378fb8d283ac0339b2b9b1f5e6c..eb17134717d75a59b452608422bf7b2c093f0518 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
@@ -67,6 +67,7 @@ public class OperatorTranslation {
 	
 	private <T> Operator<T> translate(DataSet<T> dataSet) {
+		// check if we have already translated that data set (operation or source)
 		Operator<T> previous = (Operator<T>) this.translated.get(dataSet);
 		if (previous != null) {
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionedDataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionedDataSet.java
new file mode 100644
index 0000000000000000000000000000000000000000..0ab984df03ff31a713d6f3949348d1b1ff7b88d2
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionedDataSet.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
+import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
+import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+public class PartitionedDataSet<T> {
+
+	private final DataSet<T> dataSet;
+
+	private final Keys<T> pKeys;
+	private final PartitionMethod pMethod;
+
+	public PartitionedDataSet(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys) {
+		this.dataSet = input;
+
+		if (pMethod == PartitionMethod.HASH && pKeys == null) {
+			throw new IllegalArgumentException("Hash Partitioning requires keys");
+		} else if (pMethod == PartitionMethod.RANGE) {
+			throw new UnsupportedOperationException("Range Partitioning not yet supported");
+		}
+
+		if (pKeys instanceof Keys.FieldPositionKeys && !input.getType().isTupleType()) {
+			throw new IllegalArgumentException("Hash Partitioning with key fields only possible on Tuple DataSets");
+		}
+
+		this.pMethod = pMethod;
+		this.pKeys = pKeys;
+	}
+
+	public PartitionedDataSet(DataSet<T> input, PartitionMethod pMethod) {
+		this(input, pMethod, null);
+	}
+
+	public DataSet<T> getDataSet() {
+		return this.dataSet;
+	}
+
+
+	/**
+	 * Applies a Map transformation on a {@link DataSet}.
+	 * The transformation calls a {@link org.apache.flink.api.java.functions.RichMapFunction} for each element of the DataSet.
+	 * Each MapFunction call returns exactly one element.
+	 *
+	 * @param mapper The MapFunction that is called for each element of the DataSet.
+	 * @return A MapOperator that represents the transformed DataSet.
+	 *
+	 * @see org.apache.flink.api.java.functions.RichMapFunction
+	 * @see MapOperator
+	 * @see DataSet
+	 */
+	public <R> MapOperator<T, R> map(MapFunction<T, R> mapper) {
+		if (mapper == null) {
+			throw new NullPointerException("Map function must not be null.");
+		}
+		if (FunctionUtils.isLambdaFunction(mapper)) {
+			throw new UnsupportedLambdaExpressionException();
+		}
+
+		final TypeInformation<R> resultType = TypeExtractor.getMapReturnTypes(mapper, dataSet.getType());
+
+		return new MapOperator<T, R>(this, resultType, mapper);
+	}
+
+	/**
+	 * Applies a Map-style operation to the entire partition of the data.
+	 * The function is called once per parallel partition of the data,
+	 * and the entire partition is available through the given Iterable.
+	 * The number of elements that each instance of the MapPartition function
+	 * sees is non-deterministic and depends on the degree of parallelism of the operation.
+	 *
+	 * This function is intended for operations that cannot transform individual elements and
+	 * require no grouping of elements. To transform individual elements,
+	 * the use of {@code map()} and {@code flatMap()} is preferable.
+	 *
+	 * @param mapPartition The MapPartitionFunction that is called for the full DataSet.
+	 * @return A MapPartitionOperator that represents the transformed DataSet.
+	 *
+	 * @see MapPartitionFunction
+	 * @see MapPartitionOperator
+	 * @see DataSet
+	 */
+	public <R> MapPartitionOperator<T, R> mapPartition(MapPartitionFunction<T, R> mapPartition) {
+		if (mapPartition == null) {
+			throw new NullPointerException("MapPartition function must not be null.");
+		}
+
+		final TypeInformation<R> resultType = TypeExtractor.getMapPartitionReturnTypes(mapPartition, dataSet.getType());
+
+		return new MapPartitionOperator<T, R>(this, resultType, mapPartition);
+	}
+
+	/**
+	 * Applies a FlatMap transformation on a {@link DataSet}.
+	 * The transformation calls a {@link org.apache.flink.api.java.functions.RichFlatMapFunction} for each element of the DataSet.
+	 * Each FlatMapFunction call can return any number of elements including none.
+	 *
+	 * @param flatMapper The FlatMapFunction that is called for each element of the DataSet.
+	 * @return A FlatMapOperator that represents the transformed DataSet.
+	 *
+	 * @see org.apache.flink.api.java.functions.RichFlatMapFunction
+	 * @see FlatMapOperator
+	 * @see DataSet
+	 */
+	public <R> FlatMapOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) {
+		if (flatMapper == null) {
+			throw new NullPointerException("FlatMap function must not be null.");
+		}
+		if (FunctionUtils.isLambdaFunction(flatMapper)) {
+			throw new UnsupportedLambdaExpressionException();
+		}
+
+		TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, dataSet.getType());
+
+		return new FlatMapOperator<T, R>(this, resultType, flatMapper);
+	}
+
+	/**
+	 * Applies a Filter transformation on a {@link DataSet}.
+	 * The transformation calls a {@link org.apache.flink.api.java.functions.RichFilterFunction} for each element of the DataSet
+	 * and retains only those elements for which the function returns true. Elements for
+	 * which the function returns false are filtered.
+	 *
+	 * @param filter The FilterFunction that is called for each element of the DataSet.
+	 * @return A FilterOperator that represents the filtered DataSet.
+	 *
+	 * @see org.apache.flink.api.java.functions.RichFilterFunction
+	 * @see FilterOperator
+	 * @see DataSet
+	 */
+	public FilterOperator<T> filter(FilterFunction<T> filter) {
+		if (filter == null) {
+			throw new NullPointerException("Filter function must not be null.");
+		}
+		return new FilterOperator<T>(this, filter);
+	}
+
+
+	/*
+	 * Translation of partitioning
+	 */
+
+	protected org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> translateToDataFlow(Operator<T> input, int partitionDop) {
+
+		String name = "Partition";
+
+		// distinguish between partition types
+		if (pMethod == PartitionMethod.REBALANCE) {
+
+			UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<T, T>(dataSet.getType(), dataSet.getType());
+			PartitionOperatorBase<T> noop = new PartitionOperatorBase<T>(operatorInfo, pMethod, name);
+			// set input
+			noop.setInput(input);
+			// set DOP
+			noop.setDegreeOfParallelism(partitionDop);
+
+			return noop;
+		}
+		else if (pMethod == PartitionMethod.HASH) {
+
+			if (pKeys instanceof Keys.FieldPositionKeys) {
+
+				int[] logicalKeyPositions = pKeys.computeLogicalKeyPositions();
+				UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<T, T>(dataSet.getType(), dataSet.getType());
+				PartitionOperatorBase<T> noop = new PartitionOperatorBase<T>(operatorInfo, pMethod, logicalKeyPositions, name);
+				// set input
+				noop.setInput(input);
+				// set DOP
+				noop.setDegreeOfParallelism(partitionDop);
+
+				return noop;
+			} else if (pKeys instanceof Keys.SelectorFunctionKeys) {
+
+				@SuppressWarnings("unchecked")
+				Keys.SelectorFunctionKeys<T, ?> selectorKeys = (Keys.SelectorFunctionKeys<T, ?>) pKeys;
+				MapOperatorBase<?, T, ?> po = translateSelectorFunctionReducer(selectorKeys, pMethod, dataSet.getType(), name, input, partitionDop);
+				return po;
+			}
+			else {
+				throw new UnsupportedOperationException("Unrecognized key type.");
+			}
+
+		}
+		else if (pMethod == PartitionMethod.RANGE) {
+			throw new UnsupportedOperationException("Range partitioning not yet supported");
+		}
+
+		return null;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private static <T, K> MapOperatorBase<Tuple2<K, T>, T, ?> translateSelectorFunctionReducer(Keys.SelectorFunctionKeys<T, ?> rawKeys,
+			PartitionMethod pMethod, TypeInformation<T> inputType, String name, Operator<T> input, int partitionDop)
+	{
+		@SuppressWarnings("unchecked")
+		final Keys.SelectorFunctionKeys<T, K> keys = (Keys.SelectorFunctionKeys<T, K>) rawKeys;
+
+		TypeInformation<Tuple2<K, T>> typeInfoWithKey = new TupleTypeInfo<Tuple2<K, T>>(keys.getKeyType(), inputType);
+		UnaryOperatorInformation<Tuple2<K, T>, Tuple2<K, T>> operatorInfo = new UnaryOperatorInformation<Tuple2<K, T>, Tuple2<K, T>>(typeInfoWithKey, typeInfoWithKey);
+
+		KeyExtractingMapper<T, K> extractor = new KeyExtractingMapper<T, K>(keys.getKeyExtractor());
+
+		MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>> keyExtractingMap = new MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>>(extractor, new UnaryOperatorInformation<T, Tuple2<K, T>>(inputType, typeInfoWithKey), "Key Extractor");
+		PartitionOperatorBase<Tuple2<K, T>> noop = new PartitionOperatorBase<Tuple2<K, T>>(operatorInfo, pMethod, new int[]{0}, name);
+		MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>> keyRemovingMap = new MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>>(new KeyRemovingMapper<T, K>(), new UnaryOperatorInformation<Tuple2<K, T>, T>(typeInfoWithKey, inputType), "Key Remover");
+
+		keyExtractingMap.setInput(input);
+		noop.setInput(keyExtractingMap);
+		keyRemovingMap.setInput(noop);
+
+		// set dop
+		keyExtractingMap.setDegreeOfParallelism(input.getDegreeOfParallelism());
+		noop.setDegreeOfParallelism(partitionDop);
+		keyRemovingMap.setDegreeOfParallelism(partitionDop);
+
+		return keyRemovingMap;
+	}
+
+
+}
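As a concrete illustration of the selector-function path above, a minimal self-contained sketch (class name, sample data, and the trailing filter are invented for the example). It goes through translateSelectorFunctionReducer, i.e. the plan becomes Key Extractor (T -> Tuple2<K, T>) -> Partition (hash on field 0) -> Key Remover (Tuple2<K, T> -> T), and only the partition operator and the key-removing map run with the partition DOP:

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class SelectorPartitionExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Tuple2<String, Integer>> words = env.fromElements(
                    new Tuple2<String, Integer>("apple", 3),
                    new Tuple2<String, Integer>("pear", 5));

            // partition on a derived key; the key extraction/removal wrapping is invisible to the user
            words.partitionByHash(new KeySelector<Tuple2<String, Integer>, String>() {
                        @Override
                        public String getKey(Tuple2<String, Integer> value) {
                            return value.f0;
                        }
                    })
                    .filter(new FilterFunction<Tuple2<String, Integer>>() {
                        @Override
                        public boolean filter(Tuple2<String, Integer> value) {
                            return value.f1 > 3;
                        }
                    })
                    .writeAsText("file:///tmp/selector-partitioned"); // hypothetical output path

            env.execute();
        }
    }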
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
index 202a95197df66bdecb21e4ff1ca6baee5b8d675f..4f297b001f650c988e978ba98bb9c7ced07f045f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
@@ -86,6 +86,7 @@ public class OutputEmitter implements ChannelSelector implements ChannelSelector getConfigurations() throws FileNotFoundException, IOException {
+
+		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+		for (int i = 1; i <= NUM_PROGRAMS; i++) {
+			Configuration config = new Configuration();
+			config.setInteger("ProgramId", i);
+			tConfigs.add(config);
+		}
+
+		return toParameterList(tConfigs);
+	}
+
+	private static class PartitionProgs {
+
+		public static String runProgram(int progId, String resultPath) throws Exception {
+
+			switch(progId) {
+			case 1: {
+				/*
+				 * Test hash partition by key field
+				 */
+
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+				DataSet<Long> uniqLongs = ds
+						.partitionByHash(1)
+						.mapPartition(new UniqueLongMapper());
+				uniqLongs.writeAsText(resultPath);
+				env.execute();
+
+				// return expected result
+				return 	"1\n" +
+						"2\n" +
+						"3\n" +
+						"4\n" +
+						"5\n" +
+						"6\n";
+			}
+			case 2: {
+				/*
+				 * Test hash partition by key selector
+				 */
+
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+				DataSet<Long> uniqLongs = ds
+						.partitionByHash(new KeySelector<Tuple3<Integer, Long, String>, Long>() {
+							private static final long serialVersionUID = 1L;
+
+							@Override
+							public Long getKey(Tuple3<Integer, Long, String> value) throws Exception {
+								return value.f1;
+							}
+
+						})
+						.mapPartition(new UniqueLongMapper());
+				uniqLongs.writeAsText(resultPath);
+				env.execute();
+
+				// return expected result
+				return 	"1\n" +
+						"2\n" +
+						"3\n" +
+						"4\n" +
+						"5\n" +
+						"6\n";
+			}
+			case 3: {
+				/*
+				 * Test forced rebalancing
+				 */
+
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				// generate some numbers in parallel
+				DataSet<Long> ds = env.generateSequence(1, 3000);
+				DataSet<Tuple2<Integer, Integer>> uniqLongs = ds
+						// introduce some partition skew by filtering
+						.filter(new FilterFunction<Long>() {
+							private static final long serialVersionUID = 1L;
+
+							@Override
+							public boolean filter(Long value) throws Exception {
+								return value > 780;
+							}
+						})
+						// rebalance
+						.rebalance()
+						// count values in each partition
+						.map(new PartitionIndexMapper())
+						.groupBy(0)
+						.reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
+							private static final long serialVersionUID = 1L;
+
+							public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) {
+								return new Tuple2<Integer, Integer>(v1.f0, v1.f1 + v2.f1);
+							}
+						})
+						// round counts to mitigate runtime scheduling effects (lazy split assignment)
+						.map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>(){
+							private static final long serialVersionUID = 1L;
+
+							@Override
+							public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
+								value.f1 = (value.f1 / 10);
+								return value;
+							}
+
+						});
+
+				uniqLongs.writeAsText(resultPath);
+
+				env.execute();
+
+				// return expected result
+				return 	"(0,55)\n" +
+						"(1,55)\n" +
+						"(2,55)\n" +
+						"(3,55)\n";
+			}
+			case 4: {
+				/*
+				 * Test hash partition by key field and different DOP
+				 */
+
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				env.setDegreeOfParallelism(3);
+
+				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+				DataSet<Long> uniqLongs = ds
+						.partitionByHash(1)
+						.mapPartition(new UniqueLongMapper()).setParallelism(4);
+				uniqLongs.writeAsText(resultPath);
+
+				env.execute();
+
+				// return expected result
+				return 	"1\n" +
+						"2\n" +
+						"3\n" +
+						"4\n" +
+						"5\n" +
+						"6\n";
+			}
+
+			default:
+				throw new IllegalArgumentException("Invalid program id");
+			}
+
+		}
+
+	}
+
+	public static class UniqueLongMapper implements MapPartitionFunction<Tuple3<Integer, Long, String>, Long> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void mapPartition(Iterable<Tuple3<Integer, Long, String>> records, Collector<Long> out) throws Exception {
+			HashSet<Long> uniq = new HashSet<Long>();
+			for (Tuple3<Integer, Long, String> t : records) {
+				uniq.add(t.f1);
+			}
+			for (Long l : uniq) {
+				out.collect(l);
+			}
+		}
+	}
+
+	public static class PartitionIndexMapper extends RichMapFunction<Long, Tuple2<Integer, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<Integer, Integer> map(Long value) throws Exception {
+			return new Tuple2<Integer, Integer>(this.getRuntimeContext().getIndexOfThisSubtask(), 1);
+		}
+
+	}
+
+}
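A quick sanity check of the expected result of test program 3, assuming the test harness runs it with a degree of parallelism of 4 (which the four expected partition indexes 0 through 3 suggest): generateSequence(1, 3000) produces 3000 values, the filter keeps the 2220 values greater than 780, the forced rebalance spreads them evenly so every subtask sees 2220 / 4 = 555 elements, the groupBy(0)/reduce sums the per-subtask counts emitted by PartitionIndexMapper, and the final map rounds 555 / 10 = 55 in integer arithmetic, yielding the four lines (0,55) through (3,55).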