提交 306ef32f 作者: Márton Balassi 提交者: Stephan Ewen

[streaming] WordCount Refactor

上级 cef2a4d0
......@@ -25,9 +25,9 @@ public class BatchForward extends TestBase2{
/**
 * Builds the job graph named "testGraph": registers BatchForwardSource as
 * "StreamSource", registers a sink as "StreamSink", connects the two, and
 * returns whatever the builder produces.
 *
 * NOTE(review): this text is a diff rendering without +/- markers, so the two
 * setSink lines and the two connect lines are presumably the before/after
 * versions of one statement each (old line first) - confirm against the
 * actual file before treating both as live code.
 *
 * @return the JobGraph obtained from the builder
 */
public JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("StreamSource", BatchForwardSource.class);
// The sink class is spelled "Bach..." on the first line and "Batch..." on the second.
graphBuilder.setSink("StreamSink", BachForwardSink.class);
graphBuilder.setSink("StreamSink", BatchForwardSink.class);
// Source-to-sink connection: broadcastConnect on the first line, shuffleConnect on the second.
graphBuilder.broadcastConnect("StreamSource", "StreamSink");
graphBuilder.shuffleConnect("StreamSource", "StreamSink");
return graphBuilder.getJobGraph();
}
......
......@@ -19,7 +19,7 @@ import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.types.StringValue;
public class BachForwardSink implements UserSinkInvokable {
public class BatchForwardSink implements UserSinkInvokable {
private StringValue word = new StringValue("");
......
......@@ -30,10 +30,10 @@ public class BatchWordCount extends TestBase2 {
graphBuilder.setTask("BatchWordCountCounter", BatchWordCountCounter.class, 2);
graphBuilder.setSink("BatchWordCountSink", BatchWordCountSink.class);
graphBuilder.broadcastConnect("BatchWordCountSource", "BatchWordCountSplitter");
graphBuilder.shuffleConnect("BatchWordCountSource", "BatchWordCountSplitter");
graphBuilder.fieldsConnect("BatchWordCountSplitter", "BatchWordCountCounter", 0,
StringValue.class);
graphBuilder.broadcastConnect("BatchWordCountCounter", "BatchWordCountSink");
graphBuilder.shuffleConnect("BatchWordCountCounter", "BatchWordCountSink");
return graphBuilder.getJobGraph();
}
......
......@@ -26,19 +26,18 @@ import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.StringValue;
/**
 * Task that tracks a count per word in {@code wordCounts} and emits one record
 * built from (wordValue, countValue, timestamp) per incoming record.
 *
 * NOTE(review): this is a diff rendering without +/- markers. Adjacent
 * near-duplicate lines are presumably the before/after versions of the same
 * statement, and the "......@@" line inside invoke() marks lines that are not
 * shown here - confirm against the actual file.
 */
public class BatchWordCountCounter extends UserTaskInvokable {
// Count seen so far, keyed by word.
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
// Reusable holders for the values read from and written to records.
private StringValue wordValue = new StringValue("");
private IntValue countValue = new IntValue(1);
private LongValue timestamp = new LongValue(0);
private String word = "";
// Only used by the setField/emit(outputRecord) lines further down.
private AtomRecord outputRecord = new AtomRecord(3);
private int count = 1;
/**
 * Reads the word (field 0) and the timestamp (field 1) from the incoming
 * record, updates the stored count, and emits the result.
 *
 * @param record the incoming record
 * @throws Exception declared by the signature; no throw site is visible here
 */
@Override
public void invoke(StreamRecord record) throws Exception {
// The next four lines are the same two reads, differing only in spacing around '='.
wordValue=(StringValue) record.getField(0, 0);
timestamp=(LongValue) record.getField(0, 1);
wordValue = (StringValue) record.getField(0, 0);
timestamp = (LongValue) record.getField(0, 1);
// NOTE(review): in the visible lines `word` is never assigned from wordValue
// before this lookup, so it would still hold its initial "" - verify.
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
......@@ -48,10 +47,7 @@ public class BatchWordCountCounter extends UserTaskInvokable {
// First occurrence of the key: store 1 and report 1.
wordCounts.put(word, 1);
countValue.setValue(1);
}
// Two ways of emitting the same three values: via the reusable outputRecord,
// or by passing the values straight to the StreamRecord constructor.
outputRecord.setField(0, wordValue);
outputRecord.setField(1, countValue);
outputRecord.setField(2, timestamp);
emit(new StreamRecord(outputRecord));
emit(new StreamRecord(wordValue, countValue, timestamp));
}
}
\ No newline at end of file
......@@ -30,10 +30,10 @@ public class WordCount extends TestBase2 {
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
graphBuilder.broadcastConnect("WordCountSource", "WordCountSplitter");
graphBuilder.shuffleConnect("WordCountSource", "WordCountSplitter");
graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0,
StringValue.class);
graphBuilder.broadcastConnect("WordCountCounter", "WordCountSink");
graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink");
return graphBuilder.getJobGraph();
}
......
......@@ -26,9 +26,9 @@ import eu.stratosphere.types.StringValue;
public class WordCountCounter extends UserTaskInvokable {
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private StringValue wordValue = new StringValue("");
private IntValue countValue = new IntValue(1);
private String word = "";
private StringValue wordValue = new StringValue();
private IntValue countValue = new IntValue();
private String word = new String();
private int count = 1;
@Override
......
......@@ -9,18 +9,16 @@ public class WordCountDummySource extends UserSourceInvokable {
private String line = new String();
private StringValue lineValue = new StringValue();
private Value[] values = new StringValue[1];
/**
 * Sets the fixed sentence "first second", copies it into {@code lineValue},
 * and stores {@code lineValue} in slot 0 of {@code values}.
 */
public WordCountDummySource() {
line = "first second";
lineValue.setValue(line);
// NOTE(review): this is a diff rendering without +/- markers; the values array
// may belong to the pre-commit version only - confirm against the actual file.
values[0] = lineValue;
}
/**
 * Emits the prepared sentence and prints a marker line to standard output.
 * The loop body runs exactly once (i goes from 0 to below 1).
 *
 * NOTE(review): this is a diff rendering without +/- markers, so the two emit
 * lines are presumably the before/after versions of a single statement -
 * confirm against the actual file.
 *
 * @throws Exception declared by the signature; no throw site is visible here
 */
@Override
public void invoke() throws Exception {
for (int i = 0; i < 1; i++) {
// Same payload either way: the values array holding lineValue, or lineValue directly.
emit(new StreamRecord(values));
emit(new StreamRecord(lineValue));
// Marker output printed after emitting.
System.out.println("xxxxxxxxx");
}
}
......
......@@ -110,10 +110,10 @@ public class WordCountLocal {
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
graphBuilder.broadcastConnect("WordCountSource", "WordCountSplitter");
graphBuilder.shuffleConnect("WordCountSource", "WordCountSplitter");
graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0,
StringValue.class);
graphBuilder.broadcastConnect("WordCountCounter", "WordCountSink");
graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink");
return graphBuilder.getJobGraph();
}
......
......@@ -22,7 +22,7 @@ import eu.stratosphere.types.StringValue;
public class WordCountSink implements UserSinkInvokable {
private StringValue word = new StringValue("");
private StringValue word = new StringValue();
private IntValue count = new IntValue();
@Override
......
......@@ -21,9 +21,9 @@ import eu.stratosphere.types.StringValue;
public class WordCountSplitter extends UserTaskInvokable {
private StringValue sentence = new StringValue("");
private String[] words = new String[0];
private StringValue wordValue = new StringValue("");
private StringValue sentence = new StringValue();
private String[] words = new String[] {};
private StringValue wordValue = new StringValue();
@Override
public void invoke(StreamRecord record) throws Exception {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册