提交 91f93d9c 编写于 作者: 龙三 提交者: godfreyhe

[FLINK-20766][table-planner-blink] Introduce StreamPhysicalTemporalSort, and...

[FLINK-20766][table-planner-blink] Introduce StreamPhysicalTemporalSort, and make StreamExecTemporalSort only extended from ExecNode.

This closes #14502
上级 bc782b6d
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.planner.plan.nodes.exec.stream;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.utils.SortSpec;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector;
import org.apache.flink.table.runtime.operators.sort.ProcTimeSortOperator;
import org.apache.flink.table.runtime.operators.sort.RowTimeSortOperator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampKind;
import org.apache.flink.table.types.logical.TimestampType;
import java.util.Collections;
/** {@link StreamExecNode} for time-ascending-order Sort without `limit`. */
public class StreamExecTemporalSort extends ExecNodeBase<RowData>
implements StreamExecNode<RowData> {
private final SortSpec sortSpec;
public StreamExecTemporalSort(
SortSpec sortSpec, ExecEdge inputEdge, RowType outputType, String description) {
super(Collections.singletonList(inputEdge), outputType, description);
this.sortSpec = sortSpec;
}
@SuppressWarnings("unchecked")
@Override
protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
// time ordering needs to be ascending
if (sortSpec.getFieldSize() == 0 || !sortSpec.getFieldSpec(0).getIsAscendingOrder()) {
throw new TableException(
"Sort: Primary sort order of a streaming table must be ascending on time.\n"
+ "please re-check sort statement according to the description above");
}
ExecNode<RowData> inputNode = (ExecNode<RowData>) getInputNodes().get(0);
Transformation<RowData> inputTransform = inputNode.translateToPlan(planner);
RowType inputType = (RowType) inputNode.getOutputType();
LogicalType timeType = inputType.getTypeAt(sortSpec.getFieldSpec(0).getFieldIndex());
TableConfig config = planner.getTableConfig();
if (timeType instanceof TimestampType) {
TimestampType keyType = (TimestampType) timeType;
if (keyType.getKind() == TimestampKind.ROWTIME) {
return createSortRowTime(inputType, inputTransform, config);
} else if (keyType.getKind() == TimestampKind.PROCTIME) {
return createSortProcTime(inputType, inputTransform, config);
}
}
throw new TableException(
String.format(
"Sort: Internal Error\n"
+ "First field in temporal sort is not a time attribute, %s is given.",
timeType));
}
/** Create Sort logic based on processing time. */
private Transformation<RowData> createSortProcTime(
RowType inputType, Transformation<RowData> inputTransform, TableConfig tableConfig) {
// if the order has secondary sorting fields in addition to the proctime
if (sortSpec.getFieldSize() > 1) {
// skip the first field which is the proctime field and would be ordered by timer.
SortSpec specExcludeTime = sortSpec.createSubSortSpec(1);
GeneratedRecordComparator rowComparator =
ComparatorCodeGenerator.gen(
tableConfig,
"ProcTimeSortComparator",
specExcludeTime.getFieldIndices(),
specExcludeTime.getFieldTypes(inputType),
specExcludeTime.getAscendingOrders(),
specExcludeTime.getNullsIsLast());
ProcTimeSortOperator sortOperator =
new ProcTimeSortOperator(InternalTypeInfo.of(inputType), rowComparator);
OneInputTransformation<RowData, RowData> transform =
new OneInputTransformation<>(
inputTransform,
getDesc(),
sortOperator,
InternalTypeInfo.of(inputType),
inputTransform.getParallelism());
// as input node is singleton exchange, its parallelism is 1.
if (inputsContainSingleton()) {
transform.setParallelism(1);
transform.setMaxParallelism(1);
}
EmptyRowDataKeySelector selector = EmptyRowDataKeySelector.INSTANCE;
transform.setStateKeySelector(selector);
transform.setStateKeyType(selector.getProducedType());
return transform;
} else {
// if the order is done only on proctime we only need to forward the elements
return inputTransform;
}
}
/** Create Sort logic based on row time. */
private Transformation<RowData> createSortRowTime(
RowType inputType, Transformation<RowData> inputTransform, TableConfig tableConfig) {
GeneratedRecordComparator rowComparator = null;
if (sortSpec.getFieldSize() > 1) {
// skip the first field which is the rowtime field and would be ordered by timer.
SortSpec specExcludeTime = sortSpec.createSubSortSpec(1);
rowComparator =
ComparatorCodeGenerator.gen(
tableConfig,
"RowTimeSortComparator",
specExcludeTime.getFieldIndices(),
specExcludeTime.getFieldTypes(inputType),
specExcludeTime.getAscendingOrders(),
specExcludeTime.getNullsIsLast());
}
RowTimeSortOperator sortOperator =
new RowTimeSortOperator(
InternalTypeInfo.of(inputType),
sortSpec.getFieldSpec(0).getFieldIndex(),
rowComparator);
OneInputTransformation<RowData, RowData> transform =
new OneInputTransformation<>(
inputTransform,
getDesc(),
sortOperator,
InternalTypeInfo.of(inputType),
inputTransform.getParallelism());
if (inputsContainSingleton()) {
transform.setParallelism(1);
transform.setMaxParallelism(1);
}
EmptyRowDataKeySelector selector = EmptyRowDataKeySelector.INSTANCE;
transform.setStateKeySelector(selector);
transform.setStateKeyType(selector.getProducedType());
return transform;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.planner.plan.nodes.physical.stream
import org.apache.flink.api.dag.Transformation
import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.table.api.{TableConfig, TableException}
import org.apache.flink.table.data.RowData
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator
import org.apache.flink.table.planner.delegation.StreamPlanner
import org.apache.flink.table.planner.plan.nodes.exec.LegacyStreamExecNode
import org.apache.flink.table.planner.plan.utils.{RelExplainUtil, SortUtil}
import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector
import org.apache.flink.table.runtime.operators.sort.{ProcTimeSortOperator, RowTimeSortOperator}
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.RelFieldCollation.Direction
import org.apache.calcite.rel._
import org.apache.calcite.rel.core.Sort
import org.apache.calcite.rex.RexNode
import scala.collection.JavaConversions._
/**
* Stream physical RelNode for time-ascending-order [[Sort]] without `limit`.
*
* @see [[StreamPhysicalRank]] which must be with `limit` order by.
* @see [[StreamPhysicalSort]] which can be used for testing now, its sort key can be any type.
*/
class StreamExecTemporalSort(
cluster: RelOptCluster,
traitSet: RelTraitSet,
inputRel: RelNode,
sortCollation: RelCollation)
extends Sort(cluster, traitSet, inputRel, sortCollation)
with StreamPhysicalRel
with LegacyStreamExecNode[RowData] {
override def requireWatermark: Boolean = false
override def copy(
traitSet: RelTraitSet,
input: RelNode,
newCollation: RelCollation,
offset: RexNode,
fetch: RexNode): Sort = {
new StreamExecTemporalSort(cluster, traitSet, input, newCollation)
}
override def explainTerms(pw: RelWriter): RelWriter = {
pw.input("input", getInput())
.item("orderBy", RelExplainUtil.collationToString(sortCollation, getRowType))
}
//~ ExecNode methods -----------------------------------------------------------
override protected def translateToPlanInternal(
planner: StreamPlanner): Transformation[RowData] = {
val config = planner.getTableConfig
// time ordering needs to be ascending
if (SortUtil.getFirstSortDirection(sortCollation) != Direction.ASCENDING) {
throw new TableException(
"Sort: Primary sort order of a streaming table must be ascending on time.\n" +
"please re-check sort statement according to the description above")
}
val input = getInputNodes.get(0).translateToPlan(planner)
.asInstanceOf[Transformation[RowData]]
val timeType = SortUtil.getFirstSortField(sortCollation, getRowType).getType
timeType match {
case _ if FlinkTypeFactory.isProctimeIndicatorType(timeType) =>
createSortProcTime(input, config)
case _ if FlinkTypeFactory.isRowtimeIndicatorType(timeType) =>
createSortRowTime(input, config)
case _ =>
throw new TableException(
"Sort: Internal Error\n" +
"Normally, this happens unlikely. please contact customer support for this"
)
}
}
/**
* Create Sort logic based on processing time
*/
private def createSortProcTime(
input: Transformation[RowData],
tableConfig: TableConfig): Transformation[RowData] = {
val inputType = FlinkTypeFactory.toLogicalRowType(getInput.getRowType)
val fieldCollations = sortCollation.getFieldCollations
// if the order has secondary sorting fields in addition to the proctime
if (fieldCollations.size() > 1) {
// strip off time collation
val (keys, orders, nullsIsLast) = SortUtil.getKeysAndOrders(fieldCollations.tail)
// sort code gen
val keyTypes = keys.map(inputType.getTypeAt)
val rowComparator = ComparatorCodeGenerator.gen(tableConfig, "ProcTimeSortComparator",
keys, keyTypes, orders, nullsIsLast)
val sortOperator = new ProcTimeSortOperator(InternalTypeInfo.of(inputType), rowComparator)
val outputRowTypeInfo = InternalTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType))
// as input node is singleton exchange, its parallelism is 1.
val ret = new OneInputTransformation(
input,
getRelDetailedDescription,
sortOperator,
outputRowTypeInfo,
input.getParallelism)
val selector = EmptyRowDataKeySelector.INSTANCE
ret.setStateKeySelector(selector)
ret.setStateKeyType(selector.getProducedType)
ret
} else {
// if the order is done only on proctime we only need to forward the elements
input
}
}
/**
* Create Sort logic based on row time
*/
private def createSortRowTime(
input: Transformation[RowData],
tableConfig: TableConfig): Transformation[RowData] = {
val fieldCollations = sortCollation.getFieldCollations
val rowTimeIdx = fieldCollations.get(0).getFieldIndex
val inputType = FlinkTypeFactory.toLogicalRowType(getInput.getRowType)
val rowComparator = if (fieldCollations.size() > 1) {
// strip off time collation
val (keys, orders, nullsIsLast) = SortUtil.getKeysAndOrders(fieldCollations.tail)
// comparator code gen
val keyTypes = keys.map(inputType.getTypeAt)
ComparatorCodeGenerator.gen(tableConfig, "RowTimeSortComparator", keys, keyTypes, orders,
nullsIsLast)
} else {
null
}
val sortOperator = new RowTimeSortOperator(
InternalTypeInfo.of(inputType), rowTimeIdx, rowComparator)
val outputRowTypeInfo = InternalTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType))
val ret = new OneInputTransformation(
input,
getRelDetailedDescription,
sortOperator,
outputRowTypeInfo,
input.getParallelism)
if (inputsContainSingleton()) {
ret.setParallelism(1)
ret.setMaxParallelism(1)
}
val selector = EmptyRowDataKeySelector.INSTANCE
ret.setStateKeySelector(selector)
ret.setStateKeyType(selector.getProducedType)
ret
}
}
...@@ -37,11 +37,12 @@ import scala.collection.JavaConversions._ ...@@ -37,11 +37,12 @@ import scala.collection.JavaConversions._
* <b>NOTES:</b> This class is used for testing with bounded source now. * <b>NOTES:</b> This class is used for testing with bounded source now.
* If a query is converted to this node in product environment, an exception will be thrown. * If a query is converted to this node in product environment, an exception will be thrown.
* *
* @see [[StreamExecTemporalSort]] which must be time-ascending-order sort without `limit`. * @see [[StreamPhysicalTemporalSort]] which must be time-ascending-order sort without `limit`.
* *
* e.g. * e.g.
* ''SELECT * FROM TABLE ORDER BY ROWTIME, a'' will be converted to [[StreamExecTemporalSort]] * ''SELECT * FROM TABLE ORDER BY ROWTIME, a'' will be converted to
* ''SELECT * FROM TABLE ORDER BY a, ROWTIME'' will be converted to [[StreamPhysicalSort]] * [[StreamPhysicalTemporalSort]]
* ''SELECT * FROM TABLE ORDER BY a, ROWTIME'' will be converted to [[StreamPhysicalSort]]
*/ */
@Experimental @Experimental
class StreamPhysicalSort( class StreamPhysicalSort(
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.planner.plan.nodes.physical.stream
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalSort
import org.apache.flink.table.planner.plan.utils.{RelExplainUtil, SortUtil}
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel._
import org.apache.calcite.rel.core.Sort
import org.apache.calcite.rex.RexNode
import scala.collection.JavaConversions._
/**
* Stream physical RelNode for time-ascending-order [[Sort]] without `limit`.
*
* @see [[StreamPhysicalRank]] which must be with `limit` order by.
* @see [[StreamPhysicalSort]] which can be used for testing now, its sort key can be any type.
*/
class StreamPhysicalTemporalSort(
cluster: RelOptCluster,
traitSet: RelTraitSet,
inputRel: RelNode,
sortCollation: RelCollation)
extends Sort(cluster, traitSet, inputRel, sortCollation)
with StreamPhysicalRel {
override def requireWatermark: Boolean = false
override def copy(
traitSet: RelTraitSet,
input: RelNode,
newCollation: RelCollation,
offset: RexNode,
fetch: RexNode): Sort = {
new StreamPhysicalTemporalSort(cluster, traitSet, input, newCollation)
}
override def explainTerms(pw: RelWriter): RelWriter = {
pw.input("input", getInput())
.item("orderBy", RelExplainUtil.collationToString(sortCollation, getRowType))
}
override def translateToExecNode(): ExecNode[_] = {
new StreamExecTemporalSort(
SortUtil.getSortSpec(sortCollation.getFieldCollations),
ExecEdge.DEFAULT,
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription
)
}
}
...@@ -240,7 +240,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti ...@@ -240,7 +240,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
createNewNode( createNewNode(
cep, children, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester) cep, children, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester)
case _: StreamExecTemporalSort | _: StreamExecOverAggregate | _: StreamExecIntervalJoin | case _: StreamPhysicalTemporalSort | _: StreamExecOverAggregate | _: StreamExecIntervalJoin |
_: StreamExecPythonOverAggregate => _: StreamExecPythonOverAggregate =>
// TemporalSort, OverAggregate, IntervalJoin only support consuming insert-only // TemporalSort, OverAggregate, IntervalJoin only support consuming insert-only
// and producing insert-only changes // and producing insert-only changes
...@@ -471,7 +471,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti ...@@ -471,7 +471,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
createNewNode(rel, children, requiredTrait) createNewNode(rel, children, requiredTrait)
case _: StreamExecGroupWindowAggregate | _: StreamExecGroupWindowTableAggregate | case _: StreamExecGroupWindowAggregate | _: StreamExecGroupWindowTableAggregate |
_: StreamExecDeduplicate | _: StreamExecTemporalSort | _: StreamExecMatch | _: StreamExecDeduplicate | _: StreamPhysicalTemporalSort | _: StreamExecMatch |
_: StreamExecOverAggregate | _: StreamExecIntervalJoin | _: StreamExecOverAggregate | _: StreamExecIntervalJoin |
_: StreamExecPythonGroupWindowAggregate | _: StreamExecPythonOverAggregate => _: StreamExecPythonGroupWindowAggregate | _: StreamExecPythonOverAggregate =>
// WindowAggregate, WindowTableAggregate, Deduplicate, TemporalSort, CEP, OverAggregate // WindowAggregate, WindowTableAggregate, Deduplicate, TemporalSort, CEP, OverAggregate
......
...@@ -413,7 +413,7 @@ object FlinkStreamRuleSets { ...@@ -413,7 +413,7 @@ object FlinkStreamRuleSets {
StreamPhysicalSortRule.INSTANCE, StreamPhysicalSortRule.INSTANCE,
StreamPhysicalLimitRule.INSTANCE, StreamPhysicalLimitRule.INSTANCE,
StreamPhysicalSortLimitRule.INSTANCE, StreamPhysicalSortLimitRule.INSTANCE,
StreamExecTemporalSortRule.INSTANCE, StreamPhysicalTemporalSortRule.INSTANCE,
// rank // rank
StreamPhysicalRankRule.INSTANCE, StreamPhysicalRankRule.INSTANCE,
StreamExecDeduplicateRule.RANK_INSTANCE, StreamExecDeduplicateRule.RANK_INSTANCE,
......
...@@ -40,7 +40,7 @@ class StreamPhysicalSortRule ...@@ -40,7 +40,7 @@ class StreamPhysicalSortRule
override def matches(call: RelOptRuleCall): Boolean = { override def matches(call: RelOptRuleCall): Boolean = {
val sort: FlinkLogicalSort = call.rel(0) val sort: FlinkLogicalSort = call.rel(0)
!sort.getCollation.getFieldCollations.isEmpty && sort.fetch == null && sort.offset == null && !sort.getCollation.getFieldCollations.isEmpty && sort.fetch == null && sort.offset == null &&
!StreamExecTemporalSortRule.canConvertToTemporalSort(sort) !StreamPhysicalTemporalSortRule.canConvertToTemporalSort(sort)
} }
override def convert(rel: RelNode): RelNode = { override def convert(rel: RelNode): RelNode = {
......
...@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.rules.physical.stream ...@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.rules.physical.stream
import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSort import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSort
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalSort import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTemporalSort
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
...@@ -30,18 +30,18 @@ import org.apache.calcite.rel.convert.ConverterRule ...@@ -30,18 +30,18 @@ import org.apache.calcite.rel.convert.ConverterRule
/** /**
* Rule that matches [[FlinkLogicalSort]] which is sorted by time attribute in ascending order * Rule that matches [[FlinkLogicalSort]] which is sorted by time attribute in ascending order
* and its `fetch` and `offset` are null, and converts it to [[StreamExecTemporalSort]]. * and its `fetch` and `offset` are null, and converts it to [[StreamPhysicalTemporalSort]].
*/ */
class StreamExecTemporalSortRule class StreamPhysicalTemporalSortRule
extends ConverterRule( extends ConverterRule(
classOf[FlinkLogicalSort], classOf[FlinkLogicalSort],
FlinkConventions.LOGICAL, FlinkConventions.LOGICAL,
FlinkConventions.STREAM_PHYSICAL, FlinkConventions.STREAM_PHYSICAL,
"StreamExecTemporalSortRule") { "StreamPhysicalTemporalSortRule") {
override def matches(call: RelOptRuleCall): Boolean = { override def matches(call: RelOptRuleCall): Boolean = {
val sort: FlinkLogicalSort = call.rel(0) val sort: FlinkLogicalSort = call.rel(0)
StreamExecTemporalSortRule.canConvertToTemporalSort(sort) StreamPhysicalTemporalSortRule.canConvertToTemporalSort(sort)
} }
override def convert(rel: RelNode): RelNode = { override def convert(rel: RelNode): RelNode = {
...@@ -56,7 +56,7 @@ class StreamExecTemporalSortRule ...@@ -56,7 +56,7 @@ class StreamExecTemporalSortRule
val convInput: RelNode = RelOptRule.convert(input, requiredTraitSet) val convInput: RelNode = RelOptRule.convert(input, requiredTraitSet)
new StreamExecTemporalSort( new StreamPhysicalTemporalSort(
rel.getCluster, rel.getCluster,
providedTraitSet, providedTraitSet,
convInput, convInput,
...@@ -65,17 +65,17 @@ class StreamExecTemporalSortRule ...@@ -65,17 +65,17 @@ class StreamExecTemporalSortRule
} }
object StreamExecTemporalSortRule { object StreamPhysicalTemporalSortRule {
val INSTANCE: RelOptRule = new StreamExecTemporalSortRule val INSTANCE: RelOptRule = new StreamPhysicalTemporalSortRule
/** /**
* Whether the given sort could be converted to [[StreamExecTemporalSort]]. * Whether the given sort could be converted to [[StreamPhysicalTemporalSort]].
* *
* Return true if the given sort is sorted by time attribute in ascending order * Return true if the given sort is sorted by time attribute in ascending order
* and its `fetch` and `offset` are null, else false. * and its `fetch` and `offset` are null, else false.
* *
* @param sort the [[FlinkLogicalSort]] node * @param sort the [[FlinkLogicalSort]] node
* @return True if the input sort could be converted to [[StreamExecTemporalSort]] * @return True if the input sort could be converted to [[StreamPhysicalTemporalSort]]
*/ */
def canConvertToTemporalSort(sort: FlinkLogicalSort): Boolean = { def canConvertToTemporalSort(sort: FlinkLogicalSort): Boolean = {
val fieldCollations = sort.collation.getFieldCollations val fieldCollations = sort.collation.getFieldCollations
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册