提交 f9d03236 编写于 作者: M Márton Balassi 提交者: Stephan Ewen

[streaming] StreamExecutionEnvironment javadoc update

上级 0dea5d1d
......@@ -28,11 +28,26 @@ import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.util.Collector;
//TODO: add file, elements, rmq source
//TODO: figure out generic dummysink
//TODO:add link to ExecutionEnvironment
/**
* ExecutionEnvironment for streaming jobs. An instance of it is necessary to
* construct streaming topologies.
*
*/
// TODO: add file, elements, rmq source
// TODO: figure out generic dummysink
public class StreamExecutionEnvironment {
JobGraphBuilder jobGraphBuilder;
/**
* General constructor specifying the batch size in which the tuples are
* transmitted and their timeout boundary.
*
* @param defaultBatchSize
* number of tuples in a batch
* @param defaultBatchTimeoutMillis
* timeout boundary in milliseconds
*/
public StreamExecutionEnvironment(int defaultBatchSize, long defaultBatchTimeoutMillis) {
if (defaultBatchSize < 1) {
throw new IllegalArgumentException("Batch size must be positive.");
......@@ -44,6 +59,9 @@ public class StreamExecutionEnvironment {
defaultBatchSize, defaultBatchTimeoutMillis);
}
/**
* Default constructor for transmitting tuples individually with a 1 second
* timeout. Delegates to {@link #StreamExecutionEnvironment(int, long)} with a
* default batch size of 1 and a default batch timeout of 1000 milliseconds.
*/
public StreamExecutionEnvironment() {
this(1, 1000);
}
......@@ -59,10 +77,19 @@ public class StreamExecutionEnvironment {
}
}
/**
 * Partitioning strategy on the stream.
 *
 * <p>Declared without an explicit {@code static} modifier: a nested enum is
 * implicitly static, so the modifier is redundant. The declaration order of
 * the constants is unchanged.
 */
public enum ConnectionType {
    SHUFFLE, BROADCAST, FIELD
}
/**
* Sets the batch size of the datastream in which the tuples are transmitted.
*
* @param inputStream
* input datastream
*/
public <T extends Tuple> void setBatchSize(DataStream<T> inputStream) {
for (int i = 0; i < inputStream.connectIDs.size(); i++) {
......@@ -71,6 +98,15 @@ public class StreamExecutionEnvironment {
}
}
// TODO: Link to JobGraph & JobGraphBuilder
/**
* Internal function for assembling the underlying JobGraph of the job.
*
* @param inputStream
* input datastream
* @param outputID
* ID of the output
*/
private <T extends Tuple> void connectGraph(DataStream<T> inputStream, String outputID) {
for (int i = 0; i < inputStream.connectIDs.size(); i++) {
......@@ -95,9 +131,19 @@ public class StreamExecutionEnvironment {
}
public <T extends Tuple, R extends Tuple> DataStream<R> addFunction(String functionName,
//TODO: link to JobGraph, JobVertex, user-defined spellcheck
/**
* Internal function for passing the user-defined functions to the JobGraph of the job.
* @param functionName name of the function
* @param inputStream input data stream
* @param function the user-defined function
* @param functionInvokable the wrapping JobVertex instance
* @param parallelism number of parallel instances of the function
* @return the data stream constructed
*/
<T extends Tuple, R extends Tuple> DataStream<R> addFunction(String functionName,
DataStream<T> inputStream, final AbstractFunction function,
UserTaskInvokable<T, R> functionInvokable, int paralelism) {
UserTaskInvokable<T, R> functionInvokable, int parallelism) {
DataStream<R> returnStream = new DataStream<R>(this);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
......@@ -110,21 +156,15 @@ public class StreamExecutionEnvironment {
}
jobGraphBuilder.setTask(returnStream.getId(), functionInvokable, functionName,
baos.toByteArray(),paralelism,paralelism);
baos.toByteArray(), parallelism, parallelism);
connectGraph(inputStream, returnStream.getId());
return returnStream;
}
/**
 * Convenience overload of {@code addFunction} that takes no parallelism
 * argument and forwards the value 1 as the fifth argument of the
 * five-argument variant.
 *
 * @param functionName
 *            name of the function
 * @param inputStream
 *            input data stream
 * @param function
 *            the user-defined function
 * @param functionInvokable
 *            invokable wrapping the function, passed through unchanged
 * @return the data stream returned by the five-argument {@code addFunction}
 */
public <T extends Tuple, R extends Tuple> DataStream<R> addFunction(String functionName,
        DataStream<T> inputStream, final AbstractFunction function,
        UserTaskInvokable<T, R> functionInvokable) {
    // Value forwarded in place of the omitted parallelism argument.
    final int singleInstance = 1;
    DataStream<R> resultStream = addFunction(functionName, inputStream, function,
            functionInvokable, singleInstance);
    return resultStream;
}
public <T extends Tuple> DataStream<T> addSink(DataStream<T> inputStream,
SinkFunction<T> sinkFunction, int paralelism) {
SinkFunction<T> sinkFunction, int parallelism) {
DataStream<T> returnStream = new DataStream<T>(this);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
......@@ -138,13 +178,13 @@ public class StreamExecutionEnvironment {
}
jobGraphBuilder.setSink("sink", new SinkInvokable<T>(sinkFunction), "sink",
baos.toByteArray(),paralelism,paralelism);
baos.toByteArray(), parallelism, parallelism);
connectGraph(inputStream, "sink");
return returnStream;
}
public <T extends Tuple> DataStream<T> addSink(DataStream<T> inputStream,
SinkFunction<T> sinkFunction) {
return addSink(inputStream, sinkFunction, 1);
......@@ -169,7 +209,8 @@ public class StreamExecutionEnvironment {
ClusterUtil.runOnMiniCluster(jobGraphBuilder.getJobGraph());
}
public <T extends Tuple> DataStream<T> addSource(SourceFunction<T> sourceFunction, int parallelism) {
public <T extends Tuple> DataStream<T> addSource(SourceFunction<T> sourceFunction,
int parallelism) {
DataStream<T> returnStream = new DataStream<T>(this);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
......@@ -183,13 +224,13 @@ public class StreamExecutionEnvironment {
}
jobGraphBuilder.setSource(returnStream.getId(), sourceFunction, "source",
baos.toByteArray(),parallelism,parallelism);
baos.toByteArray(), parallelism, parallelism);
return returnStream;
}
/**
 * Creates a data stream backed by a {@code FileSourceFunction} constructed
 * from the given path. The source is registered through {@code addSource}
 * with a parallelism of 1.
 *
 * @param path
 *            path handed to the {@code FileSourceFunction} constructor
 * @return the data stream returned by {@code addSource}
 */
public DataStream<Tuple1<String>> readTextFile(String path) {
    // A single return: the duplicated second return statement was unreachable
    // and would be rejected by the compiler.
    return addSource(new FileSourceFunction(path), 1);
}
public DataStream<Tuple1<String>> addDummySource() {
......@@ -206,7 +247,7 @@ public class StreamExecutionEnvironment {
}
jobGraphBuilder.setSource(returnStream.getId(), new DummySource(), "source",
baos.toByteArray(),1,1);
baos.toByteArray(), 1, 1);
return returnStream;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册