From f44b383767c09d35d21d5c0b87ed8deb44276c3a Mon Sep 17 00:00:00 2001 From: godfreyhe Date: Tue, 22 Dec 2020 11:58:38 +0800 Subject: [PATCH] [FLINK-20705][table-planner-blink] Introduce BatchPhysicalValues, and make BatchExecValues only extended from ExecNode This closes #14454 --- .../nodes/exec/batch/BatchExecValues.java | 38 +++++++++++++++++++ ...Values.scala => BatchPhysicalValues.scala} | 36 +++++------------- .../plan/rules/FlinkBatchRuleSets.scala | 2 +- ...hysicalConstantTableFunctionScanRule.scala | 10 ++--- ...le.scala => BatchPhysicalValuesRule.scala} | 14 +++---- 5 files changed, 61 insertions(+), 39 deletions(-) create mode 100644 flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecValues.java rename flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/{BatchExecValues.scala => BatchPhysicalValues.scala} (64%) rename flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/{BatchExecValuesRule.scala => BatchPhysicalValuesRule.scala} (84%) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecValues.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecValues.java new file mode 100644 index 00000000000..8f74f2f8eb4 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecValues.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.batch; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecValues; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.calcite.rex.RexLiteral; + +import java.util.List; + +/** + * Batch {@link ExecNode} that read records from given values. + */ +public class BatchExecValues extends CommonExecValues implements BatchExecNode { + + public BatchExecValues(List> tuples, RowType outputType, String description) { + super(tuples, outputType, description); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecValues.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalValues.scala similarity index 64% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecValues.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalValues.scala index 2460af4a739..23ff9dc89e1 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecValues.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalValues.scala @@ -18,12 +18,9 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch -import org.apache.flink.api.dag.Transformation -import org.apache.flink.table.data.RowData import org.apache.flink.table.planner.calcite.FlinkTypeFactory -import org.apache.flink.table.planner.codegen.ValuesCodeGenerator -import org.apache.flink.table.planner.delegation.BatchPlanner -import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, LegacyBatchExecNode} +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode +import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecValues import com.google.common.collect.ImmutableList import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} @@ -39,19 +36,18 @@ import scala.collection.JavaConversions._ /** * Batch physical RelNode for [[Values]]. */ -class BatchExecValues( +class BatchPhysicalValues( cluster: RelOptCluster, traitSet: RelTraitSet, tuples: ImmutableList[ImmutableList[RexLiteral]], outputRowType: RelDataType) extends Values(cluster, outputRowType, tuples, traitSet) - with BatchPhysicalRel - with LegacyBatchExecNode[RowData] { + with BatchPhysicalRel { override def deriveRowType(): RelDataType = outputRowType override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { - new BatchExecValues(cluster, traitSet, getTuples, outputRowType) + new BatchPhysicalValues(cluster, traitSet, getTuples, outputRowType) } override def explainTerms(pw: RelWriter): RelWriter = { @@ -59,23 +55,11 @@ class BatchExecValues( .item("values", getRowType.getFieldNames.toList.mkString(", ")) } - //~ ExecNode methods ----------------------------------------------------------- - - override def getInputEdges: util.List[ExecEdge] = List() - - override protected def translateToPlanInternal( - planner: BatchPlanner): Transformation[RowData] = { - val inputFormat = ValuesCodeGenerator.generatorInputFormat( - planner.getTableConfig, - FlinkTypeFactory.toLogicalRowType(getRowType), + override def translateToExecNode(): ExecNode[_] = { + new BatchExecValues( tuples.asList().map(_.asList()), - getRelTypeName) - val transformation = planner.getExecEnv.createInput(inputFormat, - inputFormat.getProducedType).getTransformation - transformation.setName(getRelDetailedDescription) - transformation.setParallelism(1) - transformation + FlinkTypeFactory.toLogicalRowType(getRowType), + getRelDetailedDescription + ) } - } - diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala index 8adf7f556ca..f94869b14ff 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala @@ -396,7 +396,7 @@ object FlinkBatchRuleSets { BatchPhysicalTableSourceScanRule.INSTANCE, BatchPhysicalLegacyTableSourceScanRule.INSTANCE, BatchExecIntermediateTableScanRule.INSTANCE, - BatchExecValuesRule.INSTANCE, + BatchPhysicalValuesRule.INSTANCE, // calc BatchPhysicalCalcRule.INSTANCE, BatchPhysicalPythonCalcRule.INSTANCE, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.scala index 15e001ee847..a5ec16736ec 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.rules.physical.batch import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan -import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalCorrelate, BatchExecValues} +import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalCorrelate, BatchPhysicalValues} import com.google.common.collect.ImmutableList import org.apache.calcite.plan.RelOptRule._ @@ -31,9 +31,9 @@ import org.apache.calcite.rex.{RexLiteral, RexUtil} /** * Converts [[FlinkLogicalTableFunctionScan]] with constant RexCall to * {{{ - * [[BatchPhysicalCorrelate]] - * / \ - * empty [[BatchExecValues]] [[FlinkLogicalTableFunctionScan]] + * [[BatchPhysicalCorrelate]] + * / \ + * empty [[BatchPhysicalValuesRule]] [[FlinkLogicalTableFunctionScan]] * }}} * * Add the rule to support select from a UDF directly, such as the following SQL: @@ -60,7 +60,7 @@ class BatchPhysicalConstantTableFunctionScanRule // create correlate left val cluster = scan.getCluster val traitSet = call.getPlanner.emptyTraitSet.replace(FlinkConventions.BATCH_PHYSICAL) - val values = new BatchExecValues( + val values = new BatchPhysicalValues( cluster, traitSet, ImmutableList.of(ImmutableList.of[RexLiteral]()), diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecValuesRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalValuesRule.scala similarity index 84% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecValuesRule.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalValuesRule.scala index 545193c55e5..28fa2a16003 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecValuesRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalValuesRule.scala @@ -20,26 +20,26 @@ package org.apache.flink.table.planner.plan.rules.physical.batch import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalValues -import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecValues +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalValues import org.apache.calcite.plan.{RelOptRule, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule /** - * Rule that converts [[FlinkLogicalValues]] to [[BatchExecValues]]. + * Rule that converts [[FlinkLogicalValues]] to [[BatchPhysicalValues]]. */ -class BatchExecValuesRule extends ConverterRule( +class BatchPhysicalValuesRule extends ConverterRule( classOf[FlinkLogicalValues], FlinkConventions.LOGICAL, FlinkConventions.BATCH_PHYSICAL, - "BatchExecValuesRule") { + "BatchPhysicalValuesRule") { def convert(rel: RelNode): RelNode = { val values: FlinkLogicalValues = rel.asInstanceOf[FlinkLogicalValues] val providedTraitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL) - new BatchExecValues( + new BatchPhysicalValues( rel.getCluster, providedTraitSet, values.getTuples, @@ -47,6 +47,6 @@ class BatchExecValuesRule extends ConverterRule( } } -object BatchExecValuesRule { - val INSTANCE: RelOptRule = new BatchExecValuesRule +object BatchPhysicalValuesRule { + val INSTANCE: RelOptRule = new BatchPhysicalValuesRule } -- GitLab