提交 98850048 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] wordcountlocal updatet for tuple

上级 a5e1f0ae
......@@ -18,27 +18,26 @@ package eu.stratosphere.streaming.examples.wordcount;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.StringValue;
public class WordCountCounter extends UserTaskInvokable {
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private StringValue wordValue = new StringValue();
private IntValue countValue = new IntValue();
private String word = new String();
private String word = "";
private Integer count = 0;
// private StreamRecord streamRecord = new StreamRecord(2);
private int count = 1;
private int i = 0;
private long time;
private long prevTime = System.currentTimeMillis();
private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Integer>());
@Override
public void invoke(StreamRecord record) throws Exception {
wordValue.setValue(((StringValue) record.getRecord(0)[0]).getValue());
word = wordValue.getValue();
word = record.getString(0);
i++;
if (i % 50000 == 0) {
time = System.currentTimeMillis();
......@@ -49,14 +48,14 @@ public class WordCountCounter extends UserTaskInvokable {
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
countValue.setValue(count);
} else {
count=1;
wordCounts.put(word, 1);
countValue.setValue(1);
}
// TODO: object reuse
// streamRecord.setRecord(wordValue, countValue);
// emit(streamRecord);
emit(new StreamRecord(wordValue, countValue));
outRecord.setString(0,word);
outRecord.setInteger(1,count);
emit(outRecord);
}
}
......@@ -15,14 +15,13 @@
package eu.stratosphere.streaming.examples.wordcount;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.types.StringValue;
public class WordCountDummySource extends UserSourceInvokable {
private StringValue lineValue = new StringValue("");
StreamRecord record = new StreamRecord(lineValue);
StreamRecord record = new StreamRecord(new Tuple1<String>());
public WordCountDummySource() {
}
......@@ -31,11 +30,10 @@ public class WordCountDummySource extends UserSourceInvokable {
public void invoke() throws Exception {
for (int i = 0; i < 100; i++) {
if (i % 2 == 0) {
lineValue.setValue("Gyula Marci");
record.setString(0, "Gyula Marci");
} else {
lineValue.setValue("Gabor Gyula");
record.setString(0, "Gabor Gyula");
}
record.setRecord(lineValue);
emit(record);
}
}
......
......@@ -15,14 +15,13 @@
package eu.stratosphere.streaming.examples.wordcount;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.types.StringValue;
public class WordCountDummySource2 extends UserSourceInvokable {
private StringValue lineValue = new StringValue("");
StreamRecord record = new StreamRecord(lineValue);
StreamRecord record = new StreamRecord(new Tuple1<String>());
private long time;
private long prevTime = System.currentTimeMillis();
......@@ -40,11 +39,10 @@ public class WordCountDummySource2 extends UserSourceInvokable {
}
if (i % 2 == 0) {
lineValue.setValue("Gyula Marci");
record.setString(0, "Gyula Marci");
} else {
lineValue.setValue("Gabor Gyula");
record.setString(0, "Gabor Gyula");
}
record.setRecord(lineValue);
emit(record);
}
}
......
......@@ -25,7 +25,6 @@ import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.util.LogUtils;
import eu.stratosphere.types.StringValue;
public class WordCountLocal {
......@@ -37,7 +36,8 @@ public class WordCountLocal {
graphBuilder.setSink("WordCountSink", WordCountSink.class);
graphBuilder.shuffleConnect("WordCountSource", "WordCountSplitter");
graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0, StringValue.class);
graphBuilder.shuffleConnect("WordCountSplitter","WordCountCounter");
// graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0, StringValue.class);
graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink");
return graphBuilder.getJobGraph();
......@@ -45,6 +45,7 @@ public class WordCountLocal {
//TODO: arguments check
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
......
......@@ -19,17 +19,15 @@ import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
public class WordCountSource extends UserSourceInvokable {
private BufferedReader br = null;
private String line = new String();
private StringValue lineValue = new StringValue();
private Value[] values = new StringValue[1];
private Tuple1<String> lineTuple = new Tuple1<String>();
public WordCountSource() {
try {
......@@ -44,10 +42,9 @@ public class WordCountSource extends UserSourceInvokable {
line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", "");
while (line != null) {
if (line != "") {
lineValue.setValue(line);
values[0] = lineValue;
lineTuple.setField(line, 0);
// TODO: object reuse
emit(new StreamRecord(values));
emit(new StreamRecord(lineTuple));
}
line = br.readLine();
}
......
......@@ -15,17 +15,15 @@
package eu.stratosphere.streaming.examples.wordcount;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.types.StringValue;
public class WordCountSplitter extends UserTaskInvokable {
private StringValue sentence = new StringValue();
private String[] words = new String[] {};
private StringValue wordValue = new StringValue("");
private int i = 0;
private StreamRecord outputRecord = new StreamRecord(wordValue);
private StreamRecord outputRecord = new StreamRecord(new Tuple1<String>());
private long time;
private long prevTime = System.currentTimeMillis();
......@@ -37,13 +35,11 @@ public class WordCountSplitter extends UserTaskInvokable {
System.out.println("Splitter:\t" + i + "\t----Time: " + (time - prevTime));
prevTime = time;
}
sentence = (StringValue) record.getRecord(0)[0];
words = sentence.getValue().split(" ");
for (CharSequence word : words) {
wordValue.setValue(word);
outputRecord.setRecord(wordValue);
words = record.getString(0).split(" ");
for (String word : words) {
outputRecord.setString(0, word);
emit(outputRecord);
// emit(new StreamRecord(wordValue));
}
}
}
\ No newline at end of file
......@@ -18,7 +18,6 @@ package eu.stratosphere.streaming.api.streamcomponent;
import static org.junit.Assert.assertEquals;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
......@@ -41,8 +40,6 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.types.IntValue;
public class StreamComponentTest {
......@@ -52,10 +49,10 @@ public class StreamComponentTest {
public static class MySource extends UserSourceInvokable {
public MySource() {
}
StreamRecord record = new StreamRecord(new Tuple1<Integer>());
@Override
public void invoke() throws Exception {
StreamRecord record = new StreamRecord(new Tuple1<Integer>(-1));
for (int i = 0; i < 1000; i++) {
record.setField(0, i);
emit(record);
......@@ -70,7 +67,7 @@ public class StreamComponentTest {
@Override
public void invoke(StreamRecord record) throws Exception {
Integer i = (Integer) record.getField(0);
Integer i = record.getInteger(0);
emit(new StreamRecord(new Tuple2<Integer, Integer>(i, i + 1)));
}
}
......@@ -81,8 +78,8 @@ public class StreamComponentTest {
@Override
public void invoke(StreamRecord record) throws Exception {
Integer k = (Integer) record.getField(0);
Integer v = (Integer) record.getField(1);
Integer k = record.getInteger(0);
Integer v = record.getInteger(1);
data.put(k, v);
}
......@@ -100,7 +97,7 @@ public class StreamComponentTest {
root.setLevel(Level.OFF);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("MySource", StreamComponentTest.MySource.class);
graphBuilder.setSource("MySource", MySource.class);
graphBuilder.setTask("MyTask", MyTask.class, 2);
graphBuilder.setSink("MySink", MySink.class);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册