提交 ab00ad6b 编写于 作者: M Márton Balassi 提交者: Stephan Ewen

[streaming] Test Refactor

上级 306ef32f
......@@ -25,15 +25,20 @@ public class BatchWordCount extends TestBase2 {
@Override
public JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("BatchWordCountSource", BatchWordCountSource.class);
graphBuilder.setTask("BatchWordCountSplitter", BatchWordCountSplitter.class, 2);
graphBuilder.setTask("BatchWordCountCounter", BatchWordCountCounter.class, 2);
graphBuilder.setSource("BatchWordCountSource",
BatchWordCountSource.class);
graphBuilder.setTask("BatchWordCountSplitter",
BatchWordCountSplitter.class, 2);
graphBuilder.setTask("BatchWordCountCounter",
BatchWordCountCounter.class, 2);
graphBuilder.setSink("BatchWordCountSink", BatchWordCountSink.class);
graphBuilder.shuffleConnect("BatchWordCountSource", "BatchWordCountSplitter");
graphBuilder.fieldsConnect("BatchWordCountSplitter", "BatchWordCountCounter", 0,
StringValue.class);
graphBuilder.shuffleConnect("BatchWordCountCounter", "BatchWordCountSink");
graphBuilder.shuffleConnect("BatchWordCountSource",
"BatchWordCountSplitter");
graphBuilder.fieldsConnect("BatchWordCountSplitter",
"BatchWordCountCounter", 0, StringValue.class);
graphBuilder.shuffleConnect("BatchWordCountCounter",
"BatchWordCountSink");
return graphBuilder.getJobGraph();
}
......
......@@ -18,7 +18,6 @@ package eu.stratosphere.streaming.test.batch.wordcount;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.IntValue;
......@@ -28,10 +27,10 @@ import eu.stratosphere.types.StringValue;
public class BatchWordCountCounter extends UserTaskInvokable {
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private StringValue wordValue = new StringValue("");
private IntValue countValue = new IntValue(1);
private LongValue timestamp = new LongValue(0);
private String word = "";
private StringValue wordValue = new StringValue();
private IntValue countValue = new IntValue();
private LongValue timestamp = new LongValue();
private String word = new String();
private int count = 1;
@Override
......
......@@ -23,15 +23,15 @@ import eu.stratosphere.types.StringValue;
public class BatchWordCountSink implements UserSinkInvokable {
private StringValue word = new StringValue("");
private IntValue count = new IntValue(1);
private LongValue timestamp = new LongValue(0);
private StringValue word = new StringValue();
private IntValue count = new IntValue();
private LongValue timestamp = new LongValue();
@Override
public void invoke(StreamRecord record) throws Exception {
word=(StringValue) record.getField(0, 0);
count=(IntValue) record.getField(0, 1);
timestamp=(LongValue) record.getField(0, 2);
word = (StringValue) record.getField(0, 0);
count = (IntValue) record.getField(0, 1);
timestamp = (LongValue) record.getField(0, 2);
System.out.println("============================================");
System.out.println(word.getValue() + " " + count.getValue() + " "
+ timestamp.getValue());
......
......@@ -15,29 +15,57 @@
package eu.stratosphere.streaming.test.batch.wordcount;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
public class BatchWordCountSource extends UserSourceInvokable {
private final String motto = "Gyuszi Gabor Big Marci Gyuszi";
private long timestamp;
private StreamRecord mottoRecords = new StreamRecord(2);
private BufferedReader br = null;
private String line = new String();
private StringValue lineValue = new StringValue();
private LongValue timestampValue = new LongValue();
private Value[] values = new Value[2];
private final static int BATCH_SIZE = 10;
private long timestamp = 0;
public BatchWordCountSource() {
try {
br = new BufferedReader(new FileReader(
"src/test/resources/testdata/hamlet.txt"));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}
@Override
public void invoke() throws Exception {
timestamp = 0;
for (int i = 0; i < 100; ++i) {
AtomRecord mottoRecord = new AtomRecord(2);
mottoRecord.setField(0, new StringValue(motto));
mottoRecord.setField(1, new LongValue(timestamp));
mottoRecords.addRecord(mottoRecord);
++timestamp;
if (timestamp % 10 == 0) {
emit(mottoRecords);
StreamRecord mottoRecords = new StreamRecord(2);
line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", "");
while (line != null) {
if (line != "") {
lineValue.setValue(line);
timestampValue.setValue(timestamp);
values[0] = lineValue;
values[1] = timestampValue;
mottoRecords.addRecord(values);
timestamp++;
if (timestamp % BATCH_SIZE == 0) {
emit(mottoRecords);
mottoRecords = new StreamRecord(2);
}
}
line = br.readLine();
}
}
......
......@@ -15,7 +15,6 @@
package eu.stratosphere.streaming.test.batch.wordcount;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.LongValue;
......@@ -23,24 +22,21 @@ import eu.stratosphere.types.StringValue;
public class BatchWordCountSplitter extends UserTaskInvokable {
private StringValue sentence = new StringValue("");
private LongValue timestamp = new LongValue(0);
private String[] words = new String[0];
private StringValue wordValue = new StringValue("");
private AtomRecord outputRecord = new AtomRecord(2);
private StringValue sentence = new StringValue();
private LongValue timestamp = new LongValue();
private String[] words = new String[] {};
private StringValue wordValue = new StringValue();
@Override
public void invoke(StreamRecord record) throws Exception {
int numberOfRecords = record.getNumOfRecords();
for(int i=0; i<numberOfRecords; ++i){
for (int i = 0; i < numberOfRecords; ++i) {
sentence = (StringValue) record.getField(i, 0);
timestamp = (LongValue) record.getField(i, 1);
words = sentence.getValue().split(" ");
for (CharSequence word : words) {
wordValue.setValue(word);
outputRecord.setField(0, wordValue);
outputRecord.setField(1, timestamp);
emit(new StreamRecord(outputRecord));
emit(new StreamRecord(wordValue, timestamp));
}
}
}
......
......@@ -32,7 +32,7 @@ public class CellInfo extends TestBase2 {
graphBuilder.fieldsConnect("infoSource", "cellTask", 0, IntValue.class);
graphBuilder.fieldsConnect("querySource", "cellTask",0, IntValue.class);
graphBuilder.broadcastConnect("cellTask", "sink");
graphBuilder.shuffleConnect("cellTask", "sink");
return graphBuilder.getJobGraph();
}
......
......@@ -17,5 +17,6 @@ package eu.stratosphere.streaming.test.cellinfo;
public interface IWorkerEngine {
public int get(long timeStamp, long lastMillis, int cellId);
public void put(int cellId, long timeStamp);
}
......@@ -30,10 +30,10 @@ public class WindowWordCountCounter extends UserTaskInvokable {
private int slidingStep = 20;
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private StringValue wordValue = new StringValue("");
private IntValue countValue = new IntValue(1);
private LongValue timestamp = new LongValue(0);
private String word = "";
private StringValue wordValue = new StringValue();
private IntValue countValue = new IntValue();
private LongValue timestamp = new LongValue();
private String word = new String();
private int count = 1;
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册