Commit 19de827a authored by Márton Balassi, committed by Stephan Ewen

[streaming] examples refactor

Parent 7345bf6a
......@@ -22,9 +22,11 @@ import java.io.ObjectOutputStream;
import eu.stratosphere.api.common.functions.AbstractFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.util.Collector;
//TODO:add link to ExecutionEnvironment
/**
......@@ -32,6 +34,8 @@ import eu.stratosphere.streaming.util.ClusterUtil;
* construct streaming topologies.
*
*/
// TODO: add file, elements, rmq source
// TODO: figure out generic dummysink
public class StreamExecutionEnvironment {
JobGraphBuilder jobGraphBuilder;
......@@ -62,6 +66,17 @@ public class StreamExecutionEnvironment {
this(1, 1000);
}
private static class DummySource extends UserSourceInvokable<Tuple1<String>> {
private static final long serialVersionUID = 1L;
public void invoke(Collector<Tuple1<String>> collector) {
for (int i = 0; i < 10; i++) {
collector.collect(new Tuple1<String>("source"));
}
}
}
/**
* Partitioning strategy on the stream.
*/
......@@ -70,10 +85,10 @@ public class StreamExecutionEnvironment {
}
/**
* Sets the batch size of the data stream in which the tuples are transmitted.
* Sets the batch size of the datastream in which the tuples are transmitted.
*
* @param inputStream
* input data stream
* input datastream
*/
public <T extends Tuple> void setBatchSize(DataStream<T> inputStream) {
......@@ -88,7 +103,7 @@ public class StreamExecutionEnvironment {
* Internal function for assembling the underlying JobGraph of the job.
*
* @param inputStream
* input data stream
* input datastream
* @param outputID
* ID of the output
*/
......@@ -116,7 +131,7 @@ public class StreamExecutionEnvironment {
}
// TODO: link to JobGraph, JobVertex
// TODO: link to JobGraph, JobVertex, user-defined spellcheck
/**
* Internal function for passing the user defined functions to the JobGraph
* of the job.
......@@ -155,17 +170,6 @@ public class StreamExecutionEnvironment {
return returnStream;
}
/**
* Adds a sink to the data stream, closing it.
*
* @param inputStream
* input data stream
* @param sinkFunction
* the user defined function
* @param parallelism
* number of parallel instances of the function
* @return the data stream constructed
*/
public <T extends Tuple> DataStream<T> addSink(DataStream<T> inputStream,
SinkFunction<T> sinkFunction, int parallelism) {
DataStream<T> returnStream = new DataStream<T>(this);
......@@ -188,32 +192,12 @@ public class StreamExecutionEnvironment {
return returnStream;
}
/**
* Adds a sink to the data stream, closing it. The parallelism defaults to 1.
*
* @param inputStream
* input data stream
* @param sinkFunction
* the user defined function
* @param parallelism
* number of parallel instances of the function
* @return the data stream constructed
*/
public <T extends Tuple> DataStream<T> addSink(DataStream<T> inputStream,
SinkFunction<T> sinkFunction) {
return addSink(inputStream, sinkFunction, 1);
}
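// Hedged usage sketch (not part of this commit): shows how the two addSink
// overloads above are meant to be called; the sink instance itself would be a
// user-defined SinkFunction such as the DummySink example below.
public static <T extends Tuple> void exampleAddSink(StreamExecutionEnvironment env,
		DataStream<T> stream, SinkFunction<T> sink) {
	// Explicit parallelism for the sink vertex.
	env.addSink(stream, sink, 4);
	// The two-argument overload defaults the parallelism to 1.
	env.addSink(stream, sink);
}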
// TODO: link to SinkFunction
/**
* Dummy implementation of the SinkFunction writing every tuple to the
* standard output.
*
* @param <IN>
* Input tuple type
*/
private static final class DummySink<IN extends Tuple> extends SinkFunction<IN> {
public static final class DummySink<IN extends Tuple> extends SinkFunction<IN> {
private static final long serialVersionUID = 1L;
@Override
......@@ -223,13 +207,6 @@ public class StreamExecutionEnvironment {
}
/**
* Prints the tuples of the data stream to the standard output.
*
* @param inputStream
* the input data stream
* @return the data stream constructed
*/
public <T extends Tuple> DataStream<T> print(DataStream<T> inputStream) {
DataStream<T> returnStream = addSink(inputStream, new DummySink<T>());
......@@ -238,24 +215,10 @@ public class StreamExecutionEnvironment {
return returnStream;
}
// TODO: Link to JobGraph and ClusterUtil
/**
* Executes the JobGraph of the job on a mini cluster using ClusterUtil.
*/
public void execute() {
ClusterUtil.runOnMiniCluster(jobGraphBuilder.getJobGraph());
}
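// Hedged end-to-end sketch (not part of this commit): composes the methods above
// into the topology pattern used by the examples, relying only on the API defined
// in this class (addDummySource, print, execute).
public static void exampleTopology() {
	StreamExecutionEnvironment env = new StreamExecutionEnvironment();
	// Open a stream from the built-in dummy source defined further below.
	DataStream<Tuple1<String>> stream = env.addDummySource();
	// Attach the standard-output sink via print().
	env.print(stream);
	// Assemble the JobGraph and run it on the ClusterUtil mini cluster.
	env.execute();
}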
// TODO: Link to DataStream
/**
* Adds a data source, thus opening a data stream.
*
* @param sourceFunction
* the user defined function
* @param parallelism
* number of parallel instances of the function
* @return the data stream constructed
*/
public <T extends Tuple> DataStream<T> addSource(SourceFunction<T> sourceFunction,
int parallelism) {
DataStream<T> returnStream = new DataStream<T>(this);
......@@ -276,20 +239,32 @@ public class StreamExecutionEnvironment {
return returnStream.copy();
}
//TODO: understand difference
public DataStream<Tuple1<String>> readTextFile(String path) {
return addSource(new FileSourceFunction(path), 1);
}
public DataStream<Tuple1<String>> readTextStream(String path) {
return addSource(new FileStreamFunction(path), 1);
return addSource(new FileStreamFunction(path),1);
}
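// Hedged usage sketch (not part of this commit): the path is a placeholder and the
// behavioural difference between the two file sources is still the open TODO above,
// so only the signatures visible in this file are relied on.
public void exampleReadFile() {
	// Read the file as a stream of Tuple1<String> lines and print each one.
	print(readTextFile("/path/to/input.txt"));
	// readTextStream("/path/to/input.txt") is the alternative file source.
}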
public DataStream<Tuple1<String>> addDummySource() {
DataStream<Tuple1<String>> returnStream = new DataStream<Tuple1<String>>(this);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos;
try {
oos = new ObjectOutputStream(baos);
oos.writeObject(new DummySource());
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
jobGraphBuilder.setSource(returnStream.getId(), new DummySource(), "source",
baos.toByteArray(), 1, 1);
return returnStream;
}
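// The in-line serialization above is the recurring pattern for shipping user
// functions to the JobGraphBuilder; a hedged standalone sketch of the same steps
// (the helper name is hypothetical, not part of this commit):
private static byte[] serializeUserFunction(Object userFunction) {
	ByteArrayOutputStream baos = new ByteArrayOutputStream();
	try {
		// Java-serialize the user function into the in-memory buffer.
		ObjectOutputStream oos = new ObjectOutputStream(baos);
		oos.writeObject(userFunction);
	} catch (IOException e) {
		e.printStackTrace();
	}
	return baos.toByteArray();
}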
//TODO: Add link to JobGraphBuilder
/**
* Getter of the JobGraphBuilder of the streaming job.
*
* @return the JobGraphBuilder of the job
*/
public JobGraphBuilder jobGB() {
return jobGraphBuilder;
}
......
......@@ -29,10 +29,10 @@ public class BasicTopology {
Tuple1<String> tuple = new Tuple1<String>("streaming");
@Override
public void invoke(Collector<Tuple1<String>> collector) throws Exception {
// emit continuously a tuple
public void invoke(Collector<Tuple1<String>> out) throws Exception {
// emit continuously
while (true) {
collector.collect(tuple);
out.collect(tuple);
}
}
}
......
......@@ -30,8 +30,8 @@ public class CellInfoLocal {
private static Random rand = new Random();
private final static int CELL_COUNT = 10;
private final static int LAST_MILLIS = 1000;
private static final int PARALELISM = 1;
private static final int SOURCE_PARALELISM = 1;
private final static int PARALELISM = 1;
private final static int SOURCE_PARALELISM = 1;
private final static class QuerySource extends
SourceFunction<Tuple4<Boolean, Integer, Long, Integer>> {
......@@ -52,7 +52,7 @@ public class CellInfoLocal {
}
}
private final static class InfoSource extends
public final static class InfoSource extends
SourceFunction<Tuple4<Boolean, Integer, Long, Integer>> {
private static final long serialVersionUID = 1L;
......@@ -60,7 +60,7 @@ public class CellInfoLocal {
false, 0, 0L, 0);
@Override
public void invoke(Collector<Tuple4<Boolean, Integer, Long, Integer>> collector)
public void invoke(Collector<Tuple4<Boolean, Integer, Long, Integer>> out)
throws Exception {
for (int i = 0; i < 1000; i++) {
Thread.sleep(100);
......@@ -68,7 +68,7 @@ public class CellInfoLocal {
tuple.f1 = rand.nextInt(CELL_COUNT);
tuple.f2 = System.currentTimeMillis();
collector.collect(tuple);
out.collect(tuple);
}
}
}
......
......@@ -28,17 +28,17 @@ public class JoinSourceOne extends SourceFunction<Tuple3<String, String, Integer
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
private Random rand = new Random();
private Tuple3<String, String, Integer> outRecord = new Tuple3<String, String, Integer>();
private Tuple3<String, String, Integer> outTuple = new Tuple3<String, String, Integer>();
@Override
public void invoke(Collector<Tuple3<String, String, Integer>> collector) throws Exception {
public void invoke(Collector<Tuple3<String, String, Integer>> out) throws Exception {
// Continuously emit tuples with random names and integers (salaries).
while (true) {
outRecord.f0 = "salary";
outRecord.f1 = names[rand.nextInt(names.length)];
outRecord.f2 = rand.nextInt(10000);
collector.collect(outRecord);
outTuple.f0 = "salary";
outTuple.f1 = names[rand.nextInt(names.length)];
outTuple.f2 = rand.nextInt(10000);
out.collect(outTuple);
}
}
}
......@@ -28,17 +28,17 @@ public class JoinSourceTwo extends SourceFunction<Tuple3<String, String, Integer
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
private Random rand = new Random();
private Tuple3<String, String, Integer> outRecord = new Tuple3<String, String, Integer>();
private Tuple3<String, String, Integer> outTuple = new Tuple3<String, String, Integer>();
@Override
public void invoke(Collector<Tuple3<String, String, Integer>> collector) throws Exception {
public void invoke(Collector<Tuple3<String, String, Integer>> out) throws Exception {
// Continuously emit tuples with random names and integers (grades).
while (true) {
outRecord.f0 = "grade";
outRecord.f1 = names[rand.nextInt(names.length)];
outRecord.f2 = rand.nextInt(5) + 1;
outTuple.f0 = "grade";
outTuple.f1 = names[rand.nextInt(names.length)];
outTuple.f2 = rand.nextInt(5) + 1;
collector.collect(outRecord);
out.collect(outTuple);
}
}
}
......@@ -59,10 +59,10 @@ public class IncrementalOLS {
Random rnd = new Random();
@Override
public void invoke(Collector<Tuple2<Double, Double[]>> collector) throws Exception {
public void invoke(Collector<Tuple2<Double, Double[]>> out) throws Exception {
while (true) {
collector.collect(getTrainingData());
out.collect(getTrainingData());
}
}
......
......@@ -38,17 +38,18 @@ public class WindowJoinLocal {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple4<String, String, Integer, Long>> source1 = env.addSource(
DataStream<Tuple4<String, String, Integer, Long>> dataStream1 = env.addSource(
new WindowJoinSourceOne(), SOURCE_PARALELISM);
@SuppressWarnings("unused")
DataStream<Tuple3<String, Integer, Integer>> source2 = env
DataStream<Tuple3<String, Integer, Integer>> dataStream2 = env
.addSource(new WindowJoinSourceTwo(), SOURCE_PARALELISM)
.connectWith(source1)
.connectWith(dataStream1)
.partitionBy(1)
.flatMap(new WindowJoinTask(), PARALELISM)
.addSink(new JoinSink());
dataStream2.print();
env.execute();
}
......
......@@ -28,17 +28,17 @@ public class WindowJoinSourceOne extends SourceFunction<Tuple4<String, String, I
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
private Random rand = new Random();
private Tuple4<String, String, Integer, Long> outRecord = new Tuple4<String, String, Integer, Long>();
private Tuple4<String, String, Integer, Long> outTuple = new Tuple4<String, String, Integer, Long>();
private Long progress = 0L;
@Override
public void invoke(Collector<Tuple4<String, String, Integer, Long>> collector) throws Exception {
public void invoke(Collector<Tuple4<String, String, Integer, Long>> out) throws Exception {
while (true) {
outRecord.f0 = "salary";
outRecord.f1 = names[rand.nextInt(names.length)];
outRecord.f2 = rand.nextInt(10000);
outRecord.f3 = progress;
collector.collect(outRecord);
outTuple.f0 = "salary";
outTuple.f1 = names[rand.nextInt(names.length)];
outTuple.f2 = rand.nextInt(10000);
outTuple.f3 = progress;
out.collect(outTuple);
progress += 1;
}
}
......
......@@ -28,17 +28,17 @@ public class WindowJoinSourceTwo extends SourceFunction<Tuple4<String, String, I
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
private Random rand = new Random();
private Tuple4<String, String, Integer, Long> outRecord = new Tuple4<String, String, Integer, Long>();
private Tuple4<String, String, Integer, Long> outTuple = new Tuple4<String, String, Integer, Long>();
private Long progress = 0L;
@Override
public void invoke(Collector<Tuple4<String, String, Integer, Long>> collector) throws Exception {
public void invoke(Collector<Tuple4<String, String, Integer, Long>> out) throws Exception {
while (true) {
outRecord.f0 = "grade";
outRecord.f1 = names[rand.nextInt(names.length)];
outRecord.f2 = rand.nextInt(5) + 1;
outRecord.f3 = progress;
collector.collect(outRecord);
outTuple.f0 = "grade";
outTuple.f1 = names[rand.nextInt(names.length)];
outTuple.f2 = rand.nextInt(5) + 1;
outTuple.f3 = progress;
out.collect(outTuple);
progress += 1;
}
}
......
......@@ -27,7 +27,6 @@ public class WindowSumLocal {
public static void main(String[] args) {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
@SuppressWarnings("unused")
DataStream<Tuple2<Integer, Long>> dataStream = env
.addSource(new WindowSumSource(), SOURCE_PARALELISM)
.map(new WindowSumMultiple(), PARALELISM)
......
......@@ -47,8 +47,6 @@ public class WordCountCounter extends MapFunction<Tuple1<String>, Tuple2<String,
outTuple.f1 = count;
return outTuple;
// performanceCounter.count();
}
}
......@@ -24,30 +24,12 @@ public class WordCountSplitter extends FlatMapFunction<Tuple1<String>, Tuple1<St
private Tuple1<String> outTuple = new Tuple1<String>();
//TODO move the performance tracked version to a separate package and clean this
// PerformanceCounter pCounter = new
// PerformanceCounter("SplitterEmitCounter", 1000, 1000,
// "/home/strato/stratosphere-distrib/log/counter/Splitter" + channelID);
// PerformanceTimer pTimer = new PerformanceTimer("SplitterEmitTimer", 1000,
// 1000, true,
// "/home/strato/stratosphere-distrib/log/timer/Splitter" + channelID);
@Override
public void flatMap(Tuple1<String> inTuple, Collector<Tuple1<String>> out) throws Exception {
for (String word : inTuple.f0.split(" ")) {
outTuple.f0 = word;
// pTimer.startTimer();
out.collect(outTuple);
// pTimer.stopTimer();
// pCounter.count();
}
}
// @Override
// public String getResult() {
// pCounter.writeCSV();
// pTimer.writeCSV();
// return "";
// }
}
\ No newline at end of file