From dc0b143bb85bd674ea8fe186d1d2bb72d3bd70a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BE=99=E4=B8=89?= Date: Mon, 28 Dec 2020 15:05:04 +0800 Subject: [PATCH] [FLINK-20782][table-planner-blink] Introduce BatchPhysicalRank, and make BatchExecRank only extended from ExecNode This closes #14506 --- .../plan/nodes/exec/batch/BatchExecRank.java | 121 ++++++++++++++++++ ...ExecRank.scala => BatchPhysicalRank.scala} | 74 ++--------- .../plan/rules/FlinkBatchRuleSets.scala | 2 +- ...Rule.scala => BatchPhysicalRankRule.scala} | 22 ++-- .../batch/RemoveRedundantLocalRankRule.scala | 16 +-- .../FlinkRelMdColumnIntervalTest.scala | 5 +- .../FlinkRelMdDistinctRowCountTest.scala | 4 +- .../metadata/FlinkRelMdHandlerTestBase.scala | 8 +- .../FlinkRelMdPopulationSizeTest.scala | 4 +- .../metadata/FlinkRelMdSelectivityTest.scala | 4 +- 10 files changed, 168 insertions(+), 92 deletions(-) create mode 100644 flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecRank.java rename flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/{BatchExecRank.scala => BatchPhysicalRank.scala} (78%) rename flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/{BatchExecRankRule.scala => BatchPhysicalRankRule.scala} (90%) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecRank.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecRank.java new file mode 100644 index 00000000000..4da5998770c --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecRank.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.table.planner.plan.nodes.exec.batch; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator; +import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; +import org.apache.flink.table.runtime.operators.sort.RankOperator; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Collections; +import java.util.stream.IntStream; + +/** + * {@link BatchExecNode} for Rank. + * + *

