Commit 1a6b4046 authored by mbalassi

[streaming] ReadTextFile re-implemented as RichFunction and cleanup

Conflicts:
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
Parent 510a8113
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -32,7 +31,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.invokable.SourceInvokable;
@@ -84,7 +82,7 @@ public class JobGraphBuilder {
private Map<String, Integer> iterationTailCount;
private Map<String, Long> iterationWaitTime;
private Map<String, Map<String, OperatorState<?>>> operatorStates;
private Map<String, UserCodeWrapper<? extends InputFormat<String, ?>>> sources;
private Map<String, InputFormat<String, ?>> inputFormatList;
/**
* Creates a new {@link JobGraph} with the given name. A JobGraph is a DAG
@@ -117,7 +115,7 @@ public class JobGraphBuilder {
iterationTailCount = new HashMap<String, Integer>();
iterationWaitTime = new HashMap<String, Long>();
operatorStates = new HashMap<String, Map<String, OperatorState<?>>>();
sources = new HashMap<String, UserCodeWrapper<? extends InputFormat<String, ?>>>();
inputFormatList = new HashMap<String, InputFormat<String, ?>>();
if (LOG.isDebugEnabled()) {
LOG.debug("JobGraph created");
@@ -162,28 +160,34 @@ public class JobGraphBuilder {
}
}
/**
* Adds a source vertex to the streaming JobGraph with the given parameters
*
* @param vertexName
* Name of the vertex
* @param function
* User defined function
* @param inTypeInfo
* Input type for serialization
* @param outTypeInfo
* Output type for serialization
* @param operatorName
* Operator type
* @param serializedFunction
* Serialized udf
* @param parallelism
* Number of parallel instances created
*/
public <IN, OUT> void addSourceVertex(String vertexName, SourceFunction<OUT> function,
TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName,
TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName,
byte[] serializedFunction, int parallelism) {
StreamInvokable<OUT, OUT> invokableObject = new SourceInvokable<OUT>(function);
@SuppressWarnings("unchecked")
StreamInvokable<IN, OUT> invokableObject = (StreamInvokable<IN, OUT>) new SourceInvokable<OUT>(
function);
addVertex(vertexName, StreamVertex.class, invokableObject, operatorName,
addStreamVertex(vertexName, invokableObject, inTypeInfo, outTypeInfo, operatorName,
serializedFunction, parallelism);
StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? new StreamRecordSerializer<IN>(
inTypeInfo) : null;
StreamRecordSerializer<OUT> outSerializer = outTypeInfo != null ? new StreamRecordSerializer<OUT>(
outTypeInfo) : null;
addTypeSerializers(vertexName, inSerializer, null, outSerializer, null);
sources.put(vertexName, function.getFormatWrapper());
System.out.println(sources);
if (LOG.isDebugEnabled()) {
LOG.debug("Vertex: {}", vertexName);
}
}
/**
@@ -283,7 +287,7 @@ public class JobGraphBuilder {
* Name of the vertex
* @param vertexClass
* The class of the vertex
* @param invokableObject
* @param invokableObjectject
* The user defined invokable object
* @param operatorName
* Type of the user defined operator
@@ -372,16 +376,8 @@ public class JobGraphBuilder {
config.setIterationWaitTime(iterationWaitTime.get(vertexName));
}
if (sources.containsKey(vertexName)) {
TaskConfig taskConfig = new TaskConfig(vertex.getConfiguration());
// TypeInformation<?> OutTypeInfo =
// typeWrapperOut1.get(vertexName).getTypeInfo();
InputFormat<String, ?> format = sources.get(vertexName).getUserCodeObject();
vertex.setInputSplitSource(sources.get(vertexName).getUserCodeObject());
// taskConfig.setOutputSerializer(createSerializer(OutTypeInfo));
format.configure(taskConfig.getStubParameters());
// TaskConfig(vertex.getConfiguration());
// taskConfig.setStubWrapper(sources.get(vertexName));
if (inputFormatList.containsKey(vertexName)) {
vertex.setInputSplitSource(inputFormatList.get(vertexName));
}
streamVertices.put(vertexName, vertex);
@@ -438,6 +434,19 @@ public class JobGraphBuilder {
vertexParallelism.put(vertexName, parallelism);
}
/**
* Sets the input format for the given vertex.
*
* @param vertexName
* Name of the vertex
* @param inputFormat
* input format of the file source associated with the given
* vertex
*/
public void setInputFormat(String vertexName, InputFormat<String, ?> inputFormat) {
inputFormatList.put(vertexName, inputFormat);
}
public void setMutability(String vertexName, boolean isMutable) {
mutability.put(vertexName, isMutable);
}
......
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.Validate;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -131,6 +132,13 @@ public abstract class StreamExecutionEnvironment {
return this;
}
/**
* Gets the maximum time frequency (milliseconds) for the flushing of the
* output buffers. For clarification on the extremal values see
* {@link #setBufferTimeout(long)}.
*
* @return The timeout of the buffer.
*/
public long getBufferTimeout() {
return this.bufferTimeout;
}
@@ -162,36 +170,16 @@
* @return The DataStream representing the text file.
*/
public DataStreamSource<String> readTextFile(String filePath) {
// checkIfFileExists(filePath);
Validate.notNull(filePath, "The file path may not be null.");
TextInputFormat format = new TextInputFormat(new Path(filePath));
return addSource(new FileSourceFunction(format, BasicTypeInfo.STRING_TYPE_INFO), 1);
}
/**
* Creates a DataStream that represents the Strings produced by reading the
* given file line wise. The file will be read with the system's default
* character set.
*
* @param filePath
* The path of the file, as a URI (e.g.,
* "file:///some/local/file" or "hdfs://host:port/file/path").
* @param parallelism
* degree of parallelism
* @return The DataStream representing the text file.
*/
public DataStreamSource<String> readTextFile(String filePath, int parallelism) {
Validate.notNull(filePath, "The file path may not be null.");
TextInputFormat format = new TextInputFormat(new Path(filePath));
return addSource(new FileSourceFunction(format, BasicTypeInfo.STRING_TYPE_INFO),
parallelism);
TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
return addFileSource(format, typeInfo);
}
/**
* Creates a DataStream that represents the Strings produced by reading the
* given file line wise. The file will be read with the given
* character set.
* given file line wise. The file will be read with the given character set.
*
* @param filePath
* The path of the file, as a URI (e.g.,
@@ -201,51 +189,11 @@
public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
Validate.notNull(filePath, "The file path may not be null.");
TextInputFormat format = new TextInputFormat(new Path(filePath));
TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
format.setCharsetName(charsetName);
return addSource(new FileSourceFunction(format, BasicTypeInfo.STRING_TYPE_INFO), 1);
}
// public DataStreamSource<StringValue> readTextFileWithValue(String
// filePath) {
// Validate.notNull(filePath, "The file path may not be null.");
// TextValueInputFormat format = new TextValueInputFormat(new
// Path(filePath));
// return addSource(new FileSourceFunction<StringValue>(format,
// new ValueTypeInfo<StringValue>(StringValue.class)), 1);
// }
//
// public DataStreamSource<StringValue> readTextFileWithValue(String
// filePath, String charsetName,
// boolean skipInvalidLines) {
// Validate.notNull(filePath, "The file path may not be null.");
// TextValueInputFormat format = new TextValueInputFormat(new
// Path(filePath));
// format.setCharsetName(charsetName);
// format.setSkipInvalidLines(skipInvalidLines);
// return addSource(new FileSourceFunction<StringValue>(format,
// new ValueTypeInfo<StringValue>(StringValue.class)), 1);
// }
//
// public <X> DataStreamSource<X> readFile(FileInputFormat<X> format, String
// filePath) {
// if (format == null) {
// throw new IllegalArgumentException("InputFormat must not be null.");
// }
// if (filePath == null) {
// throw new IllegalArgumentException("The file path must not be null.");
// }
//
// format.setFilePath(new Path(filePath));
// try {
// return addSource(
// new FileSourceFunction<X>(format,
// TypeExtractor.getInputFormatTypes(format)), 1);
// } catch (Exception e) {
// throw new InvalidProgramException(
// "The type returned by the input format could not be automatically determined. "
// + "Please specify the TypeInformation of the produced type explicitly.");
// }
// }
return addFileSource(format, typeInfo);
}
/**
* Creates a DataStream that represents the Strings produced by reading the
@@ -424,6 +372,15 @@
return returnStream;
}
private DataStreamSource<String> addFileSource(InputFormat<String, ?> inputFormat,
TypeInformation<String> typeInfo) {
FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo);
DataStreamSource<String> returnStream = addSource(function);
jobGraphBuilder.setInputFormat(returnStream.getId(), inputFormat);
return returnStream;
}
// --------------------------------------------------------------------------------------------
// Instantiation of Execution Contexts
// --------------------------------------------------------------------------------------------
......
@@ -21,47 +21,31 @@ import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.operators.GenericDataSourceBase;
import org.apache.flink.api.common.operators.OperatorInformation;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
import org.apache.flink.util.Collector;
public class FileSourceFunction extends SourceFunction<String> {
public class FileSourceFunction extends RichSourceFunction<String> {
private static final long serialVersionUID = 1L;
private InputSplitProvider provider;
private InputFormat<String, ?> format;
private InputFormat<String, ?> inputFormat;
private TypeSerializerFactory<String> serializerFactory;
private UserCodeWrapper<? extends InputFormat<String, ?>> formatWrapper;
// cancel flag
private volatile boolean taskCanceled = false;
public FileSourceFunction(InputFormat<String, ?> format, TypeInformation<String> typeInfo) {
this.format = format;
@SuppressWarnings({ "unchecked", "rawtypes" })
GenericDataSourceBase<String, ?> source = new GenericDataSourceBase(format,
new OperatorInformation<String>(typeInfo), format.toString());
formatWrapper = source.getUserCodeWrapper();
this.inputFormat = format;
this.serializerFactory = createSerializer(typeInfo);
}
@Override
public UserCodeWrapper<? extends InputFormat<String, ?>> getFormatWrapper() {
return this.formatWrapper;
}
private static TypeSerializerFactory<String> createSerializer(TypeInformation<String> typeInfo) {
TypeSerializer<String> serializer = typeInfo.createSerializer();
@@ -73,21 +57,28 @@ public class FileSourceFunction extends SourceFunction<String> {
}
}
@Override
public void open(Configuration parameters) throws Exception {
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
this.provider = context.getInputSplitProvider();
inputFormat.configure(context.getTaskStubParameters());
}
@Override
public void invoke(Collector<String> collector) throws Exception {
final TypeSerializer<String> serializer = serializerFactory.getSerializer();
final Iterator<InputSplit> splitIterator = getInputSplits();
@SuppressWarnings("unchecked")
final InputFormat<String, InputSplit> format = (InputFormat<String, InputSplit>) this.format;
final InputFormat<String, InputSplit> format = (InputFormat<String, InputSplit>) this.inputFormat;
try {
while (!this.taskCanceled && splitIterator.hasNext()) {
while (splitIterator.hasNext()) {
final InputSplit split = splitIterator.next();
String record = serializer.createInstance();
format.open(split);
try {
while (!this.taskCanceled && !format.reachedEnd()) {
while (!format.reachedEnd()) {
if ((record = format.nextRecord(record)) != null) {
collector.collect(record);
}
@@ -148,10 +139,4 @@
}
};
}
@Override
public final void initialize(Environment env) {
this.provider = env.getInputSplitProvider();
}
}
@@ -21,11 +21,9 @@ import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.util.Collector;
public class FileStreamFunction extends SourceFunction<String> {
public class FileStreamFunction implements SourceFunction<String> {
private static final long serialVersionUID = 1L;
private final String path;
@@ -48,9 +46,4 @@ public class FileStreamFunction extends SourceFunction<String> {
br.close();
}
}
@Override
public UserCodeWrapper<? extends InputFormat<String, ?>> getFormatWrapper() {
return null;
}
}
@@ -22,7 +22,7 @@ import java.util.Collection;
import org.apache.flink.util.Collector;
public class FromElementsFunction<T> extends SourceFunction<T> {
public class FromElementsFunction<T> implements SourceFunction<T> {
private static final long serialVersionUID = 1L;
Iterable<T> iterable;
......
@@ -23,7 +23,7 @@ import org.apache.flink.util.Collector;
* Source Function used to generate the number sequence
*
*/
public class GenSequenceFunction extends SourceFunction<Long> {
public class GenSequenceFunction implements SourceFunction<Long> {
private static final long serialVersionUID = 1L;
......
@@ -20,17 +20,10 @@ package org.apache.flink.streaming.api.function.source;
import java.io.Serializable;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.util.Collector;
public abstract class SourceFunction<OUT> implements Function, Serializable {
private static final long serialVersionUID = 1L;
public interface SourceFunction<OUT> extends Function, Serializable {
public abstract void invoke(Collector<OUT> collector) throws Exception;
public void initialize(Environment env){}
public abstract UserCodeWrapper<? extends InputFormat<String, ?>> getFormatWrapper();
public void invoke(Collector<OUT> collector) throws Exception;
}
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
......
@@ -76,9 +76,6 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
}
protected <T> void invokeUserFunction(StreamInvokable<?, T> userInvokable) throws Exception {
if (userInvokable.getSourceFunction() != null) {
userInvokable.getSourceFunction().initialize(getEnvironment());
}
userInvokable.setRuntimeContext(context);
userInvokable.open(getTaskConfiguration());
userInvokable.invoke();
@@ -107,8 +104,7 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
public StreamingRuntimeContext createRuntimeContext(String taskName,
Map<String, OperatorState<?>> states) {
Environment env = getEnvironment();
return new StreamingRuntimeContext(taskName, env.getCurrentNumberOfSubtasks(),
env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), states, env.getCopyTask());
return new StreamingRuntimeContext(taskName, env, getUserCodeClassLoader(), states);
}
@Override
......
@@ -19,11 +19,13 @@
package org.apache.flink.streaming.api.streamvertex;
import java.util.Map;
import java.util.concurrent.FutureTask;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
import org.apache.flink.core.fs.Path;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.streaming.state.OperatorState;
/**
@@ -32,25 +34,20 @@ import org.apache.flink.streaming.state.OperatorState;
*/
public class StreamingRuntimeContext extends RuntimeUDFContext {
private Environment env;
private final Map<String, OperatorState<?>> operatorStates;
public StreamingRuntimeContext(String name, int numParallelSubtasks, int subtaskIndex,
ClassLoader userCodeClassLoader, Map<String, OperatorState<?>> operatorStates) {
super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader);
this.operatorStates = operatorStates;
}
public StreamingRuntimeContext(String name, int numParallelSubtasks, int subtaskIndex,
ClassLoader userCodeClassLoader, Map<String, OperatorState<?>> operatorStates,
Map<String, FutureTask<Path>> cpTasks) {
super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, cpTasks);
public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,
Map<String, OperatorState<?>> operatorStates) {
super(name, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup(),
userCodeClassLoader, env.getCopyTask());
this.env = env;
this.operatorStates = operatorStates;
}
/**
* Returns the operator state registered by the given name for the operator.
*
*
* @param name
* Name of the operator state to be returned.
* @return The operator state.
@@ -69,4 +66,23 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
}
/**
* Returns the input split provider associated with the operator.
*
* @return The input split provider.
*/
public InputSplitProvider getInputSplitProvider() {
return env.getInputSplitProvider();
}
/**
* Returns the stub parameters associated with the {@link TaskConfig} of the
* operator.
*
* @return The stub parameters.
*/
public Configuration getTaskStubParameters() {
return new TaskConfig(env.getTaskConfiguration()).getStubParameters();
}
}
@@ -116,12 +116,6 @@ public class StreamVertexTest {
fail();
} catch (RuntimeException e) {
}
try {
env.readTextFile("random/path/that/is/not/valid");
fail();
} catch (IllegalArgumentException e) {
}
}
private static class CoMap implements CoMapFunction<String, Long, String> {
......