From aed8c1957a706c35d824c598e2d610d897059c36 Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Mon, 27 Apr 2020 17:25:28 +0800 Subject: [PATCH] [FLINK-17313] Fix type validation error when sink table ddl contains columns with precision of decimal/varchar (#11848) --- .../flink/table/utils/TypeMappingUtils.java | 56 ++++++++++----- .../table/utils/TypeMappingUtilsTest.java | 68 +++++++++++++++++++ 2 files changed, 107 insertions(+), 17 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeMappingUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeMappingUtils.java index 4922ac2f3b8..9d800ea57b8 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeMappingUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeMappingUtils.java @@ -158,19 +158,31 @@ public final class TypeMappingUtils { String physicalFieldName, String logicalFieldName, boolean isSource) { - checkIfCompatible( - physicalFieldType, - logicalFieldType, - (cause) -> new ValidationException( - String.format( - "Type %s of table field '%s' does not match with " + - "the physical type %s of the '%s' field of the %s.", + if (isSource) { + checkIfCompatible( + physicalFieldType, + logicalFieldType, + (cause) -> new ValidationException( + String.format("Type %s of table field '%s' does not match with " + + "the physical type %s of the '%s' field of the TableSource return type.", + logicalFieldType, + logicalFieldName, + physicalFieldType, + physicalFieldName), + cause)); + } else { + checkIfCompatible( logicalFieldType, - logicalFieldName, physicalFieldType, - physicalFieldName, - isSource ? "TableSource return type" : "TableSink consumed type"), - cause)); + (cause) -> new ValidationException( + String.format("Type %s of table field '%s' does not match with " + + "the physical type %s of the '%s' field of the TableSink consumed type.", + logicalFieldType, + logicalFieldName, + physicalFieldType, + physicalFieldName), + cause)); + } } private static void verifyTimeAttributeType(TableColumn logicalColumn, String rowtimeOrProctime) { @@ -245,22 +257,32 @@ public final class TypeMappingUtils { } private static void checkIfCompatible( - LogicalType physicalFieldType, - LogicalType logicalFieldType, + LogicalType sourceType, + LogicalType targetType, Function exceptionSupplier) { - if (supportsAvoidingCast(physicalFieldType, logicalFieldType)) { + if (supportsAvoidingCast(sourceType, targetType)) { return; } - physicalFieldType.accept(new LogicalTypeDefaultVisitor() { + sourceType.accept(new LogicalTypeDefaultVisitor() { + @Override + public Void visit(DecimalType sourceType) { + //When targetType is a legacy decimal type, pass the check. + if (targetType instanceof LegacyTypeInformationType + && targetType.getTypeRoot() == LogicalTypeRoot.DECIMAL) { + return null; + } + return defaultMethod(sourceType); + } + @Override public Void visit(LogicalType other) { if (other instanceof LegacyTypeInformationType && other.getTypeRoot() == LogicalTypeRoot.DECIMAL) { - if (!(logicalFieldType instanceof DecimalType)) { + if (!(targetType instanceof DecimalType)) { throw exceptionSupplier.apply(null); } - DecimalType logicalDecimalType = (DecimalType) logicalFieldType; + DecimalType logicalDecimalType = (DecimalType) targetType; if (logicalDecimalType.getPrecision() != DecimalType.MAX_PRECISION || logicalDecimalType.getScale() != 18) { throw exceptionSupplier.apply(new ValidationException( diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeMappingUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeMappingUtilsTest.java index daadd66b303..200988d182c 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeMappingUtilsTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeMappingUtilsTest.java @@ -18,16 +18,23 @@ package org.apache.flink.table.utils; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.sources.DefinedProctimeAttribute; import org.apache.flink.table.sources.DefinedRowtimeAttributes; import org.apache.flink.table.sources.RowtimeAttributeDescriptor; import org.apache.flink.table.sources.TableSource; import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LegacyTypeInformationType; +import org.apache.flink.table.types.utils.DataTypeUtils; import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.types.Row; import org.junit.Rule; import org.junit.Test; @@ -331,6 +338,30 @@ public class TypeMappingUtilsTest { ); } + @Test + public void testCheckPhysicalLogicalTypeCompatible() { + TableSchema tableSchema = TableSchema.builder() + .field("a", DataTypes.VARCHAR(2)) + .field("b", DataTypes.DECIMAL(20, 2)) + .build(); + TableSink tableSink = new TestTableSink(tableSchema); + LegacyTypeInformationType legacyDataType = (LegacyTypeInformationType) tableSink.getConsumedDataType() + .getLogicalType(); + TypeInformation legacyTypeInfo = ((TupleTypeInfo) legacyDataType.getTypeInformation()).getTypeAt(1); + DataType physicalType = TypeConversions.fromLegacyInfoToDataType(legacyTypeInfo); + TableSchema physicSchema = DataTypeUtils.expandCompositeTypeToSchema(physicalType); + DataType[] logicalDataTypes = tableSchema.getFieldDataTypes(); + DataType[] physicalDataTypes = physicSchema.getFieldDataTypes(); + for (int i = 0; i < logicalDataTypes.length; i++) { + TypeMappingUtils.checkPhysicalLogicalTypeCompatible( + physicalDataTypes[i].getLogicalType(), + logicalDataTypes[i].getLogicalType(), + "physicalField", + "logicalField", + false); + } + } + private static class TestTableSource implements TableSource, DefinedProctimeAttribute, DefinedRowtimeAttributes { @@ -370,4 +401,41 @@ public class TypeMappingUtilsTest { throw new UnsupportedOperationException("Should not be called"); } } + + /** + * Since UpsertStreamTableSink not in flink-table-common module, here we use Tuple2 <Boolean, Row> to + * simulate the behavior of UpsertStreamTableSink. + */ + private static class TestTableSink implements TableSink> { + private final TableSchema tableSchema; + + private TestTableSink(TableSchema tableSchema) { + this.tableSchema = tableSchema; + } + + TypeInformation getRecordType() { + return tableSchema.toRowType(); + } + + @Override + public TypeInformation> getOutputType() { + return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType()); + } + + @Override + public String[] getFieldNames() { + return tableSchema.getFieldNames(); + } + + @Override + public TypeInformation[] getFieldTypes() { + return tableSchema.getFieldTypes(); + } + + @Override + public TableSink> configure( + String[] fieldNames, TypeInformation[] fieldTypes) { + return null; + } + } } -- GitLab