提交 dc0b143b 编写于 作者: 龙三 提交者: godfreyhe

[FLINK-20782][table-planner-blink] Introduce BatchPhysicalRank, and make...

[FLINK-20782][table-planner-blink] Introduce BatchPhysicalRank, and make BatchExecRank only extended from ExecNode

This closes #14506
上级 7aafc4c5
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.flink.table.planner.plan.nodes.exec.batch;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.runtime.operators.sort.RankOperator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import java.util.Collections;
import java.util.stream.IntStream;
/**
 * {@link BatchExecNode} for Rank.
 *
 * <p>This node supports two-stage (local and global) rank to reduce data-shuffling.
 *
 * <p>Translation wraps a {@link RankOperator} in a {@link OneInputTransformation} on top of the
 * transformation produced by the single input node.
 */
public class BatchExecRank extends ExecNodeBase<RowData> implements BatchExecNode<RowData> {

    /** Positions, in the input row, of the partition-by fields. */
    private final int[] partitionFields;

    /** Positions, in the input row, of the order-by fields. */
    private final int[] sortFields;

    /** Start of the rank range, passed unchanged to {@link RankOperator}. */
    private final long rankStart;

    /** End of the rank range, passed unchanged to {@link RankOperator}. */
    private final long rankEnd;

    /** Flag passed unchanged to {@link RankOperator}. */
    private final boolean outputRankNumber;

    /**
     * Creates the node on top of a single input edge.
     *
     * @param partitionFields indices of the partition-by fields in the input row
     * @param sortFields indices of the order-by fields in the input row
     * @param rankStart start of the rank range
     * @param rankEnd end of the rank range
     * @param outputRankNumber forwarded to the rank operator
     * @param inputEdge the only input edge of this node
     * @param outputType row type produced by this node
     * @param description description of this node
     */
    public BatchExecRank(
            int[] partitionFields,
            int[] sortFields,
            long rankStart,
            long rankEnd,
            boolean outputRankNumber,
            ExecEdge inputEdge,
            RowType outputType,
            String description) {
        super(Collections.singletonList(inputEdge), outputType, description);
        this.partitionFields = partitionFields;
        this.sortFields = sortFields;
        this.rankStart = rankStart;
        this.rankEnd = rankEnd;
        this.outputRankNumber = outputRankNumber;
    }

    /**
     * Translates the input node first, then builds the rank operator and its transformation.
     *
     * @param planner planner used to translate the input and to obtain the table config
     * @return the transformation running the rank operator
     */
    @SuppressWarnings("unchecked")
    @Override
    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
        final ExecNode<RowData> upstream = (ExecNode<RowData>) getInputNodes().get(0);
        final Transformation<RowData> upstreamTransform = upstream.translateToPlan(planner);
        final RowType upstreamRowType = (RowType) upstream.getOutputType();

        final LogicalType[] partitionTypes = fieldTypes(upstreamRowType, partitionFields);
        final LogicalType[] sortTypes = fieldTypes(upstreamRowType, sortFields);

        // The collation of the partition-by and order-by fields does not matter here: both
        // comparators are only used to detect that the field values changed. Hence every
        // collation flag is simply left at its default (false).
        final boolean[] partitionOrders = new boolean[partitionFields.length];
        final boolean[] partitionNullsIsLast = new boolean[partitionFields.length];
        final boolean[] sortOrders = new boolean[sortFields.length];
        final boolean[] sortNullsIsLast = new boolean[sortFields.length];

        // The operator needn't cache data.
        final RankOperator rankOperator =
                new RankOperator(
                        ComparatorCodeGenerator.gen(
                                planner.getTableConfig(),
                                "PartitionByComparator",
                                partitionFields,
                                partitionTypes,
                                partitionOrders,
                                partitionNullsIsLast),
                        ComparatorCodeGenerator.gen(
                                planner.getTableConfig(),
                                "OrderByComparator",
                                sortFields,
                                sortTypes,
                                sortOrders,
                                sortNullsIsLast),
                        rankStart,
                        rankEnd,
                        outputRankNumber);

        // By default the rank runs with the parallelism of its input.
        final OneInputTransformation<RowData, RowData> rankTransform =
                new OneInputTransformation<>(
                        upstreamTransform,
                        getDesc(),
                        SimpleOperatorFactory.of(rankOperator),
                        InternalTypeInfo.of((RowType) getOutputType()),
                        upstreamTransform.getParallelism());

