提交 7668e4e3 编写于 作者: 龙三 提交者: godfreyhe

[FLINK-20766][table-planner-blink] Introduce BatchPhysicalSortLimit, and make...

[FLINK-20766][table-planner-blink] Introduce BatchPhysicalSortLimit, and make BatchExecSortLimit only extended from ExecNode

This closes #14502
上级 ae796bb6
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.planner.plan.nodes.exec.batch;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.utils.SortSpec;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.operators.sort.SortLimitOperator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import java.util.Collections;
/**
 * {@link BatchExecNode} for Sort with limit.
 *
 * <p>The node sorts its single input according to the configured {@link SortSpec} and only
 * outputs the rows whose rank lies between {@code limitStart} and {@code limitEnd}.
 */
public class BatchExecSortLimit extends ExecNodeBase<RowData> implements BatchExecNode<RowData> {

    /** Sort keys, directions and null ordering used to build the record comparator. */
    private final SortSpec sortSpec;

    /** Rank at which the emitted range starts. */
    private final long limitStart;

    /** Rank at which the emitted range ends; {@link Long#MAX_VALUE} is rejected at translation. */
    private final long limitEnd;

    /** Flag handed unchanged to the {@link SortLimitOperator}. */
    private final boolean isGlobal;

    /**
     * Creates the exec node.
     *
     * @param sortSpec sort specification used to generate the comparator
     * @param limitStart start of the rank range to output
     * @param limitEnd end of the rank range to output
     * @param isGlobal flag forwarded to the runtime operator
     * @param inputEdge the only input edge of this node
     * @param outputType row type reported as this node's output type
     * @param description description used as the name of the created transformation
     */
    public BatchExecSortLimit(
            SortSpec sortSpec,
            long limitStart,
            long limitEnd,
            boolean isGlobal,
            ExecEdge inputEdge,
            RowType outputType,
            String description) {
        super(Collections.singletonList(inputEdge), outputType, description);
        this.isGlobal = isGlobal;
        this.limitEnd = limitEnd;
        this.limitStart = limitStart;
        this.sortSpec = sortSpec;
    }

    /**
     * Translates this node into a one-input transformation wrapping a {@link SortLimitOperator}.
     *
     * @param planner planner providing the table config for code generation
     * @return the transformation; it uses the input's parallelism unless an input is a singleton,
     *     in which case parallelism and max parallelism are both set to 1
     * @throws TableException if {@code limitEnd} equals {@link Long#MAX_VALUE}
     */
    @SuppressWarnings("unchecked")
    @Override
    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
        // Fail before touching the input: an unbounded limit end is not supported.
        if (limitEnd == Long.MAX_VALUE) {
            throw new TableException("Not support limitEnd is max value now!");
        }

        final ExecNode<RowData> upstreamNode = (ExecNode<RowData>) getInputNodes().get(0);
        final Transformation<RowData> upstream = upstreamNode.translateToPlan(planner);
        final RowType upstreamRowType = (RowType) upstreamNode.getOutputType();

        final GeneratedRecordComparator comparator = generateComparator(planner, upstreamRowType);

        // TODO If input is ordered, there is no need to use the heap.
        final SortLimitOperator sortLimitOperator =
                new SortLimitOperator(isGlobal, limitStart, limitEnd, comparator);

        // The transformation's output type info is built from the input row type.
        final OneInputTransformation<RowData, RowData> result =
                new OneInputTransformation<>(
                        upstream,
                        getDesc(),
                        SimpleOperatorFactory.of(sortLimitOperator),
                        InternalTypeInfo.of(upstreamRowType),
                        upstream.getParallelism());

