Unverified commit 237e3a2d authored by godfreyhe, committed by Kurt Young

[FLINK-12575][table-planner-blink] Introduce planner rules to remove redundant shuffle and collation

This closes #8499

refactor satisfyTraits method
Parent f1c3ac47
......@@ -35,8 +35,8 @@ trait FlinkPhysicalRel extends FlinkRelNode {
*
* @param requiredTraitSet required traits
* @return A converted node which satisfies the required traits by the input nodes of the current node.
* Returns null if required traits cannot be pushed down into inputs.
* Returns None if required traits cannot be satisfied.
*/
def satisfyTraitsByInput(requiredTraitSet: RelTraitSet): RelNode = null
def satisfyTraits(requiredTraitSet: RelTraitSet): Option[RelNode] = None
}
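For context, a minimal sketch of how the new Option-based contract might be consumed (the object and helper names are hypothetical, not part of this change): callers pattern-match on the result instead of null-checking.

import org.apache.calcite.plan.RelTraitSet
import org.apache.calcite.rel.RelNode

object SatisfyTraitsUsage {
  // Hypothetical helper: return a node satisfying the required traits,
  // or fall back to the original node when satisfyTraits yields None.
  def tryConvert(rel: FlinkPhysicalRel, required: RelTraitSet): RelNode =
    rel.satisfyTraits(required) match {
      case Some(satisfied) => satisfied // traits were pushed into the inputs
      case None            => rel       // requirement cannot be satisfied
    }
}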
......@@ -24,6 +24,7 @@ import org.apache.flink.table.api.{BatchTableEnvironment, TableConfigOptions}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.{CalcCodeGenerator, CodeGeneratorContext}
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef, TraitUtil}
import org.apache.flink.table.plan.nodes.common.CommonCalc
import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
import org.apache.flink.table.plan.util.RelExplainUtil
......@@ -32,7 +33,9 @@ import org.apache.calcite.plan._
import org.apache.calcite.rel._
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.Calc
import org.apache.calcite.rex.RexProgram
import org.apache.calcite.rex.{RexCall, RexInputRef, RexProgram}
import org.apache.calcite.sql.SqlKind
import org.apache.calcite.util.mapping.{Mapping, MappingType, Mappings}
import java.util
......@@ -57,6 +60,64 @@ class BatchExecCalc(
new BatchExecCalc(cluster, traitSet, child, program, outputRowType)
}
override def satisfyTraits(requiredTraitSet: RelTraitSet): Option[RelNode] = {
val requiredDistribution = requiredTraitSet.getTrait(FlinkRelDistributionTraitDef.INSTANCE)
// Do not push the broadcast distribution trait down into Calc.
if (requiredDistribution.getType == RelDistribution.Type.BROADCAST_DISTRIBUTED) {
return None
}
val projects = calcProgram.getProjectList.map(calcProgram.expandLocalRef)
def getProjectMapping: Mapping = {
val mapping = Mappings.create(MappingType.INVERSE_FUNCTION,
getInput.getRowType.getFieldCount, projects.size)
projects.zipWithIndex.foreach {
case (project, index) =>
project match {
case inputRef: RexInputRef => mapping.set(inputRef.getIndex, index)
case call: RexCall if call.getKind == SqlKind.AS =>
call.getOperands.head match {
case inputRef: RexInputRef => mapping.set(inputRef.getIndex, index)
case _ => // ignore
}
case _ => // ignore
}
}
mapping.inverse()
}
val mapping = getProjectMapping
val appliedDistribution = requiredDistribution.apply(mapping)
// If both distribution and collation can be satisfied, satisfy both. If only distribution
// can be satisfied, satisfy only the distribution. Collation alone can only be satisfied
// here when there is no distribution requirement.
if ((!requiredDistribution.isTop) && (appliedDistribution eq FlinkRelDistribution.ANY)) {
return None
}
val requiredCollation = requiredTraitSet.getTrait(RelCollationTraitDef.INSTANCE)
val appliedCollation = TraitUtil.apply(requiredCollation, mapping)
val canCollationPushedDown = !appliedCollation.getFieldCollations.isEmpty
// If the required traits contain only collation requirements, but the collation keys are
// not columns from the input, then there is no need to satisfy the required traits.
if ((appliedDistribution eq FlinkRelDistribution.ANY) && !canCollationPushedDown) {
return None
}
var inputRequiredTraits = getInput.getTraitSet
var providedTraits = getTraitSet
if (!appliedDistribution.isTop) {
inputRequiredTraits = inputRequiredTraits.replace(appliedDistribution)
providedTraits = providedTraits.replace(requiredDistribution)
}
if (canCollationPushedDown) {
inputRequiredTraits = inputRequiredTraits.replace(appliedCollation)
providedTraits = providedTraits.replace(requiredCollation)
}
val newInput = RelOptRule.convert(getInput, inputRequiredTraits)
Some(copy(providedTraits, Seq(newInput)))
}
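The mapping logic above is easier to follow on a concrete case. The following self-contained sketch (object name hypothetical) walks one projection through Calcite's Mappings, showing how a distribution key on the Calc output is translated to an input column.

import org.apache.calcite.util.mapping.{MappingType, Mappings}

object CalcProjectMappingExample {
  def main(args: Array[String]): Unit = {
    // Input row (a, b, c); the Calc projects (c, a), as in SELECT c, a FROM t.
    // Sources are the 3 input fields, targets are the 2 output fields.
    val mapping = Mappings.create(MappingType.INVERSE_FUNCTION, 3, 2)
    mapping.set(2, 0) // input column c appears at output position 0
    mapping.set(0, 1) // input column a appears at output position 1
    // Inverting yields an output -> input mapping, which is what
    // requiredDistribution.apply(mapping) uses to rewrite its keys.
    val outputToInput = mapping.inverse()
    println(outputToInput.getTarget(0)) // 2: Hash[0] on the output becomes Hash[2] on the input
    println(outputToInput.getTarget(1)) // 0: Hash[1] on the output becomes Hash[0] on the input
  }
}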
//~ ExecNode methods -----------------------------------------------------------
override def getDamBehavior = DamBehavior.PIPELINED
......
......@@ -23,16 +23,18 @@ import org.apache.flink.table.api.{BatchTableEnvironment, TableConfigOptions}
import org.apache.flink.table.codegen.{CodeGeneratorContext, CorrelateCodeGenerator}
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.functions.utils.TableSqlFunction
import org.apache.flink.table.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef, TraitUtil}
import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
import org.apache.flink.table.plan.util.RelExplainUtil
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.plan.{RelOptCluster, RelOptRule, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.Correlate
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.calcite.rex.{RexCall, RexNode, RexProgram}
import org.apache.calcite.sql.SemiJoinType
import org.apache.calcite.rel.{RelCollationTraitDef, RelDistribution, RelFieldCollation, RelNode, RelWriter, SingleRel}
import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexProgram}
import org.apache.calcite.sql.{SemiJoinType, SqlKind}
import org.apache.calcite.util.mapping.{Mapping, MappingType, Mappings}
import java.util
......@@ -94,6 +96,79 @@ class BatchExecCorrelate(
.itemIf("condition", condition.orNull, condition.isDefined)
}
override def satisfyTraits(requiredTraitSet: RelTraitSet): Option[RelNode] = {
val requiredDistribution = requiredTraitSet.getTrait(FlinkRelDistributionTraitDef.INSTANCE)
// Correlate cannot provide broadcast distribution
if (requiredDistribution.getType == RelDistribution.Type.BROADCAST_DISTRIBUTED) {
return None
}
def getOutputInputMapping: Mapping = {
val inputFieldCnt = getInput.getRowType.getFieldCount
projectProgram match {
case Some(program) =>
val projects = program.getProjectList.map(program.expandLocalRef)
val mapping = Mappings.create(MappingType.INVERSE_FUNCTION, inputFieldCnt, projects.size)
projects.zipWithIndex.foreach {
case (project, index) =>
project match {
case inputRef: RexInputRef => mapping.set(inputRef.getIndex, index)
case call: RexCall if call.getKind == SqlKind.AS =>
call.getOperands.head match {
case inputRef: RexInputRef => mapping.set(inputRef.getIndex, index)
case _ => // ignore
}
case _ => // ignore
}
}
mapping.inverse()
case _ =>
val mapping = Mappings.create(MappingType.FUNCTION, inputFieldCnt, inputFieldCnt)
(0 until inputFieldCnt).foreach {
index => mapping.set(index, index)
}
mapping
}
}
val mapping = getOutputInputMapping
val appliedDistribution = requiredDistribution.apply(mapping)
// If both distribution and collation can be satisfied, satisfy both. If only distribution
// can be satisfied, satisfy only the distribution. Collation alone can only be satisfied
// here when there is no distribution requirement.
if ((!requiredDistribution.isTop) && (appliedDistribution eq FlinkRelDistribution.ANY)) {
return None
}
val requiredCollation = requiredTraitSet.getTrait(RelCollationTraitDef.INSTANCE)
val appliedCollation = TraitUtil.apply(requiredCollation, mapping)
// the required collation can be satisfied if field collations are not empty
// and the direction of each field collation is non-STRICTLY
val canSatisfyCollation = appliedCollation.getFieldCollations.nonEmpty &&
!appliedCollation.getFieldCollations.exists { c =>
(c.getDirection eq RelFieldCollation.Direction.STRICTLY_ASCENDING) ||
(c.getDirection eq RelFieldCollation.Direction.STRICTLY_DESCENDING)
}
// If the required traits contain only collation requirements, but the collation keys are
// not columns from the input, then there is no need to satisfy the required traits.
if ((appliedDistribution eq FlinkRelDistribution.ANY) && !canSatisfyCollation) {
return None
}
var inputRequiredTraits = getInput.getTraitSet
var providedTraits = getTraitSet
if (!appliedDistribution.isTop) {
inputRequiredTraits = inputRequiredTraits.replace(appliedDistribution)
providedTraits = providedTraits.replace(requiredDistribution)
}
if (canSatisfyCollation) {
inputRequiredTraits = inputRequiredTraits.replace(appliedCollation)
providedTraits = providedTraits.replace(requiredCollation)
}
val newInput = RelOptRule.convert(getInput, inputRequiredTraits)
Some(copy(providedTraits, Seq(newInput)))
}
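The STRICTLY direction guard above is subtle, so here is the same check in isolation, as a runnable sketch against Calcite's collation classes (object name hypothetical):

import org.apache.calcite.rel.{RelCollations, RelFieldCollation}
import scala.collection.JavaConverters._

object CollationDirectionCheck {
  def main(args: Array[String]): Unit = {
    val collation = RelCollations.of(
      new RelFieldCollation(0, RelFieldCollation.Direction.ASCENDING),
      new RelFieldCollation(1, RelFieldCollation.Direction.STRICTLY_DESCENDING))
    // Mirrors the check above: non-empty field collations, none strictly directed.
    val canSatisfy = !collation.getFieldCollations.isEmpty &&
      !collation.getFieldCollations.asScala.exists { c =>
        (c.getDirection eq RelFieldCollation.Direction.STRICTLY_ASCENDING) ||
          (c.getDirection eq RelFieldCollation.Direction.STRICTLY_DESCENDING)
      }
    println(canSatisfy) // false: the second field collation is strictly descending
  }
}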
//~ ExecNode methods -----------------------------------------------------------
override def getDamBehavior: DamBehavior = DamBehavior.PIPELINED
......
......@@ -18,9 +18,9 @@
package org.apache.flink.table.plan.nodes.physical.batch
import org.apache.flink.table.api.TableException
import org.apache.flink.table.api.{AggPhaseEnforcer, PlannerConfigOptions, TableException}
import org.apache.flink.table.functions.UserDefinedFunction
import org.apache.flink.table.plan.util.RelExplainUtil
import org.apache.flink.table.plan.util.{FlinkRelOptUtil, RelExplainUtil}
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
......@@ -84,4 +84,11 @@ abstract class BatchExecGroupAggregateBase(
isFinal)
}
protected def isEnforceTwoStageAgg: Boolean = {
val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
val aggConfig = tableConfig.getConf.getString(
PlannerConfigOptions.SQL_OPTIMIZER_AGG_PHASE_ENFORCER)
AggPhaseEnforcer.TWO_PHASE.toString.equalsIgnoreCase(aggConfig)
}
}
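A sketch of the configuration this helper reads, assuming the blink planner's TableConfig can be instantiated directly (the object name is illustrative):

import org.apache.flink.table.api.{AggPhaseEnforcer, PlannerConfigOptions, TableConfig}

object EnforceTwoPhaseAgg {
  def main(args: Array[String]): Unit = {
    val tableConfig = new TableConfig
    tableConfig.getConf.setString(
      PlannerConfigOptions.SQL_OPTIMIZER_AGG_PHASE_ENFORCER,
      AggPhaseEnforcer.TWO_PHASE.toString)
    // With the enforcer set to TWO_PHASE, isEnforceTwoStageAgg returns true and
    // local aggregates refuse to push requirements into their inputs (see below).
  }
}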
......@@ -19,19 +19,24 @@ package org.apache.flink.table.plan.nodes.physical.batch
import org.apache.flink.runtime.operators.DamBehavior
import org.apache.flink.streaming.api.transformations.StreamTransformation
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.api.{PlannerConfigOptions, TableConfig}
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.functions.UserDefinedFunction
import org.apache.flink.table.plan.util.RelExplainUtil
import org.apache.flink.table.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef}
import org.apache.flink.table.plan.util.{FlinkRelOptUtil, RelExplainUtil}
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.plan.{RelOptCluster, RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelDistribution.Type.{HASH_DISTRIBUTED, SINGLETON}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.tools.RelBuilder
import org.apache.calcite.util.{ImmutableIntList, Util}
import java.util
import scala.collection.JavaConversions._
/**
* Batch physical RelNode for (global) hash-based aggregate operator.
*
......@@ -94,6 +99,53 @@ class BatchExecHashAggregate(
isGlobal = true))
}
override def satisfyTraits(requiredTraitSet: RelTraitSet): Option[RelNode] = {
val requiredDistribution = requiredTraitSet.getTrait(FlinkRelDistributionTraitDef.INSTANCE)
val canSatisfy = requiredDistribution.getType match {
case SINGLETON => grouping.length == 0
case HASH_DISTRIBUTED =>
val shuffleKeys = requiredDistribution.getKeys
val groupKeysList = ImmutableIntList.of(grouping.indices.toArray: _*)
if (requiredDistribution.requireStrict) {
shuffleKeys == groupKeysList
} else if (Util.startsWith(shuffleKeys, groupKeysList)) {
// If the required distribution is not strict, Hash[a] can satisfy Hash[a, b],
// so return true if shuffleKeys (Hash[a, b]) starts with groupKeys (Hash[a])
true
} else {
// If partialKey is enabled, try to use partial key to satisfy the required distribution
val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
val partialKeyEnabled = tableConfig.getConf.getBoolean(
PlannerConfigOptions.SQL_OPTIMIZER_SHUFFLE_PARTIAL_KEY_ENABLED)
partialKeyEnabled && groupKeysList.containsAll(shuffleKeys)
}
case _ => false
}
if (!canSatisfy) {
return None
}
val inputRequiredDistribution = requiredDistribution.getType match {
case SINGLETON => requiredDistribution
case HASH_DISTRIBUTED =>
val shuffleKeys = requiredDistribution.getKeys
val groupKeysList = ImmutableIntList.of(grouping.indices.toArray: _*)
if (requiredDistribution.requireStrict) {
FlinkRelDistribution.hash(grouping, requireStrict = true)
} else if (Util.startsWith(shuffleKeys, groupKeysList)) {
// Hash[a] can satisfy Hash[a, b]
FlinkRelDistribution.hash(grouping, requireStrict = false)
} else {
// use partial key to satisfy the required distribution
FlinkRelDistribution.hash(shuffleKeys.map(grouping(_)).toArray, requireStrict = false)
}
}
val newInput = RelOptRule.convert(getInput, inputRequiredDistribution)
val newProvidedTraitSet = getTraitSet.replace(requiredDistribution)
Some(copy(newProvidedTraitSet, Seq(newInput)))
}
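To make the three branches concrete, a runnable sketch of the key tests (Util.startsWith and containsAll) on toy key lists, under the same semantics as the code above:

import org.apache.calcite.util.{ImmutableIntList, Util}

object AggShuffleKeyTests {
  def main(args: Array[String]): Unit = {
    val groupKeys = ImmutableIntList.of(0)      // agg output group keys: [a]
    val required  = ImmutableIntList.of(0, 1)   // required (non-strict) Hash[a, b]
    // Rows equal on (a, b) are also equal on a, so Hash[a] satisfies Hash[a, b]:
    println(Util.startsWith(required, groupKeys)) // true
    // Partial-key branch: a required Hash[b] against group keys [a, b] is allowed
    // only when SQL_OPTIMIZER_SHUFFLE_PARTIAL_KEY_ENABLED is set:
    println(ImmutableIntList.of(0, 1).containsAll(ImmutableIntList.of(1))) // true
  }
}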
//~ ExecNode methods -----------------------------------------------------------
override def getDamBehavior = DamBehavior.FULL_DAM
......
......@@ -21,7 +21,9 @@ import org.apache.flink.runtime.operators.DamBehavior
import org.apache.flink.streaming.api.transformations.StreamTransformation
import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef}
import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.exec.ExecNode
import org.apache.flink.table.plan.util.{FlinkRelMdUtil, JoinUtil}
import org.apache.flink.table.runtime.join.HashJoinType
......@@ -127,6 +129,37 @@ class BatchExecHashJoin(
}
}
override def satisfyTraits(requiredTraitSet: RelTraitSet): Option[RelNode] = {
if (!isBroadcast) {
satisfyTraitsOnNonBroadcastHashJoin(requiredTraitSet)
} else {
satisfyTraitsOnBroadcastJoin(requiredTraitSet, leftIsBuild)
}
}
private def satisfyTraitsOnNonBroadcastHashJoin(
requiredTraitSet: RelTraitSet): Option[RelNode] = {
val requiredDistribution = requiredTraitSet.getTrait(FlinkRelDistributionTraitDef.INSTANCE)
val (canSatisfyDistribution, leftRequiredDistribution, rightRequiredDistribution) =
satisfyHashDistributionOnNonBroadcastJoin(requiredDistribution)
if (!canSatisfyDistribution) {
return None
}
val toRestrictHashDistributionByKeys = (distribution: FlinkRelDistribution) =>
getCluster.getPlanner
.emptyTraitSet
.replace(FlinkConventions.BATCH_PHYSICAL)
.replace(distribution)
val leftRequiredTraits = toRestrictHashDistributionByKeys(leftRequiredDistribution)
val rightRequiredTraits = toRestrictHashDistributionByKeys(rightRequiredDistribution)
val newLeft = RelOptRule.convert(getLeft, leftRequiredTraits)
val newRight = RelOptRule.convert(getRight, rightRequiredTraits)
val providedTraits = getTraitSet.replace(requiredDistribution)
// HashJoin cannot satisfy any collation requirement.
Some(copy(providedTraits, Seq(newLeft, newRight)))
}
//~ ExecNode methods -----------------------------------------------------------
override def getDamBehavior: DamBehavior = {
......
......@@ -22,13 +22,19 @@ import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.codegen.{CodeGeneratorContext, ExprCodeGenerator, FunctionCodeGenerator}
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.generated.GeneratedJoinCondition
import org.apache.flink.table.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef}
import org.apache.flink.table.plan.nodes.common.CommonPhysicalJoin
import org.apache.flink.table.plan.nodes.exec.BatchExecNode
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.plan.{RelOptCluster, RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelDistribution.Type.{HASH_DISTRIBUTED, RANGE_DISTRIBUTED}
import org.apache.calcite.rel.core.{Join, JoinRelType}
import org.apache.calcite.rel.{RelCollations, RelNode}
import org.apache.calcite.rex.RexNode
import org.apache.calcite.util.ImmutableIntList
import scala.collection.JavaConversions._
import scala.collection.mutable
/**
* Batch physical RelNode for [[Join]]
......@@ -70,4 +76,150 @@ abstract class BatchExecJoinBase(
"JoinConditionFunction",
body)
}
/**
* Try to satisfy hash distribution on Non-BroadcastJoin (including SortMergeJoin and
* Non-Broadcast HashJoin).
*
* @param requiredDistribution distribution requirement
* @return a tuple of three elements.
*         The first element is a flag indicating whether the requirement can be satisfied.
*         The second element is the distribution requirement on the left child if the
*         requirement can be pushed down into the join.
*         The third element is the distribution requirement on the right child if the
*         requirement can be pushed down into the join.
*/
def satisfyHashDistributionOnNonBroadcastJoin(
requiredDistribution: FlinkRelDistribution
): (Boolean, FlinkRelDistribution, FlinkRelDistribution) = {
// A non-broadcast join can only satisfy a hash distribution requirement here
if (requiredDistribution.getType != HASH_DISTRIBUTED) {
return (false, null, null)
}
// A full outer join cannot provide hash distribution because it generates nulls on the
// left/right side when there is no matching row.
if (joinType == JoinRelType.FULL) {
return (false, null, null)
}
val leftKeys = joinInfo.leftKeys
val rightKeys = joinInfo.rightKeys
val leftKeysToRightKeys = leftKeys.zip(rightKeys).toMap
val rightKeysToLeftKeys = rightKeys.zip(leftKeys).toMap
val leftFieldCnt = getLeft.getRowType.getFieldCount
val requiredShuffleKeys = requiredDistribution.getKeys
val requiredLeftShuffleKeys = mutable.ArrayBuffer[Int]()
val requiredRightShuffleKeys = mutable.ArrayBuffer[Int]()
requiredShuffleKeys.foreach { key =>
if (key < leftFieldCnt && joinType != JoinRelType.RIGHT) {
leftKeysToRightKeys.get(key) match {
case Some(rk) =>
requiredLeftShuffleKeys += key
requiredRightShuffleKeys += rk
case None if requiredDistribution.requireStrict =>
// Cannot satisfy the required hash distribution because the required distribution
// is strict but the key has no corresponding key on the right side
return (false, null, null)
case _ => // do nothing
}
} else if (key >= leftFieldCnt &&
(joinType == JoinRelType.RIGHT || joinType == JoinRelType.INNER)) {
val keysOnRightChild = key - leftFieldCnt
rightKeysToLeftKeys.get(keysOnRightChild) match {
case Some(lk) =>
requiredLeftShuffleKeys += lk
requiredRightShuffleKeys += keysOnRightChild
case None if requiredDistribution.requireStrict =>
// Cannot satisfy the required hash distribution because the required distribution
// is strict but the key has no corresponding key on the left side
return (false, null, null)
case _ => // do nothing
}
} else {
// cannot satisfy the required hash distribution if the shuffle keys do not come from
// the left side when the join is LOJ, or from the right side when the join is ROJ.
return (false, null, null)
}
}
if (requiredLeftShuffleKeys.isEmpty) {
// the join cannot satisfy the required hash distribution
// because the derived input shuffle keys are empty
return (false, null, null)
}
val (leftShuffleKeys, rightShuffleKeys) = if (joinType == JoinRelType.INNER &&
!requiredDistribution.requireStrict) {
(requiredLeftShuffleKeys.distinct, requiredRightShuffleKeys.distinct)
} else {
(requiredLeftShuffleKeys, requiredRightShuffleKeys)
}
(true,
FlinkRelDistribution.hash(ImmutableIntList.of(leftShuffleKeys: _*), requireStrict = true),
FlinkRelDistribution.hash(ImmutableIntList.of(rightShuffleKeys: _*), requireStrict = true))
}
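A toy re-implementation (not planner code) of the key translation above, for an inner join L(a, b) JOIN R(x, y) ON a = x:

object JoinShuffleKeyTranslation {
  def main(args: Array[String]): Unit = {
    val leftFieldCnt = 2                      // left row type (a, b)
    val leftToRight = Map(0 -> 0)             // join keys: left a (0) = right x (0)
    val rightToLeft = leftToRight.map(_.swap)
    val requiredKeys = Seq(2)                 // required Hash[x]; x is output column 2
    val (leftKeys, rightKeys) =
      requiredKeys.foldLeft((Seq.empty[Int], Seq.empty[Int])) {
        case ((ls, rs), k) if k < leftFieldCnt =>
          (ls :+ k, rs :+ leftToRight(k))     // key comes from the left side
        case ((ls, rs), k) =>
          val rk = k - leftFieldCnt           // key comes from the right side
          (ls :+ rightToLeft(rk), rs :+ rk)
      }
    println((leftKeys, rightKeys))            // (List(0), List(0)): shuffle both sides by a = x
  }
}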
/**
* Try to satisfy the given required traits on BroadcastJoin (including Broadcast-HashJoin and
* NestedLoopJoin).
*
* @param requiredTraitSet required traits
* @return An equivalent join which satisfies the required trait set, or None if the
*         requirement cannot be satisfied.
*/
protected def satisfyTraitsOnBroadcastJoin(
requiredTraitSet: RelTraitSet,
leftIsBroadcast: Boolean): Option[RelNode] = {
val requiredDistribution = requiredTraitSet.getTrait(FlinkRelDistributionTraitDef.INSTANCE)
val keys = requiredDistribution.getKeys
val left = getLeft
val right = getRight
val leftFieldCnt = left.getRowType.getFieldCount
val canSatisfy = requiredDistribution.getType match {
case HASH_DISTRIBUTED | RANGE_DISTRIBUTED =>
// the required distribution can be satisfied only if all distribution keys come
// from the non-broadcast side of the BroadcastJoin
if (leftIsBroadcast) {
// all distribution keys must come from right child
keys.forall(_ >= leftFieldCnt)
} else {
// all distribution keys must come from left child
keys.forall(_ < leftFieldCnt)
}
// SINGLETON, BROADCAST_DISTRIBUTED, ANY, RANDOM_DISTRIBUTED, ROUND_ROBIN_DISTRIBUTED
// distribution cannot be pushed down.
case _ => false
}
if (!canSatisfy) {
return None
}
val keysInProbeSide = if (leftIsBroadcast) {
ImmutableIntList.of(keys.map(_ - leftFieldCnt): _*)
} else {
keys
}
val inputRequiredDistribution = requiredDistribution.getType match {
case HASH_DISTRIBUTED =>
FlinkRelDistribution.hash(keysInProbeSide, requiredDistribution.requireStrict)
case RANGE_DISTRIBUTED =>
FlinkRelDistribution.range(keysInProbeSide)
}
// remove collation traits from input traits and provided traits
val (newLeft, newRight) = if (leftIsBroadcast) {
val rightRequiredTraitSet = right.getTraitSet
.replace(inputRequiredDistribution)
.replace(RelCollations.EMPTY)
val newRight = RelOptRule.convert(right, rightRequiredTraitSet)
(left, newRight)
} else {
val leftRequiredTraitSet = left.getTraitSet
.replace(inputRequiredDistribution)
.replace(RelCollations.EMPTY)
val newLeft = RelOptRule.convert(left, leftRequiredTraitSet)
(newLeft, right)
}
val providedTraitSet = requiredTraitSet.replace(RelCollations.EMPTY)
Some(copy(providedTraitSet, Seq(newLeft, newRight)))
}
}
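A small sketch of the probe-side key shift above, assuming the left side is broadcast and the left row type has two fields (object name illustrative):

object BroadcastJoinKeyShift {
  def main(args: Array[String]): Unit = {
    val leftFieldCnt = 2
    val requiredKeys = Seq(2, 3)             // Hash[2, 3] on the join output
    // Satisfiable only if every key comes from the probe (right) side:
    val allFromProbe = requiredKeys.forall(_ >= leftFieldCnt)
    // Shift to the probe side's own column indexes before pushing down:
    val keysInProbeSide = requiredKeys.map(_ - leftFieldCnt)
    println((allFromProbe, keysInProbeSide)) // (true, List(0, 1))
  }
}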
......@@ -19,21 +19,24 @@ package org.apache.flink.table.plan.nodes.physical.batch
import org.apache.flink.runtime.operators.DamBehavior
import org.apache.flink.streaming.api.transformations.StreamTransformation
import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig}
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.functions.UserDefinedFunction
import org.apache.flink.table.plan.nodes.exec.ExecNode
import org.apache.flink.table.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef}
import org.apache.flink.table.plan.util.RelExplainUtil
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.plan.{RelOptCluster, RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelDistribution.Type
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.tools.RelBuilder
import org.apache.calcite.util.ImmutableIntList
import java.util
import scala.collection.JavaConversions._
import scala.collection.mutable
/**
* Batch physical RelNode for local hash-based aggregate operator.
......@@ -92,6 +95,37 @@ class BatchExecLocalHashAggregate(
isGlobal = false))
}
override def satisfyTraits(requiredTraitSet: RelTraitSet): Option[RelNode] = {
// Do not try to satisfy the requirement via the local agg's input if two-stage agg is enforced.
if (isEnforceTwoStageAgg) {
return None
}
val requiredDistribution = requiredTraitSet.getTrait(FlinkRelDistributionTraitDef.INSTANCE)
val canSatisfy = requiredDistribution.getType match {
case Type.HASH_DISTRIBUTED | Type.RANGE_DISTRIBUTED =>
val groupCount = grouping.length
// Cannot satisfy distribution if keys are not group keys of agg
requiredDistribution.getKeys.forall(_ < groupCount)
case _ => false
}
if (!canSatisfy) {
return None
}
val keys = requiredDistribution.getKeys.map(grouping(_))
val inputRequiredDistributionKeys = ImmutableIntList.of(keys: _*)
val inputRequiredDistribution = requiredDistribution.getType match {
case Type.HASH_DISTRIBUTED =>
FlinkRelDistribution.hash(inputRequiredDistributionKeys, requiredDistribution.requireStrict)
case Type.RANGE_DISTRIBUTED => FlinkRelDistribution.range(inputRequiredDistributionKeys)
}
val inputRequiredTraits = input.getTraitSet.replace(inputRequiredDistribution)
val newInput = RelOptRule.convert(getInput, inputRequiredTraits)
val providedTraits = getTraitSet.replace(requiredDistribution)
Some(copy(providedTraits, Seq(newInput)))
}
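The remapping through grouping is easy to miss; a toy example, assuming a local agg that groups on input columns 3 and 1:

object LocalAggKeyRemap {
  def main(args: Array[String]): Unit = {
    val grouping = Array(3, 1)   // agg output positions 0, 1 hold input columns 3, 1
    val requiredKeys = Seq(1)    // required Hash[1] on the agg output
    // Satisfiable only if all keys reference group-key positions:
    val canSatisfy = requiredKeys.forall(_ < grouping.length)
    // Translate output positions back to input columns before pushing down:
    val inputKeys = requiredKeys.map(grouping(_))
    println((canSatisfy, inputKeys)) // (true, List(1)): require Hash[1] on the input
  }
}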
//~ ExecNode methods -----------------------------------------------------------
override def getDamBehavior: DamBehavior = {
......
......@@ -22,16 +22,22 @@ import org.apache.flink.streaming.api.transformations.StreamTransformation
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.functions.UserDefinedFunction
import org.apache.flink.table.plan.util.RelExplainUtil
import org.apache.flink.table.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef}
import org.apache.flink.table.plan.util.{FlinkRelOptUtil, RelExplainUtil}
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.plan.{RelOptCluster, RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelDistribution.Type
import org.apache.calcite.rel._
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.tools.RelBuilder
import org.apache.calcite.util.ImmutableIntList
import java.util
import scala.collection.JavaConversions._
import scala.collection.mutable
/**
* Batch physical RelNode for local sort-based aggregate operator.
*
......@@ -90,6 +96,44 @@ class BatchExecLocalSortAggregate(
isGlobal = false))
}
override def satisfyTraits(requiredTraitSet: RelTraitSet): Option[RelNode] = {
// Do not try to satisfy the requirement via the local agg's input if two-stage agg is enforced.
if (isEnforceTwoStageAgg) {
return None
}
val groupCount = grouping.length
val requiredDistribution = requiredTraitSet.getTrait(FlinkRelDistributionTraitDef.INSTANCE)
val canSatisfy = requiredDistribution.getType match {
case Type.HASH_DISTRIBUTED | Type.RANGE_DISTRIBUTED =>
// Cannot satisfy distribution if keys are not group keys of agg
requiredDistribution.getKeys.forall(_ < groupCount)
case _ => false
}
if (!canSatisfy) {
return None
}
val keys = requiredDistribution.getKeys.map(grouping(_))
val inputRequiredDistributionKeys = ImmutableIntList.of(keys: _*)
val inputRequiredDistribution = requiredDistribution.getType match {
case Type.HASH_DISTRIBUTED =>
FlinkRelDistribution.hash(inputRequiredDistributionKeys, requiredDistribution.requireStrict)
case Type.RANGE_DISTRIBUTED =>
FlinkRelDistribution.range(inputRequiredDistributionKeys)
}
val requiredCollation = requiredTraitSet.getTrait(RelCollationTraitDef.INSTANCE)
val providedFieldCollations = (0 until groupCount).map(FlinkRelOptUtil.ofRelFieldCollation)
val providedCollation = RelCollations.of(providedFieldCollations)
val newProvidedTraits = if (providedCollation.satisfies(requiredCollation)) {
getTraitSet.replace(requiredDistribution).replace(requiredCollation)
} else {
getTraitSet.replace(requiredDistribution)
}
val inputRequiredTraits = getInput.getTraitSet.replace(inputRequiredDistribution)
val newInput = RelOptRule.convert(getInput, inputRequiredTraits)
Some(copy(newProvidedTraits, Seq(newInput)))
}
//~ ExecNode methods -----------------------------------------------------------
override def getDamBehavior: DamBehavior = {
......
......@@ -82,6 +82,7 @@ class BatchExecNestedLoopJoin(
if (leftRowCnt == null || rightRowCnt == null) {
return null
}
val buildRel = if (leftIsBuild) getLeft else getRight
val buildRows = mq.getRowCount(buildRel)
val buildRowSize = mq.getAverageRowSize(buildRel)
......@@ -104,6 +105,11 @@ class BatchExecNestedLoopJoin(
}
}
override def satisfyTraits(requiredTraitSet: RelTraitSet): Option[RelNode] = {
// Assume NestedLoopJoin always broadcasts data from the smaller child.
satisfyTraitsOnBroadcastJoin(requiredTraitSet, leftIsBuild)
}
//~ ExecNode methods -----------------------------------------------------------
override def getDamBehavior: DamBehavior = DamBehavior.PIPELINED
......
......@@ -22,7 +22,7 @@ import org.apache.flink.runtime.operators.DamBehavior
import org.apache.flink.streaming.api.transformations.{OneInputTransformation, StreamTransformation}
import org.apache.flink.table.CalcitePair
import org.apache.flink.table.`type`.InternalTypes
import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig, TableConfigOptions}
import org.apache.flink.table.api.{BatchTableEnvironment, PlannerConfigOptions, TableConfig, TableConfigOptions}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.CodeGeneratorContext
import org.apache.flink.table.codegen.agg.AggsHandlerCodeGenerator
......@@ -31,17 +31,19 @@ import org.apache.flink.table.codegen.sort.ComparatorCodeGenerator
import org.apache.flink.table.dataformat.{BaseRow, BinaryRow}
import org.apache.flink.table.functions.UserDefinedFunction
import org.apache.flink.table.generated.GeneratedRecordComparator
import org.apache.flink.table.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef}
import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
import org.apache.flink.table.plan.nodes.physical.batch.OverWindowMode.OverWindowMode
import org.apache.flink.table.plan.util.AggregateUtil.transformToBatchAggregateInfoList
import org.apache.flink.table.plan.util.OverAggregateUtil.getLongBoundary
import org.apache.flink.table.plan.util.{OverAggregateUtil, RelExplainUtil}
import org.apache.flink.table.plan.util.{FlinkRelOptUtil, OverAggregateUtil, RelExplainUtil}
import org.apache.flink.table.runtime.over.frame.OffsetOverFrame.CalcOffsetFunc
import org.apache.flink.table.runtime.over.frame.{InsensitiveOverFrame, OffsetOverFrame, OverWindowFrame, RangeSlidingOverFrame, RangeUnboundedFollowingOverFrame, RangeUnboundedPrecedingOverFrame, RowSlidingOverFrame, RowUnboundedFollowingOverFrame, RowUnboundedPrecedingOverFrame, UnboundedOverWindowFrame}
import org.apache.flink.table.runtime.over.{BufferDataOverWindowOperator, NonBufferOverWindowOperator}
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelDistribution.Type._
import org.apache.calcite.rel._
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.Window.Group
......@@ -51,6 +53,7 @@ import org.apache.calcite.rex.RexWindowBound
import org.apache.calcite.sql.SqlKind
import org.apache.calcite.sql.fun.SqlLeadLagAggFunction
import org.apache.calcite.tools.RelBuilder
import org.apache.calcite.util.ImmutableIntList
import java.util
......@@ -179,7 +182,7 @@ class BatchExecOverAggregate(
yield new CalcitePair[AggregateCall, String](aggregateCalls.get(i), "windowAgg$" + i)
}
private[flink] def splitOutOffsetOrInsensitiveGroup()
private def splitOutOffsetOrInsensitiveGroup()
: Seq[(OverWindowMode, Window.Group, Seq[(AggregateCall, UserDefinedFunction)])] = {
def compareTo(o1: Window.RexWinAggCall, o2: Window.RexWinAggCall): Boolean = {
......@@ -242,6 +245,102 @@ class BatchExecOverAggregate(
windowGroupInfo
}
override def satisfyTraits(requiredTraitSet: RelTraitSet): Option[RelNode] = {
val requiredDistribution = requiredTraitSet.getTrait(FlinkRelDistributionTraitDef.INSTANCE)
val requiredCollation = requiredTraitSet.getTrait(RelCollationTraitDef.INSTANCE)
if (requiredDistribution.getType == ANY && requiredCollation.getFieldCollations.isEmpty) {
return None
}
val selfProvidedTraitSet = inferProvidedTraitSet()
if (selfProvidedTraitSet.satisfies(requiredTraitSet)) {
// The current node can satisfy the requiredTraitSet; return it with the provided trait set
return Some(copy(selfProvidedTraitSet, Seq(getInput)))
}
val inputFieldCnt = getInput.getRowType.getFieldCount
val canSatisfy = if (requiredDistribution.getType == ANY) {
true
} else {
if (!grouping.isEmpty) {
if (requiredDistribution.requireStrict) {
requiredDistribution.getKeys == ImmutableIntList.of(grouping: _*)
} else {
val isAllFieldsFromInput = requiredDistribution.getKeys.forall(_ < inputFieldCnt)
if (isAllFieldsFromInput) {
val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
if (tableConfig.getConf.getBoolean(
PlannerConfigOptions.SQL_OPTIMIZER_SHUFFLE_PARTIAL_KEY_ENABLED)) {
ImmutableIntList.of(grouping: _*).containsAll(requiredDistribution.getKeys)
} else {
requiredDistribution.getKeys == ImmutableIntList.of(grouping: _*)
}
} else {
// If the required distribution keys do not all come directly from the input,
// the node cannot satisfy the required distribution and collations.
false
}
}
} else {
requiredDistribution.getType == SINGLETON
}
}
// If the OverAggregate can provide the distribution, but its traits cannot satisfy the
// required distribution, do not push down the distribution and collation requirements
// (because a later shuffle would destroy the previous collation).
if (!canSatisfy) {
return None
}
var inputRequiredTraits = getInput.getTraitSet
var providedTraits = selfProvidedTraitSet
val providedCollation = selfProvidedTraitSet.getTrait(RelCollationTraitDef.INSTANCE)
if (!requiredDistribution.isTop) {
inputRequiredTraits = inputRequiredTraits.replace(requiredDistribution)
providedTraits = providedTraits.replace(requiredDistribution)
}
if (providedCollation.satisfies(requiredCollation)) {
// the provided collation satisfies the requirement,
// so do not push the sort down into the input.
} else if (providedCollation.getFieldCollations.isEmpty &&
requiredCollation.getFieldCollations.nonEmpty) {
// If the OverAgg cannot provide the collation itself, try to push the collation
// requirement down into its input if all collation fields come from the input node.
val canPushDownCollation = requiredCollation.getFieldCollations
.forall(_.getFieldIndex < inputFieldCnt)
if (canPushDownCollation) {
inputRequiredTraits = inputRequiredTraits.replace(requiredCollation)
providedTraits = providedTraits.replace(requiredCollation)
}
} else {
// Do not push the sort down into the input, because this node's own collation
// would destroy any collation provided by its input.
}
val newInput = RelOptRule.convert(getInput, inputRequiredTraits)
Some(copy(providedTraits, Seq(newInput)))
}
private def inferProvidedTraitSet(): RelTraitSet = {
var selfProvidedTraitSet = getTraitSet
// provided distribution
val providedDistribution = if (grouping.nonEmpty) {
FlinkRelDistribution.hash(grouping.map(Integer.valueOf).toList, requireStrict = false)
} else {
FlinkRelDistribution.SINGLETON
}
selfProvidedTraitSet = selfProvidedTraitSet.replace(providedDistribution)
// provided collation
val firstGroup = windowGroupToAggCallToAggFunction.head._1
if (OverAggregateUtil.needCollationTrait(logicWindow, firstGroup)) {
val collation = OverAggregateUtil.createCollation(firstGroup)
if (!collation.equals(RelCollations.EMPTY)) {
selfProvidedTraitSet = selfProvidedTraitSet.replace(collation)
}
}
selfProvidedTraitSet
}
//~ ExecNode methods -----------------------------------------------------------
override def getDamBehavior = DamBehavior.PIPELINED
......
......@@ -20,22 +20,25 @@ package org.apache.flink.table.plan.nodes.physical.batch
import org.apache.flink.runtime.operators.DamBehavior
import org.apache.flink.streaming.api.transformations.{OneInputTransformation, StreamTransformation}
import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
import org.apache.flink.table.api.{BatchTableEnvironment, PlannerConfigOptions, TableException}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.sort.ComparatorCodeGenerator
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef}
import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.plan.nodes.calcite.Rank
import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
import org.apache.flink.table.plan.util.RelExplainUtil
import org.apache.flink.table.plan.util.{FlinkRelOptUtil, RelExplainUtil}
import org.apache.flink.table.runtime.rank.{ConstantRankRange, RankRange, RankType}
import org.apache.flink.table.runtime.sort.RankOperator
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelDistribution.Type
import org.apache.calcite.rel.RelDistribution.Type.{HASH_DISTRIBUTED, SINGLETON}
import org.apache.calcite.rel._
import org.apache.calcite.rel.`type`.RelDataTypeField
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.util.ImmutableBitSet
import org.apache.calcite.util.{ImmutableBitSet, ImmutableIntList, Util}
import java.util
......@@ -112,6 +115,122 @@ class BatchExecRank(
costFactory.makeCost(rowCount, cpuCost, 0, 0, memCost)
}
override def satisfyTraits(requiredTraitSet: RelTraitSet): Option[RelNode] = {
if (isGlobal) {
satisfyTraitsOnGlobalRank(requiredTraitSet)
} else {
satisfyTraitsOnLocalRank(requiredTraitSet)
}
}
private def satisfyTraitsOnGlobalRank(requiredTraitSet: RelTraitSet): Option[RelNode] = {
val requiredDistribution = requiredTraitSet.getTrait(FlinkRelDistributionTraitDef.INSTANCE)
val canSatisfy = requiredDistribution.getType match {
case SINGLETON => partitionKey.cardinality() == 0
case HASH_DISTRIBUTED =>
val shuffleKeys = requiredDistribution.getKeys
val partitionKeyList = ImmutableIntList.of(partitionKey.toArray: _*)
if (requiredDistribution.requireStrict) {
shuffleKeys == partitionKeyList
} else if (Util.startsWith(shuffleKeys, partitionKeyList)) {
// If the required distribution is not strict, Hash[a] can satisfy Hash[a, b],
// so return true if shuffleKeys (Hash[a, b]) starts with partitionKeyList (Hash[a])
true
} else {
// If partialKey is enabled, try to use partial key to satisfy the required distribution
val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
val partialKeyEnabled = tableConfig.getConf.getBoolean(
PlannerConfigOptions.SQL_OPTIMIZER_SHUFFLE_PARTIAL_KEY_ENABLED)
partialKeyEnabled && partitionKeyList.containsAll(shuffleKeys)
}
case _ => false
}
if (!canSatisfy) {
return None
}
val inputRequiredDistribution = requiredDistribution.getType match {
case SINGLETON => requiredDistribution
case HASH_DISTRIBUTED =>
val shuffleKeys = requiredDistribution.getKeys
val partitionKeyList = ImmutableIntList.of(partitionKey.toArray: _*)
if (requiredDistribution.requireStrict) {
FlinkRelDistribution.hash(partitionKeyList)
} else if (Util.startsWith(shuffleKeys, partitionKeyList)) {
// Hash[a] can satisfy Hash[a, b]
FlinkRelDistribution.hash(partitionKeyList, requireStrict = false)
} else {
// use partial key to satisfy the required distribution
FlinkRelDistribution.hash(shuffleKeys.map(partitionKeyList(_)), requireStrict = false)
}
}
// sort by partition keys + orderby keys
val providedFieldCollations = partitionKey.toArray.map {
k => FlinkRelOptUtil.ofRelFieldCollation(k)
}.toList ++ orderKey.getFieldCollations
val providedCollation = RelCollations.of(providedFieldCollations)
val requiredCollation = requiredTraitSet.getTrait(RelCollationTraitDef.INSTANCE)
val newProvidedTraitSet = if (providedCollation.satisfies(requiredCollation)) {
getTraitSet.replace(requiredDistribution).replace(requiredCollation)
} else {
getTraitSet.replace(requiredDistribution)
}
val newInput = RelOptRule.convert(getInput, inputRequiredDistribution)
Some(copy(newProvidedTraitSet, Seq(newInput)))
}
private def satisfyTraitsOnLocalRank(requiredTraitSet: RelTraitSet): Option[RelNode] = {
val requiredDistribution = requiredTraitSet.getTrait(FlinkRelDistributionTraitDef.INSTANCE)
requiredDistribution.getType match {
case Type.SINGLETON =>
val inputRequiredDistribution = requiredDistribution
// sort by orderby keys
val providedCollation = orderKey
val requiredCollation = requiredTraitSet.getTrait(RelCollationTraitDef.INSTANCE)
val newProvidedTraitSet = if (providedCollation.satisfies(requiredCollation)) {
getTraitSet.replace(requiredDistribution).replace(requiredCollation)
} else {
getTraitSet.replace(requiredDistribution)
}
val inputRequiredTraits = getInput.getTraitSet.replace(inputRequiredDistribution)
val newInput = RelOptRule.convert(getInput, inputRequiredTraits)
Some(copy(newProvidedTraitSet, Seq(newInput)))
case Type.HASH_DISTRIBUTED =>
val shuffleKeys = requiredDistribution.getKeys
if (outputRankNumber) {
// rank function column is the last one
val rankColumnIndex = getRowType.getFieldCount - 1
if (shuffleKeys.contains(rankColumnIndex)) {
// Cannot satisfy the required distribution if the shuffle keys contain the rank
// function column, which does not come from the input
return None
}
}
val inputRequiredDistributionKeys = shuffleKeys
val inputRequiredDistribution = FlinkRelDistribution.hash(
inputRequiredDistributionKeys, requiredDistribution.requireStrict)
// sort by partition keys + orderby keys
val providedFieldCollations = partitionKey.toArray.map {
k => FlinkRelOptUtil.ofRelFieldCollation(k)
}.toList ++ orderKey.getFieldCollations
val providedCollation = RelCollations.of(providedFieldCollations)
val requiredCollation = requiredTraitSet.getTrait(RelCollationTraitDef.INSTANCE)
val newProvidedTraitSet = if (providedCollation.satisfies(requiredCollation)) {
getTraitSet.replace(requiredDistribution).replace(requiredCollation)
} else {
getTraitSet.replace(requiredDistribution)
}
val inputRequiredTraits = getInput.getTraitSet.replace(inputRequiredDistribution)
val newInput = RelOptRule.convert(getInput, inputRequiredTraits)
Some(copy(newProvidedTraitSet, Seq(newInput)))
case _ => None
}
}
//~ ExecNode methods -----------------------------------------------------------
override def getDamBehavior: DamBehavior = DamBehavior.PIPELINED
......
......@@ -19,19 +19,24 @@ package org.apache.flink.table.plan.nodes.physical.batch
import org.apache.flink.runtime.operators.DamBehavior
import org.apache.flink.streaming.api.transformations.StreamTransformation
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.api.{PlannerConfigOptions, TableConfig}
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.functions.UserDefinedFunction
import org.apache.flink.table.plan.util.RelExplainUtil
import org.apache.flink.table.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef}
import org.apache.flink.table.plan.util.{FlinkRelOptUtil, RelExplainUtil}
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.plan.{RelOptCluster, RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelDistribution.Type.{HASH_DISTRIBUTED, SINGLETON}
import org.apache.calcite.rel._
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.tools.RelBuilder
import org.apache.calcite.util.{ImmutableIntList, Util}
import java.util
import scala.collection.JavaConversions._
/**
* Batch physical RelNode for (global) sort-based aggregate operator.
*
......@@ -95,6 +100,64 @@ class BatchExecSortAggregate(
isGlobal = true))
}
override def satisfyTraits(requiredTraitSet: RelTraitSet): Option[RelNode] = {
val requiredDistribution = requiredTraitSet.getTrait(FlinkRelDistributionTraitDef.INSTANCE)
val canSatisfy = requiredDistribution.getType match {
case SINGLETON => grouping.length == 0
case HASH_DISTRIBUTED =>
val shuffleKeys = requiredDistribution.getKeys
val groupKeysList = ImmutableIntList.of(grouping.indices.toArray: _*)
if (requiredDistribution.requireStrict) {
shuffleKeys == groupKeysList
} else if (Util.startsWith(shuffleKeys, groupKeysList)) {
// If the required distribution is not strict, Hash[a] can satisfy Hash[a, b],
// so return true if shuffleKeys (Hash[a, b]) starts with groupKeys (Hash[a])
true
} else {
// If partialKey is enabled, try to use partial key to satisfy the required distribution
val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
val partialKeyEnabled = tableConfig.getConf.getBoolean(
PlannerConfigOptions.SQL_OPTIMIZER_SHUFFLE_PARTIAL_KEY_ENABLED)
partialKeyEnabled && groupKeysList.containsAll(shuffleKeys)
}
case _ => false
}
if (!canSatisfy) {
return None
}
val inputRequiredDistribution = requiredDistribution.getType match {
case SINGLETON => requiredDistribution
case HASH_DISTRIBUTED =>
val shuffleKeys = requiredDistribution.getKeys
val groupKeysList = ImmutableIntList.of(grouping.indices.toArray: _*)
if (requiredDistribution.requireStrict) {
FlinkRelDistribution.hash(grouping, requireStrict = true)
} else if (Util.startsWith(shuffleKeys, groupKeysList)) {
// Hash[a] can satisfy Hash[a, b]
FlinkRelDistribution.hash(grouping, requireStrict = false)
} else {
// use partial key to satisfy the required distribution
FlinkRelDistribution.hash(shuffleKeys.map(grouping(_)).toArray, requireStrict = false)
}
}
val providedCollation = if (grouping.length == 0) {
RelCollations.EMPTY
} else {
val providedFieldCollations = grouping.map(FlinkRelOptUtil.ofRelFieldCollation).toList
RelCollations.of(providedFieldCollations)
}
val requiredCollation = requiredTraitSet.getTrait(RelCollationTraitDef.INSTANCE)
val newProvidedTraitSet = if (providedCollation.satisfies(requiredCollation)) {
getTraitSet.replace(requiredDistribution).replace(requiredCollation)
} else {
getTraitSet.replace(requiredDistribution)
}
val newInput = RelOptRule.convert(getInput, inputRequiredDistribution)
Some(copy(newProvidedTraitSet, Seq(newInput)))
}
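A runnable sketch of the collation check above: the sorted output of a sort agg on group keys [0, 1] satisfies any prefix collation (this assumes the default ascending direction that FlinkRelOptUtil.ofRelFieldCollation is expected to produce; object name hypothetical):

import org.apache.calcite.rel.{RelCollations, RelFieldCollation}

object SortAggCollationCheck {
  def main(args: Array[String]): Unit = {
    val provided = RelCollations.of(
      new RelFieldCollation(0), new RelFieldCollation(1)) // sorted by group keys [0, 1]
    val required = RelCollations.of(new RelFieldCollation(0))
    // A collation satisfies another if the latter is a prefix of the former:
    println(provided.satisfies(required)) // true
  }
}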
//~ ExecNode methods -----------------------------------------------------------
override def getDamBehavior: DamBehavior = {
......
......@@ -26,16 +26,17 @@ import org.apache.flink.table.codegen.CodeGeneratorContext
import org.apache.flink.table.codegen.ProjectionCodeGenerator.generateProjection
import org.apache.flink.table.codegen.sort.SortCodeGenerator
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.plan.`trait`.FlinkRelDistributionTraitDef
import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.plan.nodes.ExpressionFormat
import org.apache.flink.table.plan.nodes.exec.ExecNode
import org.apache.flink.table.plan.util.{FlinkRelMdUtil, JoinUtil, SortUtil}
import org.apache.flink.table.plan.util.{FlinkRelMdUtil, FlinkRelOptUtil, JoinUtil, SortUtil}
import org.apache.flink.table.runtime.join.{FlinkJoinType, SortMergeJoinOperator}
import org.apache.calcite.plan._
import org.apache.calcite.rel.core._
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rel.{RelCollationTraitDef, RelNode, RelWriter}
import org.apache.calcite.rex.RexNode
import java.util
......@@ -142,6 +143,55 @@ class BatchExecSortMergeJoin(
costFactory.makeCost(rowCount, cpuCost, 0, 0, sortMemCost)
}
override def satisfyTraits(requiredTraitSet: RelTraitSet): Option[RelNode] = {
val requiredDistribution = requiredTraitSet.getTrait(FlinkRelDistributionTraitDef.INSTANCE)
val (canSatisfyDistribution, leftRequiredDistribution, rightRequiredDistribution) =
satisfyHashDistributionOnNonBroadcastJoin(requiredDistribution)
if (!canSatisfyDistribution) {
return None
}
val requiredCollation = requiredTraitSet.getTrait(RelCollationTraitDef.INSTANCE)
val requiredFieldCollations = requiredCollation.getFieldCollations
val shuffleKeysSize = leftRequiredDistribution.getKeys.size
val newLeft = RelOptRule.convert(getLeft, leftRequiredDistribution)
val newRight = RelOptRule.convert(getRight, rightRequiredDistribution)
// SortMergeJoin can provide collation trait, check whether provided collation can satisfy
// required collations
val canProvideCollation = if (requiredCollation.getFieldCollations.isEmpty) {
false
} else if (requiredFieldCollations.size > shuffleKeysSize) {
// Sort by [a, b] can satisfy [a], but cannot satisfy [a, b, c]
false
} else {
val leftKeys = leftRequiredDistribution.getKeys
val leftFieldCnt = getLeft.getRowType.getFieldCount
val rightKeys = rightRequiredDistribution.getKeys.map(_ + leftFieldCnt)
requiredFieldCollations.zipWithIndex.forall { case (collation, index) =>
val idxOfCollation = collation.getFieldIndex
// Full outer join was handled above, so it does not need to be considered here
if (idxOfCollation < leftFieldCnt && joinType != JoinRelType.RIGHT) {
val fieldCollationOnLeftSortKey = FlinkRelOptUtil.ofRelFieldCollation(leftKeys.get(index))
collation == fieldCollationOnLeftSortKey
} else if (idxOfCollation >= leftFieldCnt &&
(joinType == JoinRelType.RIGHT || joinType == JoinRelType.INNER)) {
val fieldCollationOnRightSortKey =
FlinkRelOptUtil.ofRelFieldCollation(rightKeys.get(index))
collation == fieldCollationOnRightSortKey
} else {
false
}
}
}
var newProvidedTraitSet = getTraitSet.replace(requiredDistribution)
if (canProvideCollation) {
newProvidedTraitSet = newProvidedTraitSet.replace(requiredCollation)
}
Some(copy(newProvidedTraitSet, Seq(newLeft, newRight)))
}
//~ ExecNode methods -----------------------------------------------------------
/**
......
......@@ -21,9 +21,11 @@ import org.apache.flink.runtime.operators.DamBehavior
import org.apache.flink.streaming.api.transformations.{StreamTransformation, UnionTransformation}
import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef}
import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.plan.{RelOptCluster, RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelDistribution.Type.{ANY, BROADCAST_DISTRIBUTED, HASH_DISTRIBUTED, RANDOM_DISTRIBUTED, RANGE_DISTRIBUTED, ROUND_ROBIN_DISTRIBUTED, SINGLETON}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.{SetOp, Union}
import org.apache.calcite.rel.{RelNode, RelWriter}
......@@ -58,6 +60,41 @@ class BatchExecUnion(
.item("union", getRowType.getFieldNames.mkString(", "))
}
override def satisfyTraits(requiredTraitSet: RelTraitSet): Option[RelNode] = {
// Union destroys the collation trait, so collation requirements are not handled here.
val requiredDistribution = requiredTraitSet.getTrait(FlinkRelDistributionTraitDef.INSTANCE)
val canSatisfy = requiredDistribution.getType match {
case RANDOM_DISTRIBUTED |
ROUND_ROBIN_DISTRIBUTED |
BROADCAST_DISTRIBUTED |
HASH_DISTRIBUTED => true
// RANGE_DISTRIBUTED cannot be satisfied because each union child's partition
// ranges [lower, upper] may differ.
case RANGE_DISTRIBUTED => false
// SINGLETON cannot be satisfied because a singleton exchange limits the parallelism
// of the exchange output RelNode to 1.
// Pushing SINGLETON down into the union's inputs would break that limitation.
case SINGLETON => false
// there is no need to satisfy Any distribution
case ANY => false
}
if (!canSatisfy) {
return None
}
val inputRequiredDistribution = requiredDistribution.getType match {
case RANDOM_DISTRIBUTED | ROUND_ROBIN_DISTRIBUTED | BROADCAST_DISTRIBUTED =>
requiredDistribution
case HASH_DISTRIBUTED =>
// require a strict hash distribution from each child
// so that all children are shuffled by exactly the same keys
FlinkRelDistribution.hash(requiredDistribution.getKeys)
}
val newInputs = getInputs.map(RelOptRule.convert(_, inputRequiredDistribution))
val providedTraitSet = getTraitSet.replace(inputRequiredDistribution)
Some(copy(providedTraitSet, newInputs))
}
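Why strict hash on every child: a toy model (names and the hash function are illustrative, not planner code) showing that two different, individually acceptable non-strict shuffles can place the same row in different partitions:

object UnionShuffleConsistency {
  def main(args: Array[String]): Unit = {
    val parallelism = 4
    def partitionBy(keys: Seq[Int])(row: Seq[Int]): Int =
      math.abs(keys.map(row).hashCode) % parallelism
    val row = Seq(7, 42)
    // Under a non-strict requirement Hash[0, 1], one child could shuffle by
    // key 0 only and another by keys (0, 1); identical rows then typically
    // land on different partitions:
    println(partitionBy(Seq(0))(row) == partitionBy(Seq(0, 1))(row))
    // Requiring strict FlinkRelDistribution.hash(keys) keeps the shuffle
    // function identical across all union children.
  }
}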
//~ ExecNode methods -----------------------------------------------------------
override def getDamBehavior: DamBehavior = DamBehavior.PIPELINED
......
......@@ -192,7 +192,8 @@ object FlinkBatchRuleSets {
val WINDOW_RULES: RuleSet = RuleSets.ofList(
// slices a project into sections which contain window agg functions and sections which do not.
ProjectToWindowRule.PROJECT,
// TODO add ExchangeWindowGroupRule
// adjust the order of the window's groups.
WindowGroupReorderRule.INSTANCE,
// Transform window to LogicalWindowAggregate
WindowPropertiesRules.WINDOW_PROPERTIES_RULE,
WindowPropertiesRules.WINDOW_PROPERTIES_HAVING_RULE
......@@ -311,28 +312,44 @@ object FlinkBatchRuleSets {
*/
val PHYSICAL_OPT_RULES: RuleSet = RuleSets.ofList(
FlinkExpandConversionRule.BATCH_INSTANCE,
// source
BatchExecBoundedStreamScanRule.INSTANCE,
BatchExecScanTableSourceRule.INSTANCE,
BatchExecIntermediateTableScanRule.INSTANCE,
BatchExecValuesRule.INSTANCE,
// calc
BatchExecCalcRule.INSTANCE,
// union
BatchExecUnionRule.INSTANCE,
// sort
BatchExecSortRule.INSTANCE,
BatchExecLimitRule.INSTANCE,
BatchExecSortLimitRule.INSTANCE,
// rank
BatchExecRankRule.INSTANCE,
RemoveRedundantLocalRankRule.INSTANCE,
// expand
BatchExecExpandRule.INSTANCE,
// group agg
BatchExecHashAggRule.INSTANCE,
BatchExecSortAggRule.INSTANCE,
RemoveRedundantLocalSortAggRule.WITHOUT_SORT,
RemoveRedundantLocalSortAggRule.WITH_SORT,
RemoveRedundantLocalHashAggRule.INSTANCE,
// over agg
BatchExecOverAggregateRule.INSTANCE,
// window agg
BatchExecWindowAggregateRule.INSTANCE,
// join
BatchExecHashJoinRule.INSTANCE,
BatchExecSortMergeJoinRule.INSTANCE,
BatchExecNestedLoopJoinRule.INSTANCE,
BatchExecSingleRowJoinRule.INSTANCE,
BatchExecCorrelateRule.INSTANCE,
BatchExecOverAggregateRule.INSTANCE,
BatchExecWindowAggregateRule.INSTANCE,
BatchExecLookupJoinRule.SNAPSHOT_ON_TABLESCAN,
BatchExecLookupJoinRule.SNAPSHOT_ON_CALC_TABLESCAN,
// correlate
BatchExecCorrelateRule.INSTANCE,
// sink
BatchExecSinkRule.INSTANCE
)
}
......@@ -293,27 +293,39 @@ object FlinkStreamRuleSets {
*/
val PHYSICAL_OPT_RULES: RuleSet = RuleSets.ofList(
FlinkExpandConversionRule.STREAM_INSTANCE,
// source
StreamExecDataStreamScanRule.INSTANCE,
StreamExecTableSourceScanRule.INSTANCE,
StreamExecIntermediateTableScanRule.INSTANCE,
StreamExecValuesRule.INSTANCE,
// calc
StreamExecCalcRule.INSTANCE,
// union
StreamExecUnionRule.INSTANCE,
// sort
StreamExecSortRule.INSTANCE,
StreamExecLimitRule.INSTANCE,
StreamExecSortLimitRule.INSTANCE,
StreamExecRankRule.INSTANCE,
StreamExecTemporalSortRule.INSTANCE,
// rank
StreamExecRankRule.INSTANCE,
StreamExecDeduplicateRule.RANK_INSTANCE,
// expand
StreamExecExpandRule.INSTANCE,
// group agg
StreamExecGroupAggregateRule.INSTANCE,
// over agg
StreamExecOverAggregateRule.INSTANCE,
// window agg
StreamExecGroupWindowAggregateRule.INSTANCE,
StreamExecExpandRule.INSTANCE,
// join
StreamExecJoinRule.INSTANCE,
StreamExecWindowJoinRule.INSTANCE,
StreamExecCorrelateRule.INSTANCE,
StreamExecLookupJoinRule.SNAPSHOT_ON_TABLESCAN,
StreamExecLookupJoinRule.SNAPSHOT_ON_CALC_TABLESCAN,
// correlate
StreamExecCorrelateRule.INSTANCE,
// sink
StreamExecSinkRule.INSTANCE
)
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.rules.logical
import org.apache.calcite.plan.RelOptRule.{any, operand}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.Window.Group
import org.apache.calcite.rel.logical.{LogicalProject, LogicalWindow}
import org.apache.calcite.rel.{RelCollation, RelNode}
import org.apache.calcite.rex.RexInputRef
import java.util
import java.util.Comparator
import scala.collection.JavaConversions._
/**
* Planner rule that reorders over-window groups so that groups with the same shuffle keys
* and order keys become adjacent.
*/
class WindowGroupReorderRule extends RelOptRule(
operand(classOf[LogicalWindow],
operand(classOf[RelNode], any)),
"ExchangeWindowGroupRule") {
override def matches(call: RelOptRuleCall): Boolean = {
val window: LogicalWindow = call.rel(0)
window.groups.size() > 1
}
override def onMatch(call: RelOptRuleCall): Unit = {
val window: LogicalWindow = call.rel(0)
val input: RelNode = call.rel(1)
val oldGroups: util.List[Group] = new util.ArrayList(window.groups)
val sequenceGroups: util.List[Group] = new util.ArrayList(window.groups)
sequenceGroups.sort(new Comparator[Group] {
override def compare(o1: Group, o2: Group): Int = {
val keyComp = o1.keys.compareTo(o2.keys)
if (keyComp == 0) {
compareRelCollation(o1.orderKeys, o2.orderKeys)
} else {
keyComp
}
}
})
if (!sequenceGroups.equals(oldGroups) && !sequenceGroups.reverse.equals(oldGroups)) {
var offset = input.getRowType.getFieldCount
val aggTypeIndexes = oldGroups.map { group =>
val aggCount = group.aggCalls.size()
val typeIndexes = (0 until aggCount).map(_ + offset).toArray
offset += aggCount
typeIndexes
}
offset = input.getRowType.getFieldCount
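// Map each output field of the reordered window back to its index in the
// original row type: input fields keep their positions, while each
// reordered group's aggregate outputs point at the positions those same
// aggregates had under the original group order.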
val mapToOldTypeIndexes = (0 until offset).toArray ++
sequenceGroups.flatMap { newGroup =>
val aggCount = newGroup.aggCalls.size()
val oldIndex = oldGroups.indexOf(newGroup)
offset += aggCount
(0 until aggCount).map {
aggIndex => aggTypeIndexes(oldIndex)(aggIndex)
}
}.toArray[Int]
val oldRowTypeFields = window.getRowType.getFieldList
val newFieldList = new util.ArrayList[util.Map.Entry[String, RelDataType]]
mapToOldTypeIndexes.foreach { index =>
newFieldList.add(oldRowTypeFields.get(index))
}
val intermediateRowType = window.getCluster.getTypeFactory.createStructType(newFieldList)
val newLogicalWindow = LogicalWindow.create(
window.getCluster.getPlanner.emptyTraitSet(),
input,
window.constants,
intermediateRowType,
sequenceGroups)
val mapToNewTypeIndexes = mapToOldTypeIndexes.zipWithIndex.sortBy(_._1)
val projects = mapToNewTypeIndexes.map { index =>
new RexInputRef(index._2, newFieldList.get(index._2).getValue)
}
val project = LogicalProject.create(
newLogicalWindow,
projects.toList,
window.getRowType)
call.transformTo(project)
}
}
private def compareRelCollation(o1: RelCollation, o2: RelCollation): Int = {
val comp = o1.compareTo(o2)
if (comp == 0) {
val collations1 = o1.getFieldCollations
val collations2 = o2.getFieldCollations
for (index <- 0 until collations1.length) {
val collation1 = collations1(index)
val collation2 = collations2(index)
val direction = collation1.direction.shortString.compareTo(collation2.direction.shortString)
if (direction == 0) {
val nullDirection = collation1.nullDirection.nullComparison.compare(
collation2.nullDirection.nullComparison)
if (nullDirection != 0) {
return nullDirection
}
} else {
return direction
}
}
}
comp
}
}
object WindowGroupReorderRule {
val INSTANCE = new WindowGroupReorderRule
}
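For intuition, a self-contained sketch of the reordering effect (SimpleGroup is an illustrative stand-in for Window.Group, not a planner type):

object WindowGroupReorderSketch {
  // Illustrative stand-in for Window.Group: partition keys plus order keys.
  case class SimpleGroup(keys: Seq[Int], orderKeys: Seq[Int])

  def main(args: Array[String]): Unit = {
    val groups = Seq(
      SimpleGroup(Seq(1), Seq(2)),
      SimpleGroup(Seq(3), Seq(4)),
      SimpleGroup(Seq(1), Seq(2)))
    // Sorting by (keys, orderKeys) makes the two identical groups adjacent,
    // so a single sort/exchange can serve both over aggregates.
    val reordered = groups.sortBy(g => (g.keys.mkString(","), g.orderKeys.mkString(",")))
    println(reordered)
    // List(SimpleGroup(List(1),List(2)), SimpleGroup(List(1),List(2)),
    //      SimpleGroup(List(3),List(4)))
  }
}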
......@@ -81,14 +81,16 @@ class FlinkExpandConversionRule(flinkConvention: Convention)
call: RelOptRuleCall): Unit = {
node match {
case batchRel: BatchPhysicalRel =>
var otherChoice = batchRel.satisfyTraitsByInput(requiredTraits)
if (otherChoice != null) {
// It is possible only push down distribution instead of push down both distribution and
// collation. So it is necessary to check whether collation satisfy requirement.
val requiredCollation = requiredTraits.getTrait(RelCollationTraitDef.INSTANCE)
otherChoice = satisfyCollation(flinkConvention, otherChoice, requiredCollation)
checkSatisfyRequiredTrait(otherChoice, requiredTraits)
call.transformTo(otherChoice)
val otherChoice = batchRel.satisfyTraits(requiredTraits)
otherChoice match {
case Some(newRel) =>
// It is possible that only the distribution is pushed down, instead of both
// the distribution and the collation, so it is necessary to check whether
// the collation satisfies the requirement.
val requiredCollation = requiredTraits.getTrait(RelCollationTraitDef.INSTANCE)
val finalRel = satisfyCollation(flinkConvention, newRel, requiredCollation)
checkSatisfyRequiredTrait(finalRel, requiredTraits)
call.transformTo(finalRel)
case _ => // do nothing
}
case _ => // ignore
}
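The hunk above is the call-site half of the satisfyTraitsByInput -> satisfyTraits refactor; a minimal sketch of the idiom change (names are illustrative):

import org.apache.calcite.rel.RelNode

object OptionIdiomSketch {
  // Old contract: null signals "required traits cannot be satisfied".
  def legacy(result: RelNode): Unit =
    if (result != null) {
      // transform the plan with `result`
    }

  // New contract: Option makes the failure case explicit and lets call
  // sites pattern match instead of null-checking.
  def refactored(result: Option[RelNode]): Unit = result match {
    case Some(rel) => // transform the plan with `rel`
    case None => // leave the plan unchanged
  }
}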
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.rules.physical.batch
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.physical.batch.{BatchExecHashAggregate, BatchExecLocalHashAggregate}
import org.apache.calcite.plan.RelOptRule._
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
/**
* There may exist a sub-tree like localHashAggregate -> globalHashAggregate in
* which the intermediate shuffle has been removed. This rule removes the
* redundant localHashAggregate node.
*/
class RemoveRedundantLocalHashAggRule extends RelOptRule(
operand(classOf[BatchExecHashAggregate],
operand(classOf[BatchExecLocalHashAggregate],
operand(classOf[RelNode], FlinkConventions.BATCH_PHYSICAL, any))),
"RemoveRedundantLocalHashAggRule") {
override def onMatch(call: RelOptRuleCall): Unit = {
val globalAgg = call.rels(0).asInstanceOf[BatchExecHashAggregate]
val localAgg = call.rels(1).asInstanceOf[BatchExecLocalHashAggregate]
val inputOfLocalAgg = localAgg.getInput
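// Rebuild the global aggregate directly on the local aggregate's input;
// isMerge = false because the new global aggregate consumes raw rows
// rather than partial (local) aggregation results.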
val newGlobalAgg = new BatchExecHashAggregate(
globalAgg.getCluster,
call.builder(),
globalAgg.getTraitSet,
inputOfLocalAgg,
globalAgg.getRowType,
inputOfLocalAgg.getRowType,
inputOfLocalAgg.getRowType,
localAgg.getGrouping,
localAgg.getAuxGrouping,
globalAgg.getAggCallToAggFunction,
isMerge = false)
call.transformTo(newGlobalAgg)
}
}
object RemoveRedundantLocalHashAggRule {
val INSTANCE = new RemoveRedundantLocalHashAggRule
}
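A query shape that can produce the matched pattern (environment and table names are hypothetical; the redundant local/global pair only appears when the aggregate input is already hash-distributed on the group key, e.g. below a join on `a`, as in the test plans later in this commit):

import org.apache.flink.table.api.{BatchTableEnvironment, Table}

object RedundantLocalHashAggExample {
  // `tEnv` and table `x` are assumed to be set up elsewhere; this only
  // illustrates the query shape, not a complete program.
  def example(tEnv: BatchTableEnvironment): Table =
    tEnv.sqlQuery("SELECT a, SUM(b) AS sum_b FROM x GROUP BY a")
}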
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.rules.physical.batch
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecRank
import org.apache.calcite.plan.RelOptRule._
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import scala.collection.JavaConversions._
/**
* Planner rule that matches a global [[BatchExecRank]] on a local [[BatchExecRank]],
* and merges them into a single global [[BatchExecRank]].
*/
class RemoveRedundantLocalRankRule extends RelOptRule(
operand(classOf[BatchExecRank],
operand(classOf[BatchExecRank],
operand(classOf[RelNode], FlinkConventions.BATCH_PHYSICAL, any))),
"RemoveRedundantLocalRankRule") {
override def matches(call: RelOptRuleCall): Boolean = {
val globalRank: BatchExecRank = call.rel(0)
val localRank: BatchExecRank = call.rel(1)
globalRank.isGlobal && !localRank.isGlobal &&
globalRank.rankType == localRank.rankType &&
globalRank.partitionKey == localRank.partitionKey &&
globalRank.orderKey == localRank.orderKey &&
globalRank.rankEnd == localRank.rankEnd
}
override def onMatch(call: RelOptRuleCall): Unit = {
val globalRank: BatchExecRank = call.rel(0)
val inputOfLocalRank: RelNode = call.rel(2)
val newGlobalRank = globalRank.copy(globalRank.getTraitSet, List(inputOfLocalRank))
call.transformTo(newGlobalRank)
}
}
object RemoveRedundantLocalRankRule {
val INSTANCE: RelOptRule = new RemoveRedundantLocalRankRule
}
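For reference, a hypothetical Top-N query that the planner splits into a local plus a global BatchExecRank; once the shuffle between them is dropped, this rule merges the pair (`tEnv` and table `x` are assumptions, not from this commit):

import org.apache.flink.table.api.{BatchTableEnvironment, Table}

object RedundantLocalRankExample {
  // `tEnv` and table `x` are assumed to exist; illustrative only.
  def topN(tEnv: BatchTableEnvironment): Table =
    tEnv.sqlQuery(
      """
        |SELECT a, b FROM (
        |  SELECT a, b, RANK() OVER (PARTITION BY a ORDER BY b) AS rk FROM x
        |) WHERE rk <= 10
        |""".stripMargin)
}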
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.rules.physical.batch
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.physical.batch.{BatchExecLocalSortAggregate, BatchExecSort, BatchExecSortAggregate}
import org.apache.calcite.plan.RelOptRule._
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
import org.apache.calcite.rel.RelNode
/**
* There may exist a sub-tree like localSortAggregate -> globalSortAggregate, or
* localSortAggregate -> sort -> globalSortAggregate, in which the intermediate
* shuffle has been removed. This rule removes the redundant localSortAggregate node.
*/
abstract class RemoveRedundantLocalSortAggRule(
operand: RelOptRuleOperand,
ruleName: String) extends RelOptRule(operand, ruleName) {
override def onMatch(call: RelOptRuleCall): Unit = {
val globalAgg = getOriginalGlobalAgg(call)
val localAgg = getOriginalLocalAgg(call)
val inputOfLocalAgg = getOriginalInputOfLocalAgg(call)
val newGlobalAgg = new BatchExecSortAggregate(
globalAgg.getCluster,
call.builder(),
globalAgg.getTraitSet,
inputOfLocalAgg,
globalAgg.getRowType,
inputOfLocalAgg.getRowType,
inputOfLocalAgg.getRowType,
localAgg.getGrouping,
localAgg.getAuxGrouping,
globalAgg.getAggCallToAggFunction,
isMerge = false)
call.transformTo(newGlobalAgg)
}
private[table] def getOriginalGlobalAgg(call: RelOptRuleCall): BatchExecSortAggregate
private[table] def getOriginalLocalAgg(call: RelOptRuleCall): BatchExecLocalSortAggregate
private[table] def getOriginalInputOfLocalAgg(call: RelOptRuleCall): RelNode
}
class RemoveRedundantLocalSortAggWithoutSortRule extends RemoveRedundantLocalSortAggRule(
operand(classOf[BatchExecSortAggregate],
operand(classOf[BatchExecLocalSortAggregate],
operand(classOf[RelNode], FlinkConventions.BATCH_PHYSICAL, any))),
"RemoveRedundantLocalSortAggWithoutSortRule") {
override private[table] def getOriginalGlobalAgg(call: RelOptRuleCall): BatchExecSortAggregate = {
call.rels(0).asInstanceOf[BatchExecSortAggregate]
}
override private[table] def getOriginalLocalAgg(
call: RelOptRuleCall): BatchExecLocalSortAggregate = {
call.rels(1).asInstanceOf[BatchExecLocalSortAggregate]
}
override private[table] def getOriginalInputOfLocalAgg(call: RelOptRuleCall): RelNode = {
call.rels(2)
}
}
class RemoveRedundantLocalSortAggWithSortRule extends RemoveRedundantLocalSortAggRule(
operand(classOf[BatchExecSortAggregate],
operand(classOf[BatchExecSort],
operand(classOf[BatchExecLocalSortAggregate],
operand(classOf[RelNode], FlinkConventions.BATCH_PHYSICAL, any)))),
"RemoveRedundantLocalSortAggWithSortRule") {
override private[table] def getOriginalGlobalAgg(call: RelOptRuleCall): BatchExecSortAggregate = {
call.rels(0).asInstanceOf[BatchExecSortAggregate]
}
override private[table] def getOriginalLocalAgg(
call: RelOptRuleCall): BatchExecLocalSortAggregate = {
call.rels(2).asInstanceOf[BatchExecLocalSortAggregate]
}
override private[table] def getOriginalInputOfLocalAgg(call: RelOptRuleCall): RelNode = {
call.rels(3)
}
}
object RemoveRedundantLocalSortAggRule {
val WITHOUT_SORT = new RemoveRedundantLocalSortAggWithoutSortRule
val WITH_SORT = new RemoveRedundantLocalSortAggWithSortRule
}
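The four removal-rule instances defined above can be gathered into a single rule set; a sketch only (how the commit actually registers them in the batch rule sets is not shown in this excerpt):

import org.apache.calcite.tools.{RuleSet, RuleSets}

object RedundantLocalNodeRemovalRules {
  // All redundant-local-node removal rules introduced by this commit.
  val RULES: RuleSet = RuleSets.ofList(
    RemoveRedundantLocalHashAggRule.INSTANCE,
    RemoveRedundantLocalRankRule.INSTANCE,
    RemoveRedundantLocalSortAggRule.WITHOUT_SORT,
    RemoveRedundantLocalSortAggRule.WITH_SORT)
}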
......@@ -726,15 +726,14 @@ Sink(fields=[a1, b, c1])
:- Exchange(distribution=[hash[a3]], exchange_mode=[BATCH])
: +- Calc(select=[a AS a3, c AS c1], where=[AND(>=(a, 0), <(b, 5))])
: +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], reuse_id=[1])
+- Exchange(distribution=[hash[a1]])
+- Calc(select=[a AS a1, b])
+- HashJoin(joinType=[InnerJoin], where=[=(a, a2)], select=[a, b, a2], build=[right])
:- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
: +- Calc(select=[a, b], where=[<=(a, 10)])
: +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[a2]])
+- Calc(select=[a AS a2], where=[AND(>=(a, 0), >=(b, 5))])
+- Reused(reference_id=[1])
+- Calc(select=[a AS a1, b])
+- HashJoin(joinType=[InnerJoin], where=[=(a, a2)], select=[a, b, a2], build=[right])
:- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
: +- Calc(select=[a, b], where=[<=(a, 10)])
: +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[a2]])
+- Calc(select=[a AS a2], where=[AND(>=(a, 0), >=(b, 5))])
+- Reused(reference_id=[1])
]]>
</Resource>
</TestCase>
......@@ -784,21 +783,19 @@ Sink(fields=[a, b, c])
:- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
: +- Calc(select=[a], where=[<=(a, 10)])
: +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], reuse_id=[1])
+- Exchange(distribution=[hash[a3]])
+- Calc(select=[a3, b AS b1, c1])
+- HashJoin(joinType=[InnerJoin], where=[=(a1, a3)], select=[a3, c1, a1, b], build=[right])
:- Exchange(distribution=[hash[a3]], exchange_mode=[BATCH])
: +- Calc(select=[a AS a3, c AS c1], where=[AND(>=(a, 0), <(b, 5))])
: +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[a1]])
+- Calc(select=[a AS a1, b])
+- HashJoin(joinType=[InnerJoin], where=[=(a, a2)], select=[a, b, a2], build=[right])
:- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
: +- Calc(select=[a, b], where=[<=(a, 10)])
: +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[a2]])
+- Calc(select=[a AS a2], where=[AND(>=(a, 0), >=(b, 5))])
+- Reused(reference_id=[1])
+- Calc(select=[a3, b AS b1, c1])
+- HashJoin(joinType=[InnerJoin], where=[=(a1, a3)], select=[a3, c1, a1, b], build=[right])
:- Exchange(distribution=[hash[a3]], exchange_mode=[BATCH])
: +- Calc(select=[a AS a3, c AS c1], where=[AND(>=(a, 0), <(b, 5))])
: +- Reused(reference_id=[1])
+- Calc(select=[a AS a1, b])
+- HashJoin(joinType=[InnerJoin], where=[=(a, a2)], select=[a, b, a2], build=[right])
:- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
: +- Calc(select=[a, b], where=[<=(a, 10)])
: +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[a2]])
+- Calc(select=[a AS a2], where=[AND(>=(a, 0), >=(b, 5))])
+- Reused(reference_id=[1])
]]>
</Resource>
</TestCase>
......
......@@ -89,22 +89,21 @@ LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$3], b0=[$4], c0=[$5], a1=[$6], b1=[$
HashJoin(joinType=[InnerJoin], where=[=(c, c1)], select=[a, b, c, a0, b0, c0, a1, b1, c1, a00, b00, c00], build=[right])
:- Exchange(distribution=[hash[c]], exchange_mode=[BATCH])
: +- HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, c, a0, b0, c0], build=[left])
: :- Exchange(distribution=[hash[a]])
: : +- Calc(select=[a, b, c], where=[>(b, 10)])
: : +- SortAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS b, Final_MAX(max$1) AS c], reuse_id=[1])
: : +- Sort(orderBy=[a ASC])
: : +- Exchange(distribution=[hash[a]])
: : +- LocalSortAggregate(groupBy=[a], select=[a, Partial_SUM(b) AS sum$0, Partial_MAX(c) AS max$1])
: : +- Sort(orderBy=[a ASC])
: : +- TableSourceScan(table=[[x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: +- Exchange(distribution=[hash[a]], exchange_mode=[BATCH], reuse_id=[2])
: :- Calc(select=[a, b, c], where=[>(b, 10)])
: : +- SortAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS b, Final_MAX(max$1) AS c], reuse_id=[1])
: : +- Sort(orderBy=[a ASC])
: : +- Exchange(distribution=[hash[a]])
: : +- LocalSortAggregate(groupBy=[a], select=[a, Partial_SUM(b) AS sum$0, Partial_MAX(c) AS max$1])
: : +- Sort(orderBy=[a ASC])
: : +- TableSourceScan(table=[[x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: +- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
: +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[c]])
+- HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, c, a0, b0, c0], build=[left])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a, b, c], where=[<(b, 5)])
: +- Reused(reference_id=[1])
+- Reused(reference_id=[2])
:- Calc(select=[a, b, c], where=[<(b, 5)])
: +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+- Reused(reference_id=[1])
]]>
</Resource>
</TestCase>
......@@ -141,17 +140,16 @@ Calc(select=[a, b])
:- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
: +- Calc(select=[a], where=[=(b, 5:BIGINT)])
: +- TableSourceScan(table=[[x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], reuse_id=[1])
+- Exchange(distribution=[hash[a]])
+- HashJoin(joinType=[LeftSemiJoin], where=[=(a, a0)], select=[a, b], build=[left])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a, b])
: +- Limit(offset=[0], fetch=[10], global=[true])
: +- Exchange(distribution=[single])
: +- Limit(offset=[0], fetch=[10], global=[false])
: +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+- Calc(select=[a], where=[>(b, 5)])
+- Reused(reference_id=[1])
+- HashJoin(joinType=[LeftSemiJoin], where=[=(a, a0)], select=[a, b], build=[left])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a, b])
: +- Limit(offset=[0], fetch=[10], global=[true])
: +- Exchange(distribution=[single])
: +- Limit(offset=[0], fetch=[10], global=[false])
: +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[a]], exchange_mode=[BATCH])
+- Calc(select=[a], where=[>(b, 5)])
+- Reused(reference_id=[1])
]]>
</Resource>
</TestCase>
......@@ -298,13 +296,11 @@ HashJoin(joinType=[InnerJoin], where=[=(c, c0)], select=[a, b, c, a0, b0, c0], b
:- Exchange(distribution=[hash[c]], exchange_mode=[BATCH])
: +- Calc(select=[w0$o0 AS a, b, c])
: +- OverAggregate(partitionBy=[b], orderBy=[b ASC], window#0=[MAX($2) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[b, c, $2, w0$o0])
: +- Sort(orderBy=[b ASC], reuse_id=[1])
: +- Exchange(distribution=[hash[b]])
: +- Calc(select=[b, c, CASE(>(w0$o0, 0:BIGINT), w0$o1, null:INTEGER) AS $2])
: +- OverAggregate(partitionBy=[b], orderBy=[b ASC], window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, w0$o1])
: +- Sort(orderBy=[b ASC])
: +- Exchange(distribution=[hash[b]])
: +- TableSourceScan(table=[[x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: +- Calc(select=[b, c, CASE(>(w0$o0, 0:BIGINT), w0$o1, null:INTEGER) AS $2], reuse_id=[1])
: +- OverAggregate(partitionBy=[b], orderBy=[b ASC], window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, w0$o1])
: +- Sort(orderBy=[b ASC])
: +- Exchange(distribution=[hash[b]])
: +- TableSourceScan(table=[[x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[c]])
+- Calc(select=[w0$o0 AS a, b, c])
+- OverAggregate(partitionBy=[b], orderBy=[b ASC], window#0=[MIN($2) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[b, c, $2, w0$o0])
......
......@@ -240,28 +240,26 @@ LogicalProject(c=[$0], e=[$1], avg_b=[$2], sum_b=[$3], psum=[$4], nsum=[$5], avg
<![CDATA[
Calc(select=[c, e, avg_b, sum_b, sum_b0 AS psum, sum_b1 AS nsum, avg_b0 AS avg_b2])
+- HashJoin(joinType=[InnerJoin], where=[AND(=(c, c0), =(e, e0), =(rn, $f5))], select=[sum_b, avg_b, rn, c, e, sum_b0, avg_b0, sum_b1, c0, e0, $f5], build=[left])
:- Exchange(distribution=[hash[c, e, rn]])
: +- Calc(select=[sum_b, avg_b, rn, c, e, sum_b0, avg_b0])
: +- HashJoin(joinType=[InnerJoin], where=[AND(=(c, c0), =(e, e0), =(rn, $f5))], select=[sum_b, avg_b, rn, c, e, sum_b0, avg_b0, c0, e0, $f5], build=[left])
: :- Exchange(distribution=[hash[c, e, rn]])
: : +- Calc(select=[sum_b, /(CAST(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT)), w0$o0) AS avg_b, w1$o0 AS rn, c, e], where=[AND(<>(c, _UTF-16LE'':VARCHAR(65536) CHARACTER SET "UTF-16LE"), >(-(sum_b, /(CAST(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT)), w0$o0)), 3))])
: : +- OverAggregate(partitionBy=[c, e], orderBy=[], window#0=[COUNT(sum_b) AS w0$o0, $SUM0(sum_b) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], window#1=[RANK(*) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[c, e, sum_b, w0$o0, w0$o1, w1$o0], reuse_id=[1])
: : +- Sort(orderBy=[c ASC, e ASC], reuse_id=[2])
: : +- Exchange(distribution=[hash[c, e]])
: : +- HashAggregate(isMerge=[true], groupBy=[c, e], select=[c, e, Final_SUM(sum$0) AS sum_b])
: : +- Exchange(distribution=[hash[c, e]])
: : +- LocalHashAggregate(groupBy=[c, e], select=[c, e, Partial_SUM(b) AS sum$0])
: : +- Calc(select=[c, e, b])
: : +- HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e], build=[right])
: : :- Exchange(distribution=[hash[a]])
: : : +- Calc(select=[a, b, c], where=[IS NOT NULL(c)])
: : : +- TableSourceScan(table=[[x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: : +- Exchange(distribution=[hash[d]])
: : +- Calc(select=[d, e], where=[>(e, 10)])
: : +- TableSourceScan(table=[[y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
: +- Exchange(distribution=[hash[c, e, $f5]], exchange_mode=[BATCH])
: +- Calc(select=[sum_b, /(CAST(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT)), w0$o0) AS avg_b, c, e, +(w1$o0, 1) AS $f5])
: +- Reused(reference_id=[1])
:- Calc(select=[sum_b, avg_b, rn, c, e, sum_b0, avg_b0])
: +- HashJoin(joinType=[InnerJoin], where=[AND(=(c, c0), =(e, e0), =(rn, $f5))], select=[sum_b, avg_b, rn, c, e, sum_b0, avg_b0, c0, e0, $f5], build=[left])
: :- Exchange(distribution=[hash[c, e, rn]])
: : +- Calc(select=[sum_b, /(CAST(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT)), w0$o0) AS avg_b, w1$o0 AS rn, c, e], where=[AND(<>(c, _UTF-16LE'':VARCHAR(65536) CHARACTER SET "UTF-16LE"), >(-(sum_b, /(CAST(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT)), w0$o0)), 3))])
: : +- OverAggregate(partitionBy=[c, e], orderBy=[], window#0=[COUNT(sum_b) AS w0$o0, $SUM0(sum_b) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], window#1=[RANK(*) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[c, e, sum_b, w0$o0, w0$o1, w1$o0], reuse_id=[1])
: : +- Sort(orderBy=[c ASC, e ASC], reuse_id=[2])
: : +- HashAggregate(isMerge=[true], groupBy=[c, e], select=[c, e, Final_SUM(sum$0) AS sum_b])
: : +- Exchange(distribution=[hash[c, e]])
: : +- LocalHashAggregate(groupBy=[c, e], select=[c, e, Partial_SUM(b) AS sum$0])
: : +- Calc(select=[c, e, b])
: : +- HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e], build=[right])
: : :- Exchange(distribution=[hash[a]])
: : : +- Calc(select=[a, b, c], where=[IS NOT NULL(c)])
: : : +- TableSourceScan(table=[[x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: : +- Exchange(distribution=[hash[d]])
: : +- Calc(select=[d, e], where=[>(e, 10)])
: : +- TableSourceScan(table=[[y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
: +- Exchange(distribution=[hash[c, e, $f5]], exchange_mode=[BATCH])
: +- Calc(select=[sum_b, /(CAST(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT)), w0$o0) AS avg_b, c, e, +(w1$o0, 1) AS $f5])
: +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[c, e, $f5]], exchange_mode=[BATCH])
+- Calc(select=[sum_b, c, e, -(w0$o0, 1) AS $f5])
+- OverAggregate(partitionBy=[c, e], orderBy=[c ASC, e ASC], window#0=[RANK(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[c, e, sum_b, w0$o0])
......@@ -651,8 +649,7 @@ HashJoin(joinType=[InnerJoin], where=[=(a, d0)], select=[a, b, c, d, e, f, a0, b
: : +- TableSourceScan(table=[[x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: +- Exchange(distribution=[hash[d]])
: +- TableSourceScan(table=[[y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+- Exchange(distribution=[hash[d]])
+- Reused(reference_id=[1])
+- Reused(reference_id=[1])
]]>
</Resource>
</TestCase>
......@@ -1061,16 +1058,14 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], a0=[$6], b0=[$7],
</Resource>
<Resource name="planAfter">
<![CDATA[
SortMergeJoin(joinType=[InnerJoin], where=[=(a, d0)], select=[a, b, c, d, e, f, a0, b0, c0, d0, e0, f0])
:- Exchange(distribution=[hash[a]])
: +- SortMergeJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], reuse_id=[1])
: :- Exchange(distribution=[hash[a]])
: : +- Calc(select=[a, b, c], where=[LIKE(c, _UTF-16LE'He%')])
: : +- TableSourceScan(table=[[x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: +- Exchange(distribution=[hash[d]])
: +- TableSourceScan(table=[[y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+- Exchange(distribution=[hash[d]])
+- Reused(reference_id=[1])
SortMergeJoin(joinType=[InnerJoin], where=[=(a, d0)], select=[a, b, c, d, e, f, a0, b0, c0, d0, e0, f0], leftSorted=[true], rightSorted=[true])
:- SortMergeJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], reuse_id=[1])
: :- Exchange(distribution=[hash[a]])
: : +- Calc(select=[a, b, c], where=[LIKE(c, _UTF-16LE'He%')])
: : +- TableSourceScan(table=[[x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: +- Exchange(distribution=[hash[d]])
: +- TableSourceScan(table=[[y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+- Reused(reference_id=[1])
]]>
</Resource>
</TestCase>
......
......@@ -279,18 +279,16 @@ LogicalProject(deptno=[$0], gender=[$2], min_name=[$3])
<Resource name="planAfter">
<![CDATA[
Calc(select=[deptno, gender, min_name], where=[OR(>($f5, 2), AND(=(gender, _UTF-16LE'M':VARCHAR(65536) CHARACTER SET "UTF-16LE"), =(deptno, 10)))])
+- SortAggregate(isMerge=[true], groupBy=[deptno, gender, deptno0, $e], select=[deptno, gender, deptno0, $e, Final_MIN(min$0) AS min_name, Final_COUNT(count1$1) AS $f5])
+- SortAggregate(isMerge=[false], groupBy=[deptno, gender, deptno0, $e], select=[deptno, gender, deptno0, $e, MIN(ename) AS min_name, COUNT(*) AS $f5])
+- Sort(orderBy=[deptno ASC, gender ASC, deptno0 ASC, $e ASC])
+- Exchange(distribution=[hash[deptno, gender, deptno0, $e]])
+- LocalSortAggregate(groupBy=[deptno, gender, deptno0, $e], select=[deptno, gender, deptno0, $e, Partial_MIN(ename) AS min$0, Partial_COUNT(*) AS count1$1])
+- Sort(orderBy=[deptno ASC, gender ASC, deptno0 ASC, $e ASC])
+- Expand(projects=[{ename=[$0], deptno=[$1], gender=[$2], deptno0=[$3], $e=[0]}, {ename=[$0], deptno=[$1], gender=[$2], deptno0=[null], $e=[1]}, {ename=[$0], deptno=[$1], gender=[null], deptno0=[$3], $e=[2]}, {ename=[$0], deptno=[$1], gender=[null], deptno0=[null], $e=[3]}, {ename=[$0], deptno=[null], gender=[$2], deptno0=[$3], $e=[4]}, {ename=[$0], deptno=[null], gender=[$2], deptno0=[null], $e=[5]}, {ename=[$0], deptno=[null], gender=[null], deptno0=[$3], $e=[6]}, {ename=[$0], deptno=[null], gender=[null], deptno0=[null], $e=[7]}], projects=[{ename, deptno, gender, deptno0, 0 AS $e}, {ename, deptno, gender, null AS deptno0, 1 AS $e}, {ename, deptno, null AS gender, deptno0, 2 AS $e}, {ename, deptno, null AS gender, null AS deptno0, 3 AS $e}, {ename, null AS deptno, gender, deptno0, 4 AS $e}, {ename, null AS deptno, gender, null AS deptno0, 5 AS $e}, {ename, null AS deptno, null AS gender, deptno0, 6 AS $e}, {ename, null AS deptno, null AS gender, null AS deptno0, 7 AS $e}])
+- HashJoin(joinType=[InnerJoin], where=[=(deptno, deptno0)], select=[ename, deptno, gender, deptno0], build=[right])
:- Exchange(distribution=[hash[deptno]])
: +- TableSourceScan(table=[[emp, source: [TestTableSource(ename, deptno, gender)]]], fields=[ename, deptno, gender])
+- Exchange(distribution=[hash[deptno]])
+- Calc(select=[deptno])
+- TableSourceScan(table=[[dept, source: [TestTableSource(deptno, dname)]]], fields=[deptno, dname])
+- Expand(projects=[{ename=[$0], deptno=[$1], gender=[$2], deptno0=[$3], $e=[0]}, {ename=[$0], deptno=[$1], gender=[$2], deptno0=[null], $e=[1]}, {ename=[$0], deptno=[$1], gender=[null], deptno0=[$3], $e=[2]}, {ename=[$0], deptno=[$1], gender=[null], deptno0=[null], $e=[3]}, {ename=[$0], deptno=[null], gender=[$2], deptno0=[$3], $e=[4]}, {ename=[$0], deptno=[null], gender=[$2], deptno0=[null], $e=[5]}, {ename=[$0], deptno=[null], gender=[null], deptno0=[$3], $e=[6]}, {ename=[$0], deptno=[null], gender=[null], deptno0=[null], $e=[7]}], projects=[{ename, deptno, gender, deptno0, 0 AS $e}, {ename, deptno, gender, null AS deptno0, 1 AS $e}, {ename, deptno, null AS gender, deptno0, 2 AS $e}, {ename, deptno, null AS gender, null AS deptno0, 3 AS $e}, {ename, null AS deptno, gender, deptno0, 4 AS $e}, {ename, null AS deptno, gender, null AS deptno0, 5 AS $e}, {ename, null AS deptno, null AS gender, deptno0, 6 AS $e}, {ename, null AS deptno, null AS gender, null AS deptno0, 7 AS $e}])
+- HashJoin(joinType=[InnerJoin], where=[=(deptno, deptno0)], select=[ename, deptno, gender, deptno0], build=[right])
:- Exchange(distribution=[hash[deptno]])
: +- TableSourceScan(table=[[emp, source: [TestTableSource(ename, deptno, gender)]]], fields=[ename, deptno, gender])
+- Exchange(distribution=[hash[deptno]])
+- Calc(select=[deptno])
+- TableSourceScan(table=[[dept, source: [TestTableSource(deptno, dname)]]], fields=[deptno, dname])
]]>
</Resource>
</TestCase>
......
......@@ -1054,13 +1054,12 @@ Calc(select=[b])
: +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+- Exchange(distribution=[broadcast])
+- SortAggregate(isMerge=[false], select=[SINGLE_VALUE(EXPR$0) AS $f0])
+- Exchange(distribution=[single])
+- Calc(select=[*(0.5:DECIMAL(2, 1), $f0) AS EXPR$0])
+- HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS $f0])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_SUM(j) AS sum$0])
+- Calc(select=[j], where=[<(i, 100)])
+- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
+- Calc(select=[*(0.5:DECIMAL(2, 1), $f0) AS EXPR$0])
+- SortAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS $f0])
+- Exchange(distribution=[single])
+- LocalSortAggregate(select=[Partial_SUM(j) AS sum$0])
+- Calc(select=[j], where=[<(i, 100)])
+- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
]]>
</Resource>
</TestCase>
......
......@@ -124,8 +124,8 @@ LogicalProject(d=[$0], e=[$1], f=[$2], g=[$3], h=[$4], a=[$6], b=[$7], c=[$8])
<![CDATA[
Calc(select=[d, e, f, g, h, a, b, c])
+- NestedLoopJoin(joinType=[FullOuterJoin], where=[AND(=(a, d), $f5, <(b, h))], select=[d, e, f, g, h, $f5, a, b, c], build=[right])
:- Exchange(distribution=[single])
: +- Calc(select=[d, e, f, g, h, <(d, 2) AS $f5])
:- Calc(select=[d, e, f, g, h, <(d, 2) AS $f5])
: +- Exchange(distribution=[single])
: +- TableSourceScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+- Exchange(distribution=[single])
+- TableSourceScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
......
......@@ -1293,13 +1293,12 @@ Calc(select=[b])
: +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+- Exchange(distribution=[broadcast])
+- SortAggregate(isMerge=[false], select=[SINGLE_VALUE(EXPR$0) AS $f0])
+- Exchange(distribution=[single])
+- Calc(select=[*(0.5:DECIMAL(2, 1), $f0) AS EXPR$0])
+- HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS $f0])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_SUM(j) AS sum$0])
+- Calc(select=[j], where=[<(i, 100)])
+- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
+- Calc(select=[*(0.5:DECIMAL(2, 1), $f0) AS EXPR$0])
+- SortAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS $f0])
+- Exchange(distribution=[single])
+- LocalSortAggregate(select=[Partial_SUM(j) AS sum$0])
+- Calc(select=[j], where=[<(i, 100)])
+- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
]]>
</Resource>
</TestCase>
......
......@@ -1000,12 +1000,11 @@ Calc(select=[a])
: :- Exchange(distribution=[hash[e]])
: : +- Calc(select=[d, e])
: : +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
: +- Exchange(distribution=[hash[j]])
: +- HashAggregate(isMerge=[true], groupBy=[j], select=[j])
: +- Exchange(distribution=[hash[j]])
: +- LocalHashAggregate(groupBy=[j], select=[j])
: +- Calc(select=[j])
: +- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
: +- HashAggregate(isMerge=[true], groupBy=[j], select=[j])
: +- Exchange(distribution=[hash[j]])
: +- LocalHashAggregate(groupBy=[j], select=[j])
: +- Calc(select=[j])
: +- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
+- Exchange(distribution=[hash[i, k]])
+- Calc(select=[i, k])
+- Reused(reference_id=[1])
......@@ -1330,13 +1329,12 @@ Calc(select=[b])
: +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+- Exchange(distribution=[broadcast])
+- SortAggregate(isMerge=[false], select=[SINGLE_VALUE(EXPR$0) AS $f0])
+- Exchange(distribution=[single])
+- Calc(select=[*(0.5:DECIMAL(2, 1), $f0) AS EXPR$0])
+- HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS $f0])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_SUM(j) AS sum$0])
+- Calc(select=[j], where=[<(i, 100)])
+- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
+- Calc(select=[*(0.5:DECIMAL(2, 1), $f0) AS EXPR$0])
+- SortAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS $f0])
+- Exchange(distribution=[single])
+- LocalSortAggregate(select=[Partial_SUM(j) AS sum$0])
+- Calc(select=[j], where=[<(i, 100)])
+- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
]]>
</Resource>
</TestCase>
......@@ -1611,15 +1609,14 @@ Calc(select=[a])
:- Exchange(distribution=[hash[b]])
: +- Calc(select=[a, b])
: +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[e]])
+- HashAggregate(isMerge=[true], groupBy=[e], select=[e])
+- Exchange(distribution=[hash[e]])
+- LocalHashAggregate(groupBy=[e], select=[e])
+- Union(all=[true], union=[e])
:- Calc(select=[e], where=[>(d, 10)])
: +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+- Calc(select=[CAST(i) AS i], where=[<(i, 100)])
+- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
+- HashAggregate(isMerge=[true], groupBy=[e], select=[e])
+- Exchange(distribution=[hash[e]])
+- LocalHashAggregate(groupBy=[e], select=[e])
+- Union(all=[true], union=[e])
:- Calc(select=[e], where=[>(d, 10)])
: +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+- Calc(select=[CAST(i) AS i], where=[<(i, 100)])
+- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
]]>
</Resource>
</TestCase>
......@@ -1933,19 +1930,18 @@ LogicalFilter(condition=[AND(=($cor0.a, $0), <($1, 100))])
<Resource name="planAfter">
<![CDATA[
HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], build=[left])
:- Exchange(distribution=[hash[a]])
: +- HashJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c], build=[right])
: :- Exchange(distribution=[hash[a]])
: : +- HashJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c], build=[right])
: : :- Exchange(distribution=[hash[b]])
: : : +- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)])
: : : +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: : +- Exchange(distribution=[hash[j]])
: : +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)])
: : +- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
: +- Exchange(distribution=[hash[i]])
: +- Calc(select=[i], where=[<(j, 100)])
: +- Reused(reference_id=[1])
:- HashJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c], build=[right])
: :- Exchange(distribution=[hash[a]])
: : +- HashJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c], build=[right])
: : :- Exchange(distribution=[hash[b]])
: : : +- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)])
: : : +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: : +- Exchange(distribution=[hash[j]])
: : +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)])
: : +- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
: +- Exchange(distribution=[hash[i]])
: +- Calc(select=[i], where=[<(j, 100)])
: +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[d]])
+- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
]]>
......@@ -2213,12 +2209,11 @@ LogicalFilter(condition=[=($cor0.a, $0)])
HashJoin(joinType=[LeftAntiJoin], where=[=(a, c)], select=[a, b], build=[right])
:- Exchange(distribution=[hash[a]])
: +- TableSourceScan(table=[[leftT, source: [TestTableSource(a, b)]]], fields=[a, b])
+- Exchange(distribution=[hash[c]])
+- HashAggregate(isMerge=[true], groupBy=[c], select=[c])
+- Exchange(distribution=[hash[c]])
+- LocalHashAggregate(groupBy=[c], select=[c])
+- Calc(select=[c])
+- TableSourceScan(table=[[rightT, source: [TestTableSource(c, d)]]], fields=[c, d])
+- HashAggregate(isMerge=[true], groupBy=[c], select=[c])
+- Exchange(distribution=[hash[c]])
+- LocalHashAggregate(groupBy=[c], select=[c])
+- Calc(select=[c])
+- TableSourceScan(table=[[rightT, source: [TestTableSource(c, d)]]], fields=[c, d])
]]>
</Resource>
</TestCase>
......@@ -2312,39 +2307,36 @@ Calc(select=[b])
:- Exchange(distribution=[hash[c]])
: +- Calc(select=[b, c, CASE(OR(=(c0, 0), AND(<>(c0, 0), IS NULL(i0), >=(ck, c0), IS NOT NULL(a))), 1, OR(=(c1, 0), AND(<>(c1, 0), IS NULL(i), >=(ck0, c1), IS NOT NULL(a))), 2, 3) AS $f3])
: +- HashJoin(joinType=[LeftOuterJoin], where=[=(a, EXPR$0)], select=[a, b, c, c0, ck, i0, c1, ck0, EXPR$0, i], build=[right])
: :- Exchange(distribution=[hash[a]])
: : +- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck, i0, c1, ck0], build=[right], singleRowJoin=[true])
: : :- Calc(select=[a, b, c, c0, ck, i0])
: : : +- HashJoin(joinType=[LeftOuterJoin], where=[=(a, i)], select=[a, b, c, c0, ck, i, i0], build=[right])
: : : :- Exchange(distribution=[hash[a]])
: : : : +- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck], build=[right], singleRowJoin=[true])
: : : : :- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: : : : +- Exchange(distribution=[broadcast])
: : : : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
: : : : +- Exchange(distribution=[single])
: : : : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(i) AS count$1])
: : : : +- Calc(select=[i])
: : : : +- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
: : : +- Exchange(distribution=[hash[i]])
: : : +- Calc(select=[i, true AS i0])
: : : +- HashAggregate(isMerge=[true], groupBy=[i], select=[i])
: : : +- Exchange(distribution=[hash[i]])
: : : +- LocalHashAggregate(groupBy=[i], select=[i])
: : : +- Calc(select=[i, true AS i0])
: : : +- Reused(reference_id=[1])
: : +- Exchange(distribution=[broadcast])
: : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
: : +- Exchange(distribution=[single])
: : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(EXPR$0) AS count$1])
: : +- Calc(select=[CAST(j) AS EXPR$0])
: : +- Reused(reference_id=[1])
: +- Exchange(distribution=[hash[EXPR$0]])
: +- Calc(select=[EXPR$0, true AS i])
: +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], select=[EXPR$0])
: +- Exchange(distribution=[hash[EXPR$0]])
: +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0])
: +- Calc(select=[CAST(j) AS EXPR$0, true AS i])
: +- Reused(reference_id=[1])
: :- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck, i0, c1, ck0], build=[right], singleRowJoin=[true])
: : :- Calc(select=[a, b, c, c0, ck, i0])
: : : +- HashJoin(joinType=[LeftOuterJoin], where=[=(a, i)], select=[a, b, c, c0, ck, i, i0], build=[right])
: : : :- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck], build=[right], singleRowJoin=[true])
: : : : :- Exchange(distribution=[hash[a]])
: : : : : +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: : : : +- Exchange(distribution=[broadcast])
: : : : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
: : : : +- Exchange(distribution=[single])
: : : : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(i) AS count$1])
: : : : +- Calc(select=[i])
: : : : +- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
: : : +- Calc(select=[i, true AS i0])
: : : +- HashAggregate(isMerge=[true], groupBy=[i], select=[i])
: : : +- Exchange(distribution=[hash[i]])
: : : +- LocalHashAggregate(groupBy=[i], select=[i])
: : : +- Calc(select=[i, true AS i0])
: : : +- Reused(reference_id=[1])
: : +- Exchange(distribution=[broadcast])
: : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
: : +- Exchange(distribution=[single])
: : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(EXPR$0) AS count$1])
: : +- Calc(select=[CAST(j) AS EXPR$0])
: : +- Reused(reference_id=[1])
: +- Calc(select=[EXPR$0, true AS i])
: +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], select=[EXPR$0])
: +- Exchange(distribution=[hash[EXPR$0]])
: +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0])
: +- Calc(select=[CAST(j) AS EXPR$0, true AS i])
: +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[f]])
+- Calc(select=[d, f])
+- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
......@@ -2557,36 +2549,33 @@ Calc(select=[b])
+- NestedLoopJoin(joinType=[LeftAntiJoin], where=[AND(OR(=(b, e), IS NULL(b), IS NULL(e)), OR(=($f3, d), IS NULL(d)))], select=[b, $f3], build=[right])
:- Calc(select=[b, CASE(OR(=(c0, 0), AND(<>(c0, 0), IS NULL(i0), >=(ck, c0), IS NOT NULL(a))), 1, OR(=(c, 0), AND(<>(c, 0), IS NULL(i), >=(ck0, c), IS NOT NULL(a))), 2, 3) AS $f3])
: +- HashJoin(joinType=[LeftOuterJoin], where=[=(a, j)], select=[a, b, c0, ck, i0, c, ck0, j, i], build=[right])
: :- Exchange(distribution=[hash[a]])
: : +- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c0, ck, i0, c, ck0], build=[right], singleRowJoin=[true])
: : :- Calc(select=[a, b, c AS c0, ck, i0])
: : : +- HashJoin(joinType=[LeftOuterJoin], where=[=(a, i)], select=[a, b, c, ck, i, i0], build=[right])
: : : :- Exchange(distribution=[hash[a]])
: : : : +- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, ck], build=[right], singleRowJoin=[true])
: : : : :- Calc(select=[a, b])
: : : : : +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: : : : +- Exchange(distribution=[broadcast])
: : : : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
: : : : +- Exchange(distribution=[single])
: : : : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(i) AS count$1])
: : : : +- TableSourceScan(table=[[t1, source: [TestTableSource(i)]]], fields=[i], reuse_id=[1])
: : : +- Exchange(distribution=[hash[i]])
: : : +- Calc(select=[i, true AS i0])
: : : +- HashAggregate(isMerge=[true], groupBy=[i], select=[i])
: : : +- Exchange(distribution=[hash[i]])
: : : +- LocalHashAggregate(groupBy=[i], select=[i])
: : : +- Reused(reference_id=[1])
: : +- Exchange(distribution=[broadcast])
: : +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
: : +- Exchange(distribution=[single])
: : +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(j) AS count$1])
: : +- TableSourceScan(table=[[t2, source: [TestTableSource(j)]]], fields=[j], reuse_id=[2])
: +- Exchange(distribution=[hash[j]])
: +- Calc(select=[j, true AS i])
: +- HashAggregate(isMerge=[true], groupBy=[j], select=[j])
: +- Exchange(distribution=[hash[j]])
: +- LocalHashAggregate(groupBy=[j], select=[j])
: +- Reused(reference_id=[2])
: :- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c0, ck, i0, c, ck0], build=[right], singleRowJoin=[true])
: : :- Calc(select=[a, b, c AS c0, ck, i0])
: : : +- HashJoin(joinType=[LeftOuterJoin], where=[=(a, i)], select=[a, b, c, ck, i, i0], build=[right])
: : : :- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, ck], build=[right], singleRowJoin=[true])
: : : : :- Exchange(distribution=[hash[a]])
: : : : : +- Calc(select=[a, b])
: : : : : +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: : : : +- Exchange(distribution=[broadcast])
: : : : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
: : : : +- Exchange(distribution=[single])
: : : : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(i) AS count$1])
: : : : +- TableSourceScan(table=[[t1, source: [TestTableSource(i)]]], fields=[i], reuse_id=[1])
: : : +- Calc(select=[i, true AS i0])
: : : +- HashAggregate(isMerge=[true], groupBy=[i], select=[i])
: : : +- Exchange(distribution=[hash[i]])
: : : +- LocalHashAggregate(groupBy=[i], select=[i])
: : : +- Reused(reference_id=[1])
: : +- Exchange(distribution=[broadcast])
: : +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
: : +- Exchange(distribution=[single])
: : +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(j) AS count$1])
: : +- TableSourceScan(table=[[t2, source: [TestTableSource(j)]]], fields=[j], reuse_id=[2])
: +- Calc(select=[j, true AS i])
: +- HashAggregate(isMerge=[true], groupBy=[j], select=[j])
: +- Exchange(distribution=[hash[j]])
: +- LocalHashAggregate(groupBy=[j], select=[j])
: +- Reused(reference_id=[2])
+- Exchange(distribution=[broadcast])
+- Calc(select=[e, d])
+- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
......
......@@ -59,8 +59,8 @@ LogicalProject(d=[$0], e=[$1], f=[$2], g=[$3], h=[$4], a=[$6], b=[$7], c=[$8])
<![CDATA[
Calc(select=[d, e, f, g, h, a, b, c])
+- HashJoin(joinType=[FullOuterJoin], where=[AND(=(a, d), $f5, <(b, h))], select=[d, e, f, g, h, $f5, a, b, c], build=[right])
:- Exchange(distribution=[hash[d]])
: +- Calc(select=[d, e, f, g, h, <(d, 2) AS $f5])
:- Calc(select=[d, e, f, g, h, <(d, 2) AS $f5])
: +- Exchange(distribution=[hash[d]])
: +- TableSourceScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+- Exchange(distribution=[hash[a]])
+- TableSourceScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
......@@ -162,8 +162,8 @@ LogicalProject(d=[$0], e=[$1], f=[$2], g=[$3], h=[$4], a=[$6], b=[$7], c=[$8])
<![CDATA[
Calc(select=[d, e, f, g, h, a, b, c])
+- HashJoin(joinType=[LeftOuterJoin], where=[AND(=(a, d), $f5, <(b, h))], select=[d, e, f, g, h, $f5, a, b, c], build=[right])
:- Exchange(distribution=[hash[d]])
: +- Calc(select=[d, e, f, g, h, <(d, 2) AS $f5])
:- Calc(select=[d, e, f, g, h, <(d, 2) AS $f5])
: +- Exchange(distribution=[hash[d]])
: +- TableSourceScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+- Exchange(distribution=[hash[a]])
+- TableSourceScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
......
......@@ -790,12 +790,11 @@ Calc(select=[a])
: :- Exchange(distribution=[hash[e]])
: : +- Calc(select=[d, e])
: : +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
: +- Exchange(distribution=[hash[j]])
: +- HashAggregate(isMerge=[true], groupBy=[j], select=[j])
: +- Exchange(distribution=[hash[j]])
: +- LocalHashAggregate(groupBy=[j], select=[j])
: +- Calc(select=[j])
: +- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
: +- HashAggregate(isMerge=[true], groupBy=[j], select=[j])
: +- Exchange(distribution=[hash[j]])
: +- LocalHashAggregate(groupBy=[j], select=[j])
: +- Calc(select=[j])
: +- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
+- Exchange(distribution=[hash[i, k]])
+- Calc(select=[i, k])
+- Reused(reference_id=[1])
......@@ -1070,13 +1069,12 @@ Calc(select=[b])
: +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+- Exchange(distribution=[broadcast])
+- SortAggregate(isMerge=[false], select=[SINGLE_VALUE(EXPR$0) AS $f0])
+- Exchange(distribution=[single])
+- Calc(select=[*(0.5:DECIMAL(2, 1), $f0) AS EXPR$0])
+- HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS $f0])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_SUM(j) AS sum$0])
+- Calc(select=[j], where=[<(i, 100)])
+- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
+- Calc(select=[*(0.5:DECIMAL(2, 1), $f0) AS EXPR$0])
+- SortAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS $f0])
+- Exchange(distribution=[single])
+- LocalSortAggregate(select=[Partial_SUM(j) AS sum$0])
+- Calc(select=[j], where=[<(i, 100)])
+- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
]]>
</Resource>
</TestCase>
......@@ -1324,15 +1322,14 @@ Calc(select=[a])
:- Exchange(distribution=[hash[b]])
: +- Calc(select=[a, b])
: +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[e]])
+- HashAggregate(isMerge=[true], groupBy=[e], select=[e])
+- Exchange(distribution=[hash[e]])
+- LocalHashAggregate(groupBy=[e], select=[e])
+- Union(all=[true], union=[e])
:- Calc(select=[e], where=[>(d, 10)])
: +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+- Calc(select=[CAST(i) AS i], where=[<(i, 100)])
+- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
+- HashAggregate(isMerge=[true], groupBy=[e], select=[e])
+- Exchange(distribution=[hash[e]])
+- LocalHashAggregate(groupBy=[e], select=[e])
+- Union(all=[true], union=[e])
:- Calc(select=[e], where=[>(d, 10)])
: +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+- Calc(select=[CAST(i) AS i], where=[<(i, 100)])
+- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
]]>
</Resource>
</TestCase>
......@@ -1439,19 +1436,18 @@ LogicalFilter(condition=[AND(=($cor0.a, $0), <($1, 100))])
<Resource name="planAfter">
<![CDATA[
HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], build=[left])
:- Exchange(distribution=[hash[a]])
: +- HashJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c], build=[right])
: :- Exchange(distribution=[hash[a]])
: : +- HashJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c], build=[right])
: : :- Exchange(distribution=[hash[b]])
: : : +- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)])
: : : +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: : +- Exchange(distribution=[hash[j]])
: : +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)])
: : +- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
: +- Exchange(distribution=[hash[i]])
: +- Calc(select=[i], where=[<(j, 100)])
: +- Reused(reference_id=[1])
:- HashJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c], build=[right])
: :- Exchange(distribution=[hash[a]])
: : +- HashJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c], build=[right])
: : :- Exchange(distribution=[hash[b]])
: : : +- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)])
: : : +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: : +- Exchange(distribution=[hash[j]])
: : +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)])
: : +- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
: +- Exchange(distribution=[hash[i]])
: +- Calc(select=[i], where=[<(j, 100)])
: +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[d]])
+- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
]]>
......@@ -1805,12 +1801,11 @@ LogicalFilter(condition=[=($cor0.a, $0)])
HashJoin(joinType=[LeftAntiJoin], where=[=(a, c)], select=[a, b], build=[right])
:- Exchange(distribution=[hash[a]])
: +- TableSourceScan(table=[[leftT, source: [TestTableSource(a, b)]]], fields=[a, b])
+- Exchange(distribution=[hash[c]])
+- HashAggregate(isMerge=[true], groupBy=[c], select=[c])
+- Exchange(distribution=[hash[c]])
+- LocalHashAggregate(groupBy=[c], select=[c])
+- Calc(select=[c])
+- TableSourceScan(table=[[rightT, source: [TestTableSource(c, d)]]], fields=[c, d])
+- HashAggregate(isMerge=[true], groupBy=[c], select=[c])
+- Exchange(distribution=[hash[c]])
+- LocalHashAggregate(groupBy=[c], select=[c])
+- Calc(select=[c])
+- TableSourceScan(table=[[rightT, source: [TestTableSource(c, d)]]], fields=[c, d])
]]>
</Resource>
</TestCase>
......@@ -1904,39 +1899,36 @@ Calc(select=[b])
:- Exchange(distribution=[hash[c]])
: +- Calc(select=[b, c, CASE(OR(=(c0, 0), AND(<>(c0, 0), IS NULL(i0), >=(ck, c0), IS NOT NULL(a))), 1, OR(=(c1, 0), AND(<>(c1, 0), IS NULL(i), >=(ck0, c1), IS NOT NULL(a))), 2, 3) AS $f3])
: +- HashJoin(joinType=[LeftOuterJoin], where=[=(a, EXPR$0)], select=[a, b, c, c0, ck, i0, c1, ck0, EXPR$0, i], build=[right])
: :- Exchange(distribution=[hash[a]])
: : +- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck, i0, c1, ck0], build=[right], singleRowJoin=[true])
: : :- Calc(select=[a, b, c, c0, ck, i0])
: : : +- HashJoin(joinType=[LeftOuterJoin], where=[=(a, i)], select=[a, b, c, c0, ck, i, i0], build=[right])
: : : :- Exchange(distribution=[hash[a]])
: : : : +- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck], build=[right], singleRowJoin=[true])
: : : : :- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: : : : +- Exchange(distribution=[broadcast])
: : : : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
: : : : +- Exchange(distribution=[single])
: : : : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(i) AS count$1])
: : : : +- Calc(select=[i])
: : : : +- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
: : : +- Exchange(distribution=[hash[i]])
: : : +- Calc(select=[i, true AS i0])
: : : +- HashAggregate(isMerge=[true], groupBy=[i], select=[i])
: : : +- Exchange(distribution=[hash[i]])
: : : +- LocalHashAggregate(groupBy=[i], select=[i])
: : : +- Calc(select=[i, true AS i0])
: : : +- Reused(reference_id=[1])
: : +- Exchange(distribution=[broadcast])
: : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
: : +- Exchange(distribution=[single])
: : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(EXPR$0) AS count$1])
: : +- Calc(select=[CAST(j) AS EXPR$0])
: : +- Reused(reference_id=[1])
: +- Exchange(distribution=[hash[EXPR$0]])
: +- Calc(select=[EXPR$0, true AS i])
: +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], select=[EXPR$0])
: +- Exchange(distribution=[hash[EXPR$0]])
: +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0])
: +- Calc(select=[CAST(j) AS EXPR$0, true AS i])
: +- Reused(reference_id=[1])
: :- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck, i0, c1, ck0], build=[right], singleRowJoin=[true])
: : :- Calc(select=[a, b, c, c0, ck, i0])
: : : +- HashJoin(joinType=[LeftOuterJoin], where=[=(a, i)], select=[a, b, c, c0, ck, i, i0], build=[right])
: : : :- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck], build=[right], singleRowJoin=[true])
: : : : :- Exchange(distribution=[hash[a]])
: : : : : +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: : : : +- Exchange(distribution=[broadcast])
: : : : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
: : : : +- Exchange(distribution=[single])
: : : : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(i) AS count$1])
: : : : +- Calc(select=[i])
: : : : +- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
: : : +- Calc(select=[i, true AS i0])
: : : +- HashAggregate(isMerge=[true], groupBy=[i], select=[i])
: : : +- Exchange(distribution=[hash[i]])
: : : +- LocalHashAggregate(groupBy=[i], select=[i])
: : : +- Calc(select=[i, true AS i0])
: : : +- Reused(reference_id=[1])
: : +- Exchange(distribution=[broadcast])
: : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
: : +- Exchange(distribution=[single])
: : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(EXPR$0) AS count$1])
: : +- Calc(select=[CAST(j) AS EXPR$0])
: : +- Reused(reference_id=[1])
: +- Calc(select=[EXPR$0, true AS i])
: +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], select=[EXPR$0])
: +- Exchange(distribution=[hash[EXPR$0]])
: +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0])
: +- Calc(select=[CAST(j) AS EXPR$0, true AS i])
: +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[f]])
+- Calc(select=[d, f])
+- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
......
......@@ -184,36 +184,6 @@ Calc(select=[a2])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+- Calc(select=[0 AS $f0])
+- TableSourceScan(table=[[B, source: [TestTableSource(b1, b2)]]], fields=[b1, b2])
]]>
</Resource>
</TestCase>
<TestCase name="testSingleRowEquiJoin">
<Resource name="sql">
<![CDATA[SELECT a1, a2 FROM A, (SELECT COUNT(a1) AS cnt FROM A) WHERE a1 = cnt]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(a1=[$0], a2=[$1])
+- LogicalFilter(condition=[=(CAST($0):BIGINT, $2)])
+- LogicalJoin(condition=[true], joinType=[inner])
:- LogicalTableScan(table=[[A, source: [TestTableSource(a1, a2)]]])
+- LogicalAggregate(group=[{}], cnt=[COUNT($0)])
+- LogicalProject(a1=[$0])
+- LogicalTableScan(table=[[A, source: [TestTableSource(a1, a2)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
Calc(select=[a1, a2])
+- NestedLoopJoin(joinType=[InnerJoin], where=[=(CAST(a1), cnt)], select=[a1, a2, cnt], build=[right], singleRowJoin=[true])
:- Exchange(distribution=[any], exchange_mode=[BATCH])
: +- TableSourceScan(table=[[A, source: [TestTableSource(a1, a2)]]], fields=[a1, a2], reuse_id=[1])
+- Exchange(distribution=[broadcast])
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count$0) AS cnt])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(a1) AS count$0])
+- Calc(select=[a1])
+- Reused(reference_id=[1])
]]>
</Resource>
</TestCase>
......@@ -274,13 +244,12 @@ Calc(select=[a2, EXPR$1])
: +- TableSourceScan(table=[[A, source: [TestTableSource(a1, a2)]]], fields=[a1, a2], reuse_id=[1])
+- Exchange(distribution=[broadcast])
+- SortAggregate(isMerge=[false], select=[SINGLE_VALUE(EXPR$0) AS $f0])
+- Exchange(distribution=[single])
+- Calc(select=[*($f0, 0.1:DECIMAL(2, 1)) AS EXPR$0])
+- HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS $f0])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_SUM(a1) AS sum$0])
+- Calc(select=[a1])
+- Reused(reference_id=[1])
+- Calc(select=[*($f0, 0.1:DECIMAL(2, 1)) AS EXPR$0])
+- SortAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS $f0])
+- Exchange(distribution=[single])
+- LocalSortAggregate(select=[Partial_SUM(a1) AS sum$0])
+- Calc(select=[a1])
+- Reused(reference_id=[1])
]]>
</Resource>
</TestCase>
......
......@@ -59,8 +59,8 @@ LogicalProject(d=[$0], e=[$1], f=[$2], g=[$3], h=[$4], a=[$6], b=[$7], c=[$8])
<![CDATA[
Calc(select=[d, e, f, g, h, a, b, c])
+- SortMergeJoin(joinType=[FullOuterJoin], where=[AND(=(a, d), $f5, <(b, h))], select=[d, e, f, g, h, $f5, a, b, c])
:- Exchange(distribution=[hash[d]])
: +- Calc(select=[d, e, f, g, h, <(d, 2) AS $f5])
:- Calc(select=[d, e, f, g, h, <(d, 2) AS $f5])
: +- Exchange(distribution=[hash[d]])
: +- TableSourceScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+- Exchange(distribution=[hash[a]])
+- TableSourceScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
......@@ -162,8 +162,8 @@ LogicalProject(d=[$0], e=[$1], f=[$2], g=[$3], h=[$4], a=[$6], b=[$7], c=[$8])
<![CDATA[
Calc(select=[d, e, f, g, h, a, b, c])
+- SortMergeJoin(joinType=[LeftOuterJoin], where=[AND(=(a, d), $f5, <(b, h))], select=[d, e, f, g, h, $f5, a, b, c])
:- Exchange(distribution=[hash[d]])
: +- Calc(select=[d, e, f, g, h, <(d, 2) AS $f5])
:- Calc(select=[d, e, f, g, h, <(d, 2) AS $f5])
: +- Exchange(distribution=[hash[d]])
: +- TableSourceScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+- Exchange(distribution=[hash[a]])
+- TableSourceScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
......
......@@ -847,12 +847,11 @@ Calc(select=[a])
: :- Exchange(distribution=[hash[e]])
: : +- Calc(select=[d, e])
: : +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
: +- Exchange(distribution=[hash[j]])
: +- HashAggregate(isMerge=[true], groupBy=[j], select=[j])
: +- Exchange(distribution=[hash[j]])
: +- LocalHashAggregate(groupBy=[j], select=[j])
: +- Calc(select=[j])
: +- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
: +- HashAggregate(isMerge=[true], groupBy=[j], select=[j])
: +- Exchange(distribution=[hash[j]])
: +- LocalHashAggregate(groupBy=[j], select=[j])
: +- Calc(select=[j])
: +- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
+- Exchange(distribution=[hash[i, k]])
+- Calc(select=[i, k])
+- Reused(reference_id=[1])
......@@ -1127,13 +1126,12 @@ Calc(select=[b])
: +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+- Exchange(distribution=[broadcast])
+- SortAggregate(isMerge=[false], select=[SINGLE_VALUE(EXPR$0) AS $f0])
+- Exchange(distribution=[single])
+- Calc(select=[*(0.5:DECIMAL(2, 1), $f0) AS EXPR$0])
+- HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS $f0])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_SUM(j) AS sum$0])
+- Calc(select=[j], where=[<(i, 100)])
+- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
+- Calc(select=[*(0.5:DECIMAL(2, 1), $f0) AS EXPR$0])
+- SortAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS $f0])
+- Exchange(distribution=[single])
+- LocalSortAggregate(select=[Partial_SUM(j) AS sum$0])
+- Calc(select=[j], where=[<(i, 100)])
+- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
]]>
</Resource>
</TestCase>
......@@ -1409,15 +1407,14 @@ Calc(select=[a])
:- Exchange(distribution=[hash[b]])
: +- Calc(select=[a, b])
: +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[e]])
+- HashAggregate(isMerge=[true], groupBy=[e], select=[e])
+- Exchange(distribution=[hash[e]])
+- LocalHashAggregate(groupBy=[e], select=[e])
+- Union(all=[true], union=[e])
:- Calc(select=[e], where=[>(d, 10)])
: +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+- Calc(select=[CAST(i) AS i], where=[<(i, 100)])
+- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
+- HashAggregate(isMerge=[true], groupBy=[e], select=[e])
+- Exchange(distribution=[hash[e]])
+- LocalHashAggregate(groupBy=[e], select=[e])
+- Union(all=[true], union=[e])
:- Calc(select=[e], where=[>(d, 10)])
: +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+- Calc(select=[CAST(i) AS i], where=[<(i, 100)])
+- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
]]>
</Resource>
</TestCase>
......@@ -1524,19 +1521,18 @@ LogicalFilter(condition=[AND(=($cor0.a, $0), <($1, 100))])
<Resource name="planAfter">
<![CDATA[
SortMergeJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f])
:- Exchange(distribution=[hash[a]])
: +- SortMergeJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c])
: :- Exchange(distribution=[hash[a]])
: : +- SortMergeJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c])
: : :- Exchange(distribution=[hash[b]])
: : : +- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)])
: : : +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: : +- Exchange(distribution=[hash[j]])
: : +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)])
: : +- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
: +- Exchange(distribution=[hash[i]])
: +- Calc(select=[i], where=[<(j, 100)])
: +- Reused(reference_id=[1])
:- SortMergeJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c])
: :- Exchange(distribution=[hash[a]])
: : +- SortMergeJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c])
: : :- Exchange(distribution=[hash[b]])
: : : +- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)])
: : : +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: : +- Exchange(distribution=[hash[j]])
: : +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)])
: : +- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
: +- Exchange(distribution=[hash[i]])
: +- Calc(select=[i], where=[<(j, 100)])
: +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[d]])
+- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
]]>
......@@ -1890,12 +1886,11 @@ LogicalFilter(condition=[=($cor0.a, $0)])
SortMergeJoin(joinType=[LeftAntiJoin], where=[=(a, c)], select=[a, b])
:- Exchange(distribution=[hash[a]])
: +- TableSourceScan(table=[[leftT, source: [TestTableSource(a, b)]]], fields=[a, b])
+- Exchange(distribution=[hash[c]])
+- HashAggregate(isMerge=[true], groupBy=[c], select=[c])
+- Exchange(distribution=[hash[c]])
+- LocalHashAggregate(groupBy=[c], select=[c])
+- Calc(select=[c])
+- TableSourceScan(table=[[rightT, source: [TestTableSource(c, d)]]], fields=[c, d])
+- HashAggregate(isMerge=[true], groupBy=[c], select=[c])
+- Exchange(distribution=[hash[c]])
+- LocalHashAggregate(groupBy=[c], select=[c])
+- Calc(select=[c])
+- TableSourceScan(table=[[rightT, source: [TestTableSource(c, d)]]], fields=[c, d])
]]>
</Resource>
</TestCase>
......@@ -1989,39 +1984,36 @@ Calc(select=[b])
:- Exchange(distribution=[hash[c]])
: +- Calc(select=[b, c, CASE(OR(=(c0, 0), AND(<>(c0, 0), IS NULL(i0), >=(ck, c0), IS NOT NULL(a))), 1, OR(=(c1, 0), AND(<>(c1, 0), IS NULL(i), >=(ck0, c1), IS NOT NULL(a))), 2, 3) AS $f3])
: +- SortMergeJoin(joinType=[LeftOuterJoin], where=[=(a, EXPR$0)], select=[a, b, c, c0, ck, i0, c1, ck0, EXPR$0, i])
: :- Exchange(distribution=[hash[a]])
: : +- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck, i0, c1, ck0], build=[right], singleRowJoin=[true])
: : :- Calc(select=[a, b, c, c0, ck, i0])
: : : +- SortMergeJoin(joinType=[LeftOuterJoin], where=[=(a, i)], select=[a, b, c, c0, ck, i, i0])
: : : :- Exchange(distribution=[hash[a]])
: : : : +- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck], build=[right], singleRowJoin=[true])
: : : : :- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: : : : +- Exchange(distribution=[broadcast])
: : : : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
: : : : +- Exchange(distribution=[single])
: : : : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(i) AS count$1])
: : : : +- Calc(select=[i])
: : : : +- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
: : : +- Exchange(distribution=[hash[i]])
: : : +- Calc(select=[i, true AS i0])
: : : +- HashAggregate(isMerge=[true], groupBy=[i], select=[i])
: : : +- Exchange(distribution=[hash[i]])
: : : +- LocalHashAggregate(groupBy=[i], select=[i])
: : : +- Calc(select=[i, true AS i0])
: : : +- Reused(reference_id=[1])
: : +- Exchange(distribution=[broadcast])
: : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
: : +- Exchange(distribution=[single])
: : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(EXPR$0) AS count$1])
: : +- Calc(select=[CAST(j) AS EXPR$0])
: : +- Reused(reference_id=[1])
: +- Exchange(distribution=[hash[EXPR$0]])
: +- Calc(select=[EXPR$0, true AS i])
: +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], select=[EXPR$0])
: +- Exchange(distribution=[hash[EXPR$0]])
: +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0])
: +- Calc(select=[CAST(j) AS EXPR$0, true AS i])
: +- Reused(reference_id=[1])
: :- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck, i0, c1, ck0], build=[right], singleRowJoin=[true])
: : :- Calc(select=[a, b, c, c0, ck, i0])
: : : +- SortMergeJoin(joinType=[LeftOuterJoin], where=[=(a, i)], select=[a, b, c, c0, ck, i, i0])
: : : :- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck], build=[right], singleRowJoin=[true])
: : : : :- Exchange(distribution=[hash[a]])
: : : : : +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: : : : +- Exchange(distribution=[broadcast])
: : : : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
: : : : +- Exchange(distribution=[single])
: : : : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(i) AS count$1])
: : : : +- Calc(select=[i])
: : : : +- TableSourceScan(table=[[t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
: : : +- Calc(select=[i, true AS i0])
: : : +- HashAggregate(isMerge=[true], groupBy=[i], select=[i])
: : : +- Exchange(distribution=[hash[i]])
: : : +- LocalHashAggregate(groupBy=[i], select=[i])
: : : +- Calc(select=[i, true AS i0])
: : : +- Reused(reference_id=[1])
: : +- Exchange(distribution=[broadcast])
: : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
: : +- Exchange(distribution=[single])
: : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(EXPR$0) AS count$1])
: : +- Calc(select=[CAST(j) AS EXPR$0])
: : +- Reused(reference_id=[1])
: +- Calc(select=[EXPR$0, true AS i])
: +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], select=[EXPR$0])
: +- Exchange(distribution=[hash[EXPR$0]])
: +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0])
: +- Calc(select=[CAST(j) AS EXPR$0, true AS i])
: +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[f]])
+- Calc(select=[d, f])
+- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
......