提交 54f06b16 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] clustersize setting added to streamexecution environment

上级 19de827a
......@@ -38,6 +38,8 @@ import eu.stratosphere.util.Collector;
// TODO: figure out generic dummysink
public class StreamExecutionEnvironment {
JobGraphBuilder jobGraphBuilder;
private float clusterSize = 1;
/**
* General constructor specifying the batch size in which the tuples are
......@@ -66,6 +68,11 @@ public class StreamExecutionEnvironment {
this(1, 1000);
}
public StreamExecutionEnvironment setClusterSize(int clusterSize) {
this.clusterSize = clusterSize;
return this;
}
private static class DummySource extends UserSourceInvokable<Tuple1<String>> {
private static final long serialVersionUID = 1L;
......@@ -163,7 +170,7 @@ public class StreamExecutionEnvironment {
}
jobGraphBuilder.setTask(returnStream.getId(), functionInvokable, functionName,
baos.toByteArray(), parallelism, parallelism);
baos.toByteArray(), parallelism,(int) Math.ceil(parallelism/clusterSize));
connectGraph(inputStream, returnStream.getId());
......@@ -185,7 +192,7 @@ public class StreamExecutionEnvironment {
}
jobGraphBuilder.setSink(returnStream.getId(), new SinkInvokable<T>(sinkFunction), "sink",
baos.toByteArray(), parallelism, parallelism);
baos.toByteArray(), parallelism, (int) Math.ceil(parallelism/clusterSize));
connectGraph(inputStream, returnStream.getId());
......@@ -234,7 +241,7 @@ public class StreamExecutionEnvironment {
}
jobGraphBuilder.setSource(returnStream.getId(), sourceFunction, "source",
baos.toByteArray(), parallelism, parallelism);
baos.toByteArray(), parallelism, (int) Math.ceil(parallelism/clusterSize));
return returnStream.copy();
}
......@@ -242,9 +249,9 @@ public class StreamExecutionEnvironment {
public DataStream<Tuple1<String>> readTextFile(String path) {
return addSource(new FileSourceFunction(path), 1);
}
public DataStream<Tuple1<String>> readTextStream(String path) {
return addSource(new FileStreamFunction(path),1);
return addSource(new FileStreamFunction(path), 1);
}
public DataStream<Tuple1<String>> addDummySource() {
......
......@@ -29,10 +29,10 @@ public class BasicTopology {
Tuple1<String> tuple = new Tuple1<String>("streaming");
@Override
public void invoke(Collector<Tuple1<String>> out) throws Exception {
// emit continuously
public void invoke(Collector<Tuple1<String>> collector) throws Exception {
// emit continuously a tuple
while (true) {
out.collect(tuple);
collector.collect(tuple);
}
}
}
......
......@@ -30,8 +30,8 @@ public class CellInfoLocal {
private static Random rand = new Random();
private final static int CELL_COUNT = 10;
private final static int LAST_MILLIS = 1000;
private final static int PARALELISM = 1;
private final static int SOURCE_PARALELISM = 1;
private static final int PARALELISM = 1;
private static final int SOURCE_PARALELISM = 1;
private final static class QuerySource extends
SourceFunction<Tuple4<Boolean, Integer, Long, Integer>> {
......@@ -52,7 +52,7 @@ public class CellInfoLocal {
}
}
public final static class InfoSource extends
private final static class InfoSource extends
SourceFunction<Tuple4<Boolean, Integer, Long, Integer>> {
private static final long serialVersionUID = 1L;
......@@ -60,7 +60,7 @@ public class CellInfoLocal {
false, 0, 0L, 0);
@Override
public void invoke(Collector<Tuple4<Boolean, Integer, Long, Integer>> out)
public void invoke(Collector<Tuple4<Boolean, Integer, Long, Integer>> collector)
throws Exception {
for (int i = 0; i < 1000; i++) {
Thread.sleep(100);
......@@ -68,7 +68,7 @@ public class CellInfoLocal {
tuple.f1 = rand.nextInt(CELL_COUNT);
tuple.f2 = System.currentTimeMillis();
out.collect(tuple);
collector.collect(tuple);
}
}
}
......
......@@ -28,17 +28,17 @@ public class JoinSourceOne extends SourceFunction<Tuple3<String, String, Integer
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
private Random rand = new Random();
private Tuple3<String, String, Integer> outTuple = new Tuple3<String, String, Integer>();
private Tuple3<String, String, Integer> outRecord = new Tuple3<String, String, Integer>();
@Override
public void invoke(Collector<Tuple3<String, String, Integer>> out) throws Exception {
public void invoke(Collector<Tuple3<String, String, Integer>> collector) throws Exception {
// Continuously emit tuples with random names and integers (salaries).
while (true) {
outTuple.f0 = "salary";
outTuple.f1 = names[rand.nextInt(names.length)];
outTuple.f2 = rand.nextInt(10000);
out.collect(outTuple);
outRecord.f0 = "salary";
outRecord.f1 = names[rand.nextInt(names.length)];
outRecord.f2 = rand.nextInt(10000);
collector.collect(outRecord);
}
}
}
......@@ -28,17 +28,17 @@ public class JoinSourceTwo extends SourceFunction<Tuple3<String, String, Integer
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
private Random rand = new Random();
private Tuple3<String, String, Integer> outTuple = new Tuple3<String, String, Integer>();
private Tuple3<String, String, Integer> outRecord = new Tuple3<String, String, Integer>();
@Override
public void invoke(Collector<Tuple3<String, String, Integer>> out) throws Exception {
public void invoke(Collector<Tuple3<String, String, Integer>> collector) throws Exception {
// Continuously emit tuples with random names and integers (grades).
while (true) {
outTuple.f0 = "grade";
outTuple.f1 = names[rand.nextInt(names.length)];
outTuple.f2 = rand.nextInt(5) + 1;
outRecord.f0 = "grade";
outRecord.f1 = names[rand.nextInt(names.length)];
outRecord.f2 = rand.nextInt(5) + 1;
out.collect(outTuple);
collector.collect(outRecord);
}
}
}
......@@ -59,10 +59,10 @@ public class IncrementalOLS {
Random rnd = new Random();
@Override
public void invoke(Collector<Tuple2<Double, Double[]>> out) throws Exception {
public void invoke(Collector<Tuple2<Double, Double[]>> collector) throws Exception {
while (true) {
out.collect(getTrainingData());
collector.collect(getTrainingData());
}
}
......
......@@ -38,17 +38,16 @@ public class WindowJoinLocal {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple4<String, String, Integer, Long>> dataStream1 = env.addSource(
DataStream<Tuple4<String, String, Integer, Long>> source1 = env.addSource(
new WindowJoinSourceOne(), SOURCE_PARALELISM);
DataStream<Tuple3<String, Integer, Integer>> dataStream2 = env
@SuppressWarnings("unused")
DataStream<Tuple3<String, Integer, Integer>> source2 = env
.addSource(new WindowJoinSourceTwo(), SOURCE_PARALELISM)
.connectWith(dataStream1)
.connectWith(source1)
.partitionBy(1)
.flatMap(new WindowJoinTask(), PARALELISM)
.addSink(new JoinSink());
dataStream2.print();
env.execute();
......
......@@ -28,17 +28,17 @@ public class WindowJoinSourceOne extends SourceFunction<Tuple4<String, String, I
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
private Random rand = new Random();
private Tuple4<String, String, Integer, Long> outTuple = new Tuple4<String, String, Integer, Long>();
private Tuple4<String, String, Integer, Long> outRecord = new Tuple4<String, String, Integer, Long>();
private Long progress = 0L;
@Override
public void invoke(Collector<Tuple4<String, String, Integer, Long>> out) throws Exception {
public void invoke(Collector<Tuple4<String, String, Integer, Long>> collector) throws Exception {
while (true) {
outTuple.f0 = "salary";
outTuple.f1 = names[rand.nextInt(names.length)];
outTuple.f2 = rand.nextInt(10000);
outTuple.f3 = progress;
out.collect(outTuple);
outRecord.f0 = "salary";
outRecord.f1 = names[rand.nextInt(names.length)];
outRecord.f2 = rand.nextInt(10000);
outRecord.f3 = progress;
collector.collect(outRecord);
progress += 1;
}
}
......
......@@ -28,17 +28,17 @@ public class WindowJoinSourceTwo extends SourceFunction<Tuple4<String, String, I
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
private Random rand = new Random();
private Tuple4<String, String, Integer, Long> outTuple = new Tuple4<String, String, Integer, Long>();
private Tuple4<String, String, Integer, Long> outRecord = new Tuple4<String, String, Integer, Long>();
private Long progress = 0L;
@Override
public void invoke(Collector<Tuple4<String, String, Integer, Long>> out) throws Exception {
public void invoke(Collector<Tuple4<String, String, Integer, Long>> collector) throws Exception {
while (true) {
outTuple.f0 = "grade";
outTuple.f1 = names[rand.nextInt(names.length)];
outTuple.f2 = rand.nextInt(5) + 1;
outTuple.f3 = progress;
out.collect(outTuple);
outRecord.f0 = "grade";
outRecord.f1 = names[rand.nextInt(names.length)];
outRecord.f2 = rand.nextInt(5) + 1;
outRecord.f3 = progress;
collector.collect(outRecord);
progress += 1;
}
}
......
......@@ -27,6 +27,7 @@ public class WindowSumLocal {
public static void main(String[] args) {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
@SuppressWarnings("unused")
DataStream<Tuple2<Integer, Long>> dataStream = env
.addSource(new WindowSumSource(), SOURCE_PARALELISM)
.map(new WindowSumMultiple(), PARALELISM)
......
......@@ -47,6 +47,8 @@ public class WordCountCounter extends MapFunction<Tuple1<String>, Tuple2<String,
outTuple.f1 = count;
return outTuple;
// performanceCounter.count();
}
}
......@@ -24,12 +24,30 @@ public class WordCountSplitter extends FlatMapFunction<Tuple1<String>, Tuple1<St
private Tuple1<String> outTuple = new Tuple1<String>();
//TODO move the performance tracked version to a separate package and clean this
// PerformanceCounter pCounter = new
// PerformanceCounter("SplitterEmitCounter", 1000, 1000,
// "/home/strato/stratosphere-distrib/log/counter/Splitter" + channelID);
// PerformanceTimer pTimer = new PerformanceTimer("SplitterEmitTimer", 1000,
// 1000, true,
// "/home/strato/stratosphere-distrib/log/timer/Splitter" + channelID);
@Override
public void flatMap(Tuple1<String> inTuple, Collector<Tuple1<String>> out) throws Exception {
for (String word : inTuple.f0.split(" ")) {
outTuple.f0 = word;
// pTimer.startTimer();
out.collect(outTuple);
// pTimer.stopTimer();
// pCounter.count();
}
}
// @Override
// public String getResult() {
// pCounter.writeCSV();
// pTimer.writeCSV();
// return "";
// }
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册