From 11007c0d50d006630daa84a603e0051dfe88769c Mon Sep 17 00:00:00 2001 From: Timo Walther Date: Tue, 21 Jan 2020 17:48:58 +0100 Subject: [PATCH] [FLINK-15612][table-planner-blink] Propagate the data type factory in the planner This closes #10917. --- .../bridging/BridgingSqlAggFunction.java | 28 ++++++++-- .../bridging/BridgingSqlFunction.java | 31 +++++++++-- .../functions/bridging/BridgingUtils.java | 10 +++- .../inference/AbstractSqlCallContext.java | 8 +-- .../inference/CallBindingCallContext.java | 4 +- .../inference/OperatorBindingCallContext.java | 4 +- .../TypeInferenceOperandChecker.java | 14 ++++- .../TypeInferenceOperandInference.java | 21 ++++--- .../TypeInferenceReturnInference.java | 14 ++++- .../table/planner/utils/ShortcutUtils.java | 55 +++++++++++++++++++ .../planner/calcite/FlinkTypeFactory.scala | 40 ++++---------- 11 files changed, 171 insertions(+), 58 deletions(-) create mode 100644 flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlAggFunction.java index da88e06818f..8ecade2edcb 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlAggFunction.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlAggFunction.java @@ -19,11 +19,13 @@ package org.apache.flink.table.planner.functions.bridging; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.functions.FunctionIdentifier; import org.apache.flink.table.functions.FunctionKind; import org.apache.flink.table.functions.FunctionRequirement; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.TypeInference; import org.apache.calcite.rel.type.RelDataType; @@ -49,6 +51,10 @@ import static org.apache.flink.util.Preconditions.checkState; @Internal public final class BridgingSqlAggFunction extends SqlAggFunction { + private final DataTypeFactory dataTypeFactory; + + private final FlinkTypeFactory typeFactory; + private final FunctionIdentifier identifier; private final FunctionDefinition definition; @@ -58,6 +64,7 @@ public final class BridgingSqlAggFunction extends SqlAggFunction { private final List paramTypes; private BridgingSqlAggFunction( + DataTypeFactory dataTypeFactory, FlinkTypeFactory typeFactory, SqlKind kind, FunctionIdentifier identifier, @@ -67,14 +74,16 @@ public final class BridgingSqlAggFunction extends SqlAggFunction { createName(identifier), createSqlIdentifier(identifier), kind, - createSqlReturnTypeInference(definition, typeInference), - createSqlOperandTypeInference(definition, typeInference), - createSqlOperandTypeChecker(definition, typeInference), + createSqlReturnTypeInference(dataTypeFactory, definition, typeInference), + createSqlOperandTypeInference(dataTypeFactory, definition, typeInference), + createSqlOperandTypeChecker(dataTypeFactory, definition, typeInference), createSqlFunctionCategory(identifier), createOrderRequirement(), createOverWindowRequirement(definition), createGroupOrderRequirement()); + this.dataTypeFactory = dataTypeFactory; + this.typeFactory = typeFactory; this.identifier = identifier; this.definition = definition; this.typeInference = typeInference; @@ -84,7 +93,8 @@ public final class BridgingSqlAggFunction extends SqlAggFunction { /** * Creates an instance of a aggregating function (either a system or user-defined function). * - * @param typeFactory used for resolving typed arguments + * @param dataTypeFactory used for creating {@link DataType} + * @param typeFactory used for bridging to {@link RelDataType} * @param kind commonly used SQL standard function; use {@link SqlKind#OTHER_FUNCTION} if this function * cannot be mapped to a common function kind. * @param identifier catalog identifier @@ -92,6 +102,7 @@ public final class BridgingSqlAggFunction extends SqlAggFunction { * @param typeInference type inference logic */ public static BridgingSqlAggFunction of( + DataTypeFactory dataTypeFactory, FlinkTypeFactory typeFactory, SqlKind kind, FunctionIdentifier identifier, @@ -103,6 +114,7 @@ public final class BridgingSqlAggFunction extends SqlAggFunction { "Aggregating function kind expected."); return new BridgingSqlAggFunction( + dataTypeFactory, typeFactory, kind, identifier, @@ -110,6 +122,14 @@ public final class BridgingSqlAggFunction extends SqlAggFunction { typeInference); } + public DataTypeFactory getDataTypeFactory() { + return dataTypeFactory; + } + + public FlinkTypeFactory getTypeFactory() { + return typeFactory; + } + public FunctionIdentifier getIdentifier() { return identifier; } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java index dbad2c6b96c..8eec1e1c123 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java @@ -19,12 +19,15 @@ package org.apache.flink.table.planner.functions.bridging; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.functions.FunctionIdentifier; import org.apache.flink.table.functions.FunctionKind; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.TypeInference; +import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.sql.SqlFunction; import org.apache.calcite.sql.SqlKind; @@ -46,6 +49,10 @@ import static org.apache.flink.util.Preconditions.checkState; @Internal public final class BridgingSqlFunction extends SqlFunction { + private final DataTypeFactory dataTypeFactory; + + private final FlinkTypeFactory typeFactory; + private final FunctionIdentifier identifier; private final FunctionDefinition definition; @@ -53,6 +60,7 @@ public final class BridgingSqlFunction extends SqlFunction { private final TypeInference typeInference; private BridgingSqlFunction( + DataTypeFactory dataTypeFactory, FlinkTypeFactory typeFactory, SqlKind kind, FunctionIdentifier identifier, @@ -62,12 +70,14 @@ public final class BridgingSqlFunction extends SqlFunction { createName(identifier), createSqlIdentifier(identifier), kind, - createSqlReturnTypeInference(definition, typeInference), - createSqlOperandTypeInference(definition, typeInference), - createSqlOperandTypeChecker(definition, typeInference), + createSqlReturnTypeInference(dataTypeFactory, definition, typeInference), + createSqlOperandTypeInference(dataTypeFactory, definition, typeInference), + createSqlOperandTypeChecker(dataTypeFactory, definition, typeInference), createParamTypes(typeFactory, typeInference), createSqlFunctionCategory(identifier)); + this.dataTypeFactory = dataTypeFactory; + this.typeFactory = typeFactory; this.identifier = identifier; this.definition = definition; this.typeInference = typeInference; @@ -76,7 +86,8 @@ public final class BridgingSqlFunction extends SqlFunction { /** * Creates an instance of a scalar or table function (either a system or user-defined function). * - * @param typeFactory used for resolving typed arguments + * @param dataTypeFactory used for creating {@link DataType} + * @param typeFactory used for bridging to {@link RelDataType} * @param kind commonly used SQL standard function; use {@link SqlKind#OTHER_FUNCTION} if this function * cannot be mapped to a common function kind. * @param identifier catalog identifier @@ -84,6 +95,7 @@ public final class BridgingSqlFunction extends SqlFunction { * @param typeInference type inference logic */ public static BridgingSqlFunction of( + DataTypeFactory dataTypeFactory, FlinkTypeFactory typeFactory, SqlKind kind, FunctionIdentifier identifier, @@ -92,9 +104,10 @@ public final class BridgingSqlFunction extends SqlFunction { checkState( definition.getKind() == FunctionKind.SCALAR || definition.getKind() == FunctionKind.TABLE, - "Scala or table function kind expected."); + "Scalar or table function kind expected."); return new BridgingSqlFunction( + dataTypeFactory, typeFactory, kind, identifier, @@ -102,6 +115,14 @@ public final class BridgingSqlFunction extends SqlFunction { typeInference); } + public DataTypeFactory getDataTypeFactory() { + return dataTypeFactory; + } + + public FlinkTypeFactory getTypeFactory() { + return typeFactory; + } + public FunctionIdentifier getIdentifier() { return identifier; } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingUtils.java index 7d903ecd047..4b0d2898d2a 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingUtils.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingUtils.java @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.functions.bridging; +import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.functions.FunctionIdentifier; @@ -61,21 +62,24 @@ final class BridgingUtils { } static SqlReturnTypeInference createSqlReturnTypeInference( + DataTypeFactory dataTypeFactory, FunctionDefinition definition, TypeInference typeInference) { - return new TypeInferenceReturnInference(definition, typeInference); + return new TypeInferenceReturnInference(dataTypeFactory, definition, typeInference); } static SqlOperandTypeInference createSqlOperandTypeInference( + DataTypeFactory dataTypeFactory, FunctionDefinition definition, TypeInference typeInference) { - return new TypeInferenceOperandInference(definition, typeInference); + return new TypeInferenceOperandInference(dataTypeFactory, definition, typeInference); } static SqlOperandTypeChecker createSqlOperandTypeChecker( + DataTypeFactory dataTypeFactory, FunctionDefinition definition, TypeInference typeInference) { - return new TypeInferenceOperandChecker(definition, typeInference); + return new TypeInferenceOperandChecker(dataTypeFactory, definition, typeInference); } static @Nullable List createParamTypes( diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/AbstractSqlCallContext.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/AbstractSqlCallContext.java index e9aebf681c2..4dcdbe99557 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/AbstractSqlCallContext.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/AbstractSqlCallContext.java @@ -38,24 +38,24 @@ import java.time.Period; @Internal public abstract class AbstractSqlCallContext implements CallContext { - private final DataTypeFactory typeFactory; + private final DataTypeFactory dataTypeFactory; private final FunctionDefinition definition; private final String name; protected AbstractSqlCallContext( - DataTypeFactory typeFactory, + DataTypeFactory dataTypeFactory, FunctionDefinition definition, String name) { - this.typeFactory = typeFactory; + this.dataTypeFactory = dataTypeFactory; this.definition = definition; this.name = name; } @Override public DataTypeFactory getDataTypeFactory() { - return typeFactory; + return dataTypeFactory; } @Override diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java index f96fc5ecde2..04cbba4cc88 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.functions.inference; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.types.DataType; @@ -52,11 +53,12 @@ public final class CallBindingCallContext extends AbstractSqlCallContext { private final @Nullable DataType outputType; public CallBindingCallContext( + DataTypeFactory dataTypeFactory, FunctionDefinition definition, SqlCallBinding binding, @Nullable RelDataType outputType) { super( - ((FlinkTypeFactory) binding.getTypeFactory()).getDataTypeFactory(), + dataTypeFactory, definition, binding.getOperator().getNameAsId().toString()); diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java index fca09cf600c..690d36a1e26 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.functions.inference; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.types.DataType; @@ -43,10 +44,11 @@ public final class OperatorBindingCallContext extends AbstractSqlCallContext { private final List argumentDataTypes; public OperatorBindingCallContext( + DataTypeFactory dataTypeFactory, FunctionDefinition definition, SqlOperatorBinding binding) { super( - ((FlinkTypeFactory) binding.getTypeFactory()).getDataTypeFactory(), + dataTypeFactory, definition, binding.getOperator().getNameAsId().toString()); diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java index 3be60cd8172..09596958a47 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.functions.inference; import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.types.DataType; @@ -43,6 +44,7 @@ import org.apache.calcite.sql.validate.SqlValidatorNamespace; import java.util.List; +import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory; import static org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments; import static org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException; import static org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException; @@ -56,6 +58,8 @@ import static org.apache.flink.table.types.inference.TypeInferenceUtil.createUne @Internal public final class TypeInferenceOperandChecker implements SqlOperandTypeChecker { + private final DataTypeFactory dataTypeFactory; + private final FunctionDefinition definition; private final TypeInference typeInference; @@ -63,8 +67,10 @@ public final class TypeInferenceOperandChecker implements SqlOperandTypeChecker private final SqlOperandCountRange countRange; public TypeInferenceOperandChecker( + DataTypeFactory dataTypeFactory, FunctionDefinition definition, TypeInference typeInference) { + this.dataTypeFactory = dataTypeFactory; this.definition = definition; this.typeInference = typeInference; this.countRange = new ArgumentCountRange(typeInference.getInputTypeStrategy().getArgumentCount()); @@ -72,7 +78,11 @@ public final class TypeInferenceOperandChecker implements SqlOperandTypeChecker @Override public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) { - final CallContext callContext = new CallBindingCallContext(definition, callBinding, null); + final CallContext callContext = new CallBindingCallContext( + dataTypeFactory, + definition, + callBinding, + null); try { return checkOperandTypesOrError(callBinding, callContext); } @@ -128,7 +138,7 @@ public final class TypeInferenceOperandChecker implements SqlOperandTypeChecker } private void insertImplicitCasts(SqlCallBinding callBinding, List expectedDataTypes) { - final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) callBinding.getTypeFactory(); + final FlinkTypeFactory flinkTypeFactory = unwrapTypeFactory(callBinding); final List operands = callBinding.operands(); for (int i = 0; i < operands.size(); i++) { final LogicalType expectedType = expectedDataTypes.get(i).getLogicalType(); diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandInference.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandInference.java index 01cbd112382..e8d0aeea49c 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandInference.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandInference.java @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.functions.inference; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.types.DataType; @@ -28,12 +29,12 @@ import org.apache.flink.table.types.inference.TypeInferenceUtil; import org.apache.flink.table.types.logical.LogicalType; import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.sql.SqlCallBinding; import org.apache.calcite.sql.type.SqlOperandTypeInference; import java.util.List; +import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory; import static org.apache.flink.table.types.inference.TypeInferenceUtil.createUnexpectedException; /** @@ -44,22 +45,30 @@ import static org.apache.flink.table.types.inference.TypeInferenceUtil.createUne @Internal public final class TypeInferenceOperandInference implements SqlOperandTypeInference { + private final DataTypeFactory dataTypeFactory; + private final FunctionDefinition definition; private final TypeInference typeInference; public TypeInferenceOperandInference( + DataTypeFactory dataTypeFactory, FunctionDefinition definition, TypeInference typeInference) { + this.dataTypeFactory = dataTypeFactory; this.definition = definition; this.typeInference = typeInference; } @Override public void inferOperandTypes(SqlCallBinding callBinding, RelDataType returnType, RelDataType[] operandTypes) { - final CallContext callContext = new CallBindingCallContext(definition, callBinding, returnType); + final CallContext callContext = new CallBindingCallContext( + dataTypeFactory, + definition, + callBinding, + returnType); try { - inferOperandTypesOrError(callBinding.getTypeFactory(), callContext, operandTypes); + inferOperandTypesOrError(unwrapTypeFactory(callBinding), callContext, operandTypes); } catch (Throwable t) { throw createUnexpectedException(callContext, t); } @@ -67,9 +76,7 @@ public final class TypeInferenceOperandInference implements SqlOperandTypeInfere // -------------------------------------------------------------------------------------------- - private void inferOperandTypesOrError(RelDataTypeFactory typeFactory, CallContext callContext, RelDataType[] operandTypes) { - final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory; - + private void inferOperandTypesOrError(FlinkTypeFactory typeFactory, CallContext callContext, RelDataType[] operandTypes) { final List expectedDataTypes; // typed arguments have highest priority if (typeInference.getTypedArguments().isPresent()) { @@ -87,7 +94,7 @@ public final class TypeInferenceOperandInference implements SqlOperandTypeInfere for (int i = 0; i < operandTypes.length; i++) { final LogicalType inferredType = expectedDataTypes.get(i).getLogicalType(); - operandTypes[i] = flinkTypeFactory.createFieldTypeFromLogicalType(inferredType); + operandTypes[i] = typeFactory.createFieldTypeFromLogicalType(inferredType); } } } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceReturnInference.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceReturnInference.java index cbc4abb98d8..85ba3188140 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceReturnInference.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceReturnInference.java @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.functions.inference; import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.types.inference.CallContext; @@ -31,6 +32,7 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.sql.SqlOperatorBinding; import org.apache.calcite.sql.type.SqlReturnTypeInference; +import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory; import static org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException; import static org.apache.flink.table.types.inference.TypeInferenceUtil.createUnexpectedException; import static org.apache.flink.table.types.inference.TypeInferenceUtil.inferOutputType; @@ -43,23 +45,29 @@ import static org.apache.flink.table.types.inference.TypeInferenceUtil.inferOutp @Internal public final class TypeInferenceReturnInference implements SqlReturnTypeInference { + private final DataTypeFactory dataTypeFactory; + private final FunctionDefinition definition; private final TypeInference typeInference; public TypeInferenceReturnInference( + DataTypeFactory dataTypeFactory, FunctionDefinition definition, TypeInference typeInference) { + this.dataTypeFactory = dataTypeFactory; this.definition = definition; this.typeInference = typeInference; } @Override public RelDataType inferReturnType(SqlOperatorBinding opBinding) { - final FlinkTypeFactory typeFactory = (FlinkTypeFactory) opBinding.getTypeFactory(); - final CallContext callContext = new OperatorBindingCallContext(definition, opBinding); + final CallContext callContext = new OperatorBindingCallContext( + dataTypeFactory, + definition, + opBinding); try { - return inferReturnTypeOrError(typeFactory, callContext); + return inferReturnTypeOrError(unwrapTypeFactory(opBinding), callContext); } catch (ValidationException e) { throw createInvalidCallException(callContext, e); diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java new file mode 100644 index 00000000000..36f06b349fe --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.utils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.SqlOperatorBinding; + +/** + * Utilities for quick access of commonly used instances (like {@link FlinkTypeFactory}) without + * long chains of getters or casting like {@code (FlinkTypeFactory) agg.getCluster.getTypeFactory()}. + */ +@Internal +public final class ShortcutUtils { + + public static FlinkTypeFactory unwrapTypeFactory(SqlOperatorBinding operatorBinding) { + return unwrapTypeFactory(operatorBinding.getTypeFactory()); + } + + public static FlinkTypeFactory unwrapTypeFactory(RelNode relNode) { + return unwrapTypeFactory(relNode.getCluster()); + } + + public static FlinkTypeFactory unwrapTypeFactory(RelOptCluster cluster) { + return unwrapTypeFactory(cluster.getTypeFactory()); + } + + public static FlinkTypeFactory unwrapTypeFactory(RelDataTypeFactory typeFactory) { + return (FlinkTypeFactory) typeFactory; + } + + private ShortcutUtils() { + // no instantiation + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala index c542d7b73b1..1c4bda528f4 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala @@ -18,30 +18,28 @@ package org.apache.flink.table.planner.calcite +import java.nio.charset.Charset +import java.util + +import org.apache.calcite.avatica.util.TimeUnit +import org.apache.calcite.jdbc.JavaTypeFactoryImpl +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`._ +import org.apache.calcite.sql.SqlIntervalQualifier +import org.apache.calcite.sql.`type`.SqlTypeName._ +import org.apache.calcite.sql.`type`.{BasicSqlType, MapSqlType, SqlTypeName} +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.util.ConversionUtil import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, TypeInformation} import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.table.api.{DataTypes, TableException, TableSchema} -import org.apache.flink.table.catalog.{DataTypeFactory, UnresolvedIdentifier} import org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType import org.apache.flink.table.planner.plan.schema.{GenericRelDataType, _} import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter -import org.apache.flink.table.types.DataType import org.apache.flink.table.types.logical._ import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo import org.apache.flink.types.Nothing import org.apache.flink.util.Preconditions.checkArgument -import org.apache.calcite.avatica.util.TimeUnit -import org.apache.calcite.jdbc.JavaTypeFactoryImpl -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.`type`._ -import org.apache.calcite.sql.SqlIntervalQualifier -import org.apache.calcite.sql.`type`.SqlTypeName._ -import org.apache.calcite.sql.`type`.{BasicSqlType, MapSqlType, SqlTypeName} -import org.apache.calcite.sql.parser.SqlParserPos -import org.apache.calcite.util.ConversionUtil -import java.nio.charset.Charset -import java.util -import java.util.Optional import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -55,20 +53,6 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp private val seenTypes = mutable.HashMap[LogicalType, RelDataType]() - def getDataTypeFactory: DataTypeFactory = new DataTypeFactory { - override def createDataType(name: String): Optional[DataType] = - throw new TableException("Data type creation is not supported yet.") - - override def createDataType(identifier: UnresolvedIdentifier): Optional[DataType] = - throw new TableException("Data type creation is not supported yet.") - - override def createDataType[T](clazz: Class[T]): DataType = - throw new TableException("Data type creation is not supported yet.") - - override def createRawDataType[T](clazz: Class[T]): DataType = - throw new TableException("Data type creation is not supported yet.") - } - /** * Create a calcite field type in table schema from [[LogicalType]]. It use * PEEK_FIELDS_NO_EXPAND when type is a nested struct type (Flink [[RowType]]). -- GitLab