Commit ae796bb6 authored by 龙三, committed by godfreyhe

[FLINK-20766][table-planner-blink] Introduce BatchPhysicalSort, and make BatchExecSort only extended from ExecNode

This closes #14502
Parent 91f93d9c
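
In short: the old Scala BatchExecSort RelNode is split into a Calcite-facing BatchPhysicalSort (traits, cost, explain) and a new Java BatchExecSort that is only an ExecNode and owns the runtime translation. A condensed Scala sketch of the new physical node, assembled from the diff below (cost/explain methods elided, so this is illustrative rather than the exact source):

package org.apache.flink.table.planner.plan.nodes.physical.batch

import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.{RelCollation, RelNode}
import org.apache.calcite.rel.core.Sort
import org.apache.calcite.rex.RexNode
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSort
import org.apache.flink.table.planner.plan.utils.SortUtil

// Optimizer-side node only: no translateToPlanInternal any more.
class BatchPhysicalSort(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    inputRel: RelNode,
    sortCollation: RelCollation)
  extends Sort(cluster, traitSet, inputRel, sortCollation)
  with BatchPhysicalRel {

  override def copy(
      traitSet: RelTraitSet,
      newInput: RelNode,
      newCollation: RelCollation,
      offset: RexNode,
      fetch: RexNode): Sort =
    new BatchPhysicalSort(cluster, traitSet, newInput, newCollation)

  // The single bridge into the exec layer: build the pure ExecNode.
  override def translateToExecNode(): ExecNode[_] =
    new BatchExecSort(
      SortUtil.getSortSpec(sortCollation.getFieldCollations),
      ExecEdge.builder().damBehavior(ExecEdge.DamBehavior.END_INPUT).build(),
      FlinkTypeFactory.toLogicalRowType(getRowType),
      getRelDetailedDescription)
}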
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.planner.plan.nodes.exec.batch;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.nodes.exec.utils.SortSpec;
import org.apache.flink.table.runtime.operators.sort.SortOperator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import java.util.Collections;
/**
* {@link BatchExecNode} for Sort without limit.
*
* <p>This node outputs all input records rather than only the first `limit` records.
*/
public class BatchExecSort extends ExecNodeBase<RowData> implements BatchExecNode<RowData> {
private final SortSpec sortSpec;
public BatchExecSort(
SortSpec sortSpec, ExecEdge inputEdge, RowType outputType, String description) {
super(Collections.singletonList(inputEdge), outputType, description);
this.sortSpec = sortSpec;
}
@SuppressWarnings("unchecked")
@Override
protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
ExecNode<RowData> inputNode = (ExecNode<RowData>) getInputNodes().get(0);
Transformation<RowData> inputTransform = inputNode.translateToPlan(planner);
TableConfig config = planner.getTableConfig();
RowType inputType = (RowType) inputNode.getOutputType();
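// Generate the normalized-key computer and record comparator for the sort key fields.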
SortCodeGenerator codeGen =
new SortCodeGenerator(
config,
sortSpec.getFieldIndices(),
sortSpec.getFieldTypes(inputType),
sortSpec.getAscendingOrders(),
sortSpec.getNullsIsLast());
SortOperator operator =
new SortOperator(
codeGen.generateNormalizedKeyComputer("BatchExecSortComputer"),
codeGen.generateRecordComparator("BatchExecSortComparator"));
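// Managed memory reserved for the external sorter, from table.exec.resource.sort.memory.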
long sortMemory =
MemorySize.parse(
config.getConfiguration()
.getString(
ExecutionConfigOptions
.TABLE_EXEC_RESOURCE_SORT_MEMORY))
.getBytes();
OneInputTransformation<RowData, RowData> transform =
ExecNodeUtil.createOneInputTransformation(
inputTransform,
getDesc(),
SimpleOperatorFactory.of(operator),
InternalTypeInfo.of((RowType) getOutputType()),
inputTransform.getParallelism(),
sortMemory);
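// A singleton input implies a global sort, so the operator must run with parallelism 1.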
if (inputsContainSingleton()) {
transform.setParallelism(1);
transform.setMaxParallelism(1);
}
return transform;
}
}
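
The node reserves managed memory for its sorter from `table.exec.resource.sort.memory`, parsed with MemorySize.parse as shown above. A usage sketch, assuming a TableEnvironment named `tEnv` is in scope:

import org.apache.flink.table.api.config.ExecutionConfigOptions

// Hypothetical tuning: reserve 256 mb per sort operator instead of the
// default; any MemorySize-parsable string works here.
tEnv.getConfig.getConfiguration.setString(
  ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SORT_MEMORY, "256 mb")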
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.metadata
import org.apache.flink.table.planner.JHashMap
import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef}
import org.apache.flink.table.planner.plan.metadata.FlinkMetadata.FlinkDistribution
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecSortRule
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalSortRule
import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
import org.apache.calcite.rel._
@@ -73,7 +73,7 @@ class FlinkRelMdDistribution private extends MetadataHandler[FlinkDistribution]
def flinkDistribution(sort: Sort, mq: RelMetadataQuery): FlinkRelDistribution = {
val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(sort)
val enableRangeSort = tableConfig.getConfiguration.getBoolean(
BatchExecSortRule.TABLE_EXEC_SORT_RANGE_ENABLED)
BatchPhysicalSortRule.TABLE_EXEC_RANGE_SORT_ENABLED)
if ((sort.getCollation.getFieldCollations.nonEmpty &&
sort.fetch == null && sort.offset == null) && enableRangeSort) {
// If the sort is a global sort and the table config allows range partitioning.
......
@@ -21,7 +21,7 @@ package org.apache.flink.table.planner.plan.nodes.logical
import org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT
import org.apache.flink.table.planner.calcite.FlinkContext
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecSortRule.TABLE_EXEC_SORT_RANGE_ENABLED
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalSortRule.TABLE_EXEC_RANGE_SORT_ENABLED
import org.apache.flink.table.planner.plan.utils.SortUtil
import org.apache.calcite.plan._
@@ -105,7 +105,7 @@ class FlinkLogicalSortBatchConverter extends ConverterRule(
val sort = rel.asInstanceOf[LogicalSort]
val newInput = RelOptRule.convert(sort.getInput, FlinkConventions.LOGICAL)
val config = sort.getCluster.getPlanner.getContext.unwrap(classOf[FlinkContext]).getTableConfig
val enableRangeSort = config.getConfiguration.getBoolean(TABLE_EXEC_SORT_RANGE_ENABLED)
val enableRangeSort = config.getConfiguration.getBoolean(TABLE_EXEC_RANGE_SORT_ENABLED)
val limitValue = config.getConfiguration.getInteger(TABLE_EXEC_SORT_DEFAULT_LIMIT)
val (offset, fetch) = if (sort.fetch == null && sort.offset == null
&& !enableRangeSort && limitValue > 0) {
......
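
Only when range sort is disabled and `table.exec.sort.default-limit` is positive does this converter rewrite a bare ORDER BY into a sort with an implicit fetch. A hedged, test-style sketch of that interaction (using `util` as in the planner's TableTestBase utilities shown in the tests further down):

import org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalSortRule.TABLE_EXEC_RANGE_SORT_ENABLED

// Range sort off + positive default limit: the bare ORDER BY below is
// converted with an implicit fetch of 200 rows.
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, false)
util.tableEnv.getConfig.getConfiguration.setInteger(TABLE_EXEC_SORT_DEFAULT_LIMIT, 200)
util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC")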
@@ -18,20 +18,11 @@
package org.apache.flink.table.planner.plan.nodes.physical.batch
import org.apache.flink.api.dag.Transformation
import org.apache.flink.configuration.MemorySize
import org.apache.flink.streaming.api.operators.{OneInputStreamOperator, SimpleOperatorFactory}
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.data.RowData
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator
import org.apache.flink.table.planner.delegation.BatchPlanner
import org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil
import org.apache.flink.table.planner.plan.nodes.exec.{LegacyBatchExecNode, ExecEdge}
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSort
import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
import org.apache.flink.table.planner.plan.utils.{FlinkRelMdUtil, RelExplainUtil, SortUtil}
import org.apache.flink.table.runtime.operators.sort.SortOperator
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
import org.apache.calcite.rel.core.Sort
@@ -39,8 +30,6 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter}
import org.apache.calcite.rex.RexNode
import java.util
import scala.collection.JavaConversions._
/**
@@ -48,18 +37,15 @@ import scala.collection.JavaConversions._
*
* This node outputs all input records rather than only the first `limit` records.
*/
class BatchExecSort(
class BatchPhysicalSort(
cluster: RelOptCluster,
traitSet: RelTraitSet,
inputRel: RelNode,
sortCollation: RelCollation)
extends Sort(cluster, traitSet, inputRel, sortCollation)
with BatchPhysicalRel
with LegacyBatchExecNode[RowData] {
with BatchPhysicalRel {
require(sortCollation.getFieldCollations.size() > 0)
private val (keys, orders, nullsIsLast) = SortUtil.getKeysAndOrders(
sortCollation.getFieldCollations)
override def copy(
traitSet: RelTraitSet,
@@ -67,7 +53,7 @@ class BatchExecSort(
newCollation: RelCollation,
offset: RexNode,
fetch: RexNode): Sort = {
new BatchExecSort(cluster, traitSet, newInput, newCollation)
new BatchPhysicalSort(cluster, traitSet, newInput, newCollation)
}
override def explainTerms(pw: RelWriter): RelWriter = {
@@ -90,38 +76,13 @@ class BatchExecSort(
costFactory.makeCost(rowCount, cpuCost, 0, 0, memCost)
}
//~ ExecNode methods -----------------------------------------------------------
override def getInputEdges: util.List[ExecEdge] = List(
ExecEdge.builder()
.damBehavior(ExecEdge.DamBehavior.END_INPUT)
.build())
override protected def translateToPlanInternal(
planner: BatchPlanner): Transformation[RowData] = {
val input = getInputNodes.get(0).translateToPlan(planner)
.asInstanceOf[Transformation[RowData]]
val conf = planner.getTableConfig
val inputType = FlinkTypeFactory.toLogicalRowType(getInput.getRowType)
val outputType = FlinkTypeFactory.toLogicalRowType(getRowType)
// sort code gen
val keyTypes = keys.map(inputType.getTypeAt)
val codeGen = new SortCodeGenerator(conf, keys, keyTypes, orders, nullsIsLast)
val operator = new SortOperator(
codeGen.generateNormalizedKeyComputer("BatchExecSortComputer"),
codeGen.generateRecordComparator("BatchExecSortComparator"))
val sortMemory = MemorySize.parse(conf.getConfiguration.getString(
ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SORT_MEMORY)).getBytes
ExecNodeUtil.createOneInputTransformation(
input,
getRelDetailedDescription,
SimpleOperatorFactory.of(operator.asInstanceOf[OneInputStreamOperator[RowData, RowData]]),
InternalTypeInfo.of(outputType),
input.getParallelism,
sortMemory)
override def translateToExecNode(): ExecNode[_] = {
new BatchExecSort(
SortUtil.getSortSpec(sortCollation.getFieldCollations),
ExecEdge.builder().damBehavior(ExecEdge.DamBehavior.END_INPUT).build(),
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription
)
}
}
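
The ExecEdge built in translateToExecNode (shown above) declares DamBehavior.END_INPUT: the sort emits nothing until its input is exhausted, so the edge cannot be fully pipelined. Reconstructed on its own for clarity:

import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge

// END_INPUT: output starts only after the input ends, because a total
// sort must see every row before emitting the first one.
val sortInputEdge: ExecEdge = ExecEdge.builder()
  .damBehavior(ExecEdge.DamBehavior.END_INPUT)
  .build()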
@@ -405,7 +405,7 @@ object FlinkBatchRuleSets {
// union
BatchPhysicalUnionRule.INSTANCE,
// sort
BatchExecSortRule.INSTANCE,
BatchPhysicalSortRule.INSTANCE,
BatchPhysicalLimitRule.INSTANCE,
BatchExecSortLimitRule.INSTANCE,
// rank
......
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.rules.physical
import org.apache.flink.table.api.TableException
import org.apache.flink.table.planner.plan.`trait`._
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecSort, BatchPhysicalExchange, BatchPhysicalRel}
import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalSort, BatchPhysicalExchange, BatchPhysicalRel}
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange
import org.apache.flink.table.planner.plan.rules.physical.FlinkExpandConversionRule._
@@ -158,7 +158,7 @@ object FlinkExpandConversionRule {
val sortCollation = RelCollationTraitDef.INSTANCE.canonize(requiredCollation)
flinkConvention match {
case FlinkConventions.BATCH_PHYSICAL =>
new BatchExecSort(node.getCluster, traitSet, node, sortCollation)
new BatchPhysicalSort(node.getCluster, traitSet, node, sortCollation)
case _ => throw new TableException(s"Unsupported convention: $flinkConvention")
}
} else {
......
@@ -25,7 +25,7 @@ import org.apache.flink.table.planner.calcite.FlinkContext
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSort
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSort
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSort
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
@@ -35,13 +35,13 @@ import java.lang.{Boolean => JBoolean}
/**
* Rule that matches [[FlinkLogicalSort]] whose sort fields are non-empty and whose `fetch` and
* `offset` are null, and converts it to [[BatchExecSort]].
* `offset` are null, and converts it to [[BatchPhysicalSort]].
*/
class BatchExecSortRule extends ConverterRule(
class BatchPhysicalSortRule extends ConverterRule(
classOf[FlinkLogicalSort],
FlinkConventions.LOGICAL,
FlinkConventions.BATCH_PHYSICAL,
"BatchExecSortRule") {
"BatchPhysicalSortRule") {
override def matches(call: RelOptRuleCall): Boolean = {
val sort: FlinkLogicalSort = call.rel(0)
@@ -54,7 +54,7 @@ class BatchExecSortRule extends ConverterRule(
val input = sort.getInput
val config = sort.getCluster.getPlanner.getContext.unwrap(classOf[FlinkContext]).getTableConfig
val enableRangeSort = config.getConfiguration.getBoolean(
BatchExecSortRule.TABLE_EXEC_SORT_RANGE_ENABLED)
BatchPhysicalSortRule.TABLE_EXEC_RANGE_SORT_ENABLED)
val distribution = if (enableRangeSort) {
FlinkRelDistribution.range(sort.getCollation.getFieldCollations)
} else {
@@ -68,7 +68,7 @@ class BatchExecSortRule extends ConverterRule(
.replace(FlinkConventions.BATCH_PHYSICAL)
val newInput = RelOptRule.convert(input, requiredTraitSet)
new BatchExecSort(
new BatchPhysicalSort(
sort.getCluster,
providedTraitSet,
newInput,
@@ -76,12 +76,12 @@
}
}
object BatchExecSortRule {
val INSTANCE: RelOptRule = new BatchExecSortRule
object BatchPhysicalSortRule {
val INSTANCE: RelOptRule = new BatchPhysicalSortRule
// This is an experimental config and may be removed later.
@Experimental
val TABLE_EXEC_SORT_RANGE_ENABLED: ConfigOption[JBoolean] =
val TABLE_EXEC_RANGE_SORT_ENABLED: ConfigOption[JBoolean] =
key("table.exec.range-sort.enabled")
.defaultValue(JBoolean.valueOf(false))
.withDescription("Sets whether to enable range sort, use range sort to sort all data in" +
......
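
Callers that toggle range sort now go through the renamed constant; the underlying key `table.exec.range-sort.enabled` is unchanged. A minimal sketch, assuming a TableEnvironment named `tEnv` is in scope:

import org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalSortRule

// Enable the experimental range sort (same config key, new constant name).
tEnv.getConfig.getConfiguration.setBoolean(
  BatchPhysicalSortRule.TABLE_EXEC_RANGE_SORT_ENABLED, true)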
@@ -19,7 +19,7 @@
package org.apache.flink.table.planner.plan.rules.physical.batch
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecSort, BatchExecSortAggregate, BatchPhysicalExchange, BatchPhysicalExpand}
import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalSort, BatchExecSortAggregate, BatchPhysicalExchange, BatchPhysicalExpand}
import org.apache.calcite.plan.RelOptRule.{any, operand}
import org.apache.calcite.plan.RelOptRuleCall
@@ -55,7 +55,7 @@ import org.apache.calcite.rel.{RelCollationTraitDef, RelNode}
*/
class EnforceLocalSortAggRule extends EnforceLocalAggRuleBase(
operand(classOf[BatchExecSortAggregate],
operand(classOf[BatchExecSort],
operand(classOf[BatchPhysicalSort],
operand(classOf[BatchPhysicalExchange],
operand(classOf[BatchPhysicalExpand], any)))),
"EnforceLocalSortAggRule") {
@@ -92,13 +92,13 @@ class EnforceLocalSortAggRule extends EnforceLocalAggRuleBase(
private def createSort(
input: RelNode,
sortKeys: Array[Int]): BatchExecSort = {
sortKeys: Array[Int]): BatchPhysicalSort = {
val cluster = input.getCluster
val collation = createRelCollation(sortKeys)
val traitSet = cluster.getPlanner.emptyTraitSet
.replace(FlinkConventions.BATCH_PHYSICAL)
.replace(collation)
new BatchExecSort(
new BatchPhysicalSort(
cluster,
traitSet,
input,
......
@@ -19,7 +19,7 @@
package org.apache.flink.table.planner.plan.rules.physical.batch
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalSortAggregate, BatchExecSort, BatchExecSortAggregate}
import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalSortAggregate, BatchPhysicalSort, BatchExecSortAggregate}
import org.apache.calcite.plan.RelOptRule._
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
@@ -86,7 +86,7 @@ class RemoveRedundantLocalSortAggWithoutSortRule extends RemoveRedundantLocalSor
class RemoveRedundantLocalSortAggWithSortRule extends RemoveRedundantLocalSortAggRule(
operand(classOf[BatchExecSortAggregate],
operand(classOf[BatchExecSort],
operand(classOf[BatchPhysicalSort],
operand(classOf[BatchExecLocalSortAggregate],
operand(classOf[RelNode], FlinkConventions.BATCH_PHYSICAL, any)))),
"RemoveRedundantLocalSortAggWithSortRule") {
......
@@ -24,7 +24,7 @@ import org.apache.flink.table.api.Types
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.plan.stats.TableStats
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecSortMergeJoinRule
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecSortRule.TABLE_EXEC_SORT_RANGE_ENABLED
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalSortRule.TABLE_EXEC_RANGE_SORT_ENABLED
import org.apache.flink.table.planner.plan.stats.FlinkStatistic
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.StringSplit
import org.apache.flink.table.planner.utils.{TableFunc1, TableTestBase}
@@ -105,7 +105,7 @@ class RemoveCollationTest extends TableTestBase {
@Test
def testRemoveCollation_Sort(): Unit = {
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_SORT_RANGE_ENABLED, true)
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, true)
val sqlQuery =
"""
|WITH r AS (SELECT a, b, COUNT(c) AS cnt FROM x GROUP BY a, b)
@@ -119,7 +119,7 @@
def testRemoveCollation_Aggregate_3(): Unit = {
util.tableEnv.getConfig.getConfiguration.setString(
ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg")
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_SORT_RANGE_ENABLED, true)
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, true)
val sqlQuery =
"""
|WITH r AS (SELECT * FROM x ORDER BY a, b)
......
@@ -21,7 +21,7 @@ package org.apache.flink.table.planner.plan.batch.sql
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecSortRule.TABLE_EXEC_SORT_RANGE_ENABLED
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalSortRule.TABLE_EXEC_RANGE_SORT_ENABLED
import org.apache.flink.table.planner.utils.TableTestBase
import org.junit.Test
@@ -35,61 +35,61 @@ class SortLimitTest extends TableTestBase {
@Test
def testNonRangeSortWithoutOffset(): Unit = {
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_SORT_RANGE_ENABLED, false)
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, false)
util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC LIMIT 5")
}
@Test
def testNonRangeSortWithLimit0(): Unit = {
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_SORT_RANGE_ENABLED, false)
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, false)
util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC LIMIT 0")
}
@Test
def testNonRangeSortOnlyWithOffset(): Unit = {
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_SORT_RANGE_ENABLED, false)
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, false)
util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC OFFSET 10 ROWS")
}
@Test
def testNoneRangeSortWithOffsetLimit(): Unit = {
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_SORT_RANGE_ENABLED, false)
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, false)
util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC, b LIMIT 10 OFFSET 1")
}
@Test
def testNoneRangeSortWithOffsetLimit0(): Unit = {
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_SORT_RANGE_ENABLED, false)
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, false)
util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC, b LIMIT 0 OFFSET 1")
}
@Test
def testRangeSortOnWithoutOffset(): Unit = {
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_SORT_RANGE_ENABLED, true)
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, true)
util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC LIMIT 5")
}
@Test
def testRangeSortOnWithLimit0(): Unit = {
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_SORT_RANGE_ENABLED, true)
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, true)
util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC LIMIT 0")
}
@Test
def testRangeSortOnlyWithOffset(): Unit = {
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_SORT_RANGE_ENABLED, true)
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, true)
util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC OFFSET 10 ROWS")
}
@Test
def testRangeSortWithOffsetLimit(): Unit = {
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_SORT_RANGE_ENABLED, true)
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, true)
util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC, b LIMIT 10 OFFSET 1")
}
@Test
def testRangeSortWithOffsetLimit0(): Unit = {
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_SORT_RANGE_ENABLED, true)
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, true)
util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC, b LIMIT 0 OFFSET 1")
}
}
@@ -21,7 +21,7 @@ package org.apache.flink.table.planner.plan.batch.sql
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecSortRule.TABLE_EXEC_SORT_RANGE_ENABLED
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalSortRule.TABLE_EXEC_RANGE_SORT_ENABLED
import org.apache.flink.table.planner.utils.TableTestBase
import org.junit.Test
@@ -33,7 +33,7 @@ class SortTest extends TableTestBase {
@Test
def testNonRangeSortOnSingleFieldWithoutForceLimit(): Unit = {
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_SORT_RANGE_ENABLED, false)
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, false)
util.tableEnv.getConfig.getConfiguration.setInteger(
ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT, -1)
util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC")
@@ -42,7 +42,7 @@ class SortTest extends TableTestBase {
@Test
def testNonRangeSortOnMultiFieldsWithoutForceLimit(): Unit = {
util.tableEnv.getConfig.getConfiguration.setBoolean(
TABLE_EXEC_SORT_RANGE_ENABLED, false)
TABLE_EXEC_RANGE_SORT_ENABLED, false)
util.tableEnv.getConfig.getConfiguration.setInteger(
ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT, -1)
util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC, b")
@@ -51,7 +51,7 @@ class SortTest extends TableTestBase {
@Test
def testNonRangeSortWithForceLimit(): Unit = {
util.tableEnv.getConfig.getConfiguration.setBoolean(
TABLE_EXEC_SORT_RANGE_ENABLED, false)
TABLE_EXEC_RANGE_SORT_ENABLED, false)
util.tableEnv.getConfig.getConfiguration.setInteger(
ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT, 200)
util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC")
@@ -59,7 +59,7 @@ class SortTest extends TableTestBase {
@Test
def testRangeSortWithoutForceLimit(): Unit = {
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_SORT_RANGE_ENABLED, true)
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, true)
util.tableEnv.getConfig.getConfiguration.setInteger(
ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT, -1)
// exec node does not support range sort yet, so we verify rel plan here
@@ -68,7 +68,7 @@ class SortTest extends TableTestBase {
@Test
def testRangeSortWithForceLimit(): Unit = {
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_SORT_RANGE_ENABLED, true)
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, true)
util.tableEnv.getConfig.getConfiguration.setInteger(
ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT, 200)
// exec node does not support range sort yet, so we verify rel plan here
......
@@ -23,7 +23,7 @@ import org.apache.flink.table.api._
import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
import org.apache.flink.table.planner.functions.aggfunctions.FirstValueAggFunction
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecSortMergeJoinRule
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecSortRule.TABLE_EXEC_SORT_RANGE_ENABLED
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalSortRule.TABLE_EXEC_RANGE_SORT_ENABLED
import org.apache.flink.table.planner.plan.utils.OperatorType
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.NonDeterministicUdf
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.{NonDeterministicTableFunc, StringSplit}
@@ -222,7 +222,7 @@ class SubplanReuseTest extends TableTestBase {
@Test
def testSubplanReuseOnSort(): Unit = {
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_SORT_RANGE_ENABLED, true)
util.tableEnv.getConfig.getConfiguration.setBoolean(TABLE_EXEC_RANGE_SORT_ENABLED, true)
val sqlQuery =
"""
|WITH r AS (SELECT c, SUM(a) a, SUM(b) b FROM x GROUP BY c ORDER BY a, b DESC)
......
@@ -319,7 +319,7 @@ class FlinkRelMdHandlerTestBase {
val collation = logicalSort.getCollation
val flinkLogicalSort = new FlinkLogicalSort(cluster, flinkLogicalTraits.replace(collation),
studentFlinkLogicalScan, collation, null, null)
val batchSort = new BatchExecSort(cluster,
val batchSort = new BatchPhysicalSort(cluster,
batchPhysicalTraits.replace(collation).replace(FlinkRelDistribution.SINGLETON),
studentBatchScan, collation)
val streamSort = new StreamPhysicalSort(cluster,
@@ -1697,7 +1697,7 @@ class FlinkRelMdHandlerTestBase {
val collection1 = RelCollations.of(
FlinkRelOptUtil.ofRelFieldCollation(4), FlinkRelOptUtil.ofRelFieldCollation(1))
val newSortTrait1 = exchange1.getTraitSet.replace(collection1)
val sort1 = new BatchExecSort(cluster, newSortTrait1, exchange1,
val sort1 = new BatchPhysicalSort(cluster, newSortTrait1, exchange1,
newSortTrait1.getTrait(RelCollationTraitDef.INSTANCE))
val outputRowType1 = createRowType("id", "name", "score", "age", "class", "rn")
@@ -1723,7 +1723,7 @@ class FlinkRelMdHandlerTestBase {
val collation2 = RelCollations.of(
FlinkRelOptUtil.ofRelFieldCollation(4), FlinkRelOptUtil.ofRelFieldCollation(2))
val newSortTrait2 = innerWindowAgg1.getTraitSet.replace(collation2)
val sort2 = new BatchExecSort(cluster, newSortTrait2, innerWindowAgg1,
val sort2 = new BatchPhysicalSort(cluster, newSortTrait2, innerWindowAgg1,
newSortTrait2.getTrait(RelCollationTraitDef.INSTANCE))
val outputRowType2 = createRowType(
......
@@ -32,7 +32,7 @@ import org.apache.flink.table.data.{DecimalDataUtils, TimestampData}
import org.apache.flink.table.data.util.DataFormatConverters.LocalDateConverter
import org.apache.flink.table.planner.expressions.utils.{RichFunc1, RichFunc2, RichFunc3, SplitUDF}
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecSortRule
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalSortRule
import org.apache.flink.table.planner.runtime.utils.BatchTableEnvUtil.parseFieldNames
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
import org.apache.flink.table.planner.runtime.utils.TestData._
@@ -1221,7 +1221,7 @@ class CalcITCase extends BatchTestBase {
conf.getConfiguration.setInteger(
ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
conf.getConfiguration.setBoolean(
BatchExecSortRule.TABLE_EXEC_SORT_RANGE_ENABLED, true)
BatchPhysicalSortRule.TABLE_EXEC_RANGE_SORT_ENABLED, true)
checkResult(
"select * from BinaryT order by c",
nullData3.sortBy((x : Row) =>
......
@@ -39,8 +39,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Operator for batch sort. */
public class SortOperator extends TableStreamOperator<BinaryRowData>
implements OneInputStreamOperator<RowData, BinaryRowData>, BoundedOneInput {
public class SortOperator extends TableStreamOperator<RowData>
implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {
private static final Logger LOG = LoggerFactory.getLogger(SortOperator.class);
@@ -48,7 +48,7 @@ public class SortOperator extends TableStreamOperator<BinaryRowData>
private GeneratedRecordComparator gComparator;
private transient BinaryExternalSorter sorter;
private transient StreamRecordCollector<BinaryRowData> collector;
private transient StreamRecordCollector<RowData> collector;
private transient BinaryRowDataSerializer binarySerializer;
public SortOperator(
......
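
The generics change above is what lets the new Java BatchExecSort wrap the operator without a cast: SortOperator now implements OneInputStreamOperator<RowData, RowData> directly. A before/after fragment (not standalone; `operator` stands for the SortOperator built by the exec node):

import org.apache.flink.streaming.api.operators.SimpleOperatorFactory
import org.apache.flink.table.runtime.operators.sort.SortOperator

def wrap(operator: SortOperator) = {
  // Before: output was BinaryRowData, so the old Scala node needed
  //   SimpleOperatorFactory.of(
  //     operator.asInstanceOf[OneInputStreamOperator[RowData, RowData]])
  // After: the operator already has the RowData-to-RowData shape.
  SimpleOperatorFactory.of(operator)
}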