Commit 79c17afa authored by Jark Wu, committed by twalthr

[FLINK-7439] [table] Support variable arguments for UDTF in SQL

This closes #4536.
Parent 142bde0e
......@@ -44,7 +44,7 @@ Scalar Functions
If a required scalar function is not contained in the built-in functions, it is possible to define custom, user-defined scalar functions for both the Table API and SQL. A user-defined scalar function maps zero, one, or multiple scalar values to a new scalar value.
In order to define a scalar function one has to extend the base class `ScalarFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a scalar function is determined by the evaluation method. An evaluation method must be declared publicly and named `eval`. The parameter types and return type of the evaluation method also determine the parameter and return types of the scalar function. Evaluation methods can also be overloaded by implementing multiple methods named `eval`.
In order to define a scalar function one has to extend the base class `ScalarFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a scalar function is determined by the evaluation method. An evaluation method must be declared publicly and named `eval`. The parameter types and return type of the evaluation method also determine the parameter and return types of the scalar function. Evaluation methods can also be overloaded by implementing multiple methods named `eval`. Evaluation methods can also support variable arguments, such as `eval(String... strs)`.
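As a hedged sketch in Scala (the class name `ConcatAll` and the concatenation behavior are illustrative assumptions, not part of this change), a var-arg evaluation method could look like this:

```scala
import org.apache.flink.table.functions.ScalarFunction
import scala.annotation.varargs

class ConcatAll extends ScalarFunction {
  // @varargs exposes the Scala var-arg parameter as a Java-style String... signature
  @varargs
  def eval(strs: String*): String = strs.mkString
}
```

In Java, the equivalent method would simply be declared as `public String eval(String... strs)`.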
The following example shows how to define your own hash code function, register it in the TableEnvironment, and call it in a query. Note that you can configure your scalar function via a constructor before it is registered:
......@@ -139,7 +139,7 @@ Table Functions
Similar to a user-defined scalar function, a user-defined table function takes zero, one, or multiple scalar values as input parameters. However, in contrast to a scalar function, it can return an arbitrary number of rows as output instead of a single value. The returned rows may consist of one or more columns.
In order to define a table function one has to extend the base class `TableFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a table function is determined by its evaluation methods. An evaluation method must be declared `public` and named `eval`. The `TableFunction` can be overloaded by implementing multiple methods named `eval`. The parameter types of the evaluation methods determine all valid parameters of the table function. The type of the returned table is determined by the generic type of `TableFunction`. Evaluation methods emit output rows using the protected `collect(T)` method.
In order to define a table function one has to extend the base class `TableFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a table function is determined by its evaluation methods. An evaluation method must be declared `public` and named `eval`. The `TableFunction` can be overloaded by implementing multiple methods named `eval`. The parameter types of the evaluation methods determine all valid parameters of the table function. Evaluation methods can also support variable arguments, such as `eval(String... strs)`. The type of the returned table is determined by the generic type of `TableFunction`. Evaluation methods emit output rows using the protected `collect(T)` method.
In the Table API, a table function is used with `.join(Expression)` or `.leftOuterJoin(Expression)` for Scala users and `.join(String)` or `.leftOuterJoin(String)` for Java users. The `join` operator (cross) joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator). The `leftOuterJoin` operator joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator) and preserves outer rows for which the table function returns an empty table. In SQL, use `LATERAL TABLE(<TableFunction>)` with a CROSS JOIN, or a LEFT JOIN with an ON TRUE join condition (see examples below).
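For illustration only, a minimal sketch of a var-arg table function (the class name `EmitEach` and the emit-every-string logic are assumptions):

```scala
import org.apache.flink.table.functions.TableFunction
import scala.annotation.varargs

// emits every input string as a separate output row
class EmitEach extends TableFunction[String] {
  // @varargs exposes the Scala var-arg parameter as a Java-style String... signature
  @varargs
  def eval(strs: String*): Unit = strs.foreach(collect)
}
```

Assuming a `tableEnv` TableEnvironment and a registered table `MyTable` with a column `c`, it could then be registered via `tableEnv.registerFunction("emitEach", new EmitEach)` and called in SQL as `SELECT c, s FROM MyTable, LATERAL TABLE(emitEach('hello', 'world', c)) AS T(s)`.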
......@@ -297,7 +297,7 @@ optionally implemented. While some of these methods allow the system more effici
- `merge()` is required for many batch aggregations and session window aggregations.
- `resetAccumulator()` is required for many batch aggregations.
All methods of `AggregateFunction` must be declared as `public`, not `static` and named exactly as the names mentioned above. The methods `createAccumulator`, `getValue`, `getResultType`, and `getAccumulatorType` are defined in the `AggregateFunction` abstract class, while others are contracted methods. In order to define an aggregation function, one has to extend the base class `org.apache.flink.table.functions.AggregateFunction` and implement one (or more) `accumulate` methods.
All methods of `AggregateFunction` must be declared as `public`, not `static` and named exactly as the names mentioned above. The methods `createAccumulator`, `getValue`, `getResultType`, and `getAccumulatorType` are defined in the `AggregateFunction` abstract class, while others are contracted methods. In order to define an aggregation function, one has to extend the base class `org.apache.flink.table.functions.AggregateFunction` and implement one (or more) `accumulate` methods. The method `accumulate` can be overloaded with different custom types and arguments and can also support variable arguments.
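As a hedged sketch, a var-arg `accumulate` method might look like the following (the accumulator class and the non-null counting logic are assumptions chosen only to show the signature):

```scala
import org.apache.flink.table.functions.AggregateFunction
import scala.annotation.varargs

// simple mutable accumulator holding a running count
class CountAccumulator { var count: Long = 0L }

class VarArgCount extends AggregateFunction[Long, CountAccumulator] {
  override def createAccumulator(): CountAccumulator = new CountAccumulator
  override def getValue(acc: CountAccumulator): Long = acc.count

  // var-arg accumulate: counts all non-null arguments of the current row
  @varargs
  def accumulate(acc: CountAccumulator, values: AnyRef*): Unit = {
    acc.count += values.count(_ != null)
  }
}
```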
Detailed documentation for all methods of `AggregateFunction` is given below.
......
......@@ -355,8 +355,8 @@ abstract class TableEnvironment(val config: TableConfig) {
functionCatalog.registerFunction(name, function.getClass)
// register in SQL API
val sqlFunctions = createTableSqlFunctions(name, function, typeInfo, typeFactory)
functionCatalog.registerSqlFunctions(sqlFunctions)
val sqlFunction = createTableSqlFunction(name, function, typeInfo, typeFactory)
functionCatalog.registerSqlFunction(sqlFunction)
}
/**
......
......@@ -26,6 +26,8 @@ import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
import org.apache.flink.table.typeutils.TypeCheckUtils
import scala.collection.mutable
/**
* Generates a call to user-defined [[ScalarFunction]].
*
......@@ -44,21 +46,26 @@ class ScalarFunctionCallGen(
operands: Seq[GeneratedExpression])
: GeneratedExpression = {
// determine function method and result class
val matchingMethod = getUserDefinedMethod(scalarFunction, "eval", typeInfoToClass(signature))
val matchingSignature = getEvalMethodSignature(scalarFunction, signature)
.getOrElse(throw new CodeGenException("No matching signature found."))
val matchingSignature = matchingMethod.getParameterTypes
val resultClass = getResultTypeClassOfScalarFunction(scalarFunction, matchingSignature)
// zip for variable signatures
var paramToOperands = matchingSignature.zip(operands)
if (operands.length > matchingSignature.length) {
operands.drop(matchingSignature.length).foreach(op =>
paramToOperands = paramToOperands :+ (matchingSignature.last.getComponentType, op)
)
// get the expanded parameter types
var paramClasses = new mutable.ArrayBuffer[Class[_]]
for (i <- operands.indices) {
if (i < matchingSignature.length - 1) {
paramClasses += matchingSignature(i)
} else if (matchingSignature.last.isArray) {
// last argument is an array type
paramClasses += matchingSignature.last.getComponentType
} else {
// last argument is not an array type
paramClasses += matchingSignature.last
}
}
// convert parameters for function (output boxing)
val parameters = paramToOperands.map { case (paramClass, operandExpr) =>
val parameters = paramClasses.zip(operands).map { case (paramClass, operandExpr) =>
if (paramClass.isPrimitive) {
operandExpr
} else if (ClassUtils.isPrimitiveWrapper(paramClass)
......
......@@ -27,6 +27,8 @@ import org.apache.flink.table.functions.TableFunction
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
import org.apache.flink.table.typeutils.TypeCheckUtils
import scala.collection.mutable
/**
* Generates a call to user-defined [[TableFunction]].
*
......@@ -45,20 +47,25 @@ class TableFunctionCallGen(
operands: Seq[GeneratedExpression])
: GeneratedExpression = {
// determine function method
val matchingMethod = getUserDefinedMethod(tableFunction, "eval", typeInfoToClass(signature))
val matchingSignature = getEvalMethodSignature(tableFunction, signature)
.getOrElse(throw new CodeGenException("No matching signature found."))
val matchingSignature = matchingMethod.getParameterTypes
// zip for variable signatures
var paramToOperands = matchingSignature.zip(operands)
if (operands.length > matchingSignature.length) {
operands.drop(matchingSignature.length).foreach(op =>
paramToOperands = paramToOperands :+ (matchingSignature.last.getComponentType, op)
)
// get the expanded parameter types
var paramClasses = new mutable.ArrayBuffer[Class[_]]
for (i <- operands.indices) {
if (i < matchingSignature.length - 1) {
paramClasses += matchingSignature(i)
} else if (matchingSignature.last.isArray) {
// last argument is an array type
paramClasses += matchingSignature.last.getComponentType
} else {
// last argument is not an array type
paramClasses += matchingSignature.last
}
}
// convert parameters for function (output boxing)
val parameters = paramToOperands.map { case (paramClass, operandExpr) =>
val parameters = paramClasses.zip(operands).map { case (paramClass, operandExpr) =>
if (paramClass.isPrimitive) {
operandExpr
} else if (ClassUtils.isPrimitiveWrapper(paramClass)
......
......@@ -49,7 +49,7 @@ abstract class UserDefinedFunction extends Serializable {
def isDeterministic: Boolean = true
final def functionIdentifier: String = {
val md5 = DigestUtils.md5Hex(serialize(this))
val md5 = DigestUtils.md5Hex(serialize(this))
getClass.getCanonicalName.replace('.', '$').concat("$").concat(md5)
}
}
......@@ -18,20 +18,20 @@
package org.apache.flink.table.functions.utils
import com.google.common.base.Predicate
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql._
import org.apache.calcite.sql.`type`._
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.calcite.sql.validate.SqlUserDefinedTableFunction
import org.apache.calcite.util.Util
import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
import scala.collection.JavaConverters._
import java.util
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
import org.apache.flink.table.functions.utils.TableSqlFunction._
/**
* Calcite wrapper for user-defined table functions.
......@@ -40,17 +40,14 @@ class TableSqlFunction(
name: String,
udtf: TableFunction[_],
rowTypeInfo: TypeInformation[_],
returnTypeInference: SqlReturnTypeInference,
operandTypeInference: SqlOperandTypeInference,
operandTypeChecker: SqlOperandTypeChecker,
paramTypes: util.List[RelDataType],
typeFactory: FlinkTypeFactory,
functionImpl: FlinkTableFunctionImpl[_])
extends SqlUserDefinedTableFunction(
new SqlIdentifier(name, SqlParserPos.ZERO),
returnTypeInference,
operandTypeInference,
operandTypeChecker,
paramTypes,
ReturnTypes.CURSOR,
createOperandTypeInference(name, udtf, typeFactory),
createOperandTypeChecker(name, udtf),
null,
functionImpl) {
/**
......@@ -74,48 +71,102 @@ class TableSqlFunction(
object TableSqlFunction {
/**
* Util function to create a [[TableSqlFunction]].
*
* @param name function name (used by SQL parser)
* @param udtf user-defined table function to be called
* @param rowTypeInfo the row type information generated by the table function
* @param typeFactory type factory for converting between Flink's and Calcite's types
* @param functionImpl Calcite table function schema
* @return [[TableSqlFunction]]
*/
def apply(
private[flink] def createOperandTypeInference(
name: String,
udtf: TableFunction[_],
rowTypeInfo: TypeInformation[_],
typeFactory: FlinkTypeFactory,
functionImpl: FlinkTableFunctionImpl[_]): TableSqlFunction = {
val argTypes: util.List[RelDataType] = new util.ArrayList[RelDataType]
val typeFamilies: util.List[SqlTypeFamily] = new util.ArrayList[SqlTypeFamily]
// derives operands' data types and type families
functionImpl.getParameters.asScala.foreach{ o =>
val relType: RelDataType = o.getType(typeFactory)
argTypes.add(relType)
typeFamilies.add(Util.first(relType.getSqlTypeName.getFamily, SqlTypeFamily.ANY))
typeFactory: FlinkTypeFactory)
: SqlOperandTypeInference = {
/**
* Operand type inference based on the eval method signatures of the given [[TableFunction]].
*/
new SqlOperandTypeInference {
override def inferOperandTypes(
callBinding: SqlCallBinding,
returnType: RelDataType,
operandTypes: Array[RelDataType]): Unit = {
val operandTypeInfo = getOperandTypeInfo(callBinding)
val foundSignature = getEvalMethodSignature(udtf, operandTypeInfo)
.getOrElse(throw new ValidationException(
s"Given parameters of function '$name' do not match any signature. \n" +
s"Actual: ${signatureToString(operandTypeInfo)} \n" +
s"Expected: ${signaturesToString(udtf, "eval")}"))
val inferredTypes = foundSignature
.map(TypeExtractor.getForClass(_))
.map(typeFactory.createTypeFromTypeInfo(_, isNullable = true))
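// assign the inferred types to the operands; for a var-arg method, the component type
// of the trailing array parameter covers all remaining operands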
for (i <- operandTypes.indices) {
if (i < inferredTypes.length - 1) {
operandTypes(i) = inferredTypes(i)
} else if (null != inferredTypes.last.getComponentType) {
// the last parameter is an array (var-arg) type, use its component type
operandTypes(i) = inferredTypes.last.getComponentType
} else {
operandTypes(i) = inferredTypes.last
}
}
}
}
// derives whether the 'input'th parameter of a method is optional.
val optional: Predicate[Integer] = new Predicate[Integer]() {
def apply(input: Integer): Boolean = {
functionImpl.getParameters.get(input).isOptional
}
private[flink] def createOperandTypeChecker(
name: String,
udtf: TableFunction[_])
: SqlOperandTypeChecker = {
val signatures = getMethodSignatures(udtf, "eval")
/**
* Operand type checker based on the eval method signatures of the given [[TableFunction]].
*/
new SqlOperandTypeChecker {
override def getAllowedSignatures(op: SqlOperator, opName: String): String = {
s"$opName[${signaturesToString(udtf, "eval")}]"
}
override def getOperandCountRange: SqlOperandCountRange = {
var min = 255
var max = -1
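// derive the allowed operand count range from all eval signatures; a trailing array
// (var-arg) parameter permits up to 254 operands, since a JVM method may have at most
// 255 parameters (JVM spec 4.3.3)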
signatures.foreach( sig => {
var len = sig.length
if (len > 0 && sig(sig.length - 1).isArray) {
max = 254 // according to JVM spec 4.3.3
len = sig.length - 1
}
max = Math.max(len, max)
min = Math.min(len, min)
})
SqlOperandCountRanges.between(min, max)
}
override def checkOperandTypes(
callBinding: SqlCallBinding,
throwOnFailure: Boolean)
: Boolean = {
val operandTypeInfo = getOperandTypeInfo(callBinding)
val foundSignature = getEvalMethodSignature(udtf, operandTypeInfo)
if (foundSignature.isEmpty) {
if (throwOnFailure) {
throw new ValidationException(
s"Given parameters of function '$name' do not match any signature. \n" +
s"Actual: ${signatureToString(operandTypeInfo)} \n" +
s"Expected: ${signaturesToString(udtf, "eval")}")
} else {
false
}
} else {
true
}
}
override def isOptional(i: Int): Boolean = false
override def getConsistency: Consistency = Consistency.NONE
}
// create type check for the operands
val typeChecker: FamilyOperandTypeChecker = OperandTypes.family(typeFamilies, optional)
new TableSqlFunction(
name,
udtf,
rowTypeInfo,
ReturnTypes.CURSOR,
InferTypes.explicit(argTypes),
typeChecker,
argTypes,
functionImpl)
}
}
......@@ -258,7 +258,7 @@ object UserDefinedFunctionUtils {
}
/**
* Create [[SqlFunction]]s for every eval method of a [[TableFunction]]
* Create a [[SqlFunction]] for a [[TableFunction]]
*
* @param name function name
* @param tableFunction table function
......@@ -266,19 +266,15 @@ object UserDefinedFunctionUtils {
* @param typeFactory type factory
* @return the TableSqlFunction
*/
def createTableSqlFunctions(
def createTableSqlFunction(
name: String,
tableFunction: TableFunction[_],
resultType: TypeInformation[_],
typeFactory: FlinkTypeFactory)
: Seq[SqlFunction] = {
: SqlFunction = {
val (fieldNames, fieldIndexes, _) = UserDefinedFunctionUtils.getFieldInfo(resultType)
val evalMethods = checkAndExtractMethods(tableFunction, "eval")
evalMethods.map { method =>
val function = new FlinkTableFunctionImpl(resultType, fieldIndexes, fieldNames, method)
TableSqlFunction(name, tableFunction, resultType, typeFactory, function)
}
val function = new FlinkTableFunctionImpl(resultType, fieldIndexes, fieldNames)
new TableSqlFunction(name, tableFunction, resultType, typeFactory, function)
}
/**
......
......@@ -723,10 +723,10 @@ case class LogicalTableFunctionCall(
val function = new FlinkTableFunctionImpl(
resultType,
fieldIndexes,
if (fieldNames.isEmpty) generatedNames else fieldNames, evalMethod
if (fieldNames.isEmpty) generatedNames else fieldNames
)
val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
val sqlFunction = TableSqlFunction(
val sqlFunction = new TableSqlFunction(
tableFunction.functionIdentifier,
tableFunction,
resultType,
......
......@@ -82,11 +82,11 @@ class LogicalUnnestRule(
val componentType = arrayType.getComponentType
// create table function
val explodeTableFunc = UserDefinedFunctionUtils.createTableSqlFunctions(
val explodeTableFunc = UserDefinedFunctionUtils.createTableSqlFunction(
"explode",
ExplodeFunctionUtil.explodeTableFuncFromType(arrayType.typeInfo),
FlinkTypeFactory.toTypeInfo(arrayType.getComponentType),
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]).head
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory])
// create table function call
val rexCall = cluster.getRexBuilder.makeCall(
......
......@@ -17,12 +17,12 @@
*/
package org.apache.flink.table.plan.schema
import java.lang.reflect.{Method, Type}
import java.lang.reflect.Type
import java.util
import java.util.Collections
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
import org.apache.calcite.schema.TableFunction
import org.apache.calcite.schema.impl.ReflectiveFunctionBase
import org.apache.calcite.schema.{FunctionParameter, TableFunction}
import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.table.api.TableException
......@@ -36,10 +36,8 @@ import org.apache.flink.table.calcite.FlinkTypeFactory
class FlinkTableFunctionImpl[T](
val typeInfo: TypeInformation[T],
val fieldIndexes: Array[Int],
val fieldNames: Array[String],
val evalMethod: Method)
extends ReflectiveFunctionBase(evalMethod)
with TableFunction {
val fieldNames: Array[String])
extends TableFunction {
if (fieldIndexes.length != fieldNames.length) {
throw new TableException(
......@@ -71,6 +69,9 @@ class FlinkTableFunctionImpl[T](
override def getElementType(arguments: util.List[AnyRef]): Type = classOf[Array[Object]]
// we never use the FunctionParameters, so return an empty list
override def getParameters: util.List[FunctionParameter] = Collections.emptyList()
override def getRowType(typeFactory: RelDataTypeFactory,
arguments: util.List[AnyRef]): RelDataType = {
val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
......
......@@ -48,22 +48,6 @@ class FunctionCatalog {
sqlFunctions += sqlFunction
}
/**
* Register multiple SQL functions at the same time. The functions have the same name.
*/
def registerSqlFunctions(functions: Seq[SqlFunction]): Unit = {
if (functions.nonEmpty) {
val name = functions.head.getName
// check that all functions have the same name
if (functions.forall(_.getName == name)) {
sqlFunctions --= sqlFunctions.filter(_.getName == name)
sqlFunctions ++= functions
} else {
throw ValidationException("The SQL functions to be registered have different names.")
}
}
}
def getSqlOperatorTable: SqlOperatorTable =
ChainedSqlOperatorTable.of(
new BasicOperatorTable(),
......
......@@ -35,4 +35,21 @@ public class JavaUserDefinedTableFunctions {
collect(c);
}
}
/**
* Emit every input string.
*/
public static class JavaVarsArgTableFunc0 extends TableFunction<String> {
public void eval(String... strs) {
for (String s : strs) {
collect(s);
}
}
public void eval(int val, String str) {
for (int i = 0; i < val; i++) {
collect(str);
}
}
}
}
......@@ -20,6 +20,7 @@ package org.apache.flink.table.api.batch.sql
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.utils.JavaUserDefinedTableFunctions.JavaVarsArgTableFunc0
import org.apache.flink.table.utils.TableTestUtil._
import org.apache.flink.table.utils.{HierarchyTableFunction, PojoTableFunc, TableFunc2, _}
import org.junit.Test
......@@ -234,4 +235,52 @@ class CorrelateTest extends TableTestBase {
util.verifySql(sqlQuery, expected)
}
@Test
def testTableFunctionWithVariableArguments(): Unit = {
val util = batchTestUtil()
val func1 = new JavaVarsArgTableFunc0
util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
util.addFunction("func1", func1)
var sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1('hello', 'world', c)) AS T(s)"
var expected = unaryNode(
"DataSetCalc",
unaryNode(
"DataSetCorrelate",
batchTableNode(0),
term("invocation", "func1('hello', 'world', $cor0.c)"),
term("function", func1.getClass.getCanonicalName),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
term("joinType", "INNER")
),
term("select", "c", "f0 AS s")
)
util.verifySql(sqlQuery, expected)
// test scala var arg function
val func2 = new VarArgsFunc0
util.addFunction("func2", func2)
sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func2('hello', 'world', c)) AS T(s)"
expected = unaryNode(
"DataSetCalc",
unaryNode(
"DataSetCorrelate",
batchTableNode(0),
term("invocation", "func2('hello', 'world', $cor0.c)"),
term("function", func2.getClass.getCanonicalName),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
term("joinType", "INNER")
),
term("select", "c", "f0 AS s")
)
util.verifySql(sqlQuery, expected)
}
}
......@@ -20,6 +20,7 @@ package org.apache.flink.table.api.stream.sql
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.utils.JavaUserDefinedTableFunctions.JavaVarsArgTableFunc0
import org.apache.flink.table.utils.TableTestUtil._
import org.apache.flink.table.utils.{HierarchyTableFunction, PojoTableFunc, TableFunc2, _}
import org.junit.Test
......@@ -233,4 +234,52 @@ class CorrelateTest extends TableTestBase {
util.verifySql(sqlQuery, expected)
}
@Test
def testTableFunctionWithVariableArguments(): Unit = {
val util = streamTestUtil()
val func1 = new JavaVarsArgTableFunc0
util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
util.addFunction("func1", func1)
var sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1('hello', 'world', c)) AS T(s)"
var expected = unaryNode(
"DataStreamCalc",
unaryNode(
"DataStreamCorrelate",
streamTableNode(0),
term("invocation", "func1('hello', 'world', $cor0.c)"),
term("function", func1.getClass.getCanonicalName),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
term("joinType", "INNER")
),
term("select", "c", "f0 AS s")
)
util.verifySql(sqlQuery, expected)
// test scala var arg function
val func2 = new VarArgsFunc0
util.addFunction("func2", func2)
sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func2('hello', 'world', c)) AS T(s)"
expected = unaryNode(
"DataStreamCalc",
unaryNode(
"DataStreamCorrelate",
streamTableNode(0),
term("invocation", "func2('hello', 'world', $cor0.c)"),
term("function", func2.getClass.getCanonicalName),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
term("joinType", "INNER")
),
term("select", "c", "f0 AS s")
)
util.verifySql(sqlQuery, expected)
}
}
......@@ -173,7 +173,7 @@ class CorrelateValidationTest extends TableTestBase {
// SQL API call
expectExceptionThrown(
util.tableEnv.sqlQuery("SELECT * FROM MyTable, LATERAL TABLE(func2(c, c))"),
"No match found for function signature func2(<CHARACTER>, <CHARACTER>)")
"Given parameters of function 'func2' do not match any signature.")
}
// ----------------------------------------------------------------------------------------------
......