提交 a8e591ad 编写于 作者: G godfreyhe

[FLINK-20857][table-planner-blink] Introduce BatchPhysicalHashWindowAggregate...

[FLINK-20857][table-planner-blink] Introduce BatchPhysicalHashWindowAggregate & BatchPhysicalLocalHashWindowAggregate, and make BatchExecHashWindowAggregate only extended from ExecNode

This closes #14574
上级 29065828
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.planner.plan.nodes.exec.batch;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.agg.batch.HashWindowCodeGenerator;
import org.apache.flink.table.planner.codegen.agg.batch.WindowCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.logical.LogicalWindow;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.runtime.generated.GeneratedOperator;
import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.calcite.rel.core.AggregateCall;
import java.util.Arrays;
import java.util.Collections;
/**
 * Batch {@link ExecNode} that evaluates a group-window aggregation using a hash-based
 * (code-generated) operator.
 *
 * <p>The node code-generates a {@link OneInputStreamOperator} via {@link HashWindowCodeGenerator}
 * and wraps it in a one-input transformation that reserves managed memory for the hash table.
 */
public class BatchExecHashWindowAggregate extends ExecNodeBase<RowData>
        implements BatchExecNode<RowData> {

    // Indices of the grouping keys in the input row.
    private final int[] grouping;
    // Indices of the auxiliary grouping keys (functionally determined by {@link #grouping}).
    private final int[] auxGrouping;
    // The aggregate calls to evaluate.
    private final AggregateCall[] aggCalls;
    // The logical window definition (tumbling/sliding).
    private final LogicalWindow window;
    // Index of the time attribute field in the input row.
    private final int inputTimeFieldIndex;
    // Whether the time attribute is a DATE (affects the internal timestamp representation).
    private final boolean inputTimeIsDate;
    // Window properties (e.g. window_start, window_end) exposed in the output.
    private final PlannerNamedWindowProperty[] namedWindowProperties;
    // Row type of the aggregate function input.
    private final RowType aggInputRowType;
    // Whether pane-based optimization is enabled for sliding windows.
    private final boolean enableAssignPane;
    // Whether this aggregate merges pre-accumulated (local) results.
    private final boolean isMerge;
    // Whether this aggregate produces the final result.
    private final boolean isFinal;

    /**
     * Creates a hash-based batch window aggregate exec node.
     *
     * @param grouping indices of the grouping keys
     * @param auxGrouping indices of the auxiliary grouping keys
     * @param aggCalls aggregate calls to evaluate
     * @param window logical window definition
     * @param inputTimeFieldIndex index of the time attribute in the input
     * @param inputTimeIsDate whether the time attribute is a DATE
     * @param namedWindowProperties window properties exposed in the output
     * @param aggInputRowType row type of the aggregate function input
     * @param enableAssignPane whether pane-based optimization is enabled
     * @param isMerge whether this node merges local (pre-aggregated) results
     * @param isFinal whether this node produces the final aggregate result
     * @param inputEdge edge describing the input requirement
     * @param outputType output row type of this node
     * @param description human-readable description of this node
     */
    public BatchExecHashWindowAggregate(
            int[] grouping,
            int[] auxGrouping,
            AggregateCall[] aggCalls,
            LogicalWindow window,
            int inputTimeFieldIndex,
            boolean inputTimeIsDate,
            PlannerNamedWindowProperty[] namedWindowProperties,
            RowType aggInputRowType,
            boolean enableAssignPane,
            boolean isMerge,
            boolean isFinal,
            ExecEdge inputEdge,
            RowType outputType,
            String description) {
        super(Collections.singletonList(inputEdge), outputType, description);
        this.grouping = grouping;
        this.auxGrouping = auxGrouping;
        this.aggCalls = aggCalls;
        this.window = window;
        this.inputTimeFieldIndex = inputTimeFieldIndex;
        this.inputTimeIsDate = inputTimeIsDate;
        this.namedWindowProperties = namedWindowProperties;
        this.aggInputRowType = aggInputRowType;
        this.enableAssignPane = enableAssignPane;
        this.isMerge = isMerge;
        this.isFinal = isFinal;
    }

    @SuppressWarnings("unchecked")
    @Override
    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
        final TableConfig config = planner.getTableConfig();

        // Translate the single input first.
        final ExecNode<RowData> input = (ExecNode<RowData>) getInputNodes().get(0);
        final Transformation<RowData> sourceTransform = input.translateToPlan(planner);
        final RowType inputRowType = (RowType) input.getOutputType();

        final AggregateInfoList aggregateInfos =
                AggregateUtil.transformToBatchAggregateInfoList(
                        aggInputRowType,
                        JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
                        null,
                        null);

        // Limit of the in-memory group buffer, taken from the table configuration.
        final int bufferLimit =
                config.getConfiguration()
                        .getInteger(ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT);
        // f0 = window size, f1 = slide size (both in milliseconds).
        final Tuple2<Long, Long> sizeAndSlide = WindowCodeGenerator.getWindowDef(window);

        final HashWindowCodeGenerator codeGenerator =
                new HashWindowCodeGenerator(
                        new CodeGeneratorContext(config),
                        planner.getRelBuilder(),
                        window,
                        inputTimeFieldIndex,
                        inputTimeIsDate,
                        JavaScalaConversionUtil.toScala(Arrays.asList(namedWindowProperties)),
                        aggregateInfos,
                        inputRowType,
                        grouping,
                        auxGrouping,
                        enableAssignPane,
                        isMerge,
                        isFinal);

        final GeneratedOperator<OneInputStreamOperator<RowData, RowData>> generatedOperator =
                codeGenerator.gen(
                        inputRowType,
                        (RowType) getOutputType(),
                        bufferLimit,
                        0, // windowStart
                        sizeAndSlide.f0,
                        sizeAndSlide.f1);

        // Managed memory reserved for the hash aggregation operator.
        final long managedMemory =
                ExecNodeUtil.getMemorySize(
                        config, ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY);

        return ExecNodeUtil.createOneInputTransformation(
                sourceTransform,
                getDesc(),
                new CodeGenOperatorFactory<>(generatedOperator),
                InternalTypeInfo.of(getOutputType()),
                sourceTransform.getParallelism(),
                managedMemory);
    }
}
......@@ -72,7 +72,7 @@ class HashWindowCodeGenerator(
inputTimeIsDate: Boolean,
namedProperties: Seq[PlannerNamedWindowProperty],
aggInfoList: AggregateInfoList,
inputRowType: RelDataType,
inputRowType: RowType,
grouping: Array[Int],
auxGrouping: Array[Int],
enableAssignPane: Boolean = true,
......
......@@ -62,7 +62,7 @@ class SortWindowCodeGenerator(
inputTimeIsDate: Boolean,
namedProperties: Seq[PlannerNamedWindowProperty],
aggInfoList: AggregateInfoList,
inputRowType: RelDataType,
inputRowType: RowType,
inputType: RowType,
outputType: RowType,
buffLimitSize: Int,
......
......@@ -18,11 +18,7 @@
package org.apache.flink.table.planner.codegen.agg.batch
import org.apache.calcite.avatica.util.DateTimeUtils
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.tools.RelBuilder
import org.apache.commons.math3.util.ArithmeticUtils
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.data.binary.BinaryRowData
import org.apache.flink.table.data.utils.JoinedRowData
......@@ -52,6 +48,11 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot.INTERVAL_DAY_TIME
import org.apache.flink.table.types.logical._
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot
import org.apache.calcite.avatica.util.DateTimeUtils
import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.tools.RelBuilder
import org.apache.commons.math3.util.ArithmeticUtils
import scala.collection.JavaConversions._
abstract class WindowCodeGenerator(
......@@ -61,14 +62,15 @@ abstract class WindowCodeGenerator(
inputTimeIsDate: Boolean,
namedProperties: Seq[PlannerNamedWindowProperty],
aggInfoList: AggregateInfoList,
inputRowType: RelDataType,
inputRowType: RowType,
grouping: Array[Int],
auxGrouping: Array[Int],
enableAssignPane: Boolean = true,
val isMerge: Boolean,
val isFinal: Boolean) {
protected lazy val builder: RelBuilder = relBuilder.values(inputRowType)
protected lazy val builder: RelBuilder = relBuilder.values(
FlinkTypeFactory.INSTANCE.buildRelNodeRowType(inputRowType))
protected lazy val aggInfos: Array[AggregateInfo] = aggInfoList.aggInfos
......@@ -79,14 +81,12 @@ abstract class WindowCodeGenerator(
AggCodeGenHelper.getAggBufferNames(auxGrouping, aggInfos)
protected lazy val aggBufferTypes: Array[Array[LogicalType]] = AggCodeGenHelper.getAggBufferTypes(
inputType,
inputRowType,
auxGrouping,
aggInfos)
protected lazy val groupKeyRowType: RowType = AggCodeGenHelper.projectRowType(inputType, grouping)
private lazy val inputType: RowType =
FlinkTypeFactory.toLogicalType(inputRowType).asInstanceOf[RowType]
protected lazy val groupKeyRowType: RowType =
AggCodeGenHelper.projectRowType(inputRowType, grouping)
protected lazy val timestampInternalType: LogicalType =
if (inputTimeIsDate) new IntType() else new BigIntType()
......@@ -116,7 +116,7 @@ abstract class WindowCodeGenerator(
(groupKeyTypes :+ timestampInternalType) ++ aggBuffTypes,
((groupKeyNames :+ "assignedTs$") ++ aggBuffNames).toArray)
} else {
FlinkTypeFactory.toLogicalRowType(inputRowType)
inputRowType
}
}
......@@ -680,7 +680,8 @@ abstract class WindowCodeGenerator(
remainder)),
literal(index * slideSize))
exprCodegen.generateExpression(new CallExpressionResolver(relBuilder).resolve(expr).accept(
new ExpressionConverter(relBuilder.values(inputRowType))))
new ExpressionConverter(
relBuilder.values(FlinkTypeFactory.INSTANCE.buildRelNodeRowType(inputRowType)))))
}
def getGrouping: Array[Int] = grouping
......@@ -726,7 +727,7 @@ abstract class WindowCodeGenerator(
object WindowCodeGenerator {
def getWindowDef(window: LogicalWindow): (Long, Long) = {
def getWindowDef(window: LogicalWindow): JTuple2[JLong, JLong] = {
val (windowSize, slideSize): (Long, Long) = window match {
case TumblingGroupWindow(_, _, size) if isTimeIntervalLiteral(size) =>
(asLong(size), asLong(size))
......@@ -736,7 +737,7 @@ object WindowCodeGenerator {
// count tumbling/sliding window and session window not supported now
throw new UnsupportedOperationException(s"Window $window is not supported right now.")
}
(windowSize, slideSize)
new JTuple2[JLong, JLong](windowSize, slideSize)
}
def asLong(expr: Expression): Long = extractValue(expr, classOf[JLong]).get()
......
......@@ -19,7 +19,7 @@
package org.apache.flink.table.planner.plan.metadata
import org.apache.flink.table.planner.JDouble
import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalHashWindowAggregate, BatchExecLocalSortWindowAggregate, BatchPhysicalGroupAggregateBase, BatchPhysicalWindowAggregateBase}
import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalSortWindowAggregate, BatchPhysicalGroupAggregateBase, BatchPhysicalLocalHashWindowAggregate, BatchPhysicalWindowAggregateBase}
import org.apache.flink.table.planner.plan.stats._
import org.apache.flink.table.planner.plan.utils.AggregateUtil
......@@ -62,7 +62,7 @@ class AggCallSelectivityEstimator(agg: RelNode, mq: FlinkRelMetadataQuery)
(rel.getGroupSet.toArray ++ auxGroupSet, otherAggCalls)
case rel: BatchPhysicalGroupAggregateBase =>
(rel.grouping ++ rel.auxGrouping, rel.getAggCallList)
case rel: BatchExecLocalHashWindowAggregate =>
case rel: BatchPhysicalLocalHashWindowAggregate =>
val fullGrouping = rel.grouping ++ Array(rel.inputTimeFieldIndex) ++ rel.auxGrouping
(fullGrouping, rel.getAggCallList)
case rel: BatchExecLocalSortWindowAggregate =>
......
......@@ -545,7 +545,7 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] {
case agg: BatchExecLocalSortWindowAggregate =>
// grouping + assignTs + auxGrouping
agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping
case agg: BatchExecLocalHashWindowAggregate =>
case agg: BatchPhysicalLocalHashWindowAggregate =>
// grouping + assignTs + auxGrouping
agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping
case agg: BatchPhysicalWindowAggregateBase => agg.grouping ++ agg.auxGrouping
......
......@@ -213,7 +213,7 @@ class FlinkRelMdSize private extends MetadataHandler[BuiltInMetadata.Size] {
val mapInputToOutput: Map[Int, Int] = windowAgg match {
case agg: WindowAggregate =>
AggregateUtil.checkAndGetFullGroupSet(agg).zipWithIndex.toMap
case agg: BatchExecLocalHashWindowAggregate =>
case agg: BatchPhysicalLocalHashWindowAggregate =>
// local win-agg output type: grouping + assignTs + auxGrouping + aggCalls
agg.grouping.zipWithIndex.toMap ++
agg.auxGrouping.zipWithIndex.map {
......
......@@ -126,7 +126,7 @@ class BatchExecPythonGroupWindowAggregate(
val outputType = FlinkTypeFactory.toLogicalRowType(getRowType)
val inputType = FlinkTypeFactory.toLogicalRowType(inputRowType)
val (windowSize: Long, slideSize: Long) = WindowCodeGenerator.getWindowDef(window)
val windowSizeAndSlideSize = WindowCodeGenerator.getWindowDef(window)
val groupBufferLimitSize = planner.getTableConfig.getConfiguration.getInteger(
ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT)
......@@ -137,8 +137,8 @@ class BatchExecPythonGroupWindowAggregate(
outputType,
inputTimeFieldIndex,
groupBufferLimitSize,
windowSize,
slideSize,
windowSizeAndSlideSize.f0,
windowSizeAndSlideSize.f1,
getConfig(planner.getExecEnv, planner.getTableConfig))
if (isPythonWorkerUsingManagedMemory(planner.getTableConfig.getConfiguration)) {
......
......@@ -103,13 +103,13 @@ abstract class BatchExecSortWindowAggregateBase(
val groupBufferLimitSize = planner.getTableConfig.getConfiguration.getInteger(
ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT)
val (windowSize: Long, slideSize: Long) = WindowCodeGenerator.getWindowDef(window)
val windowSizeAndSlideSize = WindowCodeGenerator.getWindowDef(window)
val generator = new SortWindowCodeGenerator(
ctx, planner.getRelBuilder, window, inputTimeFieldIndex,
inputTimeIsDate, namedWindowProperties,
aggInfos, inputRowType, inputType, outputType,
groupBufferLimitSize, 0L, windowSize, slideSize,
aggInfos, inputType, inputType, outputType,
groupBufferLimitSize, 0L, windowSizeAndSlideSize.f0, windowSizeAndSlideSize.f1,
grouping, auxGrouping, enableAssignPane, isMerge, isFinal)
val generatedOperator = if (grouping.isEmpty) {
generator.genWithoutKeys()
......
......@@ -20,8 +20,10 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch
import org.apache.flink.table.functions.UserDefinedFunction
import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.logical.LogicalWindow
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge
import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecHashWindowAggregate
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.RelNode
......@@ -30,9 +32,10 @@ import org.apache.calcite.rel.core.AggregateCall
import java.util
import scala.collection.JavaConversions._
class BatchExecHashWindowAggregate(
/**
* Batch physical RelNode for (global) hash-based window aggregate.
*/
class BatchPhysicalHashWindowAggregate(
cluster: RelOptCluster,
traitSet: RelTraitSet,
inputRel: RelNode,
......@@ -47,25 +50,22 @@ class BatchExecHashWindowAggregate(
namedWindowProperties: Seq[PlannerNamedWindowProperty],
enableAssignPane: Boolean = false,
isMerge: Boolean)
extends BatchExecHashWindowAggregateBase(
extends BatchPhysicalHashWindowAggregateBase(
cluster,
traitSet,
inputRel,
outputRowType,
aggInputRowType,
grouping,
auxGrouping,
aggCallToAggFunction,
window,
inputTimeFieldIndex,
inputTimeIsDate,
namedWindowProperties,
enableAssignPane,
isMerge,
isFinal = true) {
override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
new BatchExecHashWindowAggregate(
new BatchPhysicalHashWindowAggregate(
cluster,
traitSet,
inputs.get(0),
......@@ -82,10 +82,22 @@ class BatchExecHashWindowAggregate(
isMerge)
}
//~ ExecNode methods -----------------------------------------------------------
override def getInputEdges: util.List[ExecEdge] = List(
ExecEdge.builder()
.damBehavior(ExecEdge.DamBehavior.END_INPUT)
.build())
override def translateToExecNode(): ExecNode[_] = {
new BatchExecHashWindowAggregate(
grouping,
auxGrouping,
getAggCallList.toArray,
window,
inputTimeFieldIndex,
inputTimeIsDate,
namedWindowProperties.toArray,
FlinkTypeFactory.toLogicalRowType(aggInputRowType),
enableAssignPane,
isMerge,
true, // isFinal is always true
ExecEdge.builder().damBehavior(ExecEdge.DamBehavior.END_INPUT).build(),
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription
)
}
}
......@@ -18,44 +18,31 @@
package org.apache.flink.table.planner.plan.nodes.physical.batch
import org.apache.flink.api.dag.Transformation
import org.apache.flink.configuration.MemorySize
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.data.RowData
import org.apache.flink.table.functions.UserDefinedFunction
import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.codegen.CodeGeneratorContext
import org.apache.flink.table.planner.codegen.agg.batch.{HashWindowCodeGenerator, WindowCodeGenerator}
import org.apache.flink.table.planner.delegation.BatchPlanner
import org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.planner.plan.logical.LogicalWindow
import org.apache.flink.table.planner.plan.nodes.exec.LegacyBatchExecNode
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil
import org.apache.flink.table.planner.plan.utils.AggregateUtil.transformToBatchAggregateInfoList
import org.apache.flink.table.planner.plan.utils.FlinkRelMdUtil
import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
import org.apache.flink.table.runtime.util.collections.binary.BytesMap
import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.tools.RelBuilder
import org.apache.flink.table.runtime.util.collections.binary.BytesMap
abstract class BatchExecHashWindowAggregateBase(
/**
* Batch physical RelNode for hash-based window aggregate.
*/
abstract class BatchPhysicalHashWindowAggregateBase(
cluster: RelOptCluster,
traitSet: RelTraitSet,
inputRel: RelNode,
outputRowType: RelDataType,
aggInputRowType: RelDataType,
grouping: Array[Int],
auxGrouping: Array[Int],
aggCallToAggFunction: Seq[(AggregateCall, UserDefinedFunction)],
window: LogicalWindow,
inputTimeFieldIndex: Int,
inputTimeIsDate: Boolean,
namedWindowProperties: Seq[PlannerNamedWindowProperty],
enableAssignPane: Boolean = false,
isMerge: Boolean,
......@@ -72,8 +59,7 @@ abstract class BatchExecHashWindowAggregateBase(
namedWindowProperties,
enableAssignPane,
isMerge,
isFinal)
with LegacyBatchExecNode[RowData] {
isFinal) {
override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
val numOfGroupKey = grouping.length
......@@ -96,43 +82,4 @@ abstract class BatchExecHashWindowAggregateBase(
val costFactory = planner.getCostFactory.asInstanceOf[FlinkCostFactory]
costFactory.makeCost(rowCnt, hashCpuCost + aggFunctionCpuCost, 0, 0, memCost)
}
//~ ExecNode methods -----------------------------------------------------------
override protected def translateToPlanInternal(
planner: BatchPlanner): Transformation[RowData] = {
val config = planner.getTableConfig
val input = getInputNodes.get(0).translateToPlan(planner)
.asInstanceOf[Transformation[RowData]]
val ctx = CodeGeneratorContext(config)
val outputType = FlinkTypeFactory.toLogicalRowType(getRowType)
val inputRowType = getInput.getRowType
val inputType = FlinkTypeFactory.toLogicalRowType(inputRowType)
val aggInfos = transformToBatchAggregateInfoList(
FlinkTypeFactory.toLogicalRowType(aggInputRowType), aggCallToAggFunction.map(_._1))
val groupBufferLimitSize = config.getConfiguration.getInteger(
ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT)
val (windowSize: Long, slideSize: Long) = WindowCodeGenerator.getWindowDef(window)
val generatedOperator = new HashWindowCodeGenerator(
ctx, planner.getRelBuilder, window, inputTimeFieldIndex,
inputTimeIsDate, namedWindowProperties,
aggInfos, inputRowType, grouping, auxGrouping, enableAssignPane, isMerge, isFinal).gen(
inputType, outputType, groupBufferLimitSize, 0,
windowSize, slideSize)
val operator = new CodeGenOperatorFactory[RowData](generatedOperator)
val managedMemory = MemorySize.parse(config.getConfiguration.getString(
ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY)).getBytes
ExecNodeUtil.createOneInputTransformation(
input,
getRelDetailedDescription,
operator,
InternalTypeInfo.of(outputType),
input.getParallelism,
managedMemory)
}
}
......@@ -20,8 +20,10 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch
import org.apache.flink.table.functions.UserDefinedFunction
import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.logical.LogicalWindow
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecHashWindowAggregate
import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.RelNode
......@@ -30,9 +32,10 @@ import org.apache.calcite.rel.core.AggregateCall
import java.util
import scala.collection.JavaConversions._
class BatchExecLocalHashWindowAggregate(
/**
* Batch physical RelNode for local hash-based window aggregate.
*/
class BatchPhysicalLocalHashWindowAggregate(
cluster: RelOptCluster,
traitSet: RelTraitSet,
inputRel: RelNode,
......@@ -46,25 +49,22 @@ class BatchExecLocalHashWindowAggregate(
inputTimeIsDate: Boolean,
namedWindowProperties: Seq[PlannerNamedWindowProperty],
enableAssignPane: Boolean = false)
extends BatchExecHashWindowAggregateBase(
extends BatchPhysicalHashWindowAggregateBase(
cluster,
traitSet,
inputRel,
outputRowType,
inputRowType,
grouping,
auxGrouping,
aggCallToAggFunction,
window,
inputTimeFieldIndex,
inputTimeIsDate,
namedWindowProperties,
enableAssignPane,
isMerge = false,
isFinal = false) {
override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
new BatchExecLocalHashWindowAggregate(
new BatchPhysicalLocalHashWindowAggregate(
cluster,
traitSet,
inputs.get(0),
......@@ -80,7 +80,22 @@ class BatchExecLocalHashWindowAggregate(
enableAssignPane)
}
//~ ExecNode methods -----------------------------------------------------------
override def getInputEdges: util.List[ExecEdge] = List(ExecEdge.DEFAULT)
override def translateToExecNode(): ExecNode[_] = {
new BatchExecHashWindowAggregate(
grouping,
auxGrouping,
getAggCallList.toArray,
window,
inputTimeFieldIndex,
inputTimeIsDate,
namedWindowProperties.toArray,
FlinkTypeFactory.toLogicalRowType(inputRowType),
enableAssignPane,
false, // isMerge is always false
false, // isFinal is always false
ExecEdge.DEFAULT,
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription
)
}
}
......@@ -423,7 +423,7 @@ object FlinkBatchRuleSets {
// over agg
BatchExecOverAggregateRule.INSTANCE,
// window agg
BatchExecWindowAggregateRule.INSTANCE,
BatchPhysicalWindowAggregateRule.INSTANCE,
BatchExecPythonWindowAggregateRule.INSTANCE,
// join
BatchExecHashJoinRule.INSTANCE,
......
......@@ -61,17 +61,17 @@ class BatchPhysicalSortLimitRule
override def convert(rel: RelNode): RelNode = {
val sort = rel.asInstanceOf[FlinkLogicalSort]
// create local BatchExecSortLimit
// create local BatchPhysicalSortLimit
val localRequiredTrait = sort.getInput.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
val localInput = RelOptRule.convert(sort.getInput, localRequiredTrait)
// if fetch is null, there is no need to create local BatchExecSortLimit
// if fetch is null, there is no need to create local BatchPhysicalSortLimit
val inputOfExchange = if (sort.fetch != null) {
val limit = SortUtil.getLimitEnd(sort.offset, sort.fetch)
val rexBuilder = sort.getCluster.getRexBuilder
val intType = rexBuilder.getTypeFactory.createSqlType(SqlTypeName.INTEGER)
val providedLocalTraitSet = localRequiredTrait.replace(sort.getCollation)
// for local BatchExecSortLimit, offset is always 0, and fetch is `limit`
// for local BatchPhysicalSortLimit, offset is always 0, and fetch is `limit`
new BatchPhysicalSortLimit(
rel.getCluster,
providedLocalTraitSet,
......@@ -90,7 +90,7 @@ class BatchPhysicalSortLimitRule
.replace(FlinkRelDistribution.SINGLETON)
val newInput = RelOptRule.convert(inputOfExchange, requiredTrait)
// create global BatchExecSortLimit
// create global BatchPhysicalSortLimit
val providedGlobalTraitSet = requiredTrait.replace(sort.getCollation)
new BatchPhysicalSortLimit(
rel.getCluster,
......
......@@ -27,7 +27,7 @@ import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.planner.plan.logical.{LogicalWindow, SlidingGroupWindow, TumblingGroupWindow}
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWindowAggregate
import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecHashWindowAggregate, BatchExecLocalHashWindowAggregate, BatchExecLocalSortWindowAggregate, BatchExecSortWindowAggregate}
import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalSortWindowAggregate, BatchExecSortWindowAggregate, BatchPhysicalHashWindowAggregate, BatchPhysicalLocalHashWindowAggregate}
import org.apache.flink.table.planner.plan.utils.AggregateUtil
import org.apache.flink.table.planner.plan.utils.AggregateUtil.hasTimeIntervalType
import org.apache.flink.table.planner.plan.utils.PythonUtil.isPythonAggregate
......@@ -48,15 +48,15 @@ import scala.collection.JavaConversions._
/**
* Rule to convert a [[FlinkLogicalWindowAggregate]] into a
* {{{
* BatchExecHash(or Sort)WindowAggregate (global)
* BatchPhysicalHash(or Sort)WindowAggregate (global)
* +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton)
* +- BatchExecLocalHash(or Sort)WindowAggregate (local)
* +- BatchPhysicalLocalHash(or Sort)WindowAggregate (local)
* +- input of window agg
* }}}
* when all aggregate functions are mergeable
* and [[OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY]] is TWO_PHASE, or
* {{{
* BatchExecHash(or Sort)WindowAggregate
* BatchPhysicalHash(or Sort)WindowAggregate
* +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton)
* +- input of window agg
* }}}
......@@ -67,12 +67,12 @@ import scala.collection.JavaConversions._
* this rule will try to create two possibilities above, and chooses the best one based on cost.
* if all aggregate function buffer are fix length, the rule will choose hash window agg.
*/
class BatchExecWindowAggregateRule
class BatchPhysicalWindowAggregateRule
extends RelOptRule(
operand(classOf[FlinkLogicalWindowAggregate],
operand(classOf[RelNode], any)),
FlinkRelFactories.LOGICAL_BUILDER_WITHOUT_AGG_INPUT_PRUNE,
"BatchExecWindowAggregateRule")
"BatchPhysicalWindowAggregateRule")
with BatchPhysicalAggRuleBase {
override def matches(call: RelOptRuleCall): Boolean = {
......@@ -163,7 +163,7 @@ class BatchExecWindowAggregateRule
input.getRowType, call.builder(), window.timeAttribute)
val inputTimeFieldType = agg.getInput.getRowType.getFieldList.get(inputTimeFieldIndex).getType
val inputTimeIsDate = inputTimeFieldType.getSqlTypeName == SqlTypeName.DATE
// local-agg output order: groupset | assignTs | aucGroupSet | aggCalls
// local-agg output order: groupSet | assignTs | auxGroupSet | aggCalls
val newInputTimeFieldIndexFromLocal = groupSet.length
val config = input.getCluster.getPlanner.getContext.unwrap(classOf[FlinkContext]).getTableConfig
......@@ -180,7 +180,7 @@ class BatchExecWindowAggregateRule
val newLocalInput = RelOptRule.convert(input, localRequiredTraitSet)
val localProvidedTraitSet = localRequiredTraitSet
new BatchExecLocalHashWindowAggregate(
new BatchPhysicalLocalHashWindowAggregate(
agg.getCluster,
localProvidedTraitSet,
newLocalInput,
......@@ -234,7 +234,7 @@ class BatchExecWindowAggregateRule
// hash
val newGlobalAggInput = RelOptRule.convert(localAgg, globalRequiredTraitSet)
new BatchExecHashWindowAggregate(
new BatchPhysicalHashWindowAggregate(
agg.getCluster,
aggProvidedTraitSet,
newGlobalAggInput,
......@@ -293,7 +293,7 @@ class BatchExecWindowAggregateRule
// case 2: Sliding window without pane optimization
val newInput = RelOptRule.convert(input, requiredTraitSet)
new BatchExecHashWindowAggregate(
new BatchPhysicalHashWindowAggregate(
agg.getCluster,
aggProvidedTraitSet,
newInput,
......@@ -430,6 +430,6 @@ class BatchExecWindowAggregateRule
}
}
object BatchExecWindowAggregateRule {
val INSTANCE: RelOptRule = new BatchExecWindowAggregateRule
object BatchPhysicalWindowAggregateRule {
val INSTANCE: RelOptRule = new BatchPhysicalWindowAggregateRule
}
......@@ -49,7 +49,7 @@ class RemoveRedundantLocalHashAggRule extends RelOptRule(
localAgg.grouping,
localAgg.auxGrouping,
// Use the localAgg agg calls because the global agg call filters was removed,
// see BatchExecHashAggRule for details.
// see BatchPhysicalHashAggRule for details.
localAgg.getAggCallToAggFunction,
isMerge = false)
call.transformTo(newGlobalAgg)
......
......@@ -48,7 +48,7 @@ abstract class RemoveRedundantLocalSortAggRule(
localAgg.grouping,
localAgg.auxGrouping,
// Use the localAgg agg calls because the global agg call filters was removed,
// see BatchExecSortAggRule for details.
// see BatchPhysicalSortAggRule for details.
localAgg.getAggCallToAggFunction,
isMerge = false)
call.transformTo(newGlobalAgg)
......
......@@ -23,7 +23,7 @@ import org.apache.flink.table.planner.JDouble
import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.nodes.calcite.{Expand, Rank, WindowAggregate}
import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalHashWindowAggregate, BatchExecLocalSortWindowAggregate, BatchPhysicalGroupAggregateBase, BatchPhysicalWindowAggregateBase}
import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalSortWindowAggregate, BatchPhysicalGroupAggregateBase, BatchPhysicalLocalHashWindowAggregate, BatchPhysicalWindowAggregateBase}
import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, RankRange}
import org.apache.flink.table.runtime.operators.sort.BinaryIndexedSortable
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.LENGTH_SIZE_IN_BYTES
......@@ -334,7 +334,7 @@ object FlinkRelMdUtil {
// grouping + assignTs + auxGrouping
(agg.getAggCallList,
agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping)
case agg: BatchExecLocalHashWindowAggregate =>
case agg: BatchPhysicalLocalHashWindowAggregate =>
// grouping + assignTs + auxGrouping
(agg.getAggCallList,
agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping)
......@@ -388,7 +388,7 @@ object FlinkRelMdUtil {
}
/**
* Split groupKeys on Aggregate/ BatchExecGroupAggregateBase/ BatchExecWindowAggregateBase
* Split groupKeys on Aggregate/ BatchPhysicalGroupAggregateBase/ BatchPhysicalWindowAggregateBase
* into keys on aggregate's groupKey and aggregate's aggregateCalls.
*
* @param agg the aggregate
......
......@@ -1244,7 +1244,7 @@ class FlinkRelMdHandlerTestBase {
Array("count$0")).toList // agg calls
val localWindowAggRowType = typeFactory.createStructType(
localWindowAggTypes, localWindowAggNames)
val batchLocalWindowAgg = new BatchExecLocalHashWindowAggregate(
val batchLocalWindowAgg = new BatchPhysicalLocalHashWindowAggregate(
batchCalc.getCluster,
batchPhysicalTraits,
batchCalc,
......@@ -1260,7 +1260,7 @@ class FlinkRelMdHandlerTestBase {
enableAssignPane = false)
val batchExchange2 = new BatchPhysicalExchange(
cluster, batchPhysicalTraits.replace(hash01), batchLocalWindowAgg, hash01)
val batchWindowAggWithLocal = new BatchExecHashWindowAggregate(
val batchWindowAggWithLocal = new BatchPhysicalHashWindowAggregate(
cluster,
batchPhysicalTraits,
batchExchange2,
......@@ -1277,7 +1277,7 @@ class FlinkRelMdHandlerTestBase {
isMerge = true
)
val batchWindowAggWithoutLocal = new BatchExecHashWindowAggregate(
val batchWindowAggWithoutLocal = new BatchPhysicalHashWindowAggregate(
batchExchange1.getCluster,
batchPhysicalTraits,
batchExchange1,
......@@ -1383,7 +1383,7 @@ class FlinkRelMdHandlerTestBase {
Array("count$0")).toList // agg calls
val localWindowAggRowType = typeFactory.createStructType(
localWindowAggTypes, localWindowAggNames)
val batchLocalWindowAgg = new BatchExecLocalHashWindowAggregate(
val batchLocalWindowAgg = new BatchPhysicalLocalHashWindowAggregate(
batchCalc.getCluster,
batchPhysicalTraits,
batchCalc,
......@@ -1399,7 +1399,7 @@ class FlinkRelMdHandlerTestBase {
enableAssignPane = false)
val batchExchange2 = new BatchPhysicalExchange(
cluster, batchPhysicalTraits.replace(hash1), batchLocalWindowAgg, hash1)
val batchWindowAggWithLocal = new BatchExecHashWindowAggregate(
val batchWindowAggWithLocal = new BatchPhysicalHashWindowAggregate(
cluster,
batchPhysicalTraits,
batchExchange2,
......@@ -1416,7 +1416,7 @@ class FlinkRelMdHandlerTestBase {
isMerge = true
)
val batchWindowAggWithoutLocal = new BatchExecHashWindowAggregate(
val batchWindowAggWithoutLocal = new BatchPhysicalHashWindowAggregate(
batchExchange1.getCluster,
batchPhysicalTraits,
batchExchange1,
......@@ -1527,7 +1527,7 @@ class FlinkRelMdHandlerTestBase {
Array("count$0")).toList // agg calls
val localWindowAggRowType = typeFactory.createStructType(
localWindowAggTypes, localWindowAggNames)
val batchLocalWindowAggWithAuxGroup = new BatchExecLocalHashWindowAggregate(
val batchLocalWindowAggWithAuxGroup = new BatchPhysicalLocalHashWindowAggregate(
batchCalc.getCluster,
batchPhysicalTraits,
batchCalc,
......@@ -1543,7 +1543,7 @@ class FlinkRelMdHandlerTestBase {
enableAssignPane = false)
val batchExchange2 = new BatchPhysicalExchange(
cluster, batchPhysicalTraits.replace(hash0), batchLocalWindowAggWithAuxGroup, hash0)
val batchWindowAggWithLocalWithAuxGroup = new BatchExecHashWindowAggregate(
val batchWindowAggWithLocalWithAuxGroup = new BatchPhysicalHashWindowAggregate(
cluster,
batchPhysicalTraits,
batchExchange2,
......@@ -1560,7 +1560,7 @@ class FlinkRelMdHandlerTestBase {
isMerge = true
)
val batchWindowAggWithoutLocalWithAuxGroup = new BatchExecHashWindowAggregate(
val batchWindowAggWithoutLocalWithAuxGroup = new BatchPhysicalHashWindowAggregate(
batchExchange1.getCluster,
batchPhysicalTraits,
batchExchange1,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册