提交 820b58dd 编写于 作者: G Gyula Fora 提交者: Stephan Ewen

[streaming] WordCount updated

上级 f4551a83
......@@ -27,12 +27,15 @@ public class WordCount extends TestBase2 {
public JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WorCountSource", WordCountSource.class);
graphBuilder.setTask("WorCountTask", WordCountTask.class, 2);
graphBuilder.setTask("WorCountSplitter", WordCountSplitter.class, 2);
graphBuilder.setTask("WorCountCounter", WordCountCounter.class, 2);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
graphBuilder.fieldsConnect("WorCountSource", "WorCountTask", 0,
graphBuilder.fieldsConnect("WorCountSource", "WorCountSplitter", 0,
StringValue.class);
graphBuilder.broadcastConnect("WorCountTask", "WordCountSink");
graphBuilder.fieldsConnect("WorCountSplitter", "WorCountCounter", 0,
StringValue.class);
graphBuilder.broadcastConnect("WorCountCounter", "WordCountSink");
return graphBuilder.getJobGraph();
}
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test.wordcount;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
/**
 * Streaming task that keeps a running tally per word and, for every incoming
 * record, emits a two-field record holding the word and its updated count.
 */
public class WordCountCounter extends UserTaskInvokable {

    /** Number of times each word has been seen by this instance so far. */
    private Map<String, Integer> wordCounts = new HashMap<String, Integer>();

    // Value holders and the output record are allocated once and reused for
    // every call to invoke().
    private StringValue wordValue = new StringValue("");
    private IntValue countValue = new IntValue(1);
    private Record outputRecord = new Record(wordValue, countValue);

    // Scratch state for the record currently being processed.
    private String word = "";
    private int count = 1;

    /**
     * Reads the word from field 0 of {@code record}, increments its tally
     * (starting at 1 for a word not seen before) and emits the pair
     * (word, count).
     *
     * @param record incoming record whose field 0 is read as a StringValue
     * @throws Exception declared by the overridden method
     */
    @Override
    public void invoke(Record record) throws Exception {
        record.getFieldInto(0, wordValue);
        word = wordValue.getValue();

        // A missing entry means this is the first occurrence of the word.
        Integer seenSoFar = wordCounts.get(word);
        count = (seenSoFar == null) ? 1 : seenSoFar + 1;
        wordCounts.put(word, count);

        countValue.setValue(count);
        outputRecord.setField(0, wordValue);
        outputRecord.setField(1, countValue);
        emit(outputRecord);
    }
}
......@@ -15,34 +15,23 @@
package eu.stratosphere.streaming.test.wordcount;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public class WordCountSink implements UserSinkInvokable {
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private StringValue word = new StringValue("");
private IntValue count = new IntValue(1);
@Override
public void invoke(Record record) throws Exception {
record.getFieldInto(0, word);
record.getFieldInto(1, count);
if (wordCounts.containsKey(word.getValue())) {
wordCounts
.put(word.getValue(), wordCounts.get(word.getValue()) + 1);
System.out.println(word.getValue() + " "
+ wordCounts.get(word.getValue()));
} else {
wordCounts.put(word.getValue(), 1);
System.out.println(word.getValue() + " "
+ wordCounts.get(word.getValue()));
}
}
System.out.println(word.getValue() + " " + count.getValue());
}
\ No newline at end of file
}
}
......@@ -19,8 +19,8 @@ import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public class WordCountTask extends UserTaskInvokable {
public class WordCountSplitter extends UserTaskInvokable {
private StringValue sentence = new StringValue("");
private String[] words = new String[0];
private StringValue wordValue = new StringValue("");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册