Commit 000b5d53
Authored May 14, 2015 by mjsax
Committed June 14, 2015 by mbalassi
[storm-compat] Added Storm compatibility word count examples
Parent: 56e013f0
Showing 6 changed files with 665 additions and 0 deletions
flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java (+127, -0)
flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java (+158, -0)
flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java (+80, -0)
flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java (+88, -0)
flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java (+89, -0)
flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java (+123, -0)
flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
0 → 100644
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.stormcompatibility.wordcount;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizer;
import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import backtype.storm.topology.IRichBolt;

/**
* Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
* fashion. The tokenizer step is performed by a Storm {@link IRichBolt bolt}.
*
* <p>
* The input is a plain text file with lines separated by newline characters.
*
* <p>
* Usage: <code>WordCount <text path> <result path></code><br>
* If no parameters are provided, the program is run with default data from {@link WordCountData}.
*
* <p>
* This example shows how to:
* <ul>
* <li>use a Storm bolt within a Flink Streaming program.
* </ul>
*/
public class BoltTokenizerWordCount {

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(final String[] args) throws Exception {

        if (!parseParameters(args)) {
            return;
        }

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data
        final DataStream<String> text = getTextDataStream(env);

        final DataStream<Tuple2<String, Integer>> counts = text
                // split up the lines in pairs (2-tuples) containing: (word,1)
                // this is done by a Storm bolt that is wrapped accordingly
                .transform("StormBoltTokenizer",
                        TypeExtractor.getForObject(new Tuple2<String, Integer>(new String(), new Integer(0))),
                        new StormBoltWrapper<String, Tuple2<String, Integer>>(new StormBoltTokenizer()))
                // group by the tuple field "0" and sum up tuple field "1"
                .groupBy(0).sum(1);

        // emit result
        if (fileOutput) {
            counts.writeAsText(outputPath);
        } else {
            counts.print();
        }

        // execute program
        env.execute("Streaming WordCount with Storm bolt tokenizer");
    }

    // *************************************************************************
    // UTIL METHODS
    // *************************************************************************

    private static boolean fileOutput = false;
    private static String textPath;
    private static String outputPath;

    private static boolean parseParameters(final String[] args) {

        if (args.length > 0) {
            // parse input arguments
            fileOutput = true;
            if (args.length == 2) {
                textPath = args[0];
                outputPath = args[1];
            } else {
                System.err.println("Usage: WordCount <text path> <result path>");
                return false;
            }
        } else {
            System.out.println("Executing WordCount example with built-in default data.");
            System.out.println(" Provide parameters to read input data from a file.");
            System.out.println(" Usage: WordCount <text path> <result path>");
        }
        return true;
    }

    private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
        if (fileOutput) {
            // read the text file from given input path
            return env.readTextFile(textPath);
        }

        return env.fromElements(WordCountData.WORDS);
    }

}
flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
0 → 100644
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.stormcompatibility.wordcount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormFileSpout;
import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormInMemorySpout;
import org.apache.flink.stormcompatibility.wrappers.StormFiniteSpoutWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import backtype.storm.topology.IRichSpout;

/**
* Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
* fashion. The data source used is a Storm {@link IRichSpout spout}.
*
* <p>
* The input is a plain text file with lines separated by newline characters.
*
* <p>
* Usage: <code>WordCount <text path> <result path></code><br>
* If no parameters are provided, the program is run with default data from {@link WordCountData}.
*
* <p>
* This example shows how to:
* <ul>
* <li>use a Storm spout within a Flink Streaming program.
* </ul>
*/
public class SpoutSourceWordCount {

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(final String[] args) throws Exception {

        if (!parseParameters(args)) {
            return;
        }

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data
        final DataStream<String> text = getTextDataStream(env);

        final DataStream<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                // group by the tuple field "0" and sum up tuple field "1"
                .groupBy(0).sum(1);

        // emit result
        if (fileOutput) {
            counts.writeAsText(outputPath);
        } else {
            counts.print();
        }

        // execute program
        env.execute("Streaming WordCount with Storm spout source");
    }

    // *************************************************************************
    // USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined FlatMapFunction. The function
     * takes a line (String) and splits it into multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(final String value, final Collector<Tuple2<String, Integer>> out) throws Exception {
            // normalize and split the line
            final String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (final String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, new Integer(1)));
                }
            }
        }
    }

    // *************************************************************************
    // UTIL METHODS
    // *************************************************************************

    private static boolean fileOutput = false;
    private static String textPath;
    private static String outputPath;

    private static boolean parseParameters(final String[] args) {

        if (args.length > 0) {
            // parse input arguments
            fileOutput = true;
            if (args.length == 2) {
                textPath = args[0];
                outputPath = args[1];
            } else {
                System.err.println("Usage: WordCount <text path> <result path>");
                return false;
            }
        } else {
            System.out.println("Executing WordCount example with built-in default data.");
            System.out.println(" Provide parameters to read input data from a file.");
            System.out.println(" Usage: WordCount <text path> <result path>");
        }
        return true;
    }

    private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
        if (fileOutput) {
            // read the text file from given input path
            // keep only the text after the last ':' (drops a prefix such as "file:")
            final String[] tokens = textPath.split(":");
            final String localFile = tokens[tokens.length - 1];
            final DataStream<String> stream = env.addSource(
                    new StormFiniteSpoutWrapper<String>(new StormFileSpout(localFile), true),
                    TypeExtractor.getForClass(String.class)).setParallelism(1);
            return stream;
        }

        return env.addSource(new StormFiniteSpoutWrapper<String>(new StormInMemorySpout(), true),
                TypeExtractor.getForClass(String.class));
    }

}
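To make the data flow of the Tokenizer and the grouped sum easier to follow, here is a minimal plain-Java sketch that needs neither Flink nor Storm. It reuses the same normalization as Tokenizer.flatMap (lower-casing, splitting on "\\W+", dropping empty tokens). The class name WordCountSketch and its methods are hypothetical and not part of this commit. It also assumes that a grouped streaming sum emits an updated running total for each incoming record, which is how the example output is modeled here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not part of the commit.
public class WordCountSketch {

    /** Lower-cases the line and splits it on non-word characters, skipping empty tokens. */
    static List<String> tokenize(final String line) {
        final List<String> words = new ArrayList<>();
        for (final String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                words.add(token);
            }
        }
        return words;
    }

    /** Emits one "(word,runningCount)" record per input word (assumed streaming behavior). */
    static List<String> runningCounts(final List<String> lines) {
        final Map<String, Integer> counts = new HashMap<>();
        final List<String> emitted = new ArrayList<>();
        for (final String line : lines) {
            for (final String word : tokenize(line)) {
                final int updated = counts.merge(word, 1, Integer::sum);
                emitted.add("(" + word + "," + updated + ")");
            }
        }
        return emitted;
    }

    public static void main(final String[] args) {
        for (final String record : runningCounts(Arrays.asList("To be, or not to be"))) {
            System.out.println(record);
        }
    }
}
```

The empty-token check matters because a line that starts with punctuation makes split return a leading empty string.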
flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
0 → 100644
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.stormcompatibility.wordcount;

import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;

import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.utils.Utils;

/**
* Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
* fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the
* same way as to a Storm {@link LocalCluster}.
*
* This example shows how to run the program directly within Java; thus, it cannot be used to submit a
* {@link StormTopology} via Flink command line clients (i.e., bin/flink).
*
* <p>
* The input is a plain text file with lines separated by newline characters.
*
* <p>
* Usage: <code>WordCount <text path> <result path></code><br>
* If no parameters are provided, the program is run with default data from {@link WordCountData}.
*
* <p>
* This example shows how to:
* <ul>
* <li>run a regular Storm program locally on Flink
* </ul>
*/
public class StormWordCountLocal {
    public final static String topologyId = "Streaming WordCount";

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(final String[] args) throws Exception {

        if (!WordCountTopology.parseParameters(args)) {
            return;
        }

        // build Topology the Storm way
        final FlinkTopologyBuilder builder = WordCountTopology.buildTopology();

        // execute program locally
        final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
        cluster.submitTopology(topologyId, null, builder.createTopology());

        Utils.sleep(5 * 1000);

        // TODO kill does not do anything so far
        cluster.killTopology(topologyId);
        cluster.shutdown();
    }

}
flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
0 → 100644
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.stormcompatibility.wordcount;

import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.stormcompatibility.api.FlinkClient;
import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;

import backtype.storm.Config;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.NotAliveException;
import backtype.storm.generated.StormTopology;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;

/**
* Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
* fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the
* same way as to a Storm cluster similar to {@link NimbusClient}. The Flink cluster can be local or remote.
*
* This example shows how to submit the program via Java, thus it cannot be used to submit a {@link StormTopology} via
* Flink command line clients (i.e., bin/flink).
*
* <p>
* The input is a plain text file with lines separated by newline characters.
*
* <p>
* Usage: <code>WordCount <text path> <result path></code><br>
* If no parameters are provided, the program is run with default data from {@link WordCountData}.
*
* <p>
* This example shows how to:
* <ul>
* <li>submit a regular Storm program to a local or remote Flink cluster.
* </ul>
*/
public class StormWordCountRemoteByClient {
    public final static String topologyId = "Streaming WordCount";
    private final static String uploadedJarLocation = "target/flink-storm-examples-0.9-SNAPSHOT-WordCountStorm.jar";

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(final String[] args) throws AlreadyAliveException, InvalidTopologyException,
            NotAliveException {

        if (!WordCountTopology.parseParameters(args)) {
            return;
        }

        // build Topology the Storm way
        final FlinkTopologyBuilder builder = WordCountTopology.buildTopology();

        // execute program on Flink cluster
        final Config conf = new Config();
        conf.put(Config.NIMBUS_HOST, "localhost"); // can be changed to remote address
        conf.put(Config.NIMBUS_THRIFT_PORT, new Integer(6123)); // use default flink jobmanager.rpc.port

        final FlinkClient cluster = FlinkClient.getConfiguredClient(conf);
        cluster.submitTopology(topologyId, uploadedJarLocation, null, builder.createTopology());

        Utils.sleep(5 * 1000);

        cluster.killTopology(topologyId);
    }

}
flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
0 → 100644
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.stormcompatibility.wordcount;

import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.stormcompatibility.api.FlinkClient;
import org.apache.flink.stormcompatibility.api.FlinkSubmitter;
import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;

/**
* Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
* fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the
* same way as to a Storm cluster similar to {@link StormSubmitter}. The Flink cluster can be local or remote.
*
* This example shows how to submit the program via Java as well as Flink's command line client (i.e., bin/flink).
*
* <p>
* The input is a plain text file with lines separated by newline characters.
*
* <p>
* Usage: <code>WordCount <text path> <result path></code><br>
* If no parameters are provided, the program is run with default data from {@link WordCountData}.
*
* <p>
* This example shows how to:
* <ul>
* <li>submit a regular Storm program to a local or remote Flink cluster.
* </ul>
*/
public class StormWordCountRemoteBySubmitter {
    public final static String topologyId = "Streaming WordCount";

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(final String[] args) throws Exception {

        if (!WordCountTopology.parseParameters(args)) {
            return;
        }

        // build Topology the Storm way
        final FlinkTopologyBuilder builder = WordCountTopology.buildTopology();

        // execute program on Flink cluster
        final Config conf = new Config();
        // we can set Jobmanager host/port values manually or leave them blank
        // if not set and
        // - executed within Java, default values "localhost" and "6123" are set by FlinkSubmitter
        // - executed via bin/flink values from flink-conf.yaml are set by FlinkSubmitter
        // conf.put(Config.NIMBUS_HOST, "localhost"); // can be changed to remote address
        // conf.put(Config.NIMBUS_THRIFT_PORT, new Integer(6123)); // use default flink jobmanager.rpc.port

        // the user jar file must be specified via JVM argument if executed via Java
        // => -Dstorm.jar=target/flink-storm-examples-0.9-SNAPSHOT-WordCountStorm.jar
        // if bin/flink is used, the jar file is detected automatically
        FlinkSubmitter.submitTopology(topologyId, conf, builder.createTopology());

        Thread.sleep(5 * 1000);

        FlinkClient.getConfiguredClient(conf).killTopology(topologyId);
    }

}
flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
0 → 100644
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.stormcompatibility.wordcount;

import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltCounter;
import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltFileSink;
import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltPrintSink;
import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizer;
import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormFileSpout;
import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormInMemorySpout;

import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;

/**
* Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
* fashion. The program is constructed as a regular {@link StormTopology}.
*
* <p>
* The input is a plain text file with lines separated by newline characters.
*
* <p>
* Usage: <code>WordCount <text path> <result path></code><br>
* If no parameters are provided, the program is run with default data from {@link WordCountData}.
*
* <p>
* This example shows how to:
* <ul>
* <li>construct a regular Storm topology as a Flink program
* </ul>
*/
public class WordCountTopology {
    public final static String spoutId = "source";
    public final static String tokenierzerId = "tokenizer";
    public final static String counterId = "counter";
    public final static String sinkId = "sink";

    public static FlinkTopologyBuilder buildTopology() {

        final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();

        // get input data
        if (fileInputOutput) {
            // read the text file from given input path
            // keep only the text after the last ':' (drops a prefix such as "file:")
            final String[] tokens = textPath.split(":");
            final String inputFile = tokens[tokens.length - 1];
            builder.setSpout(spoutId, new StormFileSpout(inputFile));
        } else {
            builder.setSpout(spoutId, new StormInMemorySpout());
        }

        // split up the lines in pairs (2-tuples) containing: (word,1)
        builder.setBolt(tokenierzerId, new StormBoltTokenizer(), new Integer(4)).shuffleGrouping(spoutId);
        // group by the tuple field "0" and sum up tuple field "1"
        builder.setBolt(counterId, new StormBoltCounter(), new Integer(4)).fieldsGrouping(tokenierzerId,
                new Fields(StormBoltTokenizer.ATTRIBUTE_WORD));

        // emit result
        if (fileInputOutput) {
            // write the result to the given output path
            // keep only the text after the last ':' (drops a prefix such as "file:")
            final String[] tokens = outputPath.split(":");
            final String outputFile = tokens[tokens.length - 1];
            builder.setBolt(sinkId, new StormBoltFileSink(outputFile)).shuffleGrouping(counterId);
        } else {
            builder.setBolt(sinkId, new StormBoltPrintSink(), new Integer(4)).shuffleGrouping(counterId);
        }

        return builder;
    }

    // *************************************************************************
    // UTIL METHODS
    // *************************************************************************

    private static boolean fileInputOutput = false;
    private static String textPath;
    private static String outputPath;

    static boolean parseParameters(final String[] args) {

        if (args.length > 0) {
            // parse input arguments
            fileInputOutput = true;
            if (args.length == 2) {
                textPath = args[0];
                outputPath = args[1];
            } else {
                System.err.println("Usage: WordCount <text path> <result path>");
                return false;
            }
        } else {
            System.out.println("Executing WordCount example with built-in default data.");
            System.out.println(" Provide parameters to read input data from a file.");
            System.out.println(" Usage: WordCount <text path> <result path>");
        }
        return true;
    }

}
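The topology connects the tokenizer to the counter with fieldsGrouping on the word field, so every tuple carrying the same word should reach the same one of the four counter tasks. That is what lets each task keep a correct per-word count. The following plain-Java sketch illustrates the idea with a simple hash-modulo assignment. This is an assumption chosen for illustration, not the partitioning code that Storm or the Flink compatibility layer actually uses, and FieldsGroupingSketch and taskFor are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of the commit.
public class FieldsGroupingSketch {

    /** Maps a word to a task index in [0, parallelism); equal words always map to the same index. */
    static int taskFor(final String word, final int parallelism) {
        // floorMod keeps the result non-negative even when hashCode() is negative
        return Math.floorMod(word.hashCode(), parallelism);
    }

    public static void main(final String[] args) {
        final int parallelism = 4; // same parallelism hint as the counter bolt in the topology
        final List<String> words = Arrays.asList("to", "be", "or", "not", "to", "be");
        for (final String word : words) {
            System.out.println(word + " -> counter task " + taskFor(word, parallelism));
        }
    }
}
```

By contrast, shuffleGrouping (used between the spout and the tokenizer) distributes tuples without regard to their content, which is fine there because tokenizing a line needs no per-key state.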