diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RuntimeConverter.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RuntimeConverter.java index 2b714eaeffa4563fa8301800d8b2d06c923bcf7e..0f483dd3c1a68ecf2ed824de4fc2df0b81851c14 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RuntimeConverter.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RuntimeConverter.java @@ -43,17 +43,25 @@ public interface RuntimeConverter extends Serializable { /** * Context for conversions during runtime. - * - *

Meant for future extensibility. Use {@link Context#empty()} for now. */ interface Context { /** - * Empty context. + * Runtime classloader for loading user-defined classes. + */ + ClassLoader getClassLoader(); + + /** + * Creates a new instance of {@link Context}. + * + * @param classLoader runtime classloader for loading user-defined classes. */ - static Context empty() { + static Context create(ClassLoader classLoader) { return new Context() { - // nothing to do + @Override + public ClassLoader getClassLoader() { + return classLoader; + } }; } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformations.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformations.java index 99e4ff9a817088def5ca2dc0a7dd8005f4d7580c..66da3bcb62ba01bc0efb5d21215a378dcf79217b 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformations.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformations.java @@ -31,13 +31,21 @@ import java.sql.Timestamp; import java.util.HashMap; import java.util.Map; +import static org.apache.flink.table.types.logical.utils.LogicalTypeUtils.toInternalConversionClass; + /** * Transformations for transforming one data type to another. * * @see TypeTransformation */ @Internal -public class TypeTransformations { +public final class TypeTransformations { + + /** + * Transformation that uses internal data structures for all conversion classes. + */ + public static final TypeTransformation TO_INTERNAL_CLASS = + (dataType) -> dataType.bridgedTo(toInternalConversionClass(dataType.getLogicalType())); /** * Returns a type transformation that transforms data type to a new data type whose conversion @@ -73,4 +81,10 @@ public class TypeTransformations { public static TypeTransformation toNullable() { return DataType::nullable; } + + // -------------------------------------------------------------------------------------------- + + private TypeTransformations() { + // no instantiation + } } diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java index 5c45fb050374010c6ce78e1909f731d039361561..e889814a45b8e0e718f9851a89fbf5080d7ecd78 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java @@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.FromElementsFunction; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.RuntimeConverter; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.sink.OutputFormatProvider; import org.apache.flink.table.connector.sink.SinkFunctionProvider; @@ -376,6 +377,7 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory, .createTypeInformation(physicalRowDataType) .createSerializer(new ExecutionConfig()); DataStructureConverter converter = runtimeProviderContext.createDataStructureConverter(physicalRowDataType); + converter.open(RuntimeConverter.Context.create(TestValuesTableFactory.class.getClassLoader())); Collection values = convertToRowData(data, converter); if (runtimeSource.equals("SourceFunction")) { diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/sink/DataFormatConverterWrapper.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/sink/DataStructureConverterWrapper.java similarity index 59% rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/sink/DataFormatConverterWrapper.java rename to flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/sink/DataStructureConverterWrapper.java index 89688325b21ec48fc65c3f1bae40a3690abafa2a..8e1319d872fb3a65ba0c27026fb5284d561c267f 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/sink/DataFormatConverterWrapper.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/sink/DataStructureConverterWrapper.java @@ -18,33 +18,35 @@ package org.apache.flink.table.runtime.connector.sink; +import org.apache.flink.annotation.Internal; import org.apache.flink.table.connector.sink.DynamicTableSink; -import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter; +import org.apache.flink.table.data.conversion.DataStructureConverter; import javax.annotation.Nullable; /** - * This class wraps a {@link DataFormatConverter} so it can be used in - * a {@link DynamicTableSink} as a {@link DynamicTableSink.DataStructureConverter}. + * Implementation of {@link DynamicTableSink.DataStructureConverter}. + * + *

It wraps the internal {@link DataStructureConverter}. */ -public final class DataFormatConverterWrapper implements DynamicTableSink.DataStructureConverter { +@Internal +class DataStructureConverterWrapper implements DynamicTableSink.DataStructureConverter { private static final long serialVersionUID = 1L; - private final DataFormatConverter formatConverter; + private final DataStructureConverter structureConverter; - public DataFormatConverterWrapper(DataFormatConverter formatConverter) { - this.formatConverter = formatConverter; + DataStructureConverterWrapper(DataStructureConverter structureConverter) { + this.structureConverter = structureConverter; } @Override public void open(Context context) { - // do nothing + structureConverter.open(context.getClassLoader()); } - @Nullable @Override - public Object toExternal(@Nullable Object internalStructure) { - return formatConverter.toExternal(internalStructure); + public @Nullable Object toExternal(@Nullable Object internalStructure) { + return structureConverter.toExternalOrNull(internalStructure); } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/sink/SinkRuntimeProviderContext.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/sink/SinkRuntimeProviderContext.java index 1c95e19c88b0fc51321a7bd4f1955ff6d7598230..007bd1f4e2fdfe2788608d5dd26cc4d51f2ed0d0 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/sink/SinkRuntimeProviderContext.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/sink/SinkRuntimeProviderContext.java @@ -18,26 +18,21 @@ package org.apache.flink.table.runtime.connector.sink; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.connector.sink.DynamicTableSink; -import org.apache.flink.table.data.util.DataFormatConverters; +import org.apache.flink.table.data.conversion.DataStructureConverters; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.inference.TypeTransformation; +import org.apache.flink.table.types.inference.TypeTransformations; import org.apache.flink.table.types.utils.DataTypeUtils; -import static org.apache.flink.table.data.util.DataFormatConverters.getConverterForDataType; -import static org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext.InternalConversionClassTransformation.INTERNAL_CLASS_TRANSFORM; import static org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo; -import static org.apache.flink.table.types.logical.utils.LogicalTypeUtils.toInternalConversionClass; /** - * Implementation of {@link DynamicTableSink.Context}. Currently we delegate - * {@link #createDataStructureConverter} to {@link DataFormatConverters.DataFormatConverter}. - * - *

In the future, we can code generate the implementation of {@link #createDataStructureConverter} - * for better performance. + * Implementation of {@link DynamicTableSink.Context}. */ -public class SinkRuntimeProviderContext implements DynamicTableSink.Context { +@Internal +public final class SinkRuntimeProviderContext implements DynamicTableSink.Context { private final boolean isBounded; @@ -52,23 +47,14 @@ public class SinkRuntimeProviderContext implements DynamicTableSink.Context { @Override public TypeInformation createTypeInformation(DataType consumedDataType) { - DataType internalDataType = DataTypeUtils.transform(consumedDataType, INTERNAL_CLASS_TRANSFORM); + final DataType internalDataType = DataTypeUtils.transform( + consumedDataType, + TypeTransformations.TO_INTERNAL_CLASS); return fromDataTypeToTypeInfo(internalDataType); } - @SuppressWarnings("unchecked") @Override public DynamicTableSink.DataStructureConverter createDataStructureConverter(DataType consumedDataType) { - DataFormatConverters.DataFormatConverter converter = getConverterForDataType(consumedDataType); - return new DataFormatConverterWrapper(converter); - } - - static final class InternalConversionClassTransformation implements TypeTransformation { - static final TypeTransformation INTERNAL_CLASS_TRANSFORM = new InternalConversionClassTransformation(); - @Override - public DataType transform(DataType typeToTransform) { - Class internalClass = toInternalConversionClass(typeToTransform.getLogicalType()); - return typeToTransform.bridgedTo(internalClass); - } + return new DataStructureConverterWrapper(DataStructureConverters.getConverter(consumedDataType)); } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/source/DataFormatConverterWrapper.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/source/DataStructureConverterWrapper.java similarity index 60% rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/source/DataFormatConverterWrapper.java rename to flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/source/DataStructureConverterWrapper.java index 1403f753225d752a392ecd2728726185c2eee87d..0f9bf0e35e9c9004099634aa59afea3bfcf0b176 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/source/DataFormatConverterWrapper.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/source/DataStructureConverterWrapper.java @@ -18,34 +18,36 @@ package org.apache.flink.table.runtime.connector.source; +import org.apache.flink.annotation.Internal; import org.apache.flink.table.connector.RuntimeConverter; import org.apache.flink.table.connector.source.DynamicTableSource; -import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter; +import org.apache.flink.table.data.conversion.DataStructureConverter; import javax.annotation.Nullable; /** - * This class wraps a {@link DataFormatConverter} so it can be used in - * a {@link DynamicTableSource} as a {@link DynamicTableSource.DataStructureConverter}. + * Implementation of {@link DynamicTableSource.DataStructureConverter}. + * + *

It wraps the internal {@link DataStructureConverter}. */ -public final class DataFormatConverterWrapper implements DynamicTableSource.DataStructureConverter { +@Internal +class DataStructureConverterWrapper implements DynamicTableSource.DataStructureConverter { private static final long serialVersionUID = 1L; - private final DataFormatConverter formatConverter; + private final DataStructureConverter structureConverter; - public DataFormatConverterWrapper(DataFormatConverter formatConverter) { - this.formatConverter = formatConverter; + DataStructureConverterWrapper(DataStructureConverter structureConverter) { + this.structureConverter = structureConverter; } @Override public void open(RuntimeConverter.Context context) { - // do nothing + structureConverter.open(context.getClassLoader()); } - @Nullable @Override - public Object toInternal(@Nullable Object externalStructure) { - return formatConverter.toInternal(externalStructure); + public @Nullable Object toInternal(@Nullable Object externalStructure) { + return structureConverter.toInternalOrNull(externalStructure); } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/source/LookupRuntimeProviderContext.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/source/LookupRuntimeProviderContext.java index 7e8bc5c8eafcb3e3898abd45e113fbee036e026a..4fd50292a7b1cf0d7a67950756eb1b0138dc6f64 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/source/LookupRuntimeProviderContext.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/source/LookupRuntimeProviderContext.java @@ -18,21 +18,17 @@ package org.apache.flink.table.runtime.connector.source; +import org.apache.flink.annotation.Internal; import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter; import org.apache.flink.table.connector.source.LookupTableSource; -import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter; +import org.apache.flink.table.data.conversion.DataStructureConverters; import org.apache.flink.table.types.DataType; -import static org.apache.flink.table.data.util.DataFormatConverters.getConverterForDataType; - /** - * Implementation of {@link LookupTableSource.Context}. Currently we delegate - * {@link #createDataStructureConverter} to {@link DataFormatConverter}. - * - *

In the future, we can code generate the implementation of {@link #createDataStructureConverter} - * for better performance. + * Implementation of {@link LookupTableSource.Context}. */ -public class LookupRuntimeProviderContext implements LookupTableSource.Context { +@Internal +public final class LookupRuntimeProviderContext implements LookupTableSource.Context { private final int[][] lookupKeys; @@ -45,10 +41,8 @@ public class LookupRuntimeProviderContext implements LookupTableSource.Context { return lookupKeys; } - @SuppressWarnings("unchecked") @Override public DataStructureConverter createDataStructureConverter(DataType producedDataType) { - DataFormatConverter converter = getConverterForDataType(producedDataType); - return new DataFormatConverterWrapper(converter); + return new DataStructureConverterWrapper(DataStructureConverters.getConverter(producedDataType)); } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/source/ScanRuntimeProviderContext.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/source/ScanRuntimeProviderContext.java index 7d7035f0d00cc90bd51e37e2a3c520881bb8dafa..8de2eb5a30c3e72548f9dae29debb1aec9a0c170 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/source/ScanRuntimeProviderContext.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/source/ScanRuntimeProviderContext.java @@ -18,51 +18,35 @@ package org.apache.flink.table.runtime.connector.source; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter; import org.apache.flink.table.connector.source.ScanTableSource; -import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter; -import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter; +import org.apache.flink.table.data.conversion.DataStructureConverters; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.inference.TypeTransformation; +import org.apache.flink.table.types.inference.TypeTransformations; import org.apache.flink.table.types.utils.DataTypeUtils; -import static org.apache.flink.table.data.util.DataFormatConverters.getConverterForDataType; -import static org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext.InternalConversionClassTransformation.INTERNAL_CLASS_TRANSFORM; import static org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo; -import static org.apache.flink.table.types.logical.utils.LogicalTypeUtils.toInternalConversionClass; /** - * Implementation of {@link ScanTableSource.Context}. Currently we delegate - * {@link #createTypeInformation} and {@link #createDataStructureConverter} to - * {@link TypeInfoDataTypeConverter} and {@link DataFormatConverter}. - * - *

In the future, we can code generate the implementation of {@link #createDataStructureConverter} - * for better performance. + * Implementation of {@link ScanTableSource.Context}. */ -public class ScanRuntimeProviderContext implements ScanTableSource.Context { +@Internal +public final class ScanRuntimeProviderContext implements ScanTableSource.Context { public static final ScanRuntimeProviderContext INSTANCE = new ScanRuntimeProviderContext(); @Override public TypeInformation createTypeInformation(DataType producedDataType) { - DataType internalDataType = DataTypeUtils.transform(producedDataType, INTERNAL_CLASS_TRANSFORM); + final DataType internalDataType = DataTypeUtils.transform( + producedDataType, + TypeTransformations.TO_INTERNAL_CLASS); return fromDataTypeToTypeInfo(internalDataType); } - @SuppressWarnings("unchecked") @Override public DataStructureConverter createDataStructureConverter(DataType producedDataType) { - DataFormatConverter converter = getConverterForDataType(producedDataType); - return new DataFormatConverterWrapper(converter); - } - - static final class InternalConversionClassTransformation implements TypeTransformation { - static final TypeTransformation INTERNAL_CLASS_TRANSFORM = new InternalConversionClassTransformation(); - @Override - public DataType transform(DataType typeToTransform) { - Class internalClass = toInternalConversionClass(typeToTransform.getLogicalType()); - return typeToTransform.bridgedTo(internalClass); - } + return new DataStructureConverterWrapper(DataStructureConverters.getConverter(producedDataType)); } }