提交 b24e4bcb 编写于 作者: B bowen.li

[FLINK-14216][table] introduce temp system functions and temp functions to FunctionCatalog

adapt existing APIs to the introduction of temporary system and temp functions according to FLIP-57.

This closes #9822.
上级 4fe51c19
......@@ -147,7 +147,7 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
public <T> void registerFunction(String name, TableFunction<T> tableFunction) {
TypeInformation<T> typeInfo = UserFunctionsTypeHelper.getReturnTypeOfTableFunction(tableFunction);
functionCatalog.registerTableFunction(
functionCatalog.registerTempSystemTableFunction(
name,
tableFunction,
typeInfo
......@@ -160,7 +160,7 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
TypeInformation<ACC> accTypeInfo = UserFunctionsTypeHelper
.getAccumulatorTypeOfAggregateFunction(aggregateFunction);
functionCatalog.registerAggregateFunction(
functionCatalog.registerTempSystemAggregateFunction(
name,
aggregateFunction,
typeInfo,
......@@ -175,7 +175,7 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
TypeInformation<ACC> accTypeInfo = UserFunctionsTypeHelper
.getAccumulatorTypeOfAggregateFunction(tableAggregateFunction);
functionCatalog.registerAggregateFunction(
functionCatalog.registerTempSystemAggregateFunction(
name,
tableAggregateFunction,
typeInfo,
......
......@@ -163,7 +163,7 @@ public class TableEnvironmentImpl implements TableEnvironment {
@Override
public void registerFunction(String name, ScalarFunction function) {
functionCatalog.registerScalarFunction(
functionCatalog.registerTempSystemScalarFunction(
name,
function);
}
......
......@@ -58,9 +58,8 @@ public class FunctionCatalog implements FunctionLookup {
private final CatalogManager catalogManager;
// For simplicity, currently hold registered Flink functions in memory here
// TODO: should move to catalog
private final Map<String, FunctionDefinition> userFunctions = new LinkedHashMap<>();
private final Map<String, FunctionDefinition> tempSystemFunctions = new LinkedHashMap<>();
private final Map<ObjectIdentifier, FunctionDefinition> tempCatalogFunctions = new LinkedHashMap<>();
/**
* Temporary utility until the new type inference is fully functional. It needs to be set by the planner.
......@@ -75,15 +74,15 @@ public class FunctionCatalog implements FunctionLookup {
this.plannerTypeInferenceUtil = plannerTypeInferenceUtil;
}
public void registerScalarFunction(String name, ScalarFunction function) {
public void registerTempSystemScalarFunction(String name, ScalarFunction function) {
UserFunctionsTypeHelper.validateInstantiation(function.getClass());
registerFunction(
registerTempSystemFunction(
name,
new ScalarFunctionDefinition(name, function)
);
}
public <T> void registerTableFunction(
public <T> void registerTempSystemTableFunction(
String name,
TableFunction<T> function,
TypeInformation<T> resultType) {
......@@ -92,7 +91,7 @@ public class FunctionCatalog implements FunctionLookup {
// check if class could be instantiated
UserFunctionsTypeHelper.validateInstantiation(function.getClass());
registerFunction(
registerTempSystemFunction(
name,
new TableFunctionDefinition(
name,
......@@ -101,7 +100,7 @@ public class FunctionCatalog implements FunctionLookup {
);
}
public <T, ACC> void registerAggregateFunction(
public <T, ACC> void registerTempSystemAggregateFunction(
String name,
UserDefinedAggregateFunction<T, ACC> function,
TypeInformation<T> resultType,
......@@ -128,12 +127,71 @@ public class FunctionCatalog implements FunctionLookup {
throw new TableException("Unknown function class: " + function.getClass());
}
registerFunction(
registerTempSystemFunction(
name,
definition
);
}
public void registerTempCatalogScalarFunction(ObjectIdentifier oi, ScalarFunction function) {
UserFunctionsTypeHelper.validateInstantiation(function.getClass());
registerTempCatalogFunction(
oi,
new ScalarFunctionDefinition(oi.getObjectName(), function)
);
}
public <T> void registerTempCatalogTableFunction(
ObjectIdentifier oi,
TableFunction<T> function,
TypeInformation<T> resultType) {
// check if class not Scala object
UserFunctionsTypeHelper.validateNotSingleton(function.getClass());
// check if class could be instantiated
UserFunctionsTypeHelper.validateInstantiation(function.getClass());
registerTempCatalogFunction(
oi,
new TableFunctionDefinition(
oi.getObjectName(),
function,
resultType)
);
}
public <T, ACC> void registerTempCatalogAggregateFunction(
ObjectIdentifier oi,
UserDefinedAggregateFunction<T, ACC> function,
TypeInformation<T> resultType,
TypeInformation<ACC> accType) {
// check if class not Scala object
UserFunctionsTypeHelper.validateNotSingleton(function.getClass());
// check if class could be instantiated
UserFunctionsTypeHelper.validateInstantiation(function.getClass());
final FunctionDefinition definition;
if (function instanceof AggregateFunction) {
definition = new AggregateFunctionDefinition(
oi.getObjectName(),
(AggregateFunction<?, ?>) function,
resultType,
accType);
} else if (function instanceof TableAggregateFunction) {
definition = new TableAggregateFunctionDefinition(
oi.getObjectName(),
(TableAggregateFunction<?, ?>) function,
resultType,
accType);
} else {
throw new TableException("Unknown function class: " + function.getClass());
}
registerTempCatalogFunction(
oi,
definition
);
}
public String[] getUserDefinedFunctions() {
return getUserDefinedFunctionNames().toArray(new String[0]);
}
......@@ -165,7 +223,7 @@ public class FunctionCatalog implements FunctionLookup {
// Get functions registered in memory
result.addAll(
userFunctions.values().stream()
tempSystemFunctions.values().stream()
.map(FunctionDefinition::toString)
.collect(Collectors.toSet()));
......@@ -204,7 +262,7 @@ public class FunctionCatalog implements FunctionLookup {
}
// If no corresponding function is found in catalog, check in-memory functions
userCandidate = userFunctions.get(functionName);
userCandidate = tempSystemFunctions.get(functionName);
final Optional<FunctionDefinition> foundDefinition;
if (userCandidate != null) {
......@@ -240,13 +298,24 @@ public class FunctionCatalog implements FunctionLookup {
return plannerTypeInferenceUtil;
}
private void registerFunction(String name, FunctionDefinition functionDefinition) {
// TODO: should register to catalog
userFunctions.put(normalizeName(name), functionDefinition);
private void registerTempSystemFunction(String name, FunctionDefinition functionDefinition) {
tempSystemFunctions.put(normalizeName(name), functionDefinition);
}
private void registerTempCatalogFunction(ObjectIdentifier oi, FunctionDefinition functionDefinition) {
tempCatalogFunctions.put(normalizeObjectIdentifier(oi), functionDefinition);
}
@VisibleForTesting
static String normalizeName(String name) {
return name.toUpperCase();
}
@VisibleForTesting
static ObjectIdentifier normalizeObjectIdentifier(ObjectIdentifier oi) {
return ObjectIdentifier.of(
oi.getCatalogName(),
oi.getDatabaseName(),
oi.getObjectName().toUpperCase());
}
}
......@@ -137,7 +137,7 @@ class StreamTableEnvironmentImpl (
override def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = {
val typeInfo = UserFunctionsTypeHelper
.getReturnTypeOfTableFunction(tf, implicitly[TypeInformation[T]])
functionCatalog.registerTableFunction(
functionCatalog.registerTempSystemTableFunction(
name,
tf,
typeInfo
......@@ -152,7 +152,7 @@ class StreamTableEnvironmentImpl (
.getReturnTypeOfAggregateFunction(f, implicitly[TypeInformation[T]])
val accTypeInfo = UserFunctionsTypeHelper
.getAccumulatorTypeOfAggregateFunction(f, implicitly[TypeInformation[ACC]])
functionCatalog.registerAggregateFunction(
functionCatalog.registerTempSystemAggregateFunction(
name,
f,
typeInfo,
......@@ -168,7 +168,7 @@ class StreamTableEnvironmentImpl (
.getReturnTypeOfAggregateFunction(f, implicitly[TypeInformation[T]])
val accTypeInfo = UserFunctionsTypeHelper
.getAccumulatorTypeOfAggregateFunction(f, implicitly[TypeInformation[ACC]])
functionCatalog.registerAggregateFunction(
functionCatalog.registerTempSystemAggregateFunction(
name,
f,
typeInfo,
......
......@@ -696,7 +696,7 @@ class RexNodeExtractorTest extends RexNodeTestBase {
@Test
def testExtractWithUdf(): Unit = {
functionCatalog.registerScalarFunction("myUdf", Func1)
functionCatalog.registerTempSystemScalarFunction("myUdf", Func1)
// amount
val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2)
// my_udf(amount)
......
......@@ -912,7 +912,7 @@ class TestingTableEnvironment private(
def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = {
val typeInfo = UserFunctionsTypeHelper
.getReturnTypeOfTableFunction(tf, implicitly[TypeInformation[T]])
functionCatalog.registerTableFunction(
functionCatalog.registerTempSystemTableFunction(
name,
tf,
typeInfo
......@@ -944,7 +944,7 @@ class TestingTableEnvironment private(
.getReturnTypeOfAggregateFunction(f, implicitly[TypeInformation[T]])
val accTypeInfo = UserFunctionsTypeHelper
.getAccumulatorTypeOfAggregateFunction(f, implicitly[TypeInformation[ACC]])
functionCatalog.registerAggregateFunction(
functionCatalog.registerTempSystemAggregateFunction(
name,
f,
typeInfo,
......
......@@ -99,7 +99,7 @@ abstract class TableEnvImpl(
private def isBatchTable: Boolean = !isStreamingMode
override def registerFunction(name: String, function: ScalarFunction): Unit = {
functionCatalog.registerScalarFunction(
functionCatalog.registerTempSystemScalarFunction(
name,
function)
}
......@@ -117,7 +117,7 @@ abstract class TableEnvImpl(
function,
implicitly[TypeInformation[T]])
functionCatalog.registerTableFunction(
functionCatalog.registerTempSystemTableFunction(
name,
function,
resultTypeInfo)
......@@ -141,7 +141,7 @@ abstract class TableEnvImpl(
function,
implicitly[TypeInformation[ACC]])
functionCatalog.registerAggregateFunction(
functionCatalog.registerTempSystemAggregateFunction(
name,
function,
resultTypeInfo,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册