提交 bd3bb365 编写于 作者: T Timo Walther

[FLINK-20522][table] Introduce built-in runtime functions

Completes the envisioned design of FLIP-32 and FLIP-65. It makes
implementing built-in functions as easy as implementing user-defined
functions while keeping function definition and body (aka runtime
implementation) separately.

Adding a built-in function requires changes in only 2 classes:
- BuiltInFunctionDefinitions for the definition and
- the class that contains the actual runtime logic.
上级 4796b3cb
......@@ -19,6 +19,7 @@
package org.apache.flink.table.functions;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.InputTypeStrategy;
......@@ -26,7 +27,11 @@ import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeStrategy;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.Optional;
/**
* Definition of a built-in function. It enables unique identification across different modules by
......@@ -38,7 +43,7 @@ import java.util.Arrays;
* <p>Equality is defined by reference equality.
*/
@Internal
public final class BuiltInFunctionDefinition implements FunctionDefinition {
public final class BuiltInFunctionDefinition implements SpecializedFunction {
private final String name;
......@@ -48,13 +53,20 @@ public final class BuiltInFunctionDefinition implements FunctionDefinition {
private final boolean isDeterministic;
private @Nullable String runtimeClass;
private BuiltInFunctionDefinition(
String name, FunctionKind kind, TypeInference typeInference, boolean isDeterministic) {
String name,
FunctionKind kind,
TypeInference typeInference,
boolean isDeterministic,
String runtimeClass) {
this.name = Preconditions.checkNotNull(name, "Name must not be null.");
this.kind = Preconditions.checkNotNull(kind, "Kind must not be null.");
this.typeInference =
Preconditions.checkNotNull(typeInference, "Type inference must not be null.");
this.isDeterministic = isDeterministic;
this.runtimeClass = runtimeClass;
}
/** Builder for configuring and creating instances of {@link BuiltInFunctionDefinition}. */
......@@ -66,6 +78,39 @@ public final class BuiltInFunctionDefinition implements FunctionDefinition {
return name;
}
public Optional<String> getRuntimeClass() {
return Optional.ofNullable(runtimeClass);
}
@Override
public UserDefinedFunction specialize(SpecializedContext context) {
if (runtimeClass == null) {
throw new TableException(
String.format(
"Could not find a runtime implementation for built-in function '%s'. "
+ "The planner should have provided an implementation.",
name));
}
try {
final Class<?> udfClass =
Class.forName(runtimeClass, true, context.getBuiltInClassLoader());
final Constructor<?> udfConstructor = udfClass.getConstructor(SpecializedContext.class);
final UserDefinedFunction udf =
(UserDefinedFunction) udfConstructor.newInstance(context);
// in case another level of specialization is required
if (udf instanceof SpecializedFunction) {
return ((SpecializedFunction) udf).specialize(context);
}
return udf;
} catch (Exception e) {
throw new TableException(
String.format(
"Could not instantiate a runtime implementation for built-in function '%s'.",
name),
e);
}
}
@Override
public FunctionKind getKind() {
return kind;
......@@ -99,6 +144,8 @@ public final class BuiltInFunctionDefinition implements FunctionDefinition {
private boolean isDeterministic = true;
private String runtimeClass;
public Builder() {
// default constructor to allow a fluent definition
}
......@@ -128,11 +175,6 @@ public final class BuiltInFunctionDefinition implements FunctionDefinition {
return this;
}
public Builder accumulatorTypeStrategy(TypeStrategy accumulatorTypeStrategy) {
this.typeInferenceBuilder.accumulatorTypeStrategy(accumulatorTypeStrategy);
return this;
}
public Builder outputTypeStrategy(TypeStrategy outputTypeStrategy) {
this.typeInferenceBuilder.outputTypeStrategy(outputTypeStrategy);
return this;
......@@ -143,9 +185,14 @@ public final class BuiltInFunctionDefinition implements FunctionDefinition {
return this;
}
public Builder runtimeClass(String runtimeClass) {
this.runtimeClass = runtimeClass;
return this;
}
public BuiltInFunctionDefinition build() {
return new BuiltInFunctionDefinition(
name, kind, typeInferenceBuilder.build(), isDeterministic);
name, kind, typeInferenceBuilder.build(), isDeterministic, runtimeClass);
}
}
}
......@@ -25,12 +25,12 @@ import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.functions.AggregateFunctionDefinition;
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.table.functions.FunctionKind;
import org.apache.flink.table.functions.ScalarFunctionDefinition;
import org.apache.flink.table.functions.TableFunctionDefinition;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlAggFunction;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
......@@ -202,10 +202,13 @@ public class FunctionCatalogOperatorTable implements SqlOperatorTable {
FunctionIdentifier identifier,
FunctionDefinition definition) {
// for now, we don't allow other functions than user-defined ones
// all built-in functions need to be mapped to Calcite's SqlFunctions
if (!(definition instanceof UserDefinedFunction)) {
return false;
// built-in functions without implementation are handled separately
if (definition instanceof BuiltInFunctionDefinition) {
final BuiltInFunctionDefinition builtInFunction =
(BuiltInFunctionDefinition) definition;
if (!builtInFunction.getRuntimeClass().isPresent()) {
return false;
}
}
final FunctionKind kind = definition.getKind();
......
......@@ -38,8 +38,13 @@ public class FunctionDefinitionConvertRule implements CallExpressionConvertRule
public Optional<RexNode> convert(CallExpression call, ConvertContext context) {
FunctionDefinition functionDefinition = call.getFunctionDefinition();
// built-in functions without implementation are handled separately
if (functionDefinition instanceof BuiltInFunctionDefinition) {
return Optional.empty();
final BuiltInFunctionDefinition builtInFunction =
(BuiltInFunctionDefinition) functionDefinition;
if (!builtInFunction.getRuntimeClass().isPresent()) {
return Optional.empty();
}
}
TypeInference typeInference =
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.functions.aggregate;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
import org.apache.flink.table.functions.FunctionRequirement;
import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Base class for runtime implementation represented as {@link AggregateFunction} that is
* constructed from {@link BuiltInFunctionDefinition#specialize(SpecializedContext)}.
*
* <p>Subclasses must offer a constructor that takes {@link SpecializedContext} if they are
* constructed from a {@link BuiltInFunctionDefinition}. Otherwise the {@link
* #BuiltInAggregateFunction()} constructor might be more appropriate.
*
* <p>By default, all built-in functions work on internal data structures. However, this can be
* changed by overriding {@link #getArgumentDataTypes()}, {@link #getAccumulatorDataType()}, and
* {@link #getOutputDataType()}. Or by overriding {@link #getTypeInference(DataTypeFactory)}
* directly.
*
* <p>Since the accumulator type is runtime specific, it must be declared explicitly; otherwise it
* is derived from the output type.
*/
@Internal
public abstract class BuiltInAggregateFunction<T, ACC> extends AggregateFunction<T, ACC> {
// can be null if a Calcite function definition is the origin
private transient @Nullable BuiltInFunctionDefinition definition;
private transient List<DataType> argumentDataTypes;
private transient DataType outputDataType;
protected BuiltInAggregateFunction(
BuiltInFunctionDefinition definition, SpecializedContext context) {
this.definition = definition;
final CallContext callContext = context.getCallContext();
argumentDataTypes =
callContext.getArgumentDataTypes().stream()
.map(DataTypeUtils::toInternalDataType)
.collect(Collectors.toList());
outputDataType =
callContext
.getOutputDataType()
.map(DataTypeUtils::toInternalDataType)
.orElseThrow(IllegalStateException::new);
}
protected BuiltInAggregateFunction() {
// for overriding the required methods manually
}
public List<DataType> getArgumentDataTypes() {
Preconditions.checkNotNull(argumentDataTypes, "Argument data types not set.");
return argumentDataTypes;
}
public DataType getAccumulatorDataType() {
return getOutputDataType();
}
public DataType getOutputDataType() {
Preconditions.checkNotNull(outputDataType, "Output data type not set.");
return outputDataType;
}
@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
return TypeInference.newBuilder()
.typedArguments(getArgumentDataTypes())
.accumulatorTypeStrategy(TypeStrategies.explicit(getAccumulatorDataType()))
.outputTypeStrategy(TypeStrategies.explicit(getOutputDataType()))
.build();
}
@Override
public Set<FunctionRequirement> getRequirements() {
// in case the function is used for testing
if (definition != null) {
definition.getRequirements();
}
return super.getRequirements();
}
@Override
public boolean isDeterministic() {
// in case the function is used for testing
if (definition != null) {
definition.getRequirements();
}
return super.isDeterministic();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.functions.scalar;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
import org.apache.flink.table.functions.FunctionRequirement;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Base class for runtime implementation represented as {@link ScalarFunction} that is constructed
* from {@link BuiltInFunctionDefinition#specialize(SpecializedContext)}.
*
* <p>Subclasses must offer a constructor that takes {@link SpecializedContext} if they are
* constructed from a {@link BuiltInFunctionDefinition}. Otherwise the {@link
* #BuiltInScalarFunction()} constructor might be more appropriate.
*
* <p>By default, all built-in functions work on internal data structures. However, this can be
* changed by overriding {@link #getArgumentDataTypes()} and {@link #getOutputDataType()}. Or by
* overriding {@link #getTypeInference(DataTypeFactory)} directly.
*/
@Internal
public abstract class BuiltInScalarFunction extends ScalarFunction {
// can be null if a Calcite function definition is the origin
private transient @Nullable BuiltInFunctionDefinition definition;
private transient List<DataType> argumentDataTypes;
private transient DataType outputDataType;
protected BuiltInScalarFunction(
BuiltInFunctionDefinition definition, SpecializedContext context) {
this.definition = definition;
final CallContext callContext = context.getCallContext();
argumentDataTypes =
callContext.getArgumentDataTypes().stream()
.map(DataTypeUtils::toInternalDataType)
.collect(Collectors.toList());
outputDataType =
callContext
.getOutputDataType()
.map(DataTypeUtils::toInternalDataType)
.orElseThrow(IllegalStateException::new);
}
protected BuiltInScalarFunction() {
// for overriding the required methods manually
}
public List<DataType> getArgumentDataTypes() {
Preconditions.checkNotNull(argumentDataTypes, "Argument data types not set.");
return argumentDataTypes;
}
public DataType getOutputDataType() {
Preconditions.checkNotNull(outputDataType, "Output data type not set.");
return outputDataType;
}
@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
return TypeInference.newBuilder()
.typedArguments(getArgumentDataTypes())
.outputTypeStrategy(TypeStrategies.explicit(getOutputDataType()))
.build();
}
@Override
public Set<FunctionRequirement> getRequirements() {
// in case the function is used for testing
if (definition != null) {
definition.getRequirements();
}
return super.getRequirements();
}
@Override
public boolean isDeterministic() {
// in case the function is used for testing
if (definition != null) {
definition.getRequirements();
}
return super.isDeterministic();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.functions.table;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
import org.apache.flink.table.functions.FunctionRequirement;
import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Base class for runtime implementation represented as {@link TableFunction} that is constructed
* from {@link BuiltInFunctionDefinition#specialize(SpecializedContext)}.
*
* <p>Subclasses must offer a constructor that takes {@link SpecializedContext} if they are
* constructed from a {@link BuiltInFunctionDefinition}. Otherwise the {@link
* #BuiltInTableFunction()} constructor might be more appropriate.
*
* <p>By default, all built-in functions work on internal data structures. However, this can be
* changed by overriding {@link #getArgumentDataTypes()} and {@link #getOutputDataType()}. Or by
* overriding {@link #getTypeInference(DataTypeFactory)} directly.
*/
@Internal
public abstract class BuiltInTableFunction<T> extends TableFunction<T> {
// can be null if a Calcite function definition is the origin
private transient @Nullable BuiltInFunctionDefinition definition;
private transient List<DataType> argumentDataTypes;
private transient DataType outputDataType;
protected BuiltInTableFunction(
BuiltInFunctionDefinition definition, SpecializedContext context) {
this.definition = definition;
final CallContext callContext = context.getCallContext();
argumentDataTypes =
callContext.getArgumentDataTypes().stream()
.map(DataTypeUtils::toInternalDataType)
.collect(Collectors.toList());
outputDataType =
callContext
.getOutputDataType()
.map(DataTypeUtils::toInternalDataType)
.orElseThrow(IllegalStateException::new);
}
protected BuiltInTableFunction() {
// for overriding the required methods manually
}
public List<DataType> getArgumentDataTypes() {
Preconditions.checkNotNull(argumentDataTypes, "Argument data types not set.");
return argumentDataTypes;
}
public DataType getOutputDataType() {
Preconditions.checkNotNull(outputDataType, "Output data type not set.");
return outputDataType;
}
@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
return TypeInference.newBuilder()
.typedArguments(getArgumentDataTypes())
.outputTypeStrategy(TypeStrategies.explicit(getOutputDataType()))
.build();
}
@Override
public Set<FunctionRequirement> getRequirements() {
// in case the function is used for testing
if (definition != null) {
definition.getRequirements();
}
return super.getRequirements();
}
@Override
public boolean isDeterministic() {
// in case the function is used for testing
if (definition != null) {
definition.getRequirements();
}
return super.isDeterministic();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册