Commit a25b9b84 authored by 龙三, committed by godfreyhe

[FLINK-20736][table-planner-blink] Introduce StreamPhysicalRank & StreamPhysicalLimit, and make StreamExecRank & StreamExecLimit extend only ExecNode

This closes #14472
Parent 677d392a
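For orientation, every change in this diff follows one pattern: the StreamPhysical* RelNodes keep only the Calcite planning responsibilities (traits, copy, explainTerms), while the StreamExec* classes extend ExecNode alone and own the translation to a runtime Transformation, connected through translateToExecNode(). A minimal, hypothetical sketch of that split (the names below are invented; the real classes follow in the diff):

// Hypothetical sketch only -- simplified names, not real Flink API.
interface SketchExecNode<T> {
    // only exec nodes know how to build the runtime transformation
    T translateToPlan(Object planner);
}

abstract class SketchPhysicalRel {
    // planning-time duties stay here; translation is handed off at the end of optimization
    abstract SketchExecNode<?> translateToExecNode();
}

class SketchPhysicalLimit extends SketchPhysicalRel {
    private final long offset;
    private final long fetch;

    SketchPhysicalLimit(long offset, long fetch) {
        this.offset = offset;
        this.fetch = fetch;
    }

    @Override
    SketchExecNode<String> translateToExecNode() {
        // mirrors StreamPhysicalLimit#translateToExecNode below, which builds a StreamExecLimit
        return planner -> "Transformation for OFFSET " + offset + " FETCH " + fetch;
    }
}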
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.planner.plan.nodes.exec.stream;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.utils.RankProcessStrategy;
import org.apache.flink.table.runtime.operators.rank.ConstantRankRange;
import org.apache.flink.table.runtime.operators.rank.RankType;
import org.apache.flink.table.types.logical.LogicalType;
/**
* Stream {@link ExecNode} for Limit.
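* <p>Limit is expressed as a global {@link RankType#ROW_NUMBER} rank over empty partition and
* sort keys; e.g. {@code LIMIT 10 OFFSET 5} becomes {@code new ConstantRankRange(6, 15)},
* since the rank range is 1-based.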
*/
public class StreamExecLimit extends StreamExecRank {
private final long limitEnd;
public StreamExecLimit(
long limitStart,
long limitEnd,
boolean generateUpdateBefore,
boolean needRetraction,
ExecEdge inputEdge,
LogicalType outputType,
String description) {
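// Limit is a global, unsorted rank: the empty partition and sort keys make all rows compare
// equal, and the 1-based ConstantRankRange keeps ranks limitStart + 1 .. limitEnd.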
super(
RankType.ROW_NUMBER,
new int[0],
new int[0],
new boolean[0],
new boolean[0],
new ConstantRankRange(limitStart + 1, limitEnd),
getRankStrategy(needRetraction),
false,
generateUpdateBefore,
inputEdge,
outputType,
description);
this.limitEnd = limitEnd;
}
private static RankProcessStrategy getRankStrategy(boolean needRetraction) {
if (needRetraction) {
return RankProcessStrategy.RETRACT_STRATEGY;
} else {
return RankProcessStrategy.APPEND_FAST_STRATEGY;
}
}
@Override
protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
if (limitEnd == Long.MAX_VALUE) {
throw new TableException(
"FETCH is missed, which on streaming table is not supported currently.");
}
return super.translateToPlanInternal(planner);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.planner.plan.nodes.exec.stream;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.plan.utils.RankProcessStrategy;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.rank.AbstractTopNFunction;
import org.apache.flink.table.runtime.operators.rank.AppendOnlyTopNFunction;
import org.apache.flink.table.runtime.operators.rank.ComparableRecordComparator;
import org.apache.flink.table.runtime.operators.rank.RankRange;
import org.apache.flink.table.runtime.operators.rank.RankType;
import org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction;
import org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import java.util.Collections;
import java.util.stream.IntStream;
/**
* Stream {@link ExecNode} for Rank.
*/
public class StreamExecRank extends ExecNodeBase<RowData> implements StreamExecNode<RowData> {
// This is an experimental config and may be removed later.
@Experimental
public static final ConfigOption<Long> TABLE_EXEC_TOPN_CACHE_SIZE =
ConfigOptions.key("table.exec.topn.cache-size")
.longType()
.defaultValue(10000L)
.withDescription("TopN operator has a cache which caches partial state contents to reduce" +
" state access. Cache size is the number of records in each TopN task.");
private final RankType rankType;
private final int[] partitionFields;
private final int[] sortFields;
private final boolean[] sortDirections;
private final boolean[] nullsIsLast;
private final RankRange rankRange;
private final RankProcessStrategy rankStrategy;
private final boolean outputRankNumber;
private final boolean generateUpdateBefore;
public StreamExecRank(
RankType rankType,
int[] partitionFields,
int[] sortFields,
boolean[] sortDirections,
boolean[] nullsIsLast,
RankRange rankRange,
RankProcessStrategy rankStrategy,
boolean outputRankNumber,
boolean generateUpdateBefore,
ExecEdge inputEdge,
LogicalType outputType,
String description) {
super(Collections.singletonList(inputEdge), outputType, description);
this.rankType = rankType;
this.partitionFields = partitionFields;
this.sortFields = sortFields;
this.sortDirections = sortDirections;
this.nullsIsLast = nullsIsLast;
this.rankRange = rankRange;
this.rankStrategy = rankStrategy;
this.outputRankNumber = outputRankNumber;
this.generateUpdateBefore = generateUpdateBefore;
}
@SuppressWarnings("unchecked")
@Override
protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
switch (rankType) {
case ROW_NUMBER:
break;
case RANK:
throw new TableException("RANK() on streaming table is not supported currently");
case DENSE_RANK:
throw new TableException("DENSE_RANK() on streaming table is not supported currently");
default:
throw new TableException(String.format("Streaming tables do not support %s rank function.", rankType));
}
ExecNode<RowData> inputNode = (ExecNode<RowData>) getInputNodes().get(0);
Transformation<RowData> inputTransform = inputNode.translateToPlan(planner);
RowType inputType = (RowType) inputNode.getOutputType();
InternalTypeInfo<RowData> inputRowTypeInfo = InternalTypeInfo.of(inputType);
RowDataKeySelector sortKeySelector = KeySelectorUtil.getRowDataSelector(sortFields, inputRowTypeInfo);
LogicalType[] sortKeyTypes = IntStream.of(sortFields).mapToObj(inputType::getTypeAt).toArray(LogicalType[]::new);
int[] sortKeyPositions = IntStream.range(0, sortFields.length).toArray();
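// the positions are 0..n-1 because the generated comparator runs on the projected sort-key
// row produced by sortKeySelector, not on the full input row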
TableConfig tableConfig = planner.getTableConfig();
GeneratedRecordComparator sortKeyComparator = ComparatorCodeGenerator.gen(
tableConfig,
"StreamExecSortComparator",
sortKeyPositions,
sortKeyTypes,
sortDirections,
nullsIsLast);
long cacheSize = tableConfig.getConfiguration().getLong(TABLE_EXEC_TOPN_CACHE_SIZE);
long minIdleStateRetentionTime = tableConfig.getMinIdleStateRetentionTime();
long maxIdleStateRetentionTime = tableConfig.getMaxIdleStateRetentionTime();
AbstractTopNFunction processFunction;
if (rankStrategy instanceof RankProcessStrategy.AppendFastStrategy) {
processFunction = new AppendOnlyTopNFunction(
minIdleStateRetentionTime,
maxIdleStateRetentionTime,
inputRowTypeInfo,
sortKeyComparator,
sortKeySelector,
rankType,
rankRange,
generateUpdateBefore,
outputRankNumber,
cacheSize);
} else if (rankStrategy instanceof RankProcessStrategy.UpdateFastStrategy) {
RankProcessStrategy.UpdateFastStrategy updateFastStrategy =
(RankProcessStrategy.UpdateFastStrategy) rankStrategy;
int[] primaryKeys = updateFastStrategy.getPrimaryKeys();
RowDataKeySelector rowKeySelector = KeySelectorUtil.getRowDataSelector(
primaryKeys,
inputRowTypeInfo);
processFunction = new UpdatableTopNFunction(
minIdleStateRetentionTime,
maxIdleStateRetentionTime,
inputRowTypeInfo,
rowKeySelector,
sortKeyComparator,
sortKeySelector,
rankType,
rankRange,
generateUpdateBefore,
outputRankNumber,
cacheSize);
// TODO Use UnaryUpdateTopNFunction after SortedMapState is merged
} else if (rankStrategy instanceof RankProcessStrategy.RetractStrategy) {
EqualiserCodeGenerator equaliserCodeGen = new EqualiserCodeGenerator(
inputType.getFields().stream().map(RowType.RowField::getType).toArray(LogicalType[]::new));
GeneratedRecordEqualiser generatedEqualiser = equaliserCodeGen.generateRecordEqualiser("RankValueEqualiser");
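// row-equality checker handed to RetractableTopNFunction so it can match stored records
// against incoming changes by value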
ComparableRecordComparator comparator = new ComparableRecordComparator(
sortKeyComparator,
sortKeyPositions,
sortKeyTypes,
sortDirections,
nullsIsLast);
processFunction = new RetractableTopNFunction(
minIdleStateRetentionTime,
maxIdleStateRetentionTime,
inputRowTypeInfo,
comparator,
sortKeySelector,
rankType,
rankRange,
generatedEqualiser,
generateUpdateBefore,
outputRankNumber);
} else {
throw new TableException(String.format("rank strategy:%s is not supported.", rankStrategy));
}
KeyedProcessOperator<RowData, RowData, RowData> operator =
new KeyedProcessOperator<>(processFunction);
processFunction.setKeyContext(operator);
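// give the TopN function access to the operator's key context, so it can read the current
// key when accessing keyed state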
OneInputTransformation<RowData, RowData> transform = new OneInputTransformation<>(
inputTransform,
getDesc(),
operator,
InternalTypeInfo.of((RowType) getOutputType()),
inputTransform.getParallelism());
// set KeyType and Selector for state
RowDataKeySelector selector = KeySelectorUtil.getRowDataSelector(partitionFields, inputRowTypeInfo);
transform.setStateKeySelector(selector);
transform.setStateKeyType(selector.getProducedType());
if (inputsContainSingleton()) {
transform.setParallelism(1);
transform.setMaxParallelism(1);
}
return transform;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.planner.plan.utils;
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel;
import org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram;
import org.apache.flink.table.planner.plan.trait.RelModifiedMonotonicity;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.sql.validate.SqlMonotonicity;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
/**
* Base interface of strategies that choose different rank process functions.
*/
public interface RankProcessStrategy {
UndefinedStrategy UNDEFINED_STRATEGY = new UndefinedStrategy();
AppendFastStrategy APPEND_FAST_STRATEGY = new AppendFastStrategy();
RetractStrategy RETRACT_STRATEGY = new RetractStrategy();
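// Strategy -> runtime function, as chosen in StreamExecRank#translateToPlanInternal:
//   AppendFastStrategy -> AppendOnlyTopNFunction
//   UpdateFastStrategy -> UpdatableTopNFunction
//   RetractStrategy    -> RetractableTopNFunction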
/**
* A placeholder strategy which will be inferred after {@link FlinkChangelogModeInferenceProgram}.
*/
class UndefinedStrategy implements RankProcessStrategy {
private UndefinedStrategy() {}
@Override
public String toString() {
return "UndefinedStrategy";
}
}
/**
* A strategy which only works when input only contains insertion changes.
*/
class AppendFastStrategy implements RankProcessStrategy {
private AppendFastStrategy() {}
@Override
public String toString() {
return "AppendFastStrategy";
}
}
/**
* A strategy which works when input contains update or deletion changes.
*/
class RetractStrategy implements RankProcessStrategy {
private RetractStrategy() {}
@Override
public String toString() {
return "RetractStrategy";
}
}
/**
* A strategy which only works when the input contains no deletion changes, has the given
* {@link #primaryKeys}, and is monotonic on the order-by fields.
*/
class UpdateFastStrategy implements RankProcessStrategy {
private final int[] primaryKeys;
public UpdateFastStrategy(int[] primaryKeys) {
this.primaryKeys = primaryKeys;
}
public int[] getPrimaryKeys() {
return primaryKeys;
}
@Override
public String toString() {
return String.format(
"UpdateFastStrategy[%s]", StringUtils.join(primaryKeys, ','));
}
}
/**
* Gets {@link RankProcessStrategy} based on input, partitionKey and orderKey.
*/
static List<RankProcessStrategy> analyzeRankProcessStrategies(
StreamPhysicalRel rank,
ImmutableBitSet partitionKey,
RelCollation orderKey) {
RelMetadataQuery mq = rank.getCluster().getMetadataQuery();
List<RelFieldCollation> fieldCollations = orderKey.getFieldCollations();
boolean isUpdateStream = !ChangelogPlanUtils.inputInsertOnly(rank);
RelNode input = rank.getInput(0);
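// Decision sketch: insert-only input -> AppendFastStrategy; updating input whose unique key
// covers the partition key and whose sort key is monotonic w.r.t. the order direction ->
// UpdateFastStrategy (e.g. ORDER BY cnt DESC where cnt comes from an upstream COUNT and is
// therefore increasing), with RetractStrategy as fallback; anything else -> RetractStrategy.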
if (isUpdateStream) {
Set<ImmutableBitSet> uniqueKeys = mq.getUniqueKeys(input);
if (uniqueKeys == null || uniqueKeys.isEmpty()
// the unique key should contain the partition key
|| uniqueKeys.stream().noneMatch(k -> k.contains(partitionKey))) {
// otherwise fall back to the retract strategy
return Collections.singletonList(RETRACT_STRATEGY);
} else {
FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq);
RelModifiedMonotonicity monotonicity = fmq.getRelModifiedMonotonicity(input);
boolean isMonotonic = false;
if (monotonicity != null && !fieldCollations.isEmpty()) {
isMonotonic = fieldCollations.stream().allMatch(collation -> {
SqlMonotonicity fieldMonotonicity = monotonicity.fieldMonotonicities()[collation.getFieldIndex()];
RelFieldCollation.Direction direction = collation.direction;
if ((fieldMonotonicity == SqlMonotonicity.DECREASING
|| fieldMonotonicity == SqlMonotonicity.STRICTLY_DECREASING)
&& direction == RelFieldCollation.Direction.ASCENDING) {
// sort field is ascending and its monotonicity is decreasing
return true;
} else if ((fieldMonotonicity == SqlMonotonicity.INCREASING
|| fieldMonotonicity == SqlMonotonicity.STRICTLY_INCREASING)
&& direction == RelFieldCollation.Direction.DESCENDING) {
// sort field is descending and its monotonicity is increasing
return true;
} else {
// a constant sort key (e.g. a grouping key of the upstream agg) is trivially monotonic
return fieldMonotonicity == SqlMonotonicity.CONSTANT;
}
});
}
if (isMonotonic) {
// TODO: choose a proper set of primary keys
return Arrays.asList(
new UpdateFastStrategy(uniqueKeys.iterator().next().toArray()),
RETRACT_STRATEGY);
} else {
return Collections.singletonList(RETRACT_STRATEGY);
}
}
} else {
return Collections.singletonList(APPEND_FAST_STRATEGY);
}
}
}
......@@ -50,8 +50,8 @@ import scala.collection.JavaConversions._
/**
* Stream physical RelNode which deduplicates on keys and keeps only the first row or last row.
* This node is an optimization of [[StreamExecRank]] for some special cases.
* Compared to [[StreamExecRank]], this node could use mini-batch and access less state.
* This node is an optimization of [[StreamPhysicalRank]] for some special cases.
* Compared to [[StreamPhysicalRank]], this node could use mini-batch and access less state.
* <p>NOTES: only supports sorting on proctime now; sorting on rowtime will not be translated
* into a StreamExecDeduplicate node.
*/
......
......@@ -27,16 +27,17 @@ import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator
import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator
import org.apache.flink.table.planner.delegation.StreamPlanner
import org.apache.flink.table.planner.plan.nodes.exec.LegacyStreamExecNode
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecRank
import org.apache.flink.table.planner.plan.utils._
import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector
import org.apache.flink.table.runtime.operators.rank._
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.core.Sort
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter}
import org.apache.calcite.rex.{RexLiteral, RexNode}
import org.apache.flink.table.planner.plan.utils.RankProcessStrategy.{AppendFastStrategy, RetractStrategy, UpdateFastStrategy}
import scala.collection.JavaConversions._
......@@ -133,7 +134,7 @@ class StreamExecSortLimit(
// StreamExecSortLimit is implemented with a TopN rank function
val processFunction = rankStrategy match {
case AppendFastStrategy =>
case _: AppendFastStrategy =>
new AppendOnlyTopNFunction(
minIdleStateRetentionTime,
maxIdleStateRetentionTime,
......@@ -146,8 +147,9 @@ class StreamExecSortLimit(
outputRankNumber,
cacheSize)
case UpdateFastStrategy(primaryKeys) =>
val rowKeySelector = KeySelectorUtil.getRowDataSelector(primaryKeys, inputRowTypeInfo)
case updateFastStrategy: UpdateFastStrategy =>
val rowKeySelector = KeySelectorUtil.getRowDataSelector(
updateFastStrategy.getPrimaryKeys, inputRowTypeInfo)
new UpdatableTopNFunction(
minIdleStateRetentionTime,
maxIdleStateRetentionTime,
......@@ -162,7 +164,7 @@ class StreamExecSortLimit(
cacheSize)
// TODO Use UnaryUpdateTopNFunction after SortedMapState is merged
case RetractStrategy =>
case _: RetractStrategy =>
val equaliserCodeGen = new EqualiserCodeGenerator(inputRowTypeInfo.toRowFieldTypes)
val generatedEqualiser = equaliserCodeGen.generateRecordEqualiser("RankValueEqualiser")
val comparator = new ComparableRecordComparator(
......
......@@ -42,7 +42,7 @@ import scala.collection.JavaConversions._
/**
* Stream physical RelNode for time-ascending-order [[Sort]] without `limit`.
*
* @see [[StreamExecRank]] which must be with `limit` order by.
* @see [[StreamPhysicalRank]] which must be with `limit` order by.
* @see [[StreamExecSort]] which can be used for testing now, its sort key can be any type.
*/
class StreamExecTemporalSort(
......
......@@ -17,20 +17,10 @@
*/
package org.apache.flink.table.planner.plan.nodes.physical.stream
import org.apache.flink.api.dag.Transformation
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.table.api.TableException
import org.apache.flink.table.data.RowData
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator
import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator
import org.apache.flink.table.planner.delegation.StreamPlanner
import org.apache.flink.table.planner.plan.nodes.exec.LegacyStreamExecNode
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLimit
import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
import org.apache.flink.table.planner.plan.utils.{ChangelogPlanUtils, RelExplainUtil, SortUtil}
import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector
import org.apache.flink.table.runtime.operators.rank.{AppendOnlyTopNFunction, ComparableRecordComparator, ConstantRankRange, RankType, RetractableTopNFunction}
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel._
......@@ -42,7 +32,7 @@ import org.apache.calcite.rex.RexNode
*
* This node outputs `limit` records starting after the first `offset` records, without sorting.
*/
class StreamExecLimit(
class StreamPhysicalLimit(
cluster: RelOptCluster,
traitSet: RelTraitSet,
inputRel: RelNode,
......@@ -55,8 +45,7 @@ class StreamExecLimit(
RelCollations.EMPTY,
offset,
fetch)
with StreamPhysicalRel
with LegacyStreamExecNode[RowData] {
with StreamPhysicalRel {
private lazy val limitStart: Long = SortUtil.getLimitStart(offset)
private lazy val limitEnd: Long = SortUtil.getLimitEnd(offset, fetch)
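// limitEnd is offset + fetch, or Long.MaxValue when fetch is null; the missing-FETCH case is
// rejected later in StreamExecLimit#translateToPlanInternal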
......@@ -69,7 +58,7 @@ class StreamExecLimit(
newCollation: RelCollation,
offset: RexNode,
fetch: RexNode): Sort = {
new StreamExecLimit(cluster, traitSet, newInput, offset, fetch)
new StreamPhysicalLimit(cluster, traitSet, newInput, offset, fetch)
}
override def explainTerms(pw: RelWriter): RelWriter = {
......@@ -78,90 +67,17 @@ class StreamExecLimit(
.item("fetch", RelExplainUtil.fetchToString(fetch))
}
//~ ExecNode methods -----------------------------------------------------------
override protected def translateToPlanInternal(
planner: StreamPlanner): Transformation[RowData] = {
if (fetch == null) {
throw new TableException(
"FETCH is missed, which on streaming table is not supported currently.")
}
val inputRowTypeInfo = InternalTypeInfo.of(
FlinkTypeFactory.toLogicalRowType(getInput.getRowType))
override def translateToExecNode(): ExecNode[_] = {
val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this)
val tableConfig = planner.getTableConfig
val minIdleStateRetentionTime = tableConfig.getMinIdleStateRetentionTime
val maxIdleStateRetentionTime = tableConfig.getMaxIdleStateRetentionTime
// the rank range is 1-based, hence rankStart = limitStart + 1
val rankRange = new ConstantRankRange(limitStart + 1, limitEnd)
val rankType = RankType.ROW_NUMBER
val outputRankNumber = false
// StreamExecLimit is currently implemented with a TopN function
val sortKeySelector = EmptyRowDataKeySelector.INSTANCE
val sortKeyComparator = ComparatorCodeGenerator.gen(
tableConfig, "AlwaysEqualsComparator", Array(), Array(), Array(), Array())
val processFunction = if (ChangelogPlanUtils.inputInsertOnly(this)) {
val cacheSize = tableConfig.getConfiguration.getLong(
StreamExecRank.TABLE_EXEC_TOPN_CACHE_SIZE)
new AppendOnlyTopNFunction(
minIdleStateRetentionTime,
maxIdleStateRetentionTime,
inputRowTypeInfo,
sortKeyComparator,
sortKeySelector,
rankType,
rankRange,
val needRetraction = !ChangelogPlanUtils.inputInsertOnly(this)
new StreamExecLimit(
limitStart,
limitEnd,
generateUpdateBefore,
outputRankNumber,
cacheSize)
} else {
val equaliserCodeGen = new EqualiserCodeGenerator(inputRowTypeInfo.toRowFieldTypes)
val generatedEqualiser = equaliserCodeGen.generateRecordEqualiser("LimitValueEqualiser")
val comparator = new ComparableRecordComparator(
sortKeyComparator,
Array(),
Array(),
Array(),
Array())
new RetractableTopNFunction(
minIdleStateRetentionTime,
maxIdleStateRetentionTime,
inputRowTypeInfo,
comparator,
sortKeySelector,
rankType,
rankRange,
generatedEqualiser,
generateUpdateBefore,
outputRankNumber)
}
val operator = new KeyedProcessOperator(processFunction)
processFunction.setKeyContext(operator)
val inputTransform = getInputNodes.get(0).translateToPlan(planner)
.asInstanceOf[Transformation[RowData]]
val outputRowTypeInfo = InternalTypeInfo.of(
FlinkTypeFactory.toLogicalRowType(getRowType))
// as the input node is a singleton exchange, its parallelism is 1.
val ret = new OneInputTransformation(
inputTransform,
getRelDetailedDescription,
operator,
outputRowTypeInfo,
inputTransform.getParallelism)
if (inputsContainSingleton()) {
ret.setParallelism(1)
ret.setMaxParallelism(1)
}
val selector = EmptyRowDataKeySelector.INSTANCE
ret.setStateKeySelector(selector)
ret.setStateKeyType(selector.getProducedType)
ret
needRetraction,
ExecEdge.DEFAULT,
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription
)
}
}
......@@ -17,39 +17,27 @@
*/
package org.apache.flink.table.planner.plan.nodes.physical.stream
import org.apache.flink.annotation.Experimental
import org.apache.flink.api.dag.Transformation
import org.apache.flink.configuration.ConfigOption
import org.apache.flink.configuration.ConfigOptions.key
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.table.api.TableException
import org.apache.flink.table.data.RowData
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator
import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator
import org.apache.flink.table.planner.delegation.StreamPlanner
import org.apache.flink.table.planner.plan.nodes.calcite.Rank
import org.apache.flink.table.planner.plan.nodes.exec.LegacyStreamExecNode
import org.apache.flink.table.planner.plan.utils.{KeySelectorUtil, _}
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecRank
import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
import org.apache.flink.table.planner.plan.utils._
import org.apache.flink.table.runtime.operators.rank._
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel._
import org.apache.calcite.rel.`type`.RelDataTypeField
import org.apache.calcite.util.ImmutableBitSet
import java.lang.{Long => JLong}
import java.util
import scala.collection.JavaConversions._
/**
* Stream physical RelNode for [[Rank]].
*/
class StreamExecRank(
class StreamPhysicalRank(
cluster: RelOptCluster,
traitSet: RelTraitSet,
inputRel: RelNode,
......@@ -70,13 +58,12 @@ class StreamExecRank(
rankRange,
rankNumberType,
outputRankNumber)
with StreamPhysicalRel
with LegacyStreamExecNode[RowData] {
with StreamPhysicalRel {
override def requireWatermark: Boolean = false
override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
new StreamExecRank(
new StreamPhysicalRank(
cluster,
traitSet,
inputs.get(0),
......@@ -89,8 +76,8 @@ class StreamExecRank(
rankStrategy)
}
def copy(newStrategy: RankProcessStrategy): StreamExecRank = {
new StreamExecRank(
def copy(newStrategy: RankProcessStrategy): StreamPhysicalRank = {
new StreamPhysicalRank(
cluster,
traitSet,
inputRel,
......@@ -114,117 +101,23 @@ class StreamExecRank(
.item("select", getRowType.getFieldNames.mkString(", "))
}
//~ ExecNode methods -----------------------------------------------------------
override protected def translateToPlanInternal(
planner: StreamPlanner): Transformation[RowData] = {
val tableConfig = planner.getTableConfig
rankType match {
case RankType.ROW_NUMBER => // ignore
case RankType.RANK =>
throw new TableException("RANK() on streaming table is not supported currently")
case RankType.DENSE_RANK =>
throw new TableException("DENSE_RANK() on streaming table is not supported currently")
case k =>
throw new TableException(s"Streaming tables do not support $k rank function.")
}
val inputRowTypeInfo = InternalTypeInfo.of(
FlinkTypeFactory.toLogicalRowType(getInput.getRowType))
override def translateToExecNode(): ExecNode[_] = {
val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this)
val fieldCollations = orderKey.getFieldCollations
val (sortFields, sortDirections, nullsIsLast) = SortUtil.getKeysAndOrders(fieldCollations)
val sortKeySelector = KeySelectorUtil.getRowDataSelector(sortFields, inputRowTypeInfo)
val sortKeyType = sortKeySelector.getProducedType
val sortKeyComparator = ComparatorCodeGenerator.gen(tableConfig, "StreamExecSortComparator",
sortFields.indices.toArray, sortKeyType.toRowFieldTypes, sortDirections, nullsIsLast)
val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this)
val cacheSize = tableConfig.getConfiguration.getLong(StreamExecRank.TABLE_EXEC_TOPN_CACHE_SIZE)
val minIdleStateRetentionTime = tableConfig.getMinIdleStateRetentionTime
val maxIdleStateRetentionTime = tableConfig.getMaxIdleStateRetentionTime
val processFunction = rankStrategy match {
case AppendFastStrategy =>
new AppendOnlyTopNFunction(
minIdleStateRetentionTime,
maxIdleStateRetentionTime,
inputRowTypeInfo,
sortKeyComparator,
sortKeySelector,
rankType,
rankRange,
generateUpdateBefore,
outputRankNumber,
cacheSize)
case UpdateFastStrategy(primaryKeys) =>
val rowKeySelector = KeySelectorUtil.getRowDataSelector(primaryKeys, inputRowTypeInfo)
new UpdatableTopNFunction(
minIdleStateRetentionTime,
maxIdleStateRetentionTime,
inputRowTypeInfo,
rowKeySelector,
sortKeyComparator,
sortKeySelector,
rankType,
rankRange,
generateUpdateBefore,
outputRankNumber,
cacheSize)
// TODO Use UnaryUpdateTopNFunction after SortedMapState is merged
case RetractStrategy =>
val equaliserCodeGen = new EqualiserCodeGenerator(inputRowTypeInfo.toRowFieldTypes)
val generatedEqualiser = equaliserCodeGen.generateRecordEqualiser("RankValueEqualiser")
val comparator = new ComparableRecordComparator(
sortKeyComparator,
sortFields.indices.toArray,
sortKeyType.toRowFieldTypes,
sortDirections,
nullsIsLast)
new RetractableTopNFunction(
minIdleStateRetentionTime,
maxIdleStateRetentionTime,
inputRowTypeInfo,
comparator,
sortKeySelector,
rankType,
rankRange,
generatedEqualiser,
generateUpdateBefore,
outputRankNumber)
}
val operator = new KeyedProcessOperator(processFunction)
processFunction.setKeyContext(operator)
val inputTransform = getInputNodes.get(0).translateToPlan(planner)
.asInstanceOf[Transformation[RowData]]
val outputRowTypeInfo = InternalTypeInfo.of(
FlinkTypeFactory.toLogicalRowType(getRowType))
val ret = new OneInputTransformation(
inputTransform,
getRelDetailedDescription,
operator,
outputRowTypeInfo,
inputTransform.getParallelism)
if (inputsContainSingleton()) {
ret.setParallelism(1)
ret.setMaxParallelism(1)
}
// set KeyType and Selector for state
val selector = KeySelectorUtil.getRowDataSelector(partitionKey.toArray, inputRowTypeInfo)
ret.setStateKeySelector(selector)
ret.setStateKeyType(selector.getProducedType)
ret
new StreamExecRank(
rankType,
partitionKey.toArray,
sortFields,
sortDirections,
nullsIsLast,
rankRange,
rankStrategy,
outputRankNumber,
generateUpdateBefore,
ExecEdge.DEFAULT,
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription
)
}
}
object StreamExecRank {
// This is an experimental config and may be removed later.
@Experimental
val TABLE_EXEC_TOPN_CACHE_SIZE: ConfigOption[JLong] =
key("table.exec.topn.cache-size")
.defaultValue(JLong.valueOf(10000L))
.withDescription("TopN operator has a cache which caches partial state contents to reduce" +
" state access. Cache size is the number of records in each TopN task.")
}
......@@ -27,9 +27,9 @@ import org.apache.flink.table.planner.plan.utils._
import org.apache.flink.table.planner.sinks.DataStreamTableSink
import org.apache.flink.table.runtime.operators.join.FlinkJoinType
import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, StreamTableSink, UpsertStreamTableSink}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.util.ImmutableBitSet
import org.apache.flink.table.planner.plan.utils.RankProcessStrategy.{AppendFastStrategy, RetractStrategy, UpdateFastStrategy}
import scala.collection.JavaConversions._
......@@ -210,7 +210,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
val providedTrait = new ModifyKindSetTrait(builder.build())
createNewNode(window, children, providedTrait, requiredTrait, requester)
case limit: StreamExecLimit =>
case limit: StreamPhysicalLimit =>
// limit supports all kinds of changes in its input
val children = visitChildren(limit, ModifyKindSetTrait.ALL_CHANGES)
val providedTrait = if (getModifyKindSet(children.head).isInsertOnly) {
......@@ -220,7 +220,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
}
createNewNode(limit, children, providedTrait, requiredTrait, requester)
case _: StreamExecRank | _: StreamExecSortLimit =>
case _: StreamPhysicalRank | _: StreamExecSortLimit =>
// Rank and SortLimit support consuming all changes
val children = visitChildren(rel, ModifyKindSetTrait.ALL_CHANGES)
createNewNode(
......@@ -462,7 +462,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
visitSink(sink, sinkRequiredTraits)
case _: StreamExecGroupAggregate | _: StreamExecGroupTableAggregate |
_: StreamExecLimit | _: StreamExecPythonGroupAggregate |
_: StreamPhysicalLimit | _: StreamExecPythonGroupAggregate |
_: StreamExecPythonGroupTableAggregate =>
// Aggregate, TableAggregate and Limit require update_before if there are updates
val requiredChildTrait = beforeAfterOrNone(getModifyKindSet(rel.getInput(0)))
......@@ -479,7 +479,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
val children = visitChildren(rel, UpdateKindTrait.NONE)
createNewNode(rel, children, requiredTrait)
case rank: StreamExecRank =>
case rank: StreamPhysicalRank =>
val rankStrategies = RankProcessStrategy.analyzeRankProcessStrategies(
rank, rank.partitionKey, rank.orderKey)
visitRankStrategies(rankStrategies, requiredTrait, rankStrategy => rank.copy(rankStrategy))
......@@ -724,9 +724,9 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
// return the first converted node whose children satisfy the required trait
for (strategy <- rankStrategies) {
val requiredChildrenTrait = strategy match {
case UpdateFastStrategy(_) => UpdateKindTrait.ONLY_UPDATE_AFTER
case RetractStrategy => UpdateKindTrait.BEFORE_AND_AFTER
case AppendFastStrategy => UpdateKindTrait.NONE
case _: UpdateFastStrategy => UpdateKindTrait.ONLY_UPDATE_AFTER
case _: RetractStrategy => UpdateKindTrait.BEFORE_AND_AFTER
case _: AppendFastStrategy => UpdateKindTrait.NONE
}
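// with a unique key, UpdateFast can identify the old record from the UPDATE_AFTER alone;
// Retract needs explicit UPDATE_BEFORE messages; AppendFast assumes insert-only input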
val node = applyRankStrategy(strategy)
val children = visitChildren(node, requiredChildrenTrait)
......
......@@ -409,11 +409,11 @@ object FlinkStreamRuleSets {
StreamPhysicalUnionRule.INSTANCE,
// sort
StreamExecSortRule.INSTANCE,
StreamExecLimitRule.INSTANCE,
StreamPhysicalLimitRule.INSTANCE,
StreamExecSortLimitRule.INSTANCE,
StreamExecTemporalSortRule.INSTANCE,
// rank
StreamExecRankRule.INSTANCE,
StreamPhysicalRankRule.INSTANCE,
StreamExecDeduplicateRule.RANK_INSTANCE,
// expand
StreamPhysicalExpandRule.INSTANCE,
......
......@@ -22,7 +22,7 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank
import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamExecDeduplicate, StreamExecRank}
import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamExecDeduplicate, StreamPhysicalRank}
import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, RankType}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
......@@ -35,8 +35,8 @@ import org.apache.calcite.rel.{RelCollation, RelNode}
* limits 1 and its rank type is ROW_NUMBER, and converts it to [[StreamExecDeduplicate]].
*
* NOTES: Queries that can be converted to [[StreamExecDeduplicate]] could be converted to
* [[StreamExecRank]] too. [[StreamExecDeduplicate]] is more efficient than [[StreamExecRank]]
* due to mini-batch and less state access.
* [[StreamPhysicalRank]] too. [[StreamExecDeduplicate]] is more efficient than
* [[StreamPhysicalRank]] due to mini-batch and less state access.
*
* e.g.
* 1. {{{
......
......@@ -24,7 +24,7 @@ import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSortL
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.flink.table.planner.plan.utils.UndefinedStrategy
import org.apache.flink.table.planner.plan.utils.RankProcessStrategy
/**
* Rule that matches [[FlinkLogicalSort]] with non-empty sort fields and non-null fetch or offset,
......@@ -61,7 +61,7 @@ class StreamExecSortLimitRule
sort.collation,
sort.offset,
sort.fetch,
UndefinedStrategy)
RankProcessStrategy.UNDEFINED_STRATEGY)
}
}
......
......@@ -21,7 +21,7 @@ package org.apache.flink.table.planner.plan.rules.physical.stream
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSort
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLimit
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
......@@ -29,14 +29,14 @@ import org.apache.calcite.rel.convert.ConverterRule
/**
* Rule that matches [[FlinkLogicalSort]] with empty sort fields,
* and converts it to [[StreamExecLimit]].
* and converts it to [[StreamPhysicalLimit]].
*/
class StreamExecLimitRule
class StreamPhysicalLimitRule
extends ConverterRule(
classOf[FlinkLogicalSort],
FlinkConventions.LOGICAL,
FlinkConventions.STREAM_PHYSICAL,
"StreamExecLimitRule") {
"StreamPhysicalLimitRule") {
override def matches(call: RelOptRuleCall): Boolean = {
val sort: FlinkLogicalSort = call.rel(0)
......@@ -54,9 +54,9 @@ class StreamExecLimitRule
.replace(FlinkRelDistribution.SINGLETON)
val newInput = RelOptRule.convert(input, newTraitSet)
// create StreamExecLimit
// create StreamPhysicalLimit
val providedGlobalTraitSet = newTraitSet
new StreamExecLimit(
new StreamPhysicalLimit(
rel.getCluster,
providedGlobalTraitSet,
newInput,
......@@ -65,6 +65,6 @@ class StreamExecLimitRule
}
}
object StreamExecLimitRule {
val INSTANCE: RelOptRule = new StreamExecLimitRule
object StreamPhysicalLimitRule {
val INSTANCE: RelOptRule = new StreamPhysicalLimitRule
}
......@@ -21,22 +21,22 @@ package org.apache.flink.table.planner.plan.rules.physical.stream
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank
import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamExecDeduplicate, StreamExecRank}
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRank
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.flink.table.planner.plan.utils.UndefinedStrategy
import org.apache.flink.table.planner.plan.utils.RankProcessStrategy
/**
* Rule that converts [[FlinkLogicalRank]] with fetch to [[StreamExecRank]].
* Rule that converts [[FlinkLogicalRank]] with fetch to [[StreamPhysicalRank]].
* NOTES: the rank cannot be converted to [[StreamExecDeduplicate]].
*/
class StreamExecRankRule
class StreamPhysicalRankRule
extends ConverterRule(
classOf[FlinkLogicalRank],
FlinkConventions.LOGICAL,
FlinkConventions.STREAM_PHYSICAL,
"StreamExecRankRule") {
"StreamPhysicalRankRule") {
override def matches(call: RelOptRuleCall): Boolean = {
val rank: FlinkLogicalRank = call.rel(0)
......@@ -57,7 +57,7 @@ class StreamExecRankRule
val providedTraitSet = rank.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
val newInput: RelNode = RelOptRule.convert(input, requiredTraitSet)
new StreamExecRank(
new StreamPhysicalRank(
rank.getCluster,
providedTraitSet,
newInput,
......@@ -67,10 +67,10 @@ class StreamExecRankRule
rank.rankRange,
rank.rankNumberType,
rank.outputRankNumber,
UndefinedStrategy)
RankProcessStrategy.UNDEFINED_STRATEGY)
}
}
object StreamExecRankRule {
val INSTANCE: RelOptRule = new StreamExecRankRule
object StreamPhysicalRankRule {
val INSTANCE: RelOptRule = new StreamPhysicalRankRule
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.planner.plan.utils
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram
import org.apache.calcite.rel.RelFieldCollation.Direction
import org.apache.calcite.rel.{RelCollation, RelNode}
import org.apache.calcite.sql.validate.SqlMonotonicity
import org.apache.calcite.util.ImmutableBitSet
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel
import scala.collection.JavaConversions._
/**
* Base trait of strategies that choose different rank process functions.
*/
sealed trait RankProcessStrategy
/**
* A placeholder strategy which will be inferred after [[FlinkChangelogModeInferenceProgram]]
*/
case object UndefinedStrategy extends RankProcessStrategy
/**
* A strategy which only works when input only contains insertion changes
*/
case object AppendFastStrategy extends RankProcessStrategy
/**
* A strategy which works when input contains update or deletion changes
*/
case object RetractStrategy extends RankProcessStrategy
/**
* A strategy which only works when the input contains no deletion changes, has the given
* [[primaryKeys]], and is monotonic on the order by fields.
*/
case class UpdateFastStrategy(primaryKeys: Array[Int]) extends RankProcessStrategy {
override def toString: String = "UpdateFastStrategy" + primaryKeys.mkString("[", ",", "]")
}
object RankProcessStrategy {
/**
* Gets [[RankProcessStrategy]] based on input, partitionKey and orderKey.
*/
def analyzeRankProcessStrategies(
rank: StreamPhysicalRel,
partitionKey: ImmutableBitSet,
orderKey: RelCollation): Seq[RankProcessStrategy] = {
val mq = rank.getCluster.getMetadataQuery
val fieldCollations = orderKey.getFieldCollations
val isUpdateStream = !ChangelogPlanUtils.inputInsertOnly(rank)
val input = rank.getInput(0)
if (isUpdateStream) {
val uniqueKeys = mq.getUniqueKeys(input)
if (uniqueKeys == null || uniqueKeys.isEmpty
// the unique key should contain the partition key
|| !uniqueKeys.exists(k => k.contains(partitionKey))) {
// otherwise fall back to the retract strategy
Seq(RetractStrategy)
} else {
val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
val monotonicity = fmq.getRelModifiedMonotonicity(input)
val isMonotonic = if (monotonicity == null) {
false
} else {
if (fieldCollations.isEmpty) {
false
} else {
fieldCollations.forall { collation =>
val fieldMonotonicity = monotonicity.fieldMonotonicities(collation.getFieldIndex)
val direction = collation.direction
if ((fieldMonotonicity == SqlMonotonicity.DECREASING
|| fieldMonotonicity == SqlMonotonicity.STRICTLY_DECREASING)
&& direction == Direction.ASCENDING) {
// sort field is ascending and its monotonicity is decreasing
true
} else if ((fieldMonotonicity == SqlMonotonicity.INCREASING
|| fieldMonotonicity == SqlMonotonicity.STRICTLY_INCREASING)
&& direction == Direction.DESCENDING) {
// sort field is descending and its monotonicity is increasing
true
} else if (fieldMonotonicity == SqlMonotonicity.CONSTANT) {
// a constant sort key (e.g. a grouping key of the upstream agg) is trivially monotonic
true
} else {
false
}
}
}
}
if (isMonotonic) {
// FIXME: choose a proper set of primary keys
Seq(UpdateFastStrategy(uniqueKeys.iterator().next().toArray), RetractStrategy)
} else {
Seq(RetractStrategy)
}
}
} else {
Seq(AppendFastStrategy)
}
}
}
......@@ -363,10 +363,10 @@ class FlinkRelMdHandlerTestBase {
FlinkRelDistribution.SINGLETON),
logicalSort.offset, logicalSort.fetch, true)
val streamSort = new StreamExecLimit(cluster, streamPhysicalTraits.replace(collation),
val streamLimit = new StreamPhysicalLimit(cluster, streamPhysicalTraits.replace(collation),
studentStreamScan, logicalSort.offset, logicalSort.fetch)
(logicalSort, flinkLogicalSort, batchSort, batchSortLocal, batchSortGlobal, streamSort)
(logicalSort, flinkLogicalSort, batchSort, batchSortLocal, batchSortGlobal, streamLimit)
}
// equivalent SQL is
......@@ -408,7 +408,7 @@ class FlinkRelMdHandlerTestBase {
collection, offset, fetch, true)
val streamSort = new StreamExecSortLimit(cluster, streamPhysicalTraits.replace(collection),
studentStreamScan, collection, offset, fetch, UndefinedStrategy)
studentStreamScan, collection, offset, fetch, RankProcessStrategy.UNDEFINED_STRATEGY)
(logicalSortLimit, flinkLogicalSortLimit,
batchSortLimit, batchSortLocalLimit, batchSortGlobal, streamSort)
......@@ -480,7 +480,7 @@ class FlinkRelMdHandlerTestBase {
val streamExchange = new BatchPhysicalExchange(cluster,
studentStreamScan.getTraitSet.replace(hash6), studentStreamScan, hash6)
val streamRank = new StreamExecRank(
val streamRank = new StreamPhysicalRank(
cluster,
streamPhysicalTraits,
streamExchange,
......@@ -490,7 +490,7 @@ class FlinkRelMdHandlerTestBase {
new ConstantRankRange(1, 5),
new RelDataTypeFieldImpl("rk", 7, longType),
outputRankNumber = true,
UndefinedStrategy
RankProcessStrategy.UNDEFINED_STRATEGY
)
(logicalRank, flinkLogicalRank, batchLocalRank, batchGlobalRank, streamRank)
......@@ -562,7 +562,7 @@ class FlinkRelMdHandlerTestBase {
val streamExchange = new BatchPhysicalExchange(cluster,
studentStreamScan.getTraitSet.replace(hash6), studentStreamScan, hash6)
val streamRank = new StreamExecRank(
val streamRank = new StreamPhysicalRank(
cluster,
streamPhysicalTraits,
streamExchange,
......@@ -572,7 +572,7 @@ class FlinkRelMdHandlerTestBase {
new ConstantRankRange(3, 5),
new RelDataTypeFieldImpl("rk", 7, longType),
outputRankNumber = true,
UndefinedStrategy
RankProcessStrategy.UNDEFINED_STRATEGY
)
(logicalRank, flinkLogicalRank, batchLocalRank, batchGlobalRank, streamRank)
......@@ -611,7 +611,7 @@ class FlinkRelMdHandlerTestBase {
val singleton = FlinkRelDistribution.SINGLETON
val streamExchange = new BatchPhysicalExchange(cluster,
studentStreamScan.getTraitSet.replace(singleton), studentStreamScan, singleton)
val streamRowNumber = new StreamExecRank(
val streamRowNumber = new StreamPhysicalRank(
cluster,
streamPhysicalTraits,
streamExchange,
......@@ -621,7 +621,7 @@ class FlinkRelMdHandlerTestBase {
new ConstantRankRange(3, 6),
new RelDataTypeFieldImpl("rn", 7, longType),
outputRankNumber = true,
UndefinedStrategy
RankProcessStrategy.UNDEFINED_STRATEGY
)
(logicalRowNumber, flinkLogicalRowNumber, streamRowNumber)
......@@ -762,7 +762,7 @@ class FlinkRelMdHandlerTestBase {
outputRankNumber = true
)
val streamRankWithVariableRange = new StreamExecRank(
val streamRankWithVariableRange = new StreamPhysicalRank(
cluster,
logicalTraits,
studentStreamScan,
......@@ -772,7 +772,7 @@ class FlinkRelMdHandlerTestBase {
new VariableRankRange(3),
new RelDataTypeFieldImpl("rk", 7, longType),
outputRankNumber = true,
UndefinedStrategy
RankProcessStrategy.UNDEFINED_STRATEGY
)
(logicalRankWithVariableRange, flinkLogicalRankWithVariableRange, streamRankWithVariableRange)
......