diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCorrelate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCorrelate.java index 4495620be61b9ce4f0cebe27a8656f9a1a84abd3..d146bbda0d9e888dd74b26ea560c49a8abc4fbc1 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCorrelate.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCorrelate.java @@ -27,7 +27,6 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexNode; -import org.apache.calcite.rex.RexProgram; import javax.annotation.Nullable; @@ -38,7 +37,6 @@ public class BatchExecCorrelate extends CommonExecCorrelate implements BatchExec public BatchExecCorrelate( FlinkJoinType joinType, - @Nullable RexProgram project, RexCall invocation, @Nullable RexNode condition, ExecEdge inputEdge, @@ -46,7 +44,6 @@ public class BatchExecCorrelate extends CommonExecCorrelate implements BatchExec String description) { super( joinType, - project, invocation, condition, TableStreamOperator.class, diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecCorrelate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecCorrelate.java index 1ec1ccb47907a161a4a10bba7cd6e89d87467fd3..35dd52a80849e4efb112565aa85a2b4253f2e104 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecCorrelate.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecCorrelate.java @@ -32,7 +32,6 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexNode; -import org.apache.calcite.rex.RexProgram; import javax.annotation.Nullable; @@ -44,8 +43,6 @@ import java.util.Optional; */ public abstract class CommonExecCorrelate extends ExecNodeBase { private final FlinkJoinType joinType; - @Nullable - private final RexProgram project; private final RexCall invocation; @Nullable private final RexNode condition; @@ -54,7 +51,6 @@ public abstract class CommonExecCorrelate extends ExecNodeBase { public CommonExecCorrelate( FlinkJoinType joinType, - @Nullable RexProgram project, RexCall invocation, @Nullable RexNode condition, Class operatorBaseClass, @@ -64,7 +60,6 @@ public abstract class CommonExecCorrelate extends ExecNodeBase { String description) { super(Collections.singletonList(inputEdge), outputType, description); this.joinType = joinType; - this.project = project; this.invocation = invocation; this.condition = condition; this.operatorBaseClass = operatorBaseClass; @@ -83,7 +78,6 @@ public abstract class CommonExecCorrelate extends ExecNodeBase { ctx, inputTransform, (RowType) inputNode.getOutputType(), - JavaScalaConversionUtil.toScala(Optional.ofNullable(project)), invocation, JavaScalaConversionUtil.toScala(Optional.ofNullable(condition)), (RowType) getOutputType(), diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCorrelate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCorrelate.java index 5270af3739d487a1370b0fc9ef9267a387d01b8a..3d86548267d871448a2c5193d90b63c97f06148f 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCorrelate.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCorrelate.java @@ -28,7 +28,6 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexNode; -import org.apache.calcite.rex.RexProgram; import javax.annotation.Nullable; @@ -39,7 +38,6 @@ public class StreamExecCorrelate extends CommonExecCorrelate implements StreamEx public StreamExecCorrelate( FlinkJoinType joinType, - @Nullable RexProgram project, RexCall invocation, @Nullable RexNode condition, ExecEdge inputEdge, @@ -47,7 +45,6 @@ public class StreamExecCorrelate extends CommonExecCorrelate implements StreamEx String description) { super( joinType, - project, invocation, condition, AbstractProcessStreamOperator.class, diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCorrelateRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCorrelateRule.java index 748b10db762ca92c6f6d33fe3aca9103929edb3e..9467b6ac618479a640a827177bc0bf9f0a4c4b04 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCorrelateRule.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCorrelateRule.java @@ -118,7 +118,6 @@ public class BatchPhysicalPythonCorrelateRule extends ConverterRule { convInput, scan, condition, - null, correlate.getRowType(), correlate.getJoinType()); } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCorrelateRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCorrelateRule.java index b59bbf2c1d64081f76084237ab43a9d2fc9872f3..d44fbee2aca615946292629dd2d7fd1c3c4f97aa 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCorrelateRule.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCorrelateRule.java @@ -124,7 +124,6 @@ public class StreamPhysicalPythonCorrelateRule extends ConverterRule { correlate.getCluster(), traitSet, convInput, - null, scan, condition, correlate.getRowType(), diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala index 000cefb3ae1d6a573a8b64e166861a71e5868270..79bf7880c6b0ef3ffe6da02704690d0b8e1cb4e6 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala @@ -21,8 +21,8 @@ package org.apache.flink.table.planner.codegen import org.apache.flink.api.common.functions.Function import org.apache.flink.api.dag.Transformation import org.apache.flink.table.api.{TableConfig, TableException, ValidationException} +import org.apache.flink.table.data.RowData import org.apache.flink.table.data.utils.JoinedRowData -import org.apache.flink.table.data.{GenericRowData, RowData} import org.apache.flink.table.functions.FunctionKind import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.codegen.CodeGenUtils._ @@ -37,8 +37,6 @@ import org.apache.flink.table.types.logical.RowType import org.apache.calcite.rex._ -import scala.collection.JavaConversions._ - object CorrelateCodeGenerator { def generateCorrelateTransformation( @@ -46,7 +44,6 @@ object CorrelateCodeGenerator { operatorCtx: CodeGeneratorContext, inputTransformation: Transformation[RowData], inputType: RowType, - projectProgram: Option[RexProgram], invocation: RexCall, condition: Option[RexNode], outputType: RowType, @@ -68,19 +65,6 @@ object CorrelateCodeGenerator { s"Currently, only table functions can be used in a correlate operation.") } - val swallowInputOnly = if (projectProgram.isDefined) { - val program = projectProgram.get - val selects = program.getProjectList.map(_.getIndex) - val inputFieldCnt = program.getInputRowType.getFieldCount - val swallowInputOnly = selects.head > inputFieldCnt && - (inputFieldCnt - outputType.getFieldCount == inputType.getFieldCount) - // partial output or output right only - swallowInputOnly - } else { - // completely output left input + right - false - } - // adjust indicies of InputRefs to adhere to schema expected by generator val changeInputRefIndexShuttle = new RexShuttle { override def visitInputRef(inputRef: RexInputRef): RexNode = { @@ -92,8 +76,6 @@ object CorrelateCodeGenerator { operatorCtx, config, inputType, - projectProgram, - swallowInputOnly, condition.map(_.accept(changeInputRefIndexShuttle)), outputType, joinType, @@ -117,8 +99,6 @@ object CorrelateCodeGenerator { ctx: CodeGeneratorContext, config: TableConfig, inputType: RowType, - projectProgram: Option[RexProgram], - swallowInputOnly: Boolean = false, condition: Option[RexNode], returnType: RowType, joinType: FlinkJoinType, @@ -136,8 +116,6 @@ object CorrelateCodeGenerator { ctx, config, inputType, - projectProgram, - swallowInputOnly, functionResultType, returnType, condition, @@ -167,78 +145,27 @@ object CorrelateCodeGenerator { // 3. left join if (joinType == FlinkJoinType.LEFT) { - if (swallowInputOnly) { - // and the returned row table function is empty, collect a null - val nullRowTerm = CodeGenUtils.newName("nullRow") - ctx.addReusableOutputRecord(functionResultType, classOf[GenericRowData], nullRowTerm) - ctx.addReusableNullRow(nullRowTerm, functionResultType.getFieldCount) - val header = if (retainHeader) { - s"$nullRowTerm.setRowKind(${exprGenerator.input1Term}.getRowKind());" - } else { - "" - } - body += - s""" - |boolean hasOutput = $correlateCollectorTerm.isCollected(); - |if (!hasOutput) { - | $header - | $correlateCollectorTerm.outputResult($nullRowTerm); - |} - |""".stripMargin - } else if (projectProgram.isDefined) { - // output partial fields of left and right - val outputTerm = CodeGenUtils.newName("projectOut") - ctx.addReusableOutputRecord(returnType, classOf[GenericRowData], outputTerm) - - val header = if (retainHeader) { - s"$outputTerm.setRowKind(${CodeGenUtils.DEFAULT_INPUT1_TERM}.getRowKind());" - } else { - "" - } - val projectionExpression = generateProjectResultExpr( - ctx, - config, - inputType, - functionResultType, - udtfAlwaysNull = true, - returnType, - outputTerm, - projectProgram.get) - - body += - s""" - |boolean hasOutput = $correlateCollectorTerm.isCollected(); - |if (!hasOutput) { - | ${projectionExpression.code} - | $header - | $correlateCollectorTerm.outputResult($outputTerm); - |} - |""".stripMargin - + // output all fields of left and right + // in case of left outer join and the returned row of table function is empty, + // fill all fields of row with null + val joinedRowTerm = CodeGenUtils.newName("joinedRow") + val nullRowTerm = CodeGenUtils.newName("nullRow") + ctx.addReusableOutputRecord(returnType, classOf[JoinedRowData], joinedRowTerm) + ctx.addReusableNullRow(nullRowTerm, functionResultType.getFieldCount) + val header = if (retainHeader) { + s"$joinedRowTerm.setRowKind(${exprGenerator.input1Term}.getRowKind());" } else { - // output all fields of left and right - // in case of left outer join and the returned row of table function is empty, - // fill all fields of row with null - val joinedRowTerm = CodeGenUtils.newName("joinedRow") - val nullRowTerm = CodeGenUtils.newName("nullRow") - ctx.addReusableOutputRecord(returnType, classOf[JoinedRowData], joinedRowTerm) - ctx.addReusableNullRow(nullRowTerm, functionResultType.getFieldCount) - val header = if (retainHeader) { - s"$joinedRowTerm.setRowKind(${exprGenerator.input1Term}.getRowKind());" - } else { - "" - } - body += - s""" - |boolean hasOutput = $correlateCollectorTerm.isCollected(); - |if (!hasOutput) { - | $joinedRowTerm.replace(${exprGenerator.input1Term}, $nullRowTerm); - | $header - | $correlateCollectorTerm.outputResult($joinedRowTerm); - |} - |""".stripMargin - - } + "" + } + body += + s""" + |boolean hasOutput = $correlateCollectorTerm.isCollected(); + |if (!hasOutput) { + | $joinedRowTerm.replace(${exprGenerator.input1Term}, $nullRowTerm); + | $header + | $correlateCollectorTerm.outputResult($joinedRowTerm); + |} + |""".stripMargin } else if (joinType != FlinkJoinType.INNER) { throw new TableException(s"Unsupported JoinRelType: $joinType for correlate join.") } @@ -248,34 +175,6 @@ object CorrelateCodeGenerator { new CodeGenOperatorFactory(genOperator) } - private def generateProjectResultExpr( - ctx: CodeGeneratorContext, - config: TableConfig, - input1Type: RowType, - functionResultType: RowType, - udtfAlwaysNull: Boolean, - returnType: RowType, - outputTerm: String, - program: RexProgram): GeneratedExpression = { - val projectExprGenerator = new ExprCodeGenerator(ctx, udtfAlwaysNull) - .bindInput(input1Type, CodeGenUtils.DEFAULT_INPUT1_TERM) - if (udtfAlwaysNull) { - val udtfNullRow = CodeGenUtils.newName("udtfNullRow") - ctx.addReusableNullRow(udtfNullRow, functionResultType.getFieldCount) - - projectExprGenerator.bindSecondInput( - functionResultType, - udtfNullRow) - } else { - projectExprGenerator.bindSecondInput( - functionResultType) - } - val projection = program.getProjectList.map(program.expandLocalRef) - val projectionExprs = projection.map(projectExprGenerator.generateExpression) - projectExprGenerator.generateResultExpression( - projectionExprs, returnType, classOf[GenericRowData], outputTerm) - } - /** * Generates a collector that correlates input and converted table function results. Returns a * collector term for referencing the collector. @@ -284,8 +183,6 @@ object CorrelateCodeGenerator { ctx: CodeGeneratorContext, config: TableConfig, inputType: RowType, - projectProgram: Option[RexProgram], - swallowInputOnly: Boolean, functionResultType: RowType, resultType: RowType, condition: Option[RexNode], @@ -298,45 +195,7 @@ object CorrelateCodeGenerator { val collectorCtx = CodeGeneratorContext(config) - val body = if (projectProgram.isDefined) { - // partial output - if (swallowInputOnly) { - // output right only - val header = if (retainHeader) { - s"$udtfInputTerm.setRowKind($inputTerm.getRowKind());" - } else { - "" - } - s""" - |$header - |outputResult($udtfInputTerm); - """.stripMargin - } else { - val outputTerm = CodeGenUtils.newName("projectOut") - collectorCtx.addReusableOutputRecord(resultType, classOf[GenericRowData], outputTerm) - - val header = if (retainHeader) { - s"$outputTerm.setRowKind($inputTerm.getRowKind());" - } else { - "" - } - val projectionExpression = generateProjectResultExpr( - collectorCtx, - config, - inputType, - functionResultType, - udtfAlwaysNull = false, - resultType, - outputTerm, - projectProgram.get) - - s""" - |$header - |${projectionExpression.code} - |outputResult(${projectionExpression.resultTerm}); - """.stripMargin - } - } else { + val body = { // completely output left input + right val joinedRowTerm = CodeGenUtils.newName("joinedRow") collectorCtx.addReusableOutputRecord(resultType, classOf[JoinedRowData], joinedRowTerm) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelate.scala index 6107acbef6ca14cbd0fca63e695a0a1714a48bc8..2b2026c0da34f16b9789c2ab21309cfe8eca877a 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelate.scala @@ -27,7 +27,7 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.{Correlate, JoinRelType} -import org.apache.calcite.rex.{RexCall, RexNode, RexProgram} +import org.apache.calcite.rex.{RexCall, RexNode} /** * Batch physical RelNode for [[Correlate]] (Java/Scala user defined table function). @@ -38,7 +38,6 @@ class BatchPhysicalCorrelate( inputRel: RelNode, scan: FlinkLogicalTableFunctionScan, condition: Option[RexNode], - projectProgram: Option[RexProgram], outputRowType: RelDataType, joinType: JoinRelType) extends BatchPhysicalCorrelateBase( @@ -47,14 +46,12 @@ class BatchPhysicalCorrelate( inputRel, scan, condition, - projectProgram, outputRowType, joinType) { def copy( traitSet: RelTraitSet, child: RelNode, - projectProgram: Option[RexProgram], outputType: RelDataType): RelNode = { new BatchPhysicalCorrelate( cluster, @@ -62,7 +59,6 @@ class BatchPhysicalCorrelate( child, scan, condition, - projectProgram, outputType, joinType) } @@ -70,7 +66,6 @@ class BatchPhysicalCorrelate( override def translateToExecNode(): ExecNode[_] = { new BatchExecCorrelate( JoinTypeUtil.getFlinkJoinType(joinType), - projectProgram.orNull, scan.getCall.asInstanceOf[RexCall], condition.orNull, ExecEdge.DEFAULT, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelateBase.scala index 0462e30c3f0556c157fed1f58a0b618686d64d9a..6d11af24804cc63eaaeb02d272ebc98ba4586824 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelateBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelateBase.scala @@ -26,8 +26,7 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptRule, RelTraitSet} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.{Correlate, JoinRelType} import org.apache.calcite.rel.{RelCollationTraitDef, RelDistribution, RelFieldCollation, RelNode, RelWriter, SingleRel} -import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexProgram} -import org.apache.calcite.sql.SqlKind +import org.apache.calcite.rex.{RexCall, RexNode} import org.apache.calcite.util.mapping.{Mapping, MappingType, Mappings} import scala.collection.JavaConversions._ @@ -41,7 +40,6 @@ abstract class BatchPhysicalCorrelateBase( inputRel: RelNode, scan: FlinkLogicalTableFunctionScan, condition: Option[RexNode], - projectProgram: Option[RexProgram], outputRowType: RelDataType, joinType: JoinRelType) extends SingleRel(cluster, traitSet, inputRel) @@ -52,7 +50,7 @@ abstract class BatchPhysicalCorrelateBase( override def deriveRowType(): RelDataType = outputRowType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { - copy(traitSet, inputs.get(0), projectProgram, outputRowType) + copy(traitSet, inputs.get(0), outputRowType) } /** @@ -61,7 +59,6 @@ abstract class BatchPhysicalCorrelateBase( def copy( traitSet: RelTraitSet, child: RelNode, - projectProgram: Option[RexProgram], outputType: RelDataType): RelNode override def explainTerms(pw: RelWriter): RelWriter = { @@ -85,30 +82,11 @@ abstract class BatchPhysicalCorrelateBase( def getOutputInputMapping: Mapping = { val inputFieldCnt = getInput.getRowType.getFieldCount - projectProgram match { - case Some(program) => - val projects = program.getProjectList.map(program.expandLocalRef) - val mapping = Mappings.create(MappingType.INVERSE_FUNCTION, inputFieldCnt, projects.size) - projects.zipWithIndex.foreach { - case (project, index) => - project match { - case inputRef: RexInputRef => mapping.set(inputRef.getIndex, index) - case call: RexCall if call.getKind == SqlKind.AS => - call.getOperands.head match { - case inputRef: RexInputRef => mapping.set(inputRef.getIndex, index) - case _ => // ignore - } - case _ => // ignore - } - } - mapping.inverse() - case _ => - val mapping = Mappings.create(MappingType.FUNCTION, inputFieldCnt, inputFieldCnt) - (0 until inputFieldCnt).foreach { - index => mapping.set(index, index) - } - mapping + val mapping = Mappings.create(MappingType.FUNCTION, inputFieldCnt, inputFieldCnt) + (0 until inputFieldCnt).foreach { + index => mapping.set(index, index) } + mapping } val mapping = getOutputInputMapping diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala index 17324720581a16da5711b7bdedb6fed6567ca0b9..c939583ffc264358927f529e29b8ddb2c876e414 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala @@ -27,7 +27,7 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.{Correlate, JoinRelType} -import org.apache.calcite.rex.{RexCall, RexNode, RexProgram} +import org.apache.calcite.rex.{RexCall, RexNode} /** * Batch physical RelNode for [[Correlate]] (Python user defined table function). @@ -38,7 +38,6 @@ class BatchPhysicalPythonCorrelate( inputRel: RelNode, scan: FlinkLogicalTableFunctionScan, condition: Option[RexNode], - projectProgram: Option[RexProgram], outputRowType: RelDataType, joinType: JoinRelType) extends BatchPhysicalCorrelateBase( @@ -47,7 +46,6 @@ class BatchPhysicalPythonCorrelate( inputRel, scan, condition, - projectProgram, outputRowType, joinType) with CommonPythonCorrelate { @@ -55,7 +53,6 @@ class BatchPhysicalPythonCorrelate( def copy( traitSet: RelTraitSet, child: RelNode, - projectProgram: Option[RexProgram], outputType: RelDataType): RelNode = { new BatchPhysicalPythonCorrelate( cluster, @@ -63,7 +60,6 @@ class BatchPhysicalPythonCorrelate( child, scan, condition, - projectProgram, outputType, joinType) } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelate.scala index 70bf36972230ce33e967279d67d7187457b235e2..7f1e99282e0734d119a165782bc0f6915e00cab1 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelate.scala @@ -27,7 +27,7 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.JoinRelType -import org.apache.calcite.rex.{RexCall, RexNode, RexProgram} +import org.apache.calcite.rex.{RexCall, RexNode} /** * Flink RelNode which matches along with join a Java/Scala user defined table function. @@ -36,7 +36,6 @@ class StreamPhysicalCorrelate( cluster: RelOptCluster, traitSet: RelTraitSet, inputRel: RelNode, - projectProgram: Option[RexProgram], scan: FlinkLogicalTableFunctionScan, condition: Option[RexNode], outputRowType: RelDataType, @@ -45,7 +44,6 @@ class StreamPhysicalCorrelate( cluster, traitSet, inputRel, - projectProgram, scan, condition, outputRowType, @@ -54,13 +52,11 @@ class StreamPhysicalCorrelate( def copy( traitSet: RelTraitSet, newChild: RelNode, - projectProgram: Option[RexProgram], outputType: RelDataType): RelNode = { new StreamPhysicalCorrelate( cluster, traitSet, newChild, - projectProgram, scan, condition, outputType, @@ -70,7 +66,6 @@ class StreamPhysicalCorrelate( override def translateToExecNode(): ExecNode[_] = { new StreamExecCorrelate( JoinTypeUtil.getFlinkJoinType(joinType), - projectProgram.orNull, scan.getCall.asInstanceOf[RexCall], condition.orNull, ExecEdge.DEFAULT, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelateBase.scala index 0f0642cbb32b2842e11bdd3d9d25a12d7aef11bb..a633924c4758db58df7508294c60af7e7a356c8e 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelateBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelateBase.scala @@ -24,7 +24,7 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.JoinRelType import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} -import org.apache.calcite.rex.{RexCall, RexNode, RexProgram} +import org.apache.calcite.rex.{RexCall, RexNode} import scala.collection.JavaConversions._ @@ -35,7 +35,6 @@ abstract class StreamPhysicalCorrelateBase( cluster: RelOptCluster, traitSet: RelTraitSet, inputRel: RelNode, - val projectProgram: Option[RexProgram], scan: FlinkLogicalTableFunctionScan, condition: Option[RexNode], outputRowType: RelDataType, @@ -50,7 +49,7 @@ abstract class StreamPhysicalCorrelateBase( override def deriveRowType(): RelDataType = outputRowType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { - copy(traitSet, inputs.get(0), projectProgram, outputRowType) + copy(traitSet, inputs.get(0), outputRowType) } /** @@ -59,7 +58,6 @@ abstract class StreamPhysicalCorrelateBase( def copy( traitSet: RelTraitSet, newChild: RelNode, - projectProgram: Option[RexProgram], outputType: RelDataType): RelNode override def explainTerms(pw: RelWriter): RelWriter = { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala index 9a29323b046841104eb6ed6ca87c37f9d59797d4..1e08b4d900160c52f668a848849ef34cd5ec66fa 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala @@ -27,7 +27,7 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.JoinRelType -import org.apache.calcite.rex.{RexCall, RexNode, RexProgram} +import org.apache.calcite.rex.{RexCall, RexNode} /** * Flink RelNode which matches along with join a python user defined table function. @@ -36,7 +36,6 @@ class StreamPhysicalPythonCorrelate( cluster: RelOptCluster, traitSet: RelTraitSet, inputRel: RelNode, - projectProgram: Option[RexProgram], scan: FlinkLogicalTableFunctionScan, condition: Option[RexNode], outputRowType: RelDataType, @@ -45,7 +44,6 @@ class StreamPhysicalPythonCorrelate( cluster, traitSet, inputRel, - projectProgram, scan, condition, outputRowType, @@ -55,13 +53,11 @@ class StreamPhysicalPythonCorrelate( def copy( traitSet: RelTraitSet, newChild: RelNode, - projectProgram: Option[RexProgram], outputType: RelDataType): RelNode = { new StreamPhysicalPythonCorrelate( cluster, traitSet, newChild, - projectProgram, scan, condition, outputType, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.scala index fa83ab7e1f56ad828402cd81dfbbf8ba5f21a00b..15e001ee84754a3a6c6a30c19effd716a6875573 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.scala @@ -72,7 +72,6 @@ class BatchPhysicalConstantTableFunctionScanRule values, scan, None, - None, scan.getRowType, JoinRelType.INNER) call.transformTo(correlate) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala index bc94541bfd402ede9ebb23cb0565d290ede6c760..2c321e1a7c22b19ca71eca5037eddce1d874a7d3 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala @@ -77,7 +77,6 @@ class BatchPhysicalCorrelateRule extends ConverterRule( convInput, scan, condition, - None, rel.getRowType, join.getJoinType) } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.scala index ed9edbfa142aafbe975bd1f5c1e9235f8c095abf..2d8dd196baef41cda7512db69718cea00887c35a 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.scala @@ -70,7 +70,6 @@ class StreamPhysicalConstantTableFunctionScanRule cluster, traitSet, values, - None, scan, None, scan.getRowType, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCorrelateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCorrelateRule.scala index 182ebeb570e9b9e3fe3ff8032bf58c880889dfc6..bc6972b50b2a651208521124911f0cf01f0ade4c 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCorrelateRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCorrelateRule.scala @@ -92,7 +92,6 @@ class StreamPhysicalCorrelateRule rel.getCluster, traitSet, convInput, - None, scan, condition, rel.getRowType,