diff --git a/README.md b/README.md index c6315c45a22d0d085544a85c3ca2f50ad1d638cc..58eae014aa4fcfdb596a45758de9a80ada63ab39 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ whatsmars-common | Utils公共模块 / Java SE demo whatsmars-dubbo | 高性能分布式RPC框架 whatsmars-elasticjob | 分布式调度框架 whatsmars-elasticsearch | Elasticsearch +whatsmars-flink | 分布式流处理框架 whatsmars-mq | 消息中间件RocketMQ,Kafka等 whatsmars-redis | Redis客户端简单封装 whatsmars-rpc | Transporter & Codec & Serialization @@ -26,7 +27,7 @@ whatsmars-spring | Spring Framework whatsmars-spring-boot | Spring Boot 实战 whatsmars-spring-boot-samples | Spring Boot Samples whatsmars-spring-cloud | Spring Cloud 微服务生态 -whatsmars-storm | 分布式实时计算系统 +whatsmars-storm | 分布式流处理框架 whatsmars-zk | zookeeper remoting 封装 ### Rocket Stack diff --git a/pom.xml b/pom.xml index 28a72b81b54c464b7f52608e8dd1a54b616eda89..4d5b3430811114fb224f22f0710fb80a280df920 100644 --- a/pom.xml +++ b/pom.xml @@ -31,6 +31,7 @@ whatsmars-elasticsearch whatsmars-zk whatsmars-storm + whatsmars-flink diff --git a/whatsmars-flink/pom.xml b/whatsmars-flink/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..af8e3ecb48cec97840aa8595375ee96fa6979187 --- /dev/null +++ b/whatsmars-flink/pom.xml @@ -0,0 +1,73 @@ + + + + whatsmars-parent + org.hongxi + Rocket.S7 + + 4.0.0 + + whatsmars-flink + + + 1.9.1 + 2.11 + + + + + + + org.apache.flink + flink-java + ${flink.version} + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + + + + + + + + + + + org.slf4j + slf4j-log4j12 + + + log4j + log4j + + + + org.apache.flink + flink-test-utils_${scala.binary.version} + ${flink.version} + test + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + + test-jar + + + + + \ No newline at end of file diff --git a/whatsmars-flink/src/main/java/org/hongxi/whatsmars/flink/streaming/WordCount.java b/whatsmars-flink/src/main/java/org/hongxi/whatsmars/flink/streaming/WordCount.java new file mode 100644 index 0000000000000000000000000000000000000000..a74f8d13163492ea9386b403333e7952531d0da5 --- /dev/null +++ b/whatsmars-flink/src/main/java/org/hongxi/whatsmars/flink/streaming/WordCount.java @@ -0,0 +1,102 @@ +package org.hongxi.whatsmars.flink.streaming; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.Collector; + +import org.hongxi.whatsmars.flink.streaming.util.WordCountData; + +/** + * Implements the "WordCount" program that computes a simple word occurrence + * histogram over text files in a streaming fashion. + * + *

The input is a plain text file with lines separated by newline characters. + * + *

Usage: WordCount --input <path> --output <path>
+ * If no parameters are provided, the program is run with default data from + * {@link WordCountData}. + * + *

This example shows how to: + *

+ */ +public class WordCount { + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(String[] args) throws Exception { + + // Checking input parameters + final ParameterTool params = ParameterTool.fromArgs(args); + + // set up the execution environment + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // make parameters available in the web interface + env.getConfig().setGlobalJobParameters(params); + + // get input data + DataStream text; + if (params.has("input")) { + // read the text file from given input path + text = env.readTextFile(params.get("input")); + } else { + System.out.println("Executing WordCount example with default input data set."); + System.out.println("Use --input to specify file input."); + // get default test text data + text = env.fromElements(WordCountData.WORDS); + } + + DataStream> counts = + // split up the lines in pairs (2-tuples) containing: (word,1) + text.flatMap(new Tokenizer()) + // group by the tuple field "0" and sum up tuple field "1" + .keyBy(0).sum(1); + + // emit result + if (params.has("output")) { + counts.writeAsText(params.get("output")); + } else { + System.out.println("Printing result to stdout. Use --output to specify output path."); + counts.print(); + } + + // execute program + env.execute("Streaming WordCount"); + } + + // ************************************************************************* + // USER FUNCTIONS + // ************************************************************************* + + /** + * Implements the string tokenizer that splits sentences into words as a + * user-defined FlatMapFunction. The function takes a line (String) and + * splits it into multiple pairs in the form of "(word,1)" ({@code Tuple2}). + */ + public static final class Tokenizer implements FlatMapFunction> { + + @Override + public void flatMap(String value, Collector> out) { + // normalize and split the line + String[] tokens = value.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Tuple2<>(token, 1)); + } + } + } + } + +} \ No newline at end of file diff --git a/whatsmars-flink/src/main/java/org/hongxi/whatsmars/flink/streaming/util/WordCountData.java b/whatsmars-flink/src/main/java/org/hongxi/whatsmars/flink/streaming/util/WordCountData.java new file mode 100644 index 0000000000000000000000000000000000000000..259a399b8e9d2c54c5fd963cd659768a1400243e --- /dev/null +++ b/whatsmars-flink/src/main/java/org/hongxi/whatsmars/flink/streaming/util/WordCountData.java @@ -0,0 +1,47 @@ +package org.hongxi.whatsmars.flink.streaming.util; + +/** + * Provides the default data sets used for the WordCount example program. + * The default data sets are used, if no parameters are given to the program. + * + */ +public class WordCountData { + + public static final String[] WORDS = new String[] { + "To be, or not to be,--that is the question:--", + "Whether 'tis nobler in the mind to suffer", + "The slings and arrows of outrageous fortune", + "Or to take arms against a sea of troubles,", + "And by opposing end them?--To die,--to sleep,--", + "No more; and by a sleep to say we end", + "The heartache, and the thousand natural shocks", + "That flesh is heir to,--'tis a consummation", + "Devoutly to be wish'd. To die,--to sleep;--", + "To sleep! perchance to dream:--ay, there's the rub;", + "For in that sleep of death what dreams may come,", + "When we have shuffled off this mortal coil,", + "Must give us pause: there's the respect", + "That makes calamity of so long life;", + "For who would bear the whips and scorns of time,", + "The oppressor's wrong, the proud man's contumely,", + "The pangs of despis'd love, the law's delay,", + "The insolence of office, and the spurns", + "That patient merit of the unworthy takes,", + "When he himself might his quietus make", + "With a bare bodkin? who would these fardels bear,", + "To grunt and sweat under a weary life,", + "But that the dread of something after death,--", + "The undiscover'd country, from whose bourn", + "No traveller returns,--puzzles the will,", + "And makes us rather bear those ills we have", + "Than fly to others that we know not of?", + "Thus conscience does make cowards of us all;", + "And thus the native hue of resolution", + "Is sicklied o'er with the pale cast of thought;", + "And enterprises of great pith and moment,", + "With this regard, their currents turn awry,", + "And lose the name of action.--Soft you now!", + "The fair Ophelia!--Nymph, in thy orisons", + "Be all my sins remember'd." + }; +} \ No newline at end of file diff --git a/whatsmars-flink/src/main/resources/log4j.properties b/whatsmars-flink/src/main/resources/log4j.properties new file mode 100644 index 0000000000000000000000000000000000000000..1d6ae64daab6a6d966c2ed9510193367de68a6bf --- /dev/null +++ b/whatsmars-flink/src/main/resources/log4j.properties @@ -0,0 +1,5 @@ +log4j.rootLogger=INFO, console + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n \ No newline at end of file