diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java index 9b5399923cac836f0311991f0acaa5d08b4e9978..62cb4ca74f9e65dc5082a127e536d3a4004a5313 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java @@ -46,7 +46,7 @@ public class SolutionSetNode extends AbstractPartialSolutionNode { // -------------------------------------------------------------------------------------------- public void setCandidateProperties(GlobalProperties gProps, LocalProperties lProps, Channel initialInput) { - this.cachedPlans = Collections.singletonList(new SolutionSetPlanNode(this, "SolutionSet("+this.getOperator().getName()+")", gProps, lProps, initialInput)); + this.cachedPlans = Collections.singletonList(new SolutionSetPlanNode(this, "SolutionSet ("+this.getOperator().getName()+")", gProps, lProps, initialInput)); } public SolutionSetPlanNode getCurrentSolutionSetPlanNode() { diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractSortMergeJoinDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractSortMergeJoinDescriptor.java index d54b5cf7cda3289a8fbb898f7defaaa7f656cab0..4aced8bb4784d165a14752f90ede7c5d3d44d034 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractSortMergeJoinDescriptor.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractSortMergeJoinDescriptor.java @@ -67,7 +67,7 @@ public abstract class AbstractSortMergeJoinDescriptor extends AbstractJoinDescri inputOrders = tmp; } - String nodeName = String.format("%s(%s)", getNodeName(), node.getOperator().getName()); + String nodeName = String.format("%s (%s)", getNodeName(), node.getOperator().getName()); return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2, inputOrders); } diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java index f48e297407c4a88d535be4221ee09198f61daef8..d50c9b493337d675180e47a5b559e4b3750a0455 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java @@ -93,7 +93,7 @@ public abstract class CartesianProductDescriptor extends OperatorDescriptorDual @Override public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { - return new DualInputPlanNode(node, "Cross("+node.getOperator().getName()+")", in1, in2, getStrategy()); + return new DualInputPlanNode(node, "Cross ("+node.getOperator().getName()+")", in1, in2, getStrategy()); } @Override diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java index accd11bc0bc27426ba4b84fff7bd0c74d9fbb53b..5521be12f6b74a470b782a9d51d8cadd25451682 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java @@ -116,7 +116,7 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode(); combinerNode.setParallelism(in.getSource().getParallelism()); - SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator() + SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine ("+node.getOperator() .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE); combiner.setCosts(new Costs(0, 0)); combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties()); diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildFirstDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildFirstDescriptor.java index 49852484b617a9ffe420ba1bd28c3a4b146a9d34..5ad0c7e66701ee9a7aee6591cbb25deae707853a 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildFirstDescriptor.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildFirstDescriptor.java @@ -54,7 +54,7 @@ public class HashFullOuterJoinBuildFirstDescriptor extends AbstractJoinDescripto @Override public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { - String nodeName = "FullOuterJoin("+node.getOperator().getName()+")"; + String nodeName = "FullOuterJoin ("+node.getOperator().getName()+")"; return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2); } diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildSecondDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildSecondDescriptor.java index d605a19179af48aa04c7d8906963b80b7548e6bd..64681293489ec3043eb3e7b80d19f0d36c3ce8a7 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildSecondDescriptor.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildSecondDescriptor.java @@ -53,7 +53,7 @@ public class HashFullOuterJoinBuildSecondDescriptor extends AbstractJoinDescript @Override public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { - String nodeName = "FullOuterJoin("+node.getOperator().getName()+")"; + String nodeName = "FullOuterJoin ("+node.getOperator().getName()+")"; return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2); } diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java index fec72a95c6c35c85e9da0c05cfe5668bebac82ca..fc74b39b876174abeed577d9463a1b14ee560804 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java @@ -79,7 +79,7 @@ public class HashJoinBuildFirstProperties extends AbstractJoinDescriptor { else { strategy = DriverStrategy.HYBRIDHASH_BUILD_FIRST; } - return new DualInputPlanNode(node, "Join("+node.getOperator().getName()+")", in1, in2, strategy, this.keys1, this.keys2); + return new DualInputPlanNode(node, "Join ("+node.getOperator().getName()+")", in1, in2, strategy, this.keys1, this.keys2); } @Override diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildFirstDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildFirstDescriptor.java index ab4e1065911833abd0008ed692536dd7e624daf6..f62a85d08946374e324e9f934a9dff7fae764c33 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildFirstDescriptor.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildFirstDescriptor.java @@ -54,7 +54,7 @@ public class HashLeftOuterJoinBuildFirstDescriptor extends AbstractJoinDescripto @Override public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { - String nodeName = "LeftOuterJoin("+node.getOperator().getName()+")"; + String nodeName = "LeftOuterJoin ("+node.getOperator().getName()+")"; return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2); } diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildSecondDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildSecondDescriptor.java index 8ed7969a871965840c6664d4e342513d01192c9c..85cf8f2ca36cab5d204a3e2844c8521bdb4b0261 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildSecondDescriptor.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildSecondDescriptor.java @@ -57,7 +57,7 @@ public class HashLeftOuterJoinBuildSecondDescriptor extends AbstractJoinDescript @Override public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { - String nodeName = "LeftOuterJoin("+node.getOperator().getName()+")"; + String nodeName = "LeftOuterJoin ("+node.getOperator().getName()+")"; return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2); } diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildFirstDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildFirstDescriptor.java index 5ddba1ce99c763187614f21d26717f913ca341e5..ca19902b4a41be0b864c0b484fdf9536f2d21b47 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildFirstDescriptor.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildFirstDescriptor.java @@ -57,7 +57,7 @@ public class HashRightOuterJoinBuildFirstDescriptor extends AbstractJoinDescript @Override public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { - String nodeName = "RightOuterJoin("+node.getOperator().getName()+")"; + String nodeName = "RightOuterJoin ("+node.getOperator().getName()+")"; return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2); } diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildSecondDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildSecondDescriptor.java index 7bb8f1e972e8c447c25a98d7e4959b5a04a72430..7e476cd726b9fe4797815c1c684be62efe737544 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildSecondDescriptor.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildSecondDescriptor.java @@ -54,7 +54,7 @@ public class HashRightOuterJoinBuildSecondDescriptor extends AbstractJoinDescrip @Override public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { - String nodeName = "RightOuterJoin("+node.getOperator().getName()+")"; + String nodeName = "RightOuterJoin ("+node.getOperator().getName()+")"; return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2); } diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java index e4e6a7f1d10bc19898191df50b906217e3ab10cb..bb061718943a3fe8644f84e677a74c33040e6d89 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java @@ -51,7 +51,7 @@ public final class PartialGroupProperties extends OperatorDescriptorSingle { GroupReduceNode combinerNode = new GroupReduceNode((GroupReduceOperatorBase) node.getOperator()); combinerNode.setParallelism(in.getSource().getParallelism()); - SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator().getName()+")", in, + SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine ("+node.getOperator().getName()+")", in, DriverStrategy.SORTED_GROUP_COMBINE); // sorting key info combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0); diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java index e3dcfad11c1fec10417855d7f9b849ef8b85f13d..248049ed26021d88f7c6ec521ff489c1ea094988 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java @@ -1283,7 +1283,7 @@ public class JobGraphGenerator implements Visitor { headConfig.setRelativeBackChannelMemory(relativeMemForBackChannel); // --------------------------- create the sync task --------------------------- - final JobVertex sync = new JobVertex("Sync(" + bulkNode.getNodeName() + ")"); + final JobVertex sync = new JobVertex("Sync (" + bulkNode.getNodeName() + ")"); sync.setResources(bulkNode.getMinResources(), bulkNode.getPreferredResources()); sync.setInvokableClass(IterationSynchronizationSinkTask.class); sync.setParallelism(1);