This node supports two-stage(local and global) rank to reduce data-shuffling. + */ +public class BatchExecRank extends ExecNodeBase implements BatchExecNode { + + private final int[] partitionFields; + private final int[] sortFields; + private final long rankStart; + private final long rankEnd; + private final boolean outputRankNumber; + + public BatchExecRank( + int[] partitionFields, + int[] sortFields, + long rankStart, + long rankEnd, + boolean outputRankNumber, + ExecEdge inputEdge, + RowType outputType, + String description) { + super(Collections.singletonList(inputEdge), outputType, description); + this.partitionFields = partitionFields; + this.sortFields = sortFields; + this.rankStart = rankStart; + this.rankEnd = rankEnd; + this.outputRankNumber = outputRankNumber; + } + + @SuppressWarnings("unchecked") + @Override + protected Transformation translateToPlanInternal(PlannerBase planner) { + ExecNode inputNode = (ExecNode) getInputNodes().get(0); + Transformation inputTransform = inputNode.translateToPlan(planner); + + RowType inputType = (RowType) inputNode.getOutputType(); + LogicalType[] partitionTypes = + IntStream.of(partitionFields) + .mapToObj(inputType::getTypeAt) + .toArray(LogicalType[]::new); + LogicalType[] sortTypes = + IntStream.of(sortFields).mapToObj(inputType::getTypeAt).toArray(LogicalType[]::new); + + // operator needn't cache data + // The collation for the partition-by and order-by fields is inessential here, + // we only use the comparator to distinguish fields change. + RankOperator operator = + new RankOperator( + ComparatorCodeGenerator.gen( + planner.getTableConfig(), + "PartitionByComparator", + partitionFields, + partitionTypes, + new boolean[partitionFields.length], + new boolean[partitionFields.length]), + ComparatorCodeGenerator.gen( + planner.getTableConfig(), + "OrderByComparator", + sortFields, + sortTypes, + new boolean[sortFields.length], + new boolean[sortFields.length]), + rankStart, + rankEnd, + outputRankNumber); + + OneInputTransformation transform = + new OneInputTransformation<>( + inputTransform, + getDesc(), + SimpleOperatorFactory.of(operator), + InternalTypeInfo.of((RowType) getOutputType()), + inputTransform.getParallelism()); + + if (inputsContainSingleton()) { + transform.setParallelism(1); + transform.setMaxParallelism(1); + } + + return transform; + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecRank.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalRank.scala similarity index 78% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecRank.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalRank.scala index afc811ef858..881be79fca6 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecRank.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalRank.scala @@ -18,23 +18,16 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch -import org.apache.flink.api.dag.Transformation -import org.apache.flink.streaming.api.operators.SimpleOperatorFactory import org.apache.flink.table.api.TableException -import org.apache.flink.table.data.RowData import org.apache.flink.table.planner.calcite.FlinkTypeFactory -import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator -import org.apache.flink.table.planner.delegation.BatchPlanner import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef} import org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory} import org.apache.flink.table.planner.plan.nodes.calcite.Rank -import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil -import org.apache.flink.table.planner.plan.nodes.exec.{LegacyBatchExecNode, ExecEdge} +import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode} +import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecRank import org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecJoinRuleBase import org.apache.flink.table.planner.plan.utils.{FlinkRelOptUtil, RelExplainUtil} import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, RankRange, RankType} -import org.apache.flink.table.runtime.operators.sort.RankOperator -import org.apache.flink.table.runtime.typeutils.InternalTypeInfo import org.apache.calcite.plan._ import org.apache.calcite.rel.RelDistribution.Type @@ -53,7 +46,7 @@ import scala.collection.JavaConversions._ * * This node supports two-stage(local and global) rank to reduce data-shuffling. */ -class BatchExecRank( +class BatchPhysicalRank( cluster: RelOptCluster, traitSet: RelTraitSet, inputRel: RelNode, @@ -74,8 +67,7 @@ class BatchExecRank( rankRange, rankNumberType, outputRankNumber) - with BatchPhysicalRel - with LegacyBatchExecNode[RowData] { + with BatchPhysicalRel { require(rankType == RankType.RANK, "Only RANK is supported now") val (rankStart, rankEnd) = rankRange match { @@ -84,7 +76,7 @@ class BatchExecRank( } override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { - new BatchExecRank( + new BatchPhysicalRank( cluster, traitSet, inputs.get(0), @@ -235,54 +227,16 @@ class BatchExecRank( } } - //~ ExecNode methods ----------------------------------------------------------- - - override def getInputEdges: util.List[ExecEdge] = List(ExecEdge.DEFAULT) - - override protected def translateToPlanInternal( - planner: BatchPlanner): Transformation[RowData] = { - val input = getInputNodes.get(0).translateToPlan(planner) - .asInstanceOf[Transformation[RowData]] - val outputType = FlinkTypeFactory.toLogicalRowType(getRowType) - val partitionBySortingKeys = partitionKey.toArray - // The collation for the partition-by fields is inessential here, we only use the - // comparator to distinguish different groups. - // (order[is_asc], null_is_last) - val partitionBySortCollation = partitionBySortingKeys.map(_ => (true, true)) - - // The collation for the order-by fields is inessential here, we only use the - // comparator to distinguish order-by fields change. - // (order[is_asc], null_is_last) - val orderByCollation = orderKey.getFieldCollations.map(_ => (true, true)).toArray - val orderByKeys = orderKey.getFieldCollations.map(_.getFieldIndex).toArray - - val inputType = FlinkTypeFactory.toLogicalRowType(getInput.getRowType) - //operator needn't cache data - val operator = new RankOperator( - ComparatorCodeGenerator.gen( - planner.getTableConfig, - "PartitionByComparator", - partitionBySortingKeys, - partitionBySortingKeys.map(inputType.getTypeAt), - partitionBySortCollation.map(_._1), - partitionBySortCollation.map(_._2)), - ComparatorCodeGenerator.gen( - planner.getTableConfig, - "OrderByComparator", - orderByKeys, - orderByKeys.map(inputType.getTypeAt), - orderByCollation.map(_._1), - orderByCollation.map(_._2)), + override def translateToExecNode(): ExecNode[_] = { + new BatchExecRank( + partitionKey.toArray, + orderKey.getFieldCollations.map(_.getFieldIndex).toArray, rankStart, rankEnd, - outputRankNumber) - - ExecNodeUtil.createOneInputTransformation( - input, - getRelDetailedDescription, - SimpleOperatorFactory.of(operator), - InternalTypeInfo.of(outputType), - input.getParallelism, - 0) + outputRankNumber, + ExecEdge.DEFAULT, + FlinkTypeFactory.toLogicalRowType(getRowType), + getRelDetailedDescription + ) } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala index fc754285cea..a1801e92159 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala @@ -409,7 +409,7 @@ object FlinkBatchRuleSets { BatchPhysicalLimitRule.INSTANCE, BatchExecSortLimitRule.INSTANCE, // rank - BatchExecRankRule.INSTANCE, + BatchPhysicalRankRule.INSTANCE, RemoveRedundantLocalRankRule.INSTANCE, // expand BatchPhysicalExpandRule.INSTANCE, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecRankRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalRankRule.scala similarity index 90% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecRankRule.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalRankRule.scala index 9571bb740f5..2031bc13707 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecRankRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalRankRule.scala @@ -22,7 +22,7 @@ import org.apache.flink.table.api.TableException import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank -import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecRank +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRank import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, RankType} @@ -36,18 +36,18 @@ import scala.collection.JavaConversions._ * Rule that matches [[FlinkLogicalRank]] with rank function and constant rank range, * and converts it to * {{{ - * BatchExecRank (global) + * BatchPhysicalRank (global) * +- BatchPhysicalExchange (singleton if partition keys is empty, else hash) - * +- BatchExecRank (local) + * +- BatchPhysicalRank (local) * +- input of rank * }}} */ -class BatchExecRankRule +class BatchPhysicalRankRule extends ConverterRule( classOf[FlinkLogicalRank], FlinkConventions.LOGICAL, FlinkConventions.BATCH_PHYSICAL, - "BatchExecRankRule") { + "BatchPhysicalRankRule") { override def matches(call: RelOptRuleCall): Boolean = { val rank: FlinkLogicalRank = call.rel(0) @@ -70,9 +70,9 @@ class BatchExecRankRule val localRequiredTraitSet = emptyTraits.replace(sortCollation) val newLocalInput = RelOptRule.convert(rank.getInput, localRequiredTraitSet) - // create local BatchExecRank + // create local BatchPhysicalRank val localRankRange = new ConstantRankRange(1, rankEnd) // local rank always start from 1 - val localRank = new BatchExecRank( + val localRank = new BatchPhysicalRank( cluster, emptyTraits, newLocalInput, @@ -85,7 +85,7 @@ class BatchExecRankRule isGlobal = false ) - // create local BatchExecRank + // create local BatchPhysicalRank val globalRequiredDistribution = if (rank.partitionKey.isEmpty) { FlinkRelDistribution.SINGLETON } else { @@ -98,7 +98,7 @@ class BatchExecRankRule // require SINGLETON or HASH exchange val newGlobalInput = RelOptRule.convert(localRank, globalRequiredTraitSet) - val globalRank = new BatchExecRank( + val globalRank = new BatchPhysicalRank( cluster, emptyTraits, newGlobalInput, @@ -114,6 +114,6 @@ class BatchExecRankRule } } -object BatchExecRankRule { - val INSTANCE: RelOptRule = new BatchExecRankRule +object BatchPhysicalRankRule { + val INSTANCE: RelOptRule = new BatchPhysicalRankRule } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalRankRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalRankRule.scala index 7d8d488f1a7..50490c64a6a 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalRankRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalRankRule.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.planner.plan.rules.physical.batch import org.apache.flink.table.planner.plan.nodes.FlinkConventions -import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecRank +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRank import org.apache.calcite.plan.RelOptRule._ import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} @@ -28,18 +28,18 @@ import org.apache.calcite.rel.RelNode import scala.collection.JavaConversions._ /** - * Planner rule that matches a global [[BatchExecRank]] on a local [[BatchExecRank]], - * and merge them into a global [[BatchExecRank]]. + * Planner rule that matches a global [[BatchPhysicalRank]] on a local [[BatchPhysicalRank]], + * and merge them into a global [[BatchPhysicalRank]]. */ class RemoveRedundantLocalRankRule extends RelOptRule( - operand(classOf[BatchExecRank], - operand(classOf[BatchExecRank], + operand(classOf[BatchPhysicalRank], + operand(classOf[BatchPhysicalRank], operand(classOf[RelNode], FlinkConventions.BATCH_PHYSICAL, any))), "RemoveRedundantLocalRankRule") { override def matches(call: RelOptRuleCall): Boolean = { - val globalRank: BatchExecRank = call.rel(0) - val localRank: BatchExecRank = call.rel(1) + val globalRank: BatchPhysicalRank = call.rel(0) + val localRank: BatchPhysicalRank = call.rel(1) globalRank.isGlobal && !localRank.isGlobal && globalRank.rankType == localRank.rankType && globalRank.partitionKey == localRank.partitionKey && @@ -48,7 +48,7 @@ class RemoveRedundantLocalRankRule extends RelOptRule( } override def onMatch(call: RelOptRuleCall): Unit = { - val globalRank: BatchExecRank = call.rel(0) + val globalRank: BatchPhysicalRank = call.rel(0) val inputOfLocalRank: RelNode = call.rel(2) val newGlobalRank = globalRank.copy(globalRank.getTraitSet, List(inputOfLocalRank)) call.transformTo(newGlobalRank) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnIntervalTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnIntervalTest.scala index da04c77a5ed..b2359939ae6 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnIntervalTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnIntervalTest.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.planner.plan.metadata import org.apache.flink.table.api.TableException -import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecRank +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRank import org.apache.flink.table.planner.plan.stats._ import org.apache.flink.table.planner.plan.utils.ColumnIntervalUtil import org.apache.flink.table.planner.{JBoolean, JDouble} @@ -331,7 +331,8 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase { assertNull(mq.getColumnInterval(rank, 5)) assertNull(mq.getColumnInterval(rank, 6)) rank match { - case r: BatchExecRank if !r.isGlobal => // local batch rank does not output rank function + case r: BatchPhysicalRank if !r.isGlobal => + // local batch rank does not output rank function case _ => assertEquals(ValueInterval(bd(1), bd(5)), mq.getColumnInterval(rank, 7)) } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCountTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCountTest.scala index a698b129ad3..d81e8cdcff0 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCountTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCountTest.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.planner.plan.metadata -import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecRank +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRank import org.apache.flink.table.planner.plan.utils.FlinkRelMdUtil import org.apache.calcite.rel.metadata.RelMdUtil @@ -307,7 +307,7 @@ class FlinkRelMdDistinctRowCountTest extends FlinkRelMdHandlerTestBase { assertEquals(2.0, mq.getDistinctRowCount(rank, ImmutableBitSet.of(5), null)) assertEquals(null, mq.getDistinctRowCount(rank, ImmutableBitSet.of(6), null)) rank match { - case r: BatchExecRank if !r.isGlobal => // local rank does not output rank func + case r: BatchPhysicalRank if !r.isGlobal => // local rank does not output rank func case _ => assertEquals(5.0, mq.getDistinctRowCount(rank, ImmutableBitSet.of(7), null)) } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala index cbc71d461ca..c571a1628e6 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala @@ -448,7 +448,7 @@ class FlinkRelMdHandlerTestBase { outputRankNumber = true ) - val batchLocalRank = new BatchExecRank( + val batchLocalRank = new BatchPhysicalRank( cluster, batchPhysicalTraits, studentBatchScan, @@ -464,7 +464,7 @@ class FlinkRelMdHandlerTestBase { val hash6 = FlinkRelDistribution.hash(Array(6), requireStrict = true) val batchExchange = new BatchPhysicalExchange( cluster, batchLocalRank.getTraitSet.replace(hash6), batchLocalRank, hash6) - val batchGlobalRank = new BatchExecRank( + val batchGlobalRank = new BatchPhysicalRank( cluster, batchPhysicalTraits, batchExchange, @@ -530,7 +530,7 @@ class FlinkRelMdHandlerTestBase { outputRankNumber = true ) - val batchLocalRank = new BatchExecRank( + val batchLocalRank = new BatchPhysicalRank( cluster, batchPhysicalTraits, studentBatchScan, @@ -546,7 +546,7 @@ class FlinkRelMdHandlerTestBase { val hash6 = FlinkRelDistribution.hash(Array(6), requireStrict = true) val batchExchange = new BatchPhysicalExchange( cluster, batchLocalRank.getTraitSet.replace(hash6), batchLocalRank, hash6) - val batchGlobalRank = new BatchExecRank( + val batchGlobalRank = new BatchPhysicalRank( cluster, batchPhysicalTraits, batchExchange, diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdPopulationSizeTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdPopulationSizeTest.scala index db79714cc03..e57c3ef0779 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdPopulationSizeTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdPopulationSizeTest.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.planner.plan.metadata -import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecRank +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRank import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.util.ImmutableBitSet @@ -179,7 +179,7 @@ class FlinkRelMdPopulationSizeTest extends FlinkRelMdHandlerTestBase { assertNull(mq.getPopulationSize(rank, ImmutableBitSet.of(6))) assertEquals(50.0, mq.getPopulationSize(rank, ImmutableBitSet.of(0, 2))) rank match { - case r: BatchExecRank => + case r: BatchPhysicalRank => // local batch rank does not output rank func // TODO re-check this if (r.isGlobal) { diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSelectivityTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSelectivityTest.scala index 906a7f82bb3..dab451cd797 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSelectivityTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSelectivityTest.scala @@ -21,7 +21,7 @@ package org.apache.flink.table.planner.plan.metadata import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable import org.apache.flink.table.planner.plan.nodes.calcite.LogicalExpand import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalDataStreamTableScan, FlinkLogicalExpand, FlinkLogicalOverAggregate} -import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecRank, BatchPhysicalCalc} +import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalRank, BatchPhysicalCalc} import org.apache.flink.table.planner.plan.utils.ExpandUtil import com.google.common.collect.{ImmutableList, Lists} @@ -196,7 +196,7 @@ class FlinkRelMdSelectivityTest extends FlinkRelMdHandlerTestBase { assertEquals(1.0 / 7.0, mq.getSelectivity(rank, condition1)) rank match { - case r: BatchExecRank if !r.isGlobal => // batch local rank does not output rank fun + case r: BatchPhysicalRank if !r.isGlobal => // batch local rank does not output rank fun case _ => // rk > 2 val condition2 = -- GitLab