diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashWindowAggregate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashWindowAggregate.java new file mode 100644 index 0000000000000000000000000000000000000000..d8b147dcdd1819fc669b20dea0d920a6adf83171 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashWindowAggregate.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.batch; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty; +import org.apache.flink.table.planner.codegen.CodeGeneratorContext; +import org.apache.flink.table.planner.codegen.agg.batch.HashWindowCodeGenerator; +import org.apache.flink.table.planner.codegen.agg.batch.WindowCodeGenerator; +import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.plan.logical.LogicalWindow; +import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; +import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; +import org.apache.flink.table.planner.plan.utils.AggregateInfoList; +import org.apache.flink.table.planner.plan.utils.AggregateUtil; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.table.runtime.generated.GeneratedOperator; +import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.calcite.rel.core.AggregateCall; + +import java.util.Arrays; +import java.util.Collections; + +/** Batch {@link ExecNode} for hash-based window aggregate operator. */ +public class BatchExecHashWindowAggregate extends ExecNodeBase + implements BatchExecNode { + + private final int[] grouping; + private final int[] auxGrouping; + private final AggregateCall[] aggCalls; + private final LogicalWindow window; + private final int inputTimeFieldIndex; + private final boolean inputTimeIsDate; + private final PlannerNamedWindowProperty[] namedWindowProperties; + private final RowType aggInputRowType; + private final boolean enableAssignPane; + private final boolean isMerge; + private final boolean isFinal; + + public BatchExecHashWindowAggregate( + int[] grouping, + int[] auxGrouping, + AggregateCall[] aggCalls, + LogicalWindow window, + int inputTimeFieldIndex, + boolean inputTimeIsDate, + PlannerNamedWindowProperty[] namedWindowProperties, + RowType aggInputRowType, + boolean enableAssignPane, + boolean isMerge, + boolean isFinal, + ExecEdge inputEdge, + RowType outputType, + String description) { + super(Collections.singletonList(inputEdge), outputType, description); + this.grouping = grouping; + this.auxGrouping = auxGrouping; + this.aggCalls = aggCalls; + this.window = window; + this.inputTimeFieldIndex = inputTimeFieldIndex; + this.inputTimeIsDate = inputTimeIsDate; + this.namedWindowProperties = namedWindowProperties; + this.aggInputRowType = aggInputRowType; + this.enableAssignPane = enableAssignPane; + this.isMerge = isMerge; + this.isFinal = isFinal; + } + + @SuppressWarnings("unchecked") + @Override + protected Transformation translateToPlanInternal(PlannerBase planner) { + final ExecNode inputNode = (ExecNode) getInputNodes().get(0); + final Transformation inputTransform = inputNode.translateToPlan(planner); + + final AggregateInfoList aggInfos = + AggregateUtil.transformToBatchAggregateInfoList( + aggInputRowType, + JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)), + null, + null); + final TableConfig tableConfig = planner.getTableConfig(); + final RowType inputRowType = (RowType) inputNode.getOutputType(); + final HashWindowCodeGenerator hashWindowCodeGenerator = + new HashWindowCodeGenerator( + new CodeGeneratorContext(tableConfig), + planner.getRelBuilder(), + window, + inputTimeFieldIndex, + inputTimeIsDate, + JavaScalaConversionUtil.toScala(Arrays.asList(namedWindowProperties)), + aggInfos, + inputRowType, + grouping, + auxGrouping, + enableAssignPane, + isMerge, + isFinal); + final int groupBufferLimitSize = + tableConfig + .getConfiguration() + .getInteger(ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT); + final Tuple2 windowSizeAndSlideSize = WindowCodeGenerator.getWindowDef(window); + final GeneratedOperator> generatedOperator = + hashWindowCodeGenerator.gen( + inputRowType, + (RowType) getOutputType(), + groupBufferLimitSize, + 0, // windowStart + windowSizeAndSlideSize.f0, + windowSizeAndSlideSize.f1); + + final long managedMemory = + ExecNodeUtil.getMemorySize( + tableConfig, ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY); + return ExecNodeUtil.createOneInputTransformation( + inputTransform, + getDesc(), + new CodeGenOperatorFactory<>(generatedOperator), + InternalTypeInfo.of(getOutputType()), + inputTransform.getParallelism(), + managedMemory); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala index a1d0a653784b7fe0234beed6535c494fa0605f4f..76de099653674d81996a9f2f2c491c6df37f5ba0 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala @@ -72,7 +72,7 @@ class HashWindowCodeGenerator( inputTimeIsDate: Boolean, namedProperties: Seq[PlannerNamedWindowProperty], aggInfoList: AggregateInfoList, - inputRowType: RelDataType, + inputRowType: RowType, grouping: Array[Int], auxGrouping: Array[Int], enableAssignPane: Boolean = true, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/SortWindowCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/SortWindowCodeGenerator.scala index 3677b415e408b4f674b1382e6bda1bf96406ac1b..bce1f569e736d25899a784f6348c49822f2b8343 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/SortWindowCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/SortWindowCodeGenerator.scala @@ -62,7 +62,7 @@ class SortWindowCodeGenerator( inputTimeIsDate: Boolean, namedProperties: Seq[PlannerNamedWindowProperty], aggInfoList: AggregateInfoList, - inputRowType: RelDataType, + inputRowType: RowType, inputType: RowType, outputType: RowType, buffLimitSize: Int, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala index b28c8130d1084a2bfaa64c9836cacf5e1aa9c6a0..f30568c4123e166cc16d1df8f8eb5aaec6519e10 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala @@ -18,11 +18,7 @@ package org.apache.flink.table.planner.codegen.agg.batch -import org.apache.calcite.avatica.util.DateTimeUtils -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.core.AggregateCall -import org.apache.calcite.tools.RelBuilder -import org.apache.commons.math3.util.ArithmeticUtils +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import org.apache.flink.table.api.DataTypes import org.apache.flink.table.data.binary.BinaryRowData import org.apache.flink.table.data.utils.JoinedRowData @@ -52,6 +48,11 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot.INTERVAL_DAY_TIME import org.apache.flink.table.types.logical._ import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot +import org.apache.calcite.avatica.util.DateTimeUtils +import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.tools.RelBuilder +import org.apache.commons.math3.util.ArithmeticUtils + import scala.collection.JavaConversions._ abstract class WindowCodeGenerator( @@ -61,14 +62,15 @@ abstract class WindowCodeGenerator( inputTimeIsDate: Boolean, namedProperties: Seq[PlannerNamedWindowProperty], aggInfoList: AggregateInfoList, - inputRowType: RelDataType, + inputRowType: RowType, grouping: Array[Int], auxGrouping: Array[Int], enableAssignPane: Boolean = true, val isMerge: Boolean, val isFinal: Boolean) { - protected lazy val builder: RelBuilder = relBuilder.values(inputRowType) + protected lazy val builder: RelBuilder = relBuilder.values( + FlinkTypeFactory.INSTANCE.buildRelNodeRowType(inputRowType)) protected lazy val aggInfos: Array[AggregateInfo] = aggInfoList.aggInfos @@ -79,14 +81,12 @@ abstract class WindowCodeGenerator( AggCodeGenHelper.getAggBufferNames(auxGrouping, aggInfos) protected lazy val aggBufferTypes: Array[Array[LogicalType]] = AggCodeGenHelper.getAggBufferTypes( - inputType, + inputRowType, auxGrouping, aggInfos) - protected lazy val groupKeyRowType: RowType = AggCodeGenHelper.projectRowType(inputType, grouping) - - private lazy val inputType: RowType = - FlinkTypeFactory.toLogicalType(inputRowType).asInstanceOf[RowType] + protected lazy val groupKeyRowType: RowType = + AggCodeGenHelper.projectRowType(inputRowType, grouping) protected lazy val timestampInternalType: LogicalType = if (inputTimeIsDate) new IntType() else new BigIntType() @@ -116,7 +116,7 @@ abstract class WindowCodeGenerator( (groupKeyTypes :+ timestampInternalType) ++ aggBuffTypes, ((groupKeyNames :+ "assignedTs$") ++ aggBuffNames).toArray) } else { - FlinkTypeFactory.toLogicalRowType(inputRowType) + inputRowType } } @@ -680,7 +680,8 @@ abstract class WindowCodeGenerator( remainder)), literal(index * slideSize)) exprCodegen.generateExpression(new CallExpressionResolver(relBuilder).resolve(expr).accept( - new ExpressionConverter(relBuilder.values(inputRowType)))) + new ExpressionConverter( + relBuilder.values(FlinkTypeFactory.INSTANCE.buildRelNodeRowType(inputRowType))))) } def getGrouping: Array[Int] = grouping @@ -726,7 +727,7 @@ abstract class WindowCodeGenerator( object WindowCodeGenerator { - def getWindowDef(window: LogicalWindow): (Long, Long) = { + def getWindowDef(window: LogicalWindow): JTuple2[JLong, JLong] = { val (windowSize, slideSize): (Long, Long) = window match { case TumblingGroupWindow(_, _, size) if isTimeIntervalLiteral(size) => (asLong(size), asLong(size)) @@ -736,7 +737,7 @@ object WindowCodeGenerator { // count tumbling/sliding window and session window not supported now throw new UnsupportedOperationException(s"Window $window is not supported right now.") } - (windowSize, slideSize) + new JTuple2[JLong, JLong](windowSize, slideSize) } def asLong(expr: Expression): Long = extractValue(expr, classOf[JLong]).get() diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimator.scala index acc90d5a03fd4a754e539a350066ba4bbb9cb003..26eba34e94b6cee6759415fa1bf4dfbbc151b9fd 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimator.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.planner.plan.metadata import org.apache.flink.table.planner.JDouble -import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalHashWindowAggregate, BatchExecLocalSortWindowAggregate, BatchPhysicalGroupAggregateBase, BatchPhysicalWindowAggregateBase} +import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalSortWindowAggregate, BatchPhysicalGroupAggregateBase, BatchPhysicalLocalHashWindowAggregate, BatchPhysicalWindowAggregateBase} import org.apache.flink.table.planner.plan.stats._ import org.apache.flink.table.planner.plan.utils.AggregateUtil @@ -62,7 +62,7 @@ class AggCallSelectivityEstimator(agg: RelNode, mq: FlinkRelMetadataQuery) (rel.getGroupSet.toArray ++ auxGroupSet, otherAggCalls) case rel: BatchPhysicalGroupAggregateBase => (rel.grouping ++ rel.auxGrouping, rel.getAggCallList) - case rel: BatchExecLocalHashWindowAggregate => + case rel: BatchPhysicalLocalHashWindowAggregate => val fullGrouping = rel.grouping ++ Array(rel.inputTimeFieldIndex) ++ rel.auxGrouping (fullGrouping, rel.getAggCallList) case rel: BatchExecLocalSortWindowAggregate => diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala index 4b80a55c16023025a1c4550286035a316cad6511..b1eec167efb8b97efa2eaa1458386c79810e591c 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala @@ -545,7 +545,7 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] { case agg: BatchExecLocalSortWindowAggregate => // grouping + assignTs + auxGrouping agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping - case agg: BatchExecLocalHashWindowAggregate => + case agg: BatchPhysicalLocalHashWindowAggregate => // grouping + assignTs + auxGrouping agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping case agg: BatchPhysicalWindowAggregateBase => agg.grouping ++ agg.auxGrouping diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala index e4082030523efa3e55b53e4e06fe2668ef0aeb3e..199e5d4d57f3e07efddba56a03e6021af3e08dca 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala @@ -213,7 +213,7 @@ class FlinkRelMdSize private extends MetadataHandler[BuiltInMetadata.Size] { val mapInputToOutput: Map[Int, Int] = windowAgg match { case agg: WindowAggregate => AggregateUtil.checkAndGetFullGroupSet(agg).zipWithIndex.toMap - case agg: BatchExecLocalHashWindowAggregate => + case agg: BatchPhysicalLocalHashWindowAggregate => // local win-agg output type: grouping + assignTs + auxGrouping + aggCalls agg.grouping.zipWithIndex.toMap ++ agg.auxGrouping.zipWithIndex.map { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonGroupWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonGroupWindowAggregate.scala index ba84b24db9f6c61d552084d4a3015cf69e8af00d..a5e5d59702c691211e722cdd96b3ed0a74ee593c 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonGroupWindowAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonGroupWindowAggregate.scala @@ -126,7 +126,7 @@ class BatchExecPythonGroupWindowAggregate( val outputType = FlinkTypeFactory.toLogicalRowType(getRowType) val inputType = FlinkTypeFactory.toLogicalRowType(inputRowType) - val (windowSize: Long, slideSize: Long) = WindowCodeGenerator.getWindowDef(window) + val windowSizeAndSlideSize = WindowCodeGenerator.getWindowDef(window) val groupBufferLimitSize = planner.getTableConfig.getConfiguration.getInteger( ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT) @@ -137,8 +137,8 @@ class BatchExecPythonGroupWindowAggregate( outputType, inputTimeFieldIndex, groupBufferLimitSize, - windowSize, - slideSize, + windowSizeAndSlideSize.f0, + windowSizeAndSlideSize.f1, getConfig(planner.getExecEnv, planner.getTableConfig)) if (isPythonWorkerUsingManagedMemory(planner.getTableConfig.getConfiguration)) { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala index 5ad4827935c42160c386f3c605967fc375e15bf9..2395de8a2704461921be71df7f6dec076445b4e2 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala @@ -103,13 +103,13 @@ abstract class BatchExecSortWindowAggregateBase( val groupBufferLimitSize = planner.getTableConfig.getConfiguration.getInteger( ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT) - val (windowSize: Long, slideSize: Long) = WindowCodeGenerator.getWindowDef(window) + val windowSizeAndSlideSize = WindowCodeGenerator.getWindowDef(window) val generator = new SortWindowCodeGenerator( ctx, planner.getRelBuilder, window, inputTimeFieldIndex, inputTimeIsDate, namedWindowProperties, - aggInfos, inputRowType, inputType, outputType, - groupBufferLimitSize, 0L, windowSize, slideSize, + aggInfos, inputType, inputType, outputType, + groupBufferLimitSize, 0L, windowSizeAndSlideSize.f0, windowSizeAndSlideSize.f1, grouping, auxGrouping, enableAssignPane, isMerge, isFinal) val generatedOperator = if (grouping.isEmpty) { generator.genWithoutKeys() diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalHashWindowAggregate.scala similarity index 71% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalHashWindowAggregate.scala index f3ce9fec95bcf703e33fcdc00cde494c5e73a1e7..6ec67fac2f49264e520a8845699f5946722cbf99 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalHashWindowAggregate.scala @@ -20,8 +20,10 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch import org.apache.flink.table.functions.UserDefinedFunction import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty +import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.logical.LogicalWindow -import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge +import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode} +import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecHashWindowAggregate import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.RelNode @@ -30,9 +32,10 @@ import org.apache.calcite.rel.core.AggregateCall import java.util -import scala.collection.JavaConversions._ - -class BatchExecHashWindowAggregate( +/** + * Batch physical RelNode for (global) hash-based window aggregate. + */ +class BatchPhysicalHashWindowAggregate( cluster: RelOptCluster, traitSet: RelTraitSet, inputRel: RelNode, @@ -47,25 +50,22 @@ class BatchExecHashWindowAggregate( namedWindowProperties: Seq[PlannerNamedWindowProperty], enableAssignPane: Boolean = false, isMerge: Boolean) - extends BatchExecHashWindowAggregateBase( + extends BatchPhysicalHashWindowAggregateBase( cluster, traitSet, inputRel, outputRowType, - aggInputRowType, grouping, auxGrouping, aggCallToAggFunction, window, - inputTimeFieldIndex, - inputTimeIsDate, namedWindowProperties, enableAssignPane, isMerge, isFinal = true) { override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { - new BatchExecHashWindowAggregate( + new BatchPhysicalHashWindowAggregate( cluster, traitSet, inputs.get(0), @@ -82,10 +82,22 @@ class BatchExecHashWindowAggregate( isMerge) } - //~ ExecNode methods ----------------------------------------------------------- - - override def getInputEdges: util.List[ExecEdge] = List( - ExecEdge.builder() - .damBehavior(ExecEdge.DamBehavior.END_INPUT) - .build()) + override def translateToExecNode(): ExecNode[_] = { + new BatchExecHashWindowAggregate( + grouping, + auxGrouping, + getAggCallList.toArray, + window, + inputTimeFieldIndex, + inputTimeIsDate, + namedWindowProperties.toArray, + FlinkTypeFactory.toLogicalRowType(aggInputRowType), + enableAssignPane, + isMerge, + true, // isFinal is always true + ExecEdge.builder().damBehavior(ExecEdge.DamBehavior.END_INPUT).build(), + FlinkTypeFactory.toLogicalRowType(getRowType), + getRelDetailedDescription + ) + } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalHashWindowAggregateBase.scala similarity index 54% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalHashWindowAggregateBase.scala index 92bfeca79e447a2fc8818850cb14475b19362fa3..06f6fea480a9339f7da27728a308a35cac7f0d9b 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalHashWindowAggregateBase.scala @@ -18,44 +18,31 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch -import org.apache.flink.api.dag.Transformation -import org.apache.flink.configuration.MemorySize -import org.apache.flink.table.api.config.ExecutionConfigOptions -import org.apache.flink.table.data.RowData import org.apache.flink.table.functions.UserDefinedFunction import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty -import org.apache.flink.table.planner.calcite.FlinkTypeFactory -import org.apache.flink.table.planner.codegen.CodeGeneratorContext -import org.apache.flink.table.planner.codegen.agg.batch.{HashWindowCodeGenerator, WindowCodeGenerator} -import org.apache.flink.table.planner.delegation.BatchPlanner import org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory} import org.apache.flink.table.planner.plan.logical.LogicalWindow -import org.apache.flink.table.planner.plan.nodes.exec.LegacyBatchExecNode -import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil -import org.apache.flink.table.planner.plan.utils.AggregateUtil.transformToBatchAggregateInfoList import org.apache.flink.table.planner.plan.utils.FlinkRelMdUtil -import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory -import org.apache.flink.table.runtime.typeutils.InternalTypeInfo +import org.apache.flink.table.runtime.util.collections.binary.BytesMap + import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.AggregateCall import org.apache.calcite.rel.metadata.RelMetadataQuery -import org.apache.calcite.tools.RelBuilder -import org.apache.flink.table.runtime.util.collections.binary.BytesMap -abstract class BatchExecHashWindowAggregateBase( +/** + * Batch physical RelNode for hash-based window aggregate. + */ +abstract class BatchPhysicalHashWindowAggregateBase( cluster: RelOptCluster, traitSet: RelTraitSet, inputRel: RelNode, outputRowType: RelDataType, - aggInputRowType: RelDataType, grouping: Array[Int], auxGrouping: Array[Int], aggCallToAggFunction: Seq[(AggregateCall, UserDefinedFunction)], window: LogicalWindow, - inputTimeFieldIndex: Int, - inputTimeIsDate: Boolean, namedWindowProperties: Seq[PlannerNamedWindowProperty], enableAssignPane: Boolean = false, isMerge: Boolean, @@ -72,8 +59,7 @@ abstract class BatchExecHashWindowAggregateBase( namedWindowProperties, enableAssignPane, isMerge, - isFinal) - with LegacyBatchExecNode[RowData] { + isFinal) { override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { val numOfGroupKey = grouping.length @@ -96,43 +82,4 @@ abstract class BatchExecHashWindowAggregateBase( val costFactory = planner.getCostFactory.asInstanceOf[FlinkCostFactory] costFactory.makeCost(rowCnt, hashCpuCost + aggFunctionCpuCost, 0, 0, memCost) } - - //~ ExecNode methods ----------------------------------------------------------- - - override protected def translateToPlanInternal( - planner: BatchPlanner): Transformation[RowData] = { - val config = planner.getTableConfig - val input = getInputNodes.get(0).translateToPlan(planner) - .asInstanceOf[Transformation[RowData]] - val ctx = CodeGeneratorContext(config) - val outputType = FlinkTypeFactory.toLogicalRowType(getRowType) - val inputRowType = getInput.getRowType - val inputType = FlinkTypeFactory.toLogicalRowType(inputRowType) - - val aggInfos = transformToBatchAggregateInfoList( - FlinkTypeFactory.toLogicalRowType(aggInputRowType), aggCallToAggFunction.map(_._1)) - - val groupBufferLimitSize = config.getConfiguration.getInteger( - ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT) - - val (windowSize: Long, slideSize: Long) = WindowCodeGenerator.getWindowDef(window) - - val generatedOperator = new HashWindowCodeGenerator( - ctx, planner.getRelBuilder, window, inputTimeFieldIndex, - inputTimeIsDate, namedWindowProperties, - aggInfos, inputRowType, grouping, auxGrouping, enableAssignPane, isMerge, isFinal).gen( - inputType, outputType, groupBufferLimitSize, 0, - windowSize, slideSize) - val operator = new CodeGenOperatorFactory[RowData](generatedOperator) - - val managedMemory = MemorySize.parse(config.getConfiguration.getString( - ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY)).getBytes - ExecNodeUtil.createOneInputTransformation( - input, - getRelDetailedDescription, - operator, - InternalTypeInfo.of(outputType), - input.getParallelism, - managedMemory) - } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLocalHashWindowAggregate.scala similarity index 70% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLocalHashWindowAggregate.scala index 2e19199cba051a8fd0d4f28abd3de7d7b441ed1c..937f941aa47d9b32fe752af09a54d8242917ec05 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLocalHashWindowAggregate.scala @@ -20,8 +20,10 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch import org.apache.flink.table.functions.UserDefinedFunction import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty +import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.logical.LogicalWindow -import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge +import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecHashWindowAggregate +import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode} import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.RelNode @@ -30,9 +32,10 @@ import org.apache.calcite.rel.core.AggregateCall import java.util -import scala.collection.JavaConversions._ - -class BatchExecLocalHashWindowAggregate( +/** + * Batch physical RelNode for local hash-based window aggregate. + */ +class BatchPhysicalLocalHashWindowAggregate( cluster: RelOptCluster, traitSet: RelTraitSet, inputRel: RelNode, @@ -46,25 +49,22 @@ class BatchExecLocalHashWindowAggregate( inputTimeIsDate: Boolean, namedWindowProperties: Seq[PlannerNamedWindowProperty], enableAssignPane: Boolean = false) - extends BatchExecHashWindowAggregateBase( + extends BatchPhysicalHashWindowAggregateBase( cluster, traitSet, inputRel, outputRowType, - inputRowType, grouping, auxGrouping, aggCallToAggFunction, window, - inputTimeFieldIndex, - inputTimeIsDate, namedWindowProperties, enableAssignPane, isMerge = false, isFinal = false) { override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { - new BatchExecLocalHashWindowAggregate( + new BatchPhysicalLocalHashWindowAggregate( cluster, traitSet, inputs.get(0), @@ -80,7 +80,22 @@ class BatchExecLocalHashWindowAggregate( enableAssignPane) } - //~ ExecNode methods ----------------------------------------------------------- - - override def getInputEdges: util.List[ExecEdge] = List(ExecEdge.DEFAULT) + override def translateToExecNode(): ExecNode[_] = { + new BatchExecHashWindowAggregate( + grouping, + auxGrouping, + getAggCallList.toArray, + window, + inputTimeFieldIndex, + inputTimeIsDate, + namedWindowProperties.toArray, + FlinkTypeFactory.toLogicalRowType(inputRowType), + enableAssignPane, + false, // isMerge is always false + false, // isFinal is always false + ExecEdge.DEFAULT, + FlinkTypeFactory.toLogicalRowType(getRowType), + getRelDetailedDescription + ) + } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala index 1f5809ee6cce690114822b2f0cc4f6f449cc7b82..bd25c16140c3982fe52628a80a31693f6892cef7 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala @@ -423,7 +423,7 @@ object FlinkBatchRuleSets { // over agg BatchExecOverAggregateRule.INSTANCE, // window agg - BatchExecWindowAggregateRule.INSTANCE, + BatchPhysicalWindowAggregateRule.INSTANCE, BatchExecPythonWindowAggregateRule.INSTANCE, // join BatchExecHashJoinRule.INSTANCE, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortLimitRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortLimitRule.scala index ef7c0ba71c0c91f6b1a38c049556d6314c956a34..9296afdb638a376b06e89092ed25d88188784844 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortLimitRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortLimitRule.scala @@ -61,17 +61,17 @@ class BatchPhysicalSortLimitRule override def convert(rel: RelNode): RelNode = { val sort = rel.asInstanceOf[FlinkLogicalSort] - // create local BatchExecSortLimit + // create local BatchPhysicalSortLimit val localRequiredTrait = sort.getInput.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL) val localInput = RelOptRule.convert(sort.getInput, localRequiredTrait) - // if fetch is null, there is no need to create local BatchExecSortLimit + // if fetch is null, there is no need to create local BatchPhysicalSortLimit val inputOfExchange = if (sort.fetch != null) { val limit = SortUtil.getLimitEnd(sort.offset, sort.fetch) val rexBuilder = sort.getCluster.getRexBuilder val intType = rexBuilder.getTypeFactory.createSqlType(SqlTypeName.INTEGER) val providedLocalTraitSet = localRequiredTrait.replace(sort.getCollation) - // for local BatchExecSortLimit, offset is always 0, and fetch is `limit` + // for local BatchPhysicalSortLimit, offset is always 0, and fetch is `limit` new BatchPhysicalSortLimit( rel.getCluster, providedLocalTraitSet, @@ -90,7 +90,7 @@ class BatchPhysicalSortLimitRule .replace(FlinkRelDistribution.SINGLETON) val newInput = RelOptRule.convert(inputOfExchange, requiredTrait) - // create global BatchExecSortLimit + // create global BatchPhysicalSortLimit val providedGlobalTraitSet = requiredTrait.replace(sort.getCollation) new BatchPhysicalSortLimit( rel.getCluster, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala similarity index 96% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala index bbd96ac4ec72c4d4b0b056315b6746aa7dbb40c6..ff79e51f18592fe9c6afeac0be4a2497e1a6f360 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala @@ -27,7 +27,7 @@ import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution import org.apache.flink.table.planner.plan.logical.{LogicalWindow, SlidingGroupWindow, TumblingGroupWindow} import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWindowAggregate -import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecHashWindowAggregate, BatchExecLocalHashWindowAggregate, BatchExecLocalSortWindowAggregate, BatchExecSortWindowAggregate} +import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalSortWindowAggregate, BatchExecSortWindowAggregate, BatchPhysicalHashWindowAggregate, BatchPhysicalLocalHashWindowAggregate} import org.apache.flink.table.planner.plan.utils.AggregateUtil import org.apache.flink.table.planner.plan.utils.AggregateUtil.hasTimeIntervalType import org.apache.flink.table.planner.plan.utils.PythonUtil.isPythonAggregate @@ -48,15 +48,15 @@ import scala.collection.JavaConversions._ /** * Rule to convert a [[FlinkLogicalWindowAggregate]] into a * {{{ - * BatchExecHash(or Sort)WindowAggregate (global) + * BatchPhysicalHash(or Sort)WindowAggregate (global) * +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton) - * +- BatchExecLocalHash(or Sort)WindowAggregate (local) + * +- BatchPhysicalLocalHash(or Sort)WindowAggregate (local) * +- input of window agg * }}} * when all aggregate functions are mergeable * and [[OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY]] is TWO_PHASE, or * {{{ - * BatchExecHash(or Sort)WindowAggregate + * BatchPhysicalHash(or Sort)WindowAggregate * +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton) * +- input of window agg * }}} @@ -67,12 +67,12 @@ import scala.collection.JavaConversions._ * this rule will try to create two possibilities above, and chooses the best one based on cost. * if all aggregate function buffer are fix length, the rule will choose hash window agg. */ -class BatchExecWindowAggregateRule +class BatchPhysicalWindowAggregateRule extends RelOptRule( operand(classOf[FlinkLogicalWindowAggregate], operand(classOf[RelNode], any)), FlinkRelFactories.LOGICAL_BUILDER_WITHOUT_AGG_INPUT_PRUNE, - "BatchExecWindowAggregateRule") + "BatchPhysicalWindowAggregateRule") with BatchPhysicalAggRuleBase { override def matches(call: RelOptRuleCall): Boolean = { @@ -163,7 +163,7 @@ class BatchExecWindowAggregateRule input.getRowType, call.builder(), window.timeAttribute) val inputTimeFieldType = agg.getInput.getRowType.getFieldList.get(inputTimeFieldIndex).getType val inputTimeIsDate = inputTimeFieldType.getSqlTypeName == SqlTypeName.DATE - // local-agg output order: groupset | assignTs | aucGroupSet | aggCalls + // local-agg output order: groupSet | assignTs | auxGroupSet | aggCalls val newInputTimeFieldIndexFromLocal = groupSet.length val config = input.getCluster.getPlanner.getContext.unwrap(classOf[FlinkContext]).getTableConfig @@ -180,7 +180,7 @@ class BatchExecWindowAggregateRule val newLocalInput = RelOptRule.convert(input, localRequiredTraitSet) val localProvidedTraitSet = localRequiredTraitSet - new BatchExecLocalHashWindowAggregate( + new BatchPhysicalLocalHashWindowAggregate( agg.getCluster, localProvidedTraitSet, newLocalInput, @@ -234,7 +234,7 @@ class BatchExecWindowAggregateRule // hash val newGlobalAggInput = RelOptRule.convert(localAgg, globalRequiredTraitSet) - new BatchExecHashWindowAggregate( + new BatchPhysicalHashWindowAggregate( agg.getCluster, aggProvidedTraitSet, newGlobalAggInput, @@ -293,7 +293,7 @@ class BatchExecWindowAggregateRule // case 2: Sliding window without pane optimization val newInput = RelOptRule.convert(input, requiredTraitSet) - new BatchExecHashWindowAggregate( + new BatchPhysicalHashWindowAggregate( agg.getCluster, aggProvidedTraitSet, newInput, @@ -430,6 +430,6 @@ class BatchExecWindowAggregateRule } } -object BatchExecWindowAggregateRule { - val INSTANCE: RelOptRule = new BatchExecWindowAggregateRule +object BatchPhysicalWindowAggregateRule { + val INSTANCE: RelOptRule = new BatchPhysicalWindowAggregateRule } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.scala index 4484f2e1937a00e2c7ee5b4dda4f120ff4cec2e5..0a711f405ad304e0e65324aa4a08c78a62f142f6 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.scala @@ -49,7 +49,7 @@ class RemoveRedundantLocalHashAggRule extends RelOptRule( localAgg.grouping, localAgg.auxGrouping, // Use the localAgg agg calls because the global agg call filters was removed, - // see BatchExecHashAggRule for details. + // see BatchPhysicalHashAggRule for details. localAgg.getAggCallToAggFunction, isMerge = false) call.transformTo(newGlobalAgg) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRule.scala index 32920f988bdb717fc768e375b9c080a5f03eb59c..833cc081d77af6234bcb84b9eed82f0276da3561 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRule.scala @@ -48,7 +48,7 @@ abstract class RemoveRedundantLocalSortAggRule( localAgg.grouping, localAgg.auxGrouping, // Use the localAgg agg calls because the global agg call filters was removed, - // see BatchExecSortAggRule for details. + // see BatchPhysicalSortAggRule for details. localAgg.getAggCallToAggFunction, isMerge = false) call.transformTo(newGlobalAgg) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala index 2afbf0cee2bb9aa7ab68eb58790316a2d9cdaa47..81b436eaa3a7360f10c0a3ca57b71935c9077904 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala @@ -23,7 +23,7 @@ import org.apache.flink.table.planner.JDouble import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.nodes.calcite.{Expand, Rank, WindowAggregate} -import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalHashWindowAggregate, BatchExecLocalSortWindowAggregate, BatchPhysicalGroupAggregateBase, BatchPhysicalWindowAggregateBase} +import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalSortWindowAggregate, BatchPhysicalGroupAggregateBase, BatchPhysicalLocalHashWindowAggregate, BatchPhysicalWindowAggregateBase} import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, RankRange} import org.apache.flink.table.runtime.operators.sort.BinaryIndexedSortable import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.LENGTH_SIZE_IN_BYTES @@ -334,7 +334,7 @@ object FlinkRelMdUtil { // grouping + assignTs + auxGrouping (agg.getAggCallList, agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping) - case agg: BatchExecLocalHashWindowAggregate => + case agg: BatchPhysicalLocalHashWindowAggregate => // grouping + assignTs + auxGrouping (agg.getAggCallList, agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping) @@ -388,7 +388,7 @@ object FlinkRelMdUtil { } /** - * Split groupKeys on Aggregate/ BatchExecGroupAggregateBase/ BatchExecWindowAggregateBase + * Split groupKeys on Aggregate/ BatchPhysicalGroupAggregateBase/ BatchPhysicalWindowAggregateBase * into keys on aggregate's groupKey and aggregate's aggregateCalls. * * @param agg the aggregate diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala index 5a5de0c624fb70f074f19cd6226d800fdb1f38e0..50154797a0c06d30509516049a712e626d9ab441 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala @@ -1244,7 +1244,7 @@ class FlinkRelMdHandlerTestBase { Array("count$0")).toList // agg calls val localWindowAggRowType = typeFactory.createStructType( localWindowAggTypes, localWindowAggNames) - val batchLocalWindowAgg = new BatchExecLocalHashWindowAggregate( + val batchLocalWindowAgg = new BatchPhysicalLocalHashWindowAggregate( batchCalc.getCluster, batchPhysicalTraits, batchCalc, @@ -1260,7 +1260,7 @@ class FlinkRelMdHandlerTestBase { enableAssignPane = false) val batchExchange2 = new BatchPhysicalExchange( cluster, batchPhysicalTraits.replace(hash01), batchLocalWindowAgg, hash01) - val batchWindowAggWithLocal = new BatchExecHashWindowAggregate( + val batchWindowAggWithLocal = new BatchPhysicalHashWindowAggregate( cluster, batchPhysicalTraits, batchExchange2, @@ -1277,7 +1277,7 @@ class FlinkRelMdHandlerTestBase { isMerge = true ) - val batchWindowAggWithoutLocal = new BatchExecHashWindowAggregate( + val batchWindowAggWithoutLocal = new BatchPhysicalHashWindowAggregate( batchExchange1.getCluster, batchPhysicalTraits, batchExchange1, @@ -1383,7 +1383,7 @@ class FlinkRelMdHandlerTestBase { Array("count$0")).toList // agg calls val localWindowAggRowType = typeFactory.createStructType( localWindowAggTypes, localWindowAggNames) - val batchLocalWindowAgg = new BatchExecLocalHashWindowAggregate( + val batchLocalWindowAgg = new BatchPhysicalLocalHashWindowAggregate( batchCalc.getCluster, batchPhysicalTraits, batchCalc, @@ -1399,7 +1399,7 @@ class FlinkRelMdHandlerTestBase { enableAssignPane = false) val batchExchange2 = new BatchPhysicalExchange( cluster, batchPhysicalTraits.replace(hash1), batchLocalWindowAgg, hash1) - val batchWindowAggWithLocal = new BatchExecHashWindowAggregate( + val batchWindowAggWithLocal = new BatchPhysicalHashWindowAggregate( cluster, batchPhysicalTraits, batchExchange2, @@ -1416,7 +1416,7 @@ class FlinkRelMdHandlerTestBase { isMerge = true ) - val batchWindowAggWithoutLocal = new BatchExecHashWindowAggregate( + val batchWindowAggWithoutLocal = new BatchPhysicalHashWindowAggregate( batchExchange1.getCluster, batchPhysicalTraits, batchExchange1, @@ -1527,7 +1527,7 @@ class FlinkRelMdHandlerTestBase { Array("count$0")).toList // agg calls val localWindowAggRowType = typeFactory.createStructType( localWindowAggTypes, localWindowAggNames) - val batchLocalWindowAggWithAuxGroup = new BatchExecLocalHashWindowAggregate( + val batchLocalWindowAggWithAuxGroup = new BatchPhysicalLocalHashWindowAggregate( batchCalc.getCluster, batchPhysicalTraits, batchCalc, @@ -1543,7 +1543,7 @@ class FlinkRelMdHandlerTestBase { enableAssignPane = false) val batchExchange2 = new BatchPhysicalExchange( cluster, batchPhysicalTraits.replace(hash0), batchLocalWindowAggWithAuxGroup, hash0) - val batchWindowAggWithLocalWithAuxGroup = new BatchExecHashWindowAggregate( + val batchWindowAggWithLocalWithAuxGroup = new BatchPhysicalHashWindowAggregate( cluster, batchPhysicalTraits, batchExchange2, @@ -1560,7 +1560,7 @@ class FlinkRelMdHandlerTestBase { isMerge = true ) - val batchWindowAggWithoutLocalWithAuxGroup = new BatchExecHashWindowAggregate( + val batchWindowAggWithoutLocalWithAuxGroup = new BatchPhysicalHashWindowAggregate( batchExchange1.getCluster, batchPhysicalTraits, batchExchange1,