提交 bc782b6d 编写于 作者: 龙三 提交者: godfreyhe

[FLINK-20766][table-planner-blink] Introduce StreamPhysicalSortLimit, and make...

[FLINK-20766][table-planner-blink] Introduce StreamPhysicalSortLimit, and make StreamExecSortLimit only extends from ExecNode.

This closes #14502
上级 6c75072f
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.planner.plan.nodes.exec.stream;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.utils.PartitionSpec;
import org.apache.flink.table.planner.plan.nodes.exec.utils.SortSpec;
import org.apache.flink.table.planner.plan.utils.RankProcessStrategy;
import org.apache.flink.table.runtime.operators.rank.ConstantRankRange;
import org.apache.flink.table.runtime.operators.rank.RankType;
import org.apache.flink.table.types.logical.RowType;
/** {@link StreamExecNode} for Sort with limit. */
public class StreamExecSortLimit extends StreamExecRank {
private final long limitEnd;
public StreamExecSortLimit(
SortSpec sortSpec,
long limitStart,
long limitEnd,
RankProcessStrategy rankStrategy,
boolean generateUpdateBefore,
ExecEdge inputEdge,
RowType outputType,
String description) {
super(
RankType.ROW_NUMBER,
PartitionSpec.ALL_IN_ONE,
sortSpec,
new ConstantRankRange(limitStart + 1, limitEnd),
rankStrategy,
false,
generateUpdateBefore,
inputEdge,
outputType,
description);
this.limitEnd = limitEnd;
}
@Override
protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
if (limitEnd == Long.MAX_VALUE) {
throw new TableException(
"FETCH is missed, which on streaming table is not supported currently.");
}
return super.translateToPlanInternal(planner);
}
}
......@@ -17,27 +17,16 @@
*/
package org.apache.flink.table.planner.plan.nodes.physical.stream
import org.apache.flink.api.dag.Transformation
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.table.api.TableException
import org.apache.flink.table.data.RowData
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator
import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator
import org.apache.flink.table.planner.delegation.StreamPlanner
import org.apache.flink.table.planner.plan.nodes.exec.LegacyStreamExecNode
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecRank
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSortLimit
import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
import org.apache.flink.table.planner.plan.utils._
import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector
import org.apache.flink.table.runtime.operators.rank._
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.core.Sort
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter}
import org.apache.calcite.rex.{RexLiteral, RexNode}
import org.apache.flink.table.planner.plan.utils.RankProcessStrategy.{AppendFastStrategy, RetractStrategy, UpdateFastStrategy}
import scala.collection.JavaConversions._
......@@ -46,7 +35,7 @@ import scala.collection.JavaConversions._
*
* This RelNode take the `limit` elements beginning with the first `offset` elements.
**/
class StreamExecSortLimit(
class StreamPhysicalSortLimit(
cluster: RelOptCluster,
traitSet: RelTraitSet,
inputRel: RelNode,
......@@ -55,8 +44,7 @@ class StreamExecSortLimit(
fetch: RexNode,
rankStrategy: RankProcessStrategy)
extends Sort(cluster, traitSet, inputRel, sortCollation, offset, fetch)
with StreamPhysicalRel
with LegacyStreamExecNode[RowData] {
with StreamPhysicalRel {
private val limitStart: Long = SortUtil.getLimitStart(offset)
private val limitEnd: Long = SortUtil.getLimitEnd(offset, fetch)
......@@ -69,11 +57,12 @@ class StreamExecSortLimit(
newCollation: RelCollation,
offset: RexNode,
fetch: RexNode): Sort = {
new StreamExecSortLimit(cluster, traitSet, newInput, newCollation, offset, fetch, rankStrategy)
new StreamPhysicalSortLimit(
cluster, traitSet, newInput, newCollation, offset, fetch, rankStrategy)
}
def copy(newStrategy: RankProcessStrategy): StreamExecSortLimit = {
new StreamExecSortLimit(cluster, traitSet, input, sortCollation, offset, fetch, newStrategy)
def copy(newStrategy: RankProcessStrategy): StreamPhysicalSortLimit = {
new StreamPhysicalSortLimit(cluster, traitSet, input, sortCollation, offset, fetch, newStrategy)
}
override def explainTerms(pw: RelWriter): RelWriter = {
......@@ -98,114 +87,17 @@ class StreamExecSortLimit(
}
}
//~ ExecNode methods -----------------------------------------------------------
override protected def translateToPlanInternal(
planner: StreamPlanner): Transformation[RowData] = {
if (fetch == null) {
throw new TableException(
"FETCH is missed, which on streaming table is not supported currently")
}
val inputTransform = getInputNodes.get(0).translateToPlan(planner)
.asInstanceOf[Transformation[RowData]]
val inputRowTypeInfo = inputTransform.getOutputType.asInstanceOf[InternalTypeInfo[RowData]]
val fieldCollations = sortCollation.getFieldCollations
val (sortFields, sortDirections, nullsIsLast) = SortUtil.getKeysAndOrders(fieldCollations)
val sortKeySelector = KeySelectorUtil.getRowDataSelector(sortFields, inputRowTypeInfo)
val sortKeyType = sortKeySelector.getProducedType
val tableConfig = planner.getTableConfig
val sortKeyComparator = ComparatorCodeGenerator.gen(
tableConfig,
"StreamExecSortComparator",
sortFields.indices.toArray,
sortKeyType.toRowFieldTypes,
sortDirections,
nullsIsLast)
override def translateToExecNode(): ExecNode[_] = {
val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this)
val cacheSize = tableConfig.getConfiguration.getLong(StreamExecRank.TABLE_EXEC_TOPN_CACHE_SIZE)
val minIdleStateRetentionTime = tableConfig.getMinIdleStateRetentionTime
val maxIdleStateRetentionTime = tableConfig.getMaxIdleStateRetentionTime
// rankStart begin with 1
val rankRange = new ConstantRankRange(limitStart + 1, limitEnd)
val rankType = RankType.ROW_NUMBER
val outputRankNumber = false
// Use RankFunction underlying StreamExecSortLimit
val processFunction = rankStrategy match {
case _: AppendFastStrategy =>
new AppendOnlyTopNFunction(
minIdleStateRetentionTime,
maxIdleStateRetentionTime,
inputRowTypeInfo,
sortKeyComparator,
sortKeySelector,
rankType,
rankRange,
generateUpdateBefore,
outputRankNumber,
cacheSize)
case updateFastStrategy: UpdateFastStrategy =>
val rowKeySelector = KeySelectorUtil.getRowDataSelector(
updateFastStrategy.getPrimaryKeys, inputRowTypeInfo)
new UpdatableTopNFunction(
minIdleStateRetentionTime,
maxIdleStateRetentionTime,
inputRowTypeInfo,
rowKeySelector,
sortKeyComparator,
sortKeySelector,
rankType,
rankRange,
generateUpdateBefore,
outputRankNumber,
cacheSize)
// TODO Use UnaryUpdateTopNFunction after SortedMapState is merged
case _: RetractStrategy =>
val equaliserCodeGen = new EqualiserCodeGenerator(inputRowTypeInfo.toRowFieldTypes)
val generatedEqualiser = equaliserCodeGen.generateRecordEqualiser("RankValueEqualiser")
val comparator = new ComparableRecordComparator(
sortKeyComparator,
sortFields.indices.toArray,
sortKeyType.toRowFieldTypes,
sortDirections,
nullsIsLast)
new RetractableTopNFunction(
minIdleStateRetentionTime,
maxIdleStateRetentionTime,
inputRowTypeInfo,
comparator,
sortKeySelector,
rankType,
rankRange,
generatedEqualiser,
generateUpdateBefore,
outputRankNumber)
}
val operator = new KeyedProcessOperator(processFunction)
processFunction.setKeyContext(operator)
val outputRowTypeInfo = InternalTypeInfo.of(
FlinkTypeFactory.toLogicalRowType(getRowType))
val ret = new OneInputTransformation(
inputTransform,
getRelDetailedDescription,
operator,
outputRowTypeInfo,
inputTransform.getParallelism)
if (inputsContainSingleton()) {
ret.setParallelism(1)
ret.setMaxParallelism(1)
}
val selector = EmptyRowDataKeySelector.INSTANCE
ret.setStateKeySelector(selector)
ret.setStateKeyType(selector.getProducedType)
ret
new StreamExecSortLimit(
SortUtil.getSortSpec(sortCollation.getFieldCollations),
limitStart,
limitEnd,
rankStrategy,
generateUpdateBefore,
ExecEdge.DEFAULT,
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription
)
}
}
......@@ -220,7 +220,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
}
createNewNode(limit, children, providedTrait, requiredTrait, requester)
case _: StreamPhysicalRank | _: StreamExecSortLimit =>
case _: StreamPhysicalRank | _: StreamPhysicalSortLimit =>
// Rank and SortLimit supports consuming all changes
val children = visitChildren(rel, ModifyKindSetTrait.ALL_CHANGES)
createNewNode(
......@@ -484,7 +484,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
rank, rank.partitionKey, rank.orderKey)
visitRankStrategies(rankStrategies, requiredTrait, rankStrategy => rank.copy(rankStrategy))
case sortLimit: StreamExecSortLimit =>
case sortLimit: StreamPhysicalSortLimit =>
val rankStrategies = RankProcessStrategy.analyzeRankProcessStrategies(
sortLimit, ImmutableBitSet.of(), sortLimit.getCollation)
visitRankStrategies(
......
......@@ -412,7 +412,7 @@ object FlinkStreamRuleSets {
// sort
StreamPhysicalSortRule.INSTANCE,
StreamPhysicalLimitRule.INSTANCE,
StreamExecSortLimitRule.INSTANCE,
StreamPhysicalSortLimitRule.INSTANCE,
StreamExecTemporalSortRule.INSTANCE,
// rank
StreamPhysicalRankRule.INSTANCE,
......
......@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.rules.physical.stream
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSort
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSortLimit
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSortLimit
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
......@@ -28,14 +28,14 @@ import org.apache.flink.table.planner.plan.utils.RankProcessStrategy
/**
* Rule that matches [[FlinkLogicalSort]] with non-empty sort fields and non-null fetch or offset,
* and converts it to [[StreamExecSortLimit]].
* and converts it to [[StreamPhysicalSortLimit]].
*/
class StreamExecSortLimitRule
class StreamPhysicalSortLimitRule
extends ConverterRule(
classOf[FlinkLogicalSort],
FlinkConventions.LOGICAL,
FlinkConventions.STREAM_PHYSICAL,
"StreamExecSortLimitRule") {
"StreamPhysicalSortLimitRule") {
override def matches(call: RelOptRuleCall): Boolean = {
val sort: FlinkLogicalSort = call.rel(0)
......@@ -54,7 +54,7 @@ class StreamExecSortLimitRule
.replace(FlinkConventions.STREAM_PHYSICAL)
val newInput: RelNode = RelOptRule.convert(input, requiredTraitSet)
new StreamExecSortLimit(
new StreamPhysicalSortLimit(
rel.getCluster,
providedTraitSet,
newInput,
......@@ -65,6 +65,6 @@ class StreamExecSortLimitRule
}
}
object StreamExecSortLimitRule {
val INSTANCE: RelOptRule = new StreamExecSortLimitRule
object StreamPhysicalSortLimitRule {
val INSTANCE: RelOptRule = new StreamPhysicalSortLimitRule
}
......@@ -406,7 +406,7 @@ class FlinkRelMdHandlerTestBase {
FlinkRelDistribution.SINGLETON),
collection, offset, fetch, true)
val streamSort = new StreamExecSortLimit(cluster, streamPhysicalTraits.replace(collection),
val streamSort = new StreamPhysicalSortLimit(cluster, streamPhysicalTraits.replace(collection),
studentStreamScan, collection, offset, fetch, RankProcessStrategy.UNDEFINED_STRATEGY)
(logicalSortLimit, flinkLogicalSortLimit,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册