Unverified · Commit aed8c195, authored by Terry Wang, committed by GitHub

[FLINK-17313] Fix type validation error when sink table ddl contains columns with precision of decimal/varchar (#11848)

Parent: b7ce6119
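
The root cause: for a sink, data flows from the declared (logical) column type into the physical type the sink consumes, but validation checked castability in the physical → logical direction, and cast avoidance is not symmetric. A minimal sketch of that asymmetry using Flink's LogicalTypeCasts utility (the sketch itself is not part of this patch; the legacy physical type for strings is VARCHAR(2147483647)):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.LogicalType;

import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsAvoidingCast;

public class CastDirectionSketch {
	public static void main(String[] args) {
		LogicalType declared = DataTypes.VARCHAR(2).getLogicalType(); // DDL column type
		LogicalType physical = DataTypes.STRING().getLogicalType();   // physical type, VARCHAR(2147483647)

		// Writing a VARCHAR(2) value into a wider type is safe, so this prints true.
		System.out.println(supportsAvoidingCast(declared, physical));
		// The reverse direction, which the old code checked even for sinks, is false
		// and produced the spurious ValidationException.
		System.out.println(supportsAvoidingCast(physical, declared));
	}
}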
TypeMappingUtils.java:

@@ -158,19 +158,31 @@ public final class TypeMappingUtils {
 			String physicalFieldName,
 			String logicalFieldName,
 			boolean isSource) {
-		checkIfCompatible(
-			physicalFieldType,
-			logicalFieldType,
-			(cause) -> new ValidationException(
-				String.format(
-					"Type %s of table field '%s' does not match with " +
-						"the physical type %s of the '%s' field of the %s.",
-					logicalFieldType,
-					logicalFieldName,
-					physicalFieldType,
-					physicalFieldName,
-					isSource ? "TableSource return type" : "TableSink consumed type"),
-				cause));
+		if (isSource) {
+			checkIfCompatible(
+				physicalFieldType,
+				logicalFieldType,
+				(cause) -> new ValidationException(
+					String.format("Type %s of table field '%s' does not match with " +
+							"the physical type %s of the '%s' field of the TableSource return type.",
+						logicalFieldType,
+						logicalFieldName,
+						physicalFieldType,
+						physicalFieldName),
+					cause));
+		} else {
+			checkIfCompatible(
+				logicalFieldType,
+				physicalFieldType,
+				(cause) -> new ValidationException(
+					String.format("Type %s of table field '%s' does not match with " +
+							"the physical type %s of the '%s' field of the TableSink consumed type.",
+						logicalFieldType,
+						logicalFieldName,
+						physicalFieldType,
+						physicalFieldName),
+					cause));
+		}
 	}
 
 	private static void verifyTimeAttributeType(TableColumn logicalColumn, String rowtimeOrProctime) {
@@ -245,22 +257,32 @@ public final class TypeMappingUtils {
 	}
 
 	private static void checkIfCompatible(
-			LogicalType physicalFieldType,
-			LogicalType logicalFieldType,
+			LogicalType sourceType,
+			LogicalType targetType,
 			Function<Throwable, ValidationException> exceptionSupplier) {
-		if (supportsAvoidingCast(physicalFieldType, logicalFieldType)) {
+		if (supportsAvoidingCast(sourceType, targetType)) {
 			return;
 		}
 
-		physicalFieldType.accept(new LogicalTypeDefaultVisitor<Void>() {
+		sourceType.accept(new LogicalTypeDefaultVisitor<Void>() {
+			@Override
+			public Void visit(DecimalType sourceType) {
+				// When targetType is a legacy decimal type, pass the check.
+				if (targetType instanceof LegacyTypeInformationType
+						&& targetType.getTypeRoot() == LogicalTypeRoot.DECIMAL) {
+					return null;
+				}
+				return defaultMethod(sourceType);
+			}
+
 			@Override
 			public Void visit(LogicalType other) {
 				if (other instanceof LegacyTypeInformationType && other.getTypeRoot() == LogicalTypeRoot.DECIMAL) {
-					if (!(logicalFieldType instanceof DecimalType)) {
+					if (!(targetType instanceof DecimalType)) {
 						throw exceptionSupplier.apply(null);
 					}
-					DecimalType logicalDecimalType = (DecimalType) logicalFieldType;
+					DecimalType logicalDecimalType = (DecimalType) targetType;
 					if (logicalDecimalType.getPrecision() != DecimalType.MAX_PRECISION ||
 							logicalDecimalType.getScale() != 18) {
 						throw exceptionSupplier.apply(new ValidationException(
......
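
The new visit(DecimalType) overload is what lets precision-bearing decimal columns through: once the sink check runs logical → physical, a declared DecimalType is accepted against a legacy decimal physical type regardless of precision and scale. A hedged sketch of the resulting behavior, assuming Types.BIG_DEC still converts to a LegacyTypeInformationType with type root DECIMAL (the legacy path this patch special-cases) and that checkPhysicalLogicalTypeCompatible is publicly callable:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.table.utils.TypeMappingUtils;

public class DecimalSinkCheckSketch {
	public static void main(String[] args) {
		// Legacy BigDecimal physical type, as an old TypeInformation-based sink exposes it.
		LogicalType physical = TypeConversions.fromLegacyInfoToDataType(Types.BIG_DEC).getLogicalType();
		LogicalType logical = DataTypes.DECIMAL(20, 2).getLogicalType();

		// isSource = false: the fixed code checks logical -> physical, and the new
		// DecimalType visitor passes any precision against the legacy decimal type.
		TypeMappingUtils.checkPhysicalLogicalTypeCompatible(
			physical, logical, "physicalField", "logicalField", false);
		System.out.println("DECIMAL(20, 2) sink column validated without exception");
	}
}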
TypeMappingUtilsTest.java:

@@ -18,16 +18,23 @@
 package org.apache.flink.table.utils;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.DefinedProctimeAttribute;
 import org.apache.flink.table.sources.DefinedRowtimeAttributes;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -331,6 +338,30 @@ public class TypeMappingUtilsTest {
 		);
 	}
 
+	@Test
+	public void testCheckPhysicalLogicalTypeCompatible() {
+		TableSchema tableSchema = TableSchema.builder()
+			.field("a", DataTypes.VARCHAR(2))
+			.field("b", DataTypes.DECIMAL(20, 2))
+			.build();
+		TableSink tableSink = new TestTableSink(tableSchema);
+		LegacyTypeInformationType legacyDataType = (LegacyTypeInformationType) tableSink.getConsumedDataType()
+			.getLogicalType();
+		TypeInformation legacyTypeInfo = ((TupleTypeInfo) legacyDataType.getTypeInformation()).getTypeAt(1);
+		DataType physicalType = TypeConversions.fromLegacyInfoToDataType(legacyTypeInfo);
+		TableSchema physicSchema = DataTypeUtils.expandCompositeTypeToSchema(physicalType);
+		DataType[] logicalDataTypes = tableSchema.getFieldDataTypes();
+		DataType[] physicalDataTypes = physicSchema.getFieldDataTypes();
+		for (int i = 0; i < logicalDataTypes.length; i++) {
+			TypeMappingUtils.checkPhysicalLogicalTypeCompatible(
+				physicalDataTypes[i].getLogicalType(),
+				logicalDataTypes[i].getLogicalType(),
+				"physicalField",
+				"logicalField",
+				false);
+		}
+	}
+
 	private static class TestTableSource
 			implements TableSource<Object>, DefinedProctimeAttribute, DefinedRowtimeAttributes {
@@ -370,4 +401,41 @@ public class TypeMappingUtilsTest {
 			throw new UnsupportedOperationException("Should not be called");
 		}
 	}
+
+	/**
+	 * Since UpsertStreamTableSink is not in the flink-table-common module, here we use Tuple2<Boolean, Row> to
+	 * simulate the behavior of UpsertStreamTableSink.
+	 */
+	private static class TestTableSink implements TableSink<Tuple2<Boolean, Row>> {
+
+		private final TableSchema tableSchema;
+
+		private TestTableSink(TableSchema tableSchema) {
+			this.tableSchema = tableSchema;
+		}
+
+		TypeInformation<Row> getRecordType() {
+			return tableSchema.toRowType();
+		}
+
+		@Override
+		public TypeInformation<Tuple2<Boolean, Row>> getOutputType() {
+			return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
+		}
+
+		@Override
+		public String[] getFieldNames() {
+			return tableSchema.getFieldNames();
+		}
+
+		@Override
+		public TypeInformation<?>[] getFieldTypes() {
+			return tableSchema.getFieldTypes();
+		}
+
+		@Override
+		public TableSink<Tuple2<Boolean, Row>> configure(
+				String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+			return null;
+		}
+	}
 }
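
Note that the test only exercises the sink direction (isSource = false). By my reading of the visitor above — this contrast is not asserted by the committed test — the source direction still pins a legacy decimal to exactly DECIMAL(38, 18), so the same pair of types keeps failing there:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.table.utils.TypeMappingUtils;

public class SourceDirectionSketch {
	public static void main(String[] args) {
		LogicalType physical = TypeConversions.fromLegacyInfoToDataType(Types.BIG_DEC).getLogicalType();
		LogicalType logical = DataTypes.DECIMAL(20, 2).getLogicalType();
		try {
			// isSource = true: the legacy decimal becomes the visited source type, so
			// visit(LogicalType other) demands DECIMAL(38, 18) on the logical side.
			TypeMappingUtils.checkPhysicalLogicalTypeCompatible(
				physical, logical, "physicalField", "logicalField", true);
		} catch (ValidationException e) {
			System.out.println("source direction still rejects DECIMAL(20, 2)");
		}
	}
}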