        if (inputsContainSingleton()) {
            result.setParallelism(1);
            result.setMaxParallelism(1);
        }
        return result;
    }

    /** Generates the record comparator for the sort keys, resolved against the input row type. */
    private GeneratedRecordComparator generateComparator(PlannerBase planner, RowType inputType) {
        return ComparatorCodeGenerator.gen(
                planner.getTableConfig(),
                "SortLimitComparator",
                sortSpec.getFieldIndices(),
                sortSpec.getFieldTypes(inputType),
                sortSpec.getAscendingOrders(),
                sortSpec.getNullsIsLast());
    }
}
......@@ -18,18 +18,11 @@
package org.apache.flink.table.planner.plan.nodes.physical.batch
import org.apache.flink.api.dag.Transformation
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory
import org.apache.flink.table.api.TableException
import org.apache.flink.table.data.RowData
import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator
import org.apache.flink.table.planner.delegation.BatchPlanner
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil
import org.apache.flink.table.planner.plan.nodes.exec.{LegacyBatchExecNode, ExecEdge}
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSortLimit
import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
import org.apache.flink.table.planner.plan.utils.{RelExplainUtil, SortUtil}
import org.apache.flink.table.runtime.operators.sort.SortLimitOperator
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
import org.apache.calcite.rel.core.Sort
......@@ -37,8 +30,6 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter}
import org.apache.calcite.rex.{RexLiteral, RexNode}
import java.util
import scala.collection.JavaConversions._
/**
......@@ -50,7 +41,7 @@ import scala.collection.JavaConversions._
* partition will forward elements to a single partition, lastly it takes the `limit` elements
* beginning with the first `offset` elements from the single output partition.
**/
class BatchExecSortLimit(
class BatchPhysicalSortLimit(
cluster: RelOptCluster,
traitSet: RelTraitSet,
inputRel: RelNode,
......@@ -59,22 +50,18 @@ class BatchExecSortLimit(
fetch: RexNode,
isGlobal: Boolean)
extends Sort(cluster, traitSet, inputRel, sortCollation, offset, fetch)
with BatchPhysicalRel
with LegacyBatchExecNode[RowData] {
with BatchPhysicalRel {
private val limitStart: Long = SortUtil.getLimitStart(offset)
private val limitEnd: Long = SortUtil.getLimitEnd(offset, fetch)
private val (keys, orders, nullsIsLast) = SortUtil.getKeysAndOrders(
sortCollation.getFieldCollations)
override def copy(
traitSet: RelTraitSet,
newInput: RelNode,
newCollation: RelCollation,
offset: RexNode,
fetch: RexNode): Sort = {
new BatchExecSortLimit(cluster, traitSet, newInput, newCollation, offset, fetch, isGlobal)
new BatchPhysicalSortLimit(cluster, traitSet, newInput, newCollation, offset, fetch, isGlobal)
}
override def explainTerms(pw: RelWriter): RelWriter = {
......@@ -111,37 +98,15 @@ class BatchExecSortLimit(
costFactory.makeCost(rowCount, cpuCost, 0, 0, memCost)
}
//~ ExecNode methods -----------------------------------------------------------
override def getInputEdges: util.List[ExecEdge] = List(
ExecEdge.builder()
.damBehavior(ExecEdge.DamBehavior.END_INPUT)
.build())
override protected def translateToPlanInternal(
planner: BatchPlanner): Transformation[RowData] = {
if (limitEnd == Long.MaxValue) {
throw new TableException("Not support limitEnd is max value now!")
}
val input = getInputNodes.get(0).translateToPlan(planner)
.asInstanceOf[Transformation[RowData]]
val inputType = input.getOutputType.asInstanceOf[InternalTypeInfo[RowData]]
val types = inputType.toRowFieldTypes
// generate comparator
val genComparator = ComparatorCodeGenerator.gen(
planner.getTableConfig, "SortLimitComparator", keys, keys.map(types(_)), orders, nullsIsLast)
// TODO If input is ordered, there is no need to use the heap.
val operator = new SortLimitOperator(isGlobal, limitStart, limitEnd, genComparator)
ExecNodeUtil.createOneInputTransformation(
input,
getRelDetailedDescription,
SimpleOperatorFactory.of(operator),
inputType,
input.getParallelism,
0)
/**
 * Builds the [[BatchExecSortLimit]] exec node for this physical node.
 *
 * The exec node receives the sort spec derived from the sort collation, the limit range,
 * the global flag, a single input edge with END_INPUT dam behavior, the logical row type
 * of this rel and its detailed description.
 */
override def translateToExecNode(): ExecNode[_] = {
  val sortSpec = SortUtil.getSortSpec(sortCollation.getFieldCollations)
  val inputEdge = ExecEdge.builder()
    .damBehavior(ExecEdge.DamBehavior.END_INPUT)
    .build()
  val outputRowType = FlinkTypeFactory.toLogicalRowType(getRowType)
  val description = getRelDetailedDescription

  new BatchExecSortLimit(
    sortSpec, limitStart, limitEnd, isGlobal, inputEdge, outputRowType, description)
}
}
......@@ -407,7 +407,7 @@ object FlinkBatchRuleSets {
// sort
BatchPhysicalSortRule.INSTANCE,
BatchPhysicalLimitRule.INSTANCE,
BatchExecSortLimitRule.INSTANCE,
BatchPhysicalSortLimitRule.INSTANCE,
// rank
BatchPhysicalRankRule.INSTANCE,
RemoveRedundantLocalRankRule.INSTANCE,
......
......@@ -21,7 +21,7 @@ package org.apache.flink.table.planner.plan.rules.physical.batch
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSort
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSortLimit
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSortLimit
import org.apache.flink.table.planner.plan.utils.SortUtil
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
......@@ -33,25 +33,25 @@ import org.apache.calcite.sql.`type`.SqlTypeName
* Rule that matches [[FlinkLogicalSort]] with non-empty sort fields and non-null fetch or offset,
* and converts it to
* {{{
* BatchExecSortLimit (global)
* BatchPhysicalSortLimit (global)
* +- BatchPhysicalExchange (singleton)
* +- BatchExecSortLimit (local)
* +- BatchPhysicalSortLimit (local)
* +- input of sort
* }}}
* when fetch is not null, or
* {{{
* BatchExecSortLimit (global)
* BatchPhysicalSortLimit (global)
* +- BatchPhysicalExchange (singleton)
* +- input of sort
* }}}
* when fetch is null
*/
class BatchExecSortLimitRule
class BatchPhysicalSortLimitRule
extends ConverterRule(
classOf[FlinkLogicalSort],
FlinkConventions.LOGICAL,
FlinkConventions.BATCH_PHYSICAL,
"BatchExecSortLimitRule") {
"BatchPhysicalSortLimitRule") {
override def matches(call: RelOptRuleCall): Boolean = {
val sort: FlinkLogicalSort = call.rel(0)
......@@ -72,7 +72,7 @@ class BatchExecSortLimitRule
val intType = rexBuilder.getTypeFactory.createSqlType(SqlTypeName.INTEGER)
val providedLocalTraitSet = localRequiredTrait.replace(sort.getCollation)
// for local BatchPhysicalSortLimit, offset is always 0, and fetch is `limit`
new BatchExecSortLimit(
new BatchPhysicalSortLimit(
rel.getCluster,
providedLocalTraitSet,
localInput,
......@@ -92,7 +92,7 @@ class BatchExecSortLimitRule
// create global BatchPhysicalSortLimit
val providedGlobalTraitSet = requiredTrait.replace(sort.getCollation)
new BatchExecSortLimit(
new BatchPhysicalSortLimit(
rel.getCluster,
providedGlobalTraitSet,
newInput,
......@@ -104,6 +104,6 @@ class BatchExecSortLimitRule
}
}
object BatchExecSortLimitRule {
val INSTANCE: RelOptRule = new BatchExecSortLimitRule
object BatchPhysicalSortLimitRule {
val INSTANCE: RelOptRule = new BatchPhysicalSortLimitRule
}
......@@ -389,18 +389,20 @@ class FlinkRelMdHandlerTestBase {
val flinkLogicalSortLimit = new FlinkLogicalSort(cluster,
flinkLogicalTraits.replace(collection), studentFlinkLogicalScan, collection, offset, fetch)
val batchSortLimit = new BatchExecSortLimit(cluster, batchPhysicalTraits.replace(collection),
val batchSortLimit = new BatchPhysicalSortLimit(
cluster, batchPhysicalTraits.replace(collection),
new BatchPhysicalExchange(
cluster, batchPhysicalTraits.replace(FlinkRelDistribution.SINGLETON), studentBatchScan,
FlinkRelDistribution.SINGLETON),
collection, offset, fetch, true)
val batchSortLocalLimit = new BatchExecSortLimit(cluster,
val batchSortLocalLimit = new BatchPhysicalSortLimit(cluster,
batchPhysicalTraits.replace(collection), studentBatchScan, collection,
relBuilder.literal(0),
relBuilder.literal(SortUtil.getLimitEnd(offset, fetch)),
false)
val batchSortGlobal = new BatchExecSortLimit(cluster, batchPhysicalTraits.replace(collection),
val batchSortGlobal = new BatchPhysicalSortLimit(
cluster, batchPhysicalTraits.replace(collection),
new BatchPhysicalExchange(
cluster, batchPhysicalTraits.replace(FlinkRelDistribution.SINGLETON), batchSortLocalLimit,
FlinkRelDistribution.SINGLETON),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册