提交 5dd42c1a 编写于 作者: M mbalassi

[FLINK-1839] Reworked TwitterStreamITCase for testability

The job is now commutative and associative and thus testable in parallel.
上级 0afed4dc
...@@ -17,12 +17,11 @@ ...@@ -17,12 +17,11 @@
package org.apache.flink.streaming.examples.twitter; package org.apache.flink.streaming.examples.twitter;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap; import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import org.apache.flink.streaming.examples.twitter.util.TwitterStreamData; import org.apache.flink.streaming.examples.twitter.util.TwitterStreamData;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import org.apache.sling.commons.json.JSONException; import org.apache.sling.commons.json.JSONException;
...@@ -63,31 +62,18 @@ public class TwitterStream { ...@@ -63,31 +62,18 @@ public class TwitterStream {
// set up the execution environment // set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setBufferTimeout(1000);
// get input data // get input data
DataStream<String> streamSource = getTextDataStream(env); DataStream<String> streamSource = getTextDataStream(env);
DataStream<Tuple2<String, Integer>> tweets = streamSource DataStream<Tuple2<String, Integer>> tweets = streamSource
// selecting English tweets and splitting to words // selecting English tweets and splitting to (word, 1)
.flatMap(new SelectEnglishAndTokenizeFlatMap()) .flatMap(new SelectEnglishAndTokenizeFlatMap())
// returning (word, 1) // group by words and sum their occurrences
.map(new MapFunction<String, Tuple2<String, Integer>>() { .groupBy(0).sum(1);
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return new Tuple2<String, Integer>(value, 1);
}
})
// group by words and sum their occurence
.groupBy(0).sum(1)
// select word with maximum occurence
.flatMap(new SelectMaxOccurence());
// emit result // emit result
if (fileOutput) { if (fileOutput) {
tweets.writeAsText(outputPath, 1L); tweets.writeAsText(outputPath);
} else { } else {
tweets.print(); tweets.print();
} }
...@@ -110,14 +96,14 @@ public class TwitterStream { ...@@ -110,14 +96,14 @@ public class TwitterStream {
* Integer>). * Integer>).
* </p> * </p>
*/ */
public static class SelectEnglishAndTokenizeFlatMap extends JSONParseFlatMap<String, String> { public static class SelectEnglishAndTokenizeFlatMap extends JSONParseFlatMap<String, Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
/** /**
* Select the language from the incoming JSON text * Select the language from the incoming JSON text
*/ */
@Override @Override
public void flatMap(String value, Collector<String> out) throws Exception { public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
try { try {
if (getString(value, "lang").equals("en")) { if (getString(value, "lang").equals("en")) {
// message of tweet // message of tweet
...@@ -125,39 +111,15 @@ public class TwitterStream { ...@@ -125,39 +111,15 @@ public class TwitterStream {
// split the message // split the message
while (tokenizer.hasMoreTokens()) { while (tokenizer.hasMoreTokens()) {
String result = tokenizer.nextToken().replaceAll("\\s*", ""); String result = tokenizer.nextToken().replaceAll("\\s*", "").toLowerCase();
if (result != null && !result.equals("")) { if (result != null && !result.equals("")) {
out.collect(result); out.collect(new Tuple2<String, Integer>(result, 1));
} }
} }
} }
} catch (JSONException e) { } catch (JSONException e) {
// the JSON was not parsed correctly
}
}
}
/**
* Implements a user-defined FlatMapFunction that checks if the current
* occurence is higher than the maximum occurence. If so, returns the word
* and changes the maximum.
*/
public static class SelectMaxOccurence implements
FlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
private Integer maximum;
public SelectMaxOccurence() {
this.maximum = 0;
}
@Override
public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out)
throws Exception {
if ((Integer) value.getField(1) >= maximum) {
out.collect(value);
maximum = (Integer) value.getField(1);
} }
} }
} }
...@@ -168,7 +130,7 @@ public class TwitterStream { ...@@ -168,7 +130,7 @@ public class TwitterStream {
private static boolean fileInput = false; private static boolean fileInput = false;
private static boolean fileOutput = false; private static boolean fileOutput = false;
private static String textPath; private static String propertiesPath;
private static String outputPath; private static String outputPath;
private static boolean parseParameters(String[] args) { private static boolean parseParameters(String[] args) {
...@@ -177,18 +139,18 @@ public class TwitterStream { ...@@ -177,18 +139,18 @@ public class TwitterStream {
fileOutput = true; fileOutput = true;
if (args.length == 2) { if (args.length == 2) {
fileInput = true; fileInput = true;
textPath = args[0]; propertiesPath = args[0];
outputPath = args[1]; outputPath = args[1];
} else if (args.length == 1) { } else if (args.length == 1) {
outputPath = args[0]; outputPath = args[0];
} else { } else {
System.err.println("USAGE:\nTwitterStream <pathToPropertiesFile> <result path>"); System.err.println("USAGE:\nTwitterStream [<pathToPropertiesFile>] <result path>");
return false; return false;
} }
} else { } else {
System.out.println("Executing TwitterStream example with built-in default data."); System.out.println("Executing TwitterStream example with built-in default data.");
System.out.println(" Provide parameters to read input data from a file."); System.out.println(" Provide parameters to read input data from a file.");
System.out.println(" USAGE: TwitterStream <pathToPropertiesFile>"); System.out.println(" USAGE: TwitterStream [<pathToPropertiesFile>] <result path>");
} }
return true; return true;
} }
...@@ -196,7 +158,7 @@ public class TwitterStream { ...@@ -196,7 +158,7 @@ public class TwitterStream {
private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) { private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
if (fileInput) { if (fileInput) {
// read the text file from given input path // read the text file from given input path
return env.readTextFile(textPath); return env.addSource(new TwitterSource(propertiesPath));
} else { } else {
// get default test text data // get default test text data
return env.fromElements(TwitterStreamData.TEXTS); return env.fromElements(TwitterStreamData.TEXTS);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册