Commit 6a0ada81 authored by Kurt Young, committed by Kurt Young

[FLINK-6149] [table] Add additional flink logical relation nodes and separate current optimization to logical and physical optimize

This closes #3594.
Parent 2ef4900a
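In short, this commit splits the previous single Volcano pass (which targeted the DataSet/DataStream conventions directly) into two passes: a logical phase that converts the plan into the new FlinkLogical* nodes under the LOGICAL convention, and a physical phase that converts those into executable DataSet/DataStream nodes. A minimal sketch of the new batch flow, using the names introduced in the hunks below (`normalize` is a stand-in for the elided normalization steps, and the empty-rule-set guards are omitted, so this is illustrative rather than a verbatim copy):

```scala
import org.apache.calcite.rel.RelNode
import org.apache.flink.table.plan.nodes.FlinkConventions

// Mirrors BatchTableEnvironment#optimize after this commit. runVolcanoPlanner,
// getLogicalOptRuleSet and getPhysicalOptRuleSet are the TableEnvironment
// members shown below; `normalize` is a placeholder for steps 1-2.
def optimize(relNode: RelNode): RelNode = {
  val normalizedPlan = normalize(relNode)
  // step 3: logical optimization, targeting the LOGICAL convention
  val logicalProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
  val logicalPlan = runVolcanoPlanner(getLogicalOptRuleSet, normalizedPlan, logicalProps)
  // step 4: physical optimization, targeting the DATASET convention
  val physicalProps = relNode.getTraitSet.replace(FlinkConventions.DATASET).simplify()
  runVolcanoPlanner(getPhysicalOptRuleSet, logicalPlan, physicalProps)
}
```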
......@@ -32,7 +32,8 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.explain.PlanJsonParser
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetRel}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetRel
import org.apache.flink.table.plan.rules.FlinkRuleSets
import org.apache.flink.table.plan.schema.{DataSetTable, TableSourceTable}
import org.apache.flink.table.sinks.{BatchTableSink, TableSink}
......@@ -206,7 +207,7 @@ abstract class BatchTableEnvironment(
/**
* Returns the built-in physical optimization rules that are defined by the environment.
*/
protected def getBuiltInOptRuleSet: RuleSet = FlinkRuleSets.DATASET_OPT_RULES
protected def getBuiltInPhysicalOptRuleSet: RuleSet = FlinkRuleSets.DATASET_OPT_RULES
/**
* Generates the optimized [[RelNode]] tree from the original relational node tree.
......@@ -228,15 +229,24 @@ abstract class BatchTableEnvironment(
}
// 3. optimize the logical Flink plan
val optRuleSet = getOptRuleSet
val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
val optimizedPlan = if (optRuleSet.iterator().hasNext) {
runVolcanoPlanner(optRuleSet, normalizedPlan, flinkOutputProps)
val logicalOptRuleSet = getLogicalOptRuleSet
val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps)
} else {
normalizedPlan
}
optimizedPlan
// 4. optimize the physical Flink plan
val physicalOptRuleSet = getPhysicalOptRuleSet
val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASET).simplify()
val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) {
runVolcanoPlanner(physicalOptRuleSet, logicalPlan, physicalOutputProps)
} else {
logicalPlan
}
physicalPlan
}
/**
......
......@@ -32,7 +32,8 @@ import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.explain.PlanJsonParser
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
import org.apache.flink.table.plan.rules.FlinkRuleSets
import org.apache.flink.table.plan.schema.{DataStreamTable, TableSourceTable}
import org.apache.flink.table.sinks.{StreamTableSink, TableSink}
......@@ -240,7 +241,7 @@ abstract class StreamTableEnvironment(
/**
* Returns the built-in physical optimization rules that are defined by the environment.
*/
protected def getBuiltInOptRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_OPT_RULES
protected def getBuiltInPhysicalOptRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_OPT_RULES
/**
* Returns the built-in decoration rules that are defined by the environment.
......@@ -267,21 +268,31 @@ abstract class StreamTableEnvironment(
}
// 3. optimize the logical Flink plan
val optRuleSet = getOptRuleSet
val flinkOutputProps = relNode.getTraitSet.replace(DataStreamConvention.INSTANCE).simplify()
val optimizedPlan = if (optRuleSet.iterator().hasNext) {
runVolcanoPlanner(optRuleSet, normalizedPlan, flinkOutputProps)
val logicalOptRuleSet = getLogicalOptRuleSet
val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps)
} else {
normalizedPlan
}
// 4. decorate the optimized plan
// 4. optimize the physical Flink plan
val physicalOptRuleSet = getPhysicalOptRuleSet
val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASTREAM).simplify()
val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) {
runVolcanoPlanner(physicalOptRuleSet, logicalPlan, physicalOutputProps)
} else {
logicalPlan
}
// 5. decorate the optimized plan
val decoRuleSet = getDecoRuleSet
val decoratedPlan = if (decoRuleSet.iterator().hasNext) {
runHepPlanner(HepMatchOrder.BOTTOM_UP, decoRuleSet, optimizedPlan, optimizedPlan.getTraitSet)
runHepPlanner(HepMatchOrder.BOTTOM_UP, decoRuleSet, physicalPlan, physicalPlan.getTraitSet)
} else {
optimizedPlan
physicalPlan
}
decoratedPlan
}
......
......@@ -54,6 +54,7 @@ import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkFor
import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
import org.apache.flink.table.plan.cost.DataSetCostFactory
import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
import org.apache.flink.table.plan.rules.FlinkRuleSets
import org.apache.flink.table.plan.schema.RelTable
import org.apache.flink.table.runtime.MapRunner
import org.apache.flink.table.sinks.TableSink
......@@ -149,21 +150,41 @@ abstract class TableEnvironment(val config: TableConfig) {
}
/**
* Returns the optimization rule set for this environment
* Returns the logical optimization rule set for this environment
* including a custom RuleSet configuration.
*/
protected def getOptRuleSet: RuleSet = {
protected def getLogicalOptRuleSet: RuleSet = {
val calciteConfig = config.getCalciteConfig
calciteConfig.getOptRuleSet match {
calciteConfig.getLogicalOptRuleSet match {
case None =>
getBuiltInOptRuleSet
getBuiltInLogicalOptRuleSet
case Some(ruleSet) =>
if (calciteConfig.replacesOptRuleSet) {
if (calciteConfig.replacesLogicalOptRuleSet) {
ruleSet
} else {
RuleSets.ofList((getBuiltInOptRuleSet.asScala ++ ruleSet.asScala).asJava)
RuleSets.ofList((getBuiltInLogicalOptRuleSet.asScala ++ ruleSet.asScala).asJava)
}
}
}
/**
* Returns the physical optimization rule set for this environment
* including a custom RuleSet configuration.
*/
protected def getPhysicalOptRuleSet: RuleSet = {
val calciteConfig = config.getCalciteConfig
calciteConfig.getPhysicalOptRuleSet match {
case None =>
getBuiltInPhysicalOptRuleSet
case Some(ruleSet) =>
if (calciteConfig.replacesPhysicalOptRuleSet) {
ruleSet
} else {
RuleSets.ofList((getBuiltInPhysicalOptRuleSet.asScala ++ ruleSet.asScala).asJava)
}
}
}
......@@ -194,9 +215,16 @@ abstract class TableEnvironment(val config: TableConfig) {
protected def getBuiltInNormRuleSet: RuleSet
/**
* Returns the built-in optimization rules that are defined by the environment.
* Returns the built-in logical optimization rules that are defined by the environment.
*/
protected def getBuiltInLogicalOptRuleSet: RuleSet = {
FlinkRuleSets.LOGICAL_OPT_RULES
}
/**
* Returns the built-in physical optimization rules that are defined by the environment.
*/
protected def getBuiltInOptRuleSet: RuleSet
protected def getBuiltInPhysicalOptRuleSet: RuleSet
/**
* run HEP planner
......
......@@ -40,10 +40,16 @@ class CalciteConfigBuilder {
private var normRuleSets: List[RuleSet] = Nil
/**
* Defines the optimization rule set. Optimization rules are used during volcano optimization.
* Defines the logical optimization rule set.
*/
private var replaceOptRules: Boolean = false
private var optRuleSets: List[RuleSet] = Nil
private var replaceLogicalOptRules: Boolean = false
private var logicalOptRuleSets: List[RuleSet] = Nil
/**
* Defines the physical optimization rule set.
*/
private var replacePhysicalOptRules: Boolean = false
private var physicalOptRuleSets: List[RuleSet] = Nil
/**
* Defines the decoration rule set. Decoration rules are dedicated for rewriting predicated
......@@ -85,19 +91,38 @@ class CalciteConfigBuilder {
/**
* Replaces the built-in logical optimization rule set with the given rule set.
*/
def replaceOptRuleSet(replaceRuleSet: RuleSet): CalciteConfigBuilder = {
def replaceLogicalOptRuleSet(replaceRuleSet: RuleSet): CalciteConfigBuilder = {
Preconditions.checkNotNull(replaceRuleSet)
optRuleSets = List(replaceRuleSet)
replaceOptRules = true
logicalOptRuleSets = List(replaceRuleSet)
replaceLogicalOptRules = true
this
}
/**
* Appends the given logical optimization rule set to the built-in logical rule set.
*/
def addOptRuleSet(addedRuleSet: RuleSet): CalciteConfigBuilder = {
def addLogicalOptRuleSet(addedRuleSet: RuleSet): CalciteConfigBuilder = {
Preconditions.checkNotNull(addedRuleSet)
optRuleSets = addedRuleSet :: optRuleSets
logicalOptRuleSets = addedRuleSet :: logicalOptRuleSets
this
}
/**
* Replaces the built-in physical optimization rule set with the given rule set.
*/
def replacePhysicalOptRuleSet(replaceRuleSet: RuleSet): CalciteConfigBuilder = {
Preconditions.checkNotNull(replaceRuleSet)
physicalOptRuleSets = List(replaceRuleSet)
replacePhysicalOptRules = true
this
}
/**
* Appends the given physical optimization rule set to the built-in physical rule set.
*/
def addPhysicalOptRuleSet(addedRuleSet: RuleSet): CalciteConfigBuilder = {
Preconditions.checkNotNull(addedRuleSet)
physicalOptRuleSets = addedRuleSet :: physicalOptRuleSets
this
}
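With the builder split in two, custom rules can now target either phase independently. A hedged usage sketch (`MyLogicalRule` and `MyPhysicalRule` are hypothetical `RelOptRule` instances; attaching the config to the environment is shown as a comment because that call sits outside this excerpt):

```scala
import org.apache.calcite.tools.RuleSets
import org.apache.flink.table.calcite.CalciteConfig

// Extend the logical phase and completely replace the physical phase.
val calciteConfig: CalciteConfig = CalciteConfig.createBuilder()
  .addLogicalOptRuleSet(RuleSets.ofList(MyLogicalRule.INSTANCE))
  .replacePhysicalOptRuleSet(RuleSets.ofList(MyPhysicalRule.INSTANCE))
  .build()
// tableEnv.getConfig.setCalciteConfig(calciteConfig)
```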
......@@ -156,15 +181,17 @@ class CalciteConfigBuilder {
}
private class CalciteConfigImpl(
val getNormRuleSet: Option[RuleSet],
val replacesNormRuleSet: Boolean,
val getOptRuleSet: Option[RuleSet],
val replacesOptRuleSet: Boolean,
val getDecoRuleSet: Option[RuleSet],
val replacesDecoRuleSet: Boolean,
val getSqlOperatorTable: Option[SqlOperatorTable],
val replacesSqlOperatorTable: Boolean,
val getSqlParserConfig: Option[SqlParser.Config])
val getNormRuleSet: Option[RuleSet],
val replacesNormRuleSet: Boolean,
val getLogicalOptRuleSet: Option[RuleSet],
val replacesLogicalOptRuleSet: Boolean,
val getPhysicalOptRuleSet: Option[RuleSet],
val replacesPhysicalOptRuleSet: Boolean,
val getDecoRuleSet: Option[RuleSet],
val replacesDecoRuleSet: Boolean,
val getSqlOperatorTable: Option[SqlOperatorTable],
val replacesSqlOperatorTable: Boolean,
val getSqlParserConfig: Option[SqlParser.Config])
extends CalciteConfig
......@@ -189,8 +216,10 @@ class CalciteConfigBuilder {
def build(): CalciteConfig = new CalciteConfigImpl(
getRuleSet(normRuleSets),
replaceNormRules,
getRuleSet(optRuleSets),
replaceOptRules,
getRuleSet(logicalOptRuleSets),
replaceLogicalOptRules,
getRuleSet(physicalOptRuleSets),
replacePhysicalOptRules,
getRuleSet(decoRuleSets),
replaceDecoRules,
operatorTables match {
......@@ -220,14 +249,24 @@ trait CalciteConfig {
def getNormRuleSet: Option[RuleSet]
/**
* Returns whether this configuration replaces the built-in optimization rule set.
* Returns whether this configuration replaces the built-in logical optimization rule set.
*/
def replacesLogicalOptRuleSet: Boolean
/**
* Returns a custom logical optimization rule set.
*/
def getLogicalOptRuleSet: Option[RuleSet]
/**
* Returns whether this configuration replaces the built-in physical optimization rule set.
*/
def replacesOptRuleSet: Boolean
def replacesPhysicalOptRuleSet: Boolean
/**
* Returns a custom optimization rule set.
* Returns a custom physical optimization rule set.
*/
def getOptRuleSet: Option[RuleSet]
def getPhysicalOptRuleSet: Option[RuleSet]
/**
* Returns whether this configuration replaces the built-in decoration rule set.
......@@ -257,7 +296,7 @@ trait CalciteConfig {
object CalciteConfig {
val DEFAULT = createBuilder().build()
val DEFAULT: CalciteConfig = createBuilder().build()
/**
* Creates a new builder for constructing a [[CalciteConfig]].
......
......@@ -18,13 +18,12 @@
package org.apache.flink.table.calcite
import java.util.Collections
import org.apache.calcite.plan.volcano.VolcanoPlanner
import java.lang.Iterable
import java.util.Collections
import org.apache.calcite.jdbc.CalciteSchema
import org.apache.calcite.plan._
import org.apache.calcite.plan.volcano.VolcanoPlanner
import org.apache.calcite.prepare.CalciteCatalogReader
import org.apache.calcite.rel.logical.LogicalAggregate
import org.apache.calcite.rex.RexBuilder
......
......@@ -18,18 +18,20 @@
package org.apache.flink.table.plan.cost
import java.lang.Double
import org.apache.calcite.rel.core.Calc
import org.apache.calcite.rel.metadata.{ReflectiveRelMetadataProvider, RelMdRowCount, RelMetadataProvider, RelMetadataQuery}
import org.apache.calcite.util.BuiltInMethod
import org.apache.flink.table.plan.nodes.dataset.{DataSetCalc, DataSetSort}
import java.lang.Double
import org.apache.flink.table.plan.nodes.dataset.DataSetSort
object FlinkRelMdRowCount extends RelMdRowCount {
val SOURCE: RelMetadataProvider = ReflectiveRelMetadataProvider.reflectiveSource(
BuiltInMethod.ROW_COUNT.method,
this)
val SOURCE: RelMetadataProvider = ReflectiveRelMetadataProvider.reflectiveSource(
BuiltInMethod.ROW_COUNT.method,
this)
def getRowCount(rel: DataSetCalc, mq: RelMetadataQuery): Double = rel.estimateRowCount(mq)
override def getRowCount(rel: Calc, mq: RelMetadataQuery): Double = rel.estimateRowCount(mq)
def getRowCount(rel: DataSetSort, mq: RelMetadataQuery): Double = rel.estimateRowCount(mq)
def getRowCount(rel: DataSetSort, mq: RelMetadataQuery): Double = rel.estimateRowCount(mq)
}
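For context (not part of this diff): a reflective source such as `SOURCE` above only takes effect once it is chained ahead of Calcite's default metadata provider on the planner's cluster. A hedged sketch of that wiring:

```scala
import com.google.common.collect.ImmutableList
import org.apache.calcite.rel.metadata.{ChainedRelMetadataProvider, DefaultRelMetadataProvider, RelMetadataProvider}
import org.apache.flink.table.plan.cost.FlinkRelMdRowCount

// Custom providers are consulted first, Calcite's defaults act as fallback.
val provider: RelMetadataProvider = ChainedRelMetadataProvider.of(
  ImmutableList.of(FlinkRelMdRowCount.SOURCE, DefaultRelMetadataProvider.INSTANCE))
// cluster.setMetadataProvider(provider)  // on the RelOptCluster used by the planner
```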
......@@ -25,8 +25,8 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
import org.apache.calcite.rel.{RelNode, RelShuttle}
import org.apache.calcite.util.ImmutableBitSet
import org.apache.flink.table.calcite.{FlinkTypeFactory, FlinkRelBuilder}
import FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.logical.LogicalWindow
class LogicalWindowAggregate(
......@@ -39,18 +39,11 @@ class LogicalWindowAggregate(
groupSet: ImmutableBitSet,
groupSets: util.List[ImmutableBitSet],
aggCalls: util.List[AggregateCall])
extends Aggregate(
cluster,
traitSet,
child,
indicator,
groupSet,
groupSets,
aggCalls) {
extends Aggregate(cluster, traitSet, child, indicator, groupSet, groupSets, aggCalls) {
def getWindow = window
def getWindow: LogicalWindow = window
def getNamedProperties = namedProperties
def getNamedProperties: Seq[NamedWindowProperty] = namedProperties
override def copy(
traitSet: RelTraitSet,
......
......@@ -16,32 +16,18 @@
* limitations under the License.
*/
package org.apache.flink.table.plan.nodes.dataset
package org.apache.flink.table.plan.nodes
import org.apache.calcite.plan._
import org.apache.calcite.plan.Convention
import org.apache.flink.table.plan.nodes.dataset.DataSetRel
import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalRel
class DataSetConvention extends Convention {
object FlinkConventions {
override def toString: String = getName
val LOGICAL: Convention = new Convention.Impl("LOGICAL", classOf[FlinkLogicalRel])
override def useAbstractConvertersForConversion(
fromTraits: RelTraitSet,
toTraits: RelTraitSet): Boolean = false
val DATASET: Convention = new Convention.Impl("DATASET", classOf[DataSetRel])
override def canConvertConvention(toConvention: Convention): Boolean = false
def getInterface: Class[_] = classOf[DataSetRel]
def getName: String = "DATASET"
def getTraitDef: RelTraitDef[_ <: RelTrait] = ConventionTraitDef.INSTANCE
def satisfies(`trait`: RelTrait): Boolean = this eq `trait`
def register(planner: RelOptPlanner): Unit = { }
}
object DataSetConvention {
val INSTANCE = new DataSetConvention
val DATASTREAM: Convention = new Convention.Impl("DATASTREAM", classOf[DataStreamRel])
}
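All converter rules later in this diff translate from `Convention.NONE` into `LOGICAL`; the corresponding physical rules (kept in `FlinkRuleSets`, outside this excerpt) follow the same shape from `LOGICAL` into `DATASET` or `DATASTREAM`. A hedged sketch of that shape, with the physical node construction left abstract because those constructors are not shown here:

```scala
import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalCalc

// Hypothetical physical converter, named *Sketch to avoid implying it is the
// real rule shipped in FlinkRuleSets.
class DataSetCalcConverterSketch
  extends ConverterRule(
    classOf[FlinkLogicalCalc],
    FlinkConventions.LOGICAL,    // consume Flink logical nodes
    FlinkConventions.DATASET,    // produce DataSet physical nodes
    "DataSetCalcConverterSketch") {

  override def convert(rel: RelNode): RelNode = {
    val calc = rel.asInstanceOf[FlinkLogicalCalc]
    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASET)
    val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.DATASET)
    // A real rule would build the physical node here, e.g. a DataSetCalc over
    // newInput with traitSet and calc.getProgram; elided in this sketch.
    ???
  }
}
```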
......@@ -18,6 +18,7 @@
package org.apache.flink.table.plan.nodes
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex._
import org.apache.calcite.sql.SqlAsOperator
......@@ -26,12 +27,12 @@ import org.apache.flink.table.api.TableException
import scala.collection.JavaConversions._
trait FlinkRel {
trait FlinkRelNode extends RelNode {
private[flink] def getExpressionString(
expr: RexNode,
inFields: List[String],
localExprsTable: Option[List[RexNode]]): String = {
expr: RexNode,
inFields: List[String],
localExprsTable: Option[List[RexNode]]): String = {
expr match {
case i: RexInputRef =>
......@@ -42,7 +43,7 @@ trait FlinkRel {
case l: RexLocalRef if localExprsTable.isEmpty =>
throw new IllegalArgumentException("Encountered RexLocalRef without " +
"local expression table")
"local expression table")
case l: RexLocalRef =>
val lExpr = localExprsTable.get(l.getIndex)
......
......@@ -28,7 +28,7 @@ import org.apache.flink.table.sources.TableSource
import scala.collection.JavaConverters._
abstract class TableSourceScan(
abstract class PhysicalTableSourceScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
table: RelOptTable,
......@@ -66,6 +66,6 @@ abstract class TableSourceScan(
}
}
def copy(traitSet: RelTraitSet, tableSource: TableSource[_]): TableSourceScan
def copy(traitSet: RelTraitSet, tableSource: TableSource[_]): PhysicalTableSourceScan
}
......@@ -23,7 +23,7 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.table.plan.nodes.TableSourceScan
import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
import org.apache.flink.table.plan.schema.TableSourceTable
import org.apache.flink.table.sources.{BatchTableSource, TableSource}
import org.apache.flink.types.Row
......@@ -34,7 +34,7 @@ class BatchTableSourceScan(
traitSet: RelTraitSet,
table: RelOptTable,
tableSource: BatchTableSource[_])
extends TableSourceScan(cluster, traitSet, table, tableSource)
extends PhysicalTableSourceScan(cluster, traitSet, table, tableSource)
with BatchScan {
override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
......@@ -51,7 +51,11 @@ class BatchTableSourceScan(
)
}
override def copy(traitSet: RelTraitSet, newTableSource: TableSource[_]): TableSourceScan = {
override def copy(
traitSet: RelTraitSet,
newTableSource: TableSource[_])
: PhysicalTableSourceScan = {
new BatchTableSourceScan(
cluster,
traitSet,
......
......@@ -19,7 +19,6 @@ package org.apache.flink.table.plan.nodes.dataset
import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.logical.LogicalTableFunctionScan
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.calcite.rex.{RexCall, RexNode}
......@@ -29,6 +28,7 @@ import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.table.functions.utils.TableSqlFunction
import org.apache.flink.table.plan.nodes.CommonCorrelate
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
import org.apache.flink.types.Row
/**
......@@ -38,7 +38,7 @@ class DataSetCorrelate(
cluster: RelOptCluster,
traitSet: RelTraitSet,
inputNode: RelNode,
scan: LogicalTableFunctionScan,
scan: FlinkLogicalTableFunctionScan,
condition: Option[RexNode],
relRowType: RelDataType,
joinRowType: RelDataType,
......@@ -92,7 +92,7 @@ class DataSetCorrelate(
// we do not need to specify input type
val inputDS = inputNode.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
val funcRel = scan.asInstanceOf[LogicalTableFunctionScan]
val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan]
val rexCall = funcRel.getCall.asInstanceOf[RexCall]
val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
val pojoFieldMapping = sqlFunction.getPojoFieldMapping
......
......@@ -18,13 +18,12 @@
package org.apache.flink.table.plan.nodes.dataset
import org.apache.calcite.rel.RelNode
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.table.plan.nodes.FlinkRel
import org.apache.flink.table.plan.nodes.FlinkRelNode
import org.apache.flink.types.Row
trait DataSetRel extends RelNode with FlinkRel {
trait DataSetRel extends FlinkRelNode {
/**
* Translates the [[DataSetRel]] node into a [[DataSet]] operator.
......@@ -32,6 +31,6 @@ trait DataSetRel extends RelNode with FlinkRel {
* @param tableEnv The [[BatchTableEnvironment]] of the translated Table.
* @return DataSet of type [[Row]]
*/
def translateToPlan(tableEnv: BatchTableEnvironment) : DataSet[Row]
def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row]
}
......@@ -19,7 +19,6 @@ package org.apache.flink.table.plan.nodes.datastream
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.logical.LogicalTableFunctionScan
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.calcite.rex.{RexCall, RexNode}
import org.apache.calcite.sql.SemiJoinType
......@@ -28,6 +27,7 @@ import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamTableEnvironment
import org.apache.flink.table.functions.utils.TableSqlFunction
import org.apache.flink.table.plan.nodes.CommonCorrelate
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
import org.apache.flink.types.Row
/**
......@@ -37,7 +37,7 @@ class DataStreamCorrelate(
cluster: RelOptCluster,
traitSet: RelTraitSet,
inputNode: RelNode,
scan: LogicalTableFunctionScan,
scan: FlinkLogicalTableFunctionScan,
condition: Option[RexNode],
relRowType: RelDataType,
joinRowType: RelDataType,
......@@ -86,7 +86,7 @@ class DataStreamCorrelate(
// we do not need to specify input type
val inputDS = inputNode.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
val funcRel = scan.asInstanceOf[LogicalTableFunctionScan]
val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan]
val rexCall = funcRel.getCall.asInstanceOf[RexCall]
val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
val pojoFieldMapping = sqlFunction.getPojoFieldMapping
......
......@@ -18,13 +18,12 @@
package org.apache.flink.table.plan.nodes.datastream
import org.apache.calcite.rel.RelNode
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamTableEnvironment
import org.apache.flink.table.plan.nodes.FlinkRel
import org.apache.flink.table.plan.nodes.FlinkRelNode
import org.apache.flink.types.Row
trait DataStreamRel extends RelNode with FlinkRel {
trait DataStreamRel extends FlinkRelNode {
/**
* Translates the FlinkRelNode into a Flink operator.
......
......@@ -53,11 +53,11 @@ class DataStreamUnion(
}
override def explainTerms(pw: RelWriter): RelWriter = {
super.explainTerms(pw).item("union", unionSelectionToString)
super.explainTerms(pw).item("union all", unionSelectionToString)
}
override def toString = {
s"Union(union: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
s"Union All(union: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
}
override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
......
......@@ -23,7 +23,7 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamTableEnvironment
import org.apache.flink.table.plan.nodes.TableSourceScan
import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
import org.apache.flink.table.plan.schema.TableSourceTable
import org.apache.flink.table.sources.{StreamTableSource, TableSource}
import org.apache.flink.types.Row
......@@ -34,7 +34,7 @@ class StreamTableSourceScan(
traitSet: RelTraitSet,
table: RelOptTable,
tableSource: StreamTableSource[_])
extends TableSourceScan(cluster, traitSet, table, tableSource)
extends PhysicalTableSourceScan(cluster, traitSet, table, tableSource)
with StreamScan {
override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
......@@ -51,7 +51,11 @@ class StreamTableSourceScan(
)
}
override def copy(traitSet: RelTraitSet, newTableSource: TableSource[_]): TableSourceScan = {
override def copy(
traitSet: RelTraitSet,
newTableSource: TableSource[_])
: PhysicalTableSourceScan = {
new StreamTableSourceScan(
cluster,
traitSet,
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.nodes.logical
import java.util.{List => JList}
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
import org.apache.calcite.rel.logical.LogicalAggregate
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.util.ImmutableBitSet
import org.apache.flink.table.plan.nodes.FlinkConventions
class FlinkLogicalAggregate(
cluster: RelOptCluster,
traitSet: RelTraitSet,
child: RelNode,
indicator: Boolean,
groupSet: ImmutableBitSet,
groupSets: JList[ImmutableBitSet],
aggCalls: JList[AggregateCall])
extends Aggregate(cluster, traitSet, child, indicator, groupSet, groupSets, aggCalls)
with FlinkLogicalRel {
override def copy(
traitSet: RelTraitSet,
input: RelNode,
indicator: Boolean,
groupSet: ImmutableBitSet,
groupSets: JList[ImmutableBitSet],
aggCalls: JList[AggregateCall]): Aggregate = {
new FlinkLogicalAggregate(cluster, traitSet, input, indicator, groupSet, groupSets, aggCalls)
}
override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
val child = this.getInput
val rowCnt = metadata.getRowCount(child)
val rowSize = this.estimateRowSize(child.getRowType)
val aggCnt = this.aggCalls.size
planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
}
}
private class FlinkLogicalAggregateConverter
extends ConverterRule(
classOf[LogicalAggregate],
Convention.NONE,
FlinkConventions.LOGICAL,
"FlinkLogicalAggregateConverter") {
override def matches(call: RelOptRuleCall): Boolean = {
val agg = call.rel(0).asInstanceOf[LogicalAggregate]
!agg.containsDistinctCall()
}
override def convert(rel: RelNode): RelNode = {
val agg = rel.asInstanceOf[LogicalAggregate]
val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
val newInput = RelOptRule.convert(agg.getInput, FlinkConventions.LOGICAL)
new FlinkLogicalAggregate(
rel.getCluster,
traitSet,
newInput,
agg.indicator,
agg.getGroupSet,
agg.getGroupSets,
agg.getAggCallList)
}
}
object FlinkLogicalAggregate {
val CONVERTER: ConverterRule = new FlinkLogicalAggregateConverter()
}
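Note that `matches` above deliberately excludes aggregates that contain a distinct call (e.g. `COUNT(DISTINCT x)`), so such aggregates are not converted into `FlinkLogicalAggregate` by this rule.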
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.nodes.logical
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.Calc
import org.apache.calcite.rel.logical.LogicalCalc
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rex.RexProgram
import org.apache.flink.table.plan.nodes.{CommonCalc, FlinkConventions}
class FlinkLogicalCalc(
cluster: RelOptCluster,
traitSet: RelTraitSet,
input: RelNode,
calcProgram: RexProgram)
extends Calc(cluster, traitSet, input, calcProgram)
with FlinkLogicalRel
with CommonCalc {
override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = {
new FlinkLogicalCalc(cluster, traitSet, child, program)
}
override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
val child = this.getInput
val rowCnt = mq.getRowCount(child)
computeSelfCost(calcProgram, planner, rowCnt)
}
override def estimateRowCount(metadata: RelMetadataQuery): Double = {
val child = this.getInput
val rowCnt = metadata.getRowCount(child)
estimateRowCount(calcProgram, rowCnt)
}
}
private class FlinkLogicalCalcConverter
extends ConverterRule(
classOf[LogicalCalc],
Convention.NONE,
FlinkConventions.LOGICAL,
"FlinkLogicalCalcConverter") {
override def convert(rel: RelNode): RelNode = {
val calc = rel.asInstanceOf[LogicalCalc]
val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.LOGICAL)
new FlinkLogicalCalc(rel.getCluster, traitSet, newInput, calc.getProgram)
}
}
object FlinkLogicalCalc {
val CONVERTER: ConverterRule = new FlinkLogicalCalcConverter()
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.nodes.logical
import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.{Correlate, CorrelationId}
import org.apache.calcite.rel.logical.LogicalCorrelate
import org.apache.calcite.sql.SemiJoinType
import org.apache.calcite.util.ImmutableBitSet
import org.apache.flink.table.plan.nodes.FlinkConventions
class FlinkLogicalCorrelate(
cluster: RelOptCluster,
traitSet: RelTraitSet,
left: RelNode,
right: RelNode,
correlationId: CorrelationId,
requiredColumns: ImmutableBitSet,
joinType: SemiJoinType)
extends Correlate(cluster, traitSet, left, right, correlationId, requiredColumns, joinType)
with FlinkLogicalRel {
override def copy(
traitSet: RelTraitSet,
left: RelNode,
right: RelNode,
correlationId: CorrelationId,
requiredColumns: ImmutableBitSet,
joinType: SemiJoinType): Correlate = {
new FlinkLogicalCorrelate(
cluster,
traitSet,
left,
right,
correlationId,
requiredColumns,
joinType)
}
}
class FlinkLogicalCorrelateConverter
extends ConverterRule(
classOf[LogicalCorrelate],
Convention.NONE,
FlinkConventions.LOGICAL,
"FlinkLogicalCorrelateConverter") {
override def convert(rel: RelNode): RelNode = {
val correlate = rel.asInstanceOf[LogicalCorrelate]
val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
val newLeft = RelOptRule.convert(correlate.getLeft, FlinkConventions.LOGICAL)
val newRight = RelOptRule.convert(correlate.getRight, FlinkConventions.LOGICAL)
new FlinkLogicalCorrelate(
rel.getCluster,
traitSet,
newLeft,
newRight,
correlate.getCorrelationId,
correlate.getRequiredColumns,
correlate.getJoinType)
}
}
object FlinkLogicalCorrelate {
val CONVERTER: ConverterRule = new FlinkLogicalCorrelateConverter()
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.nodes.logical
import java.util.{List => JList}
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.{Intersect, SetOp}
import org.apache.calcite.rel.logical.LogicalIntersect
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.flink.table.plan.nodes.FlinkConventions
import scala.collection.JavaConverters._
class FlinkLogicalIntersect(
cluster: RelOptCluster,
traitSet: RelTraitSet,
inputs: JList[RelNode],
all: Boolean)
extends Intersect(cluster, traitSet, inputs, all)
with FlinkLogicalRel {
override def copy(traitSet: RelTraitSet, inputs: JList[RelNode], all: Boolean): SetOp = {
new FlinkLogicalIntersect(cluster, traitSet, inputs, all)
}
override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
val children = this.getInputs.asScala
children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, child) =>
val rowCnt = metadata.getRowCount(child)
val rowSize = this.estimateRowSize(child.getRowType)
cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
}
}
}
private class FlinkLogicalIntersectConverter
extends ConverterRule(
classOf[LogicalIntersect],
Convention.NONE,
FlinkConventions.LOGICAL,
"FlinkLogicalIntersectConverter") {
override def convert(rel: RelNode): RelNode = {
val intersect = rel.asInstanceOf[LogicalIntersect]
val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
val newInputs = intersect.getInputs.asScala
.map(input => RelOptRule.convert(input, FlinkConventions.LOGICAL)).asJava
new FlinkLogicalIntersect(rel.getCluster, traitSet, newInputs, intersect.all)
}
}
object FlinkLogicalIntersect {
val CONVERTER: ConverterRule = new FlinkLogicalIntersectConverter()
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.nodes.logical
import org.apache.calcite.plan._
import org.apache.calcite.plan.volcano.RelSubset
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core._
import org.apache.calcite.rel.logical.LogicalJoin
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rex.RexNode
import org.apache.flink.table.plan.nodes.FlinkConventions
import scala.collection.JavaConverters._
class FlinkLogicalJoin(
cluster: RelOptCluster,
traitSet: RelTraitSet,
left: RelNode,
right: RelNode,
condition: RexNode,
joinType: JoinRelType)
extends Join(cluster, traitSet, left, right, condition, Set.empty[CorrelationId].asJava, joinType)
with FlinkLogicalRel {
override def copy(
traitSet: RelTraitSet,
conditionExpr: RexNode,
left: RelNode,
right: RelNode,
joinType: JoinRelType,
semiJoinDone: Boolean): Join = {
new FlinkLogicalJoin(cluster, traitSet, left, right, conditionExpr, joinType)
}
override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
val leftRowCnt = metadata.getRowCount(getLeft)
val leftRowSize = estimateRowSize(getLeft.getRowType)
val rightRowCnt = metadata.getRowCount(getRight)
val rightRowSize = estimateRowSize(getRight.getRowType)
val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
val cpuCost = leftRowCnt + rightRowCnt
val rowCnt = leftRowCnt + rightRowCnt
planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
}
}
private class FlinkLogicalJoinConverter
extends ConverterRule(
classOf[LogicalJoin],
Convention.NONE,
FlinkConventions.LOGICAL,
"FlinkLogicalJoinConverter") {
override def matches(call: RelOptRuleCall): Boolean = {
val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
val joinInfo = join.analyzeCondition
hasEqualityPredicates(join, joinInfo) || isSingleRowInnerJoin(join)
}
override def convert(rel: RelNode): RelNode = {
val join = rel.asInstanceOf[LogicalJoin]
val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
val newLeft = RelOptRule.convert(join.getLeft, FlinkConventions.LOGICAL)
val newRight = RelOptRule.convert(join.getRight, FlinkConventions.LOGICAL)
new FlinkLogicalJoin(
rel.getCluster,
traitSet,
newLeft,
newRight,
join.getCondition,
join.getJoinType)
}
private def hasEqualityPredicates(join: LogicalJoin, joinInfo: JoinInfo): Boolean = {
// joins require an equi-condition or a conjunctive predicate with at least one equi-condition;
// outer joins with non-equality predicates are disabled (see FLINK-5520)
!joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)
}
private def isSingleRowInnerJoin(join: LogicalJoin): Boolean = {
if (join.getJoinType == JoinRelType.INNER) {
isSingleRow(join.getRight) || isSingleRow(join.getLeft)
} else {
false
}
}
/**
* Recursively checks if a [[RelNode]] returns at most a single row.
* Input must be a global aggregation possibly followed by projections or filters.
*/
private def isSingleRow(node: RelNode): Boolean = {
node match {
case ss: RelSubset => isSingleRow(ss.getOriginal)
case lp: Project => isSingleRow(lp.getInput)
case lf: Filter => isSingleRow(lf.getInput)
case lc: Calc => isSingleRow(lc.getInput)
case la: Aggregate => la.getGroupSet.isEmpty
case _ => false
}
}
}
object FlinkLogicalJoin {
val CONVERTER: ConverterRule = new FlinkLogicalJoinConverter()
}
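For concreteness, a hedged reading of `matches` above: `a JOIN b ON a.id = b.id` is converted via `hasEqualityPredicates`; `a JOIN (SELECT MAX(x) AS m FROM b) ON a.x < m` is converted via `isSingleRowInnerJoin`, because the right input is a global aggregate and therefore returns at most one row; `a LEFT JOIN b ON a.x < b.x` is rejected, which is the FLINK-5520 restriction on outer joins with non-equality predicates.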
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.nodes.logical
import java.util.{List => JList}
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.{Minus, SetOp}
import org.apache.calcite.rel.logical.LogicalMinus
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.flink.table.plan.nodes.FlinkConventions
import scala.collection.JavaConverters._
class FlinkLogicalMinus(
cluster: RelOptCluster,
traitSet: RelTraitSet,
inputs: JList[RelNode],
all: Boolean)
extends Minus(cluster, traitSet, inputs, all)
with FlinkLogicalRel {
override def copy(traitSet: RelTraitSet, inputs: JList[RelNode], all: Boolean): SetOp = {
new FlinkLogicalMinus(cluster, traitSet, inputs, all)
}
override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
val children = this.getInputs.asScala
children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, child) =>
val rowCnt = metadata.getRowCount(child)
val rowSize = this.estimateRowSize(child.getRowType)
cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
}
}
}
private class FlinkLogicalMinusConverter
extends ConverterRule(
classOf[LogicalMinus],
Convention.NONE,
FlinkConventions.LOGICAL,
"FlinkLogicalMinusConverter") {
override def convert(rel: RelNode): RelNode = {
val minus = rel.asInstanceOf[LogicalMinus]
val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
val newInputs = minus.getInputs.asScala
.map(input => RelOptRule.convert(input, FlinkConventions.LOGICAL)).asJava
new FlinkLogicalMinus(rel.getCluster, traitSet, newInputs, minus.all)
}
}
object FlinkLogicalMinus {
val CONVERTER: ConverterRule = new FlinkLogicalMinusConverter()
}
......@@ -16,40 +16,61 @@
* limitations under the License.
*/
package org.apache.flink.table.plan.rules.datastream
package org.apache.flink.table.plan.nodes.logical
import org.apache.calcite.plan.RelOptRule._
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan}
import org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase
import org.apache.flink.table.plan.schema.TableSourceTable
import org.apache.flink.table.sources.FilterableTableSource
import java.util
class PushFilterIntoStreamTableSourceScanRule extends RelOptRule(
operand(classOf[DataStreamCalc],
operand(classOf[StreamTableSourceScan], none)),
"PushFilterIntoStreamTableSourceScanRule")
with PushFilterIntoTableSourceScanRuleBase {
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.TableScan
import org.apache.calcite.rel.logical.LogicalTableScan
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.schema.{DataSetTable, DataStreamTable}
class FlinkLogicalNativeTableScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
table: RelOptTable)
extends TableScan(cluster, traitSet, table)
with FlinkLogicalRel {
override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
new FlinkLogicalNativeTableScan(cluster, traitSet, getTable)
}
override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
val rowCnt = metadata.getRowCount(this)
planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
}
}
class FlinkLogicalNativeTableScanConverter
extends ConverterRule(
classOf[LogicalTableScan],
Convention.NONE,
FlinkConventions.LOGICAL,
"FlinkLogicalNativeTableScanConverter") {
override def matches(call: RelOptRuleCall): Boolean = {
val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
scan.tableSource match {
case source: FilterableTableSource[_] =>
calc.getProgram.getCondition != null && !source.isFilterPushedDown
case _ => false
}
val scan = call.rel[TableScan](0)
val dataSetTable = scan.getTable.unwrap(classOf[DataSetTable[_]])
val dataStreamTable = scan.getTable.unwrap(classOf[DataStreamTable[_]])
dataSetTable != null || dataStreamTable != null
}
override def onMatch(call: RelOptRuleCall): Unit = {
val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource[_]]
pushFilterIntoScan(call, calc, scan, tableSourceTable, filterableSource, description)
def convert(rel: RelNode): RelNode = {
val scan = rel.asInstanceOf[TableScan]
val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
new FlinkLogicalNativeTableScan(
rel.getCluster,
traitSet,
scan.getTable
)
}
}
object PushFilterIntoStreamTableSourceScanRule {
val INSTANCE: RelOptRule = new PushFilterIntoStreamTableSourceScanRule
object FlinkLogicalNativeTableScan {
val CONVERTER = new FlinkLogicalNativeTableScanConverter
}
......@@ -16,40 +16,62 @@
* limitations under the License.
*/
package org.apache.flink.table.plan.rules.dataSet
import org.apache.calcite.plan.RelOptRule._
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
import org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase
import org.apache.flink.table.plan.schema.TableSourceTable
import org.apache.flink.table.sources.FilterableTableSource
class PushFilterIntoBatchTableSourceScanRule extends RelOptRule(
operand(classOf[DataSetCalc],
operand(classOf[BatchTableSourceScan], none)),
"PushFilterIntoBatchTableSourceScanRule")
with PushFilterIntoTableSourceScanRuleBase {
override def matches(call: RelOptRuleCall): Boolean = {
val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
scan.tableSource match {
case source: FilterableTableSource[_] =>
calc.getProgram.getCondition != null && !source.isFilterPushedDown
case _ => false
}
package org.apache.flink.table.plan.nodes.logical
import java.util.{List => JList}
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.Window
import org.apache.calcite.rel.logical.LogicalWindow
import org.apache.calcite.rex.RexLiteral
import org.apache.flink.table.plan.nodes.FlinkConventions
class FlinkLogicalOverWindow(
cluster: RelOptCluster,
traitSet: RelTraitSet,
input: RelNode,
windowConstants: JList[RexLiteral],
rowType: RelDataType,
windowGroups: JList[Window.Group])
extends Window(cluster, traitSet, input, windowConstants, rowType, windowGroups)
with FlinkLogicalRel {
override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
new FlinkLogicalOverWindow(
cluster,
traitSet,
inputs.get(0),
windowConstants,
rowType,
windowGroups)
}
}
class FlinkLogicalOverWindowConverter
extends ConverterRule(
classOf[LogicalWindow],
Convention.NONE,
FlinkConventions.LOGICAL,
"FlinkLogicalOverWindowConverter") {
override def convert(rel: RelNode): RelNode = {
val window = rel.asInstanceOf[LogicalWindow]
val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
val newInput = RelOptRule.convert(window.getInput, FlinkConventions.LOGICAL)
override def onMatch(call: RelOptRuleCall): Unit = {
val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource[_]]
pushFilterIntoScan(call, calc, scan, tableSourceTable, filterableSource, description)
new FlinkLogicalOverWindow(
rel.getCluster,
traitSet,
newInput,
window.constants,
window.getRowType,
window.groups)
}
}
object PushFilterIntoBatchTableSourceScanRule {
val INSTANCE: RelOptRule = new PushFilterIntoBatchTableSourceScanRule
object FlinkLogicalOverWindow {
val CONVERTER = new FlinkLogicalOverWindowConverter
}
......@@ -16,32 +16,9 @@
* limitations under the License.
*/
package org.apache.flink.table.plan.nodes.datastream
package org.apache.flink.table.plan.nodes.logical
import org.apache.calcite.plan._
import org.apache.flink.table.plan.nodes.FlinkRelNode
class DataStreamConvention extends Convention {
override def toString: String = getName
override def useAbstractConvertersForConversion(
fromTraits: RelTraitSet,
toTraits: RelTraitSet): Boolean = false
override def canConvertConvention(toConvention: Convention): Boolean = false
def getInterface: Class[_] = classOf[DataStreamRel]
def getName: String = "DATASTREAM"
def getTraitDef: RelTraitDef[_ <: RelTrait] = ConventionTraitDef.INSTANCE
def satisfies(`trait`: RelTrait): Boolean = this eq `trait`
def register(planner: RelOptPlanner): Unit = { }
}
object DataStreamConvention {
val INSTANCE = new DataStreamConvention
trait FlinkLogicalRel extends FlinkRelNode {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.nodes.logical
import org.apache.calcite.plan._
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.Sort
import org.apache.calcite.rel.logical.LogicalSort
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelCollation, RelNode}
import org.apache.calcite.rex.{RexLiteral, RexNode}
import org.apache.flink.table.plan.nodes.FlinkConventions
class FlinkLogicalSort(
cluster: RelOptCluster,
traits: RelTraitSet,
child: RelNode,
collation: RelCollation,
sortOffset: RexNode,
sortFetch: RexNode)
extends Sort(cluster, traits, child, collation, sortOffset, sortFetch)
with FlinkLogicalRel {
private val limitStart: Long = if (offset != null) {
RexLiteral.intValue(offset)
} else {
0L
}
override def copy(
traitSet: RelTraitSet,
newInput: RelNode,
newCollation: RelCollation,
offset: RexNode,
fetch: RexNode): Sort = {
new FlinkLogicalSort(cluster, traitSet, newInput, newCollation, offset, fetch)
}
override def estimateRowCount(metadata: RelMetadataQuery): Double = {
val inputRowCnt = metadata.getRowCount(this.getInput)
if (inputRowCnt == null) {
inputRowCnt
} else {
val rowCount = (inputRowCnt - limitStart).max(1.0)
if (fetch != null) {
val limit = RexLiteral.intValue(fetch)
rowCount.min(limit)
} else {
rowCount
}
}
}
override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
// by default, assume cost is proportional to number of rows
val rowCount: Double = mq.getRowCount(this)
planner.getCostFactory.makeCost(rowCount, rowCount, 0)
}
}
class FlinkLogicalSortConverter
extends ConverterRule(
classOf[LogicalSort],
Convention.NONE,
FlinkConventions.LOGICAL,
"FlinkLogicalSortConverter") {
override def convert(rel: RelNode): RelNode = {
val sort = rel.asInstanceOf[LogicalSort]
val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
val newInput = RelOptRule.convert(sort.getInput, FlinkConventions.LOGICAL)
new FlinkLogicalSort(rel.getCluster,
traitSet,
newInput,
sort.getCollation,
sort.offset,
sort.fetch)
}
}
object FlinkLogicalSort {
val CONVERTER: RelOptRule = new FlinkLogicalSortConverter
}
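A hedged worked example of `estimateRowCount` above: with an input estimate of 100 rows and a sort carrying `OFFSET 10` and `FETCH 5`, `limitStart` is 10, so the estimate is `min(max(100 - 10, 1.0), 5) = 5` rows; without a fetch it would be 90, and a null input estimate is passed through unchanged.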
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.nodes.logical
import java.lang.reflect.Type
import java.util.{List => JList, Set => JSet}
import org.apache.calcite.plan.{Convention, RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.{TableFunctionScan, TableScan}
import org.apache.calcite.rel.logical.{LogicalTableFunctionScan, LogicalTableScan}
import org.apache.calcite.rel.metadata.RelColumnMapping
import org.apache.calcite.rex.RexNode
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.schema.TableSourceTable
class FlinkLogicalTableFunctionScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
inputs: JList[RelNode],
rexCall: RexNode,
elementType: Type,
rowType: RelDataType,
columnMappings: JSet[RelColumnMapping])
extends TableFunctionScan(
cluster,
traitSet,
inputs,
rexCall,
elementType,
rowType,
columnMappings)
with FlinkLogicalRel {
override def copy(traitSet: RelTraitSet,
inputs: JList[RelNode],
rexCall: RexNode,
elementType: Type,
rowType: RelDataType,
columnMappings: JSet[RelColumnMapping]): TableFunctionScan = {
new FlinkLogicalTableFunctionScan(
cluster,
traitSet,
inputs,
rexCall,
elementType,
rowType,
columnMappings)
}
}
class FlinkLogicalTableFunctionScanConverter
extends ConverterRule(
classOf[LogicalTableFunctionScan],
Convention.NONE,
FlinkConventions.LOGICAL,
"FlinkLogicalTableFunctionScanConverter") {
def convert(rel: RelNode): RelNode = {
val scan = rel.asInstanceOf[LogicalTableFunctionScan]
val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
new FlinkLogicalTableFunctionScan(
rel.getCluster,
traitSet,
scan.getInputs,
scan.getCall,
scan.getElementType,
scan.getRowType,
scan.getColumnMappings
)
}
}
object FlinkLogicalTableFunctionScan {
val CONVERTER = new FlinkLogicalTableFunctionScanConverter
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.nodes.logical
import org.apache.calcite.plan._
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.TableScan
import org.apache.calcite.rel.logical.LogicalTableScan
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.schema.TableSourceTable
import org.apache.flink.table.sources.TableSource
import scala.collection.JavaConverters._
class FlinkLogicalTableSourceScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
table: RelOptTable,
val tableSource: TableSource[_])
extends TableScan(cluster, traitSet, table)
with FlinkLogicalRel {
def copy(traitSet: RelTraitSet, tableSource: TableSource[_]): FlinkLogicalTableSourceScan = {
new FlinkLogicalTableSourceScan(cluster, traitSet, getTable, tableSource)
}
override def deriveRowType(): RelDataType = {
val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
flinkTypeFactory.buildRowDataType(
TableEnvironment.getFieldNames(tableSource),
TableEnvironment.getFieldTypes(tableSource.getReturnType))
}
override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
val rowCnt = metadata.getRowCount(this)
planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
}
override def explainTerms(pw: RelWriter): RelWriter = {
val terms = super.explainTerms(pw)
.item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
val sourceDesc = tableSource.explainSource()
if (sourceDesc.nonEmpty) {
terms.item("source", sourceDesc)
} else {
terms
}
}
override def toString: String = {
val tableName = getTable.getQualifiedName
val s = s"table:$tableName, fields:(${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
val sourceDesc = tableSource.explainSource()
if (sourceDesc.nonEmpty) {
s"Scan($s, source:$sourceDesc)"
} else {
s"Scan($s)"
}
}
}
class FlinkLogicalTableSourceScanConverter
extends ConverterRule(
classOf[LogicalTableScan],
Convention.NONE,
FlinkConventions.LOGICAL,
"FlinkLogicalTableSourceScanConverter") {
override def matches(call: RelOptRuleCall): Boolean = {
val scan = call.rel[TableScan](0)
val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
tableSourceTable match {
case _: TableSourceTable[_] => true
case _ => false
}
}
def convert(rel: RelNode): RelNode = {
val scan = rel.asInstanceOf[TableScan]
val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
val tableSource = scan.getTable.unwrap(classOf[TableSourceTable[_]]).tableSource
new FlinkLogicalTableSourceScan(
rel.getCluster,
traitSet,
scan.getTable,
tableSource
)
}
}
object FlinkLogicalTableSourceScan {
val CONVERTER = new FlinkLogicalTableSourceScanConverter
}
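To make computeSelfCost above concrete, a hedged worked example (all numbers illustrative): a scan estimated at 1000 rows with an estimated row size of 32 bytes is charged 1000 for rows, 1000 for CPU, and 32000 for IO.

// Illustrative only, assuming metadata.getRowCount(this) == 1000 and
// estimateRowSize(getRowType) == 32:
//   planner.getCostFactory.makeCost(1000.0, 1000.0, 1000.0 * 32)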
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.nodes.logical
import java.util.{List => JList}
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.{SetOp, Union}
import org.apache.calcite.rel.logical.LogicalUnion
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.flink.table.plan.nodes.FlinkConventions
import scala.collection.JavaConverters._
class FlinkLogicalUnion(
cluster: RelOptCluster,
traitSet: RelTraitSet,
inputs: JList[RelNode],
all: Boolean)
extends Union(cluster, traitSet, inputs, all)
with FlinkLogicalRel {
override def copy(traitSet: RelTraitSet, inputs: JList[RelNode], all: Boolean): SetOp = {
new FlinkLogicalUnion(cluster, traitSet, inputs, all)
}
override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
val children = this.getInputs.asScala
val rowCnt = children.foldLeft(0D) { (rows, child) =>
rows + metadata.getRowCount(child)
}
planner.getCostFactory.makeCost(rowCnt, 0, 0)
}
}
private class FlinkLogicalUnionConverter
extends ConverterRule(
classOf[LogicalUnion],
Convention.NONE,
FlinkConventions.LOGICAL,
"FlinkLogicalUnionConverter") {
/**
* Only translate UNION ALL.
*/
override def matches(call: RelOptRuleCall): Boolean = {
val union: LogicalUnion = call.rel(0).asInstanceOf[LogicalUnion]
union.all
}
override def convert(rel: RelNode): RelNode = {
val union = rel.asInstanceOf[LogicalUnion]
val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
val newInputs = union.getInputs.asScala
.map(input => RelOptRule.convert(input, FlinkConventions.LOGICAL)).asJava
new FlinkLogicalUnion(rel.getCluster, traitSet, newInputs, union.all)
}
}
object FlinkLogicalUnion {
val CONVERTER: ConverterRule = new FlinkLogicalUnionConverter()
def create(inputs: JList[RelNode], all: Boolean): FlinkLogicalUnion = {
val cluster: RelOptCluster = inputs.get(0).getCluster
val traitSet: RelTraitSet = cluster.traitSetOf(FlinkConventions.LOGICAL)
new FlinkLogicalUnion(cluster, traitSet, inputs, all)
}
}
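The create helper lets other rules synthesize a UNION ALL directly in the LOGICAL convention; DataSetAggregateWithNullValuesRule further down uses it exactly this way. A minimal sketch, assuming left and right are RelNodes that already share a cluster:

import java.util.{Arrays => JArrays}

// Sketch: a UNION ALL over two existing inputs; the trait set is derived
// from the first input's cluster, as in the create method above.
val unionAll: FlinkLogicalUnion =
  FlinkLogicalUnion.create(JArrays.asList(left, right), all = true)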
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.nodes.logical
import java.util.{List => JList}
import com.google.common.base.Supplier
import com.google.common.collect.ImmutableList
import org.apache.calcite.plan._
import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.Values
import org.apache.calcite.rel.logical.LogicalValues
import org.apache.calcite.rel.metadata.{RelMdCollation, RelMetadataQuery}
import org.apache.calcite.rex.RexLiteral
import org.apache.flink.table.plan.nodes.FlinkConventions
class FlinkLogicalValues(
cluster: RelOptCluster,
traitSet: RelTraitSet,
rowRelDataType: RelDataType,
tuples: ImmutableList[ImmutableList[RexLiteral]])
extends Values(cluster, rowRelDataType, tuples, traitSet)
with FlinkLogicalRel {
override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
val dRows = mq.getRowCount(this)
// Assume CPU is negligible since values are precomputed.
val dCpu = 1
val dIo = 0
planner.getCostFactory.makeCost(dRows, dCpu, dIo)
}
}
private class FlinkLogicalValuesConverter
extends ConverterRule(
classOf[LogicalValues],
Convention.NONE,
FlinkConventions.LOGICAL,
"FlinkLogicalValuesConverter") {
override def convert(rel: RelNode): RelNode = {
val values = rel.asInstanceOf[LogicalValues]
val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
new FlinkLogicalValues(rel.getCluster, traitSet, values.getRowType, values.getTuples())
}
}
object FlinkLogicalValues {
val CONVERTER: ConverterRule = new FlinkLogicalValuesConverter()
def create(cluster: RelOptCluster,
rowType: RelDataType,
tuples: ImmutableList[ImmutableList[RexLiteral]]): FlinkLogicalValues = {
val mq: RelMetadataQuery = RelMetadataQuery.instance
val traitSet: RelTraitSet = cluster.traitSetOf(FlinkConventions.LOGICAL)
.replaceIfs(
RelCollationTraitDef.INSTANCE,
new Supplier[JList[RelCollation]]() {
def get: JList[RelCollation] = RelMdCollation.values(mq, rowType, tuples)
})
new FlinkLogicalValues(cluster, traitSet, rowType, tuples)
}
}
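A hedged usage sketch of create, mirroring how DataSetAggregateWithNullValuesRule further down builds a single all-null row; cluster, rowType, and rexBuilder are assumed to be in scope:

import com.google.common.collect.ImmutableList
import org.apache.calcite.rex.RexLiteral
import scala.collection.JavaConverters._

// Sketch: one row of NULL literals, one per field of rowType.
val nullRow: ImmutableList[ImmutableList[RexLiteral]] =
  ImmutableList.of(ImmutableList.copyOf(
    rowType.getFieldList.asScala.map { field =>
      rexBuilder.makeLiteral(null, field.getType, false).asInstanceOf[RexLiteral]
    }.asJava))

val nullValues = FlinkLogicalValues.create(cluster, rowType, nullRow)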
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.nodes.logical
import java.util
import org.apache.calcite.plan._
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelShuttle}
import org.apache.calcite.util.ImmutableBitSet
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.logical.LogicalWindow
import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
import org.apache.flink.table.plan.nodes.FlinkConventions
class FlinkLogicalWindowAggregate(
window: LogicalWindow,
namedProperties: Seq[NamedWindowProperty],
cluster: RelOptCluster,
traitSet: RelTraitSet,
child: RelNode,
indicator: Boolean,
groupSet: ImmutableBitSet,
groupSets: util.List[ImmutableBitSet],
aggCalls: util.List[AggregateCall])
extends Aggregate(cluster, traitSet, child, indicator, groupSet, groupSets, aggCalls)
with FlinkLogicalRel {
def getWindow: LogicalWindow = window
def getNamedProperties: Seq[NamedWindowProperty] = namedProperties
override def copy(
traitSet: RelTraitSet,
input: RelNode,
indicator: Boolean,
groupSet: ImmutableBitSet,
groupSets: util.List[ImmutableBitSet],
aggCalls: util.List[AggregateCall])
: Aggregate = {
new FlinkLogicalWindowAggregate(
window,
namedProperties,
cluster,
traitSet,
input,
indicator,
groupSet,
groupSets,
aggCalls)
}
override def accept(shuttle: RelShuttle): RelNode = shuttle.visit(this)
override def deriveRowType(): RelDataType = {
val aggregateRowType = super.deriveRowType()
val typeFactory = getCluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
val builder = typeFactory.builder
builder.addAll(aggregateRowType.getFieldList)
namedProperties.foreach { namedProp =>
builder.add(
namedProp.name,
typeFactory.createTypeFromTypeInfo(namedProp.property.resultType)
)
}
builder.build()
}
override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
val child = this.getInput
val rowCnt = metadata.getRowCount(child)
val rowSize = this.estimateRowSize(child.getRowType)
val aggCnt = this.aggCalls.size
planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
}
}
class FlinkLogicalWindowAggregateConverter
extends ConverterRule(
classOf[LogicalWindowAggregate],
Convention.NONE,
FlinkConventions.LOGICAL,
"FlinkLogicalWindowAggregateConverter") {
override def convert(rel: RelNode): RelNode = {
val agg = rel.asInstanceOf[LogicalWindowAggregate]
val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
val newInput = RelOptRule.convert(agg.getInput, FlinkConventions.LOGICAL)
new FlinkLogicalWindowAggregate(
agg.getWindow,
agg.getNamedProperties,
rel.getCluster,
traitSet,
newInput,
agg.indicator,
agg.getGroupSet,
agg.getGroupSets,
agg.getAggCallList)
}
}
object FlinkLogicalWindowAggregate {
val CONVERTER = new FlinkLogicalWindowAggregateConverter
}
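To illustrate deriveRowType above (field names hypothetical): the plain aggregate row type is extended by one appended field per named window property.

// Illustrative: GROUP BY user over a tumbling window with COUNT(*) and a
// named property "w$start" yields the row type
//   (user, EXPR$0, w$start)
// i.e. the aggregate's own fields plus one field per NamedWindowProperty,
// typed via namedProp.property.resultType.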
......@@ -23,29 +23,12 @@ import org.apache.calcite.tools.{RuleSet, RuleSets}
import org.apache.flink.table.plan.rules.common._
import org.apache.flink.table.plan.rules.dataSet._
import org.apache.flink.table.plan.rules.datastream._
import org.apache.flink.table.plan.rules.logical._
import org.apache.flink.table.plan.nodes.logical._
object FlinkRuleSets {
/**
* RuleSet to normalize plans for batch / DataSet execution
*/
val DATASET_NORM_RULES: RuleSet = RuleSets.ofList(
// simplify expressions rules
ReduceExpressionsRule.FILTER_INSTANCE,
ReduceExpressionsRule.PROJECT_INSTANCE,
ReduceExpressionsRule.CALC_INSTANCE,
ReduceExpressionsRule.JOIN_INSTANCE,
ProjectToWindowRule.PROJECT,
// Transform window to LogicalWindowAggregate
DataSetLogicalWindowAggregateRule.INSTANCE,
WindowStartEndPropertiesRule.INSTANCE
)
/**
* RuleSet to optimize plans for batch / DataSet execution
*/
val DATASET_OPT_RULES: RuleSet = RuleSets.ofList(
val LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
// convert a logical table scan to a relational expression
TableScanRule.INSTANCE,
......@@ -108,6 +91,48 @@ object FlinkRuleSets {
ProjectToCalcRule.INSTANCE,
CalcMergeRule.INSTANCE,
// scan optimization
PushProjectIntoTableSourceScanRule.INSTANCE,
PushFilterIntoTableSourceScanRule.INSTANCE,
// translate to flink logical rel nodes
FlinkLogicalAggregate.CONVERTER,
FlinkLogicalWindowAggregate.CONVERTER,
FlinkLogicalOverWindow.CONVERTER,
FlinkLogicalCalc.CONVERTER,
FlinkLogicalCorrelate.CONVERTER,
FlinkLogicalIntersect.CONVERTER,
FlinkLogicalJoin.CONVERTER,
FlinkLogicalMinus.CONVERTER,
FlinkLogicalSort.CONVERTER,
FlinkLogicalUnion.CONVERTER,
FlinkLogicalValues.CONVERTER,
FlinkLogicalTableSourceScan.CONVERTER,
FlinkLogicalTableFunctionScan.CONVERTER,
FlinkLogicalNativeTableScan.CONVERTER
)
/**
* RuleSet to normalize plans for batch / DataSet execution
*/
val DATASET_NORM_RULES: RuleSet = RuleSets.ofList(
// simplify expressions rules
ReduceExpressionsRule.FILTER_INSTANCE,
ReduceExpressionsRule.PROJECT_INSTANCE,
ReduceExpressionsRule.CALC_INSTANCE,
ReduceExpressionsRule.JOIN_INSTANCE,
ProjectToWindowRule.PROJECT,
// Transform window to LogicalWindowAggregate
DataSetLogicalWindowAggregateRule.INSTANCE,
WindowStartEndPropertiesRule.INSTANCE
)
/**
* RuleSet to optimize plans for batch / DataSet execution
*/
val DATASET_OPT_RULES: RuleSet = RuleSets.ofList(
// translate to Flink DataSet nodes
DataSetWindowAggregateRule.INSTANCE,
DataSetAggregateRule.INSTANCE,
......@@ -123,11 +148,7 @@ object FlinkRuleSets {
DataSetSortRule.INSTANCE,
DataSetValuesRule.INSTANCE,
DataSetCorrelateRule.INSTANCE,
BatchTableSourceScanRule.INSTANCE,
// scan optimization
PushProjectIntoBatchTableSourceScanRule.INSTANCE,
PushFilterIntoBatchTableSourceScanRule.INSTANCE
BatchTableSourceScanRule.INSTANCE
)
/**
......@@ -146,47 +167,18 @@ object FlinkRuleSets {
)
/**
* RuleSet to optimize plans for stream / DataStream execution
*/
val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(
// convert a logical table scan to a relational expression
TableScanRule.INSTANCE,
EnumerableToLogicalTableScan.INSTANCE,
// calc rules
FilterToCalcRule.INSTANCE,
ProjectToCalcRule.INSTANCE,
FilterCalcMergeRule.INSTANCE,
ProjectCalcMergeRule.INSTANCE,
CalcMergeRule.INSTANCE,
// prune empty results rules
PruneEmptyRules.FILTER_INSTANCE,
PruneEmptyRules.PROJECT_INSTANCE,
PruneEmptyRules.UNION_INSTANCE,
// push and merge projection rules
ProjectFilterTransposeRule.INSTANCE,
FilterProjectTransposeRule.INSTANCE,
ProjectRemoveRule.INSTANCE,
// merge and push unions rules
UnionEliminatorRule.INSTANCE,
// translate to DataStream nodes
DataStreamOverAggregateRule.INSTANCE,
DataStreamAggregateRule.INSTANCE,
DataStreamCalcRule.INSTANCE,
DataStreamScanRule.INSTANCE,
DataStreamUnionRule.INSTANCE,
DataStreamValuesRule.INSTANCE,
DataStreamCorrelateRule.INSTANCE,
StreamTableSourceScanRule.INSTANCE,
// scan optimization
PushProjectIntoStreamTableSourceScanRule.INSTANCE,
PushFilterIntoStreamTableSourceScanRule.INSTANCE
// translate to DataStream nodes
DataStreamOverAggregateRule.INSTANCE,
DataStreamAggregateRule.INSTANCE,
DataStreamCalcRule.INSTANCE,
DataStreamScanRule.INSTANCE,
DataStreamUnionRule.INSTANCE,
DataStreamValuesRule.INSTANCE,
DataStreamCorrelateRule.INSTANCE,
StreamTableSourceScanRule.INSTANCE
)
/**
......@@ -194,7 +186,6 @@ object FlinkRuleSets {
*/
val DATASTREAM_DECO_RULES: RuleSet = RuleSets.ofList(
// rules
)
}
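Taken together, the rule sets above encode the split this commit introduces: LOGICAL_OPT_RULES rewrites the plan into FlinkConventions.LOGICAL nodes, and only then do DATASET_OPT_RULES and DATASTREAM_OPT_RULES translate those into physical nodes. Every physical rule in this diff follows the same two-step hand-off; a condensed sketch, with rel standing for the matched FlinkLogical* node:

// Step 1: move the matched node's traits to the physical convention.
val traitSet = rel.getTraitSet.replace(FlinkConventions.DATASET)
// Step 2: ask the planner for the inputs in that convention as well.
val convInput = RelOptRule.convert(rel.getInput(0), FlinkConventions.DATASET)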
......@@ -18,23 +18,22 @@
package org.apache.flink.table.plan.rules.dataSet
import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.TableScan
import org.apache.calcite.rel.logical.LogicalTableScan
import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetConvention}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan
import org.apache.flink.table.plan.schema.TableSourceTable
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableSourceScan
import org.apache.flink.table.sources.BatchTableSource
/** Rule to convert a [[FlinkLogicalTableSourceScan]] into a [[BatchTableSourceScan]]. */
class BatchTableSourceScanRule
extends ConverterRule(
classOf[LogicalTableScan],
Convention.NONE,
DataSetConvention.INSTANCE,
"BatchTableSourceScanRule")
{
classOf[FlinkLogicalTableSourceScan],
FlinkConventions.LOGICAL,
FlinkConventions.DATASET,
"BatchTableSourceScanRule") {
/** Rule must only match if TableScan targets a [[BatchTableSource]] */
override def matches(call: RelOptRuleCall): Boolean = {
......@@ -54,16 +53,13 @@ class BatchTableSourceScanRule
}
def convert(rel: RelNode): RelNode = {
val scan: TableScan = rel.asInstanceOf[TableScan]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
val tableSource = scan.getTable.unwrap(classOf[TableSourceTable[_]]).tableSource
.asInstanceOf[BatchTableSource[_]]
val scan: FlinkLogicalTableSourceScan = rel.asInstanceOf[FlinkLogicalTableSourceScan]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASET)
new BatchTableSourceScan(
rel.getCluster,
traitSet,
scan.getTable,
tableSource
scan.tableSource.asInstanceOf[BatchTableSource[_]]
)
}
}
......
......@@ -18,22 +18,24 @@
package org.apache.flink.table.plan.rules.dataSet
import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.logical.LogicalAggregate
import org.apache.flink.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention, DataSetUnion}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.{DataSetAggregate, DataSetUnion}
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalAggregate
import scala.collection.JavaConversions._
class DataSetAggregateRule
extends ConverterRule(
classOf[LogicalAggregate],
Convention.NONE,
DataSetConvention.INSTANCE,
classOf[FlinkLogicalAggregate],
FlinkConventions.LOGICAL,
FlinkConventions.DATASET,
"DataSetAggregateRule") {
override def matches(call: RelOptRuleCall): Boolean = {
val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
val agg: FlinkLogicalAggregate = call.rel(0).asInstanceOf[FlinkLogicalAggregate]
// for non-grouped agg sets we attach a null row to the source data
// we need to apply DataSetAggregateWithNullValuesRule
......@@ -56,9 +58,9 @@ class DataSetAggregateRule
}
override def convert(rel: RelNode): RelNode = {
val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE)
val agg: FlinkLogicalAggregate = rel.asInstanceOf[FlinkLogicalAggregate]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASET)
val convInput: RelNode = RelOptRule.convert(agg.getInput, FlinkConventions.DATASET)
if (agg.indicator) {
agg.groupSets.map(set =>
......
......@@ -17,16 +17,16 @@
*/
package org.apache.flink.table.plan.rules.dataSet
import com.google.common.collect.ImmutableList
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalUnion, LogicalValues}
import org.apache.calcite.rex.RexLiteral
import org.apache.flink.table.api.TableException
import org.apache.flink.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetAggregate
import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalAggregate, FlinkLogicalUnion, FlinkLogicalValues}
import scala.collection.JavaConversions._
/**
* Rule for inserting a [[org.apache.flink.types.Row]] with null records into a [[DataSetAggregate]].
......@@ -34,13 +34,13 @@ import org.apache.flink.table.plan.nodes.dataset.{DataSetAggregate, DataSetConve
*/
class DataSetAggregateWithNullValuesRule
extends ConverterRule(
classOf[LogicalAggregate],
Convention.NONE,
DataSetConvention.INSTANCE,
classOf[FlinkLogicalAggregate],
FlinkConventions.LOGICAL,
FlinkConventions.DATASET,
"DataSetAggregateWithNullValuesRule") {
override def matches(call: RelOptRuleCall): Boolean = {
val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
val agg: FlinkLogicalAggregate = call.rel(0).asInstanceOf[FlinkLogicalAggregate]
// group sets shouldn't attach a null row
// we need to apply other rules, i.e. DataSetAggregateRule
......@@ -55,8 +55,8 @@ class DataSetAggregateWithNullValuesRule
}
override def convert(rel: RelNode): RelNode = {
val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
val agg: FlinkLogicalAggregate = rel.asInstanceOf[FlinkLogicalAggregate]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASET)
val cluster: RelOptCluster = rel.getCluster
val fieldTypes = agg.getInput.getRowType.getFieldList.map(_.getType)
......@@ -68,13 +68,13 @@ class DataSetAggregateWithNullValuesRule
makeLiteral(null, fieldType, false).asInstanceOf[RexLiteral]
}))
val logicalValues = LogicalValues.create(cluster, agg.getInput.getRowType, nullLiterals)
val logicalUnion = LogicalUnion.create(List(logicalValues, agg.getInput), true)
val logicalValues = FlinkLogicalValues.create(cluster, agg.getInput.getRowType, nullLiterals)
val logicalUnion = FlinkLogicalUnion.create(List(logicalValues, agg.getInput), all = true)
new DataSetAggregate(
cluster,
traitSet,
RelOptRule.convert(logicalUnion, DataSetConvention.INSTANCE),
RelOptRule.convert(logicalUnion, FlinkConventions.DATASET),
agg.getNamedAggCalls,
rel.getRowType,
agg.getInput.getRowType,
......
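Why this rule exists: a global (non-grouped) aggregate must still produce a result on empty input. A hedged illustration with a hypothetical query:

// SELECT COUNT(a) FROM t, with t empty: the rewrite unions one all-null
// row with t, so the DataSetAggregate always sees at least one input row
// and COUNT(a) returns 0 instead of producing no row at all.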
......@@ -18,24 +18,24 @@
package org.apache.flink.table.plan.rules.dataSet
import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.logical.LogicalCalc
import org.apache.flink.table.plan.nodes.dataset.{DataSetCalc, DataSetConvention}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetCalc
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalCalc
class DataSetCalcRule
extends ConverterRule(
classOf[LogicalCalc],
Convention.NONE,
DataSetConvention.INSTANCE,
"DataSetCalcRule")
{
classOf[FlinkLogicalCalc],
FlinkConventions.LOGICAL,
FlinkConventions.DATASET,
"DataSetCalcRule") {
def convert(rel: RelNode): RelNode = {
val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
val convInput: RelNode = RelOptRule.convert(calc.getInput, DataSetConvention.INSTANCE)
val calc: FlinkLogicalCalc = rel.asInstanceOf[FlinkLogicalCalc]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASET)
val convInput: RelNode = RelOptRule.convert(calc.getInput, FlinkConventions.DATASET)
new DataSetCalc(
rel.getCluster,
......
......@@ -18,45 +18,41 @@
package org.apache.flink.table.plan.rules.dataSet
import org.apache.calcite.plan.volcano.RelSubset
import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan}
import org.apache.calcite.rex.RexNode
import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetCorrelate}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate
import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalCorrelate, FlinkLogicalTableFunctionScan}
/**
* Rule to convert a FlinkLogicalCorrelate into a DataSetCorrelate.
*/
class DataSetCorrelateRule
extends ConverterRule(
classOf[LogicalCorrelate],
Convention.NONE,
DataSetConvention.INSTANCE,
"DataSetCorrelateRule") {
classOf[FlinkLogicalCorrelate],
FlinkConventions.LOGICAL,
FlinkConventions.DATASET,
"DataSetCorrelateRule") {
override def matches(call: RelOptRuleCall): Boolean = {
val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate]
val join: FlinkLogicalCorrelate = call.rel(0).asInstanceOf[FlinkLogicalCorrelate]
val right = join.getRight.asInstanceOf[RelSubset].getOriginal
right match {
// right node is a table function
case scan: LogicalTableFunctionScan => true
case scan: FlinkLogicalTableFunctionScan => true
// a filter is pushed above the table function
case filter: LogicalFilter =>
filter
.getInput.asInstanceOf[RelSubset]
.getOriginal
.isInstanceOf[LogicalTableFunctionScan]
case calc: FlinkLogicalCalc =>
calc.getInput.asInstanceOf[RelSubset]
.getOriginal.isInstanceOf[FlinkLogicalTableFunctionScan]
case _ => false
}
}
override def convert(rel: RelNode): RelNode = {
val join: LogicalCorrelate = rel.asInstanceOf[LogicalCorrelate]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
val convInput: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE)
val join: FlinkLogicalCorrelate = rel.asInstanceOf[FlinkLogicalCorrelate]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASET)
val convInput: RelNode = RelOptRule.convert(join.getInput(0), FlinkConventions.DATASET)
val right: RelNode = join.getInput(1)
def convertToCorrelate(relNode: RelNode, condition: Option[RexNode]): DataSetCorrelate = {
......@@ -64,12 +60,12 @@ class DataSetCorrelateRule
case rel: RelSubset =>
convertToCorrelate(rel.getRelList.get(0), condition)
case filter: LogicalFilter =>
case calc: FlinkLogicalCalc =>
convertToCorrelate(
filter.getInput.asInstanceOf[RelSubset].getOriginal,
Some(filter.getCondition))
calc.getInput.asInstanceOf[RelSubset].getOriginal,
Some(calc.getProgram.expandLocalRef(calc.getProgram.getCondition)))
case scan: LogicalTableFunctionScan =>
case scan: FlinkLogicalTableFunctionScan =>
new DataSetCorrelate(
rel.getCluster,
traitSet,
......
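A note on the Filter-to-Calc switch visible in this rule: filters are rewritten to Calc nodes during the logical phase, so a predicate sitting above the table function now arrives as a FlinkLogicalCalc whose condition is stored as a local reference inside its RexProgram. Recovering the plain RexNode therefore goes through expandLocalRef, as the new code does:

// Sketch, assuming calc: FlinkLogicalCalc with a non-null condition.
val program = calc.getProgram
val condition: RexNode = program.expandLocalRef(program.getCondition)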
......@@ -18,22 +18,22 @@
package org.apache.flink.table.plan.rules.dataSet
import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.logical.LogicalAggregate
import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetDistinct}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetDistinct
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalAggregate
class DataSetDistinctRule
extends ConverterRule(
classOf[LogicalAggregate],
Convention.NONE,
DataSetConvention.INSTANCE,
"DataSetDistinctRule")
{
classOf[FlinkLogicalAggregate],
FlinkConventions.LOGICAL,
FlinkConventions.DATASET,
"DataSetDistinctRule") {
override def matches(call: RelOptRuleCall): Boolean = {
val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
val agg: FlinkLogicalAggregate = call.rel(0).asInstanceOf[FlinkLogicalAggregate]
// only accept distinct
agg.getAggCallList.isEmpty &&
......@@ -43,9 +43,9 @@ class DataSetDistinctRule
}
def convert(rel: RelNode): RelNode = {
val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE)
val agg: FlinkLogicalAggregate = rel.asInstanceOf[FlinkLogicalAggregate]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASET)
val convInput: RelNode = RelOptRule.convert(agg.getInput, FlinkConventions.DATASET)
new DataSetDistinct(
rel.getCluster,
......
......@@ -18,26 +18,26 @@
package org.apache.flink.table.plan.rules.dataSet
import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.logical.LogicalIntersect
import org.apache.flink.table.plan.nodes.dataset.{DataSetIntersect, DataSetConvention}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetIntersect
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalIntersect
class DataSetIntersectRule
extends ConverterRule(
classOf[LogicalIntersect],
Convention.NONE,
DataSetConvention.INSTANCE,
"DataSetIntersectRule")
{
classOf[FlinkLogicalIntersect],
FlinkConventions.LOGICAL,
FlinkConventions.DATASET,
"DataSetIntersectRule") {
def convert(rel: RelNode): RelNode = {
val intersect: LogicalIntersect = rel.asInstanceOf[LogicalIntersect]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
val convLeft: RelNode = RelOptRule.convert(intersect.getInput(0), DataSetConvention.INSTANCE)
val convRight: RelNode = RelOptRule.convert(intersect.getInput(1), DataSetConvention.INSTANCE)
val intersect: FlinkLogicalIntersect = rel.asInstanceOf[FlinkLogicalIntersect]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASET)
val convLeft: RelNode = RelOptRule.convert(intersect.getInput(0), FlinkConventions.DATASET)
val convRight: RelNode = RelOptRule.convert(intersect.getInput(1), FlinkConventions.DATASET)
new DataSetIntersect(
rel.getCluster,
......
......@@ -18,24 +18,25 @@
package org.apache.flink.table.plan.rules.dataSet
import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.JoinRelType
import org.apache.calcite.rel.logical.LogicalJoin
import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetJoin}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetJoin
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
import scala.collection.JavaConversions._
class DataSetJoinRule
extends ConverterRule(
classOf[LogicalJoin],
Convention.NONE,
DataSetConvention.INSTANCE,
"DataSetJoinRule") {
classOf[FlinkLogicalJoin],
FlinkConventions.LOGICAL,
FlinkConventions.DATASET,
"DataSetJoinRule") {
override def matches(call: RelOptRuleCall): Boolean = {
val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
val joinInfo = join.analyzeCondition
......@@ -46,10 +47,10 @@ class DataSetJoinRule
override def convert(rel: RelNode): RelNode = {
val join: LogicalJoin = rel.asInstanceOf[LogicalJoin]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
val convLeft: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE)
val convRight: RelNode = RelOptRule.convert(join.getInput(1), DataSetConvention.INSTANCE)
val join: FlinkLogicalJoin = rel.asInstanceOf[FlinkLogicalJoin]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASET)
val convLeft: RelNode = RelOptRule.convert(join.getInput(0), FlinkConventions.DATASET)
val convRight: RelNode = RelOptRule.convert(join.getInput(1), FlinkConventions.DATASET)
val joinInfo = join.analyzeCondition
new DataSetJoin(
......
......@@ -18,27 +18,26 @@
package org.apache.flink.table.plan.rules.dataSet
import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.logical.LogicalMinus
import org.apache.calcite.rel.rules.UnionToDistinctRule
import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetMinus}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetMinus
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalMinus
class DataSetMinusRule
extends ConverterRule(
classOf[LogicalMinus],
Convention.NONE,
DataSetConvention.INSTANCE,
"DataSetMinusRule")
{
classOf[FlinkLogicalMinus],
FlinkConventions.LOGICAL,
FlinkConventions.DATASET,
"DataSetMinusRule") {
def convert(rel: RelNode): RelNode = {
val minus: LogicalMinus = rel.asInstanceOf[LogicalMinus]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
val convLeft: RelNode = RelOptRule.convert(minus.getInput(0), DataSetConvention.INSTANCE)
val convRight: RelNode = RelOptRule.convert(minus.getInput(1), DataSetConvention.INSTANCE)
val minus: FlinkLogicalMinus = rel.asInstanceOf[FlinkLogicalMinus]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASET)
val convLeft: RelNode = RelOptRule.convert(minus.getInput(0), FlinkConventions.DATASET)
val convRight: RelNode = RelOptRule.convert(minus.getInput(1), FlinkConventions.DATASET)
new DataSetMinus(
rel.getCluster,
......
......@@ -18,27 +18,26 @@
package org.apache.flink.table.plan.rules.dataSet
import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.TableScan
import org.apache.calcite.rel.logical.LogicalTableScan
import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetScan}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetScan
import org.apache.flink.table.plan.schema.DataSetTable
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalNativeTableScan
class DataSetScanRule
extends ConverterRule(
classOf[LogicalTableScan],
Convention.NONE,
DataSetConvention.INSTANCE,
"DataSetScanRule")
{
classOf[FlinkLogicalNativeTableScan],
FlinkConventions.LOGICAL,
FlinkConventions.DATASET,
"DataSetScanRule") {
/**
* If the input is not a DataSetTable, we want the TableScanRule to match instead
*/
override def matches(call: RelOptRuleCall): Boolean = {
val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
val scan: FlinkLogicalNativeTableScan = call.rel(0).asInstanceOf[FlinkLogicalNativeTableScan]
val dataSetTable = scan.getTable.unwrap(classOf[DataSetTable[Any]])
dataSetTable match {
case _: DataSetTable[Any] =>
......@@ -49,8 +48,8 @@ class DataSetScanRule
}
def convert(rel: RelNode): RelNode = {
val scan: TableScan = rel.asInstanceOf[TableScan]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
val scan: FlinkLogicalNativeTableScan = rel.asInstanceOf[FlinkLogicalNativeTableScan]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASET)
new DataSetScan(
rel.getCluster,
......
......@@ -19,22 +19,23 @@
package org.apache.flink.table.plan.rules.dataSet
import org.apache.calcite.plan.volcano.RelSubset
import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.JoinRelType
import org.apache.calcite.rel.logical._
import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetSingleRowJoin}
import org.apache.calcite.rel.core._
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetSingleRowJoin
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
class DataSetSingleRowJoinRule
extends ConverterRule(
classOf[LogicalJoin],
Convention.NONE,
DataSetConvention.INSTANCE,
"DataSetSingleRowJoinRule") {
classOf[FlinkLogicalJoin],
FlinkConventions.LOGICAL,
FlinkConventions.DATASET,
"DataSetSingleRowJoinRule") {
override def matches(call: RelOptRuleCall): Boolean = {
val join = call.rel(0).asInstanceOf[LogicalJoin]
val join = call.rel(0).asInstanceOf[FlinkLogicalJoin]
if (isInnerJoin(join)) {
isSingleRow(join.getRight) || isSingleRow(join.getLeft)
......@@ -43,7 +44,7 @@ class DataSetSingleRowJoinRule
}
}
private def isInnerJoin(join: LogicalJoin) = {
private def isInnerJoin(join: FlinkLogicalJoin) = {
join.getJoinType == JoinRelType.INNER
}
......@@ -54,19 +55,19 @@ class DataSetSingleRowJoinRule
private def isSingleRow(node: RelNode): Boolean = {
node match {
case ss: RelSubset => isSingleRow(ss.getOriginal)
case lp: LogicalProject => isSingleRow(lp.getInput)
case lf: LogicalFilter => isSingleRow(lf.getInput)
case lc: LogicalCalc => isSingleRow(lc.getInput)
case la: LogicalAggregate => la.getGroupSet.isEmpty
case lp: Project => isSingleRow(lp.getInput)
case lf: Filter => isSingleRow(lf.getInput)
case lc: Calc => isSingleRow(lc.getInput)
case la: Aggregate => la.getGroupSet.isEmpty
case _ => false
}
}
override def convert(rel: RelNode): RelNode = {
val join = rel.asInstanceOf[LogicalJoin]
val traitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
val dataSetLeftNode = RelOptRule.convert(join.getLeft, DataSetConvention.INSTANCE)
val dataSetRightNode = RelOptRule.convert(join.getRight, DataSetConvention.INSTANCE)
val join = rel.asInstanceOf[FlinkLogicalJoin]
val traitSet = rel.getTraitSet.replace(FlinkConventions.DATASET)
val dataSetLeftNode = RelOptRule.convert(join.getLeft, FlinkConventions.DATASET)
val dataSetRightNode = RelOptRule.convert(join.getRight, FlinkConventions.DATASET)
val leftIsSingle = isSingleRow(join.getLeft)
new DataSetSingleRowJoin(
......
......@@ -18,24 +18,24 @@
package org.apache.flink.table.plan.rules.dataSet
import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.logical.{LogicalJoin, LogicalSort}
import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetSort}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetSort
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSort
class DataSetSortRule
extends ConverterRule(
classOf[LogicalSort],
Convention.NONE,
DataSetConvention.INSTANCE,
classOf[FlinkLogicalSort],
FlinkConventions.LOGICAL,
FlinkConventions.DATASET,
"DataSetSortRule") {
override def convert(rel: RelNode): RelNode = {
val sort: LogicalSort = rel.asInstanceOf[LogicalSort]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
val convInput: RelNode = RelOptRule.convert(sort.getInput, DataSetConvention.INSTANCE)
val sort: FlinkLogicalSort = rel.asInstanceOf[FlinkLogicalSort]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASET)
val convInput: RelNode = RelOptRule.convert(sort.getInput, FlinkConventions.DATASET)
new DataSetSort(
rel.getCluster,
......
......@@ -18,20 +18,20 @@
package org.apache.flink.table.plan.rules.dataSet
import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.logical.LogicalUnion
import org.apache.calcite.rel.rules.UnionToDistinctRule
import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetUnion}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetUnion
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalUnion
class DataSetUnionRule
extends ConverterRule(
classOf[LogicalUnion],
Convention.NONE,
DataSetConvention.INSTANCE,
"DataSetUnionRule")
{
classOf[FlinkLogicalUnion],
FlinkConventions.LOGICAL,
FlinkConventions.DATASET,
"DataSetUnionRule") {
/**
* Only translate UNION ALL.
......@@ -39,16 +39,15 @@ class DataSetUnionRule
* an Aggregate on top of a UNION ALL by [[UnionToDistinctRule]]
*/
override def matches(call: RelOptRuleCall): Boolean = {
val union: LogicalUnion = call.rel(0).asInstanceOf[LogicalUnion]
val union: FlinkLogicalUnion = call.rel(0).asInstanceOf[FlinkLogicalUnion]
union.all
}
def convert(rel: RelNode): RelNode = {
val union: LogicalUnion = rel.asInstanceOf[LogicalUnion]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
val convLeft: RelNode = RelOptRule.convert(union.getInput(0), DataSetConvention.INSTANCE)
val convRight: RelNode = RelOptRule.convert(union.getInput(1), DataSetConvention.INSTANCE)
val union: FlinkLogicalUnion = rel.asInstanceOf[FlinkLogicalUnion]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASET)
val convLeft: RelNode = RelOptRule.convert(union.getInput(0), FlinkConventions.DATASET)
val convRight: RelNode = RelOptRule.convert(union.getInput(1), FlinkConventions.DATASET)
new DataSetUnion(
rel.getCluster,
......
......@@ -18,24 +18,24 @@
package org.apache.flink.table.plan.rules.dataSet
import org.apache.calcite.plan.{RelOptRule, RelTraitSet, Convention}
import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.logical.LogicalValues
import org.apache.flink.table.plan.nodes.dataset.{DataSetValues, DataSetConvention}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetValues
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalValues
class DataSetValuesRule
extends ConverterRule(
classOf[LogicalValues],
Convention.NONE,
DataSetConvention.INSTANCE,
classOf[FlinkLogicalValues],
FlinkConventions.LOGICAL,
FlinkConventions.DATASET,
"DataSetValuesRule")
{
def convert(rel: RelNode): RelNode = {
val values: LogicalValues = rel.asInstanceOf[LogicalValues]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
val values: FlinkLogicalValues = rel.asInstanceOf[FlinkLogicalValues]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASET)
new DataSetValues(
rel.getCluster,
......
......@@ -21,20 +21,21 @@ import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTrait
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.flink.table.api.TableException
import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetWindowAggregate}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetWindowAggregate
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalWindowAggregate
import scala.collection.JavaConversions._
class DataSetWindowAggregateRule
extends ConverterRule(
classOf[LogicalWindowAggregate],
Convention.NONE,
DataSetConvention.INSTANCE,
"DataSetWindowAggregateRule") {
classOf[FlinkLogicalWindowAggregate],
FlinkConventions.LOGICAL,
FlinkConventions.DATASET,
"DataSetWindowAggregateRule") {
override def matches(call: RelOptRuleCall): Boolean = {
val agg: LogicalWindowAggregate = call.rel(0).asInstanceOf[LogicalWindowAggregate]
val agg: FlinkLogicalWindowAggregate = call.rel(0).asInstanceOf[FlinkLogicalWindowAggregate]
// check if we have distinct aggregates
val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
......@@ -52,9 +53,9 @@ class DataSetWindowAggregateRule
}
override def convert(rel: RelNode): RelNode = {
val agg: LogicalWindowAggregate = rel.asInstanceOf[LogicalWindowAggregate]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE)
val agg: FlinkLogicalWindowAggregate = rel.asInstanceOf[FlinkLogicalWindowAggregate]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASET)
val convInput: RelNode = RelOptRule.convert(agg.getInput, FlinkConventions.DATASET)
new DataSetWindowAggregate(
agg.getWindow,
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.rules.dataSet
import org.apache.calcite.plan.RelOptRule.{none, operand}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
import org.apache.flink.table.plan.rules.common.PushProjectIntoTableSourceScanRuleBase
import org.apache.flink.table.sources.ProjectableTableSource
/**
* This rule tries to push projections into a BatchTableSourceScan.
*/
class PushProjectIntoBatchTableSourceScanRule extends RelOptRule(
operand(classOf[DataSetCalc],
operand(classOf[BatchTableSourceScan], none)),
"PushProjectIntoBatchTableSourceScanRule")
with PushProjectIntoTableSourceScanRuleBase {
override def matches(call: RelOptRuleCall): Boolean = {
val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
scan.tableSource match {
case _: ProjectableTableSource[_] => true
case _ => false
}
}
override def onMatch(call: RelOptRuleCall) {
val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
pushProjectIntoScan(call, calc, scan)
}
}
object PushProjectIntoBatchTableSourceScanRule {
val INSTANCE: RelOptRule = new PushProjectIntoBatchTableSourceScanRule
}
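For readers less familiar with Calcite rule patterns: operand builds the match tree top-down, and call.rel(i) indexes into it in declaration order, which is why onMatch above reads the calc at position 0 and the scan at position 1.

// Match pattern of this rule:
//   operand(classOf[DataSetCalc],                   // call.rel(0)
//     operand(classOf[BatchTableSourceScan], none)) // call.rel(1)
// `none` marks the scan as a leaf operand (no child operands of its own).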
......@@ -18,24 +18,25 @@
package org.apache.flink.table.plan.rules.datastream
import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.flink.table.api.TableException
import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
import org.apache.flink.table.plan.nodes.datastream.{DataStreamAggregate, DataStreamConvention}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalWindowAggregate
import scala.collection.JavaConversions._
class DataStreamAggregateRule
extends ConverterRule(
classOf[LogicalWindowAggregate],
Convention.NONE,
DataStreamConvention.INSTANCE,
"DataStreamAggregateRule") {
classOf[FlinkLogicalWindowAggregate],
FlinkConventions.LOGICAL,
FlinkConventions.DATASTREAM,
"DataStreamAggregateRule") {
override def matches(call: RelOptRuleCall): Boolean = {
val agg: LogicalWindowAggregate = call.rel(0).asInstanceOf[LogicalWindowAggregate]
val agg: FlinkLogicalWindowAggregate = call.rel(0).asInstanceOf[FlinkLogicalWindowAggregate]
// check if we have distinct aggregates
val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
......@@ -53,9 +54,9 @@ class DataStreamAggregateRule
}
override def convert(rel: RelNode): RelNode = {
val agg: LogicalWindowAggregate = rel.asInstanceOf[LogicalWindowAggregate]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
val convInput: RelNode = RelOptRule.convert(agg.getInput, DataStreamConvention.INSTANCE)
val agg: FlinkLogicalWindowAggregate = rel.asInstanceOf[FlinkLogicalWindowAggregate]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
val convInput: RelNode = RelOptRule.convert(agg.getInput, FlinkConventions.DATASTREAM)
new DataStreamAggregate(
agg.getWindow,
......
......@@ -18,25 +18,25 @@
package org.apache.flink.table.plan.rules.datastream
import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.logical.LogicalCalc
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.DataStreamCalc
import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalCalc
class DataStreamCalcRule
extends ConverterRule(
classOf[LogicalCalc],
Convention.NONE,
DataStreamConvention.INSTANCE,
classOf[FlinkLogicalCalc],
FlinkConventions.LOGICAL,
FlinkConventions.DATASTREAM,
"DataStreamCalcRule")
{
def convert(rel: RelNode): RelNode = {
val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
val convInput: RelNode = RelOptRule.convert(calc.getInput, DataStreamConvention.INSTANCE)
val calc: FlinkLogicalCalc = rel.asInstanceOf[FlinkLogicalCalc]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
val convInput: RelNode = RelOptRule.convert(calc.getInput, FlinkConventions.DATASTREAM)
new DataStreamCalc(
rel.getCluster,
......
......@@ -18,45 +18,40 @@
package org.apache.flink.table.plan.rules.datastream
import org.apache.calcite.plan.volcano.RelSubset
import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan}
import org.apache.calcite.rex.RexNode
import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate
import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalCorrelate, FlinkLogicalTableFunctionScan}
/**
* Rule to convert a FlinkLogicalCorrelate into a DataStreamCorrelate.
*/
class DataStreamCorrelateRule
extends ConverterRule(
classOf[LogicalCorrelate],
Convention.NONE,
DataStreamConvention.INSTANCE,
classOf[FlinkLogicalCorrelate],
FlinkConventions.LOGICAL,
FlinkConventions.DATASTREAM,
"DataStreamCorrelateRule") {
override def matches(call: RelOptRuleCall): Boolean = {
val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate]
val join: FlinkLogicalCorrelate = call.rel(0).asInstanceOf[FlinkLogicalCorrelate]
val right = join.getRight.asInstanceOf[RelSubset].getOriginal
right match {
// right node is a table function
case scan: LogicalTableFunctionScan => true
case scan: FlinkLogicalTableFunctionScan => true
// a filter is pushed above the table function
case filter: LogicalFilter =>
filter
.getInput.asInstanceOf[RelSubset]
.getOriginal
.isInstanceOf[LogicalTableFunctionScan]
case calc: FlinkLogicalCalc =>
calc.getInput.asInstanceOf[RelSubset]
.getOriginal.isInstanceOf[FlinkLogicalTableFunctionScan]
case _ => false
}
}
override def convert(rel: RelNode): RelNode = {
val join: LogicalCorrelate = rel.asInstanceOf[LogicalCorrelate]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
val convInput: RelNode = RelOptRule.convert(join.getInput(0), DataStreamConvention.INSTANCE)
val join: FlinkLogicalCorrelate = rel.asInstanceOf[FlinkLogicalCorrelate]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
val convInput: RelNode = RelOptRule.convert(join.getInput(0), FlinkConventions.DATASTREAM)
val right: RelNode = join.getInput(1)
def convertToCorrelate(relNode: RelNode, condition: Option[RexNode]): DataStreamCorrelate = {
......@@ -64,12 +59,12 @@ class DataStreamCorrelateRule
case rel: RelSubset =>
convertToCorrelate(rel.getRelList.get(0), condition)
case filter: LogicalFilter =>
case calc: FlinkLogicalCalc =>
convertToCorrelate(
filter.getInput.asInstanceOf[RelSubset].getOriginal,
Some(filter.getCondition))
calc.getInput.asInstanceOf[RelSubset].getOriginal,
Some(calc.getProgram.expandLocalRef(calc.getProgram.getCondition)))
case scan: LogicalTableFunctionScan =>
case scan: FlinkLogicalTableFunctionScan =>
new DataStreamCorrelate(
rel.getCluster,
traitSet,
......
......@@ -19,28 +19,25 @@
package org.apache.flink.table.plan.rules.datastream
import org.apache.calcite.plan.volcano.RelSubset
import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.logical.LogicalWindow
import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.DataStreamOverAggregate
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalOverWindow
/**
* Rule to convert a FlinkLogicalOverWindow into a DataStreamOverAggregate.
*/
class DataStreamOverAggregateRule
extends ConverterRule(
classOf[LogicalWindow],
Convention.NONE,
DataStreamConvention.INSTANCE,
classOf[FlinkLogicalOverWindow],
FlinkConventions.LOGICAL,
FlinkConventions.DATASTREAM,
"DataStreamOverAggregateRule") {
override def convert(rel: RelNode): RelNode = {
val logicWindow: LogicalWindow = rel.asInstanceOf[LogicalWindow]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
val logicWindow: FlinkLogicalOverWindow = rel.asInstanceOf[FlinkLogicalOverWindow]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
val convertInput: RelNode =
RelOptRule.convert(logicWindow.getInput, DataStreamConvention.INSTANCE)
RelOptRule.convert(logicWindow.getInput, FlinkConventions.DATASTREAM)
val inputRowType = convertInput.asInstanceOf[RelSubset].getOriginal.getRowType
......
......@@ -18,25 +18,24 @@
package org.apache.flink.table.plan.rules.datastream
import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.TableScan
import org.apache.calcite.rel.logical.LogicalTableScan
import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.DataStreamScan
import org.apache.flink.table.plan.schema.DataStreamTable
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalNativeTableScan
class DataStreamScanRule
extends ConverterRule(
classOf[LogicalTableScan],
Convention.NONE,
DataStreamConvention.INSTANCE,
classOf[FlinkLogicalNativeTableScan],
FlinkConventions.LOGICAL,
FlinkConventions.DATASTREAM,
"DataStreamScanRule")
{
override def matches(call: RelOptRuleCall): Boolean = {
val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
val scan: FlinkLogicalNativeTableScan = call.rel(0).asInstanceOf[FlinkLogicalNativeTableScan]
val dataSetTable = scan.getTable.unwrap(classOf[DataStreamTable[Any]])
dataSetTable match {
case _: DataStreamTable[Any] =>
......@@ -47,8 +46,8 @@ class DataStreamScanRule
}
def convert(rel: RelNode): RelNode = {
val scan: LogicalTableScan = rel.asInstanceOf[LogicalTableScan]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
val scan: FlinkLogicalNativeTableScan = rel.asInstanceOf[FlinkLogicalNativeTableScan]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
new DataStreamScan(
rel.getCluster,
......
......@@ -18,26 +18,26 @@
package org.apache.flink.table.plan.rules.datastream
import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.logical.LogicalUnion
import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.DataStreamUnion
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalUnion
class DataStreamUnionRule
extends ConverterRule(
classOf[LogicalUnion],
Convention.NONE,
DataStreamConvention.INSTANCE,
classOf[FlinkLogicalUnion],
FlinkConventions.LOGICAL,
FlinkConventions.DATASTREAM,
"DataStreamUnionRule")
{
def convert(rel: RelNode): RelNode = {
val union: LogicalUnion = rel.asInstanceOf[LogicalUnion]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
val convLeft: RelNode = RelOptRule.convert(union.getInput(0), DataStreamConvention.INSTANCE)
val convRight: RelNode = RelOptRule.convert(union.getInput(1), DataStreamConvention.INSTANCE)
val union: FlinkLogicalUnion = rel.asInstanceOf[FlinkLogicalUnion]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
val convLeft: RelNode = RelOptRule.convert(union.getInput(0), FlinkConventions.DATASTREAM)
val convRight: RelNode = RelOptRule.convert(union.getInput(1), FlinkConventions.DATASTREAM)
new DataStreamUnion(
rel.getCluster,
......
......@@ -18,24 +18,24 @@
package org.apache.flink.table.plan.rules.datastream
import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.logical.LogicalValues
import org.apache.flink.table.plan.nodes.datastream.{DataStreamValues, DataStreamConvention}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.DataStreamValues
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalValues
class DataStreamValuesRule
extends ConverterRule(
classOf[LogicalValues],
Convention.NONE,
DataStreamConvention.INSTANCE,
classOf[FlinkLogicalValues],
FlinkConventions.LOGICAL,
FlinkConventions.DATASTREAM,
"DataStreamValuesRule")
{
def convert(rel: RelNode): RelNode = {
val values: LogicalValues = rel.asInstanceOf[LogicalValues]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
val values: FlinkLogicalValues = rel.asInstanceOf[FlinkLogicalValues]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
new DataStreamValues(
rel.getCluster,
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.rules.datastream
import org.apache.calcite.plan.RelOptRule._
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan}
import org.apache.flink.table.plan.rules.common.PushProjectIntoTableSourceScanRuleBase
import org.apache.flink.table.sources.{ProjectableTableSource, StreamTableSource}
/**
* Rule that pushes a projection into a [[StreamTableSourceScan]].
*/
class PushProjectIntoStreamTableSourceScanRule extends RelOptRule(
operand(classOf[DataStreamCalc],
operand(classOf[StreamTableSourceScan], none())),
"PushProjectIntoStreamTableSourceScanRule")
with PushProjectIntoTableSourceScanRuleBase {
/** The rule matches only if the [[StreamTableSource]] implements [[ProjectableTableSource]]. */
override def matches(call: RelOptRuleCall): Boolean = {
val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
scan.tableSource match {
case _: ProjectableTableSource[_] => true
case _ => false
}
}
override def onMatch(call: RelOptRuleCall): Unit = {
val calc = call.rel(0).asInstanceOf[DataStreamCalc]
val scan = call.rel(1).asInstanceOf[StreamTableSourceScan]
pushProjectIntoScan(call, calc, scan)
}
}
object PushProjectIntoStreamTableSourceScanRule {
val INSTANCE: RelOptRule = new PushProjectIntoStreamTableSourceScanRule
}
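Each of these rules exposes a singleton through a companion object so it can be listed once in a rule set. A hedged sketch of how such instances are typically grouped (illustrative only; see FlinkRuleSets.DATASTREAM_OPT_RULES for the actual list):

import org.apache.calcite.tools.{RuleSet, RuleSets}

val dataStreamRules: RuleSet = RuleSets.ofList(
  DataStreamScanRule.INSTANCE,
  DataStreamUnionRule.INSTANCE,
  DataStreamValuesRule.INSTANCE,
  PushProjectIntoStreamTableSourceScanRule.INSTANCE)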
......@@ -18,21 +18,21 @@
package org.apache.flink.table.plan.rules.datastream
import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.TableScan
import org.apache.calcite.rel.logical.LogicalTableScan
import org.apache.flink.table.plan.nodes.datastream.{StreamTableSourceScan, DataStreamConvention}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan
import org.apache.flink.table.plan.schema.TableSourceTable
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableSourceScan
import org.apache.flink.table.sources.StreamTableSource
/** Rule to convert a [[FlinkLogicalTableSourceScan]] into a [[StreamTableSourceScan]]. */
class StreamTableSourceScanRule
extends ConverterRule(
classOf[LogicalTableScan],
Convention.NONE,
DataStreamConvention.INSTANCE,
classOf[FlinkLogicalTableSourceScan],
FlinkConventions.LOGICAL,
FlinkConventions.DATASTREAM,
"StreamTableSourceScanRule")
{
......@@ -54,18 +54,14 @@ class StreamTableSourceScanRule
}
def convert(rel: RelNode): RelNode = {
val scan: LogicalTableScan = rel.asInstanceOf[LogicalTableScan]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
// The original registered table source
val table = scan.getTable.unwrap(classOf[TableSourceTable[_]])
val tableSource: StreamTableSource[_] = table.tableSource.asInstanceOf[StreamTableSource[_]]
val scan: FlinkLogicalTableSourceScan = rel.asInstanceOf[FlinkLogicalTableSourceScan]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
new StreamTableSourceScan(
rel.getCluster,
traitSet,
scan.getTable,
tableSource
scan.tableSource.asInstanceOf[StreamTableSource[_]]
)
}
}
......
......@@ -16,11 +16,11 @@
* limitations under the License.
*/
package org.apache.flink.table.plan.rules
package org.apache.flink.table.plan.rules.logical
import org.apache.calcite.plan.RelOptRule.{any, operand}
import org.apache.calcite.adapter.enumerable.EnumerableTableScan
import org.apache.calcite.plan.{RelOptRuleCall, RelOptRule, RelOptRuleOperand}
import org.apache.calcite.plan.RelOptRule.{any, operand}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
import org.apache.calcite.rel.logical.LogicalTableScan
/**
......
......@@ -16,29 +16,50 @@
* limitations under the License.
*/
package org.apache.flink.table.plan.rules.common
package org.apache.flink.table.plan.rules.logical
import java.util
import org.apache.calcite.plan.RelOptRuleCall
import org.apache.calcite.rel.core.Calc
import org.apache.calcite.plan.RelOptRule.{none, operand}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rex.RexProgram
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.plan.nodes.TableSourceScan
import org.apache.flink.table.plan.schema.TableSourceTable
import org.apache.flink.table.plan.util.RexProgramExtractor
import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalTableSourceScan}
import org.apache.flink.table.sources.FilterableTableSource
import org.apache.flink.table.validate.FunctionCatalog
import org.apache.flink.util.Preconditions
import scala.collection.JavaConverters._
trait PushFilterIntoTableSourceScanRuleBase {
class PushFilterIntoTableSourceScanRule extends RelOptRule(
operand(classOf[FlinkLogicalCalc],
operand(classOf[FlinkLogicalTableSourceScan], none)),
"PushFilterIntoTableSourceScanRule") {
private[flink] def pushFilterIntoScan(
override def matches(call: RelOptRuleCall): Boolean = {
val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
val scan: FlinkLogicalTableSourceScan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
scan.tableSource match {
case source: FilterableTableSource[_] =>
calc.getProgram.getCondition != null && !source.isFilterPushedDown
case _ => false
}
}
override def onMatch(call: RelOptRuleCall): Unit = {
val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
val scan: FlinkLogicalTableSourceScan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource[_]]
pushFilterIntoScan(call, calc, scan, tableSourceTable, filterableSource, description)
}
private def pushFilterIntoScan(
call: RelOptRuleCall,
calc: Calc,
scan: TableSourceScan,
calc: FlinkLogicalCalc,
scan: FlinkLogicalTableSourceScan,
tableSourceTable: TableSourceTable[_],
filterableSource: FilterableTableSource[_],
description: String): Unit = {
......@@ -102,3 +123,7 @@ trait PushFilterIntoTableSourceScanRuleBase {
}
}
}
object PushFilterIntoTableSourceScanRule {
val INSTANCE: RelOptRule = new PushFilterIntoTableSourceScanRule
}
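The rule above only fires while isFilterPushedDown is false, which is what keeps it from looping: applyPredicate must hand back a source whose flag is set. A sketch of that handshake under the FilterableTableSource contract used here (the helper is hypothetical):

import java.util
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.sources.{FilterableTableSource, TableSource}

object FilterPushDownSketch {
  def pushDown[T](
      source: FilterableTableSource[T],
      predicates: util.List[Expression]): TableSource[T] = {
    require(!source.isFilterPushedDown, "the rule must not fire twice on one scan")
    // applyPredicate removes the predicates it can evaluate from the mutable
    // list; whatever remains stays in the plan as a residual Calc condition
    val newSource = source.applyPredicate(predicates)
    // the returned source must report isFilterPushedDown == true so that
    // matches() rejects the rewritten scan and the rule terminates
    newSource
  }
}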
......@@ -16,22 +16,31 @@
* limitations under the License.
*/
package org.apache.flink.table.plan.rules.common
package org.apache.flink.table.plan.rules.logical
import org.apache.calcite.plan.RelOptRuleCall
import org.apache.calcite.rel.core.Calc
import org.apache.calcite.plan.RelOptRule.{none, operand}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.plan.nodes.TableSourceScan
import org.apache.flink.table.plan.util.{RexProgramExtractor, RexProgramRewriter}
import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalTableSourceScan}
import org.apache.flink.table.sources.{NestedFieldsProjectableTableSource, ProjectableTableSource}
trait PushProjectIntoTableSourceScanRuleBase {
class PushProjectIntoTableSourceScanRule extends RelOptRule(
operand(classOf[FlinkLogicalCalc],
operand(classOf[FlinkLogicalTableSourceScan], none)),
"PushProjectIntoTableSourceScanRule") {
private[flink] def pushProjectIntoScan(
call: RelOptRuleCall,
calc: Calc,
scan: TableSourceScan): Unit = {
override def matches(call: RelOptRuleCall): Boolean = {
val scan: FlinkLogicalTableSourceScan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
scan.tableSource match {
case _: ProjectableTableSource[_] => true
case _ => false
}
}
override def onMatch(call: RelOptRuleCall) {
val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
val scan: FlinkLogicalTableSourceScan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
val usedFields = RexProgramExtractor.extractRefInputFields(calc.getProgram)
// if no fields can be projected, we keep the original plan.
......@@ -64,3 +73,7 @@ trait PushProjectIntoTableSourceScanRuleBase {
}
}
}
object PushProjectIntoTableSourceScanRule {
val INSTANCE: RelOptRule = new PushProjectIntoTableSourceScanRule
}
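Like the filter rule, this one pivots on what the Calc actually references: extractRefInputFields collects the used input indexes, and only when fewer than the scan's full width can the source be narrowed. A hedged sketch, assuming projectFields accepts the used index array as in this codebase's ProjectableTableSource:

val usedFields: Array[Int] = RexProgramExtractor.extractRefInputFields(calc.getProgram)
if (usedFields.length != scan.getRowType.getFieldCount) {
  // assumed signature: projectFields(fields: Array[Int]): TableSource[_]
  val narrowed = scan.tableSource
    .asInstanceOf[ProjectableTableSource[_]]
    .projectFields(usedFields)
  // a new scan would then be rebuilt around `narrowed`
}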
......@@ -443,7 +443,10 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
CalciteConfig cc = new CalciteConfigBuilder().replaceOptRuleSet(RuleSets.ofList()).build();
CalciteConfig cc = new CalciteConfigBuilder()
.replaceLogicalOptRuleSet(RuleSets.ofList())
.replacePhysicalOptRuleSet(RuleSets.ofList())
.build();
tableEnv.getConfig().setCalciteConfig(cc);
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
......
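Since the single optimization phase is now split, a config that previously called replaceOptRuleSet has to address both phases, as the test above does. Conversely, a user can now extend one phase without touching the other; a short Scala sketch using the builder methods introduced by this commit:

import org.apache.calcite.rel.rules.{CalcMergeRule, FilterMergeRule}
import org.apache.calcite.tools.RuleSets

val cc: CalciteConfig = new CalciteConfigBuilder()
  .addLogicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))   // extend the logical phase
  .addPhysicalOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE))    // extend the physical phase
  .build()
tableEnv.getConfig.setCalciteConfig(cc)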
......@@ -37,8 +37,11 @@ class CalciteConfigBuilderTest {
assertFalse(cc.replacesNormRuleSet)
assertFalse(cc.getNormRuleSet.isDefined)
assertFalse(cc.replacesOptRuleSet)
assertFalse(cc.getOptRuleSet.isDefined)
assertFalse(cc.replacesLogicalOptRuleSet)
assertFalse(cc.getLogicalOptRuleSet.isDefined)
assertFalse(cc.replacesPhysicalOptRuleSet)
assertFalse(cc.getPhysicalOptRuleSet.isDefined)
assertFalse(cc.replacesDecoRuleSet)
assertFalse(cc.getDecoRuleSet.isDefined)
......@@ -48,16 +51,20 @@ class CalciteConfigBuilderTest {
def testRules(): Unit = {
val cc: CalciteConfig = new CalciteConfigBuilder()
.addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
.replaceOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
.replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
.build()
.addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
.replaceLogicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
.replacePhysicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
.replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
.build()
assertFalse(cc.replacesNormRuleSet)
assertTrue(cc.getNormRuleSet.isDefined)
assertTrue(cc.replacesOptRuleSet)
assertTrue(cc.getOptRuleSet.isDefined)
assertTrue(cc.replacesLogicalOptRuleSet)
assertTrue(cc.getLogicalOptRuleSet.isDefined)
assertTrue(cc.replacesPhysicalOptRuleSet)
assertTrue(cc.getPhysicalOptRuleSet.isDefined)
assertTrue(cc.replacesDecoRuleSet)
assertTrue(cc.getDecoRuleSet.isDefined)
......@@ -126,30 +133,30 @@ class CalciteConfigBuilderTest {
}
@Test
def testReplaceOptimizationRules(): Unit = {
def testReplaceLogicalOptimizationRules(): Unit = {
val cc: CalciteConfig = new CalciteConfigBuilder()
.replaceOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
.build()
.replaceLogicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
.build()
assertEquals(true, cc.replacesOptRuleSet)
assertTrue(cc.getOptRuleSet.isDefined)
val cSet = cc.getOptRuleSet.get.iterator().asScala.toSet
assertEquals(true, cc.replacesLogicalOptRuleSet)
assertTrue(cc.getLogicalOptRuleSet.isDefined)
val cSet = cc.getLogicalOptRuleSet.get.iterator().asScala.toSet
assertEquals(1, cSet.size)
assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
}
@Test
def testReplaceOptimizationAddRules(): Unit = {
def testReplaceLogicalOptimizationAddRules(): Unit = {
val cc: CalciteConfig = new CalciteConfigBuilder()
.replaceOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
.addOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
.build()
.replaceLogicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
.addLogicalOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
.build()
assertEquals(true, cc.replacesOptRuleSet)
assertTrue(cc.getOptRuleSet.isDefined)
val cSet = cc.getOptRuleSet.get.iterator().asScala.toSet
assertEquals(true, cc.replacesLogicalOptRuleSet)
assertTrue(cc.getLogicalOptRuleSet.isDefined)
val cSet = cc.getLogicalOptRuleSet.get.iterator().asScala.toSet
assertEquals(3, cSet.size)
assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
assertTrue(cSet.contains(CalcMergeRule.INSTANCE))
......@@ -157,30 +164,64 @@ class CalciteConfigBuilderTest {
}
@Test
def testAddOptimizationRules(): Unit = {
def testAddLogicalOptimizationRules(): Unit = {
val cc: CalciteConfig = new CalciteConfigBuilder()
.addOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
.build()
.addLogicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
.addLogicalOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
.build()
assertEquals(false, cc.replacesOptRuleSet)
assertTrue(cc.getOptRuleSet.isDefined)
val cSet = cc.getOptRuleSet.get.iterator().asScala.toSet
assertEquals(false, cc.replacesLogicalOptRuleSet)
assertTrue(cc.getLogicalOptRuleSet.isDefined)
val cSet = cc.getLogicalOptRuleSet.get.iterator().asScala.toSet
assertEquals(3, cSet.size)
assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
assertTrue(cSet.contains(CalcMergeRule.INSTANCE))
assertTrue(cSet.contains(CalcSplitRule.INSTANCE))
}
@Test
def testReplacePhysicalOptimizationRules(): Unit = {
val cc: CalciteConfig = new CalciteConfigBuilder()
.replacePhysicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
.build()
assertEquals(true, cc.replacesPhysicalOptRuleSet)
assertTrue(cc.getPhysicalOptRuleSet.isDefined)
val cSet = cc.getPhysicalOptRuleSet.get.iterator().asScala.toSet
assertEquals(1, cSet.size)
assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
}
@Test
def testAddAddOptimizationRules(): Unit = {
def testReplacePhysicalOptimizationAddRules(): Unit = {
val cc: CalciteConfig = new CalciteConfigBuilder()
.addOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
.addOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
.build()
.replacePhysicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
.addPhysicalOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
.build()
assertEquals(true, cc.replacesPhysicalOptRuleSet)
assertTrue(cc.getPhysicalOptRuleSet.isDefined)
val cSet = cc.getPhysicalOptRuleSet.get.iterator().asScala.toSet
assertEquals(3, cSet.size)
assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
assertTrue(cSet.contains(CalcMergeRule.INSTANCE))
assertTrue(cSet.contains(CalcSplitRule.INSTANCE))
}
@Test
def testAddPhysicalOptimizationRules(): Unit = {
val cc: CalciteConfig = new CalciteConfigBuilder()
.addPhysicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
.addPhysicalOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
.build()
assertEquals(false, cc.replacesOptRuleSet)
assertTrue(cc.getOptRuleSet.isDefined)
val cSet = cc.getOptRuleSet.get.iterator().asScala.toSet
assertEquals(false, cc.replacesPhysicalOptRuleSet)
assertTrue(cc.getPhysicalOptRuleSet.isDefined)
val cSet = cc.getPhysicalOptRuleSet.get.iterator().asScala.toSet
assertEquals(3, cSet.size)
assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
assertTrue(cSet.contains(CalcMergeRule.INSTANCE))
......
......@@ -116,7 +116,7 @@ class ExternalCatalogTest extends TableTestBase {
sourceStreamTableNode(table1Path, table1ProjectedFields),
term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
),
term("union", "_c0", "e", "_c2")
term("union all", "_c0", "e", "_c2")
)
util.verifyTable(result, expected)
......@@ -143,7 +143,7 @@ class ExternalCatalogTest extends TableTestBase {
sourceStreamTableNode(table1Path, table1ProjectedFields),
term("select", "*(a, 2) AS EXPR$0", "b", "c")
),
term("union", "EXPR$0", "e", "g"))
term("union all", "EXPR$0", "e", "g"))
util.verifySql(sqlQuery, expected)
}
......
......@@ -333,13 +333,14 @@ class TableEnvironmentTest extends TableTestBase {
val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 UNION SELECT a, b, c FROM $table")
val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 " +
s"UNION ALL SELECT a, b, c FROM $table")
val expected2 = binaryNode(
"DataStreamUnion",
streamTableNode(1),
streamTableNode(0),
term("union", "d, e, f"))
term("union all", "d, e, f"))
util.verifyTable(sqlTable2, expected2)
}
......
......@@ -516,7 +516,7 @@ class WindowAggregateTest extends TableTestBase {
)
streamUtil.verifySql(sql, expected)
}
@Test
def testBoundPartitionedProcTimeWindowWithRowRange() = {
val sql = "SELECT " +
......
......@@ -18,11 +18,11 @@
package org.apache.flink.table.expressions.utils
import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
import java.util
import java.util.concurrent.Future
import com.google.common.collect.ImmutableList
import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
import org.apache.calcite.rex.RexNode
import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql2rel.RelDecorrelator
......@@ -43,7 +43,8 @@ import org.apache.flink.table.calcite.FlinkPlannerImpl
import org.apache.flink.table.codegen.{CodeGenerator, Compiler, GeneratedFunction}
import org.apache.flink.table.expressions.{Expression, ExpressionParser}
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.plan.nodes.dataset.{DataSetCalc, DataSetConvention}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetCalc
import org.apache.flink.table.plan.rules.FlinkRuleSets
import org.apache.flink.types.Row
import org.junit.Assert._
......@@ -66,7 +67,8 @@ abstract class ExpressionTestBase {
context._2.getFrameworkConfig,
context._2.getPlanner,
context._2.getTypeFactory)
private val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
private val logicalOptProgram = Programs.ofRules(FlinkRuleSets.LOGICAL_OPT_RULES)
private val dataSetOptProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
private def hepPlanner = {
val builder = new HepProgramBuilder
......@@ -194,9 +196,14 @@ abstract class ExpressionTestBase {
decorPlan
}
// create DataSetCalc
val flinkOutputProps = converted.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
val dataSetCalc = optProgram.run(context._2.getPlanner, normalizedPlan, flinkOutputProps,
// convert to logical plan
val logicalProps = converted.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
val logicalCalc = logicalOptProgram.run(context._2.getPlanner, normalizedPlan, logicalProps,
ImmutableList.of(), ImmutableList.of())
// convert to dataset plan
val physicalProps = converted.getTraitSet.replace(FlinkConventions.DATASET).simplify()
val dataSetCalc = dataSetOptProgram.run(context._2.getPlanner, logicalCalc, physicalProps,
ImmutableList.of(), ImmutableList.of())
// extract RexNode
......@@ -219,8 +226,15 @@ abstract class ExpressionTestBase {
// create DataSetCalc
val decorPlan = RelDecorrelator.decorrelateQuery(converted)
val flinkOutputProps = converted.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
val dataSetCalc = optProgram.run(context._2.getPlanner, decorPlan, flinkOutputProps,
// convert to logical plan
val flinkLogicalProps = converted.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
val logicalCalc = logicalOptProgram.run(context._2.getPlanner, decorPlan, flinkLogicalProps,
ImmutableList.of(), ImmutableList.of())
// convert to dataset plan
val flinkPhysicalProps = converted.getTraitSet.replace(FlinkConventions.DATASET).simplify()
val dataSetCalc = dataSetOptProgram.run(context._2.getPlanner, logicalCalc, flinkPhysicalProps,
ImmutableList.of(), ImmutableList.of())
// extract RexNode
......
......@@ -29,14 +29,15 @@ import org.junit.Test
class NormalizationRulesTest extends TableTestBase {
@Test
def testApplyNormalizationRuleForForBatchSQL(): Unit = {
def testApplyNormalizationRuleForBatchSQL(): Unit = {
val util = batchTestUtil()
// rewrite distinct aggregate
val cc: CalciteConfig = new CalciteConfigBuilder()
.replaceNormRuleSet(RuleSets.ofList(AggregateExpandDistinctAggregatesRule.JOIN))
.replaceOptRuleSet(RuleSets.ofList())
.build()
.replaceNormRuleSet(RuleSets.ofList(AggregateExpandDistinctAggregatesRule.JOIN))
.replaceLogicalOptRuleSet(RuleSets.ofList())
.replacePhysicalOptRuleSet(RuleSets.ofList())
.build()
util.tEnv.getConfig.setCalciteConfig(cc)
util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
......@@ -62,14 +63,15 @@ class NormalizationRulesTest extends TableTestBase {
}
@Test
def testApplyNormalizationRuleForForStreamSQL(): Unit = {
def testApplyNormalizationRuleForStreamSQL(): Unit = {
val util = streamTestUtil()
// rewrite distinct aggregate
val cc: CalciteConfig = new CalciteConfigBuilder()
.replaceNormRuleSet(RuleSets.ofList(AggregateExpandDistinctAggregatesRule.JOIN))
.replaceOptRuleSet(RuleSets.ofList())
.build()
.replaceNormRuleSet(RuleSets.ofList(AggregateExpandDistinctAggregatesRule.JOIN))
.replaceLogicalOptRuleSet(RuleSets.ofList())
.replacePhysicalOptRuleSet(RuleSets.ofList())
.build()
util.tEnv.getConfig.setCalciteConfig(cc)
util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
......
......@@ -35,5 +35,5 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) {
override protected def getBuiltInNormRuleSet: RuleSet = ???
override protected def getBuiltInOptRuleSet: RuleSet = ???
override protected def getBuiltInPhysicalOptRuleSet: RuleSet = ???
}
......@@ -4,7 +4,7 @@ LogicalUnion(all=[true])
LogicalTableScan(table=[[_DataStreamTable_1]])
== Optimized Logical Plan ==
DataStreamUnion(union=[count, word])
DataStreamUnion(union all=[count, word])
DataStreamScan(table=[[_DataStreamTable_0]])
DataStreamScan(table=[[_DataStreamTable_1]])
......
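The .out change reflects that streaming unions are only supported as UNION ALL, and the explain string now prints the all flag. A sketch of how this surfaces to a user (table names t1 and t2 are illustrative):

// the optimized plan now prints DataStreamUnion(union all=[count, word])
// instead of DataStreamUnion(union=[count, word])
val result = tEnv.sql("SELECT * FROM t1 UNION ALL SELECT * FROM t2")
println(tEnv.explain(result))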