diff --git a/docs/dev/table/common.md b/docs/dev/table/common.md index 86c739ff6472d0d862b3a21b1644e184f61ce55e..907cb15ac1013f7ab4bd01375d71c9a3d43e1e8c 100644 --- a/docs/dev/table/common.md +++ b/docs/dev/table/common.md @@ -1749,11 +1749,11 @@ the result of multiple-sinks plan is {% highlight text %} == Abstract Syntax Tree == -LogicalSink(name=[MySink1], fields=[count, word]) +LogicalLegacySink(name=[MySink1], fields=[count, word]) +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')]) +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]]) -LogicalSink(name=[MySink2], fields=[count, word]) +LogicalLegacySink(name=[MySink2], fields=[count, word]) +- LogicalUnion(all=[true]) :- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')]) : +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]]) @@ -1763,10 +1763,10 @@ LogicalSink(name=[MySink2], fields=[count, word]) Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], reuse_id=[1]) +- TableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word]) -Sink(name=[MySink1], fields=[count, word]) +LegacySink(name=[MySink1], fields=[count, word]) +- Reused(reference_id=[1]) -Sink(name=[MySink2], fields=[count, word]) +LegacySink(name=[MySink2], fields=[count, word]) +- Union(all=[true], union=[count, word]) :- Reused(reference_id=[1]) +- TableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word]) diff --git a/docs/dev/table/common.zh.md b/docs/dev/table/common.zh.md index a918d6c5ffa3b911e461095ce802eff78ae38f88..bd36b1e1af2d43eb3759fee3da7b91f92e4bbb52 100644 --- a/docs/dev/table/common.zh.md +++ b/docs/dev/table/common.zh.md @@ -1730,11 +1730,11 @@ print(explanation) {% highlight text %} == Abstract Syntax Tree == -LogicalSink(name=[MySink1], fields=[count, word]) +LogicalLegacySink(name=[MySink1], fields=[count, word]) +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')]) +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]]) -LogicalSink(name=[MySink2], fields=[count, word]) +LogicalLegacySink(name=[MySink2], fields=[count, word]) +- LogicalUnion(all=[true]) :- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')]) : +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]]) @@ -1744,10 +1744,10 @@ LogicalSink(name=[MySink2], fields=[count, word]) Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], reuse_id=[1]) +- TableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word]) -Sink(name=[MySink1], fields=[count, word]) +LegacySink(name=[MySink1], fields=[count, word]) +- Reused(reference_id=[1]) -Sink(name=[MySink2], fields=[count, word]) +LegacySink(name=[MySink2], fields=[count, word]) +- Union(all=[true], union=[count, word]) :- Reused(reference_id=[1]) +- TableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word]) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala index fb42eab8beea060471798c9f804349dc8a06d1f8..72436722e85915e3c6998e66bab4040e3fb35585 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala @@ -18,11 +18,10 @@ package org.apache.flink.table.planner.calcite -import org.apache.flink.table.planner.calcite.FlinkRelFactories.{ExpandFactory, RankFactory, SinkFactory} +import org.apache.flink.table.planner.calcite.FlinkRelFactories.{ExpandFactory, RankFactory} import org.apache.flink.table.planner.plan.nodes.logical._ import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase import org.apache.flink.table.runtime.operators.rank.{RankRange, RankType} -import org.apache.flink.table.sinks.TableSink import com.google.common.collect.ImmutableList import org.apache.calcite.plan.RelOptTable.ToRelContext @@ -60,7 +59,6 @@ object FlinkLogicalRelFactories { val FLINK_LOGICAL_TABLE_SCAN_FACTORY = new TableScanFactoryImpl val FLINK_LOGICAL_EXPAND_FACTORY = new ExpandFactoryImpl val FLINK_LOGICAL_RANK_FACTORY = new RankFactoryImpl - val FLINK_LOGICAL_SINK_FACTORY = new SinkFactoryImpl /** A [[RelBuilderFactory]] that creates a [[RelBuilder]] that will * create logical relational expressions for everything. */ @@ -75,8 +73,7 @@ object FlinkLogicalRelFactories { FLINK_LOGICAL_VALUES_FACTORY, FLINK_LOGICAL_TABLE_SCAN_FACTORY, FLINK_LOGICAL_EXPAND_FACTORY, - FLINK_LOGICAL_RANK_FACTORY, - FLINK_LOGICAL_SINK_FACTORY)) + FLINK_LOGICAL_RANK_FACTORY)) /** * Implementation of [[ProjectFactory]] that returns a [[FlinkLogicalCalc]]. @@ -261,17 +258,4 @@ object FlinkLogicalRelFactories { rankNumberType, outputRankNumber) } } - - /** - * Implementation of [[FlinkRelFactories.SinkFactory]] that returns a [[FlinkLogicalSink]]. - */ - class SinkFactoryImpl extends SinkFactory { - def createSink( - input: RelNode, - sink: TableSink[_], - sinkName: String): RelNode = { - FlinkLogicalSink.create(input, sink, sinkName) - } - } - } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala index 3b8eb0ea9ff1bd5fa77b91d2051edb56441938bc..04442e870af1498de2a15f9b422546f740392254 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala @@ -20,14 +20,13 @@ package org.apache.flink.table.planner.calcite import org.apache.flink.table.operations.QueryOperation import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty -import org.apache.flink.table.planner.calcite.FlinkRelFactories.{ExpandFactory, RankFactory, SinkFactory} +import org.apache.flink.table.planner.calcite.FlinkRelFactories.{ExpandFactory, RankFactory} import org.apache.flink.table.planner.expressions.{PlannerWindowProperty, WindowProperty} import org.apache.flink.table.planner.plan.QueryOperationConverter import org.apache.flink.table.planner.plan.logical.LogicalWindow import org.apache.flink.table.planner.plan.nodes.calcite.{LogicalTableAggregate, LogicalWatermarkAssigner, LogicalWindowAggregate, LogicalWindowTableAggregate} import org.apache.flink.table.planner.plan.utils.AggregateUtil import org.apache.flink.table.runtime.operators.rank.{RankRange, RankType} -import org.apache.flink.table.sinks.TableSink import org.apache.calcite.plan._ import org.apache.calcite.rel.RelCollation @@ -71,10 +70,6 @@ class FlinkRelBuilder( Util.first(context.unwrap(classOf[RankFactory]), FlinkRelFactories.DEFAULT_RANK_FACTORY) } - private val sinkFactory: SinkFactory = { - Util.first(context.unwrap(classOf[SinkFactory]), FlinkRelFactories.DEFAULT_SINK_FACTORY) - } - override def getRelOptSchema: RelOptSchema = relOptSchema override def getCluster: RelOptCluster = relOptCluster @@ -91,12 +86,6 @@ class FlinkRelBuilder( push(expand) } - def sink(sink: TableSink[_], sinkName: String): RelBuilder = { - val input = build() - val sinkNode = sinkFactory.createSink(input, sink, sinkName) - push(sinkNode) - } - def rank( partitionKey: ImmutableBitSet, orderKey: RelCollation, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelFactories.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelFactories.scala index 1f002b2758577e400af29625a1bf5c5eae0578b9..62a1cd6bf21475707b311dbf125b8905edeb9fb9 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelFactories.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelFactories.scala @@ -18,9 +18,8 @@ package org.apache.flink.table.planner.calcite -import org.apache.flink.table.planner.plan.nodes.calcite.{LogicalExpand, LogicalRank, LogicalSink} +import org.apache.flink.table.planner.plan.nodes.calcite.{LogicalExpand, LogicalRank} import org.apache.flink.table.runtime.operators.rank.{RankRange, RankType} -import org.apache.flink.table.sinks.TableSink import org.apache.calcite.plan.Contexts import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField} @@ -56,8 +55,6 @@ object FlinkRelFactories { val DEFAULT_RANK_FACTORY = new RankFactoryImpl - val DEFAULT_SINK_FACTORY = new SinkFactoryImpl - /** * Can create a [[LogicalExpand]] of the * appropriate type for this rule's calling convention. @@ -112,27 +109,4 @@ object FlinkRelFactories { rankNumberType, outputRankNumber) } } - - /** - * Can create a [[LogicalSink]] of the - * appropriate type for this rule's calling convention. - */ - trait SinkFactory { - - def createSink( - input: RelNode, - sink: TableSink[_], - sinkName: String): RelNode - } - - /** - * Implementation of [[SinkFactory]] that returns a [[LogicalSink]]. - */ - class SinkFactoryImpl extends SinkFactory { - - def createSink( - input: RelNode, - sink: TableSink[_], - sinkName: String): RelNode = LogicalSink.create(input, sink, sinkName) - } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala index b6bec80dbff0b67f5efa38b85da2799a82bf2343..baeff6b65bef86c19a9b011943ae951dec41de8c 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala @@ -163,7 +163,7 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { val input = snapshot.getInput.accept(this) snapshot.copy(snapshot.getTraitSet, input, snapshot.getPeriod) - case sink: LogicalSink => + case sink: LogicalLegacySink => var newInput = sink.getInput.accept(this) var needsConversion = false @@ -181,7 +181,7 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { if (needsConversion) { newInput = LogicalProject.create(newInput, projects, newInput.getRowType.getFieldNames) } - new LogicalSink( + new LogicalLegacySink( sink.getCluster, sink.getTraitSet, newInput, @@ -482,7 +482,7 @@ object RelTimeIndicatorConverter { val convertedRoot = rootRel.accept(converter) // the LogicalSink is converted in RelTimeIndicatorConverter before - if (rootRel.isInstanceOf[LogicalSink] || !needFinalTimeIndicatorConversion) { + if (rootRel.isInstanceOf[LogicalLegacySink] || !needFinalTimeIndicatorConversion) { return convertedRoot } var needsConversion = false diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala index d87a670be84385d2780d93d091923e3bb4b594fe..8927987fcdc8beb627c4ee8edb8556ec1a66a55a 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala @@ -33,7 +33,7 @@ import org.apache.flink.table.planner.calcite.{CalciteParser, FlinkPlannerImpl, import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema import org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl import org.apache.flink.table.planner.hint.FlinkHints -import org.apache.flink.table.planner.plan.nodes.calcite.LogicalSink +import org.apache.flink.table.planner.plan.nodes.calcite.LogicalLegacySink import org.apache.flink.table.planner.plan.nodes.exec.ExecNode import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel import org.apache.flink.table.planner.plan.optimize.Optimizer @@ -178,7 +178,7 @@ abstract class PlannerBase( val sinkSchema = s.getSink.getTableSchema // validate query schema and sink schema, and apply cast if possible val query = validateSchemaAndApplyImplicitCast(input, sinkSchema, getTypeFactory) - LogicalSink.create( + LogicalLegacySink.create( query, s.getSink, "UnregisteredSink", @@ -201,7 +201,7 @@ abstract class PlannerBase( TableSchemaUtils.getPhysicalSchema(table.getSchema), getTypeFactory, Some(catalogSink.getTableIdentifier.asSummaryString())) - LogicalSink.create( + LogicalLegacySink.create( query, sink, identifier.toString, @@ -233,7 +233,7 @@ abstract class PlannerBase( typeInfo, needUpdateBefore, withChangeFlag) - LogicalSink.create( + LogicalLegacySink.create( query, tableSink, "DataStreamTableSink", diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LegacySink.scala similarity index 98% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LegacySink.scala index 6e989f8194e2bee8295b49d76faeed93ffbee152..8326370fa82e399691e27be9fd57ef19402b1281 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LegacySink.scala @@ -35,7 +35,7 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} * @param sink Table sink to write into * @param sinkName Name of tableSink, which is not required property, that is, it could be null */ -abstract class Sink( +abstract class LegacySink( cluster: RelOptCluster, traitSet: RelTraitSet, input: RelNode, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalLegacySink.scala similarity index 85% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalSink.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalLegacySink.scala index 96afefb964ae9943d6fc9cd7257a0a6faba49933..c185eb683e7a2fd5c783d730d11d21225aa54ecf 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalSink.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalLegacySink.scala @@ -29,11 +29,11 @@ import java.util import scala.collection.JavaConversions._ /** - * Sub-class of [[Sink]] that is a relational expression + * Sub-class of [[LegacySink]] that is a relational expression * which writes out data of input node into a [[TableSink]]. * This class corresponds to Calcite logical rel. */ -final class LogicalSink( +final class LogicalLegacySink( cluster: RelOptCluster, traitSet: RelTraitSet, input: RelNode, @@ -41,24 +41,24 @@ final class LogicalSink( sinkName: String, val catalogTable: CatalogTable, val staticPartitions: Map[String, String]) - extends Sink(cluster, traitSet, input, sink, sinkName) { + extends LegacySink(cluster, traitSet, input, sink, sinkName) { override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { - new LogicalSink( + new LogicalLegacySink( cluster, traitSet, inputs.head, sink, sinkName, catalogTable, staticPartitions) } } -object LogicalSink { +object LogicalLegacySink { def create(input: RelNode, sink: TableSink[_], sinkName: String, catalogTable: CatalogTable = null, - staticPartitions: Map[String, String] = Map()): LogicalSink = { + staticPartitions: Map[String, String] = Map()): LogicalLegacySink = { val traits = input.getCluster.traitSetOf(Convention.NONE) - new LogicalSink( + new LogicalLegacySink( input.getCluster, traits, input, sink, sinkName, catalogTable, staticPartitions) } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalLegacySink.scala similarity index 80% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSink.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalLegacySink.scala index e12428468fc159463a5a8dbc4c4d3cebec93ce6d..ffe360285a14111e93ec6c5e141679f41286a2f4 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSink.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalLegacySink.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.nodes.logical import org.apache.flink.table.catalog.CatalogTable import org.apache.flink.table.planner.plan.nodes.FlinkConventions -import org.apache.flink.table.planner.plan.nodes.calcite.{LogicalSink, Sink} +import org.apache.flink.table.planner.plan.nodes.calcite.{LogicalLegacySink, LegacySink} import org.apache.flink.table.sinks.TableSink import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelTraitSet} @@ -32,10 +32,10 @@ import java.util import scala.collection.JavaConversions._ /** - * Sub-class of [[Sink]] that is a relational expression + * Sub-class of [[LegacySink]] that is a relational expression * which writes out data of input node into a [[TableSink]]. */ -class FlinkLogicalSink( +class FlinkLogicalLegacySink( cluster: RelOptCluster, traitSet: RelTraitSet, input: RelNode, @@ -43,27 +43,27 @@ class FlinkLogicalSink( sinkName: String, val catalogTable: CatalogTable, val staticPartitions: Map[String, String]) - extends Sink(cluster, traitSet, input, sink, sinkName) - with FlinkLogicalRel { + extends LegacySink(cluster, traitSet, input, sink, sinkName) + with FlinkLogicalRel { override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { - new FlinkLogicalSink( + new FlinkLogicalLegacySink( cluster, traitSet, inputs.head, sink, sinkName, catalogTable, staticPartitions) } } -private class FlinkLogicalSinkConverter +private class FlinkLogicalLegacySinkConverter extends ConverterRule( - classOf[LogicalSink], + classOf[LogicalLegacySink], Convention.NONE, FlinkConventions.LOGICAL, - "FlinkLogicalSinkConverter") { + "FlinkLogicalLegacySinkConverter") { override def convert(rel: RelNode): RelNode = { - val sink = rel.asInstanceOf[LogicalSink] + val sink = rel.asInstanceOf[LogicalLegacySink] val newInput = RelOptRule.convert(sink.getInput, FlinkConventions.LOGICAL) - FlinkLogicalSink.create( + FlinkLogicalLegacySink.create( newInput, sink.sink, sink.sinkName, @@ -72,18 +72,18 @@ private class FlinkLogicalSinkConverter } } -object FlinkLogicalSink { - val CONVERTER: ConverterRule = new FlinkLogicalSinkConverter() +object FlinkLogicalLegacySink { + val CONVERTER: ConverterRule = new FlinkLogicalLegacySinkConverter() def create( input: RelNode, sink: TableSink[_], sinkName: String, catalogTable: CatalogTable = null, - staticPartitions: Map[String, String] = Map()): FlinkLogicalSink = { + staticPartitions: Map[String, String] = Map()): FlinkLogicalLegacySink = { val cluster = input.getCluster val traitSet = cluster.traitSetOf(FlinkConventions.LOGICAL).simplify() - new FlinkLogicalSink( + new FlinkLogicalLegacySink( cluster, traitSet, input, sink, sinkName, catalogTable, staticPartitions) } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLegacySink.scala similarity index 95% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLegacySink.scala index 16f28c2ed59e41ac615344e94eac896feacefd7a..e985c8cb12b0f753122c5a123cb2f6b87fec0a11 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLegacySink.scala @@ -26,7 +26,7 @@ import org.apache.flink.table.data.RowData import org.apache.flink.table.planner.codegen.SinkCodeGenerator._ import org.apache.flink.table.planner.codegen.{CodeGenUtils, CodeGeneratorContext} import org.apache.flink.table.planner.delegation.BatchPlanner -import org.apache.flink.table.planner.plan.nodes.calcite.Sink +import org.apache.flink.table.planner.plan.nodes.calcite.LegacySink import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode} import org.apache.flink.table.planner.plan.utils.UpdatingPlanChecker import org.apache.flink.table.planner.sinks.DataStreamTableSink @@ -46,18 +46,18 @@ import scala.collection.JavaConversions._ /** * Batch physical RelNode to to write data into an external sink defined by a [[TableSink]]. */ -class BatchExecSink[T]( +class BatchExecLegacySink[T]( cluster: RelOptCluster, traitSet: RelTraitSet, inputRel: RelNode, sink: TableSink[T], sinkName: String) - extends Sink(cluster, traitSet, inputRel, sink, sinkName) - with BatchPhysicalRel - with BatchExecNode[Any] { + extends LegacySink(cluster, traitSet, inputRel, sink, sinkName) + with BatchPhysicalRel + with BatchExecNode[Any] { override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { - new BatchExecSink(cluster, traitSet, inputs.get(0), sink, sinkName) + new BatchExecLegacySink(cluster, traitSet, inputs.get(0), sink, sinkName) } //~ ExecNode methods ----------------------------------------------------------- diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLegacySink.scala similarity index 96% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLegacySink.scala index e40c2250db2736c217621be044eb74615ed3d992..f8d5006a52a097d2b2654504846bd2092b931e82 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLegacySink.scala @@ -27,7 +27,7 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.codegen.SinkCodeGenerator.generateRowConverterOperator import org.apache.flink.table.planner.codegen.{CodeGenUtils, CodeGeneratorContext} import org.apache.flink.table.planner.delegation.StreamPlanner -import org.apache.flink.table.planner.plan.nodes.calcite.Sink +import org.apache.flink.table.planner.plan.nodes.calcite.LegacySink import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode} import org.apache.flink.table.planner.plan.utils.{ChangelogPlanUtils, UpdatingPlanChecker} import org.apache.flink.table.planner.sinks.DataStreamTableSink @@ -45,20 +45,20 @@ import scala.collection.JavaConversions._ /** * Stream physical RelNode to to write data into an external sink defined by a [[TableSink]]. */ -class StreamExecSink[T]( +class StreamExecLegacySink[T]( cluster: RelOptCluster, traitSet: RelTraitSet, inputRel: RelNode, sink: TableSink[T], sinkName: String) - extends Sink(cluster, traitSet, inputRel, sink, sinkName) - with StreamPhysicalRel - with StreamExecNode[Any] { + extends LegacySink(cluster, traitSet, inputRel, sink, sinkName) + with StreamPhysicalRel + with StreamExecNode[Any] { override def requireWatermark: Boolean = false override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { - new StreamExecSink(cluster, traitSet, inputs.get(0), sink, sinkName) + new StreamExecLegacySink(cluster, traitSet, inputs.get(0), sink, sinkName) } //~ ExecNode methods ----------------------------------------------------------- diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala index 0b80bdd8dd18dee48d1a46bfc054faa4ed264d21..20fd8b2a95567c7cde8914b33a6912d4ebd7db9a 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala @@ -22,7 +22,7 @@ import org.apache.flink.table.api.TableConfig import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog} import org.apache.flink.table.planner.calcite.{FlinkContext, SqlExprToRexConverterFactory} import org.apache.flink.table.planner.delegation.BatchPlanner -import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecLegacySink import org.apache.flink.table.planner.plan.optimize.program.{BatchOptimizeContext, FlinkBatchProgram} import org.apache.flink.table.planner.plan.schema.IntermediateRelTable import org.apache.flink.table.planner.utils.TableConfigUtils @@ -57,7 +57,7 @@ class BatchCommonSubGraphBasedOptimizer(planner: BatchPlanner) val optimizedTree = optimizeTree(originTree) optimizedTree match { - case _: BatchExecSink[_] => // ignore + case _: BatchExecLegacySink[_] => // ignore case _ => val name = createUniqueIntermediateRelTableName val intermediateRelTable = new IntermediateRelTable(Collections.singletonList(name), diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala index a692425687fdd16540456912785ff0d7645c986d..4bc20f869d6624f0d48158ef8bd81ab7c5fc326e 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala @@ -23,7 +23,7 @@ import org.apache.flink.configuration.ConfigOption import org.apache.flink.configuration.ConfigOptions.key import org.apache.flink.table.api.TableConfig import org.apache.flink.table.planner.plan.`trait`.MiniBatchInterval -import org.apache.flink.table.planner.plan.nodes.calcite.Sink +import org.apache.flink.table.planner.plan.nodes.calcite.LegacySink import org.apache.flink.table.planner.plan.reuse.SubplanReuser.{SubplanReuseContext, SubplanReuseShuttle} import org.apache.flink.table.planner.plan.rules.logical.WindowPropertiesRules import org.apache.flink.table.planner.plan.utils.{DefaultRelShuttle, ExpandTableScanShuttle} @@ -43,7 +43,7 @@ import scala.collection.mutable /** * A [[RelNodeBlock]] is a sub-tree in the [[RelNode]] DAG, and represents common sub-graph * in [[CommonSubGraphBasedOptimizer]]. All [[RelNode]]s in each block have - * only one [[Sink]] output. + * only one [[LegacySink]] output. * * The algorithm works as follows: * 1. If there is only one tree, the whole tree is in one block. (the next steps is needless.) @@ -91,7 +91,7 @@ import scala.collection.mutable * }}} * * This [[RelNode]] DAG will be decomposed into three [[RelNodeBlock]]s, the break-point - * is the [[RelNode]](`Join(a1=b2)`) which data outputs to multiple [[Sink]]s. + * is the [[RelNode]](`Join(a1=b2)`) which data outputs to multiple [[LegacySink]]s. *
Notes: Although `Project(a,b,c)` has two parents (outputs), * they eventually merged at `Join(a1=b2)`. So `Project(a,b,c)` is not a break-point. *
the first [[RelNodeBlock]] includes TableScan, Project(a,b,c), Filter(a>0),
@@ -111,8 +111,8 @@ import scala.collection.mutable
* will be wrapped as an IntermediateRelTable first, and then be converted to a new TableScan
* which is the new output node of current block and is also the input of its parent blocks.
*
- * @param outputNode A RelNode of the output in the block, which could be a [[Sink]] or
- * other RelNode which data outputs to multiple [[Sink]]s.
+ * @param outputNode A RelNode of the output in the block, which could be a [[LegacySink]] or
+ * other RelNode which data outputs to multiple [[LegacySink]]s.
*/
class RelNodeBlock(val outputNode: RelNode) {
// child (or input) blocks
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
index aa52ca586067c716e136c15b250af11ca7ca87ae..9f31f3ba742d3c9cfa8a5ef274d0236c8510729c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
@@ -25,7 +25,7 @@ import org.apache.flink.table.planner.calcite.{FlinkContext, SqlExprToRexConvert
import org.apache.flink.table.planner.delegation.StreamPlanner
import org.apache.flink.table.planner.plan.`trait`.{MiniBatchInterval, MiniBatchIntervalTrait, MiniBatchIntervalTraitDef, MiniBatchMode, ModifyKindSet, ModifyKindSetTraitDef, UpdateKind, UpdateKindTraitDef}
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
-import org.apache.flink.table.planner.plan.nodes.calcite.Sink
+import org.apache.flink.table.planner.plan.nodes.calcite.LegacySink
import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamExecDataStreamScan, StreamExecIntermediateTableScan, StreamPhysicalRel}
import org.apache.flink.table.planner.plan.optimize.program.{FlinkStreamProgram, StreamOptimizeContext}
import org.apache.flink.table.planner.plan.schema.IntermediateRelTable
@@ -109,7 +109,7 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner)
val blockLogicalPlan = block.getPlan
blockLogicalPlan match {
- case s: Sink =>
+ case s: LegacySink =>
require(isSinkBlock)
val optimizedTree = optimizeTree(
s,
@@ -210,7 +210,7 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner)
val blockLogicalPlan = block.getPlan
blockLogicalPlan match {
- case n: Sink =>
+ case n: LegacySink =>
require(isSinkBlock)
val optimizedPlan = optimizeTree(
n, updateBeforeRequired, miniBatchInterval, isSinkBlock = true)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index 57b56a1e27a1c510b04a878e015296918697383b..dff27837f9f3231a0c830e020746dc516c869aa1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -112,7 +112,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
rel: StreamPhysicalRel,
requiredTrait: ModifyKindSetTrait,
requester: String): StreamPhysicalRel = rel match {
- case sink: StreamExecSink[_] =>
+ case sink: StreamExecLegacySink[_] =>
val (sinkRequiredTrait, name) = sink.sink match {
case _: UpsertStreamTableSink[_] =>
(ModifyKindSetTrait.ALL_CHANGES, "UpsertStreamTableSink")
@@ -370,7 +370,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
def visit(
rel: StreamPhysicalRel,
requiredTrait: UpdateKindTrait): Option[StreamPhysicalRel] = rel match {
- case sink: StreamExecSink[_] =>
+ case sink: StreamExecLegacySink[_] =>
val childModifyKindSet = getModifyKindSet(sink.getInput)
val onlyAfter = onlyAfterOrNone(childModifyKindSet)
val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala
index 79a12e24eab63bfc18a524c2663d053ea1043f20..87e06e9b5329859489ba7f87d26c1c98e7bc869c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.reuse
import org.apache.flink.table.api.config.OptimizerConfigOptions
import org.apache.flink.table.api.{TableConfig, TableException}
-import org.apache.flink.table.planner.plan.nodes.calcite.Sink
+import org.apache.flink.table.planner.plan.nodes.calcite.LegacySink
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalLegacyTableSourceScan
import org.apache.flink.table.planner.plan.nodes.physical.PhysicalLegacyTableSourceScan
import org.apache.flink.table.planner.plan.utils.{DefaultRelShuttle, FlinkRelOptUtil}
@@ -157,7 +157,7 @@ object SubplanReuser {
// Exchange node can not be reused if its input is reusable disabled
case e: Exchange => isNodeReusableDisabled(e.getInput)
// TableFunctionScan and sink can not be reused
- case _: TableFunctionScan | _: Sink => true
+ case _: TableFunctionScan | _: LegacySink => true
case _ => false
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index c480a8cc8ef4e41f264dccae3ee8af47169050b9..7bb9273cdc4c4b296fc473f51231e6482a91a545 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -335,7 +335,7 @@ object FlinkBatchRuleSets {
FlinkLogicalRank.CONVERTER,
FlinkLogicalWindowAggregate.CONVERTER,
FlinkLogicalSnapshot.CONVERTER,
- FlinkLogicalSink.CONVERTER
+ FlinkLogicalLegacySink.CONVERTER
)
/**
@@ -423,7 +423,7 @@ object FlinkBatchRuleSets {
BatchExecCorrelateRule.INSTANCE,
BatchExecPythonCorrelateRule.INSTANCE,
// sink
- BatchExecSinkRule.INSTANCE
+ BatchExecLegacySinkRule.INSTANCE
)
/**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index 04da7bdbb2f17fdb6d1bcecaf40558694c08a690..296d3ffb69d8893c9958c725fccf3f21895b178b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -310,7 +310,7 @@ object FlinkStreamRuleSets {
FlinkLogicalWindowTableAggregate.CONVERTER,
FlinkLogicalSnapshot.CONVERTER,
FlinkLogicalMatch.CONVERTER,
- FlinkLogicalSink.CONVERTER
+ FlinkLogicalLegacySink.CONVERTER
)
/**
@@ -406,7 +406,7 @@ object FlinkStreamRuleSets {
StreamExecCorrelateRule.INSTANCE,
StreamExecPythonCorrelateRule.INSTANCE,
// sink
- StreamExecSinkRule.INSTANCE
+ StreamExecLegacySinkRule.INSTANCE
)
/**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecSinkRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecLegacySinkRule.scala
similarity index 91%
rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecSinkRule.scala
rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecLegacySinkRule.scala
index 294b7cb1fe5d95eb9f12d91a061ee0ab2d9aa256..0339e41d041f5b6f07715ce54b48cea0dd7affe1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecSinkRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecLegacySinkRule.scala
@@ -22,8 +22,8 @@ import org.apache.flink.table.api.TableException
import org.apache.flink.table.filesystem.FileSystemTableFactory
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
-import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSink
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalLegacySink
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecLegacySink
import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
import org.apache.flink.table.sinks.PartitionableTableSink
@@ -33,14 +33,14 @@ import org.apache.calcite.rel.{RelCollations, RelNode}
import scala.collection.JavaConversions._
-class BatchExecSinkRule extends ConverterRule(
- classOf[FlinkLogicalSink],
+class BatchExecLegacySinkRule extends ConverterRule(
+ classOf[FlinkLogicalLegacySink],
FlinkConventions.LOGICAL,
FlinkConventions.BATCH_PHYSICAL,
- "BatchExecSinkRule") {
+ "BatchExecLegacySinkRule") {
def convert(rel: RelNode): RelNode = {
- val sinkNode = rel.asInstanceOf[FlinkLogicalSink]
+ val sinkNode = rel.asInstanceOf[FlinkLogicalLegacySink]
val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
var requiredTraitSet = sinkNode.getInput.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
if (sinkNode.catalogTable != null && sinkNode.catalogTable.isPartitioned) {
@@ -78,7 +78,7 @@ class BatchExecSinkRule extends ConverterRule(
val newInput = RelOptRule.convert(sinkNode.getInput, requiredTraitSet)
- new BatchExecSink(
+ new BatchExecLegacySink(
rel.getCluster,
newTrait,
newInput,
@@ -87,8 +87,8 @@ class BatchExecSinkRule extends ConverterRule(
}
}
-object BatchExecSinkRule {
+object BatchExecLegacySinkRule {
- val INSTANCE: RelOptRule = new BatchExecSinkRule
+ val INSTANCE: RelOptRule = new BatchExecLegacySinkRule
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecSinkRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecLegacySinkRule.scala
similarity index 90%
rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecSinkRule.scala
rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecLegacySinkRule.scala
index ed1d7d445b13c643ea7f432354b5a86baa79ab54..f4e23b347b9954d9080579bbb9d7b874de3d5c0b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecSinkRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecLegacySinkRule.scala
@@ -22,8 +22,8 @@ import org.apache.flink.table.api.TableException
import org.apache.flink.table.filesystem.FileSystemTableFactory
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
-import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSink
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalLegacySink
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink
import org.apache.flink.table.sinks.PartitionableTableSink
import org.apache.calcite.plan.RelOptRule
@@ -32,14 +32,14 @@ import org.apache.calcite.rel.convert.ConverterRule
import scala.collection.JavaConversions._
-class StreamExecSinkRule extends ConverterRule(
- classOf[FlinkLogicalSink],
+class StreamExecLegacySinkRule extends ConverterRule(
+ classOf[FlinkLogicalLegacySink],
FlinkConventions.LOGICAL,
FlinkConventions.STREAM_PHYSICAL,
- "StreamExecSinkRule") {
+ "StreamExecLegacySinkRule") {
def convert(rel: RelNode): RelNode = {
- val sinkNode = rel.asInstanceOf[FlinkLogicalSink]
+ val sinkNode = rel.asInstanceOf[FlinkLogicalLegacySink]
val newTrait = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
var requiredTraitSet = sinkNode.getInput.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
if (sinkNode.catalogTable != null && sinkNode.catalogTable.isPartitioned) {
@@ -75,7 +75,7 @@ class StreamExecSinkRule extends ConverterRule(
val newInput = RelOptRule.convert(sinkNode.getInput, requiredTraitSet)
- new StreamExecSink(
+ new StreamExecLegacySink(
rel.getCluster,
newTrait,
newInput,
@@ -84,8 +84,8 @@ class StreamExecSinkRule extends ConverterRule(
}
}
-object StreamExecSinkRule {
+object StreamExecLegacySinkRule {
- val INSTANCE: RelOptRule = new StreamExecSinkRule
+ val INSTANCE: RelOptRule = new StreamExecLegacySinkRule
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExecNodePlanDumper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExecNodePlanDumper.scala
index 91a5d33636a7333de24d3f3144764e9e36686171..059fc244b525eaff8686b2a01644e4472d73e5e5 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExecNodePlanDumper.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExecNodePlanDumper.scala
@@ -17,7 +17,7 @@
*/
package org.apache.flink.table.planner.plan.utils
-import org.apache.flink.table.planner.plan.nodes.calcite.Sink
+import org.apache.flink.table.planner.plan.nodes.calcite.LegacySink
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, ExecNodeVisitorImpl}
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel
@@ -116,7 +116,7 @@ object ExecNodePlanDumper {
}
val reuseId = reuseInfoBuilder.getReuseId(node)
val isReuseNode = reuseId.isDefined
- if (node.isInstanceOf[Sink] || (isReuseNode && !reuseInfoMap.containsKey(node))) {
+ if (node.isInstanceOf[LegacySink] || (isReuseNode && !reuseInfoMap.containsKey(node))) {
if (isReuseNode) {
reuseInfoMap.put(node, (reuseId.get, true))
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/UpdatingPlanChecker.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/UpdatingPlanChecker.scala
index bf0e53d7c7865c6e12fbf145c3cf2e4b62cc7468..da0144ed42851b3df56bbd19d9c0acce27f30470 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/UpdatingPlanChecker.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/UpdatingPlanChecker.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.planner.plan.utils
import org.apache.flink.table.planner.delegation.PlannerBase
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
-import org.apache.flink.table.planner.plan.nodes.calcite.Sink
+import org.apache.flink.table.planner.plan.nodes.calcite.LegacySink
import org.apache.flink.table.sinks.UpsertStreamTableSink
import scala.collection.JavaConversions._
@@ -27,7 +27,7 @@ import scala.collection.JavaConversions._
object UpdatingPlanChecker {
def getUniqueKeyForUpsertSink(
- sinkNode: Sink,
+ sinkNode: LegacySink,
planner: PlannerBase,
sink: UpsertStreamTableSink[_]): Option[Array[String]] = {
// extract unique key fields
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out
index b9c6c8520ef422d4d0b44b1492bbd62c110d012a..235e3aeb64429e64460f3b53f881d09361b60ced 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out
@@ -1,11 +1,11 @@
== Abstract Syntax Tree ==
-LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+- LogicalProject(d=[$0], e=[$1])
+- LogicalFilter(condition=[>($0, 10)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
== Optimized Logical Plan ==
-Sink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+LegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+- Calc(select=[a, b], where=[>(a, 10)])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithInsert.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithInsert.out
index 2af200c6428b5460003c4049463080ef4bcc538b..fbdf9f19d06cc588d85a00d36e3f17fc085824f4 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithInsert.out
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithInsert.out
@@ -1,11 +1,11 @@
== Abstract Syntax Tree ==
-LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+- LogicalProject(a=[$0], b=[$1])
+- LogicalFilter(condition=[>($0, 10)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
== Optimized Logical Plan ==
-Sink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+LegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+- Calc(select=[a, b], where=[>(a, 10)])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testFromToDataStreamAndSqlUpdate.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testFromToDataStreamAndSqlUpdate.out
index f3bc0cb93f4b1c5b216e2e75222c995ffcfb992e..fb0829b723312490940f95a5889c32319bcae60b 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/explain/testFromToDataStreamAndSqlUpdate.out
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testFromToDataStreamAndSqlUpdate.out
@@ -1,10 +1,10 @@
== Abstract Syntax Tree ==
-LogicalSink(name=[`default_catalog`.`default_database`.`MySink1`], fields=[first])
+LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink1`], fields=[first])
+- LogicalProject(first=[$0])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
== Optimized Logical Plan ==
-Sink(name=[`default_catalog`.`default_database`.`MySink1`], fields=[first])
+LegacySink(name=[`default_catalog`.`default_database`.`MySink1`], fields=[first])
+- Calc(select=[first])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[first, id, score, last])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testSqlUpdateAndToDataStream.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testSqlUpdateAndToDataStream.out
index 3ac5cf12148a20d076fd133192a81466a0c82bfe..f97b69188dd1aa4e18896ca3f58bf5af51e7d56e 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/explain/testSqlUpdateAndToDataStream.out
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testSqlUpdateAndToDataStream.out
@@ -1,10 +1,10 @@
== Abstract Syntax Tree ==
-LogicalSink(name=[`default_catalog`.`default_database`.`MySink1`], fields=[first])
+LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink1`], fields=[first])
+- LogicalProject(first=[$0])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first, id, score, last)]]])
== Optimized Logical Plan ==
-Sink(name=[`default_catalog`.`default_database`.`MySink1`], fields=[first])
+LegacySink(name=[`default_catalog`.`default_database`.`MySink1`], fields=[first])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first)]]], fields=[first])
== Physical Execution Plan ==
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testStatementSet.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testStatementSet.out
index 71fdca93f3f4b65b1ad9d4a719fd32aafdb7e3f1..166ae69153d90ebd6fac566b6280dd002087a746 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/explain/testStatementSet.out
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testStatementSet.out
@@ -1,17 +1,17 @@
== Abstract Syntax Tree ==
-LogicalSink(name=[`default_catalog`.`default_database`.`MySink1`], fields=[first])
+LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink1`], fields=[first])
+- LogicalProject(first=[$0])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first, id, score, last)]]])
-LogicalSink(name=[`default_catalog`.`default_database`.`MySink2`], fields=[last])
+LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink2`], fields=[last])
+- LogicalProject(last=[$3])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first, id, score, last)]]])
== Optimized Logical Plan ==
-Sink(name=[`default_catalog`.`default_database`.`MySink1`], fields=[first])
+LegacySink(name=[`default_catalog`.`default_database`.`MySink1`], fields=[first])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first)]]], fields=[first])
-Sink(name=[`default_catalog`.`default_database`.`MySink2`], fields=[last])
+LegacySink(name=[`default_catalog`.`default_database`.`MySink2`], fields=[last])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: last)]]], fields=[last])
== Physical Execution Plan ==
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testStreamTableEnvironmentExplain.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testStreamTableEnvironmentExplain.out
index 454d10365cbf9e34bd2b2bec56e84743b86e6eaa..e1270156f50c1df50ed56d5e441008537e20ceda 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/explain/testStreamTableEnvironmentExplain.out
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testStreamTableEnvironmentExplain.out
@@ -1,10 +1,10 @@
== Abstract Syntax Tree ==
-LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[first])
+LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[first])
+- LogicalProject(first=[$0])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first, id, score, last)]]])
== Optimized Logical Plan ==
-Sink(name=[`default_catalog`.`default_database`.`MySink`], fields=[first])
+LegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[first])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first)]]], fields=[first])
== Physical Execution Plan ==
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/batch/ExplainTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/batch/ExplainTest.xml
index e464dfbdf462b4a736d1719c96931973340f594c..98a40735554cdfcdabb081167f4e5a1105a94fe9 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/batch/ExplainTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/batch/ExplainTest.xml
@@ -283,14 +283,14 @@ Calc(select=[a, b, c, e, f]): rowcount = , cumulative cost = {rows, cpu, io, net