Commit 510a8113 authored by szape, committed by mbalassi

[streaming] Basic support for reading from local and distributed file systems in readTextFile methods

Conflicts:
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
Parent 283c398e
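
For context before the diff: this commit swaps the BufferedReader-based file source for InputFormat-backed readTextFile overloads. A minimal usage sketch of the three new entry points follows (the getExecutionEnvironment() factory and the execute() call are assumptions from the surrounding API, and the paths are the illustrative URIs from the Javadoc, not part of this commit):

// Hypothetical usage sketch; the factory method, execute() call, and paths are illustrative.
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadTextFileExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Local file, default charset, fixed parallelism of 1
        DataStreamSource<String> local = env.readTextFile("file:///some/local/file");

        // Distributed file, explicit degree of parallelism
        DataStreamSource<String> dist = env.readTextFile("hdfs://host:port/file/path", 4);

        // Parallelism of 1, explicit character set
        DataStreamSource<String> decoded = env.readTextFile("file:///some/local/file", "UTF-8");

        env.execute();
    }
}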
JobGraphBuilder.java
@@ -23,6 +23,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -30,7 +32,10 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.invokable.SourceInvokable;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
@@ -79,6 +84,7 @@ public class JobGraphBuilder {
private Map<String, Integer> iterationTailCount;
private Map<String, Long> iterationWaitTime;
private Map<String, Map<String, OperatorState<?>>> operatorStates;
private Map<String, UserCodeWrapper<? extends InputFormat<String, ?>>> sources;
/**
* Creates a new {@link JobGraph} with the given name. A JobGraph is a DAG
@@ -111,6 +117,7 @@
iterationTailCount = new HashMap<String, Integer>();
iterationWaitTime = new HashMap<String, Long>();
operatorStates = new HashMap<String, Map<String, OperatorState<?>>>();
sources = new HashMap<String, UserCodeWrapper<? extends InputFormat<String, ?>>>();
if (LOG.isDebugEnabled()) {
LOG.debug("JobGraph created");
@@ -155,6 +162,30 @@
}
}
public <IN, OUT> void addSourceVertex(String vertexName, SourceFunction<OUT> function,
TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName,
byte[] serializedFunction, int parallelism) {
StreamInvokable<OUT, OUT> invokableObject = new SourceInvokable<OUT>(function);
addVertex(vertexName, StreamVertex.class, invokableObject, operatorName,
serializedFunction, parallelism);
StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? new StreamRecordSerializer<IN>(
inTypeInfo) : null;
StreamRecordSerializer<OUT> outSerializer = outTypeInfo != null ? new StreamRecordSerializer<OUT>(
outTypeInfo) : null;
addTypeSerializers(vertexName, inSerializer, null, outSerializer, null);
sources.put(vertexName, function.getFormatWrapper());
System.out.println(sources);
if (LOG.isDebugEnabled()) {
LOG.debug("Vertex: {}", vertexName);
}
}
/**
* Adds a vertex for the iteration head to the {@link JobGraph}. The
* iterated values will be fed from this vertex back to the graph.
@@ -341,6 +372,18 @@
config.setIterationWaitTime(iterationWaitTime.get(vertexName));
}
if (sources.containsKey(vertexName)) {
TaskConfig taskConfig = new TaskConfig(vertex.getConfiguration());
// TypeInformation<?> OutTypeInfo =
// typeWrapperOut1.get(vertexName).getTypeInfo();
InputFormat<String, ?> format = sources.get(vertexName).getUserCodeObject();
vertex.setInputSplitSource(sources.get(vertexName).getUserCodeObject());
// taskConfig.setOutputSerializer(createSerializer(OutTypeInfo));
format.configure(taskConfig.getStubParameters());
// TaskConfig(vertex.getConfiguration());
// taskConfig.setStubWrapper(sources.get(vertexName));
}
streamVertices.put(vertexName, vertex);
}
......
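
The decisive wiring above is vertex.setInputSplitSource(...) together with format.configure(taskConfig.getStubParameters()): the registered InputFormat doubles as the input split source, so the master can derive splits from it for the parallel source subtasks. A simplified, standalone sketch of that split-creation step (the Configuration and the split count are illustrative; this is not the actual scheduler code):

// Standalone sketch of master-side split creation from a configured InputFormat.
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

public class SplitCreationSketch {
    public static void main(String[] args) throws Exception {
        TextInputFormat format = new TextInputFormat(new Path("file:///some/local/file"));
        // Mirrors format.configure(taskConfig.getStubParameters()) in the diff above.
        format.configure(new Configuration());
        // Ask for at least one split per desired source subtask (here: 4).
        FileInputSplit[] splits = format.createInputSplits(4);
        System.out.println("Created " + splits.length + " input splits");
    }
}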
StreamExecutionEnvironment.java
@@ -24,11 +24,15 @@ import java.util.List;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.Validate;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -158,10 +162,91 @@ public abstract class StreamExecutionEnvironment {
* @return The DataStream representing the text file.
*/
public DataStreamSource<String> readTextFile(String filePath) {
checkIfFileExists(filePath);
return addSource(new FileSourceFunction(filePath));
// checkIfFileExists(filePath);
Validate.notNull(filePath, "The file path may not be null.");
TextInputFormat format = new TextInputFormat(new Path(filePath));
return addSource(new FileSourceFunction(format, BasicTypeInfo.STRING_TYPE_INFO), 1);
}
/**
* Creates a DataStream that represents the Strings produced by reading the
* given file line wise. The file will be read with the system's default
* character set.
*
* @param filePath
* The path of the file, as a URI (e.g.,
* "file:///some/local/file" or "hdfs://host:port/file/path").
* @param parallelism
* The degree of parallelism with which the file is read.
* @return The DataStream representing the text file.
*/
public DataStreamSource<String> readTextFile(String filePath, int parallelism) {
Validate.notNull(filePath, "The file path may not be null.");
TextInputFormat format = new TextInputFormat(new Path(filePath));
return addSource(new FileSourceFunction(format, BasicTypeInfo.STRING_TYPE_INFO),
parallelism);
}
/**
* Creates a DataStream that represents the Strings produced by reading the
* given file line wise. The file will be read with the given
* character set.
*
* @param filePath
* The path of the file, as a URI (e.g.,
* "file:///some/local/file" or "hdfs://host:port/file/path").
* @param charsetName
* The name of the character set used to read the file.
* @return The DataStream representing the text file.
*/
public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
Validate.notNull(filePath, "The file path may not be null.");
TextInputFormat format = new TextInputFormat(new Path(filePath));
format.setCharsetName(charsetName);
return addSource(new FileSourceFunction(format, BasicTypeInfo.STRING_TYPE_INFO), 1);
}
// public DataStreamSource<StringValue> readTextFileWithValue(String
// filePath) {
// Validate.notNull(filePath, "The file path may not be null.");
// TextValueInputFormat format = new TextValueInputFormat(new
// Path(filePath));
// return addSource(new FileSourceFunction<StringValue>(format,
// new ValueTypeInfo<StringValue>(StringValue.class)), 1);
// }
//
// public DataStreamSource<StringValue> readTextFileWithValue(String
// filePath, String charsetName,
// boolean skipInvalidLines) {
// Validate.notNull(filePath, "The file path may not be null.");
// TextValueInputFormat format = new TextValueInputFormat(new
// Path(filePath));
// format.setCharsetName(charsetName);
// format.setSkipInvalidLines(skipInvalidLines);
// return addSource(new FileSourceFunction<StringValue>(format,
// new ValueTypeInfo<StringValue>(StringValue.class)), 1);
// }
//
// public <X> DataStreamSource<X> readFile(FileInputFormat<X> format, String
// filePath) {
// if (format == null) {
// throw new IllegalArgumentException("InputFormat must not be null.");
// }
// if (filePath == null) {
// throw new IllegalArgumentException("The file path must not be null.");
// }
//
// format.setFilePath(new Path(filePath));
// try {
// return addSource(
// new FileSourceFunction<X>(format,
// TypeExtractor.getInputFormatTypes(format)), 1);
// } catch (Exception e) {
// throw new InvalidProgramException(
// "The type returned by the input format could not be automatically determined. "
// + "Please specify the TypeInformation of the produced type explicitly.");
// }
// }
/**
* Creates a DataStream that represents the Strings produced by reading the
* given file line wise multiple times (infinite). The file will be read with
@@ -330,9 +415,8 @@ public abstract class StreamExecutionEnvironment {
DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source", outTypeInfo);
try {
jobGraphBuilder.addStreamVertex(returnStream.getId(),
new SourceInvokable<OUT>(function), null, outTypeInfo, "source",
SerializationUtils.serialize(function), 1);
jobGraphBuilder.addSourceVertex(returnStream.getId(), function, null, outTypeInfo,
"source", SerializationUtils.serialize(function), getDegreeOfParallelism());
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize SourceFunction");
}
......
FileSourceFunction.java
@@ -17,32 +17,141 @@
package org.apache.flink.streaming.api.function.source;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.operators.GenericDataSourceBase;
import org.apache.flink.api.common.operators.OperatorInformation;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.util.Collector;
public class FileSourceFunction implements SourceFunction<String> {
public class FileSourceFunction extends SourceFunction<String> {
private static final long serialVersionUID = 1L;
private final String path;
private InputSplitProvider provider;
public FileSourceFunction(String path) {
this.path = path;
private InputFormat<String, ?> format;
private TypeSerializerFactory<String> serializerFactory;
private UserCodeWrapper<? extends InputFormat<String, ?>> formatWrapper;
// cancel flag
private volatile boolean taskCanceled = false;
public FileSourceFunction(InputFormat<String, ?> format, TypeInformation<String> typeInfo) {
this.format = format;
@SuppressWarnings({ "unchecked", "rawtypes" })
GenericDataSourceBase<String, ?> source = new GenericDataSourceBase(format,
new OperatorInformation<String>(typeInfo), format.toString());
formatWrapper = source.getUserCodeWrapper();
this.serializerFactory = createSerializer(typeInfo);
}
@Override
public UserCodeWrapper<? extends InputFormat<String, ?>> getFormatWrapper() {
return this.formatWrapper;
}
private static TypeSerializerFactory<String> createSerializer(TypeInformation<String> typeInfo) {
TypeSerializer<String> serializer = typeInfo.createSerializer();
if (serializer.isStateful()) {
return new RuntimeStatefulSerializerFactory<String>(serializer, typeInfo.getTypeClass());
} else {
return new RuntimeStatelessSerializerFactory<String>(serializer,
typeInfo.getTypeClass());
}
}
@Override
public void invoke(Collector<String> collector) throws IOException {
BufferedReader br = new BufferedReader(new FileReader(path));
String line = br.readLine();
while (line != null) {
if (!line.equals("")) {
collector.collect(line);
public void invoke(Collector<String> collector) throws Exception {
final TypeSerializer<String> serializer = serializerFactory.getSerializer();
final Iterator<InputSplit> splitIterator = getInputSplits();
@SuppressWarnings("unchecked")
final InputFormat<String, InputSplit> format = (InputFormat<String, InputSplit>) this.format;
try {
while (!this.taskCanceled && splitIterator.hasNext()) {
final InputSplit split = splitIterator.next();
String record = serializer.createInstance();
format.open(split);
try {
while (!this.taskCanceled && !format.reachedEnd()) {
if ((record = format.nextRecord(record)) != null) {
collector.collect(record);
}
}
} finally {
format.close();
}
}
line = br.readLine();
collector.close();
} catch (Exception ex) {
ex.printStackTrace();
}
br.close();
}
private Iterator<InputSplit> getInputSplits() {
return new Iterator<InputSplit>() {
private InputSplit nextSplit;
private boolean exhausted;
@Override
public boolean hasNext() {
if (exhausted) {
return false;
}
if (nextSplit != null) {
return true;
}
InputSplit split = provider.getNextInputSplit();
if (split != null) {
this.nextSplit = split;
return true;
} else {
exhausted = true;
return false;
}
}
@Override
public InputSplit next() {
if (this.nextSplit == null && !hasNext()) {
throw new NoSuchElementException();
}
final InputSplit tmp = this.nextSplit;
this.nextSplit = null;
return tmp;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
@Override
public final void initialize(Environment env) {
this.provider = env.getInputSplitProvider();
}
}
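
The invoke method above is the task-side half of the contract: it pulls splits lazily from the runtime's InputSplitProvider and drives each one through the standard open / reachedEnd / nextRecord / close protocol. The same protocol can be exercised standalone, with locally created splits instead of the provider (a sketch; the path and the printed sink are illustrative):

// Standalone sketch of the InputFormat read loop used by invoke() above,
// with splits created locally instead of served by an InputSplitProvider.
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

public class InputFormatReadLoop {
    public static void main(String[] args) throws Exception {
        TextInputFormat format = new TextInputFormat(new Path("file:///some/local/file"));
        format.configure(new Configuration());
        for (FileInputSplit split : format.createInputSplits(1)) {
            format.open(split);
            try {
                String record = "";
                while (!format.reachedEnd()) {
                    if ((record = format.nextRecord(record)) != null) {
                        System.out.println(record); // stands in for collector.collect(record)
                    }
                }
            } finally {
                format.close();
            }
        }
    }
}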
FileStreamFunction.java
@@ -21,9 +21,11 @@ import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.util.Collector;
public class FileStreamFunction implements SourceFunction<String> {
public class FileStreamFunction extends SourceFunction<String> {
private static final long serialVersionUID = 1L;
private final String path;
@@ -46,4 +48,9 @@ public class FileStreamFunction implements SourceFunction<String> {
br.close();
}
}
@Override
public UserCodeWrapper<? extends InputFormat<String, ?>> getFormatWrapper() {
return null;
}
}
FromElementsFunction.java
@@ -22,7 +22,7 @@ import java.util.Collection;
import org.apache.flink.util.Collector;
public class FromElementsFunction<T> implements SourceFunction<T> {
public class FromElementsFunction<T> extends SourceFunction<T> {
private static final long serialVersionUID = 1L;
Iterable<T> iterable;
......
GenSequenceFunction.java
@@ -23,7 +23,7 @@ import org.apache.flink.util.Collector;
* Source Function used to generate the number sequence
*
*/
public class GenSequenceFunction implements SourceFunction<Long> {
public class GenSequenceFunction extends SourceFunction<Long> {
private static final long serialVersionUID = 1L;
......
SourceFunction.java
@@ -18,11 +18,19 @@
package org.apache.flink.streaming.api.function.source;
import java.io.Serializable;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.util.Collector;
public interface SourceFunction<OUT> extends Function, Serializable {
public void invoke(Collector<OUT> collector) throws Exception;
public abstract class SourceFunction<OUT> implements Function, Serializable {
private static final long serialVersionUID = 1L;
public abstract void invoke(Collector<OUT> collector) throws Exception;
public void initialize(Environment env){}
public abstract UserCodeWrapper<? extends InputFormat<String, ?>> getFormatWrapper();
}
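
SourceFunction is now an abstract class rather than an interface, so every source implements getFormatWrapper (returning null when, as with FileStreamFunction above, no InputFormat backs it) and may override the no-op initialize(Environment) hook that StreamVertex calls before invoke (see the last hunk below). A minimal custom source under the new contract (the class name and emitted values are illustrative):

// Minimal custom source under the new abstract-class contract (sketch).
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.util.Collector;

public class CounterSource extends SourceFunction<Long> {
    private static final long serialVersionUID = 1L;

    @Override
    public void invoke(Collector<Long> collector) throws Exception {
        for (long i = 0; i < 100; i++) {
            collector.collect(i);
        }
    }

    @Override
    public UserCodeWrapper<? extends InputFormat<String, ?>> getFormatWrapper() {
        // Not backed by an InputFormat, so there is no wrapper to expose.
        return null;
    }
}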
SourceInvokable.java
@@ -49,5 +49,10 @@ public class SourceInvokable<OUT> extends StreamInvokable<OUT,OUT> implements Se
@Override
protected void callUserFunction() throws Exception {
}
@Override
public SourceFunction<OUT> getSourceFunction(){
return sourceFunction;
}
}
StreamInvokable.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.util.Collector;
@@ -161,4 +163,8 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
public void setRuntimeContext(RuntimeContext t) {
FunctionUtils.setFunctionRuntimeContext(userFunction, t);
}
public SourceFunction<OUT> getSourceFunction() {
return null;
}
}
StreamVertex.java
@@ -76,6 +76,9 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
}
protected <T> void invokeUserFunction(StreamInvokable<?, T> userInvokable) throws Exception {
if (userInvokable.getSourceFunction() != null) {
userInvokable.getSourceFunction().initialize(getEnvironment());
}
userInvokable.setRuntimeContext(context);
userInvokable.open(getTaskConfiguration());
userInvokable.invoke();
......