From bc782b6ddbe9ae84d3f7e9e1cae473a2cd1a5fd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BE=99=E4=B8=89?= Date: Fri, 25 Dec 2020 14:59:05 +0800 Subject: [PATCH] [FLINK-20766][table-planner-blink] Introduce StreamPhysicalSortLimit, and make StreamExecSortLimit only extends from ExecNode. This closes #14502 --- .../exec/stream/StreamExecSortLimit.java | 69 ++++++ .../physical/stream/StreamExecSortLimit.scala | 211 ------------------ .../stream/StreamPhysicalSortLimit.scala | 103 +++++++++ .../FlinkChangelogModeInferenceProgram.scala | 4 +- .../plan/rules/FlinkStreamRuleSets.scala | 2 +- ...cala => StreamPhysicalSortLimitRule.scala} | 14 +- .../metadata/FlinkRelMdHandlerTestBase.scala | 2 +- 7 files changed, 183 insertions(+), 222 deletions(-) create mode 100644 flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSortLimit.java delete mode 100644 flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSortLimit.scala create mode 100644 flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSortLimit.scala rename flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/{StreamExecSortLimitRule.scala => StreamPhysicalSortLimitRule.scala} (90%) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSortLimit.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSortLimit.java new file mode 100644 index 00000000000..a9a50d22a29 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSortLimit.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; +import org.apache.flink.table.planner.plan.nodes.exec.utils.PartitionSpec; +import org.apache.flink.table.planner.plan.nodes.exec.utils.SortSpec; +import org.apache.flink.table.planner.plan.utils.RankProcessStrategy; +import org.apache.flink.table.runtime.operators.rank.ConstantRankRange; +import org.apache.flink.table.runtime.operators.rank.RankType; +import org.apache.flink.table.types.logical.RowType; + +/** {@link StreamExecNode} for Sort with limit. */ +public class StreamExecSortLimit extends StreamExecRank { + + private final long limitEnd; + + public StreamExecSortLimit( + SortSpec sortSpec, + long limitStart, + long limitEnd, + RankProcessStrategy rankStrategy, + boolean generateUpdateBefore, + ExecEdge inputEdge, + RowType outputType, + String description) { + super( + RankType.ROW_NUMBER, + PartitionSpec.ALL_IN_ONE, + sortSpec, + new ConstantRankRange(limitStart + 1, limitEnd), + rankStrategy, + false, + generateUpdateBefore, + inputEdge, + outputType, + description); + this.limitEnd = limitEnd; + } + + @Override + protected Transformation translateToPlanInternal(PlannerBase planner) { + if (limitEnd == Long.MAX_VALUE) { + throw new TableException( + "FETCH is missed, which on streaming table is not supported currently."); + } + return super.translateToPlanInternal(planner); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSortLimit.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSortLimit.scala deleted file mode 100644 index 149e9387bd7..00000000000 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSortLimit.scala +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.planner.plan.nodes.physical.stream - -import org.apache.flink.api.dag.Transformation -import org.apache.flink.streaming.api.operators.KeyedProcessOperator -import org.apache.flink.streaming.api.transformations.OneInputTransformation -import org.apache.flink.table.api.TableException -import org.apache.flink.table.data.RowData -import org.apache.flink.table.planner.calcite.FlinkTypeFactory -import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator -import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator -import org.apache.flink.table.planner.delegation.StreamPlanner -import org.apache.flink.table.planner.plan.nodes.exec.LegacyStreamExecNode -import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecRank -import org.apache.flink.table.planner.plan.utils._ -import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector -import org.apache.flink.table.runtime.operators.rank._ -import org.apache.flink.table.runtime.typeutils.InternalTypeInfo -import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} -import org.apache.calcite.rel.core.Sort -import org.apache.calcite.rel.metadata.RelMetadataQuery -import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter} -import org.apache.calcite.rex.{RexLiteral, RexNode} -import org.apache.flink.table.planner.plan.utils.RankProcessStrategy.{AppendFastStrategy, RetractStrategy, UpdateFastStrategy} - -import scala.collection.JavaConversions._ - -/** - * Stream physical RelNode for [[Sort]]. - * - * This RelNode take the `limit` elements beginning with the first `offset` elements. - **/ -class StreamExecSortLimit( - cluster: RelOptCluster, - traitSet: RelTraitSet, - inputRel: RelNode, - sortCollation: RelCollation, - offset: RexNode, - fetch: RexNode, - rankStrategy: RankProcessStrategy) - extends Sort(cluster, traitSet, inputRel, sortCollation, offset, fetch) - with StreamPhysicalRel - with LegacyStreamExecNode[RowData] { - - private val limitStart: Long = SortUtil.getLimitStart(offset) - private val limitEnd: Long = SortUtil.getLimitEnd(offset, fetch) - - override def requireWatermark: Boolean = false - - override def copy( - traitSet: RelTraitSet, - newInput: RelNode, - newCollation: RelCollation, - offset: RexNode, - fetch: RexNode): Sort = { - new StreamExecSortLimit(cluster, traitSet, newInput, newCollation, offset, fetch, rankStrategy) - } - - def copy(newStrategy: RankProcessStrategy): StreamExecSortLimit = { - new StreamExecSortLimit(cluster, traitSet, input, sortCollation, offset, fetch, newStrategy) - } - - override def explainTerms(pw: RelWriter): RelWriter = { - pw.input("input", getInput) - .item("orderBy", RelExplainUtil.collationToString(sortCollation, getRowType)) - .item("offset", limitStart) - .item("fetch", RelExplainUtil.fetchToString(fetch)) - .item("strategy", rankStrategy) - } - - override def estimateRowCount(mq: RelMetadataQuery): Double = { - val inputRows = mq.getRowCount(this.getInput) - if (inputRows == null) { - inputRows - } else { - val rowCount = (inputRows - limitStart).max(1.0) - if (fetch != null) { - rowCount.min(RexLiteral.intValue(fetch)) - } else { - rowCount - } - } - } - - //~ ExecNode methods ----------------------------------------------------------- - - override protected def translateToPlanInternal( - planner: StreamPlanner): Transformation[RowData] = { - if (fetch == null) { - throw new TableException( - "FETCH is missed, which on streaming table is not supported currently") - } - - val inputTransform = getInputNodes.get(0).translateToPlan(planner) - .asInstanceOf[Transformation[RowData]] - val inputRowTypeInfo = inputTransform.getOutputType.asInstanceOf[InternalTypeInfo[RowData]] - val fieldCollations = sortCollation.getFieldCollations - val (sortFields, sortDirections, nullsIsLast) = SortUtil.getKeysAndOrders(fieldCollations) - val sortKeySelector = KeySelectorUtil.getRowDataSelector(sortFields, inputRowTypeInfo) - val sortKeyType = sortKeySelector.getProducedType - val tableConfig = planner.getTableConfig - val sortKeyComparator = ComparatorCodeGenerator.gen( - tableConfig, - "StreamExecSortComparator", - sortFields.indices.toArray, - sortKeyType.toRowFieldTypes, - sortDirections, - nullsIsLast) - val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this) - val cacheSize = tableConfig.getConfiguration.getLong(StreamExecRank.TABLE_EXEC_TOPN_CACHE_SIZE) - val minIdleStateRetentionTime = tableConfig.getMinIdleStateRetentionTime - val maxIdleStateRetentionTime = tableConfig.getMaxIdleStateRetentionTime - - // rankStart begin with 1 - val rankRange = new ConstantRankRange(limitStart + 1, limitEnd) - val rankType = RankType.ROW_NUMBER - val outputRankNumber = false - - // Use RankFunction underlying StreamExecSortLimit - val processFunction = rankStrategy match { - case _: AppendFastStrategy => - new AppendOnlyTopNFunction( - minIdleStateRetentionTime, - maxIdleStateRetentionTime, - inputRowTypeInfo, - sortKeyComparator, - sortKeySelector, - rankType, - rankRange, - generateUpdateBefore, - outputRankNumber, - cacheSize) - - case updateFastStrategy: UpdateFastStrategy => - val rowKeySelector = KeySelectorUtil.getRowDataSelector( - updateFastStrategy.getPrimaryKeys, inputRowTypeInfo) - new UpdatableTopNFunction( - minIdleStateRetentionTime, - maxIdleStateRetentionTime, - inputRowTypeInfo, - rowKeySelector, - sortKeyComparator, - sortKeySelector, - rankType, - rankRange, - generateUpdateBefore, - outputRankNumber, - cacheSize) - - // TODO Use UnaryUpdateTopNFunction after SortedMapState is merged - case _: RetractStrategy => - val equaliserCodeGen = new EqualiserCodeGenerator(inputRowTypeInfo.toRowFieldTypes) - val generatedEqualiser = equaliserCodeGen.generateRecordEqualiser("RankValueEqualiser") - val comparator = new ComparableRecordComparator( - sortKeyComparator, - sortFields.indices.toArray, - sortKeyType.toRowFieldTypes, - sortDirections, - nullsIsLast) - new RetractableTopNFunction( - minIdleStateRetentionTime, - maxIdleStateRetentionTime, - inputRowTypeInfo, - comparator, - sortKeySelector, - rankType, - rankRange, - generatedEqualiser, - generateUpdateBefore, - outputRankNumber) - } - val operator = new KeyedProcessOperator(processFunction) - processFunction.setKeyContext(operator) - - val outputRowTypeInfo = InternalTypeInfo.of( - FlinkTypeFactory.toLogicalRowType(getRowType)) - - val ret = new OneInputTransformation( - inputTransform, - getRelDetailedDescription, - operator, - outputRowTypeInfo, - inputTransform.getParallelism) - - if (inputsContainSingleton()) { - ret.setParallelism(1) - ret.setMaxParallelism(1) - } - - val selector = EmptyRowDataKeySelector.INSTANCE - ret.setStateKeySelector(selector) - ret.setStateKeyType(selector.getProducedType) - ret - } -} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSortLimit.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSortLimit.scala new file mode 100644 index 00000000000..a306aea0a12 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSortLimit.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.planner.plan.nodes.physical.stream + +import org.apache.flink.table.planner.calcite.FlinkTypeFactory +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSortLimit +import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode} +import org.apache.flink.table.planner.plan.utils._ + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.core.Sort +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter} +import org.apache.calcite.rex.{RexLiteral, RexNode} + +import scala.collection.JavaConversions._ + +/** + * Stream physical RelNode for [[Sort]]. + * + * This RelNode take the `limit` elements beginning with the first `offset` elements. + **/ +class StreamPhysicalSortLimit( + cluster: RelOptCluster, + traitSet: RelTraitSet, + inputRel: RelNode, + sortCollation: RelCollation, + offset: RexNode, + fetch: RexNode, + rankStrategy: RankProcessStrategy) + extends Sort(cluster, traitSet, inputRel, sortCollation, offset, fetch) + with StreamPhysicalRel { + + private val limitStart: Long = SortUtil.getLimitStart(offset) + private val limitEnd: Long = SortUtil.getLimitEnd(offset, fetch) + + override def requireWatermark: Boolean = false + + override def copy( + traitSet: RelTraitSet, + newInput: RelNode, + newCollation: RelCollation, + offset: RexNode, + fetch: RexNode): Sort = { + new StreamPhysicalSortLimit( + cluster, traitSet, newInput, newCollation, offset, fetch, rankStrategy) + } + + def copy(newStrategy: RankProcessStrategy): StreamPhysicalSortLimit = { + new StreamPhysicalSortLimit(cluster, traitSet, input, sortCollation, offset, fetch, newStrategy) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + pw.input("input", getInput) + .item("orderBy", RelExplainUtil.collationToString(sortCollation, getRowType)) + .item("offset", limitStart) + .item("fetch", RelExplainUtil.fetchToString(fetch)) + .item("strategy", rankStrategy) + } + + override def estimateRowCount(mq: RelMetadataQuery): Double = { + val inputRows = mq.getRowCount(this.getInput) + if (inputRows == null) { + inputRows + } else { + val rowCount = (inputRows - limitStart).max(1.0) + if (fetch != null) { + rowCount.min(RexLiteral.intValue(fetch)) + } else { + rowCount + } + } + } + + override def translateToExecNode(): ExecNode[_] = { + val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this) + new StreamExecSortLimit( + SortUtil.getSortSpec(sortCollation.getFieldCollations), + limitStart, + limitEnd, + rankStrategy, + generateUpdateBefore, + ExecEdge.DEFAULT, + FlinkTypeFactory.toLogicalRowType(getRowType), + getRelDetailedDescription + ) + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala index 759cc595aed..bd65c379abe 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala @@ -220,7 +220,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti } createNewNode(limit, children, providedTrait, requiredTrait, requester) - case _: StreamPhysicalRank | _: StreamExecSortLimit => + case _: StreamPhysicalRank | _: StreamPhysicalSortLimit => // Rank and SortLimit supports consuming all changes val children = visitChildren(rel, ModifyKindSetTrait.ALL_CHANGES) createNewNode( @@ -484,7 +484,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti rank, rank.partitionKey, rank.orderKey) visitRankStrategies(rankStrategies, requiredTrait, rankStrategy => rank.copy(rankStrategy)) - case sortLimit: StreamExecSortLimit => + case sortLimit: StreamPhysicalSortLimit => val rankStrategies = RankProcessStrategy.analyzeRankProcessStrategies( sortLimit, ImmutableBitSet.of(), sortLimit.getCollation) visitRankStrategies( diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala index db399a826d1..c2635906ccc 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala @@ -412,7 +412,7 @@ object FlinkStreamRuleSets { // sort StreamPhysicalSortRule.INSTANCE, StreamPhysicalLimitRule.INSTANCE, - StreamExecSortLimitRule.INSTANCE, + StreamPhysicalSortLimitRule.INSTANCE, StreamExecTemporalSortRule.INSTANCE, // rank StreamPhysicalRankRule.INSTANCE, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecSortLimitRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSortLimitRule.scala similarity index 90% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecSortLimitRule.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSortLimitRule.scala index 432f05b29ea..af795c8aa53 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecSortLimitRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSortLimitRule.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.rules.physical.stream import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSort -import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSortLimit +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSortLimit import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule @@ -28,14 +28,14 @@ import org.apache.flink.table.planner.plan.utils.RankProcessStrategy /** * Rule that matches [[FlinkLogicalSort]] with non-empty sort fields and non-null fetch or offset, - * and converts it to [[StreamExecSortLimit]]. + * and converts it to [[StreamPhysicalSortLimit]]. */ -class StreamExecSortLimitRule +class StreamPhysicalSortLimitRule extends ConverterRule( classOf[FlinkLogicalSort], FlinkConventions.LOGICAL, FlinkConventions.STREAM_PHYSICAL, - "StreamExecSortLimitRule") { + "StreamPhysicalSortLimitRule") { override def matches(call: RelOptRuleCall): Boolean = { val sort: FlinkLogicalSort = call.rel(0) @@ -54,7 +54,7 @@ class StreamExecSortLimitRule .replace(FlinkConventions.STREAM_PHYSICAL) val newInput: RelNode = RelOptRule.convert(input, requiredTraitSet) - new StreamExecSortLimit( + new StreamPhysicalSortLimit( rel.getCluster, providedTraitSet, newInput, @@ -65,6 +65,6 @@ class StreamExecSortLimitRule } } -object StreamExecSortLimitRule { - val INSTANCE: RelOptRule = new StreamExecSortLimitRule +object StreamPhysicalSortLimitRule { + val INSTANCE: RelOptRule = new StreamPhysicalSortLimitRule } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala index 0385de9d941..914b963a3a4 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala @@ -406,7 +406,7 @@ class FlinkRelMdHandlerTestBase { FlinkRelDistribution.SINGLETON), collection, offset, fetch, true) - val streamSort = new StreamExecSortLimit(cluster, streamPhysicalTraits.replace(collection), + val streamSort = new StreamPhysicalSortLimit(cluster, streamPhysicalTraits.replace(collection), studentStreamScan, collection, offset, fetch, RankProcessStrategy.UNDEFINED_STRATEGY) (logicalSortLimit, flinkLogicalSortLimit, -- GitLab