Commit adfd45fb authored by Timo Walther

[FLINK-16999][table] Enable new data structure converters for connectors

Parent 1f668dd3
RuntimeConverter.java
@@ -43,17 +43,25 @@ public interface RuntimeConverter extends Serializable {

 	/**
 	 * Context for conversions during runtime.
-	 *
-	 * <p>Meant for future extensibility. Use {@link Context#empty()} for now.
 	 */
 	interface Context {

 		/**
-		 * Empty context.
+		 * Runtime classloader for loading user-defined classes.
+		 */
+		ClassLoader getClassLoader();
+
+		/**
+		 * Creates a new instance of {@link Context}.
+		 *
+		 * @param classLoader runtime classloader for loading user-defined classes.
 		 */
-		static Context empty() {
+		static Context create(ClassLoader classLoader) {
 			return new Context() {
-				// nothing to do
+				@Override
+				public ClassLoader getClassLoader() {
+					return classLoader;
+				}
 			};
 		}
 	}
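Note: the hunk above replaces the placeholder Context#empty() with a context that transports the user classloader into converters. A minimal sketch of the resulting open() lifecycle from a connector's point of view; providerContext and consumedDataType are illustrative names, not part of this commit:

	// Obtain a converter from the planner-provided context ...
	DynamicTableSink.DataStructureConverter converter =
		providerContext.createDataStructureConverter(consumedDataType);
	// ... and, new with this commit, open it with a classloader-carrying context
	// before the first toExternal() call.
	converter.open(RuntimeConverter.Context.create(Thread.currentThread().getContextClassLoader()));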
TypeTransformations.java
@@ -31,13 +31,21 @@ import java.sql.Timestamp;
 import java.util.HashMap;
 import java.util.Map;

+import static org.apache.flink.table.types.logical.utils.LogicalTypeUtils.toInternalConversionClass;
+
 /**
  * Transformations for transforming one data type to another.
  *
  * @see TypeTransformation
  */
 @Internal
-public class TypeTransformations {
+public final class TypeTransformations {
+
+	/**
+	 * Transformation that uses internal data structures for all conversion classes.
+	 */
+	public static final TypeTransformation TO_INTERNAL_CLASS =
+		(dataType) -> dataType.bridgedTo(toInternalConversionClass(dataType.getLogicalType()));

 	/**
 	 * Returns a type transformation that transforms data type to a new data type whose conversion
@@ -73,4 +81,10 @@ public class TypeTransformations {
 	public static TypeTransformation toNullable() {
 		return DataType::nullable;
 	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private TypeTransformations() {
+		// no instantiation
+	}
 }
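Note: TO_INTERNAL_CLASS rebridges a data type, and via DataTypeUtils.transform all of its children, to the internal conversion classes such as RowData and StringData. A hedged sketch of its use, mirroring the provider contexts changed below:

	import org.apache.flink.table.api.DataTypes;
	import org.apache.flink.table.types.DataType;
	import org.apache.flink.table.types.inference.TypeTransformations;
	import org.apache.flink.table.types.utils.DataTypeUtils;

	// ROW<s STRING, i INT> bridges to Row/String/Integer by default ...
	DataType external = DataTypes.ROW(
		DataTypes.FIELD("s", DataTypes.STRING()),
		DataTypes.FIELD("i", DataTypes.INT()));

	// ... after the transformation every conversion class is an internal
	// structure: RowData at the root, StringData for the string field.
	DataType internal = DataTypeUtils.transform(external, TypeTransformations.TO_INTERNAL_CLASS);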
TestValuesTableFactory.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.RuntimeConverter;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.OutputFormatProvider;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
@@ -376,6 +377,7 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 				.createTypeInformation(physicalRowDataType)
 				.createSerializer(new ExecutionConfig());
 			DataStructureConverter converter = runtimeProviderContext.createDataStructureConverter(physicalRowDataType);
+			converter.open(RuntimeConverter.Context.create(TestValuesTableFactory.class.getClassLoader()));
 			Collection<RowData> values = convertToRowData(data, converter);
 			if (runtimeSource.equals("SourceFunction")) {
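Note: the test factory now opens the converter before convertToRowData uses it. The helper itself is elided from this diff; a hypothetical sketch of what such a helper does (name matches the call site above, body and filtering behavior assumed):

	// Hypothetical: converts external Rows to internal RowData with an
	// already-opened DynamicTableSource.DataStructureConverter.
	private static Collection<RowData> convertToRowData(
			Collection<Row> data,
			DynamicTableSource.DataStructureConverter converter) {
		List<RowData> result = new ArrayList<>();
		for (Row row : data) {
			result.add((RowData) converter.toInternal(row));
		}
		return result;
	}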
DataStructureConverterWrapper.java (sink; replaces DataFormatConverterWrapper.java)
@@ -18,33 +18,35 @@

 package org.apache.flink.table.runtime.connector.sink;

+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverter;

 import javax.annotation.Nullable;

 /**
- * This class wraps a {@link DataFormatConverter} so it can be used in
- * a {@link DynamicTableSink} as a {@link DynamicTableSink.DataStructureConverter}.
+ * Implementation of {@link DynamicTableSink.DataStructureConverter}.
+ *
+ * <p>It wraps the internal {@link DataStructureConverter}.
  */
-public final class DataFormatConverterWrapper implements DynamicTableSink.DataStructureConverter {
+@Internal
+class DataStructureConverterWrapper implements DynamicTableSink.DataStructureConverter {

 	private static final long serialVersionUID = 1L;

-	private final DataFormatConverter<Object, Object> formatConverter;
+	private final DataStructureConverter<Object, Object> structureConverter;

-	public DataFormatConverterWrapper(DataFormatConverter<Object, Object> formatConverter) {
-		this.formatConverter = formatConverter;
+	DataStructureConverterWrapper(DataStructureConverter<Object, Object> structureConverter) {
+		this.structureConverter = structureConverter;
 	}

 	@Override
 	public void open(Context context) {
-		// do nothing
+		structureConverter.open(context.getClassLoader());
 	}

-	@Nullable
 	@Override
-	public Object toExternal(@Nullable Object internalStructure) {
-		return formatConverter.toExternal(internalStructure);
+	public @Nullable Object toExternal(@Nullable Object internalStructure) {
+		return structureConverter.toExternalOrNull(internalStructure);
 	}
 }
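Note: open() is no longer a no-op; it forwards the classloader from the runtime context into the internal converter. A hedged, self-contained sketch (not part of this commit) of a sink function consuming the wrapper end to end:

	import org.apache.flink.configuration.Configuration;
	import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
	import org.apache.flink.table.connector.RuntimeConverter;
	import org.apache.flink.table.connector.sink.DynamicTableSink;
	import org.apache.flink.table.data.RowData;
	import org.apache.flink.types.Row;

	// Illustrative sink: the converter is created by the planner via
	// DynamicTableSink.Context and shipped into the function (it is
	// Serializable because RuntimeConverter extends Serializable).
	class PrintingSinkFunction extends RichSinkFunction<RowData> {

		private final DynamicTableSink.DataStructureConverter converter;

		PrintingSinkFunction(DynamicTableSink.DataStructureConverter converter) {
			this.converter = converter;
		}

		@Override
		public void open(Configuration parameters) {
			// Open with the user-code classloader of the running task.
			converter.open(RuntimeConverter.Context.create(
				getRuntimeContext().getUserCodeClassLoader()));
		}

		@Override
		public void invoke(RowData value, Context context) {
			// Internal RowData -> external Row (or null) via the wrapper above.
			Row external = (Row) converter.toExternal(value);
			System.out.println(external);
		}
	}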
SinkRuntimeProviderContext.java
@@ -18,26 +18,21 @@

 package org.apache.flink.table.runtime.connector.sink;

+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.inference.TypeTransformation;
+import org.apache.flink.table.types.inference.TypeTransformations;
 import org.apache.flink.table.types.utils.DataTypeUtils;

-import static org.apache.flink.table.data.util.DataFormatConverters.getConverterForDataType;
-import static org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext.InternalConversionClassTransformation.INTERNAL_CLASS_TRANSFORM;
 import static org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo;
-import static org.apache.flink.table.types.logical.utils.LogicalTypeUtils.toInternalConversionClass;

 /**
- * Implementation of {@link DynamicTableSink.Context}. Currently we delegate
- * {@link #createDataStructureConverter} to {@link DataFormatConverters.DataFormatConverter}.
- *
- * <p>In the future, we can code generate the implementation of {@link #createDataStructureConverter}
- * for better performance.
+ * Implementation of {@link DynamicTableSink.Context}.
  */
-public class SinkRuntimeProviderContext implements DynamicTableSink.Context {
+@Internal
+public final class SinkRuntimeProviderContext implements DynamicTableSink.Context {

 	private final boolean isBounded;
@@ -52,23 +47,14 @@ public class SinkRuntimeProviderContext implements DynamicTableSink.Context {

 	@Override
 	public TypeInformation<?> createTypeInformation(DataType consumedDataType) {
-		DataType internalDataType = DataTypeUtils.transform(consumedDataType, INTERNAL_CLASS_TRANSFORM);
+		final DataType internalDataType = DataTypeUtils.transform(
+			consumedDataType,
+			TypeTransformations.TO_INTERNAL_CLASS);
 		return fromDataTypeToTypeInfo(internalDataType);
 	}

-	@SuppressWarnings("unchecked")
 	@Override
 	public DynamicTableSink.DataStructureConverter createDataStructureConverter(DataType consumedDataType) {
-		DataFormatConverters.DataFormatConverter<Object, Object> converter = getConverterForDataType(consumedDataType);
-		return new DataFormatConverterWrapper(converter);
-	}
-
-	static final class InternalConversionClassTransformation implements TypeTransformation {
-
-		static final TypeTransformation INTERNAL_CLASS_TRANSFORM = new InternalConversionClassTransformation();
-
-		@Override
-		public DataType transform(DataType typeToTransform) {
-			Class<?> internalClass = toInternalConversionClass(typeToTransform.getLogicalType());
-			return typeToTransform.bridgedTo(internalClass);
-		}
+		return new DataStructureConverterWrapper(DataStructureConverters.getConverter(consumedDataType));
 	}
 }
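Note: createTypeInformation now shares TypeTransformations.TO_INTERNAL_CLASS instead of a private copy of the transformation, and createDataStructureConverter no longer needs @SuppressWarnings because DataStructureConverters.getConverter already returns a DataStructureConverter<Object, Object>. An assumed planner-side usage of this context (the boolean constructor argument is inferred from the isBounded field):

	import org.apache.flink.api.common.typeinfo.TypeInformation;
	import org.apache.flink.table.api.DataTypes;
	import org.apache.flink.table.connector.sink.DynamicTableSink;
	import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
	import org.apache.flink.table.types.DataType;

	DynamicTableSink.Context context = new SinkRuntimeProviderContext(true); // bounded
	DataType consumedDataType = DataTypes.ROW(DataTypes.FIELD("name", DataTypes.STRING()));

	// TypeInformation over internal structures (RowData/StringData) ...
	TypeInformation<?> typeInfo = context.createTypeInformation(consumedDataType);
	// ... and a converter from internal RowData to the sink's external class.
	DynamicTableSink.DataStructureConverter converter =
		context.createDataStructureConverter(consumedDataType);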
DataStructureConverterWrapper.java (source; replaces DataFormatConverterWrapper.java)
@@ -18,34 +18,36 @@

 package org.apache.flink.table.runtime.connector.source;

+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.connector.RuntimeConverter;
 import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverter;

 import javax.annotation.Nullable;

 /**
- * This class wraps a {@link DataFormatConverter} so it can be used in
- * a {@link DynamicTableSource} as a {@link DynamicTableSource.DataStructureConverter}.
+ * Implementation of {@link DynamicTableSource.DataStructureConverter}.
+ *
+ * <p>It wraps the internal {@link DataStructureConverter}.
  */
-public final class DataFormatConverterWrapper implements DynamicTableSource.DataStructureConverter {
+@Internal
+class DataStructureConverterWrapper implements DynamicTableSource.DataStructureConverter {

 	private static final long serialVersionUID = 1L;

-	private final DataFormatConverter<Object, Object> formatConverter;
+	private final DataStructureConverter<Object, Object> structureConverter;

-	public DataFormatConverterWrapper(DataFormatConverter<Object, Object> formatConverter) {
-		this.formatConverter = formatConverter;
+	DataStructureConverterWrapper(DataStructureConverter<Object, Object> structureConverter) {
+		this.structureConverter = structureConverter;
 	}

 	@Override
 	public void open(RuntimeConverter.Context context) {
-		// do nothing
+		structureConverter.open(context.getClassLoader());
 	}

-	@Nullable
 	@Override
-	public Object toInternal(@Nullable Object externalStructure) {
-		return formatConverter.toInternal(externalStructure);
+	public @Nullable Object toInternal(@Nullable Object externalStructure) {
+		return structureConverter.toInternalOrNull(externalStructure);
 	}
 }
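Note: the source wrapper delegates to toInternalOrNull, so a nullable external value passes through as null instead of failing inside the converter. A sketch of the underlying DataStructureConverters API as used by this commit:

	import org.apache.flink.table.api.DataTypes;
	import org.apache.flink.table.data.conversion.DataStructureConverter;
	import org.apache.flink.table.data.conversion.DataStructureConverters;

	DataStructureConverter<Object, Object> converter =
		DataStructureConverters.getConverter(DataTypes.STRING());
	converter.open(Thread.currentThread().getContextClassLoader());

	Object internal = converter.toInternalOrNull("hello"); // StringData for "hello"
	Object nothing = converter.toInternalOrNull(null);     // stays null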
LookupRuntimeProviderContext.java
@@ -18,21 +18,17 @@

 package org.apache.flink.table.runtime.connector.source;

+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
 import org.apache.flink.table.connector.source.LookupTableSource;
-import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
 import org.apache.flink.table.types.DataType;

-import static org.apache.flink.table.data.util.DataFormatConverters.getConverterForDataType;
-
 /**
- * Implementation of {@link LookupTableSource.Context}. Currently we delegate
- * {@link #createDataStructureConverter} to {@link DataFormatConverter}.
- *
- * <p>In the future, we can code generate the implementation of {@link #createDataStructureConverter}
- * for better performance.
+ * Implementation of {@link LookupTableSource.Context}.
  */
-public class LookupRuntimeProviderContext implements LookupTableSource.Context {
+@Internal
+public final class LookupRuntimeProviderContext implements LookupTableSource.Context {

 	private final int[][] lookupKeys;
@@ -45,10 +41,8 @@ public class LookupRuntimeProviderContext implements LookupTableSource.Context {
 		return lookupKeys;
 	}

-	@SuppressWarnings("unchecked")
 	@Override
 	public DataStructureConverter createDataStructureConverter(DataType producedDataType) {
-		DataFormatConverter<Object, Object> converter = getConverterForDataType(producedDataType);
-		return new DataFormatConverterWrapper(converter);
+		return new DataStructureConverterWrapper(DataStructureConverters.getConverter(producedDataType));
 	}
 }
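Note: the lookup keys are index paths into the produced row type: each inner array addresses one key field, with additional elements stepping into nested rows. An assumed illustration (field positions invented for the example):

	// Key fields: `id` at top-level position 0, and `zip` at position 1 inside
	// the nested row at top-level position 2 (e.g. `address.zip`).
	int[][] lookupKeys = {
		{0},     // id
		{2, 1},  // address.zip
	};
	LookupTableSource.Context context = new LookupRuntimeProviderContext(lookupKeys);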
ScanRuntimeProviderContext.java
@@ -18,51 +18,35 @@

 package org.apache.flink.table.runtime.connector.source;

+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
 import org.apache.flink.table.connector.source.ScanTableSource;
-import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter;
-import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.inference.TypeTransformation;
+import org.apache.flink.table.types.inference.TypeTransformations;
 import org.apache.flink.table.types.utils.DataTypeUtils;

-import static org.apache.flink.table.data.util.DataFormatConverters.getConverterForDataType;
-import static org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext.InternalConversionClassTransformation.INTERNAL_CLASS_TRANSFORM;
 import static org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo;
-import static org.apache.flink.table.types.logical.utils.LogicalTypeUtils.toInternalConversionClass;

 /**
- * Implementation of {@link ScanTableSource.Context}. Currently we delegate
- * {@link #createTypeInformation} and {@link #createDataStructureConverter} to
- * {@link TypeInfoDataTypeConverter} and {@link DataFormatConverter}.
- *
- * <p>In the future, we can code generate the implementation of {@link #createDataStructureConverter}
- * for better performance.
+ * Implementation of {@link ScanTableSource.Context}.
  */
-public class ScanRuntimeProviderContext implements ScanTableSource.Context {
+@Internal
+public final class ScanRuntimeProviderContext implements ScanTableSource.Context {

 	public static final ScanRuntimeProviderContext INSTANCE = new ScanRuntimeProviderContext();

 	@Override
 	public TypeInformation<?> createTypeInformation(DataType producedDataType) {
-		DataType internalDataType = DataTypeUtils.transform(producedDataType, INTERNAL_CLASS_TRANSFORM);
+		final DataType internalDataType = DataTypeUtils.transform(
+			producedDataType,
+			TypeTransformations.TO_INTERNAL_CLASS);
 		return fromDataTypeToTypeInfo(internalDataType);
 	}

-	@SuppressWarnings("unchecked")
 	@Override
 	public DataStructureConverter createDataStructureConverter(DataType producedDataType) {
-		DataFormatConverter<Object, Object> converter = getConverterForDataType(producedDataType);
-		return new DataFormatConverterWrapper(converter);
-	}
-
-	static final class InternalConversionClassTransformation implements TypeTransformation {
-
-		static final TypeTransformation INTERNAL_CLASS_TRANSFORM = new InternalConversionClassTransformation();
-
-		@Override
-		public DataType transform(DataType typeToTransform) {
-			Class<?> internalClass = toInternalConversionClass(typeToTransform.getLogicalType());
-			return typeToTransform.bridgedTo(internalClass);
-		}
+		return new DataStructureConverterWrapper(DataStructureConverters.getConverter(producedDataType));
 	}
 }
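Note: ScanRuntimeProviderContext is stateless, hence the shared INSTANCE. A short assumed usage of its two factory methods for a produced data type:

	import org.apache.flink.api.common.typeinfo.TypeInformation;
	import org.apache.flink.table.api.DataTypes;
	import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
	import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
	import org.apache.flink.table.types.DataType;

	DataType producedDataType = DataTypes.ROW(
		DataTypes.FIELD("user", DataTypes.STRING()),
		DataTypes.FIELD("amount", DataTypes.INT()));

	// TypeInformation over RowData because the type is rebridged with TO_INTERNAL_CLASS.
	TypeInformation<?> typeInfo =
		ScanRuntimeProviderContext.INSTANCE.createTypeInformation(producedDataType);

	// Converter turning external Rows into internal RowData at runtime.
	DataStructureConverter converter =
		ScanRuntimeProviderContext.INSTANCE.createDataStructureConverter(producedDataType);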