From 7668e4e37d6314781825530293bf262a14376c56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BE=99=E4=B8=89?= Date: Mon, 28 Dec 2020 11:42:23 +0800 Subject: [PATCH] [FLINK-20766][table-planner-blink] Introduce BatchPhysicalSortLimit, and make BatchExecSortLimit only extended from ExecNode This closes #14502 --- .../nodes/exec/batch/BatchExecSortLimit.java | 105 ++++++++++++++++++ ...mit.scala => BatchPhysicalSortLimit.scala} | 67 +++-------- .../plan/rules/FlinkBatchRuleSets.scala | 2 +- ...scala => BatchPhysicalSortLimitRule.scala} | 20 ++-- .../metadata/FlinkRelMdHandlerTestBase.scala | 8 +- 5 files changed, 137 insertions(+), 65 deletions(-) create mode 100644 flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortLimit.java rename flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/{BatchExecSortLimit.scala => BatchPhysicalSortLimit.scala} (62%) rename flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/{BatchExecSortLimitRule.scala => BatchPhysicalSortLimitRule.scala} (91%) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortLimit.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortLimit.java new file mode 100644 index 00000000000..e15bd39fec8 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortLimit.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.batch; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator; +import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; +import org.apache.flink.table.planner.plan.nodes.exec.utils.SortSpec; +import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; +import org.apache.flink.table.runtime.operators.sort.SortLimitOperator; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Collections; + +/** + * {@link BatchExecNode} for Sort with limit. + * + *

