Commit 11007c0d authored by Timo Walther

[FLINK-15612][table-planner-blink] Propagate the data type factory in the planner

This closes #10917.
Parent c16121f9
@@ -19,11 +19,13 @@
package org.apache.flink.table.planner.functions.bridging;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.table.functions.FunctionKind;
import org.apache.flink.table.functions.FunctionRequirement;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.calcite.rel.type.RelDataType;
@@ -49,6 +51,10 @@ import static org.apache.flink.util.Preconditions.checkState;
@Internal
public final class BridgingSqlAggFunction extends SqlAggFunction {
private final DataTypeFactory dataTypeFactory;
private final FlinkTypeFactory typeFactory;
private final FunctionIdentifier identifier;
private final FunctionDefinition definition;
@@ -58,6 +64,7 @@ public final class BridgingSqlAggFunction extends SqlAggFunction {
private final List<RelDataType> paramTypes;
private BridgingSqlAggFunction(
DataTypeFactory dataTypeFactory,
FlinkTypeFactory typeFactory,
SqlKind kind,
FunctionIdentifier identifier,
@@ -67,14 +74,16 @@ public final class BridgingSqlAggFunction extends SqlAggFunction {
createName(identifier),
createSqlIdentifier(identifier),
kind,
createSqlReturnTypeInference(definition, typeInference),
createSqlOperandTypeInference(definition, typeInference),
createSqlOperandTypeChecker(definition, typeInference),
createSqlReturnTypeInference(dataTypeFactory, definition, typeInference),
createSqlOperandTypeInference(dataTypeFactory, definition, typeInference),
createSqlOperandTypeChecker(dataTypeFactory, definition, typeInference),
createSqlFunctionCategory(identifier),
createOrderRequirement(),
createOverWindowRequirement(definition),
createGroupOrderRequirement());
this.dataTypeFactory = dataTypeFactory;
this.typeFactory = typeFactory;
this.identifier = identifier;
this.definition = definition;
this.typeInference = typeInference;
@@ -84,7 +93,8 @@ public final class BridgingSqlAggFunction extends SqlAggFunction {
/**
* Creates an instance of an aggregating function (either a system or user-defined function).
*
* @param typeFactory used for resolving typed arguments
* @param dataTypeFactory used for creating {@link DataType}
* @param typeFactory used for bridging to {@link RelDataType}
* @param kind commonly used SQL standard function; use {@link SqlKind#OTHER_FUNCTION} if this function
* cannot be mapped to a common function kind.
* @param identifier catalog identifier
@@ -92,6 +102,7 @@
* @param typeInference type inference logic
*/
public static BridgingSqlAggFunction of(
DataTypeFactory dataTypeFactory,
FlinkTypeFactory typeFactory,
SqlKind kind,
FunctionIdentifier identifier,
@@ -103,6 +114,7 @@
"Aggregating function kind expected.");
return new BridgingSqlAggFunction(
dataTypeFactory,
typeFactory,
kind,
identifier,
@@ -110,6 +122,14 @@
typeInference);
}
public DataTypeFactory getDataTypeFactory() {
return dataTypeFactory;
}
public FlinkTypeFactory getTypeFactory() {
return typeFactory;
}
public FunctionIdentifier getIdentifier() {
return identifier;
}
......
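The factory method now requires both factories up front. A minimal sketch of a call site, assuming dataTypeFactory, typeFactory, identifier, definition, and typeInference are already available from the surrounding planner context (all variable names here are hypothetical):

BridgingSqlAggFunction sqlAggFunction = BridgingSqlAggFunction.of(
    dataTypeFactory,        // resolves classes and names to DataType
    typeFactory,            // bridges logical types to Calcite's RelDataType
    SqlKind.OTHER_FUNCTION, // no common SQL standard kind for a user-defined function
    identifier,             // catalog identifier of the function
    definition,             // must be an aggregating function kind, see the checkState above
    typeInference);         // input/output type inference logic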
@@ -19,12 +19,15 @@
package org.apache.flink.table.planner.functions.bridging;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.table.functions.FunctionKind;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlKind;
@@ -46,6 +49,10 @@ import static org.apache.flink.util.Preconditions.checkState;
@Internal
public final class BridgingSqlFunction extends SqlFunction {
private final DataTypeFactory dataTypeFactory;
private final FlinkTypeFactory typeFactory;
private final FunctionIdentifier identifier;
private final FunctionDefinition definition;
@@ -53,6 +60,7 @@ public final class BridgingSqlFunction extends SqlFunction {
private final TypeInference typeInference;
private BridgingSqlFunction(
DataTypeFactory dataTypeFactory,
FlinkTypeFactory typeFactory,
SqlKind kind,
FunctionIdentifier identifier,
@@ -62,12 +70,14 @@ public final class BridgingSqlFunction extends SqlFunction {
createName(identifier),
createSqlIdentifier(identifier),
kind,
createSqlReturnTypeInference(definition, typeInference),
createSqlOperandTypeInference(definition, typeInference),
createSqlOperandTypeChecker(definition, typeInference),
createSqlReturnTypeInference(dataTypeFactory, definition, typeInference),
createSqlOperandTypeInference(dataTypeFactory, definition, typeInference),
createSqlOperandTypeChecker(dataTypeFactory, definition, typeInference),
createParamTypes(typeFactory, typeInference),
createSqlFunctionCategory(identifier));
this.dataTypeFactory = dataTypeFactory;
this.typeFactory = typeFactory;
this.identifier = identifier;
this.definition = definition;
this.typeInference = typeInference;
@@ -76,7 +86,8 @@ public final class BridgingSqlFunction extends SqlFunction {
/**
* Creates an instance of a scalar or table function (either a system or user-defined function).
*
* @param typeFactory used for resolving typed arguments
* @param dataTypeFactory used for creating {@link DataType}
* @param typeFactory used for bridging to {@link RelDataType}
* @param kind commonly used SQL standard function; use {@link SqlKind#OTHER_FUNCTION} if this function
* cannot be mapped to a common function kind.
* @param identifier catalog identifier
@@ -84,6 +95,7 @@
* @param typeInference type inference logic
*/
public static BridgingSqlFunction of(
DataTypeFactory dataTypeFactory,
FlinkTypeFactory typeFactory,
SqlKind kind,
FunctionIdentifier identifier,
@@ -92,9 +104,10 @@ public final class BridgingSqlFunction extends SqlFunction {
checkState(
definition.getKind() == FunctionKind.SCALAR || definition.getKind() == FunctionKind.TABLE,
"Scala or table function kind expected.");
"Scalar or table function kind expected.");
return new BridgingSqlFunction(
dataTypeFactory,
typeFactory,
kind,
identifier,
@@ -102,6 +115,14 @@
typeInference);
}
public DataTypeFactory getDataTypeFactory() {
return dataTypeFactory;
}
public FlinkTypeFactory getTypeFactory() {
return typeFactory;
}
public FunctionIdentifier getIdentifier() {
return identifier;
}
......
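Because both factories are now stored on the function, downstream code can obtain them through the new getters instead of casting them out of a Calcite binding. A small illustrative helper, hypothetical and not part of the commit, that uses both getters together:

// Resolve a class to a DataType and bridge it into Calcite's type system,
// using only the factories carried by the bridging function.
static RelDataType toRelDataType(BridgingSqlFunction function, Class<?> conversionClass) {
    DataType dataType = function.getDataTypeFactory().createDataType(conversionClass);
    return function.getTypeFactory().createFieldTypeFromLogicalType(dataType.getLogicalType());
}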
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.functions.bridging;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.FunctionIdentifier;
@@ -61,21 +62,24 @@ final class BridgingUtils {
}
static SqlReturnTypeInference createSqlReturnTypeInference(
DataTypeFactory dataTypeFactory,
FunctionDefinition definition,
TypeInference typeInference) {
return new TypeInferenceReturnInference(definition, typeInference);
return new TypeInferenceReturnInference(dataTypeFactory, definition, typeInference);
}
static SqlOperandTypeInference createSqlOperandTypeInference(
DataTypeFactory dataTypeFactory,
FunctionDefinition definition,
TypeInference typeInference) {
return new TypeInferenceOperandInference(definition, typeInference);
return new TypeInferenceOperandInference(dataTypeFactory, definition, typeInference);
}
static SqlOperandTypeChecker createSqlOperandTypeChecker(
DataTypeFactory dataTypeFactory,
FunctionDefinition definition,
TypeInference typeInference) {
return new TypeInferenceOperandChecker(definition, typeInference);
return new TypeInferenceOperandChecker(dataTypeFactory, definition, typeInference);
}
static @Nullable List<RelDataType> createParamTypes(
......
@@ -38,24 +38,24 @@ import java.time.Period;
@Internal
public abstract class AbstractSqlCallContext implements CallContext {
private final DataTypeFactory typeFactory;
private final DataTypeFactory dataTypeFactory;
private final FunctionDefinition definition;
private final String name;
protected AbstractSqlCallContext(
DataTypeFactory typeFactory,
DataTypeFactory dataTypeFactory,
FunctionDefinition definition,
String name) {
this.typeFactory = typeFactory;
this.dataTypeFactory = dataTypeFactory;
this.definition = definition;
this.name = name;
}
@Override
public DataTypeFactory getDataTypeFactory() {
return typeFactory;
return dataTypeFactory;
}
@Override
......
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.functions.inference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.types.DataType;
@@ -52,11 +53,12 @@ public final class CallBindingCallContext extends AbstractSqlCallContext {
private final @Nullable DataType outputType;
public CallBindingCallContext(
DataTypeFactory dataTypeFactory,
FunctionDefinition definition,
SqlCallBinding binding,
@Nullable RelDataType outputType) {
super(
((FlinkTypeFactory) binding.getTypeFactory()).getDataTypeFactory(),
dataTypeFactory,
definition,
binding.getOperator().getNameAsId().toString());
......
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.functions.inference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.types.DataType;
@@ -43,10 +44,11 @@ public final class OperatorBindingCallContext extends AbstractSqlCallContext {
private final List<DataType> argumentDataTypes;
public OperatorBindingCallContext(
DataTypeFactory dataTypeFactory,
FunctionDefinition definition,
SqlOperatorBinding binding) {
super(
((FlinkTypeFactory) binding.getTypeFactory()).getDataTypeFactory(),
dataTypeFactory,
definition,
binding.getOperator().getNameAsId().toString());
......
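Both call contexts previously unwrapped a DataTypeFactory from the binding's FlinkTypeFactory, which (as the removed Scala stub at the end of this diff shows) could not actually create data types. With a working factory propagated in, inference code can resolve types on the fly; a hedged sketch, hypothetical and not part of the commit:

// Inside any type inference strategy: the propagated factory can now
// create data types, e.g. by extracting one from a class.
static DataType resolveDecimalOutput(CallContext callContext) {
    return callContext.getDataTypeFactory().createDataType(java.math.BigDecimal.class);
}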
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.functions.inference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.types.DataType;
@@ -43,6 +44,7 @@ import org.apache.calcite.sql.validate.SqlValidatorNamespace;
import java.util.List;
import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory;
import static org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments;
import static org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException;
import static org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException;
@@ -56,6 +58,8 @@ import static org.apache.flink.table.types.inference.TypeInferenceUtil.createUnexpectedException;
@Internal
public final class TypeInferenceOperandChecker implements SqlOperandTypeChecker {
private final DataTypeFactory dataTypeFactory;
private final FunctionDefinition definition;
private final TypeInference typeInference;
@@ -63,8 +67,10 @@ public final class TypeInferenceOperandChecker implements SqlOperandTypeChecker
private final SqlOperandCountRange countRange;
public TypeInferenceOperandChecker(
DataTypeFactory dataTypeFactory,
FunctionDefinition definition,
TypeInference typeInference) {
this.dataTypeFactory = dataTypeFactory;
this.definition = definition;
this.typeInference = typeInference;
this.countRange = new ArgumentCountRange(typeInference.getInputTypeStrategy().getArgumentCount());
@@ -72,7 +78,11 @@ public final class TypeInferenceOperandChecker implements SqlOperandTypeChecker
@Override
public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
final CallContext callContext = new CallBindingCallContext(definition, callBinding, null);
final CallContext callContext = new CallBindingCallContext(
dataTypeFactory,
definition,
callBinding,
null);
try {
return checkOperandTypesOrError(callBinding, callContext);
}
@@ -128,7 +138,7 @@ public final class TypeInferenceOperandChecker implements SqlOperandTypeChecker
}
private void insertImplicitCasts(SqlCallBinding callBinding, List<DataType> expectedDataTypes) {
final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) callBinding.getTypeFactory();
final FlinkTypeFactory flinkTypeFactory = unwrapTypeFactory(callBinding);
final List<SqlNode> operands = callBinding.operands();
for (int i = 0; i < operands.size(); i++) {
final LogicalType expectedType = expectedDataTypes.get(i).getLogicalType();
......
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.functions.inference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.types.DataType;
@@ -28,12 +29,12 @@ import org.apache.flink.table.types.inference.TypeInferenceUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.SqlCallBinding;
import org.apache.calcite.sql.type.SqlOperandTypeInference;
import java.util.List;
import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory;
import static org.apache.flink.table.types.inference.TypeInferenceUtil.createUnexpectedException;
/**
@@ -44,22 +45,30 @@ import static org.apache.flink.table.types.inference.TypeInferenceUtil.createUnexpectedException;
@Internal
public final class TypeInferenceOperandInference implements SqlOperandTypeInference {
private final DataTypeFactory dataTypeFactory;
private final FunctionDefinition definition;
private final TypeInference typeInference;
public TypeInferenceOperandInference(
DataTypeFactory dataTypeFactory,
FunctionDefinition definition,
TypeInference typeInference) {
this.dataTypeFactory = dataTypeFactory;
this.definition = definition;
this.typeInference = typeInference;
}
@Override
public void inferOperandTypes(SqlCallBinding callBinding, RelDataType returnType, RelDataType[] operandTypes) {
final CallContext callContext = new CallBindingCallContext(definition, callBinding, returnType);
final CallContext callContext = new CallBindingCallContext(
dataTypeFactory,
definition,
callBinding,
returnType);
try {
inferOperandTypesOrError(callBinding.getTypeFactory(), callContext, operandTypes);
inferOperandTypesOrError(unwrapTypeFactory(callBinding), callContext, operandTypes);
} catch (Throwable t) {
throw createUnexpectedException(callContext, t);
}
@@ -67,9 +76,7 @@ public final class TypeInferenceOperandInference implements SqlOperandTypeInference
// --------------------------------------------------------------------------------------------
private void inferOperandTypesOrError(RelDataTypeFactory typeFactory, CallContext callContext, RelDataType[] operandTypes) {
final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory;
private void inferOperandTypesOrError(FlinkTypeFactory typeFactory, CallContext callContext, RelDataType[] operandTypes) {
final List<DataType> expectedDataTypes;
// typed arguments have highest priority
if (typeInference.getTypedArguments().isPresent()) {
@@ -87,7 +94,7 @@ public final class TypeInferenceOperandInference implements SqlOperandTypeInference
for (int i = 0; i < operandTypes.length; i++) {
final LogicalType inferredType = expectedDataTypes.get(i).getLogicalType();
operandTypes[i] = flinkTypeFactory.createFieldTypeFromLogicalType(inferredType);
operandTypes[i] = typeFactory.createFieldTypeFromLogicalType(inferredType);
}
}
}
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.functions.inference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.types.inference.CallContext;
@@ -31,6 +32,7 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlOperatorBinding;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory;
import static org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException;
import static org.apache.flink.table.types.inference.TypeInferenceUtil.createUnexpectedException;
import static org.apache.flink.table.types.inference.TypeInferenceUtil.inferOutputType;
@@ -43,23 +45,29 @@ import static org.apache.flink.table.types.inference.TypeInferenceUtil.inferOutputType;
@Internal
public final class TypeInferenceReturnInference implements SqlReturnTypeInference {
private final DataTypeFactory dataTypeFactory;
private final FunctionDefinition definition;
private final TypeInference typeInference;
public TypeInferenceReturnInference(
DataTypeFactory dataTypeFactory,
FunctionDefinition definition,
TypeInference typeInference) {
this.dataTypeFactory = dataTypeFactory;
this.definition = definition;
this.typeInference = typeInference;
}
@Override
public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
final FlinkTypeFactory typeFactory = (FlinkTypeFactory) opBinding.getTypeFactory();
final CallContext callContext = new OperatorBindingCallContext(definition, opBinding);
final CallContext callContext = new OperatorBindingCallContext(
dataTypeFactory,
definition,
opBinding);
try {
return inferReturnTypeOrError(typeFactory, callContext);
return inferReturnTypeOrError(unwrapTypeFactory(opBinding), callContext);
}
catch (ValidationException e) {
throw createInvalidCallException(callContext, e);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.planner.utils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.SqlOperatorBinding;
/**
* Utilities for quick access to commonly used instances (like {@link FlinkTypeFactory}) without
* long chains of getters or casting like {@code (FlinkTypeFactory) agg.getCluster().getTypeFactory()}.
*/
@Internal
public final class ShortcutUtils {
public static FlinkTypeFactory unwrapTypeFactory(SqlOperatorBinding operatorBinding) {
return unwrapTypeFactory(operatorBinding.getTypeFactory());
}
public static FlinkTypeFactory unwrapTypeFactory(RelNode relNode) {
return unwrapTypeFactory(relNode.getCluster());
}
public static FlinkTypeFactory unwrapTypeFactory(RelOptCluster cluster) {
return unwrapTypeFactory(cluster.getTypeFactory());
}
public static FlinkTypeFactory unwrapTypeFactory(RelDataTypeFactory typeFactory) {
return (FlinkTypeFactory) typeFactory;
}
private ShortcutUtils() {
// no instantiation
}
}
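For example, the cast chain replaced in TypeInferenceOperandChecker above collapses to a single call; since SqlCallBinding extends SqlOperatorBinding, the SqlOperatorBinding overload applies (a sketch, not part of the commit):

// Before: final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) callBinding.getTypeFactory();
// After, with a static import of ShortcutUtils.unwrapTypeFactory:
final FlinkTypeFactory flinkTypeFactory = unwrapTypeFactory(callBinding);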
@@ -18,30 +18,28 @@
package org.apache.flink.table.planner.calcite
import java.nio.charset.Charset
import java.util
import org.apache.calcite.avatica.util.TimeUnit
import org.apache.calcite.jdbc.JavaTypeFactoryImpl
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`._
import org.apache.calcite.sql.SqlIntervalQualifier
import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql.`type`.{BasicSqlType, MapSqlType, SqlTypeName}
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.calcite.util.ConversionUtil
import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.table.api.{DataTypes, TableException, TableSchema}
import org.apache.flink.table.catalog.{DataTypeFactory, UnresolvedIdentifier}
import org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType
import org.apache.flink.table.planner.plan.schema.{GenericRelDataType, _}
import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter
import org.apache.flink.table.types.DataType
import org.apache.flink.table.types.logical._
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
import org.apache.flink.types.Nothing
import org.apache.flink.util.Preconditions.checkArgument
import org.apache.calcite.avatica.util.TimeUnit
import org.apache.calcite.jdbc.JavaTypeFactoryImpl
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`._
import org.apache.calcite.sql.SqlIntervalQualifier
import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql.`type`.{BasicSqlType, MapSqlType, SqlTypeName}
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.calcite.util.ConversionUtil
import java.nio.charset.Charset
import java.util
import java.util.Optional
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -55,20 +53,6 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl
private val seenTypes = mutable.HashMap[LogicalType, RelDataType]()
def getDataTypeFactory: DataTypeFactory = new DataTypeFactory {
override def createDataType(name: String): Optional[DataType] =
throw new TableException("Data type creation is not supported yet.")
override def createDataType(identifier: UnresolvedIdentifier): Optional[DataType] =
throw new TableException("Data type creation is not supported yet.")
override def createDataType[T](clazz: Class[T]): DataType =
throw new TableException("Data type creation is not supported yet.")
override def createRawDataType[T](clazz: Class[T]): DataType =
throw new TableException("Data type creation is not supported yet.")
}
/**
* Create a Calcite field type in table schema from [[LogicalType]]. It uses
* PEEK_FIELDS_NO_EXPAND when the type is a nested struct type (Flink [[RowType]]).
......