提交 50422801 编写于 作者: F Fabian Hueske

Minor changes in Java POJO WordCount example

上级 0902829e
......@@ -18,7 +18,6 @@ import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.ReduceFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.example.java.wordcount.util.WordCountData;
import eu.stratosphere.util.Collector;
......@@ -49,24 +48,6 @@ public class WordCountPOJO {
// PROGRAM
// *************************************************************************
/**
 * Word-count record used by this example: a word together with the number
 * of times it has been counted. The {@code word} field is the one named in
 * the program's {@code groupBy("word")} call.
 */
public static class WC {

    /** The word this record counts. */
    public String word;

    /** Number of occurrences recorded for {@link #word}. */
    public int count;

    /** Creates a record with default field values ({@code null} word, zero count). */
    public WC() {
    }

    /**
     * Creates a record for the given word and count.
     *
     * @param word  the word being counted
     * @param count the number of occurrences to record
     */
    public WC(String word, int count) {
        this.word = word;
        this.count = count;
    }

    /**
     * Renders the record as the word, a single space, and the count.
     *
     * @return the textual form {@code "<word> <count>"}
     */
    @Override
    public String toString() {
        return new StringBuilder()
                .append(word)
                .append(' ')
                .append(count)
                .toString();
    }
}
public static void main(String[] args) throws Exception {
parseParameters(args);
......@@ -76,26 +57,14 @@ public class WordCountPOJO {
// get input data
DataSet<String> text = getTextDataSet(env);
DataSet<WC> tokenized = text.flatMap(new FlatMapFunction<String, WC>() {
@Override
public void flatMap(String value, Collector<WC> out) {
String[] tokens = value.toLowerCase().split("\\W+");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new WC(token, 1));
}
}
}
});
DataSet<WC> counts = tokenized
.groupBy("word")
.reduce(new ReduceFunction<WC>() {
public WC reduce(WC value1, WC value2) {
return new WC(value1.word, value1.count + value2.count);
}
});
DataSet<WC> counts = text
.flatMap(new Tokenizer())
.groupBy("word")
.reduce(new ReduceFunction<WC>() {
public WC reduce(WC value1, WC value2) {
return new WC(value1.word, value1.count + value2.count);
}
});
// emit result
if(fileOutput) {
......@@ -104,9 +73,31 @@ public class WordCountPOJO {
counts.print();
}
env.execute("WordCount with custom data types Example");
env.execute("WordCount with custom data types example");
}
// *************************************************************************
// USER DATA TYPES (POJOs)
// *************************************************************************
/**
 * Word-count record used by this example: a word together with the number
 * of times it has been counted. The {@code word} field is the one named in
 * the program's {@code groupBy("word")} call.
 */
public static class WC {

    /** The word this record counts. */
    public String word;

    /** Number of occurrences recorded for {@link #word}. */
    public int count;

    /** Creates a record with default field values ({@code null} word, zero count). */
    public WC() {
    }

    /**
     * Creates a record for the given word and count.
     *
     * @param word  the word being counted
     * @param count the number of occurrences to record
     */
    public WC(String word, int count) {
        this.word = word;
        this.count = count;
    }

    /**
     * Renders the record as the word, a single space, and the count.
     *
     * @return the textual form {@code "<word> <count>"}
     */
    @Override
    public String toString() {
        return new StringBuilder()
                .append(word)
                .append(' ')
                .append(count)
                .toString();
    }
}
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
......@@ -114,19 +105,19 @@ public class WordCountPOJO {
/**
* Implements the string tokenizer that splits sentences into words as a user-defined
* FlatMapFunction. The function takes a line (String) and splits it into
* multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
* multiple WC POJOs, one per word, each with a count of 1.
*/
public static final class Tokenizer extends FlatMapFunction<String, Tuple2<String, Integer>> {
public static final class Tokenizer extends FlatMapFunction<String, WC> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
public void flatMap(String value, Collector<WC> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
out.collect(new WC(token, 1));
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册