From 53f2c1c25b7866bb494602f9e68c2b237b2cf2c4 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Fri, 21 Jul 2017 13:47:13 -0400 Subject: [PATCH] [hotfix] [optimizer] Normalize job plan operator formatting When printing the job plan the operator description is typically formatted as the operator name followed by the user given or generated function name in parenthesis. For example, "Reduce (My Function)". This normalizes the node names to include a space between the operator and function names. This closes #4383 --- .../java/org/apache/flink/optimizer/dag/SolutionSetNode.java | 2 +- .../optimizer/operators/AbstractSortMergeJoinDescriptor.java | 2 +- .../flink/optimizer/operators/CartesianProductDescriptor.java | 2 +- .../optimizer/operators/GroupReduceWithCombineProperties.java | 2 +- .../operators/HashFullOuterJoinBuildFirstDescriptor.java | 2 +- .../operators/HashFullOuterJoinBuildSecondDescriptor.java | 2 +- .../flink/optimizer/operators/HashJoinBuildFirstProperties.java | 2 +- .../operators/HashLeftOuterJoinBuildFirstDescriptor.java | 2 +- .../operators/HashLeftOuterJoinBuildSecondDescriptor.java | 2 +- .../operators/HashRightOuterJoinBuildFirstDescriptor.java | 2 +- .../operators/HashRightOuterJoinBuildSecondDescriptor.java | 2 +- .../flink/optimizer/operators/PartialGroupProperties.java | 2 +- .../apache/flink/optimizer/plantranslate/JobGraphGenerator.java | 2 +- 13 files changed, 13 insertions(+), 13 deletions(-) diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java index 9b5399923ca..62cb4ca74f9 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java @@ -46,7 +46,7 @@ public class SolutionSetNode extends AbstractPartialSolutionNode { // -------------------------------------------------------------------------------------------- public void setCandidateProperties(GlobalProperties gProps, LocalProperties lProps, Channel initialInput) { - this.cachedPlans = Collections.singletonList(new SolutionSetPlanNode(this, "SolutionSet("+this.getOperator().getName()+")", gProps, lProps, initialInput)); + this.cachedPlans = Collections.singletonList(new SolutionSetPlanNode(this, "SolutionSet ("+this.getOperator().getName()+")", gProps, lProps, initialInput)); } public SolutionSetPlanNode getCurrentSolutionSetPlanNode() { diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractSortMergeJoinDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractSortMergeJoinDescriptor.java index d54b5cf7cda..4aced8bb478 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractSortMergeJoinDescriptor.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractSortMergeJoinDescriptor.java @@ -67,7 +67,7 @@ public abstract class AbstractSortMergeJoinDescriptor extends AbstractJoinDescri inputOrders = tmp; } - String nodeName = String.format("%s(%s)", getNodeName(), node.getOperator().getName()); + String nodeName = String.format("%s (%s)", getNodeName(), node.getOperator().getName()); return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2, inputOrders); } diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java index f48e297407c..d50c9b49333 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java @@ -93,7 +93,7 @@ public abstract class CartesianProductDescriptor extends OperatorDescriptorDual @Override public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { - return new DualInputPlanNode(node, "Cross("+node.getOperator().getName()+")", in1, in2, getStrategy()); + return new DualInputPlanNode(node, "Cross ("+node.getOperator().getName()+")", in1, in2, getStrategy()); } @Override diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java index accd11bc0bc..5521be12f6b 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java @@ -116,7 +116,7 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode(); combinerNode.setParallelism(in.getSource().getParallelism()); - SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator() + SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine ("+node.getOperator() .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE); combiner.setCosts(new Costs(0, 0)); combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties()); diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildFirstDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildFirstDescriptor.java index 49852484b61..5ad0c7e6670 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildFirstDescriptor.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildFirstDescriptor.java @@ -54,7 +54,7 @@ public class HashFullOuterJoinBuildFirstDescriptor extends AbstractJoinDescripto @Override public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { - String nodeName = "FullOuterJoin("+node.getOperator().getName()+")"; + String nodeName = "FullOuterJoin ("+node.getOperator().getName()+")"; return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2); } diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildSecondDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildSecondDescriptor.java index d605a19179a..64681293489 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildSecondDescriptor.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildSecondDescriptor.java @@ -53,7 +53,7 @@ public class HashFullOuterJoinBuildSecondDescriptor extends AbstractJoinDescript @Override public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { - String nodeName = "FullOuterJoin("+node.getOperator().getName()+")"; + String nodeName = "FullOuterJoin ("+node.getOperator().getName()+")"; return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2); } diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java index fec72a95c6c..fc74b39b876 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java @@ -79,7 +79,7 @@ public class HashJoinBuildFirstProperties extends AbstractJoinDescriptor { else { strategy = DriverStrategy.HYBRIDHASH_BUILD_FIRST; } - return new DualInputPlanNode(node, "Join("+node.getOperator().getName()+")", in1, in2, strategy, this.keys1, this.keys2); + return new DualInputPlanNode(node, "Join ("+node.getOperator().getName()+")", in1, in2, strategy, this.keys1, this.keys2); } @Override diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildFirstDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildFirstDescriptor.java index ab4e1065911..f62a85d0894 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildFirstDescriptor.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildFirstDescriptor.java @@ -54,7 +54,7 @@ public class HashLeftOuterJoinBuildFirstDescriptor extends AbstractJoinDescripto @Override public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { - String nodeName = "LeftOuterJoin("+node.getOperator().getName()+")"; + String nodeName = "LeftOuterJoin ("+node.getOperator().getName()+")"; return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2); } diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildSecondDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildSecondDescriptor.java index 8ed7969a871..85cf8f2ca36 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildSecondDescriptor.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildSecondDescriptor.java @@ -57,7 +57,7 @@ public class HashLeftOuterJoinBuildSecondDescriptor extends AbstractJoinDescript @Override public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { - String nodeName = "LeftOuterJoin("+node.getOperator().getName()+")"; + String nodeName = "LeftOuterJoin ("+node.getOperator().getName()+")"; return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2); } diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildFirstDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildFirstDescriptor.java index 5ddba1ce99c..ca19902b4a4 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildFirstDescriptor.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildFirstDescriptor.java @@ -57,7 +57,7 @@ public class HashRightOuterJoinBuildFirstDescriptor extends AbstractJoinDescript @Override public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { - String nodeName = "RightOuterJoin("+node.getOperator().getName()+")"; + String nodeName = "RightOuterJoin ("+node.getOperator().getName()+")"; return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2); } diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildSecondDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildSecondDescriptor.java index 7bb8f1e972e..7e476cd726b 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildSecondDescriptor.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildSecondDescriptor.java @@ -54,7 +54,7 @@ public class HashRightOuterJoinBuildSecondDescriptor extends AbstractJoinDescrip @Override public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { - String nodeName = "RightOuterJoin("+node.getOperator().getName()+")"; + String nodeName = "RightOuterJoin ("+node.getOperator().getName()+")"; return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2); } diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java index e4e6a7f1d10..bb061718943 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java @@ -51,7 +51,7 @@ public final class PartialGroupProperties extends OperatorDescriptorSingle { GroupReduceNode combinerNode = new GroupReduceNode((GroupReduceOperatorBase) node.getOperator()); combinerNode.setParallelism(in.getSource().getParallelism()); - SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator().getName()+")", in, + SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine ("+node.getOperator().getName()+")", in, DriverStrategy.SORTED_GROUP_COMBINE); // sorting key info combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0); diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java index e3dcfad11c1..248049ed260 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java @@ -1283,7 +1283,7 @@ public class JobGraphGenerator implements Visitor { headConfig.setRelativeBackChannelMemory(relativeMemForBackChannel); // --------------------------- create the sync task --------------------------- - final JobVertex sync = new JobVertex("Sync(" + bulkNode.getNodeName() + ")"); + final JobVertex sync = new JobVertex("Sync (" + bulkNode.getNodeName() + ")"); sync.setResources(bulkNode.getMinResources(), bulkNode.getPreferredResources()); sync.setInvokableClass(IterationSynchronizationSinkTask.class); sync.setParallelism(1); -- GitLab