未验证 提交 646be5a4 编写于 作者: J Jark Wu 提交者: GitHub

[FLINK-17633][table-common] Improve FactoryUtil to align with new format option keys

This closes #12099
上级 5324b0c5
...@@ -71,11 +71,30 @@ public final class FactoryUtil { ...@@ -71,11 +71,30 @@ public final class FactoryUtil {
"Uniquely identifies the connector of a dynamic table that is used for accessing data in " + "Uniquely identifies the connector of a dynamic table that is used for accessing data in " +
"an external system. Its value is used during table source and table sink discovery."); "an external system. Its value is used during table source and table sink discovery.");
public static final String FORMAT_PREFIX = "format."; public static final ConfigOption<String> KEY_FORMAT = ConfigOptions
.key("key.format")
.stringType()
.noDefaultValue()
.withDescription("Defines the format identifier for encoding key data. " +
"The identifier is used to discover a suitable format factory.");
public static final String KEY_FORMAT_PREFIX = "key.format."; public static final ConfigOption<String> VALUE_FORMAT = ConfigOptions
.key("value.format")
.stringType()
.noDefaultValue()
.withDescription("Defines the format identifier for encoding value data. " +
"The identifier is used to discover a suitable format factory.");
public static final ConfigOption<String> FORMAT = ConfigOptions
.key("format")
.stringType()
.noDefaultValue()
.withDescription("Defines the format identifier for encoding data. " +
"The identifier is used to discover a suitable format factory.");
public static final String VALUE_FORMAT_PREFIX = "value.format."; private static final String FORMAT_KEY = "format";
private static final String FORMAT_SUFFIX = ".format";
/** /**
* Creates a {@link DynamicTableSource} from a {@link CatalogTable}. * Creates a {@link DynamicTableSource} from a {@link CatalogTable}.
...@@ -162,12 +181,20 @@ public final class FactoryUtil { ...@@ -162,12 +181,20 @@ public final class FactoryUtil {
* <pre>{@code * <pre>{@code
* // in createDynamicTableSource() * // in createDynamicTableSource()
* helper = FactoryUtil.createTableFactoryHelper(this, context); * helper = FactoryUtil.createTableFactoryHelper(this, context);
* keyFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, KEY_OPTION, "prefix"); * keyFormat = helper.discoverScanFormat(DeserializationFormatFactory.class, KEY_FORMAT);
* valueFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, VALUE_OPTION, "prefix"); * valueFormat = helper.discoverScanFormat(DeserializationFormatFactory.class, VALUE_FORMAT);
* helper.validate(); * helper.validate();
* ... // construct connector with discovered formats * ... // construct connector with discovered formats
* }</pre> * }</pre>
* *
* <p>Note: The format option parameter of {@code helper.discoverScanFormat(formatFactoryClass, formatOption)}
* and {@code helper.discoverSinkFormat(formatFactoryClass, formatOption)} must be 'format' or
* with '.format' suffix (e.g. {@link #FORMAT}, {@link #KEY_FORMAT} and {@link #VALUE_FORMAT}).
* The discovery logic will replace 'format' with the factory identifier value as the format
* prefix. For example, assuming the identifier is 'json', if format option key is 'format',
* then format prefix is 'json.'. If format option key is 'value.format', then format prefix
* is 'value.json'. The format prefix is used to project the options for the format factory.
*
* <p>Note: This utility checks for left-over options in the final step. * <p>Note: This utility checks for left-over options in the final step.
*/ */
public static TableFactoryHelper createTableFactoryHelper( public static TableFactoryHelper createTableFactoryHelper(
...@@ -377,14 +404,11 @@ public final class FactoryUtil { ...@@ -377,14 +404,11 @@ public final class FactoryUtil {
/** /**
* Discovers a {@link ScanFormat} of the given type using the given option as factory identifier. * Discovers a {@link ScanFormat} of the given type using the given option as factory identifier.
*
* <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
*/ */
public <I, F extends ScanFormatFactory<I>> ScanFormat<I> discoverScanFormat( public <I, F extends ScanFormatFactory<I>> ScanFormat<I> discoverScanFormat(
Class<F> formatFactoryClass, Class<F> formatFactoryClass,
ConfigOption<String> formatOption, ConfigOption<String> formatOption) {
String formatPrefix) { return discoverOptionalScanFormat(formatFactoryClass, formatOption)
return discoverOptionalScanFormat(formatFactoryClass, formatOption, formatPrefix)
.orElseThrow(() -> .orElseThrow(() ->
new ValidationException( new ValidationException(
String.format("Could not find required scan format '%s'.", formatOption.key()))); String.format("Could not find required scan format '%s'.", formatOption.key())));
...@@ -393,15 +417,13 @@ public final class FactoryUtil { ...@@ -393,15 +417,13 @@ public final class FactoryUtil {
/** /**
* Discovers a {@link ScanFormat} of the given type using the given option (if present) as factory * Discovers a {@link ScanFormat} of the given type using the given option (if present) as factory
* identifier. * identifier.
*
* <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
*/ */
public <I, F extends ScanFormatFactory<I>> Optional<ScanFormat<I>> discoverOptionalScanFormat( public <I, F extends ScanFormatFactory<I>> Optional<ScanFormat<I>> discoverOptionalScanFormat(
Class<F> formatFactoryClass, Class<F> formatFactoryClass,
ConfigOption<String> formatOption, ConfigOption<String> formatOption) {
String formatPrefix) { return discoverOptionalFormatFactory(formatFactoryClass, formatOption)
return discoverOptionalFormatFactory(formatFactoryClass, formatOption, formatPrefix)
.map(formatFactory -> { .map(formatFactory -> {
String formatPrefix = formatPrefix(formatFactory, formatOption);
try { try {
return formatFactory.createScanFormat(context, projectOptions(formatPrefix)); return formatFactory.createScanFormat(context, projectOptions(formatPrefix));
} catch (Throwable t) { } catch (Throwable t) {
...@@ -417,14 +439,11 @@ public final class FactoryUtil { ...@@ -417,14 +439,11 @@ public final class FactoryUtil {
/** /**
* Discovers a {@link SinkFormat} of the given type using the given option as factory identifier. * Discovers a {@link SinkFormat} of the given type using the given option as factory identifier.
*
* <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
*/ */
public <I, F extends SinkFormatFactory<I>> SinkFormat<I> discoverSinkFormat( public <I, F extends SinkFormatFactory<I>> SinkFormat<I> discoverSinkFormat(
Class<F> formatFactoryClass, Class<F> formatFactoryClass,
ConfigOption<String> formatOption, ConfigOption<String> formatOption) {
String formatPrefix) { return discoverOptionalSinkFormat(formatFactoryClass, formatOption)
return discoverOptionalSinkFormat(formatFactoryClass, formatOption, formatPrefix)
.orElseThrow(() -> .orElseThrow(() ->
new ValidationException( new ValidationException(
String.format("Could not find required sink format '%s'.", formatOption.key()))); String.format("Could not find required sink format '%s'.", formatOption.key())));
...@@ -433,15 +452,13 @@ public final class FactoryUtil { ...@@ -433,15 +452,13 @@ public final class FactoryUtil {
/** /**
* Discovers a {@link SinkFormat} of the given type using the given option (if present) as factory * Discovers a {@link SinkFormat} of the given type using the given option (if present) as factory
* identifier. * identifier.
*
* <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
*/ */
public <I, F extends SinkFormatFactory<I>> Optional<SinkFormat<I>> discoverOptionalSinkFormat( public <I, F extends SinkFormatFactory<I>> Optional<SinkFormat<I>> discoverOptionalSinkFormat(
Class<F> formatFactoryClass, Class<F> formatFactoryClass,
ConfigOption<String> formatOption, ConfigOption<String> formatOption) {
String formatPrefix) { return discoverOptionalFormatFactory(formatFactoryClass, formatOption)
return discoverOptionalFormatFactory(formatFactoryClass, formatOption, formatPrefix)
.map(formatFactory -> { .map(formatFactory -> {
String formatPrefix = formatPrefix(formatFactory, formatOption);
try { try {
return formatFactory.createSinkFormat(context, projectOptions(formatPrefix)); return formatFactory.createSinkFormat(context, projectOptions(formatPrefix));
} catch (Throwable t) { } catch (Throwable t) {
...@@ -492,8 +509,7 @@ public final class FactoryUtil { ...@@ -492,8 +509,7 @@ public final class FactoryUtil {
private <F extends Factory> Optional<F> discoverOptionalFormatFactory( private <F extends Factory> Optional<F> discoverOptionalFormatFactory(
Class<F> formatFactoryClass, Class<F> formatFactoryClass,
ConfigOption<String> formatOption, ConfigOption<String> formatOption) {
String formatPrefix) {
final String identifier = allOptions.get(formatOption); final String identifier = allOptions.get(formatOption);
if (identifier == null) { if (identifier == null) {
return Optional.empty(); return Optional.empty();
...@@ -502,6 +518,7 @@ public final class FactoryUtil { ...@@ -502,6 +518,7 @@ public final class FactoryUtil {
context.getClassLoader(), context.getClassLoader(),
formatFactoryClass, formatFactoryClass,
identifier); identifier);
String formatPrefix = formatPrefix(factory, formatOption);
// log all used options of other factories // log all used options of other factories
consumedOptionKeys.addAll( consumedOptionKeys.addAll(
factory.requiredOptions().stream() factory.requiredOptions().stream()
...@@ -516,6 +533,21 @@ public final class FactoryUtil { ...@@ -516,6 +533,21 @@ public final class FactoryUtil {
return Optional.of(factory); return Optional.of(factory);
} }
private String formatPrefix(Factory formatFactory, ConfigOption<String> formatOption) {
String identifier = formatFactory.factoryIdentifier();
if (formatOption.key().equals(FORMAT_KEY)) {
return identifier + ".";
} else if (formatOption.key().endsWith(FORMAT_SUFFIX)) {
// extract the key prefix, e.g. extract 'key' from 'key.format'
String keyPrefix = formatOption.key().substring(0, formatOption.key().length() - FORMAT_SUFFIX.length());
return keyPrefix + "." + identifier + ".";
} else {
throw new ValidationException(
"Format identifier key should be 'format' or suffix with '.format', " +
"don't support format identifier key '" + formatOption.key() + "'.");
}
}
private ReadableConfig projectOptions(String formatPrefix) { private ReadableConfig projectOptions(String formatPrefix) {
return new DelegatingConfiguration( return new DelegatingConfiguration(
allOptions, allOptions,
......
...@@ -85,8 +85,8 @@ public class FactoryUtilTest { ...@@ -85,8 +85,8 @@ public class FactoryUtilTest {
@Test @Test
public void testMissingFormat() { public void testMissingFormat() {
expectError("Could not find required scan format 'value.format.kind'."); expectError("Could not find required scan format 'value.format'.");
testError(options -> options.remove("value.format.kind")); testError(options -> options.remove("value.format"));
} }
@Test @Test
...@@ -96,24 +96,24 @@ public class FactoryUtilTest { ...@@ -96,24 +96,24 @@ public class FactoryUtilTest {
DeserializationFormatFactory.class.getName() + "' in the classpath.\n\n" + DeserializationFormatFactory.class.getName() + "' in the classpath.\n\n" +
"Available factory identifiers are:\n\n" + "Available factory identifiers are:\n\n" +
"test-format"); "test-format");
testError(options -> options.put("value.format.kind", "FAIL")); testError(options -> options.put("value.format", "FAIL"));
} }
@Test @Test
public void testMissingFormatOption() { public void testMissingFormatOption() {
expectError( expectError(
"Error creating scan format 'test-format' in option space 'key.format.'."); "Error creating scan format 'test-format' in option space 'key.test-format.'.");
expectError( expectError(
"One or more required options are missing.\n\n" + "One or more required options are missing.\n\n" +
"Missing required options are:\n\n" + "Missing required options are:\n\n" +
"delimiter"); "delimiter");
testError(options -> options.remove("key.format.delimiter")); testError(options -> options.remove("key.test-format.delimiter"));
} }
@Test @Test
public void testInvalidFormatOption() { public void testInvalidFormatOption() {
expectError("Invalid value for option 'fail-on-missing'."); expectError("Invalid value for option 'fail-on-missing'.");
testError(options -> options.put("key.format.fail-on-missing", "FAIL")); testError(options -> options.put("key.test-format.fail-on-missing", "FAIL"));
} }
@Test @Test
...@@ -126,14 +126,15 @@ public class FactoryUtilTest { ...@@ -126,14 +126,15 @@ public class FactoryUtilTest {
"Supported options:\n\n" + "Supported options:\n\n" +
"buffer-size\n" + "buffer-size\n" +
"connector\n" + "connector\n" +
"key.format.delimiter\n" + "format\n" +
"key.format.fail-on-missing\n" + "key.format\n" +
"key.format.kind\n" + "key.test-format.delimiter\n" +
"key.test-format.fail-on-missing\n" +
"property-version\n" + "property-version\n" +
"target\n" + "target\n" +
"value.format.delimiter\n" + "value.format\n" +
"value.format.fail-on-missing\n" + "value.test-format.delimiter\n" +
"value.format.kind"); "value.test-format.fail-on-missing");
testError(options -> { testError(options -> {
options.put("this-is-not-consumed", "42"); options.put("this-is-not-consumed", "42");
options.put("this-is-also-not-consumed", "true"); options.put("this-is-also-not-consumed", "true");
...@@ -161,8 +162,8 @@ public class FactoryUtilTest { ...@@ -161,8 +162,8 @@ public class FactoryUtilTest {
@Test @Test
public void testOptionalFormat() { public void testOptionalFormat() {
final Map<String, String> options = createAllOptions(); final Map<String, String> options = createAllOptions();
options.remove("key.format.kind"); options.remove("key.format");
options.remove("key.format.delimiter"); options.remove("key.test-format.delimiter");
final DynamicTableSource actualSource = createTableSource(options); final DynamicTableSource actualSource = createTableSource(options);
final DynamicTableSource expectedSource = new DynamicTableSourceMock( final DynamicTableSource expectedSource = new DynamicTableSourceMock(
"MyTarget", "MyTarget",
...@@ -178,6 +179,30 @@ public class FactoryUtilTest { ...@@ -178,6 +179,30 @@ public class FactoryUtilTest {
assertEquals(expectedSink, actualSink); assertEquals(expectedSink, actualSink);
} }
@Test
public void testAlternativeValueFormat() {
final Map<String, String> options = createAllOptions();
options.remove("value.format");
options.remove("value.test-format.delimiter");
options.remove("value.test-format.fail-on-missing");
options.put("format", "test-format");
options.put("test-format.delimiter", ";");
options.put("test-format.fail-on-missing", "true");
final DynamicTableSource actualSource = createTableSource(options);
final DynamicTableSource expectedSource = new DynamicTableSourceMock(
"MyTarget",
new ScanFormatMock(",", false),
new ScanFormatMock(";", true));
assertEquals(expectedSource, actualSource);
final DynamicTableSink actualSink = createTableSink(options);
final DynamicTableSink expectedSink = new DynamicTableSinkMock(
"MyTarget",
1000L,
new SinkFormatMock(","),
new SinkFormatMock(";"));
assertEquals(expectedSink, actualSink);
}
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
private void expectError(String message) { private void expectError(String message) {
...@@ -198,11 +223,11 @@ public class FactoryUtilTest { ...@@ -198,11 +223,11 @@ public class FactoryUtilTest {
options.put("connector", TestDynamicTableFactory.IDENTIFIER); options.put("connector", TestDynamicTableFactory.IDENTIFIER);
options.put("target", "MyTarget"); options.put("target", "MyTarget");
options.put("buffer-size", "1000"); options.put("buffer-size", "1000");
options.put("key.format.kind", TestFormatFactory.IDENTIFIER); options.put("key.format", "test-format");
options.put("key.format.delimiter", ","); options.put("key.test-format.delimiter", ",");
options.put("value.format.kind", TestFormatFactory.IDENTIFIER); options.put("value.format", "test-format");
options.put("value.format.delimiter", "|"); options.put("value.test-format.delimiter", "|");
options.put("value.format.fail-on-missing", "true"); options.put("value.test-format.fail-on-missing", "true");
return options; return options;
} }
......
...@@ -38,6 +38,10 @@ import java.util.Objects; ...@@ -38,6 +38,10 @@ import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
import static org.apache.flink.table.factories.FactoryUtil.KEY_FORMAT;
import static org.apache.flink.table.factories.FactoryUtil.VALUE_FORMAT;
/** /**
* Test implementations for {@link DynamicTableSourceFactory} and {@link DynamicTableSinkFactory}. * Test implementations for {@link DynamicTableSourceFactory} and {@link DynamicTableSinkFactory}.
*/ */
...@@ -55,28 +59,19 @@ public final class TestDynamicTableFactory implements DynamicTableSourceFactory, ...@@ -55,28 +59,19 @@ public final class TestDynamicTableFactory implements DynamicTableSourceFactory,
.longType() .longType()
.defaultValue(100L); .defaultValue(100L);
public static final ConfigOption<String> KEY_FORMAT = ConfigOptions
.key("key.format.kind")
.stringType()
.noDefaultValue();
public static final ConfigOption<String> VALUE_FORMAT = ConfigOptions
.key("value.format.kind")
.stringType()
.noDefaultValue();
@Override @Override
public DynamicTableSource createDynamicTableSource(Context context) { public DynamicTableSource createDynamicTableSource(Context context) {
final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
final Optional<ScanFormat<DeserializationSchema<RowData>>> keyFormat = helper.discoverOptionalScanFormat( final Optional<ScanFormat<DeserializationSchema<RowData>>> keyFormat = helper.discoverOptionalScanFormat(
DeserializationFormatFactory.class, DeserializationFormatFactory.class,
KEY_FORMAT, KEY_FORMAT);
FactoryUtil.KEY_FORMAT_PREFIX); final ScanFormat<DeserializationSchema<RowData>> valueFormat = helper.discoverOptionalScanFormat(
final ScanFormat<DeserializationSchema<RowData>> valueFormat = helper.discoverScanFormat(
DeserializationFormatFactory.class, DeserializationFormatFactory.class,
VALUE_FORMAT, FORMAT).orElseGet(
FactoryUtil.VALUE_FORMAT_PREFIX); () -> helper.discoverScanFormat(
DeserializationFormatFactory.class,
VALUE_FORMAT));
helper.validate(); helper.validate();
return new DynamicTableSourceMock( return new DynamicTableSourceMock(
...@@ -91,12 +86,13 @@ public final class TestDynamicTableFactory implements DynamicTableSourceFactory, ...@@ -91,12 +86,13 @@ public final class TestDynamicTableFactory implements DynamicTableSourceFactory,
final Optional<SinkFormat<SerializationSchema<RowData>>> keyFormat = helper.discoverOptionalSinkFormat( final Optional<SinkFormat<SerializationSchema<RowData>>> keyFormat = helper.discoverOptionalSinkFormat(
SerializationFormatFactory.class, SerializationFormatFactory.class,
KEY_FORMAT, KEY_FORMAT);
FactoryUtil.KEY_FORMAT_PREFIX); final SinkFormat<SerializationSchema<RowData>> valueFormat = helper.discoverOptionalSinkFormat(
final SinkFormat<SerializationSchema<RowData>> valueFormat = helper.discoverSinkFormat(
SerializationFormatFactory.class, SerializationFormatFactory.class,
VALUE_FORMAT, FORMAT).orElseGet(
FactoryUtil.VALUE_FORMAT_PREFIX); () -> helper.discoverSinkFormat(
SerializationFormatFactory.class,
VALUE_FORMAT));
helper.validate(); helper.validate();
return new DynamicTableSinkMock( return new DynamicTableSinkMock(
...@@ -115,7 +111,6 @@ public final class TestDynamicTableFactory implements DynamicTableSourceFactory, ...@@ -115,7 +111,6 @@ public final class TestDynamicTableFactory implements DynamicTableSourceFactory,
public Set<ConfigOption<?>> requiredOptions() { public Set<ConfigOption<?>> requiredOptions() {
final Set<ConfigOption<?>> options = new HashSet<>(); final Set<ConfigOption<?>> options = new HashSet<>();
options.add(TARGET); options.add(TARGET);
options.add(VALUE_FORMAT);
return options; return options;
} }
...@@ -124,6 +119,8 @@ public final class TestDynamicTableFactory implements DynamicTableSourceFactory, ...@@ -124,6 +119,8 @@ public final class TestDynamicTableFactory implements DynamicTableSourceFactory,
final Set<ConfigOption<?>> options = new HashSet<>(); final Set<ConfigOption<?>> options = new HashSet<>();
options.add(BUFFER_SIZE); options.add(BUFFER_SIZE);
options.add(KEY_FORMAT); options.add(KEY_FORMAT);
options.add(FORMAT);
options.add(VALUE_FORMAT);
return options; return options;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册