Commit f44b3837 authored by godfreyhe

[FLINK-20705][table-planner-blink] Introduce BatchPhysicalValues, and make BatchExecValues only extended from ExecNode

This closes #14454
Parent acbda639
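At a high level, the commit separates the Calcite-facing physical node from the runtime-facing exec node: BatchPhysicalValues stays a plain RelNode in the physical plan, while the translation work moves behind the new BatchExecValues ExecNode, which the physical node creates in translateToExecNode(). A toy sketch of that hand-off is shown below; the names ToyExecNode, ToyExecValues and ToyPhysicalValues are hypothetical and only illustrate the pattern, the real classes appear in the diff that follows.

// Toy model of the split applied by this commit; illustration only, hypothetical names.
trait ToyExecNode {
  // Stands in for translating the node into a runtime Transformation.
  def translateToPlan(): String
}

// The exec node owns the translation logic.
final class ToyExecValues(tuples: Seq[Seq[String]]) extends ToyExecNode {
  override def translateToPlan(): String =
    s"input format producing ${tuples.size} literal row(s)"
}

// The physical node keeps only planner state and creates the exec node on demand,
// mirroring BatchPhysicalValues.translateToExecNode in the diff below.
final class ToyPhysicalValues(tuples: Seq[Seq[String]]) {
  def translateToExecNode(): ToyExecNode = new ToyExecValues(tuples)
}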
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.planner.plan.nodes.exec.batch;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecValues;
import org.apache.flink.table.types.logical.RowType;

import org.apache.calcite.rex.RexLiteral;

import java.util.List;

/**
 * Batch {@link ExecNode} that reads records from given values.
 */
public class BatchExecValues extends CommonExecValues implements BatchExecNode<RowData> {

    public BatchExecValues(List<List<RexLiteral>> tuples, RowType outputType, String description) {
        super(tuples, outputType, description);
    }
}
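For reference, a minimal construction sketch against the constructor above; the object name, the single INT output column, and the description string are illustrative and not taken from the commit.

import java.util.Collections

import org.apache.calcite.rex.RexLiteral
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecValues
import org.apache.flink.table.types.logical.{IntType, RowType}

// Hypothetical helper, not part of the commit: builds a BatchExecValues that
// produces no rows and declares a single INT output column.
object BatchExecValuesSketch {
  def emptyValues(): BatchExecValues =
    new BatchExecValues(
      Collections.emptyList[java.util.List[RexLiteral]](), // no literal rows
      RowType.of(new IntType()),                           // output row type with one INT field
      "Values(tuples=[])")                                 // description text shown in the plan
}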
@@ -18,12 +18,9 @@
 package org.apache.flink.table.planner.plan.nodes.physical.batch

-import org.apache.flink.api.dag.Transformation
-import org.apache.flink.table.data.RowData
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.codegen.ValuesCodeGenerator
-import org.apache.flink.table.planner.delegation.BatchPlanner
-import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, LegacyBatchExecNode}
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecValues

 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
@@ -39,19 +36,18 @@ import scala.collection.JavaConversions._
 /**
  * Batch physical RelNode for [[Values]].
  */
-class BatchExecValues(
+class BatchPhysicalValues(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     tuples: ImmutableList[ImmutableList[RexLiteral]],
     outputRowType: RelDataType)
   extends Values(cluster, outputRowType, tuples, traitSet)
-  with BatchPhysicalRel
-  with LegacyBatchExecNode[RowData] {
+  with BatchPhysicalRel {

   override def deriveRowType(): RelDataType = outputRowType

   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
-    new BatchExecValues(cluster, traitSet, getTuples, outputRowType)
+    new BatchPhysicalValues(cluster, traitSet, getTuples, outputRowType)
   }

   override def explainTerms(pw: RelWriter): RelWriter = {
@@ -59,23 +55,11 @@ class BatchExecValues(
       .item("values", getRowType.getFieldNames.toList.mkString(", "))
   }

-  //~ ExecNode methods -----------------------------------------------------------
-
-  override def getInputEdges: util.List[ExecEdge] = List()
-
-  override protected def translateToPlanInternal(
-      planner: BatchPlanner): Transformation[RowData] = {
-    val inputFormat = ValuesCodeGenerator.generatorInputFormat(
-      planner.getTableConfig,
-      FlinkTypeFactory.toLogicalRowType(getRowType),
-      tuples.asList().map(_.asList()),
-      getRelTypeName)
-    val transformation = planner.getExecEnv.createInput(inputFormat,
-      inputFormat.getProducedType).getTransformation
-    transformation.setName(getRelDetailedDescription)
-    transformation.setParallelism(1)
-    transformation
+  override def translateToExecNode(): ExecNode[_] = {
+    new BatchExecValues(
+      tuples.asList().map(_.asList()),
+      FlinkTypeFactory.toLogicalRowType(getRowType),
+      getRelDetailedDescription
+    )
   }
 }
@@ -396,7 +396,7 @@ object FlinkBatchRuleSets {
     BatchPhysicalTableSourceScanRule.INSTANCE,
     BatchPhysicalLegacyTableSourceScanRule.INSTANCE,
     BatchExecIntermediateTableScanRule.INSTANCE,
-    BatchExecValuesRule.INSTANCE,
+    BatchPhysicalValuesRule.INSTANCE,
     // calc
     BatchPhysicalCalcRule.INSTANCE,
     BatchPhysicalPythonCalcRule.INSTANCE,
...
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.rules.physical.batch

 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
-import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalCorrelate, BatchExecValues}
+import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalCorrelate, BatchPhysicalValues}

 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.RelOptRule._
@@ -31,9 +31,9 @@ import org.apache.calcite.rex.{RexLiteral, RexUtil}
 /**
  * Converts [[FlinkLogicalTableFunctionScan]] with constant RexCall to
  * {{{
  *        [[BatchPhysicalCorrelate]]
  *           /           \
- * empty [[BatchExecValues]]  [[FlinkLogicalTableFunctionScan]]
+ * empty [[BatchPhysicalValuesRule]]  [[FlinkLogicalTableFunctionScan]]
  * }}}
  *
  * Add the rule to support select from a UDF directly, such as the following SQL:
@@ -60,7 +60,7 @@ class BatchPhysicalConstantTableFunctionScanRule
     // create correlate left
     val cluster = scan.getCluster
     val traitSet = call.getPlanner.emptyTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
-    val values = new BatchExecValues(
+    val values = new BatchPhysicalValues(
       cluster,
       traitSet,
       ImmutableList.of(ImmutableList.of[RexLiteral]()),
...
@@ -20,26 +20,26 @@ package org.apache.flink.table.planner.plan.rules.physical.batch

 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalValues
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecValues
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalValues

 import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule

 /**
- * Rule that converts [[FlinkLogicalValues]] to [[BatchExecValues]].
+ * Rule that converts [[FlinkLogicalValues]] to [[BatchPhysicalValues]].
  */
-class BatchExecValuesRule extends ConverterRule(
+class BatchPhysicalValuesRule extends ConverterRule(
   classOf[FlinkLogicalValues],
   FlinkConventions.LOGICAL,
   FlinkConventions.BATCH_PHYSICAL,
-  "BatchExecValuesRule") {
+  "BatchPhysicalValuesRule") {

   def convert(rel: RelNode): RelNode = {
     val values: FlinkLogicalValues = rel.asInstanceOf[FlinkLogicalValues]
     val providedTraitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
-    new BatchExecValues(
+    new BatchPhysicalValues(
       rel.getCluster,
       providedTraitSet,
       values.getTuples,
@@ -47,6 +47,6 @@ class BatchExecValuesRule extends ConverterRule(
   }
 }

-object BatchExecValuesRule {
-  val INSTANCE: RelOptRule = new BatchExecValuesRule
+object BatchPhysicalValuesRule {
+  val INSTANCE: RelOptRule = new BatchPhysicalValuesRule
 }