提交 677d392a 编写于 作者: 龙三 提交者: godfreyhe

[FLINK-20736][table-planner-blink] Introduce BatchPhysicalLimit, and make...

[FLINK-20736][table-planner-blink] Introduce BatchPhysicalLimit, and make BatchExecLimit only extended from ExecNode

This closes #14472
上级 f3d03eb4
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.planner.plan.nodes.exec.batch;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.runtime.operators.sort.LimitOperator;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.Collections;
/**
* Batch {@link ExecNode} for Limit.
*/
public class BatchExecLimit extends ExecNodeBase<RowData> implements BatchExecNode<RowData> {
private final long limitStart;
private final long limitEnd;
private final boolean isGlobal;
public BatchExecLimit(
long limitStart,
long limitEnd,
boolean isGlobal,
ExecEdge inputEdge,
LogicalType outputType,
String description) {
super(Collections.singletonList(inputEdge), outputType, description);
this.isGlobal = isGlobal;
this.limitStart = limitStart;
this.limitEnd = limitEnd;
}
@SuppressWarnings("unchecked")
@Override
protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
Transformation<RowData> input =
(Transformation<RowData>) getInputNodes().get(0).translateToPlan(planner);
LimitOperator operator = new LimitOperator(isGlobal, limitStart, limitEnd);
Transformation<RowData> transformation = ExecNodeUtil.createOneInputTransformation(
input,
getDesc(),
SimpleOperatorFactory.of(operator),
input.getOutputType(),
input.getParallelism(),
0);
if (inputsContainSingleton()) {
transformation.setParallelism(1);
transformation.setMaxParallelism(1);
}
return transformation;
}
}
......@@ -18,17 +18,13 @@
package org.apache.flink.table.planner.plan.nodes.physical.batch
import org.apache.flink.api.dag.Transformation
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory
import org.apache.flink.table.data.RowData
import org.apache.flink.table.planner.delegation.BatchPlanner
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.cost.FlinkCost._
import org.apache.flink.table.planner.plan.cost.FlinkCostFactory
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil
import org.apache.flink.table.planner.plan.nodes.exec.{LegacyBatchExecNode, ExecEdge}
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecLimit
import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
import org.apache.flink.table.planner.plan.utils.RelExplainUtil.fetchToString
import org.apache.flink.table.planner.plan.utils.SortUtil
import org.apache.flink.table.runtime.operators.sort.LimitOperator
import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
import org.apache.calcite.rel._
......@@ -36,16 +32,12 @@ import org.apache.calcite.rel.core.Sort
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rex.RexNode
import java.util
import scala.collection.JavaConversions._
/**
* Batch physical RelNode for [[Sort]].
*
* This node will output `limit` records beginning with the first `offset` records without sort.
*/
class BatchExecLimit(
class BatchPhysicalLimit(
cluster: RelOptCluster,
traitSet: RelTraitSet,
inputRel: RelNode,
......@@ -59,8 +51,7 @@ class BatchExecLimit(
traitSet.getTrait(RelCollationTraitDef.INSTANCE),
offset,
fetch)
with BatchPhysicalRel
with LegacyBatchExecNode[RowData] {
with BatchPhysicalRel {
private lazy val limitStart: Long = SortUtil.getLimitStart(offset)
private lazy val limitEnd: Long = SortUtil.getLimitEnd(offset, fetch)
......@@ -71,7 +62,7 @@ class BatchExecLimit(
newCollation: RelCollation,
offset: RexNode,
fetch: RexNode): Sort = {
new BatchExecLimit(cluster, traitSet, newInput, offset, fetch, isGlobal)
new BatchPhysicalLimit(cluster, traitSet, newInput, offset, fetch, isGlobal)
}
override def explainTerms(pw: RelWriter): RelWriter = {
......@@ -88,22 +79,13 @@ class BatchExecLimit(
costFactory.makeCost(rowCount, cpuCost, 0, 0, 0)
}
//~ ExecNode methods -----------------------------------------------------------
override def getInputEdges: util.List[ExecEdge] = List(ExecEdge.DEFAULT)
override protected def translateToPlanInternal(
planner: BatchPlanner): Transformation[RowData] = {
val input = getInputNodes.get(0).translateToPlan(planner)
.asInstanceOf[Transformation[RowData]]
val inputType = input.getOutputType
val operator = new LimitOperator(isGlobal, limitStart, limitEnd)
ExecNodeUtil.createOneInputTransformation(
input,
getRelDetailedDescription,
SimpleOperatorFactory.of(operator),
inputType,
input.getParallelism,
0)
override def translateToExecNode(): ExecNode[_] = {
new BatchExecLimit(
limitStart,
limitEnd,
isGlobal,
ExecEdge.DEFAULT,
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription)
}
}
......@@ -405,7 +405,7 @@ object FlinkBatchRuleSets {
BatchPhysicalUnionRule.INSTANCE,
// sort
BatchExecSortRule.INSTANCE,
BatchExecLimitRule.INSTANCE,
BatchPhysicalLimitRule.INSTANCE,
BatchExecSortLimitRule.INSTANCE,
// rank
BatchExecRankRule.INSTANCE,
......
......@@ -21,7 +21,7 @@ package org.apache.flink.table.planner.plan.rules.physical.batch
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSort
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecLimit
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalLimit
import org.apache.flink.table.planner.plan.utils.SortUtil
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
......@@ -34,25 +34,25 @@ import org.apache.calcite.sql.`type`.SqlTypeName
* Rule that matches [[FlinkLogicalSort]] with empty sort fields,
* and converts it to
* {{{
* BatchExecLimit (global)
* BatchPhysicalLimit (global)
* +- BatchPhysicalExchange (singleton)
* +- BatchExecLimit (local)
* +- BatchPhysicalLimit (local)
* +- input of sort
* }}}
* when fetch is not null, or
* {{{
* BatchExecLimit
* BatchPhysicalLimit
* +- BatchPhysicalExchange (singleton)
* +- input of sort
* }}}
* when fetch is null.
*/
class BatchExecLimitRule
class BatchPhysicalLimitRule
extends ConverterRule(
classOf[FlinkLogicalSort],
FlinkConventions.LOGICAL,
FlinkConventions.BATCH_PHYSICAL,
"BatchExecLimitRule") {
"BatchPhysicalLimitRule") {
override def matches(call: RelOptRuleCall): Boolean = {
val sort: FlinkLogicalSort = call.rel(0)
......@@ -69,14 +69,14 @@ class BatchExecLimitRule
val traitSet = input.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
val newLocalInput = RelOptRule.convert(input, traitSet)
// if fetch is null, there is no need to create local BatchExecLimit
// if fetch is null, there is no need to create local BatchPhysicalLimit
val inputOfExchange = if (sort.fetch != null) {
val providedLocalTraitSet = traitSet
val limit = SortUtil.getLimitEnd(sort.offset, sort.fetch)
val rexBuilder = sort.getCluster.getRexBuilder
val intType = rexBuilder.getTypeFactory.createSqlType(SqlTypeName.INTEGER)
// for local BatchExecLimit, offset is always 0, and fetch is `limit`
new BatchExecLimit(
// for local BatchPhysicalLimit, offset is always 0, and fetch is `limit`
new BatchPhysicalLimit(
rel.getCluster,
providedLocalTraitSet,
newLocalInput,
......@@ -91,9 +91,9 @@ class BatchExecLimitRule
val newTraitSet = traitSet.replace(FlinkRelDistribution.SINGLETON)
val newInput = RelOptRule.convert(inputOfExchange, newTraitSet)
// create global BatchExecLimit
// create global BatchPhysicalLimit
val providedGlobalTraitSet = newTraitSet
new BatchExecLimit(
new BatchPhysicalLimit(
rel.getCluster,
providedGlobalTraitSet,
newInput,
......@@ -103,6 +103,6 @@ class BatchExecLimitRule
}
}
object BatchExecLimitRule {
val INSTANCE: RelOptRule = new BatchExecLimitRule
object BatchPhysicalLimitRule {
val INSTANCE: RelOptRule = new BatchPhysicalLimitRule
}
......@@ -346,18 +346,18 @@ class FlinkRelMdHandlerTestBase {
cluster, flinkLogicalTraits.replace(collation), studentFlinkLogicalScan, collation,
logicalSort.offset, logicalSort.fetch)
val batchSort = new BatchExecLimit(cluster, batchPhysicalTraits.replace(collation),
val batchSort = new BatchPhysicalLimit(cluster, batchPhysicalTraits.replace(collation),
new BatchPhysicalExchange(
cluster, batchPhysicalTraits.replace(FlinkRelDistribution.SINGLETON), studentBatchScan,
FlinkRelDistribution.SINGLETON),
logicalSort.offset, logicalSort.fetch, true)
val batchSortLocal = new BatchExecLimit(cluster, batchPhysicalTraits.replace(collation),
val batchSortLocal = new BatchPhysicalLimit(cluster, batchPhysicalTraits.replace(collation),
studentBatchScan,
relBuilder.literal(0),
relBuilder.literal(SortUtil.getLimitEnd(logicalSort.offset, logicalSort.fetch)),
false)
val batchSortGlobal = new BatchExecLimit(cluster, batchPhysicalTraits.replace(collation),
val batchSortGlobal = new BatchPhysicalLimit(cluster, batchPhysicalTraits.replace(collation),
new BatchPhysicalExchange(
cluster, batchPhysicalTraits.replace(FlinkRelDistribution.SINGLETON), batchSortLocal,
FlinkRelDistribution.SINGLETON),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册