        // Pin both parallelism and max parallelism to 1 when the inputs contain a singleton.
        if (inputsContainSingleton()) {
            rankTransform.setParallelism(1);
            rankTransform.setMaxParallelism(1);
        }
        return rankTransform;
    }

    /** Looks up the logical type of each given field index in the given row type. */
    private static LogicalType[] fieldTypes(RowType rowType, int[] fieldIndices) {
        final LogicalType[] types = new LogicalType[fieldIndices.length];
        for (int i = 0; i < fieldIndices.length; i++) {
            types[i] = rowType.getTypeAt(fieldIndices[i]);
        }
        return types;
    }
}
......@@ -18,23 +18,16 @@
package org.apache.flink.table.planner.plan.nodes.physical.batch
import org.apache.flink.api.dag.Transformation
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory
import org.apache.flink.table.api.TableException
import org.apache.flink.table.data.RowData
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator
import org.apache.flink.table.planner.delegation.BatchPlanner
import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef}
import org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.planner.plan.nodes.calcite.Rank
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil
import org.apache.flink.table.planner.plan.nodes.exec.{LegacyBatchExecNode, ExecEdge}
import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecRank
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecJoinRuleBase
import org.apache.flink.table.planner.plan.utils.{FlinkRelOptUtil, RelExplainUtil}
import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, RankRange, RankType}
import org.apache.flink.table.runtime.operators.sort.RankOperator
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelDistribution.Type
......@@ -53,7 +46,7 @@ import scala.collection.JavaConversions._
*
* This node supports two-stage(local and global) rank to reduce data-shuffling.
*/
class BatchExecRank(
class BatchPhysicalRank(
cluster: RelOptCluster,
traitSet: RelTraitSet,
inputRel: RelNode,
......@@ -74,8 +67,7 @@ class BatchExecRank(
rankRange,
rankNumberType,
outputRankNumber)
with BatchPhysicalRel
with LegacyBatchExecNode[RowData] {
with BatchPhysicalRel {
require(rankType == RankType.RANK, "Only RANK is supported now")
val (rankStart, rankEnd) = rankRange match {
......@@ -84,7 +76,7 @@ class BatchExecRank(
}
override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
new BatchExecRank(
new BatchPhysicalRank(
cluster,
traitSet,
inputs.get(0),
......@@ -235,54 +227,16 @@ class BatchExecRank(
}
}
//~ ExecNode methods -----------------------------------------------------------
override def getInputEdges: util.List[ExecEdge] = List(ExecEdge.DEFAULT)
override protected def translateToPlanInternal(
planner: BatchPlanner): Transformation[RowData] = {
val input = getInputNodes.get(0).translateToPlan(planner)
.asInstanceOf[Transformation[RowData]]
val outputType = FlinkTypeFactory.toLogicalRowType(getRowType)
val partitionBySortingKeys = partitionKey.toArray
// The collation for the partition-by fields is inessential here, we only use the
// comparator to distinguish different groups.
// (order[is_asc], null_is_last)
val partitionBySortCollation = partitionBySortingKeys.map(_ => (true, true))
// The collation for the order-by fields is inessential here, we only use the
// comparator to distinguish order-by fields change.
// (order[is_asc], null_is_last)
val orderByCollation = orderKey.getFieldCollations.map(_ => (true, true)).toArray
val orderByKeys = orderKey.getFieldCollations.map(_.getFieldIndex).toArray
val inputType = FlinkTypeFactory.toLogicalRowType(getInput.getRowType)
//operator needn't cache data
val operator = new RankOperator(
ComparatorCodeGenerator.gen(
planner.getTableConfig,
"PartitionByComparator",
partitionBySortingKeys,
partitionBySortingKeys.map(inputType.getTypeAt),
partitionBySortCollation.map(_._1),
partitionBySortCollation.map(_._2)),
ComparatorCodeGenerator.gen(
planner.getTableConfig,
"OrderByComparator",
orderByKeys,
orderByKeys.map(inputType.getTypeAt),
orderByCollation.map(_._1),
orderByCollation.map(_._2)),
override def translateToExecNode(): ExecNode[_] = {
new BatchExecRank(
partitionKey.toArray,
orderKey.getFieldCollations.map(_.getFieldIndex).toArray,
rankStart,
rankEnd,
outputRankNumber)
ExecNodeUtil.createOneInputTransformation(
input,
getRelDetailedDescription,
SimpleOperatorFactory.of(operator),
InternalTypeInfo.of(outputType),
input.getParallelism,
0)
outputRankNumber,
ExecEdge.DEFAULT,
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription
)
}
}
......@@ -409,7 +409,7 @@ object FlinkBatchRuleSets {
BatchPhysicalLimitRule.INSTANCE,
BatchExecSortLimitRule.INSTANCE,
// rank
BatchExecRankRule.INSTANCE,
BatchPhysicalRankRule.INSTANCE,
RemoveRedundantLocalRankRule.INSTANCE,
// expand
BatchPhysicalExpandRule.INSTANCE,
......
......@@ -22,7 +22,7 @@ import org.apache.flink.table.api.TableException
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecRank
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRank
import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, RankType}
......@@ -36,18 +36,18 @@ import scala.collection.JavaConversions._
* Rule that matches [[FlinkLogicalRank]] with rank function and constant rank range,
* and converts it to
* {{{
* BatchExecRank (global)
* BatchPhysicalRank (global)
* +- BatchPhysicalExchange (singleton if partition keys are empty, else hash)
* +- BatchExecRank (local)
* +- BatchPhysicalRank (local)
* +- input of rank
* }}}
*/
class BatchExecRankRule
class BatchPhysicalRankRule
extends ConverterRule(
classOf[FlinkLogicalRank],
FlinkConventions.LOGICAL,
FlinkConventions.BATCH_PHYSICAL,
"BatchExecRankRule") {
"BatchPhysicalRankRule") {
override def matches(call: RelOptRuleCall): Boolean = {
val rank: FlinkLogicalRank = call.rel(0)
......@@ -70,9 +70,9 @@ class BatchExecRankRule
val localRequiredTraitSet = emptyTraits.replace(sortCollation)
val newLocalInput = RelOptRule.convert(rank.getInput, localRequiredTraitSet)
// create local BatchExecRank
// create local BatchPhysicalRank
val localRankRange = new ConstantRankRange(1, rankEnd) // local rank always start from 1
val localRank = new BatchExecRank(
val localRank = new BatchPhysicalRank(
cluster,
emptyTraits,
newLocalInput,
......@@ -85,7 +85,7 @@ class BatchExecRankRule
isGlobal = false
)
// create local BatchExecRank
// create global BatchPhysicalRank
val globalRequiredDistribution = if (rank.partitionKey.isEmpty) {
FlinkRelDistribution.SINGLETON
} else {
......@@ -98,7 +98,7 @@ class BatchExecRankRule
// require SINGLETON or HASH exchange
val newGlobalInput = RelOptRule.convert(localRank, globalRequiredTraitSet)
val globalRank = new BatchExecRank(
val globalRank = new BatchPhysicalRank(
cluster,
emptyTraits,
newGlobalInput,
......@@ -114,6 +114,6 @@ class BatchExecRankRule
}
}
object BatchExecRankRule {
val INSTANCE: RelOptRule = new BatchExecRankRule
object BatchPhysicalRankRule {
val INSTANCE: RelOptRule = new BatchPhysicalRankRule
}
......@@ -19,7 +19,7 @@
package org.apache.flink.table.planner.plan.rules.physical.batch
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecRank
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRank
import org.apache.calcite.plan.RelOptRule._
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
......@@ -28,18 +28,18 @@ import org.apache.calcite.rel.RelNode
import scala.collection.JavaConversions._
/**
* Planner rule that matches a global [[BatchExecRank]] on a local [[BatchExecRank]],
* and merge them into a global [[BatchExecRank]].
* Planner rule that matches a global [[BatchPhysicalRank]] on top of a local [[BatchPhysicalRank]],
* and merges them into a single global [[BatchPhysicalRank]].
*/
class RemoveRedundantLocalRankRule extends RelOptRule(
operand(classOf[BatchExecRank],
operand(classOf[BatchExecRank],
operand(classOf[BatchPhysicalRank],
operand(classOf[BatchPhysicalRank],
operand(classOf[RelNode], FlinkConventions.BATCH_PHYSICAL, any))),
"RemoveRedundantLocalRankRule") {
override def matches(call: RelOptRuleCall): Boolean = {
val globalRank: BatchExecRank = call.rel(0)
val localRank: BatchExecRank = call.rel(1)
val globalRank: BatchPhysicalRank = call.rel(0)
val localRank: BatchPhysicalRank = call.rel(1)
globalRank.isGlobal && !localRank.isGlobal &&
globalRank.rankType == localRank.rankType &&
globalRank.partitionKey == localRank.partitionKey &&
......@@ -48,7 +48,7 @@ class RemoveRedundantLocalRankRule extends RelOptRule(
}
override def onMatch(call: RelOptRuleCall): Unit = {
val globalRank: BatchExecRank = call.rel(0)
val globalRank: BatchPhysicalRank = call.rel(0)
val inputOfLocalRank: RelNode = call.rel(2)
val newGlobalRank = globalRank.copy(globalRank.getTraitSet, List(inputOfLocalRank))
call.transformTo(newGlobalRank)
......
......@@ -19,7 +19,7 @@
package org.apache.flink.table.planner.plan.metadata
import org.apache.flink.table.api.TableException
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecRank
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRank
import org.apache.flink.table.planner.plan.stats._
import org.apache.flink.table.planner.plan.utils.ColumnIntervalUtil
import org.apache.flink.table.planner.{JBoolean, JDouble}
......@@ -331,7 +331,8 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
assertNull(mq.getColumnInterval(rank, 5))
assertNull(mq.getColumnInterval(rank, 6))
rank match {
case r: BatchExecRank if !r.isGlobal => // local batch rank does not output rank function
case r: BatchPhysicalRank if !r.isGlobal =>
// local batch rank does not output rank function
case _ => assertEquals(ValueInterval(bd(1), bd(5)), mq.getColumnInterval(rank, 7))
}
}
......
......@@ -18,7 +18,7 @@
package org.apache.flink.table.planner.plan.metadata
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecRank
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRank
import org.apache.flink.table.planner.plan.utils.FlinkRelMdUtil
import org.apache.calcite.rel.metadata.RelMdUtil
......@@ -307,7 +307,7 @@ class FlinkRelMdDistinctRowCountTest extends FlinkRelMdHandlerTestBase {
assertEquals(2.0, mq.getDistinctRowCount(rank, ImmutableBitSet.of(5), null))
assertEquals(null, mq.getDistinctRowCount(rank, ImmutableBitSet.of(6), null))
rank match {
case r: BatchExecRank if !r.isGlobal => // local rank does not output rank func
case r: BatchPhysicalRank if !r.isGlobal => // local rank does not output rank func
case _ =>
assertEquals(5.0, mq.getDistinctRowCount(rank, ImmutableBitSet.of(7), null))
}
......
......@@ -448,7 +448,7 @@ class FlinkRelMdHandlerTestBase {
outputRankNumber = true
)
val batchLocalRank = new BatchExecRank(
val batchLocalRank = new BatchPhysicalRank(
cluster,
batchPhysicalTraits,
studentBatchScan,
......@@ -464,7 +464,7 @@ class FlinkRelMdHandlerTestBase {
val hash6 = FlinkRelDistribution.hash(Array(6), requireStrict = true)
val batchExchange = new BatchPhysicalExchange(
cluster, batchLocalRank.getTraitSet.replace(hash6), batchLocalRank, hash6)
val batchGlobalRank = new BatchExecRank(
val batchGlobalRank = new BatchPhysicalRank(
cluster,
batchPhysicalTraits,
batchExchange,
......@@ -530,7 +530,7 @@ class FlinkRelMdHandlerTestBase {
outputRankNumber = true
)
val batchLocalRank = new BatchExecRank(
val batchLocalRank = new BatchPhysicalRank(
cluster,
batchPhysicalTraits,
studentBatchScan,
......@@ -546,7 +546,7 @@ class FlinkRelMdHandlerTestBase {
val hash6 = FlinkRelDistribution.hash(Array(6), requireStrict = true)
val batchExchange = new BatchPhysicalExchange(
cluster, batchLocalRank.getTraitSet.replace(hash6), batchLocalRank, hash6)
val batchGlobalRank = new BatchExecRank(
val batchGlobalRank = new BatchPhysicalRank(
cluster,
batchPhysicalTraits,
batchExchange,
......
......@@ -18,7 +18,7 @@
package org.apache.flink.table.planner.plan.metadata
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecRank
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRank
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.util.ImmutableBitSet
......@@ -179,7 +179,7 @@ class FlinkRelMdPopulationSizeTest extends FlinkRelMdHandlerTestBase {
assertNull(mq.getPopulationSize(rank, ImmutableBitSet.of(6)))
assertEquals(50.0, mq.getPopulationSize(rank, ImmutableBitSet.of(0, 2)))
rank match {
case r: BatchExecRank =>
case r: BatchPhysicalRank =>
// local batch rank does not output rank func
// TODO re-check this
if (r.isGlobal) {
......
......@@ -21,7 +21,7 @@ package org.apache.flink.table.planner.plan.metadata
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
import org.apache.flink.table.planner.plan.nodes.calcite.LogicalExpand
import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalDataStreamTableScan, FlinkLogicalExpand, FlinkLogicalOverAggregate}
import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecRank, BatchPhysicalCalc}
import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalRank, BatchPhysicalCalc}
import org.apache.flink.table.planner.plan.utils.ExpandUtil
import com.google.common.collect.{ImmutableList, Lists}
......@@ -196,7 +196,7 @@ class FlinkRelMdSelectivityTest extends FlinkRelMdHandlerTestBase {
assertEquals(1.0 / 7.0, mq.getSelectivity(rank, condition1))
rank match {
case r: BatchExecRank if !r.isGlobal => // batch local rank does not output rank fun
case r: BatchPhysicalRank if !r.isGlobal => // batch local rank does not output rank fun
case _ =>
// rk > 2
val condition2 =
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册