Commit 427dfe42 authored by 军长, committed by Fabian Hueske

[FLINK-7410] [table] Use UserDefinedFunction.toString() to display operator names of UDFs.

This closes #4624.
Parent dccdba19
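For context, a minimal sketch (not part of this commit; the `HashCode` class and its display string are hypothetical) of how a user-defined function can override `toString()` so that plan output and operator names show a readable name instead of the class name, which is what this change wires through `ScalarSqlFunction`, `TableSqlFunction`, and `AggSqlFunction`:

```scala
import org.apache.flink.table.functions.ScalarFunction

// Hypothetical UDF: with this change, the string returned by toString()
// is used as the display name in operator names and in explain() output.
class HashCode extends ScalarFunction {
  def eval(s: String): Int = s.hashCode

  // If toString() is not overridden, the new default in UserDefinedFunction
  // falls back to getClass.getSimpleName.
  override def toString: String = "hashCode"
}
```

After registering such a function and using it in a query, the custom name (rather than the canonical class name) should appear in the generated plan, assuming the default behavior introduced by this commit.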
......@@ -331,7 +331,9 @@ abstract class TableEnvironment(val config: TableConfig) {
functionCatalog.registerFunction(name, function.getClass)
// register in SQL API
functionCatalog.registerSqlFunction(createScalarSqlFunction(name, function, typeFactory))
functionCatalog.registerSqlFunction(
createScalarSqlFunction(name, name, function, typeFactory)
)
}
/**
......@@ -355,7 +357,7 @@ abstract class TableEnvironment(val config: TableConfig) {
functionCatalog.registerFunction(name, function.getClass)
// register in SQL API
val sqlFunction = createTableSqlFunction(name, function, typeInfo, typeFactory)
val sqlFunction = createTableSqlFunction(name, name, function, typeInfo, typeFactory)
functionCatalog.registerSqlFunction(sqlFunction)
}
......@@ -383,6 +385,7 @@ abstract class TableEnvironment(val config: TableConfig) {
// register in SQL API
val sqlFunctions = createAggregateSqlFunction(
name,
name,
function,
resultTypeInfo,
......
......@@ -261,7 +261,8 @@ case class AggFunctionCall(
override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
AggSqlFunction(
aggregateFunction.getClass.getSimpleName,
aggregateFunction.functionIdentifier,
aggregateFunction.toString,
aggregateFunction,
resultType,
accTypeInfo,
......
......@@ -272,6 +272,7 @@ case class ScalarFunctionCall(
relBuilder.call(
createScalarSqlFunction(
scalarFunction.functionIdentifier,
scalarFunction.toString,
scalarFunction,
typeFactory),
parameters.map(_.toRexNode): _*)
......
......@@ -56,8 +56,6 @@ abstract class ScalarFunction extends UserDefinedFunction {
ScalarFunctionCall(this, params)
}
override def toString: String = getClass.getCanonicalName
// ----------------------------------------------------------------------------------------------
/**
......
......@@ -81,8 +81,6 @@ import org.apache.flink.util.Collector
*/
abstract class TableFunction[T] extends UserDefinedFunction {
override def toString: String = getClass.getCanonicalName
// ----------------------------------------------------------------------------------------------
/**
......
......@@ -41,7 +41,7 @@ abstract class UserDefinedFunction extends Serializable {
def close(): Unit = {}
/**
* @return true iff a call to this function is guaranteed to always return
* @return true if and only if a call to this function is guaranteed to always return
* the same result given the same parameters; true is assumed by default
* if user's function is not pure functional, like random(), date(), now()...
* isDeterministic must return false
......@@ -52,4 +52,10 @@ abstract class UserDefinedFunction extends Serializable {
val md5 = DigestUtils.md5Hex(serialize(this))
getClass.getCanonicalName.replace('.', '$').concat("$").concat(md5)
}
/**
* Returns the name of the UDF that is used for plan explain and logging.
*/
override def toString: String = getClass.getSimpleName
}
......@@ -35,6 +35,7 @@ import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
* Calcite wrapper for user-defined aggregate functions.
*
* @param name function name (used by SQL parser)
* @param displayName name to be displayed in operator name
* @param aggregateFunction aggregate function to be called
* @param returnType the type information of returned value
* @param accType the type information of the accumulator
......@@ -42,6 +43,7 @@ import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
*/
class AggSqlFunction(
name: String,
displayName: String,
aggregateFunction: AggregateFunction[_, _],
val returnType: TypeInformation[_],
val accType: TypeInformation[_],
......@@ -62,19 +64,29 @@ class AggSqlFunction(
def getFunction: AggregateFunction[_, _] = aggregateFunction
override def isDeterministic: Boolean = aggregateFunction.isDeterministic
override def toString: String = displayName
}
object AggSqlFunction {
def apply(
name: String,
displayName: String,
aggregateFunction: AggregateFunction[_, _],
returnType: TypeInformation[_],
accType: TypeInformation[_],
typeFactory: FlinkTypeFactory,
requiresOver: Boolean): AggSqlFunction = {
new AggSqlFunction(name, aggregateFunction, returnType, accType, typeFactory, requiresOver)
new AggSqlFunction(
name,
displayName,
aggregateFunction,
returnType,
accType,
typeFactory,
requiresOver)
}
private[flink] def createOperandTypeInference(
......
......@@ -35,11 +35,13 @@ import scala.collection.JavaConverters._
* Calcite wrapper for user-defined scalar functions.
*
* @param name function name (used by SQL parser)
* @param displayName name to be displayed in operator name
* @param scalarFunction scalar function to be called
* @param typeFactory type factory for converting Flink's between Calcite's types
*/
class ScalarSqlFunction(
name: String,
displayName: String,
scalarFunction: ScalarFunction,
typeFactory: FlinkTypeFactory)
extends SqlFunction(
......@@ -53,6 +55,8 @@ class ScalarSqlFunction(
def getScalarFunction = scalarFunction
override def isDeterministic: Boolean = scalarFunction.isDeterministic
override def toString: String = displayName
}
object ScalarSqlFunction {
......
......@@ -37,6 +37,7 @@ import org.apache.flink.table.functions.utils.TableSqlFunction._
*/
class TableSqlFunction(
name: String,
displayName: String,
tableFunction: TableFunction[_],
rowTypeInfo: TypeInformation[_],
typeFactory: FlinkTypeFactory,
......@@ -66,6 +67,8 @@ class TableSqlFunction(
def getPojoFieldMapping: Array[Int] = functionImpl.fieldIndexes
override def isDeterministic: Boolean = tableFunction.isDeterministic
override def toString: String = displayName
}
object TableSqlFunction {
......
......@@ -251,10 +251,11 @@ object UserDefinedFunctionUtils {
*/
def createScalarSqlFunction(
name: String,
displayName: String,
function: ScalarFunction,
typeFactory: FlinkTypeFactory)
: SqlFunction = {
new ScalarSqlFunction(name, function, typeFactory)
new ScalarSqlFunction(name, displayName, function, typeFactory)
}
/**
......@@ -268,13 +269,14 @@ object UserDefinedFunctionUtils {
*/
def createTableSqlFunction(
name: String,
displayName: String,
tableFunction: TableFunction[_],
resultType: TypeInformation[_],
typeFactory: FlinkTypeFactory)
: SqlFunction = {
val (fieldNames, fieldIndexes, _) = UserDefinedFunctionUtils.getFieldInfo(resultType)
val function = new FlinkTableFunctionImpl(resultType, fieldIndexes, fieldNames)
new TableSqlFunction(name, tableFunction, resultType, typeFactory, function)
new TableSqlFunction(name, displayName, tableFunction, resultType, typeFactory, function)
}
/**
......@@ -287,6 +289,7 @@ object UserDefinedFunctionUtils {
*/
def createAggregateSqlFunction(
name: String,
displayName: String,
aggFunction: AggregateFunction[_, _],
resultType: TypeInformation[_],
accTypeInfo: TypeInformation[_],
......@@ -297,6 +300,7 @@ object UserDefinedFunctionUtils {
AggSqlFunction(
name,
displayName,
aggFunction,
resultType,
accTypeInfo,
......
......@@ -728,6 +728,7 @@ case class LogicalTableFunctionCall(
val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
val sqlFunction = new TableSqlFunction(
tableFunction.functionIdentifier,
tableFunction.toString,
tableFunction,
resultType,
typeFactory,
......
......@@ -179,21 +179,29 @@ trait CommonCorrelate {
}
private[flink] def selectToString(rowType: RelDataType): String = {
rowType.getFieldNames.asScala.mkString(",")
rowType.getFieldNames.asScala.mkString(", ")
}
private[flink] def correlateOpName(
inputType: RelDataType,
rexCall: RexCall,
sqlFunction: TableSqlFunction,
rowType: RelDataType)
rowType: RelDataType,
expression: (RexNode, List[String], Option[List[RexNode]]) => String)
: String = {
s"correlate: ${correlateToString(rexCall, sqlFunction)}, select: ${selectToString(rowType)}"
s"correlate: ${correlateToString(inputType, rexCall, sqlFunction, expression)}," +
s" select: ${selectToString(rowType)}"
}
private[flink] def correlateToString(rexCall: RexCall, sqlFunction: TableSqlFunction): String = {
val udtfName = sqlFunction.getName
val operands = rexCall.getOperands.asScala.map(_.toString).mkString(",")
private[flink] def correlateToString(
inputType: RelDataType,
rexCall: RexCall,
sqlFunction: TableSqlFunction,
expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
val inFields = inputType.getFieldNames.asScala.toList
val udtfName = sqlFunction.toString
val operands = rexCall.getOperands.asScala.map(expression(_, inFields, None)).mkString(", ")
s"table($udtfName($operands))"
}
......
......@@ -76,7 +76,7 @@ class DataSetCorrelate(
override def toString: String = {
val rexCall = scan.getCall.asInstanceOf[RexCall]
val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
correlateToString(rexCall, sqlFunction)
correlateToString(joinRowType, rexCall, sqlFunction, getExpressionString)
}
override def explainTerms(pw: RelWriter): RelWriter = {
......@@ -84,7 +84,11 @@ class DataSetCorrelate(
val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
super.explainTerms(pw)
.item("invocation", scan.getCall)
.item("function", sqlFunction.getTableFunction.getClass.getCanonicalName)
.item("correlate", correlateToString(
inputNode.getRowType,
rexCall, sqlFunction,
getExpressionString))
.item("select", selectToString(relRowType))
.item("rowType", relRowType)
.item("joinType", joinType)
.itemIf("condition", condition.orNull, condition.isDefined)
......@@ -103,8 +107,6 @@ class DataSetCorrelate(
val pojoFieldMapping = Some(sqlFunction.getPojoFieldMapping)
val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
val flatMap = generateFunction(
config,
new RowSchema(getInput.getRowType),
......@@ -131,6 +133,14 @@ class DataSetCorrelate(
collector.code,
flatMap.returnType)
inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType))
inputDS
.flatMap(mapFunc)
.name(correlateOpName(
inputNode.getRowType,
rexCall,
sqlFunction,
relRowType,
getExpressionString)
)
}
}
......@@ -69,7 +69,7 @@ class DataStreamCorrelate(
override def toString: String = {
val rexCall = scan.getCall.asInstanceOf[RexCall]
val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
correlateToString(rexCall, sqlFunction)
correlateToString(inputSchema.relDataType, rexCall, sqlFunction, getExpressionString)
}
override def explainTerms(pw: RelWriter): RelWriter = {
......@@ -77,7 +77,11 @@ class DataStreamCorrelate(
val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
super.explainTerms(pw)
.item("invocation", scan.getCall)
.item("function", sqlFunction.getTableFunction.getClass.getCanonicalName)
.item("correlate", correlateToString(
inputSchema.relDataType,
rexCall, sqlFunction,
getExpressionString))
.item("select", selectToString(schema.relDataType))
.item("rowType", schema.relDataType)
.item("joinType", joinType)
.itemIf("condition", condition.orNull, condition.isDefined)
......@@ -130,7 +134,13 @@ class DataStreamCorrelate(
.process(processFunc)
// preserve input parallelism to ensure that acc and retract messages remain in order
.setParallelism(inputParallelism)
.name(correlateOpName(rexCall, sqlFunction, schema.relDataType))
.name(correlateOpName(
inputSchema.relDataType,
rexCall,
sqlFunction,
schema.relDataType,
getExpressionString)
)
}
}
......@@ -83,6 +83,7 @@ class LogicalUnnestRule(
// create table function
val explodeTableFunc = UserDefinedFunctionUtils.createTableSqlFunction(
"explode",
"explode",
ExplodeFunctionUtil.explodeTableFuncFromType(arrayType.typeInfo),
FlinkTypeFactory.toTypeInfo(arrayType.getComponentType),
......
......@@ -110,6 +110,11 @@ public class JavaUserDefinedAggFunctions {
acc.sum += a.sum;
}
}
@Override
public String toString() {
return "myWeightedAvg";
}
}
/**
......
......@@ -238,7 +238,7 @@ class TableSourceTest extends TableTestBase {
Array("name", "id", "amount", "price"),
"'amount > 2"),
term("select", "price", "id", "amount"),
term("where", s"<(${func.functionIdentifier}(amount), 32)")
term("where", s"<(${Func0.getClass.getSimpleName}(amount), 32)")
)
util.verifyTable(result, expected)
......
......@@ -42,7 +42,8 @@ class CorrelateTest extends TableTestBase {
"DataSetCorrelate",
batchTableNode(0),
term("invocation", "func1($cor0.c)"),
term("function", func1.getClass.getCanonicalName),
term("correlate", s"table(func1($$cor0.c))"),
term("select", "a", "b", "c", "f0"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
term("joinType", "INNER")
......@@ -62,7 +63,8 @@ class CorrelateTest extends TableTestBase {
"DataSetCorrelate",
batchTableNode(0),
term("invocation", "func1($cor0.c, '$')"),
term("function", func1.getClass.getCanonicalName),
term("correlate", s"table(func1($$cor0.c, '$$'))"),
term("select", "a", "b", "c", "f0"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
term("joinType", "INNER")
......@@ -88,7 +90,8 @@ class CorrelateTest extends TableTestBase {
"DataSetCorrelate",
batchTableNode(0),
term("invocation", "func1($cor0.c)"),
term("function", func1.getClass.getCanonicalName),
term("correlate", s"table(func1($$cor0.c))"),
term("select", "a", "b", "c", "f0"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
term("joinType", "LEFT")
......@@ -114,7 +117,8 @@ class CorrelateTest extends TableTestBase {
"DataSetCorrelate",
batchTableNode(0),
term("invocation", "func2($cor0.c)"),
term("function", func2.getClass.getCanonicalName),
term("correlate", s"table(func2($$cor0.c))"),
term("select", "a", "b", "c", "f0", "f1"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " +
"VARCHAR(65536) f0, INTEGER f1)"),
......@@ -141,7 +145,8 @@ class CorrelateTest extends TableTestBase {
"DataSetCorrelate",
batchTableNode(0),
term("invocation", "hierarchy($cor0.c)"),
term("function", function.getClass.getCanonicalName),
term("correlate", s"table(hierarchy($$cor0.c))"),
term("select", "a", "b", "c", "f0", "f1", "f2"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c," +
" VARCHAR(65536) f0, BOOLEAN f1, INTEGER f2)"),
......@@ -168,7 +173,8 @@ class CorrelateTest extends TableTestBase {
"DataSetCorrelate",
batchTableNode(0),
term("invocation", "pojo($cor0.c)"),
term("function", function.getClass.getCanonicalName),
term("correlate", s"table(pojo($$cor0.c))"),
term("select", "a", "b", "c", "age", "name"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c," +
" INTEGER age, VARCHAR(65536) name)"),
......@@ -196,7 +202,8 @@ class CorrelateTest extends TableTestBase {
"DataSetCorrelate",
batchTableNode(0),
term("invocation", "func2($cor0.c)"),
term("function", func2.getClass.getCanonicalName),
term("correlate", s"table(func2($$cor0.c))"),
term("select", "a", "b", "c", "f0", "f1"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " +
"VARCHAR(65536) f0, INTEGER f1)"),
......@@ -224,7 +231,8 @@ class CorrelateTest extends TableTestBase {
"DataSetCorrelate",
batchTableNode(0),
term("invocation", "func1(SUBSTRING($cor0.c, 2))"),
term("function", func1.getClass.getCanonicalName),
term("correlate", s"table(func1(SUBSTRING($$cor0.c, 2)))"),
term("select", "a", "b", "c", "f0"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
term("joinType", "INNER")
......@@ -250,7 +258,8 @@ class CorrelateTest extends TableTestBase {
"DataSetCorrelate",
batchTableNode(0),
term("invocation", "func1('hello', 'world', $cor0.c)"),
term("function", func1.getClass.getCanonicalName),
term("correlate", s"table(func1('hello', 'world', $$cor0.c))"),
term("select", "a", "b", "c", "f0"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
term("joinType", "INNER")
......@@ -272,7 +281,8 @@ class CorrelateTest extends TableTestBase {
"DataSetCorrelate",
batchTableNode(0),
term("invocation", "func2('hello', 'world', $cor0.c)"),
term("function", func2.getClass.getCanonicalName),
term("correlate", s"table(func2('hello', 'world', $$cor0.c))"),
term("select", "a", "b", "c", "f0"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
term("joinType", "INNER")
......
......@@ -88,10 +88,10 @@ class CalcTest extends TableTestBase {
"DataSetCalc",
batchTableNode(0),
term("select",
s"${giveMeCaseClass.functionIdentifier}().my AS _c0",
s"${giveMeCaseClass.functionIdentifier}().clazz AS _c1",
s"${giveMeCaseClass.functionIdentifier}().my AS _c2",
s"${giveMeCaseClass.functionIdentifier}().clazz AS _c3"
"giveMeCaseClass$().my AS _c0",
"giveMeCaseClass$().clazz AS _c1",
"giveMeCaseClass$().my AS _c2",
"giveMeCaseClass$().clazz AS _c3"
)
)
......@@ -171,7 +171,7 @@ class CalcTest extends TableTestBase {
val expected = unaryNode(
"DataSetCalc",
batchTableNode(0),
term("select", s"${MyHashCode.functionIdentifier}(c) AS _c0", "b")
term("select", "MyHashCode$(c) AS _c0", "b")
)
util.verifyTable(resultTable, expected)
......@@ -283,7 +283,7 @@ class CalcTest extends TableTestBase {
unaryNode(
"DataSetCalc",
batchTableNode(0),
term("select", "a", "c", s"${MyHashCode.functionIdentifier}(c) AS k")
term("select", "a", "c", "MyHashCode$(c) AS k")
),
term("groupBy", "k"),
term("select", "k", "SUM(a) AS TMP_0")
......
......@@ -21,7 +21,6 @@ package org.apache.flink.table.api.batch.table
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.utils.TableTestUtil._
import org.apache.flink.table.runtime.utils._
import org.apache.flink.table.utils.{TableFunc1, TableTestBase}
import org.junit.Test
......@@ -41,7 +40,8 @@ class CorrelateTest extends TableTestBase {
"DataSetCorrelate",
batchTableNode(0),
term("invocation", s"${function.functionIdentifier}($$2)"),
term("function", function),
term("correlate", s"table(${function.getClass.getSimpleName}(c))"),
term("select", "a", "b", "c", "s"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) s)"),
term("joinType", "INNER")
......@@ -61,7 +61,8 @@ class CorrelateTest extends TableTestBase {
"DataSetCorrelate",
batchTableNode(0),
term("invocation", s"${function.functionIdentifier}($$2, '$$')"),
term("function", function),
term("correlate", s"table(${function.getClass.getSimpleName}(c, '$$'))"),
term("select", "a", "b", "c", "s"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) s)"),
term("joinType", "INNER")
......@@ -86,7 +87,8 @@ class CorrelateTest extends TableTestBase {
"DataSetCorrelate",
batchTableNode(0),
term("invocation", s"${function.functionIdentifier}($$2)"),
term("function", function),
term("correlate", s"table(${function.getClass.getSimpleName}(c))"),
term("select", "a", "b", "c", "s"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) s)"),
term("joinType", "LEFT")
......
......@@ -71,7 +71,7 @@ class GroupWindowTest extends TableTestBase {
batchTableNode(0),
term("groupBy", "string"),
term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
term("select", "string", "myWeightedAvg(long, int) AS TMP_0")
)
util.verifyTable(windowedTable, expected)
......@@ -212,7 +212,7 @@ class GroupWindowTest extends TableTestBase {
term("groupBy", "string"),
term("window",
SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
term("select", "string", "myWeightedAvg(long, int) AS TMP_0")
)
util.verifyTable(windowedTable, expected)
......@@ -310,7 +310,7 @@ class GroupWindowTest extends TableTestBase {
batchTableNode(0),
term("groupBy", "string"),
term("window", SessionGroupWindow(WindowReference("w"), 'long, 7.milli)),
term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
term("select", "string", "myWeightedAvg(long, int) AS TMP_0")
)
util.verifyTable(windowedTable, expected)
......
......@@ -42,7 +42,8 @@ class CorrelateTest extends TableTestBase {
"DataStreamCorrelate",
streamTableNode(0),
term("invocation", "func1($cor0.c)"),
term("function", func1.getClass.getCanonicalName),
term("correlate", s"table(func1($$cor0.c))"),
term("select", "a", "b", "c", "f0"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
term("joinType", "INNER")
......@@ -62,7 +63,8 @@ class CorrelateTest extends TableTestBase {
"DataStreamCorrelate",
streamTableNode(0),
term("invocation", "func1($cor0.c, '$')"),
term("function", func1.getClass.getCanonicalName),
term("correlate", s"table(func1($$cor0.c, '$$'))"),
term("select", "a", "b", "c", "f0"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
term("joinType", "INNER")
......@@ -88,7 +90,8 @@ class CorrelateTest extends TableTestBase {
"DataStreamCorrelate",
streamTableNode(0),
term("invocation", "func1($cor0.c)"),
term("function", func1.getClass.getCanonicalName),
term("correlate", s"table(func1($$cor0.c))"),
term("select", "a", "b", "c", "f0"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
term("joinType", "LEFT")
......@@ -114,7 +117,8 @@ class CorrelateTest extends TableTestBase {
"DataStreamCorrelate",
streamTableNode(0),
term("invocation", "func2($cor0.c)"),
term("function", func2.getClass.getCanonicalName),
term("correlate", s"table(func2($$cor0.c))"),
term("select", "a", "b", "c", "f0", "f1"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " +
"VARCHAR(65536) f0, INTEGER f1)"),
......@@ -141,7 +145,8 @@ class CorrelateTest extends TableTestBase {
"DataStreamCorrelate",
streamTableNode(0),
term("invocation", "hierarchy($cor0.c)"),
term("function", function.getClass.getCanonicalName),
term("correlate", s"table(hierarchy($$cor0.c))"),
term("select", "a", "b", "c", "f0", "f1", "f2"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c," +
" VARCHAR(65536) f0, BOOLEAN f1, INTEGER f2)"),
......@@ -168,7 +173,8 @@ class CorrelateTest extends TableTestBase {
"DataStreamCorrelate",
streamTableNode(0),
term("invocation", "pojo($cor0.c)"),
term("function", function.getClass.getCanonicalName),
term("correlate", s"table(pojo($$cor0.c))"),
term("select", "a", "b", "c", "age", "name"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c," +
" INTEGER age, VARCHAR(65536) name)"),
......@@ -196,7 +202,8 @@ class CorrelateTest extends TableTestBase {
"DataStreamCorrelate",
streamTableNode(0),
term("invocation", "func2($cor0.c)"),
term("function", func2.getClass.getCanonicalName),
term("correlate", s"table(func2($$cor0.c))"),
term("select", "a", "b", "c", "f0", "f1"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " +
"VARCHAR(65536) f0, INTEGER f1)"),
......@@ -224,7 +231,8 @@ class CorrelateTest extends TableTestBase {
"DataStreamCorrelate",
streamTableNode(0),
term("invocation", "func1(SUBSTRING($cor0.c, 2))"),
term("function", func1.getClass.getCanonicalName),
term("correlate", s"table(func1(SUBSTRING($$cor0.c, 2)))"),
term("select", "a", "b", "c", "f0"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
term("joinType", "INNER")
......@@ -250,7 +258,8 @@ class CorrelateTest extends TableTestBase {
"DataStreamCorrelate",
streamTableNode(0),
term("invocation", "func1('hello', 'world', $cor0.c)"),
term("function", func1.getClass.getCanonicalName),
term("correlate", s"table(func1('hello', 'world', $$cor0.c))"),
term("select", "a", "b", "c", "f0"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
term("joinType", "INNER")
......@@ -272,7 +281,8 @@ class CorrelateTest extends TableTestBase {
"DataStreamCorrelate",
streamTableNode(0),
term("invocation", "func2('hello', 'world', $cor0.c)"),
term("function", func2.getClass.getCanonicalName),
term("correlate", s"table(func2('hello', 'world', $$cor0.c))"),
term("select", "a", "b", "c", "f0"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
term("joinType", "INNER")
......
......@@ -19,8 +19,8 @@ package org.apache.flink.table.api.stream.table
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.expressions.utils.Func13
import org.apache.flink.table.utils.TableTestUtil._
import org.apache.flink.table.runtime.utils._
import org.apache.flink.table.utils._
import org.junit.Test
......@@ -40,7 +40,8 @@ class CorrelateTest extends TableTestBase {
"DataStreamCorrelate",
streamTableNode(0),
term("invocation", s"${function.functionIdentifier}($$2)"),
term("function", function),
term("correlate", s"table(${function.getClass.getSimpleName}(c))"),
term("select", "a", "b", "c", "s"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) s)"),
term("joinType", "INNER")
......@@ -60,7 +61,8 @@ class CorrelateTest extends TableTestBase {
"DataStreamCorrelate",
streamTableNode(0),
term("invocation", s"${function.functionIdentifier}($$2, '$$')"),
term("function", function),
term("correlate", s"table(${function.getClass.getSimpleName}(c, '$$'))"),
term("select", "a", "b", "c", "s"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) s)"),
term("joinType", "INNER")
......@@ -85,7 +87,8 @@ class CorrelateTest extends TableTestBase {
"DataStreamCorrelate",
streamTableNode(0),
term("invocation", s"${function.functionIdentifier}($$2)"),
term("function", function),
term("correlate", s"table(${function.getClass.getSimpleName}(c))"),
term("select", "a", "b", "c", "s"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) s)"),
term("joinType", "LEFT")
......@@ -101,16 +104,19 @@ class CorrelateTest extends TableTestBase {
val util = streamTestUtil()
val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val function = util.addFunction("func2", new TableFunc2)
val scalarFunc = new Func13("pre")
val result = table.join(function('c) as ('name, 'len)).select('c, 'name, 'len)
val result = table.join(function(scalarFunc('c)) as ('name, 'len)).select('c, 'name, 'len)
val expected = unaryNode(
"DataStreamCalc",
unaryNode(
"DataStreamCorrelate",
streamTableNode(0),
term("invocation", s"${function.functionIdentifier}($$2)"),
term("function", function),
term("invocation",
s"${function.functionIdentifier}(${scalarFunc.functionIdentifier}($$2))"),
term("correlate", s"table(${function.getClass.getSimpleName}(Func13(c)))"),
term("select", "a", "b", "c", "name", "len"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " +
"VARCHAR(65536) name, INTEGER len)"),
......@@ -134,7 +140,8 @@ class CorrelateTest extends TableTestBase {
"DataStreamCorrelate",
streamTableNode(0),
term("invocation", s"${function.functionIdentifier}($$2)"),
term("function", function),
term("correlate", "table(HierarchyTableFunction(c))"),
term("select", "a", "b", "c", "name", "adult", "len"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c," +
" VARCHAR(65536) name, BOOLEAN adult, INTEGER len)"),
......@@ -156,7 +163,8 @@ class CorrelateTest extends TableTestBase {
"DataStreamCorrelate",
streamTableNode(0),
term("invocation", s"${function.functionIdentifier}($$2)"),
term("function", function),
term("correlate", s"table(${function.getClass.getSimpleName}(c))"),
term("select", "a", "b", "c", "age", "name"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " +
"INTEGER age, VARCHAR(65536) name)"),
......@@ -183,7 +191,8 @@ class CorrelateTest extends TableTestBase {
"DataStreamCorrelate",
streamTableNode(0),
term("invocation", s"${function.functionIdentifier}($$2)"),
term("function", function),
term("correlate", s"table(${function.getClass.getSimpleName}(c))"),
term("select", "a", "b", "c", "name", "len"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " +
"VARCHAR(65536) name, INTEGER len)"),
......@@ -208,7 +217,9 @@ class CorrelateTest extends TableTestBase {
"DataStreamCorrelate",
streamTableNode(0),
term("invocation", s"${function.functionIdentifier}(SUBSTRING($$2, 2, CHAR_LENGTH($$2)))"),
term("function", function),
term("correlate",
s"table(${function.getClass.getSimpleName}(SUBSTRING(c, 2, CHAR_LENGTH(c))))"),
term("select", "a", "b", "c", "s"),
term("rowType",
"RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) s)"),
term("joinType", "INNER")
......
......@@ -181,7 +181,7 @@ class GroupWindowTest extends TableTestBase {
WindowReference("w"),
'rowtime,
5.milli)),
term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
term("select", "string", "myWeightedAvg(long, int) AS TMP_0")
)
util.verifyTable(windowedTable, expected)
......@@ -319,7 +319,7 @@ class GroupWindowTest extends TableTestBase {
streamTableNode(0),
term("groupBy", "string"),
term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)),
term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
term("select", "string", "myWeightedAvg(long, int) AS TMP_0")
)
util.verifyTable(windowedTable, expected)
......@@ -363,7 +363,7 @@ class GroupWindowTest extends TableTestBase {
streamTableNode(0),
term("groupBy", "string"),
term("window", SessionGroupWindow(WindowReference("w"), 'rowtime, 7.milli)),
term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
term("select", "string", "myWeightedAvg(long, int) AS TMP_0")
)
util.verifyTable(windowedTable, expected)
......
......@@ -23,7 +23,6 @@ import org.apache.flink.table.api.scala._
import org.apache.flink.table.expressions.utils.Func1
import org.apache.flink.table.api.Table
import org.apache.flink.table.utils.TableTestUtil._
import org.apache.flink.table.utils.StreamTableTestUtil
import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
import org.junit.Test
......@@ -65,7 +64,7 @@ class OverWindowTest extends TableTestBase {
"WeightedAvgWithRetract(c, a) AS w0$o2")
),
term("select",
s"${plusOne.functionIdentifier}(w0$$o0) AS d",
s"Func1$$(w0$$o0) AS d",
"EXP(CAST(w0$o1)) AS _c1",
"+(w0$o2, 1) AS _c2",
"||('AVG:', CAST(w0$o2)) AS _c3",
......
......@@ -491,7 +491,7 @@ class ExpressionReductionRulesTest extends TableTestBase {
"DataStreamCalc",
streamTableNode(0),
term("select", "a", "b", "c"),
term("where", s"IS NULL(${NonDeterministicNullFunc.functionIdentifier}())")
term("where", s"IS NULL(NonDeterministicNullFunc$$())")
)
util.verifyTable(result, expected)
......
......@@ -160,7 +160,8 @@ class TimeIndicatorConversionTest extends TableTestBase {
streamTableNode(0),
term("invocation",
s"${func.functionIdentifier}(CAST($$0):TIMESTAMP(3) NOT NULL, PROCTIME($$3), '')"),
term("function", func),
term("correlate", s"table(TableFunc(CAST(rowtime), PROCTIME(proctime), ''))"),
term("select", "rowtime", "long", "int", "proctime", "s"),
term("rowType", "RecordType(TIME ATTRIBUTE(ROWTIME) rowtime, BIGINT long, INTEGER int, " +
"TIME ATTRIBUTE(PROCTIME) proctime, VARCHAR(65536) s)"),
term("joinType", "INNER")
......
......@@ -284,7 +284,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
val func1 = new Func13("Sunny")
val func2 = new Func13("kevin2")
val result = t.select(func0('c), func1('c),func2('c))
val result = t.select(func0('c), func1('c), func2('c))
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
......