This node will output data rank from `limitStart` to `limitEnd`. + */ +public class BatchExecSortLimit extends ExecNodeBase implements BatchExecNode { + + private final SortSpec sortSpec; + private final long limitStart; + private final long limitEnd; + private final boolean isGlobal; + + public BatchExecSortLimit( + SortSpec sortSpec, + long limitStart, + long limitEnd, + boolean isGlobal, + ExecEdge inputEdge, + RowType outputType, + String description) { + super(Collections.singletonList(inputEdge), outputType, description); + this.sortSpec = sortSpec; + this.limitStart = limitStart; + this.limitEnd = limitEnd; + this.isGlobal = isGlobal; + } + + @SuppressWarnings("unchecked") + @Override + protected Transformation translateToPlanInternal(PlannerBase planner) { + if (limitEnd == Long.MAX_VALUE) { + throw new TableException("Not support limitEnd is max value now!"); + } + + ExecNode inputNode = (ExecNode) getInputNodes().get(0); + Transformation inputTransform = inputNode.translateToPlan(planner); + + RowType inputType = (RowType) inputNode.getOutputType(); + // generate comparator + GeneratedRecordComparator genComparator = + ComparatorCodeGenerator.gen( + planner.getTableConfig(), + "SortLimitComparator", + sortSpec.getFieldIndices(), + sortSpec.getFieldTypes(inputType), + sortSpec.getAscendingOrders(), + sortSpec.getNullsIsLast()); + + // TODO If input is ordered, there is no need to use the heap. + SortLimitOperator operator = + new SortLimitOperator(isGlobal, limitStart, limitEnd, genComparator); + + OneInputTransformation transform = + new OneInputTransformation<>( + inputTransform, + getDesc(), + SimpleOperatorFactory.of(operator), + InternalTypeInfo.of(inputType), + inputTransform.getParallelism()); + + if (inputsContainSingleton()) { + transform.setParallelism(1); + transform.setMaxParallelism(1); + } + return transform; + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortLimit.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSortLimit.scala similarity index 62% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortLimit.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSortLimit.scala index cd5293568de..78b7d60b40b 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortLimit.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSortLimit.scala @@ -18,18 +18,11 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch -import org.apache.flink.api.dag.Transformation -import org.apache.flink.streaming.api.operators.SimpleOperatorFactory -import org.apache.flink.table.api.TableException -import org.apache.flink.table.data.RowData -import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator -import org.apache.flink.table.planner.delegation.BatchPlanner +import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory} -import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil -import org.apache.flink.table.planner.plan.nodes.exec.{LegacyBatchExecNode, ExecEdge} +import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSortLimit +import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode} import org.apache.flink.table.planner.plan.utils.{RelExplainUtil, SortUtil} -import org.apache.flink.table.runtime.operators.sort.SortLimitOperator -import org.apache.flink.table.runtime.typeutils.InternalTypeInfo import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} import org.apache.calcite.rel.core.Sort @@ -37,8 +30,6 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter} import org.apache.calcite.rex.{RexLiteral, RexNode} -import java.util - import scala.collection.JavaConversions._ /** @@ -50,7 +41,7 @@ import scala.collection.JavaConversions._ * partition will forward elements to a single partition, lastly it take the `limit` elements * beginning with the first `offset` elements from the single output partition. **/ -class BatchExecSortLimit( +class BatchPhysicalSortLimit( cluster: RelOptCluster, traitSet: RelTraitSet, inputRel: RelNode, @@ -59,22 +50,18 @@ class BatchExecSortLimit( fetch: RexNode, isGlobal: Boolean) extends Sort(cluster, traitSet, inputRel, sortCollation, offset, fetch) - with BatchPhysicalRel - with LegacyBatchExecNode[RowData] { + with BatchPhysicalRel { private val limitStart: Long = SortUtil.getLimitStart(offset) private val limitEnd: Long = SortUtil.getLimitEnd(offset, fetch) - private val (keys, orders, nullsIsLast) = SortUtil.getKeysAndOrders( - sortCollation.getFieldCollations) - override def copy( traitSet: RelTraitSet, newInput: RelNode, newCollation: RelCollation, offset: RexNode, fetch: RexNode): Sort = { - new BatchExecSortLimit(cluster, traitSet, newInput, newCollation, offset, fetch, isGlobal) + new BatchPhysicalSortLimit(cluster, traitSet, newInput, newCollation, offset, fetch, isGlobal) } override def explainTerms(pw: RelWriter): RelWriter = { @@ -111,37 +98,15 @@ class BatchExecSortLimit( costFactory.makeCost(rowCount, cpuCost, 0, 0, memCost) } - //~ ExecNode methods ----------------------------------------------------------- - - override def getInputEdges: util.List[ExecEdge] = List( - ExecEdge.builder() - .damBehavior(ExecEdge.DamBehavior.END_INPUT) - .build()) - - override protected def translateToPlanInternal( - planner: BatchPlanner): Transformation[RowData] = { - if (limitEnd == Long.MaxValue) { - throw new TableException("Not support limitEnd is max value now!") - } - - val input = getInputNodes.get(0).translateToPlan(planner) - .asInstanceOf[Transformation[RowData]] - val inputType = input.getOutputType.asInstanceOf[InternalTypeInfo[RowData]] - val types = inputType.toRowFieldTypes - - // generate comparator - val genComparator = ComparatorCodeGenerator.gen( - planner.getTableConfig, "SortLimitComparator", keys, keys.map(types(_)), orders, nullsIsLast) - - // TODO If input is ordered, there is no need to use the heap. - val operator = new SortLimitOperator(isGlobal, limitStart, limitEnd, genComparator) - - ExecNodeUtil.createOneInputTransformation( - input, - getRelDetailedDescription, - SimpleOperatorFactory.of(operator), - inputType, - input.getParallelism, - 0) + override def translateToExecNode(): ExecNode[_] = { + new BatchExecSortLimit( + SortUtil.getSortSpec(sortCollation.getFieldCollations), + limitStart, + limitEnd, + isGlobal, + ExecEdge.builder().damBehavior(ExecEdge.DamBehavior.END_INPUT).build(), + FlinkTypeFactory.toLogicalRowType(getRowType), + getRelDetailedDescription + ) } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala index a3ca2cf00e4..b29f79186d0 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala @@ -407,7 +407,7 @@ object FlinkBatchRuleSets { // sort BatchPhysicalSortRule.INSTANCE, BatchPhysicalLimitRule.INSTANCE, - BatchExecSortLimitRule.INSTANCE, + BatchPhysicalSortLimitRule.INSTANCE, // rank BatchPhysicalRankRule.INSTANCE, RemoveRedundantLocalRankRule.INSTANCE, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecSortLimitRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortLimitRule.scala similarity index 91% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecSortLimitRule.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortLimitRule.scala index 82ff6edfa53..ef7c0ba71c0 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecSortLimitRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortLimitRule.scala @@ -21,7 +21,7 @@ package org.apache.flink.table.planner.plan.rules.physical.batch import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSort -import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSortLimit +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSortLimit import org.apache.flink.table.planner.plan.utils.SortUtil import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} @@ -33,25 +33,25 @@ import org.apache.calcite.sql.`type`.SqlTypeName * Rule that matches [[FlinkLogicalSort]] with non-empty sort fields and non-null fetch or offset, * and converts it to * {{{ - * BatchExecSortLimit (global) + * BatchPhysicalSortLimit (global) * +- BatchPhysicalExchange (singleton) - * +- BatchExecSortLimit (local) + * +- BatchPhysicalSortLimit (local) * +- input of sort * }}} * when fetch is not null, or * {{{ - * BatchExecSortLimit (global) + * BatchPhysicalSortLimit (global) * +- BatchPhysicalExchange (singleton) * +- input of sort * }}} * when fetch is null */ -class BatchExecSortLimitRule +class BatchPhysicalSortLimitRule extends ConverterRule( classOf[FlinkLogicalSort], FlinkConventions.LOGICAL, FlinkConventions.BATCH_PHYSICAL, - "BatchExecSortLimitRule") { + "BatchPhysicalSortLimitRule") { override def matches(call: RelOptRuleCall): Boolean = { val sort: FlinkLogicalSort = call.rel(0) @@ -72,7 +72,7 @@ class BatchExecSortLimitRule val intType = rexBuilder.getTypeFactory.createSqlType(SqlTypeName.INTEGER) val providedLocalTraitSet = localRequiredTrait.replace(sort.getCollation) // for local BatchExecSortLimit, offset is always 0, and fetch is `limit` - new BatchExecSortLimit( + new BatchPhysicalSortLimit( rel.getCluster, providedLocalTraitSet, localInput, @@ -92,7 +92,7 @@ class BatchExecSortLimitRule // create global BatchExecSortLimit val providedGlobalTraitSet = requiredTrait.replace(sort.getCollation) - new BatchExecSortLimit( + new BatchPhysicalSortLimit( rel.getCluster, providedGlobalTraitSet, newInput, @@ -104,6 +104,6 @@ class BatchExecSortLimitRule } } -object BatchExecSortLimitRule { - val INSTANCE: RelOptRule = new BatchExecSortLimitRule +object BatchPhysicalSortLimitRule { + val INSTANCE: RelOptRule = new BatchPhysicalSortLimitRule } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala index 6eb567cd8b6..bb9e873d84d 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala @@ -389,18 +389,20 @@ class FlinkRelMdHandlerTestBase { val flinkLogicalSortLimit = new FlinkLogicalSort(cluster, flinkLogicalTraits.replace(collection), studentFlinkLogicalScan, collection, offset, fetch) - val batchSortLimit = new BatchExecSortLimit(cluster, batchPhysicalTraits.replace(collection), + val batchSortLimit = new BatchPhysicalSortLimit( + cluster, batchPhysicalTraits.replace(collection), new BatchPhysicalExchange( cluster, batchPhysicalTraits.replace(FlinkRelDistribution.SINGLETON), studentBatchScan, FlinkRelDistribution.SINGLETON), collection, offset, fetch, true) - val batchSortLocalLimit = new BatchExecSortLimit(cluster, + val batchSortLocalLimit = new BatchPhysicalSortLimit(cluster, batchPhysicalTraits.replace(collection), studentBatchScan, collection, relBuilder.literal(0), relBuilder.literal(SortUtil.getLimitEnd(offset, fetch)), false) - val batchSortGlobal = new BatchExecSortLimit(cluster, batchPhysicalTraits.replace(collection), + val batchSortGlobal = new BatchPhysicalSortLimit( + cluster, batchPhysicalTraits.replace(collection), new BatchPhysicalExchange( cluster, batchPhysicalTraits.replace(FlinkRelDistribution.SINGLETON), batchSortLocalLimit, FlinkRelDistribution.SINGLETON), -- GitLab