提交 5c37e55c 编写于 作者: T tonycox 提交者: Fabian Hueske

[FLINK-5698] [table] Add NestedFieldsProjectableTableSource interface.

This closes #3269.
上级 cac9fa02
......@@ -23,7 +23,7 @@ import org.apache.calcite.rel.core.Calc
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.plan.nodes.TableSourceScan
import org.apache.flink.table.plan.util.{RexProgramExtractor, RexProgramRewriter}
import org.apache.flink.table.sources.ProjectableTableSource
import org.apache.flink.table.sources.{NestedFieldsProjectableTableSource, ProjectableTableSource}
trait PushProjectIntoTableSourceScanRuleBase {
......@@ -35,9 +35,18 @@ trait PushProjectIntoTableSourceScanRuleBase {
val usedFields = RexProgramExtractor.extractRefInputFields(calc.getProgram)
// if no fields can be projected, we keep the original plan.
if (TableEnvironment.getFieldNames(scan.tableSource).length != usedFields.length) {
val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]]
val newTableSource = originTableSource.projectFields(usedFields)
val source = scan.tableSource
if (TableEnvironment.getFieldNames(source).length != usedFields.length) {
val newTableSource = source match {
case nested: NestedFieldsProjectableTableSource[_] =>
val nestedFields = RexProgramExtractor
.extractRefNestedInputFields(calc.getProgram, usedFields)
nested.projectNestedFields(usedFields, nestedFields)
case projecting: ProjectableTableSource[_] =>
val newScan = scan.copy(scan.getTraitSet, newTableSource)
val newCalcProgram = RexProgramRewriter.rewriteWithFieldProjection(
......@@ -92,6 +92,26 @@ object RexProgramExtractor {
case _ => (Array.empty, Array.empty)
* Extracts the name of nested input fields accessed by the RexProgram and returns the
* prefix of the accesses.
* @param rexProgram The RexProgram to analyze
* @return The full names of accessed input fields. e.g. field.subfield
def extractRefNestedInputFields(
rexProgram: RexProgram, usedFields: Array[Int]): Array[Array[String]] = {
val visitor = new RefFieldAccessorVisitor(usedFields)
rexProgram.getProjectList.foreach(exp => rexProgram.expandLocalRef(exp).accept(visitor))
val condition = rexProgram.getCondition
if (condition != null) {
......@@ -181,3 +201,64 @@ class RexNodeToExpressionConverter(
* A RexVisitor to extract used nested input fields
class RefFieldAccessorVisitor(usedFields: Array[Int]) extends RexVisitorImpl[Unit](true) {
private val projectedFields: Array[Array[String]] = Array.fill(usedFields.length)(Array.empty)
private val order: Map[Int, Int] = usedFields.zipWithIndex.toMap
/** Returns the prefix of the nested field accesses */
def getProjectedFields: Array[Array[String]] = {
projectedFields.map { nestedFields =>
// sort nested field accesses
val sorted = nestedFields.sorted
// get prefix field accesses
val prefixAccesses = sorted.foldLeft(Nil: List[String]) {
(prefixAccesses, nestedAccess) => prefixAccesses match {
// first access => add access
case Nil => List[String](nestedAccess)
// top-level access already found => return top-level access
case head :: Nil if head.equals("*") => prefixAccesses
// access is top-level access => return top-level access
case _ :: _ if nestedAccess.equals("*") => List("*")
// previous access is not prefix of this access => add access
case head :: _ if !nestedAccess.startsWith(head) =>
nestedAccess :: prefixAccesses
// previous access is a prefix of this access => do not add access
case _ => prefixAccesses
override def visitFieldAccess(fieldAccess: RexFieldAccess): Unit = {
def internalVisit(fieldAccess: RexFieldAccess): (Int, String) = {
fieldAccess.getReferenceExpr match {
case ref: RexInputRef =>
(ref.getIndex, fieldAccess.getField.getName)
case fac: RexFieldAccess =>
val (i, n) = internalVisit(fac)
(i, s"$n.${fieldAccess.getField.getName}")
val (index, fullName) = internalVisit(fieldAccess)
val outputIndex = order.getOrElse(index, -1)
val fields: Array[String] = projectedFields(outputIndex)
projectedFields(outputIndex) = fields :+ fullName
override def visitInputRef(inputRef: RexInputRef): Unit = {
val outputIndex = order.getOrElse(inputRef.getIndex, -1)
val fields: Array[String] = projectedFields(outputIndex)
projectedFields(outputIndex) = fields :+ "*"
override def visitCall(call: RexCall): Unit =
call.operands.foreach(operand => operand.accept(this))
package org.apache.flink.table.sources
* Adds support for projection push-down to a [[TableSource]] with nested fields.
* A [[TableSource]] extending this interface is able
* to project the nested fields of the returned table.
* @tparam T The return type of the [[NestedFieldsProjectableTableSource]].
trait NestedFieldsProjectableTableSource[T] {
* Creates a copy of the [[TableSource]] that projects its output on the specified nested fields.
* @param fields The indexes of the fields to return.
* @param nestedFields The accessed nested fields of the fields to return.
* e.g.
* tableSchema = {
* id,
* student<\school<\city, tuition>, age, name>,
* teacher<\age, name>
* }
* select (id, student.school.city, student.age, teacher)
* fields = field = [0, 1, 2]
* nestedFields \[\["*"], ["school.city", "age"], ["*"\]\]
* @return A copy of the [[TableSource]] that projects its output.
def projectNestedFields(
fields: Array[Int],
nestedFields: Array[Array[String]]): TableSource[T]
......@@ -20,12 +20,15 @@ package org.apache.flink.table.plan.util
import java.math.BigDecimal
import org.apache.calcite.rex.{RexBuilder, RexProgramBuilder}
import org.apache.calcite.rex.{RexBuilder, RexProgram, RexProgramBuilder}
import org.apache.calcite.sql.SqlPostfixOperator
import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, INTEGER, VARCHAR}
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.flink.table.expressions.{Expression, ExpressionParser}
import org.apache.flink.table.utils.InputTypeBuilder.inputOf
import org.apache.flink.table.validate.FunctionCatalog
import org.junit.Assert.{assertArrayEquals, assertEquals}
import org.hamcrest.CoreMatchers.is
import org.junit.Assert.{assertArrayEquals, assertEquals, assertThat}
import org.junit.Test
import scala.collection.JavaConverters._
......@@ -306,6 +309,180 @@ class RexProgramExtractorTest extends RexProgramTestBase {
def testExtractRefNestedInputFields(): Unit = {
val rexProgram = buildRexProgramWithNesting()
val usedFields = RexProgramExtractor.extractRefInputFields(rexProgram)
val usedNestedFields = RexProgramExtractor.extractRefNestedInputFields(rexProgram, usedFields)
val expected = Array(Array("amount"), Array("*"))
assertThat(usedNestedFields, is(expected))
def testExtractRefNestedInputFieldsWithNoNesting(): Unit = {
val rexProgram = buildSimpleRexProgram()
val usedFields = RexProgramExtractor.extractRefInputFields(rexProgram)
val usedNestedFields = RexProgramExtractor.extractRefNestedInputFields(rexProgram, usedFields)
val expected = Array(Array("*"), Array("*"), Array("*"))
assertThat(usedNestedFields, is(expected))
def testExtractDeepRefNestedInputFields(): Unit = {
val rexProgram = buildRexProgramWithDeepNesting()
val usedFields = RexProgramExtractor.extractRefInputFields(rexProgram)
val usedNestedFields = RexProgramExtractor.extractRefNestedInputFields(rexProgram, usedFields)
val expected = Array(
Array("with.deeper.entry", "with.deep.entry"))
assertThat(usedFields, is(Array(1, 0, 2)))
assertThat(usedNestedFields, is(expected))
private def buildRexProgramWithDeepNesting(): RexProgram = {
// person input
val passportRow = inputOf(typeFactory)
.field("id", VARCHAR)
.field("status", VARCHAR)
val personRow = inputOf(typeFactory)
.field("name", VARCHAR)
.field("age", INTEGER)
.nestedField("passport", passportRow)
// payment input
val paymentRow = inputOf(typeFactory)
.field("id", BIGINT)
.field("amount", INTEGER)
// deep field input
val deepRowType = inputOf(typeFactory)
.field("entry", VARCHAR)
val entryRowType = inputOf(typeFactory)
.nestedField("inside", deepRowType)
val deeperRowType = inputOf(typeFactory)
.nestedField("entry", entryRowType)
val withRowType = inputOf(typeFactory)
.nestedField("deep", deepRowType)
.nestedField("deeper", deeperRowType)
val fieldRowType = inputOf(typeFactory)
.nestedField("with", withRowType)
// main input
val inputRowType = inputOf(typeFactory)
.nestedField("persons", personRow)
.nestedField("payments", paymentRow)
.nestedField("field", fieldRowType)
// inputRowType
// [ persons: [ name: VARCHAR, age: INT, passport: [id: VARCHAR, status: VARCHAR ] ],
// payments: [ id: BIGINT, amount: INT ],
// field: [ with: [ deep: [ entry: VARCHAR ],
// deeper: [ entry: [ inside: [entry: VARCHAR ] ] ]
// ] ]
// ]
val builder = new RexProgramBuilder(inputRowType, rexBuilder)
val t0 = rexBuilder.makeInputRef(personRow, 0)
val t1 = rexBuilder.makeInputRef(paymentRow, 1)
val t2 = rexBuilder.makeInputRef(fieldRowType, 2)
val t3 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(10L))
// person
val person$pass = rexBuilder.makeFieldAccess(t0, "passport", false)
val person$pass$stat = rexBuilder.makeFieldAccess(person$pass, "status", false)
// payment
val pay$amount = rexBuilder.makeFieldAccess(t1, "amount", false)
val multiplyAmount = builder.addExpr(
rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY, pay$amount, t3))
// field
val field$with = rexBuilder.makeFieldAccess(t2, "with", false)
val field$with$deep = rexBuilder.makeFieldAccess(field$with, "deep", false)
val field$with$deeper = rexBuilder.makeFieldAccess(field$with, "deeper", false)
val field$with$deep$entry = rexBuilder.makeFieldAccess(field$with$deep, "entry", false)
val field$with$deeper$entry = rexBuilder.makeFieldAccess(field$with$deeper, "entry", false)
val field$with$deeper$entry$inside = rexBuilder
.makeFieldAccess(field$with$deeper$entry, "inside", false)
val field$with$deeper$entry$inside$entry = rexBuilder
.makeFieldAccess(field$with$deeper$entry$inside, "entry", false)
builder.addProject(multiplyAmount, "amount")
builder.addProject(person$pass$stat, "status")
builder.addProject(field$with$deep$entry, "entry")
builder.addProject(field$with$deeper$entry$inside$entry, "entry")
builder.addProject(field$with$deeper$entry, "entry2")
builder.addProject(t0, "person")
// Program
// (
// payments.amount * 10),
// persons.passport.status,
// field.with.deep.entry
// field.with.deeper.entry.inside.entry
// field.with.deeper.entry
// persons
// )
private def buildRexProgramWithNesting(): RexProgram = {
val personRow = inputOf(typeFactory)
.field("name", INTEGER)
.field("age", VARCHAR)
val paymentRow = inputOf(typeFactory)
.field("id", BIGINT)
.field("amount", INTEGER)
val types = List(personRow, paymentRow).asJava
val names = List("persons", "payments").asJava
val inputRowType = typeFactory.createStructType(types, names)
val builder = new RexProgramBuilder(inputRowType, rexBuilder)
val t0 = rexBuilder.makeInputRef(types.get(0), 0)
val t1 = rexBuilder.makeInputRef(types.get(1), 1)
val t2 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
val payment$amount = rexBuilder.makeFieldAccess(t1, "amount", false)
builder.addProject(payment$amount, "amount")
builder.addProject(t0, "persons")
builder.addProject(t2, "number")
private def testExtractSinglePostfixCondition(
fieldIndex: Integer,
op: SqlPostfixOperator,
package org.apache.flink.table.utils
import org.apache.calcite.adapter.java.JavaTypeFactory
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql.`type`.SqlTypeName
import scala.collection.JavaConverters._
import scala.collection.mutable
class InputTypeBuilder(typeFactory: JavaTypeFactory) {
private val names = mutable.ListBuffer[String]()
private val types = mutable.ListBuffer[RelDataType]()
def field(name: String, `type`: SqlTypeName): InputTypeBuilder = {
names += name
types += typeFactory.createSqlType(`type`)
def nestedField(name: String, `type`: RelDataType): InputTypeBuilder = {
names += name
types += `type`
def build: RelDataType = {
typeFactory.createStructType(types.asJava, names.asJava)
object InputTypeBuilder {
def inputOf(typeFactory: JavaTypeFactory) = new InputTypeBuilder(typeFactory)
