提交 6bac9214 编写于 作者: Z zentol 提交者: Stephan Ewen

[FLINK-2351] [core] Remove IOFormat ConfigBuilders

This closes #1420
上级 b80ecfdc
......@@ -24,7 +24,6 @@ import java.util.ArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.operators.base.FileDataSourceBase;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
......@@ -616,98 +615,4 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
* The configuration key to set the number of samples to take for the statistics.
*/
private static final String NUM_STATISTICS_SAMPLES = "delimited-format.numSamples";
// ----------------------------------- Config Builder -----------------------------------------
/**
* Creates a configuration builder that can be used to set the input format's parameters to the config in a fluent
* fashion.
*
* @return A config builder for setting parameters.
*/
public static ConfigBuilder configureDelimitedFormat(FileDataSourceBase<?> target) {
return new ConfigBuilder(target.getParameters());
}
/**
* Abstract builder used to set parameters to the input format's configuration in a fluent way.
*/
protected static class AbstractConfigBuilder<T> extends FileInputFormat.AbstractConfigBuilder<T> {
private static final String NEWLINE_DELIMITER = "\n";
// --------------------------------------------------------------------
/**
* Creates a new builder for the given configuration.
*
* @param config The configuration into which the parameters will be written.
*/
protected AbstractConfigBuilder(Configuration config) {
super(config);
}
// --------------------------------------------------------------------
/**
* Sets the delimiter to be a single character, namely the given one. The character must be within
* the value range <code>0</code> to <code>127</code>.
*
* @param delimiter The delimiter character.
* @return The builder itself.
*/
public T recordDelimiter(char delimiter) {
if (delimiter == '\n') {
this.config.setString(RECORD_DELIMITER, NEWLINE_DELIMITER);
} else {
this.config.setString(RECORD_DELIMITER, String.valueOf(delimiter));
}
@SuppressWarnings("unchecked")
T ret = (T) this;
return ret;
}
/**
* Sets the delimiter to be the given string. The string will be converted to bytes for more efficient
* comparison during input parsing. The conversion will be done using the platforms default charset.
*
* @param delimiter The delimiter string.
* @return The builder itself.
*/
public T recordDelimiter(String delimiter) {
this.config.setString(RECORD_DELIMITER, delimiter);
@SuppressWarnings("unchecked")
T ret = (T) this;
return ret;
}
/**
* Sets the number of line samples to take in order to estimate the base statistics for the
* input format.
*
* @param numSamples The number of line samples to take.
* @return The builder itself.
*/
public T numSamplesForStatistics(int numSamples) {
this.config.setInteger(NUM_STATISTICS_SAMPLES, numSamples);
@SuppressWarnings("unchecked")
T ret = (T) this;
return ret;
}
}
/**
* A builder used to set parameters to the input format's configuration in a fluent way.
*/
public static class ConfigBuilder extends AbstractConfigBuilder<ConfigBuilder> {
/**
* Creates a new builder for the given configuration.
*
* @param targetConfig The configuration into which the parameters will be written.
*/
protected ConfigBuilder(Configuration targetConfig) {
super(targetConfig);
}
}
}
......@@ -34,7 +34,6 @@ import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.operators.GenericDataSourceBase;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
......@@ -922,69 +921,4 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
* The config parameter which defines whether input directories are recursively traversed.
*/
public static final String ENUMERATE_NESTED_FILES_FLAG = "recursive.file.enumeration";
// ----------------------------------- Config Builder -----------------------------------------
/**
* Creates a configuration builder that can be used to set the input format's parameters to the config in a fluent
* fashion.
*
* @return A config builder for setting parameters.
*/
public static ConfigBuilder configureFileFormat(GenericDataSourceBase<?, ?> target) {
return new ConfigBuilder(target.getParameters());
}
/**
* Abstract builder used to set parameters to the input format's configuration in a fluent way.
*/
protected static abstract class AbstractConfigBuilder<T> {
/**
* The configuration into which the parameters will be written.
*/
protected final Configuration config;
// --------------------------------------------------------------------
/**
* Creates a new builder for the given configuration.
*
* @param targetConfig The configuration into which the parameters will be written.
*/
protected AbstractConfigBuilder(Configuration targetConfig) {
this.config = targetConfig;
}
// --------------------------------------------------------------------
/**
* Sets the path to the file or directory to be read by this file input format.
*
* @param filePath The path to the file or directory.
* @return The builder itself.
*/
public T filePath(String filePath) {
this.config.setString(FILE_PARAMETER_KEY, filePath);
@SuppressWarnings("unchecked")
T ret = (T) this;
return ret;
}
}
/**
* A builder used to set parameters to the input format's configuration in a fluent way.
*/
public static class ConfigBuilder extends AbstractConfigBuilder<ConfigBuilder> {
/**
* Creates a new builder for the given configuration.
*
* @param targetConfig The configuration into which the parameters will be written.
*/
protected ConfigBuilder(Configuration targetConfig) {
super(targetConfig);
}
}
}
......@@ -23,7 +23,6 @@ import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.operators.base.FileDataSinkBase;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
......@@ -312,53 +311,4 @@ public abstract class FileOutputFormat<IT> extends RichOutputFormat<IT> implemen
}
}
}
// ============================================================================================
/**
* Creates a configuration builder that can be used to set the input format's parameters to the config in a fluent
* fashion.
*
* @return A config builder for setting parameters.
*/
public static ConfigBuilder configureFileFormat(FileDataSinkBase<?> target) {
return new ConfigBuilder(target.getParameters());
}
/**
* A builder used to set parameters to the output format's configuration in a fluent way.
*/
public static abstract class AbstractConfigBuilder<T> {
/**
* The configuration into which the parameters will be written.
*/
protected final Configuration config;
// --------------------------------------------------------------------
/**
* Creates a new builder for the given configuration.
*
* @param targetConfig The configuration into which the parameters will be written.
*/
protected AbstractConfigBuilder(Configuration targetConfig) {
this.config = targetConfig;
}
}
/**
* A builder used to set parameters to the input format's configuration in a fluent way.
*/
public static class ConfigBuilder extends AbstractConfigBuilder<ConfigBuilder> {
/**
* Creates a new builder for the given configuration.
*
* @param targetConfig The configuration into which the parameters will be written.
*/
protected ConfigBuilder(Configuration targetConfig) {
super(targetConfig);
}
}
}
......@@ -62,34 +62,6 @@ public class FileDataSourceBase<OUT> extends GenericDataSourceBase<OUT, FileInpu
this(f, operatorInfo, Preconditions.checkNotNull(filePath, "The file path may not be null."), "File " + filePath);
}
/**
* Creates a new instance for the given file using the given file input format.
*
* @param f The {@link org.apache.flink.api.common.io.FileInputFormat} implementation used to read the data.
* @param operatorInfo The type information for the output type.
* @param filePath The file location. The file path must be a fully qualified URI, including the address schema.
* @param name The given name for the Pact, used in plans, logs and progress messages.
*/
public FileDataSourceBase(Class<? extends FileInputFormat<OUT>> f, OperatorInformation<OUT> operatorInfo, String filePath, String name) {
super(f, operatorInfo, name);
Preconditions.checkNotNull(filePath, "The file path may not be null.");
this.filePath = filePath;
FileInputFormat.configureFileFormat(this).filePath(filePath);
}
/**
* Creates a new instance for the given file using the given input format. The contract has the default name.
*
* @param f The {@link org.apache.flink.api.common.io.FileInputFormat} implementation used to read the data.
* @param operatorInfo The type information for the output type.
* @param filePath The file location. The file path must be a fully qualified URI, including the address schema.
*/
public FileDataSourceBase(Class<? extends FileInputFormat<OUT>> f, OperatorInformation<OUT> operatorInfo, String filePath) {
this(f, operatorInfo, Preconditions.checkNotNull(filePath, "The file path may not be null."), "File " + filePath);
}
// --------------------------------------------------------------------------------------------
/**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册