提交 91f00ec9 编写于 作者: X Xpray 提交者: sunjincheng121

[FLINK-8312][table] Fix ScalarFunction varargs length exceed 254

This closes #5206
上级 d74869f8
......@@ -140,7 +140,7 @@ object ScalarSqlFunction {
scalarFunction: ScalarFunction)
: SqlOperandTypeChecker = {
val signatures = getMethodSignatures(scalarFunction, "eval")
val methods = checkAndExtractMethods(scalarFunction, "eval")
/**
* Operand type checker based on [[ScalarFunction]] given information.
......@@ -151,17 +151,24 @@ object ScalarSqlFunction {
}
override def getOperandCountRange: SqlOperandCountRange = {
var min = 255
var min = 254 // according to JVM spec 4.3.3
var max = -1
signatures.foreach( sig => {
var len = sig.length
if (len > 0 && sig(sig.length - 1).isArray) {
max = 254 // according to JVM spec 4.3.3
len = sig.length - 1
var isVarargs = false
methods.foreach(
m => {
var len = m.getParameterTypes.length
if (len > 0 && m.isVarArgs && m.getParameterTypes()(len - 1).isArray) {
isVarargs = true
len = len - 1
}
max = Math.max(len, max)
min = Math.min(len, min)
}
max = Math.max(len, max)
min = Math.min(len, min)
})
)
if (isVarargs) {
// if eval method is varargs, set max to -1 to skip length check in Calcite
max = -1
}
SqlOperandCountRanges.between(min, max)
}
......
......@@ -26,10 +26,12 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.expressions.utils.SplitUDF
import org.apache.flink.table.expressions.utils.Func15
import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
import org.apache.flink.types.Row
import org.apache.flink.table.utils.MemoryTableSinkUtil
import org.junit.Assert._
import org.junit._
......@@ -516,4 +518,29 @@ class SqlITCase extends StreamingWithStateTestBase {
val expected = List("a,a,d,d,e,e", "x,x,z,z,z,z")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
@Test
def testUDFWithLongVarargs(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.clear
tEnv.registerFunction("func15", Func15)
val parameters = "c," + (0 until 255).map(_ => "a").mkString(",")
val sqlQuery = s"SELECT func15($parameters) FROM T1"
val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("T1", t1)
val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List(
"Hi255",
"Hello255",
"Hello world255")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册