Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
28ba85b5
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
28ba85b5
编写于
7月 14, 2014
作者:
M
Márton Balassi
提交者:
Stephan Ewen
8月 18, 2014
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[streaming] Minor WordCount Refactor
上级
839d46ce
变更
40
隐藏空白更改
内联
并排
Showing
40 changed file
with
649 addition
and
1239 deletion
+649
-1239
flink-addons/flink-streaming/pom.xml
flink-addons/flink-streaming/pom.xml
+1
-6
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/SinkFunction.java
...main/java/eu/stratosphere/streaming/api/SinkFunction.java
+28
-28
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamWindowTask.java
...phere/streaming/api/streamcomponent/StreamWindowTask.java
+39
-39
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/collaborativefilter/CollaborativeFilteringLocal.java
...tive/collaborativefilter/CollaborativeFilteringLocal.java
+0
-46
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/collaborativefilter/CollaborativeFilteringSink.java
...ative/collaborativefilter/CollaborativeFilteringSink.java
+0
-37
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/collaborativefilter/CollaborativeFilteringSource.java
...ive/collaborativefilter/CollaborativeFilteringSource.java
+0
-54
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/collaborativefilter/CollaborativeFilteringTask.java
...ative/collaborativefilter/CollaborativeFilteringTask.java
+0
-52
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/kmeans/KMeansSource.java
...ere/streaming/examples/iterative/kmeans/KMeansSource.java
+0
-84
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/kmeans/KMeansTask.java
...phere/streaming/examples/iterative/kmeans/KMeansTask.java
+0
-42
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/pagerank/Graph.java
...tosphere/streaming/examples/iterative/pagerank/Graph.java
+0
-47
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/pagerank/PageRankLocal.java
.../streaming/examples/iterative/pagerank/PageRankLocal.java
+0
-46
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/pagerank/PageRankSink.java
...e/streaming/examples/iterative/pagerank/PageRankSink.java
+0
-37
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/pagerank/PageRankSource.java
...streaming/examples/iterative/pagerank/PageRankSource.java
+0
-52
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/pagerank/PageRankTask.java
...e/streaming/examples/iterative/pagerank/PageRankTask.java
+0
-39
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/Graph.java
...stratosphere/streaming/examples/iterative/sssp/Graph.java
+0
-47
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/SSSPLocal.java
...tosphere/streaming/examples/iterative/sssp/SSSPLocal.java
+0
-46
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/SSSPSink.java
...atosphere/streaming/examples/iterative/sssp/SSSPSink.java
+0
-37
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/SSSPTask.java
...atosphere/streaming/examples/iterative/sssp/SSSPTask.java
+0
-39
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalLearningSkeleton.java
...re/streaming/examples/ml/IncrementalLearningSkeleton.java
+46
-100
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalOLS.java
...eu/stratosphere/streaming/examples/ml/IncrementalOLS.java
+54
-87
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumAggregate.java
...ere/streaming/examples/window/sum/WindowSumAggregate.java
+52
-49
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumLocal.java
...osphere/streaming/examples/window/sum/WindowSumLocal.java
+63
-13
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumMultiple.java
...here/streaming/examples/window/sum/WindowSumMultiple.java
+11
-9
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumSink.java
...tosphere/streaming/examples/window/sum/WindowSumSink.java
+12
-6
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumSource.java
...sphere/streaming/examples/window/sum/WindowSumSource.java
+10
-10
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountCounter.java
...ing/examples/window/wordcount/WindowWordCountCounter.java
+58
-55
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountLocal.java
...aming/examples/window/wordcount/WindowWordCountLocal.java
+63
-13
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSink.java
...eaming/examples/window/wordcount/WindowWordCountSink.java
+17
-6
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSource.java
...ming/examples/window/wordcount/WindowWordCountSource.java
+20
-11
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSplitter.java
...ng/examples/window/wordcount/WindowWordCountSplitter.java
+13
-13
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountLocal.java
...tosphere/streaming/examples/wordcount/WordCountLocal.java
+3
-3
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSplitter.java
...phere/streaming/examples/wordcount/WordCountSplitter.java
+1
-3
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/kafka/KafkaProducer.java
...n/java/eu/stratosphere/streaming/kafka/KafkaProducer.java
+0
-58
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/kafka/KafkaSource.java
...ain/java/eu/stratosphere/streaming/kafka/KafkaSource.java
+3
-2
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/LogTableState.java
...n/java/eu/stratosphere/streaming/state/LogTableState.java
+92
-0
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/LogTableStateIterator.java
...u/stratosphere/streaming/state/LogTableStateIterator.java
+25
-11
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/MutableTableState.java
...va/eu/stratosphere/streaming/state/MutableTableState.java
+1
-2
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/SlidingWindowState.java
...a/eu/stratosphere/streaming/state/SlidingWindowState.java
+7
-9
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/TableState.java
...main/java/eu/stratosphere/streaming/state/TableState.java
+1
-1
flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/state/InternalStateTest.java
...va/eu/stratosphere/streaming/state/InternalStateTest.java
+29
-0
未找到文件。
flink-addons/flink-streaming/pom.xml
浏览文件 @
28ba85b5
...
...
@@ -87,12 +87,7 @@
<dependency>
<groupId>
org.apache.kafka
</groupId>
<artifactId>
kafka_2.10
</artifactId>
<version>
0.8.0
</version>
</dependency>
<dependency>
<groupId>
org.jblas
</groupId>
<artifactId>
jblas
</artifactId>
<version>
1.2.3
</version>
<version>
0.8.1.1
</version>
</dependency>
</dependencies>
...
...
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/SinkFunction.java
100644 → 100755
浏览文件 @
28ba85b5
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package
eu.stratosphere.streaming.api
;
import
java.io.Serializable
;
import
eu.stratosphere.api.java.tuple.Tuple
;
public
abstract
class
SinkFunction
<
IN
extends
Tuple
>
implements
Serializable
{
private
static
final
long
serialVersionUID
=
1L
;
public
abstract
void
invoke
(
IN
tuple
);
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package
eu.stratosphere.streaming.api
;
import
java.io.Serializable
;
import
eu.stratosphere.api.java.tuple.Tuple
;
public
abstract
class
SinkFunction
<
IN
extends
Tuple
>
implements
Serializable
{
private
static
final
long
serialVersionUID
=
1L
;
public
abstract
void
invoke
(
IN
tuple
);
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamWindowTask.java
浏览文件 @
28ba85b5
...
...
@@ -15,21 +15,20 @@
package
eu.stratosphere.streaming.api.streamcomponent
;
import
java.util.ArrayList
;
import
eu.stratosphere.
api.java.functions.FlatMapFunction
;
import
eu.stratosphere.
api.java.tuple.Tuple
;
import
eu.stratosphere.streaming.api.StreamCollector
;
import
eu.stratosphere.streaming.api.invokable.UserTaskInvokable
;
import
eu.stratosphere.
streaming.api.streamrecord.ArrayStreamRecord
;
import
eu.stratosphere.
streaming.api.streamrecord.StreamRecord
;
import
eu.stratosphere.streaming.state.MutableTableState
;
import
eu.stratosphere.streaming.state.SlidingWindowState
;
import
eu.stratosphere.util.Collector
;
public
class
StreamWindowTask
extends
FlatMapFunction
<
Tuple
,
Tuple
>
{
public
class
StreamWindowTask
extends
UserTaskInvokable
{
private
static
final
long
serialVersionUID
=
1L
;
private
int
computeGranularity
;
private
int
windowFieldId
=
1
;
private
ArrayList
tempArrayList
;
private
StreamRecord
tempRecord
;
private
SlidingWindowState
<
Integer
>
window
;
private
MutableTableState
<
String
,
Integer
>
sum
;
private
long
initTimestamp
=
-
1
;
...
...
@@ -45,42 +44,43 @@ public class StreamWindowTask extends FlatMapFunction<Tuple, Tuple> {
sum
.
put
(
"sum"
,
0
);
}
private
void
incrementCompute
(
ArrayList
tupleArray
)
{}
private
void
decrementCompute
(
ArrayList
tupleArray
)
{}
private
void
produceOutput
(
long
progress
,
Collector
out
)
{}
private
void
incrementCompute
(
StreamRecord
record
){}
private
void
decrementCompute
(
StreamRecord
record
){}
private
void
produceRecord
(
long
progress
){}
@Override
public
void
flatMap
(
Tuple
value
,
Collector
<
Tuple
>
out
)
throws
Exception
{
// TODO Auto-generated method stub
long
progress
=
value
.
getField
(
windowFieldId
);
if
(
initTimestamp
==
-
1
)
{
initTimestamp
=
progress
;
nextTimestamp
=
initTimestamp
+
computeGranularity
;
tempArrayList
=
new
ArrayList
();
}
else
{
if
(
progress
>
nextTimestamp
)
{
if
(
window
.
isFull
())
{
ArrayList
expiredArrayList
=
window
.
popFront
();
incrementCompute
(
tempArrayList
);
decrementCompute
(
expiredArrayList
);
window
.
pushBack
(
tempArrayList
);
if
(
window
.
isEmittable
())
{
produceOutput
(
progress
,
out
);
}
}
else
{
incrementCompute
(
tempArrayList
);
window
.
pushBack
(
tempArrayList
);
public
void
invoke
(
StreamRecord
record
,
StreamCollector
collector
)
throws
Exception
{
int
numTuple
=
record
.
getBatchSize
();
int
tupleIndex
=
0
;
for
(
int
i
=
0
;
i
<
numTuple
;
++
i
)
{
long
progress
=
record
.
getTuple
(
i
).
getField
(
windowFieldId
);
if
(
initTimestamp
==
-
1
)
{
initTimestamp
=
progress
;
nextTimestamp
=
initTimestamp
+
computeGranularity
;
tempRecord
=
new
ArrayStreamRecord
(
record
.
getBatchSize
());
}
else
{
if
(
progress
>
nextTimestamp
)
{
if
(
window
.
isFull
())
{
produceOutput
(
progress
,
out
);
StreamRecord
expiredRecord
=
window
.
popFront
();
incrementCompute
(
tempRecord
);
decrementCompute
(
expiredRecord
);
window
.
pushBack
(
tempRecord
);
if
(
window
.
isEmittable
())
{
produceRecord
(
progress
);
}
}
else
{
incrementCompute
(
tempRecord
);
window
.
pushBack
(
tempRecord
);
if
(
window
.
isFull
())
{
produceRecord
(
progress
);
}
}
initTimestamp
=
nextTimestamp
;
nextTimestamp
=
initTimestamp
+
computeGranularity
;
tempRecord
=
new
ArrayStreamRecord
(
record
.
getBatchSize
());
}
initTimestamp
=
nextTimestamp
;
nextTimestamp
=
initTimestamp
+
computeGranularity
;
tempArrayList
=
new
ArrayList
();
tempRecord
.
setTuple
(
tupleIndex
++,
record
.
getTuple
(
i
));
}
tempArrayList
.
add
(
value
);
}
}
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/collaborativefilter/CollaborativeFilteringLocal.java
已删除
100644 → 0
浏览文件 @
839d46ce
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package
eu.stratosphere.streaming.examples.iterative.collaborativefilter
;
import
org.apache.log4j.Level
;
import
eu.stratosphere.nephele.jobgraph.JobGraph
;
import
eu.stratosphere.streaming.api.JobGraphBuilder
;
import
eu.stratosphere.streaming.faulttolerance.FaultToleranceType
;
import
eu.stratosphere.streaming.util.ClusterUtil
;
import
eu.stratosphere.streaming.util.LogUtils
;
public
class
CollaborativeFilteringLocal
{
public
static
JobGraph
getJobGraph
()
{
JobGraphBuilder
graphBuilder
=
new
JobGraphBuilder
(
"testGraph"
,
FaultToleranceType
.
NONE
);
graphBuilder
.
setSource
(
"Source"
,
new
CollaborativeFilteringSource
());
graphBuilder
.
setTask
(
"Task"
,
new
CollaborativeFilteringTask
(),
1
,
1
);
graphBuilder
.
setSink
(
"Sink"
,
new
CollaborativeFilteringSink
());
graphBuilder
.
fieldsConnect
(
"Source"
,
"Task"
,
0
);
graphBuilder
.
shuffleConnect
(
"Task"
,
"Sink"
);
return
graphBuilder
.
getJobGraph
();
}
public
static
void
main
(
String
[]
args
)
{
LogUtils
.
initializeDefaultConsoleLogger
(
Level
.
DEBUG
,
Level
.
INFO
);
ClusterUtil
.
runOnMiniCluster
(
getJobGraph
());
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/collaborativefilter/CollaborativeFilteringSink.java
已删除
100644 → 0
浏览文件 @
839d46ce
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package
eu.stratosphere.streaming.examples.iterative.collaborativefilter
;
import
eu.stratosphere.streaming.api.invokable.UserSinkInvokable
;
import
eu.stratosphere.streaming.api.streamrecord.StreamRecord
;
public
class
CollaborativeFilteringSink
extends
UserSinkInvokable
{
private
static
final
long
serialVersionUID
=
1L
;
@Override
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
// TODO Auto-generated method stub
System
.
out
.
println
(
"received record..."
);
int
tupleNum
=
record
.
getNumOfTuples
();
System
.
out
.
println
(
"============================================"
);
for
(
int
i
=
0
;
i
<
tupleNum
;
++
i
)
{
System
.
out
.
println
(
"name="
+
record
.
getField
(
i
,
0
)
+
", grade="
+
record
.
getField
(
i
,
1
)
+
", salary="
+
record
.
getField
(
i
,
2
));
}
System
.
out
.
println
(
"============================================"
);
}
}
\ No newline at end of file
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/collaborativefilter/CollaborativeFilteringSource.java
已删除
100644 → 0
浏览文件 @
839d46ce
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package
eu.stratosphere.streaming.examples.iterative.collaborativefilter
;
import
java.io.BufferedReader
;
import
java.io.FileReader
;
import
eu.stratosphere.api.java.tuple.Tuple3
;
import
eu.stratosphere.streaming.api.invokable.UserSourceInvokable
;
import
eu.stratosphere.streaming.api.streamrecord.StreamRecord
;
public
class
CollaborativeFilteringSource
extends
UserSourceInvokable
{
private
static
final
long
serialVersionUID
=
1L
;
private
BufferedReader
br
=
null
;
private
String
line
=
new
String
();
private
StreamRecord
outRecord
=
new
StreamRecord
(
new
Tuple3
<
Integer
,
Integer
,
Integer
>());
@Override
public
void
invoke
()
throws
Exception
{
// TODO Auto-generated method stub
br
=
new
BufferedReader
(
new
FileReader
(
"src/test/resources/testdata/MovieLens100k.data"
));
while
(
true
)
{
line
=
br
.
readLine
();
if
(
line
==
null
)
{
break
;
}
if
(
line
!=
""
)
{
String
[]
items
=
line
.
split
(
"\t"
);
outRecord
.
setInteger
(
0
,
Integer
.
valueOf
(
items
[
0
]));
outRecord
.
setInteger
(
1
,
Integer
.
valueOf
(
items
[
1
]));
outRecord
.
setInteger
(
2
,
Integer
.
valueOf
(
items
[
2
]));
emit
(
outRecord
);
performanceCounter
.
count
();
}
line
=
br
.
readLine
();
}
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/collaborativefilter/CollaborativeFilteringTask.java
已删除
100644 → 0
浏览文件 @
839d46ce
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package
eu.stratosphere.streaming.examples.iterative.collaborativefilter
;
import
java.util.HashMap
;
import
org.jblas.DoubleMatrix
;
import
eu.stratosphere.api.java.tuple.Tuple1
;
import
eu.stratosphere.streaming.api.invokable.UserTaskInvokable
;
import
eu.stratosphere.streaming.api.streamrecord.StreamRecord
;
public
class
CollaborativeFilteringTask
extends
UserTaskInvokable
{
private
static
final
long
serialVersionUID
=
1L
;
private
StreamRecord
outRecord
=
new
StreamRecord
(
new
Tuple1
<
String
>());
HashMap
<
Integer
,
Integer
>
rowIndex
=
new
HashMap
<
Integer
,
Integer
>();
HashMap
<
Integer
,
Integer
>
columnIndex
=
new
HashMap
<
Integer
,
Integer
>();
DoubleMatrix
userItem
=
new
DoubleMatrix
(
1000
,
2000
);
DoubleMatrix
coOccurence
=
new
DoubleMatrix
(
2000
,
2000
);
@Override
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
// TODO Auto-generated method stub
int
userId
=
record
.
getInteger
(
0
,
0
);
int
itemId
=
record
.
getInteger
(
0
,
1
);
int
rating
=
record
.
getInteger
(
0
,
2
);
if
(!
rowIndex
.
containsKey
(
userId
)){
rowIndex
.
put
(
userId
,
rowIndex
.
size
());
}
if
(!
columnIndex
.
containsKey
(
itemId
)){
columnIndex
.
put
(
itemId
,
columnIndex
.
size
());
}
userItem
.
put
(
rowIndex
.
get
(
userId
),
columnIndex
.
get
(
itemId
),
rating
);
//outRecord.setString(0, line);
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/kmeans/KMeansSource.java
已删除
100644 → 0
浏览文件 @
839d46ce
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package
eu.stratosphere.streaming.examples.iterative.kmeans
;
import
java.util.Random
;
import
eu.stratosphere.api.java.tuple.Tuple1
;
import
eu.stratosphere.streaming.api.invokable.UserSourceInvokable
;
import
eu.stratosphere.streaming.api.streamrecord.StreamRecord
;
public
class
KMeansSource
extends
UserSourceInvokable
{
private
static
final
long
serialVersionUID
=
1L
;
private
static
final
long
DEFAULT_SEED
=
4650285087650871364L
;
private
Random
random
=
new
Random
(
DEFAULT_SEED
);
private
StreamRecord
outRecord
=
new
StreamRecord
(
new
Tuple1
<
String
>());
private
int
numCenter
;
private
int
dimension
;
private
double
absoluteStdDev
;
private
double
range
;
private
StringBuilder
buffer
=
new
StringBuilder
();
public
KMeansSource
(
int
numCenter
,
int
dimension
,
double
stddev
,
double
range
){
this
.
numCenter
=
numCenter
;
this
.
dimension
=
dimension
;
this
.
absoluteStdDev
=
stddev
*
range
;
this
.
range
=
range
;
}
@Override
public
void
invoke
()
throws
Exception
{
// TODO Auto-generated method stub
double
[][]
means
=
uniformRandomCenters
(
random
,
numCenter
,
dimension
,
range
);
double
[]
point
=
new
double
[
dimension
];
int
nextCentroid
=
0
;
while
(
true
)
{
// generate a point for the current centroid
double
[]
centroid
=
means
[
nextCentroid
];
for
(
int
d
=
0
;
d
<
dimension
;
d
++)
{
point
[
d
]
=
(
random
.
nextGaussian
()
*
absoluteStdDev
)
+
centroid
[
d
];
}
nextCentroid
=
(
nextCentroid
+
1
)
%
numCenter
;
String
pointString
=
generatePointString
(
point
);
outRecord
.
setString
(
0
,
pointString
);
emit
(
outRecord
);
}
}
private
double
[][]
uniformRandomCenters
(
Random
rnd
,
int
num
,
int
dimensionality
,
double
range
)
{
final
double
halfRange
=
range
/
2
;
final
double
[][]
points
=
new
double
[
num
][
dimensionality
];
for
(
int
i
=
0
;
i
<
num
;
i
++)
{
for
(
int
dim
=
0
;
dim
<
dimensionality
;
dim
++)
{
points
[
i
][
dim
]
=
(
rnd
.
nextDouble
()
*
range
)
-
halfRange
;
}
}
return
points
;
}
private
String
generatePointString
(
double
[]
point
){
buffer
.
setLength
(
0
);
for
(
int
j
=
0
;
j
<
dimension
;
j
++)
{
buffer
.
append
(
point
[
j
]);
if
(
j
<
dimension
-
1
)
{
buffer
.
append
(
" "
);
}
}
return
buffer
.
toString
();
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/kmeans/KMeansTask.java
已删除
100644 → 0
浏览文件 @
839d46ce
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package
eu.stratosphere.streaming.examples.iterative.kmeans
;
import
eu.stratosphere.api.java.tuple.Tuple1
;
import
eu.stratosphere.streaming.api.invokable.UserTaskInvokable
;
import
eu.stratosphere.streaming.api.streamrecord.StreamRecord
;
public
class
KMeansTask
extends
UserTaskInvokable
{
private
static
final
long
serialVersionUID
=
1L
;
private
StreamRecord
outRecord
=
new
StreamRecord
(
new
Tuple1
<
String
>());
private
double
[]
point
=
null
;
public
KMeansTask
(
int
dimension
){
point
=
new
double
[
dimension
];
}
@Override
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
// TODO Auto-generated method stub
String
[]
pointStr
=
record
.
getString
(
0
,
0
).
split
(
" "
);
for
(
int
i
=
0
;
i
<
pointStr
.
length
;
++
i
){
point
[
i
]=
Double
.
valueOf
(
pointStr
[
i
]);
}
outRecord
.
setString
(
0
,
record
.
getString
(
0
,
0
));
emit
(
outRecord
);
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/pagerank/Graph.java
已删除
100644 → 0
浏览文件 @
839d46ce
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package
eu.stratosphere.streaming.examples.iterative.pagerank
;
import
java.util.HashMap
;
import
java.util.HashSet
;
import
java.util.Map
;
import
java.util.Set
;
public
class
Graph
{
public
Map
<
Integer
,
Set
<
Integer
>>
_vertices
=
null
;
public
Graph
()
{
_vertices
=
new
HashMap
<
Integer
,
Set
<
Integer
>>();
}
public
void
insertDirectedEdge
(
int
sourceNode
,
int
targetNode
)
{
if
(!
_vertices
.
containsKey
(
sourceNode
))
{
_vertices
.
put
(
sourceNode
,
new
HashSet
<
Integer
>());
}
_vertices
.
get
(
sourceNode
).
add
(
targetNode
);
}
public
void
insertUndirectedEdge
(
int
sourceNode
,
int
targetNode
){
if
(!
_vertices
.
containsKey
(
sourceNode
)){
_vertices
.
put
(
sourceNode
,
new
HashSet
<
Integer
>());
}
if
(!
_vertices
.
containsKey
(
targetNode
)){
_vertices
.
put
(
targetNode
,
new
HashSet
<
Integer
>());
}
_vertices
.
get
(
sourceNode
).
add
(
targetNode
);
_vertices
.
get
(
targetNode
).
add
(
sourceNode
);
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/pagerank/PageRankLocal.java
已删除
100644 → 0
浏览文件 @
839d46ce
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package
eu.stratosphere.streaming.examples.iterative.pagerank
;
import
org.apache.log4j.Level
;
import
eu.stratosphere.nephele.jobgraph.JobGraph
;
import
eu.stratosphere.streaming.api.JobGraphBuilder
;
import
eu.stratosphere.streaming.faulttolerance.FaultToleranceType
;
import
eu.stratosphere.streaming.util.ClusterUtil
;
import
eu.stratosphere.streaming.util.LogUtils
;
public
class
PageRankLocal
{
public
static
JobGraph
getJobGraph
()
{
JobGraphBuilder
graphBuilder
=
new
JobGraphBuilder
(
"testGraph"
,
FaultToleranceType
.
NONE
);
graphBuilder
.
setSource
(
"Source"
,
new
PageRankSource
());
graphBuilder
.
setTask
(
"Task"
,
new
PageRankTask
(),
1
,
1
);
graphBuilder
.
setSink
(
"Sink"
,
new
PageRankSink
());
graphBuilder
.
fieldsConnect
(
"Source"
,
"Task"
,
0
);
graphBuilder
.
shuffleConnect
(
"Task"
,
"Sink"
);
return
graphBuilder
.
getJobGraph
();
}
public
static
void
main
(
String
[]
args
)
{
LogUtils
.
initializeDefaultConsoleLogger
(
Level
.
DEBUG
,
Level
.
INFO
);
ClusterUtil
.
runOnMiniCluster
(
getJobGraph
());
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/pagerank/PageRankSink.java
已删除
100644 → 0
浏览文件 @
839d46ce
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package
eu.stratosphere.streaming.examples.iterative.pagerank
;
import
eu.stratosphere.streaming.api.invokable.UserSinkInvokable
;
import
eu.stratosphere.streaming.api.streamrecord.StreamRecord
;
public
class
PageRankSink
extends
UserSinkInvokable
{
private
static
final
long
serialVersionUID
=
1L
;
@Override
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
// TODO Auto-generated method stub
System
.
out
.
println
(
"received record..."
);
int
tupleNum
=
record
.
getNumOfTuples
();
System
.
out
.
println
(
"============================================"
);
for
(
int
i
=
0
;
i
<
tupleNum
;
++
i
)
{
System
.
out
.
println
(
"name="
+
record
.
getField
(
i
,
0
)
+
", grade="
+
record
.
getField
(
i
,
1
)
+
", salary="
+
record
.
getField
(
i
,
2
));
}
System
.
out
.
println
(
"============================================"
);
}
}
\ No newline at end of file
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/pagerank/PageRankSource.java
已删除
100644 → 0
浏览文件 @
839d46ce
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package
eu.stratosphere.streaming.examples.iterative.pagerank
;
import
java.io.BufferedReader
;
import
java.io.FileReader
;
import
eu.stratosphere.api.java.tuple.Tuple2
;
import
eu.stratosphere.streaming.api.invokable.UserSourceInvokable
;
import
eu.stratosphere.streaming.api.streamrecord.StreamRecord
;
public
class
PageRankSource
extends
UserSourceInvokable
{
private
static
final
long
serialVersionUID
=
1L
;
private
BufferedReader
br
=
null
;
private
StreamRecord
outRecord
=
new
StreamRecord
(
new
Tuple2
<
Integer
,
Integer
>());
@Override
public
void
invoke
()
throws
Exception
{
// TODO Auto-generated method stub
br
=
new
BufferedReader
(
new
FileReader
(
"src/test/resources/testdata/ASTopology.data"
));
while
(
true
)
{
String
line
=
br
.
readLine
();
if
(
line
==
null
)
{
break
;
}
if
(
line
!=
""
)
{
String
[]
link
=
line
.
split
(
":"
);
outRecord
.
setInteger
(
0
,
Integer
.
valueOf
(
link
[
0
]));
outRecord
.
setInteger
(
0
,
Integer
.
valueOf
(
link
[
1
]));
emit
(
outRecord
);
performanceCounter
.
count
();
}
line
=
br
.
readLine
();
}
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/pagerank/PageRankTask.java
已删除
100644 → 0
浏览文件 @
839d46ce
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package
eu.stratosphere.streaming.examples.iterative.pagerank
;
import
eu.stratosphere.api.java.tuple.Tuple1
;
import
eu.stratosphere.streaming.api.invokable.UserTaskInvokable
;
import
eu.stratosphere.streaming.api.streamrecord.StreamRecord
;
public
class
PageRankTask
extends
UserTaskInvokable
{
private
static
final
long
serialVersionUID
=
1L
;
private
StreamRecord
outRecord
=
new
StreamRecord
(
new
Tuple1
<
String
>());
private
Graph
linkGraph
=
new
Graph
();
@Override
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
// TODO Auto-generated method stub
Integer
sourceNode
=
record
.
getInteger
(
0
,
0
);
Integer
targetNode
=
record
.
getInteger
(
0
,
1
);
// set the input graph.
linkGraph
.
insertDirectedEdge
(
sourceNode
,
targetNode
);
//outRecord.setString(0, line);
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/Graph.java
已删除
100644 → 0
浏览文件 @
839d46ce
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package
eu.stratosphere.streaming.examples.iterative.sssp
;
import
java.util.HashMap
;
import
java.util.HashSet
;
import
java.util.Map
;
import
java.util.Set
;
public
class
Graph
{
public
Map
<
Integer
,
Set
<
Integer
>>
_vertices
=
null
;
public
Graph
()
{
_vertices
=
new
HashMap
<
Integer
,
Set
<
Integer
>>();
}
public
void
insertDirectedEdge
(
int
sourceNode
,
int
targetNode
)
{
if
(!
_vertices
.
containsKey
(
sourceNode
))
{
_vertices
.
put
(
sourceNode
,
new
HashSet
<
Integer
>());
}
_vertices
.
get
(
sourceNode
).
add
(
targetNode
);
}
public
void
insertUndirectedEdge
(
int
sourceNode
,
int
targetNode
){
if
(!
_vertices
.
containsKey
(
sourceNode
)){
_vertices
.
put
(
sourceNode
,
new
HashSet
<
Integer
>());
}
if
(!
_vertices
.
containsKey
(
targetNode
)){
_vertices
.
put
(
targetNode
,
new
HashSet
<
Integer
>());
}
_vertices
.
get
(
sourceNode
).
add
(
targetNode
);
_vertices
.
get
(
targetNode
).
add
(
sourceNode
);
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/SSSPLocal.java
已删除
100644 → 0
浏览文件 @
839d46ce
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package
eu.stratosphere.streaming.examples.iterative.sssp
;
import
org.apache.log4j.Level
;
import
eu.stratosphere.nephele.jobgraph.JobGraph
;
import
eu.stratosphere.streaming.api.JobGraphBuilder
;
import
eu.stratosphere.streaming.faulttolerance.FaultToleranceType
;
import
eu.stratosphere.streaming.util.ClusterUtil
;
import
eu.stratosphere.streaming.util.LogUtils
;
public
class
SSSPLocal
{
public
static
JobGraph
getJobGraph
()
{
JobGraphBuilder
graphBuilder
=
new
JobGraphBuilder
(
"testGraph"
,
FaultToleranceType
.
NONE
);
graphBuilder
.
setSource
(
"Source"
,
new
SSSPSource
());
graphBuilder
.
setTask
(
"Task"
,
new
SSSPTask
(),
1
,
1
);
graphBuilder
.
setSink
(
"Sink"
,
new
SSSPSink
());
graphBuilder
.
fieldsConnect
(
"Source"
,
"Task"
,
0
);
graphBuilder
.
shuffleConnect
(
"Task"
,
"Sink"
);
return
graphBuilder
.
getJobGraph
();
}
public
static
void
main
(
String
[]
args
)
{
LogUtils
.
initializeDefaultConsoleLogger
(
Level
.
DEBUG
,
Level
.
INFO
);
ClusterUtil
.
runOnMiniCluster
(
getJobGraph
());
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/SSSPSink.java
已删除
100644 → 0
浏览文件 @
839d46ce
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package
eu.stratosphere.streaming.examples.iterative.sssp
;
import
eu.stratosphere.streaming.api.invokable.UserSinkInvokable
;
import
eu.stratosphere.streaming.api.streamrecord.StreamRecord
;
public
class
SSSPSink
extends
UserSinkInvokable
{
private
static
final
long
serialVersionUID
=
1L
;
@Override
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
// TODO Auto-generated method stub
System
.
out
.
println
(
"received record..."
);
int
tupleNum
=
record
.
getNumOfTuples
();
System
.
out
.
println
(
"============================================"
);
for
(
int
i
=
0
;
i
<
tupleNum
;
++
i
)
{
System
.
out
.
println
(
"name="
+
record
.
getField
(
i
,
0
)
+
", grade="
+
record
.
getField
(
i
,
1
)
+
", salary="
+
record
.
getField
(
i
,
2
));
}
System
.
out
.
println
(
"============================================"
);
}
}
\ No newline at end of file
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/SSSPTask.java
已删除
100644 → 0
浏览文件 @
839d46ce
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package
eu.stratosphere.streaming.examples.iterative.sssp
;
import
eu.stratosphere.api.java.tuple.Tuple1
;
import
eu.stratosphere.streaming.api.invokable.UserTaskInvokable
;
import
eu.stratosphere.streaming.api.streamrecord.StreamRecord
;
public
class
SSSPTask
extends
UserTaskInvokable
{
private
static
final
long
serialVersionUID
=
1L
;
private
StreamRecord
outRecord
=
new
StreamRecord
(
new
Tuple1
<
String
>());
private
Graph
linkGraph
=
new
Graph
();
@Override
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
// TODO Auto-generated method stub
Integer
sourceNode
=
record
.
getInteger
(
0
,
0
);
Integer
targetNode
=
record
.
getInteger
(
0
,
1
);
// set the input graph.
linkGraph
.
insertDirectedEdge
(
sourceNode
,
targetNode
);
//outRecord.setString(0, line);
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalLearningSkeleton.java
浏览文件 @
28ba85b5
...
...
@@ -14,188 +14,134 @@
**********************************************************************************************************************/
package
eu.stratosphere.streaming.examples.ml
;
import
java.net.InetSocketAddress
;
import
org.apache.log4j.Level
;
import
eu.stratosphere.api.java.tuple.Tuple
;
import
eu.stratosphere.api.java.functions.MapFunction
;
import
eu.stratosphere.api.java.tuple.Tuple1
;
import
eu.stratosphere.client.minicluster.NepheleMiniCluster
;
import
eu.stratosphere.client.program.Client
;
import
eu.stratosphere.configuration.Configuration
;
import
eu.stratosphere.nephele.jobgraph.JobGraph
;
import
eu.stratosphere.streaming.api.JobGraphBuilder
;
import
eu.stratosphere.streaming.api.invokable.UserSinkInvokable
;
import
eu.stratosphere.streaming.api.invokable.UserSourceInvokable
;
import
eu.stratosphere.streaming.api.invokable.UserTaskInvokable
;
import
eu.stratosphere.streaming.api.streamrecord.StreamRecord
;
import
eu.stratosphere.streaming.util.LogUtils
;
import
eu.stratosphere.streaming.api.DataStream
;
import
eu.stratosphere.streaming.api.SinkFunction
;
import
eu.stratosphere.streaming.api.SourceFunction
;
import
eu.stratosphere.streaming.api.StreamExecutionEnvironment
;
import
eu.stratosphere.util.Collector
;
public
class
IncrementalLearningSkeleton
{
// Source for feeding new data for prediction
public
static
class
NewDataSource
extends
UserSourceInvokable
{
public
static
class
NewDataSource
extends
SourceFunction
<
Tuple1
<
Integer
>>
{
private
static
final
long
serialVersionUID
=
1L
;
StreamRecord
record
=
new
StreamRecord
(
new
Tuple1
<
Integer
>(
1
));
@Override
public
void
invoke
()
throws
Exception
{
public
void
invoke
(
Collector
<
Tuple1
<
Integer
>>
collector
)
throws
Exception
{
while
(
true
)
{
record
.
setTuple
(
getNewData
());
emit
(
record
);
collector
.
collect
(
getNewData
());
}
}
// Method for pulling new data for prediction
private
Tuple
getNewData
()
throws
InterruptedException
{
private
Tuple
1
<
Integer
>
getNewData
()
throws
InterruptedException
{
return
new
Tuple1
<
Integer
>(
1
);
}
}
// Source for feeding new training data for partial model building
public
static
class
TrainingDataSource
extends
UserSourceInvokable
{
public
static
class
TrainingDataSource
extends
SourceFunction
<
Tuple1
<
Integer
>>
{
private
static
final
long
serialVersionUID
=
1L
;
// Number of tuples grouped for building partial model
// TODO: batch training data
private
final
int
BATCH_SIZE
=
1000
;
StreamRecord
record
=
new
StreamRecord
(
1
,
BATCH_SIZE
);
@Override
public
void
invoke
()
throws
Exception
{
record
.
initRecords
();
public
void
invoke
(
Collector
<
Tuple1
<
Integer
>>
collector
)
throws
Exception
{
while
(
true
)
{
// Group the predefined number of records in a streamrecord then
// emit for model building
for
(
int
i
=
0
;
i
<
BATCH_SIZE
;
i
++)
{
record
.
setTuple
(
i
,
getTrainingData
());
}
emit
(
record
);
collector
.
collect
(
getTrainingData
());;
}
}
// Method for pulling new training data
private
Tuple
getTrainingData
()
throws
InterruptedException
{
private
Tuple
1
<
Integer
>
getTrainingData
()
throws
InterruptedException
{
return
new
Tuple1
<
Integer
>(
1
);
}
}
// Task for building up-to-date partial models on new training data
public
static
class
PartialModelBuilder
extends
UserTaskInvokable
{
public
static
class
PartialModelBuilder
extends
MapFunction
<
Tuple1
<
Integer
>,
Tuple1
<
Integer
>>
{
private
static
final
long
serialVersionUID
=
1L
;
@Override
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
emit
(
buildPartialModel
(
record
)
);
public
Tuple1
<
Integer
>
map
(
Tuple1
<
Integer
>
inTuple
)
throws
Exception
{
return
buildPartialModel
(
inTuple
);
}
// Method for building partial model on the grouped training data
protected
StreamRecord
buildPartialModel
(
StreamRecord
record
)
{
return
new
StreamRecord
(
new
Tuple1
<
Integer
>(
1
)
);
protected
Tuple1
<
Integer
>
buildPartialModel
(
Tuple1
<
Integer
>
inTuple
)
{
return
new
Tuple1
<
Integer
>(
1
);
}
}
// Task for performing prediction using the model produced in
// batch-processing and the up-to-date partial model
public
static
class
Predictor
extends
UserTaskInvokable
{
public
static
class
Predictor
extends
MapFunction
<
Tuple1
<
Integer
>,
Tuple1
<
Integer
>>
{
private
static
final
long
serialVersionUID
=
1L
;
StreamRecord
batchModel
=
null
;
StreamRecord
partialModel
=
null
;
Tuple1
<
Integer
>
batchModel
=
null
;
Tuple1
<
Integer
>
partialModel
=
null
;
@Override
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
if
(
isModel
(
record
))
{
partialModel
=
record
;
public
Tuple1
<
Integer
>
map
(
Tuple1
<
Integer
>
inTuple
)
throws
Exception
{
if
(
isModel
(
inTuple
))
{
partialModel
=
inTuple
;
batchModel
=
getBatchModel
();
return
null
;
//TODO: fix
}
else
{
emit
(
predict
(
record
)
);
return
predict
(
inTuple
);
}
}
// Pulls model built with batch-job on the old training data
protected
StreamRecord
getBatchModel
()
{
return
new
StreamRecord
(
new
Tuple1
<
Integer
>(
1
)
);
protected
Tuple1
<
Integer
>
getBatchModel
()
{
return
new
Tuple1
<
Integer
>(
1
);
}
// Checks whether the record is a model or a new data
protected
boolean
isModel
(
StreamRecord
record
)
{
protected
boolean
isModel
(
Tuple1
<
Integer
>
inTuple
)
{
return
true
;
}
// Performs prediction using the two models
protected
StreamRecord
predict
(
StreamRecord
record
)
{
return
new
StreamRecord
(
new
Tuple1
<
Integer
>(
0
)
);
protected
Tuple1
<
Integer
>
predict
(
Tuple1
<
Integer
>
inTuple
)
{
return
new
Tuple1
<
Integer
>(
0
);
}
}
public
static
class
Sink
extends
UserSinkInvokable
{
public
static
class
IMLSink
extends
SinkFunction
<
Tuple1
<
Integer
>>
{
private
static
final
long
serialVersionUID
=
1L
;
@Override
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
public
void
invoke
(
Tuple1
<
Integer
>
inTuple
)
{
// do nothing
}
}
private
static
JobGraph
getJobGraph
()
throws
Exception
{
JobGraphBuilder
graphBuilder
=
new
JobGraphBuilder
(
"IncrementalLearning"
);
graphBuilder
.
setSource
(
"NewData"
,
NewDataSource
.
class
,
1
,
1
);
graphBuilder
.
setSource
(
"TrainingData"
,
TrainingDataSource
.
class
,
1
,
1
);
graphBuilder
.
setTask
(
"PartialModelBuilder"
,
PartialModelBuilder
.
class
,
1
,
1
);
graphBuilder
.
setTask
(
"Predictor"
,
Predictor
.
class
,
1
,
1
);
graphBuilder
.
setSink
(
"Sink"
,
Sink
.
class
,
1
,
1
);
graphBuilder
.
shuffleConnect
(
"TrainingData"
,
"PartialModelBuilder"
);
graphBuilder
.
shuffleConnect
(
"NewData"
,
"Predictor"
);
graphBuilder
.
broadcastConnect
(
"PartialModelBuilder"
,
"Predictor"
);
graphBuilder
.
shuffleConnect
(
"Predictor"
,
"Sink"
);
return
graphBuilder
.
getJobGraph
();
}
public
static
void
main
(
String
[]
args
)
{
// set logging parameters for local run
LogUtils
.
initializeDefaultConsoleLogger
(
Level
.
INFO
,
Level
.
INFO
);
try
{
// generate JobGraph
JobGraph
jG
=
getJobGraph
();
Configuration
configuration
=
jG
.
getJobConfiguration
();
if
(
args
.
length
==
0
||
args
[
0
].
equals
(
"local"
))
{
System
.
out
.
println
(
"Running in Local mode"
);
// start local cluster and submit JobGraph
NepheleMiniCluster
exec
=
new
NepheleMiniCluster
();
exec
.
start
();
Client
client
=
new
Client
(
new
InetSocketAddress
(
"localhost"
,
6498
),
configuration
);
StreamExecutionEnvironment
env
=
new
StreamExecutionEnvironment
();
client
.
run
(
jG
,
true
);
exec
.
stop
();
}
else
if
(
args
[
0
].
equals
(
"cluster"
))
{
System
.
out
.
println
(
"Running in Cluster mode"
);
// submit JobGraph to the running cluster
Client
client
=
new
Client
(
new
InetSocketAddress
(
"dell150"
,
6123
),
configuration
);
client
.
run
(
jG
,
true
);
}
}
catch
(
Exception
e
)
{
System
.
out
.
println
(
e
);
}
DataStream
<
Tuple1
<
Integer
>>
model
=
env
.
addSource
(
new
TrainingDataSource
())
.
map
(
new
PartialModelBuilder
())
.
broadcast
();
DataStream
<
Tuple1
<
Integer
>>
prediction
=
env
.
addSource
(
new
NewDataSource
())
.
connectWith
(
model
)
.
map
(
new
Predictor
())
.
addSink
(
new
IMLSink
());
}
}
\ No newline at end of file
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalOLS.java
浏览文件 @
28ba85b5
...
...
@@ -18,73 +18,57 @@ import java.util.Random;
import
org.apache.commons.lang.ArrayUtils
;
import
org.apache.commons.math.stat.regression.OLSMultipleLinearRegression
;
import
org.apache.log4j.Level
;
import
eu.stratosphere.api.java.functions.MapFunction
;
import
eu.stratosphere.api.java.tuple.Tuple
;
import
eu.stratosphere.api.java.tuple.Tuple1
;
import
eu.stratosphere.api.java.tuple.Tuple2
;
import
eu.stratosphere.nephele.jobgraph.JobGraph
;
import
eu.stratosphere.streaming.api.JobGraphBuilder
;
import
eu.stratosphere.streaming.api.invokable.UserSinkInvokable
;
import
eu.stratosphere.streaming.api.invokable.UserSourceInvokable
;
import
eu.stratosphere.streaming.api.invokable.UserTaskInvokable
;
import
eu.stratosphere.streaming.api.streamrecord.StreamRecord
;
import
eu.stratosphere.streaming.faulttolerance.FaultToleranceType
;
import
eu.stratosphere.streaming.util.ClusterUtil
;
import
eu.stratosphere.streaming.util.LogUtils
;
import
eu.stratosphere.streaming.api.DataStream
;
import
eu.stratosphere.streaming.api.SinkFunction
;
import
eu.stratosphere.streaming.api.SourceFunction
;
import
eu.stratosphere.streaming.api.StreamExecutionEnvironment
;
import
eu.stratosphere.util.Collector
;
public
class
IncrementalOLS
{
public
static
class
NewDataSource
extends
UserSourceInvokable
{
public
static
class
NewDataSource
extends
SourceFunction
<
Tuple2
<
Boolean
,
Double
[]>>
{
private
static
final
long
serialVersionUID
=
1L
;
StreamRecord
record
=
new
StreamRecord
(
2
,
1
);
Random
rnd
=
new
Random
();
@Override
public
void
invoke
()
throws
Exception
{
record
.
initRecords
();
public
void
invoke
(
Collector
<
Tuple2
<
Boolean
,
Double
[]>>
collector
)
throws
Exception
{
while
(
true
)
{
// pull new record from data source
record
.
setTuple
(
getNewData
());
emit
(
record
);
collector
.
collect
(
getNewData
());
}
}
private
Tuple
getNewData
()
throws
InterruptedException
{
private
Tuple
2
<
Boolean
,
Double
[]>
getNewData
()
throws
InterruptedException
{
return
new
Tuple2
<
Boolean
,
Double
[]>(
false
,
new
Double
[]
{
rnd
.
nextDouble
()
*
3
,
rnd
.
nextDouble
()
*
5
});
}
}
public
static
class
TrainingDataSource
extends
UserSourceInvokable
{
public
static
class
TrainingDataSource
extends
SourceFunction
<
Tuple2
<
Double
,
Double
[]>>
{
private
static
final
long
serialVersionUID
=
1L
;
// TODO: batch training data
private
final
int
BATCH_SIZE
=
1000
;
StreamRecord
record
=
new
StreamRecord
(
2
,
BATCH_SIZE
);
Random
rnd
=
new
Random
();
@Override
public
void
invoke
()
throws
Exception
{
record
.
initRecords
();
public
void
invoke
(
Collector
<
Tuple2
<
Double
,
Double
[]>>
collector
)
throws
Exception
{
while
(
true
)
{
for
(
int
i
=
0
;
i
<
BATCH_SIZE
;
i
++)
{
record
.
setTuple
(
i
,
getTrainingData
());
}
emit
(
record
);
collector
.
collect
(
getTrainingData
());
}
}
private
Tuple
getTrainingData
()
throws
InterruptedException
{
private
Tuple
2
<
Double
,
Double
[]>
getTrainingData
()
throws
InterruptedException
{
return
new
Tuple2
<
Double
,
Double
[]>(
rnd
.
nextDouble
()
*
10
,
new
Double
[]
{
rnd
.
nextDouble
()
*
3
,
rnd
.
nextDouble
()
*
5
});
...
...
@@ -92,25 +76,29 @@ public class IncrementalOLS {
}
}
public
static
class
PartialModelBuilder
extends
UserTaskInvokable
{
public
static
class
PartialModelBuilder
extends
MapFunction
<
Tuple2
<
Double
,
Double
[]>,
Tuple2
<
Boolean
,
Double
[]>>
{
private
static
final
long
serialVersionUID
=
1L
;
@Override
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
emit
(
buildPartialModel
(
record
)
);
public
Tuple2
<
Boolean
,
Double
[]>
map
(
Tuple2
<
Double
,
Double
[]>
inTuple
)
throws
Exception
{
return
buildPartialModel
(
inTuple
);
}
protected
StreamRecord
buildPartialModel
(
StreamRecord
record
)
{
// TODO: deal with batchsize
protected
Tuple2
<
Boolean
,
Double
[]>
buildPartialModel
(
Tuple2
<
Double
,
Double
[]>
inTuple
)
{
Integer
numOfTuples
=
record
.
getNumOfTuples
();
Integer
numOfFeatures
=
((
Double
[])
record
.
getField
(
1
)).
length
;
// Integer numOfTuples = record.getNumOfTuples();
Integer
numOfTuples
=
1
;
Integer
numOfFeatures
=
((
Double
[])
inTuple
.
getField
(
1
)).
length
;
double
[][]
x
=
new
double
[
numOfTuples
][
numOfFeatures
];
double
[]
y
=
new
double
[
numOfTuples
];
for
(
int
i
=
0
;
i
<
numOfTuples
;
i
++)
{
Tuple
t
=
record
.
getTuple
(
i
);
// Tuple t = record.getTuple(i);
Tuple
t
=
inTuple
;
Double
[]
x_i
=
(
Double
[])
t
.
getField
(
1
);
y
[
i
]
=
(
Double
)
t
.
getField
(
0
);
for
(
int
j
=
0
;
j
<
numOfFeatures
;
j
++)
{
...
...
@@ -121,90 +109,69 @@ public class IncrementalOLS {
OLSMultipleLinearRegression
ols
=
new
OLSMultipleLinearRegression
();
ols
.
newSampleData
(
y
,
x
);
return
new
StreamRecord
(
new
Tuple2
<
Boolean
,
Double
[]>(
true
,
(
Double
[])
ArrayUtils
.
toObject
(
ols
.
estimateRegressionParameters
()
)));
return
new
Tuple2
<
Boolean
,
Double
[]>(
true
,
(
Double
[])
ArrayUtils
.
toObject
(
ols
.
estimateRegressionParameters
(
)));
}
}
public
static
class
Predictor
extends
UserTaskInvokable
{
// TODO: How do I know the x for which I have predicted y?
public
static
class
Predictor
extends
MapFunction
<
Tuple2
<
Boolean
,
Double
[]>,
Tuple1
<
Double
>>
{
private
static
final
long
serialVersionUID
=
1L
;
// StreamRecord batchModel = null;
Double
[]
partialModel
=
new
Double
[]
{
0.0
,
0.0
};
@Override
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
if
(
isModel
(
record
))
{
partialModel
=
(
Double
[])
record
.
getField
(
1
)
;
public
Tuple1
<
Double
>
map
(
Tuple2
<
Boolean
,
Double
[]>
inTuple
)
throws
Exception
{
if
(
isModel
(
inTuple
))
{
partialModel
=
inTuple
.
f1
;
// batchModel = getBatchModel();
return
null
;
//TODO: fix
}
else
{
emit
(
predict
(
record
)
);
return
predict
(
inTuple
);
}
}
// protected StreamRecord getBatchModel() {
// return new StreamRecord(new Tuple1<Integer>(1));
// }
protected
boolean
isModel
(
StreamRecord
record
)
{
return
record
.
getBoolean
(
0
);
protected
boolean
isModel
(
Tuple2
<
Boolean
,
Double
[]>
inTuple
)
{
return
inTuple
.
f0
;
}
protected
StreamRecord
predict
(
StreamRecord
record
)
{
Double
[]
x
=
(
Double
[])
record
.
getField
(
1
)
;
protected
Tuple1
<
Double
>
predict
(
Tuple2
<
Boolean
,
Double
[]>
inTuple
)
{
Double
[]
x
=
inTuple
.
f1
;
Double
prediction
=
0.0
;
for
(
int
i
=
0
;
i
<
x
.
length
;
i
++)
{
prediction
=
prediction
+
x
[
i
]
*
partialModel
[
i
];
}
return
new
StreamRecord
(
new
Tuple1
<
Double
>(
prediction
)
);
return
new
Tuple1
<
Double
>(
prediction
);
}
}
public
static
class
Sink
extends
UserSinkInvokable
{
public
static
class
IncOLSSink
extends
SinkFunction
<
Tuple1
<
Double
>>
{
private
static
final
long
serialVersionUID
=
1L
;
@Override
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
public
void
invoke
(
Tuple1
<
Double
>
inTuple
)
{
System
.
out
.
println
(
inTuple
);
}
}
private
static
JobGraph
getJobGraph
()
{
JobGraphBuilder
graphBuilder
=
new
JobGraphBuilder
(
"IncrementalOLS"
,
FaultToleranceType
.
NONE
);
graphBuilder
.
setSource
(
"NewData"
,
new
NewDataSource
(),
1
,
1
);
graphBuilder
.
setSource
(
"TrainingData"
,
new
TrainingDataSource
(),
1
,
1
);
graphBuilder
.
setTask
(
"PartialModelBuilder"
,
new
PartialModelBuilder
(),
1
,
1
);
graphBuilder
.
setTask
(
"Predictor"
,
new
Predictor
(),
1
,
1
);
graphBuilder
.
setSink
(
"Sink"
,
new
Sink
(),
1
,
1
);
graphBuilder
.
shuffleConnect
(
"TrainingData"
,
"PartialModelBuilder"
);
graphBuilder
.
shuffleConnect
(
"NewData"
,
"Predictor"
);
graphBuilder
.
broadcastConnect
(
"PartialModelBuilder"
,
"Predictor"
);
graphBuilder
.
shuffleConnect
(
"Predictor"
,
"Sink"
);
return
graphBuilder
.
getJobGraph
();
}
public
static
void
main
(
String
[]
args
)
{
// set logging parameters for local run
LogUtils
.
initializeDefaultConsoleLogger
(
Level
.
DEBUG
,
Level
.
INFO
);
StreamExecutionEnvironment
env
=
new
StreamExecutionEnvironment
();
if
(
args
.
length
==
0
)
{
args
=
new
String
[]
{
"local"
};
}
if
(
args
[
0
].
equals
(
"local"
))
{
ClusterUtil
.
runOnMiniCluster
(
getJobGraph
());
}
else
if
(
args
[
0
].
equals
(
"cluster"
))
{
ClusterUtil
.
runOnLocalCluster
(
getJobGraph
(),
"hadoop02.ilab.sztaki.hu"
,
6123
);
}
DataStream
<
Tuple2
<
Boolean
,
Double
[]>>
model
=
env
.
addSource
(
new
TrainingDataSource
())
.
map
(
new
PartialModelBuilder
())
.
broadcast
();
DataStream
<
Tuple1
<
Double
>>
prediction
=
env
.
addSource
(
new
NewDataSource
())
.
connectWith
(
model
)
.
map
(
new
Predictor
())
.
addSink
(
new
IncOLSSink
());
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumAggregate.java
浏览文件 @
28ba85b5
...
...
@@ -15,87 +15,90 @@
package
eu.stratosphere.streaming.examples.window.sum
;
import
java.util.ArrayList
;
import
eu.stratosphere.api.java.functions.FlatMapFunction
;
import
eu.stratosphere.api.java.tuple.Tuple2
;
import
eu.stratosphere.streaming.api.invokable.UserTaskInvokable
;
import
eu.stratosphere.streaming.api.streamrecord.StreamRecord
;
import
eu.stratosphere.streaming.state.MutableTableState
;
import
eu.stratosphere.streaming.state.SlidingWindowState
;
import
eu.stratosphere.util.Collector
;
public
class
WindowSumAggregate
extends
FlatMapFunction
<
Tuple2
<
Integer
,
Long
>,
Tuple2
<
Integer
,
Long
>>
{
public
class
WindowSumAggregate
extends
UserTaskInvokable
{
private
static
final
long
serialVersionUID
=
1L
;
private
int
windowSize
=
100
;
private
int
slidingStep
=
20
;
private
int
computeGranularity
=
10
;
private
int
windowFieldId
=
1
;
private
ArrayList
<
Tuple2
<
Integer
,
Long
>>
tempTupleArray
=
null
;
private
Tuple2
<
Integer
,
Long
>
outTuple
=
new
Tuple2
<
Integer
,
Long
>();
private
SlidingWindowState
window
;
private
StreamRecord
tempRecord
;
private
SlidingWindowState
<
Integer
>
window
;
private
MutableTableState
<
String
,
Integer
>
sum
;
private
long
initTimestamp
=
-
1
;
private
long
nextTimestamp
=
-
1
;
private
StreamRecord
outRecord
=
new
StreamRecord
(
new
Tuple2
<
Integer
,
Long
>());
public
WindowSumAggregate
()
{
window
=
new
SlidingWindowState
(
windowSize
,
slidingStep
,
window
=
new
SlidingWindowState
<
Integer
>
(
windowSize
,
slidingStep
,
computeGranularity
);
sum
=
new
MutableTableState
<
String
,
Integer
>();
sum
.
put
(
"sum"
,
0
);
}
private
void
incrementCompute
(
ArrayList
<
Tuple2
<
Integer
,
Long
>>
tupleArray
)
{
for
(
int
i
=
0
;
i
<
tupleArray
.
size
();
++
i
)
{
int
number
=
tupleArray
.
get
(
i
).
f0
;
private
void
incrementCompute
(
StreamRecord
record
)
{
int
numTuple
=
record
.
getNumOfTuples
();
for
(
int
i
=
0
;
i
<
numTuple
;
++
i
)
{
int
number
=
record
.
getInteger
(
i
,
0
);
sum
.
put
(
"sum"
,
sum
.
get
(
"sum"
)
+
number
);
}
}
private
void
decrementCompute
(
ArrayList
<
Tuple2
<
Integer
,
Long
>>
tupleArray
)
{
for
(
int
i
=
0
;
i
<
tupleArray
.
size
();
++
i
)
{
int
number
=
tupleArray
.
get
(
i
).
f0
;
private
void
decrementCompute
(
StreamRecord
record
)
{
int
numTuple
=
record
.
getNumOfTuples
();
for
(
int
i
=
0
;
i
<
numTuple
;
++
i
)
{
int
number
=
record
.
getInteger
(
i
,
0
);
sum
.
put
(
"sum"
,
sum
.
get
(
"sum"
)
-
number
);
}
}
private
void
produceOutput
(
long
progress
,
Collector
<
Tuple2
<
Integer
,
Long
>>
out
){
outTuple
.
f0
=
sum
.
get
(
"sum"
);
outTuple
.
f1
=
progress
;
out
.
collect
(
outTuple
);
}
private
void
produceRecord
(
long
progress
){
outRecord
.
setInteger
(
0
,
sum
.
get
(
"sum"
));
outRecord
.
setLong
(
1
,
progress
);
emit
(
outRecord
);
}
@Override
public
void
flatMap
(
Tuple2
<
Integer
,
Long
>
value
,
Collector
<
Tuple2
<
Integer
,
Long
>>
out
)
throws
Exception
{
// TODO Auto-generated method stub
long
progress
=
value
.
f1
;
if
(
initTimestamp
==
-
1
)
{
initTimestamp
=
progress
;
nextTimestamp
=
initTimestamp
+
computeGranularity
;
tempTupleArray
=
new
ArrayList
<
Tuple2
<
Integer
,
Long
>>();
}
else
{
if
(
progress
>=
nextTimestamp
)
{
if
(
window
.
isFull
())
{
ArrayList
<
Tuple2
<
Integer
,
Long
>>
expiredTupleArray
=
window
.
popFront
();
incrementCompute
(
tempTupleArray
);
decrementCompute
(
expiredTupleArray
);
window
.
pushBack
(
tempTupleArray
);
if
(
window
.
isEmittable
())
{
produceOutput
(
progress
,
out
);
}
}
else
{
incrementCompute
(
tempTupleArray
);
window
.
pushBack
(
tempTupleArray
);
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
int
numTuple
=
record
.
getNumOfTuples
();
for
(
int
i
=
0
;
i
<
numTuple
;
++
i
)
{
long
progress
=
record
.
getLong
(
i
,
windowFieldId
);
if
(
initTimestamp
==
-
1
)
{
initTimestamp
=
progress
;
nextTimestamp
=
initTimestamp
+
computeGranularity
;
tempRecord
=
new
StreamRecord
(
record
.
getNumOfFields
());
}
else
{
if
(
progress
>=
nextTimestamp
)
{
if
(
window
.
isFull
())
{
produceOutput
(
progress
,
out
);
StreamRecord
expiredRecord
=
window
.
popFront
();
incrementCompute
(
tempRecord
);
decrementCompute
(
expiredRecord
);
window
.
pushBack
(
tempRecord
);
if
(
window
.
isEmittable
())
{
produceRecord
(
progress
);
}
}
else
{
incrementCompute
(
tempRecord
);
window
.
pushBack
(
tempRecord
);
if
(
window
.
isFull
())
{
produceRecord
(
progress
);
}
}
initTimestamp
=
nextTimestamp
;
nextTimestamp
=
initTimestamp
+
computeGranularity
;
tempRecord
=
new
StreamRecord
(
record
.
getNumOfFields
());
}
initTimestamp
=
nextTimestamp
;
nextTimestamp
=
initTimestamp
+
computeGranularity
;
tempTupleArray
=
new
ArrayList
<
Tuple2
<
Integer
,
Long
>>();
}
tempRecord
.
addTuple
(
record
.
getTuple
(
i
));
}
tempTupleArray
.
add
(
value
);
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumLocal.java
浏览文件 @
28ba85b5
...
...
@@ -15,21 +15,71 @@
package
eu.stratosphere.streaming.examples.window.sum
;
import
eu.stratosphere.api.java.tuple.Tuple2
;
import
eu.stratosphere.streaming.api.DataStream
;
import
eu.stratosphere.streaming.api.StreamExecutionEnvironment
;
import
java.net.InetSocketAddress
;
import
org.apache.log4j.Level
;
import
eu.stratosphere.client.minicluster.NepheleMiniCluster
;
import
eu.stratosphere.client.program.Client
;
import
eu.stratosphere.configuration.Configuration
;
import
eu.stratosphere.nephele.jobgraph.JobGraph
;
import
eu.stratosphere.streaming.api.JobGraphBuilder
;
import
eu.stratosphere.streaming.util.LogUtils
;
public
class
WindowSumLocal
{
public
static
JobGraph
getJobGraph
()
{
JobGraphBuilder
graphBuilder
=
new
JobGraphBuilder
(
"testGraph"
);
graphBuilder
.
setSource
(
"WindowSumSource"
,
WindowSumSource
.
class
);
graphBuilder
.
setTask
(
"WindowSumMultiple"
,
WindowSumMultiple
.
class
,
1
,
1
);
graphBuilder
.
setTask
(
"WindowSumAggregate"
,
WindowSumAggregate
.
class
,
1
,
1
);
graphBuilder
.
setSink
(
"WindowSumSink"
,
WindowSumSink
.
class
);
graphBuilder
.
shuffleConnect
(
"WindowSumSource"
,
"WindowSumMultiple"
);
graphBuilder
.
shuffleConnect
(
"WindowSumMultiple"
,
"WindowSumAggregate"
);
graphBuilder
.
shuffleConnect
(
"WindowSumAggregate"
,
"WindowSumSink"
);
return
graphBuilder
.
getJobGraph
();
}
public
static
void
main
(
String
[]
args
)
{
StreamExecutionEnvironment
context
=
new
StreamExecutionEnvironment
();
@SuppressWarnings
(
"unused"
)
DataStream
<
Tuple2
<
Integer
,
Long
>>
dataStream
=
context
.
addSource
(
new
WindowSumSource
())
.
map
(
new
WindowSumMultiple
())
.
flatMap
(
new
WindowSumAggregate
())
.
addSink
(
new
WindowSumSink
());
context
.
execute
();
LogUtils
.
initializeDefaultConsoleLogger
(
Level
.
DEBUG
,
Level
.
INFO
);
try
{
JobGraph
jG
=
getJobGraph
();
Configuration
configuration
=
jG
.
getJobConfiguration
();
if
(
args
.
length
==
0
)
{
args
=
new
String
[]
{
"local"
};
}
if
(
args
[
0
].
equals
(
"local"
))
{
System
.
out
.
println
(
"Running in Local mode"
);
NepheleMiniCluster
exec
=
new
NepheleMiniCluster
();
exec
.
start
();
Client
client
=
new
Client
(
new
InetSocketAddress
(
"localhost"
,
6498
),
configuration
);
client
.
run
(
jG
,
true
);
exec
.
stop
();
}
else
if
(
args
[
0
].
equals
(
"cluster"
))
{
System
.
out
.
println
(
"Running in Cluster2 mode"
);
Client
client
=
new
Client
(
new
InetSocketAddress
(
"hadoop02.ilab.sztaki.hu"
,
6123
),
configuration
);
client
.
run
(
jG
,
true
);
}
}
catch
(
Exception
e
)
{
System
.
out
.
println
(
e
);
}
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumMultiple.java
浏览文件 @
28ba85b5
...
...
@@ -15,19 +15,21 @@
package
eu.stratosphere.streaming.examples.window.sum
;
import
eu.stratosphere.api.java.functions.MapFunction
;
import
eu.stratosphere.api.java.tuple.Tuple2
;
import
eu.stratosphere.streaming.api.invokable.UserTaskInvokable
;
import
eu.stratosphere.streaming.api.streamrecord.StreamRecord
;
public
class
WindowSumMultiple
extends
MapFunction
<
Tuple2
<
Integer
,
Long
>,
Tuple2
<
Integer
,
Long
>>
{
public
class
WindowSumMultiple
extends
UserTaskInvokable
{
private
static
final
long
serialVersionUID
=
1L
;
private
Tuple2
<
Integer
,
Long
>
outTuple
=
new
Tuple2
<
Integer
,
Long
>(
);
private
StreamRecord
outputRecord
=
new
StreamRecord
(
new
Tuple2
<
Integer
,
Long
>()
);
@Override
public
Tuple2
<
Integer
,
Long
>
map
(
Tuple2
<
Integer
,
Long
>
inTuple
)
throws
Exception
{
// TODO Auto-generated method stub
outTuple
.
f0
=
inTuple
.
f0
*
2
;
outTuple
.
f1
=
inTuple
.
f1
;
return
outTuple
;
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
Integer
number
=
record
.
getInteger
(
0
);
Long
timestamp
=
record
.
getLong
(
1
);
outputRecord
.
setInteger
(
0
,
number
+
1
);
outputRecord
.
setLong
(
1
,
timestamp
);
emit
(
outputRecord
);
}
}
\ No newline at end of file
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumSink.java
浏览文件 @
28ba85b5
...
...
@@ -15,15 +15,21 @@
package
eu.stratosphere.streaming.examples.window.sum
;
import
eu.stratosphere.
api.java.tuple.Tuple2
;
import
eu.stratosphere.streaming.api.
SinkFunction
;
import
eu.stratosphere.
streaming.api.invokable.UserSinkInvokable
;
import
eu.stratosphere.streaming.api.
streamrecord.StreamRecord
;
public
class
WindowSumSink
extends
SinkFunction
<
Tuple2
<
Integer
,
Long
>>
{
public
class
WindowSumSink
extends
UserSinkInvokable
{
private
static
final
long
serialVersionUID
=
1L
;
private
Integer
sum
=
0
;
private
long
timestamp
=
0
;
@Override
public
void
invoke
(
Tuple2
<
Integer
,
Long
>
inTuple
)
{
// TODO Auto-generated method stub
System
.
out
.
println
(
inTuple
);
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
sum
=
record
.
getInteger
(
0
);
timestamp
=
record
.
getLong
(
1
);
System
.
out
.
println
(
"============================================"
);
System
.
out
.
println
(
sum
+
" "
+
timestamp
);
System
.
out
.
println
(
"============================================"
);
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumSource.java
浏览文件 @
28ba85b5
...
...
@@ -16,23 +16,23 @@
package
eu.stratosphere.streaming.examples.window.sum
;
import
eu.stratosphere.api.java.tuple.Tuple2
;
import
eu.stratosphere.streaming.api.
SourceFunction
;
import
eu.stratosphere.
util.Collector
;
import
eu.stratosphere.streaming.api.
invokable.UserSourceInvokable
;
import
eu.stratosphere.
streaming.api.streamrecord.StreamRecord
;
public
class
WindowSumSource
extends
SourceFunction
<
Tuple2
<
Integer
,
Long
>>
{
public
class
WindowSumSource
extends
UserSourceInvokable
{
private
static
final
long
serialVersionUID
=
1L
;
private
Tuple2
<
Integer
,
Long
>
outRecord
=
new
Tuple2
<
Integer
,
Long
>();
private
StreamRecord
outRecord
=
new
StreamRecord
(
new
Tuple2
<
Integer
,
Long
>());
private
Long
timestamp
=
0L
;
@Override
public
void
invoke
(
Collector
<
Tuple2
<
Integer
,
Long
>>
collector
)
throws
Exception
{
// TODO Auto-generated method stub
public
void
invoke
()
throws
Exception
{
for
(
int
i
=
0
;
i
<
1000
;
++
i
)
{
outRecord
.
f0
=
i
;
outRecord
.
f1
=
timestamp
;
collector
.
collect
(
outRecord
);
outRecord
.
setInteger
(
0
,
i
);
outRecord
.
setLong
(
1
,
timestamp
);
timestamp
++;
}
emit
(
outRecord
);
}
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountCounter.java
浏览文件 @
28ba85b5
...
...
@@ -15,41 +15,40 @@
package
eu.stratosphere.streaming.examples.window.wordcount
;
import
java.util.ArrayList
;
import
eu.stratosphere.api.java.functions.FlatMapFunction
;
import
eu.stratosphere.api.java.tuple.Tuple2
;
import
eu.stratosphere.api.java.tuple.Tuple3
;
import
eu.stratosphere.streaming.api.invokable.UserTaskInvokable
;
import
eu.stratosphere.streaming.api.streamrecord.StreamRecord
;
import
eu.stratosphere.streaming.state.MutableTableState
;
import
eu.stratosphere.streaming.state.MutableTableStateIterator
;
import
eu.stratosphere.streaming.state.SlidingWindowState
;
import
eu.stratosphere.util.Collector
;
public
class
WindowWordCountCounter
extends
FlatMapFunction
<
Tuple2
<
String
,
Long
>,
Tuple3
<
String
,
Integer
,
Long
>>
{
public
class
WindowWordCountCounter
extends
UserTaskInvokable
{
private
static
final
long
serialVersionUID
=
1L
;
private
int
windowSize
=
10
;
private
int
slidingStep
=
2
;
private
int
computeGranularity
=
1
;
private
int
windowFieldId
=
2
;
private
int
windowSize
=
10
;
private
int
slidingStep
=
2
;
private
int
computeGranularity
=
1
;
private
ArrayList
<
Tuple2
<
String
,
Long
>>
tempTupleArray
=
null
;
private
Tuple3
<
String
,
Integer
,
Long
>
outTuple
=
new
Tuple3
<
String
,
Integer
,
Long
>();
private
SlidingWindowState
window
;
private
StreamRecord
tempRecord
;
private
SlidingWindowState
<
Integer
>
window
;
private
MutableTableState
<
String
,
Integer
>
wordCounts
;
private
long
initTimestamp
=
-
1
;
private
long
nextTimestamp
=
-
1
;
private
long
initTimestamp
=
-
1
;
private
long
nextTimestamp
=
-
1
;
private
Long
timestamp
=
0L
;
private
StreamRecord
outRecord
=
new
StreamRecord
(
3
);
public
WindowWordCountCounter
()
{
window
=
new
SlidingWindowState
(
windowSize
,
slidingStep
,
window
=
new
SlidingWindowState
<
Integer
>
(
windowSize
,
slidingStep
,
computeGranularity
);
wordCounts
=
new
MutableTableState
<
String
,
Integer
>();
}
private
void
incrementCompute
(
ArrayList
<
Tuple2
<
String
,
Long
>>
tupleArray
)
{
for
(
int
i
=
0
;
i
<
tupleArray
.
size
();
++
i
)
{
String
word
=
tupleArray
.
get
(
i
).
f0
;
private
void
incrementCompute
(
StreamRecord
record
)
{
int
numTuple
=
record
.
getNumOfTuples
();
for
(
int
i
=
0
;
i
<
numTuple
;
++
i
)
{
String
word
=
record
.
getString
(
i
,
0
);
if
(
wordCounts
.
containsKey
(
word
))
{
int
count
=
wordCounts
.
get
(
word
)
+
1
;
wordCounts
.
put
(
word
,
count
);
...
...
@@ -59,9 +58,10 @@ public class WindowWordCountCounter extends
}
}
private
void
decrementCompute
(
ArrayList
<
Tuple2
<
String
,
Long
>>
tupleArray
)
{
for
(
int
i
=
0
;
i
<
tupleArray
.
size
();
++
i
)
{
String
word
=
tupleArray
.
get
(
i
).
f0
;
private
void
decrementCompute
(
StreamRecord
record
)
{
int
numTuple
=
record
.
getNumOfTuples
();
for
(
int
i
=
0
;
i
<
numTuple
;
++
i
)
{
String
word
=
record
.
getString
(
i
,
0
);
int
count
=
wordCounts
.
get
(
word
)
-
1
;
if
(
count
==
0
)
{
wordCounts
.
delete
(
word
);
...
...
@@ -71,48 +71,51 @@ public class WindowWordCountCounter extends
}
}
private
void
produceOutput
(
long
progress
,
Collector
<
Tuple3
<
String
,
Integer
,
Long
>>
out
)
{
MutableTableStateIterator
<
String
,
Integer
>
iterator
=
wordCounts
.
getIterator
();
private
void
produceRecord
(
long
progress
){
outRecord
.
Clear
();
MutableTableStateIterator
<
String
,
Integer
>
iterator
=
wordCounts
.
getIterator
();
while
(
iterator
.
hasNext
())
{
Tuple2
<
String
,
Integer
>
tuple
=
iterator
.
next
();
outTuple
.
f0
=
tuple
.
f0
;
outTuple
.
f1
=
tuple
.
f1
;
outTuple
.
f2
=
timestamp
;
out
.
collect
(
outTuple
);
Tuple3
<
String
,
Integer
,
Long
>
outputTuple
=
new
Tuple3
<
String
,
Integer
,
Long
>(
(
String
)
tuple
.
getField
(
0
),
(
Integer
)
tuple
.
getField
(
1
),
timestamp
);
outRecord
.
addTuple
(
outputTuple
);
}
emit
(
outRecord
);
}
@Override
public
void
flatMap
(
Tuple2
<
String
,
Long
>
value
,
Collector
<
Tuple3
<
String
,
Integer
,
Long
>>
out
)
throws
Exception
{
// TODO Auto-generated method stub
timestamp
=
value
.
f1
;
if
(
initTimestamp
==
-
1
)
{
initTimestamp
=
timestamp
;
nextTimestamp
=
initTimestamp
+
computeGranularity
;
tempTupleArray
=
new
ArrayList
<
Tuple2
<
String
,
Long
>>();
}
else
{
if
(
timestamp
>=
nextTimestamp
)
{
if
(
window
.
isFull
())
{
ArrayList
<
Tuple2
<
String
,
Long
>>
expiredTupleArray
=
window
.
popFront
();
incrementCompute
(
tempTupleArray
);
decrementCompute
(
expiredTupleArray
);
window
.
pushBack
(
tempTupleArray
);
if
(
window
.
isEmittable
())
{
produceOutput
(
timestamp
,
out
);
}
}
else
{
incrementCompute
(
tempTupleArray
);
window
.
pushBack
(
tempTupleArray
);
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
int
numTuple
=
record
.
getNumOfTuples
();
for
(
int
i
=
0
;
i
<
numTuple
;
++
i
)
{
long
progress
=
record
.
getLong
(
i
,
windowFieldId
);
if
(
initTimestamp
==
-
1
)
{
initTimestamp
=
progress
;
nextTimestamp
=
initTimestamp
+
computeGranularity
;
tempRecord
=
new
StreamRecord
(
record
.
getNumOfFields
());
}
else
{
if
(
progress
>=
nextTimestamp
)
{
if
(
window
.
isFull
())
{
produceOutput
(
timestamp
,
out
);
StreamRecord
expiredRecord
=
window
.
popFront
();
incrementCompute
(
tempRecord
);
decrementCompute
(
expiredRecord
);
window
.
pushBack
(
tempRecord
);
if
(
window
.
isEmittable
())
{
produceRecord
(
progress
);
}
}
else
{
incrementCompute
(
tempRecord
);
window
.
pushBack
(
tempRecord
);
if
(
window
.
isFull
())
{
produceRecord
(
progress
);
}
}
initTimestamp
=
nextTimestamp
;
nextTimestamp
=
initTimestamp
+
computeGranularity
;
tempRecord
=
new
StreamRecord
(
record
.
getNumOfFields
());
}
initTimestamp
=
nextTimestamp
;
nextTimestamp
=
initTimestamp
+
computeGranularity
;
tempTupleArray
=
new
ArrayList
<
Tuple2
<
String
,
Long
>>();
}
tempRecord
.
addTuple
(
record
.
getTuple
(
i
));
}
tempTupleArray
.
add
(
value
);
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountLocal.java
浏览文件 @
28ba85b5
...
...
@@ -15,22 +15,72 @@
package
eu.stratosphere.streaming.examples.window.wordcount
;
import
eu.stratosphere.api.java.tuple.Tuple3
;
import
eu.stratosphere.streaming.api.DataStream
;
import
eu.stratosphere.streaming.api.StreamExecutionEnvironment
;
import
java.net.InetSocketAddress
;
import
org.apache.log4j.Level
;
import
eu.stratosphere.client.minicluster.NepheleMiniCluster
;
import
eu.stratosphere.client.program.Client
;
import
eu.stratosphere.configuration.Configuration
;
import
eu.stratosphere.nephele.jobgraph.JobGraph
;
import
eu.stratosphere.streaming.api.JobGraphBuilder
;
import
eu.stratosphere.streaming.faulttolerance.FaultToleranceType
;
import
eu.stratosphere.streaming.util.LogUtils
;
public
class
WindowWordCountLocal
{
public
static
JobGraph
getJobGraph
()
{
JobGraphBuilder
graphBuilder
=
new
JobGraphBuilder
(
"testGraph"
,
FaultToleranceType
.
NONE
);
graphBuilder
.
setSource
(
"WindowWordCountSource"
,
WindowWordCountSource
.
class
);
graphBuilder
.
setTask
(
"WindowWordCountSplitter"
,
WindowWordCountSplitter
.
class
,
1
,
1
);
graphBuilder
.
setTask
(
"WindowWordCountCounter"
,
WindowWordCountCounter
.
class
,
1
,
1
);
graphBuilder
.
setSink
(
"WindowWordCountSink"
,
WindowWordCountSink
.
class
);
graphBuilder
.
shuffleConnect
(
"WindowWordCountSource"
,
"WindowWordCountSplitter"
);
graphBuilder
.
fieldsConnect
(
"WindowWordCountSplitter"
,
"WindowWordCountCounter"
,
0
);
graphBuilder
.
shuffleConnect
(
"WindowWordCountCounter"
,
"WindowWordCountSink"
);
return
graphBuilder
.
getJobGraph
();
}
public
static
void
main
(
String
[]
args
)
{
StreamExecutionEnvironment
context
=
new
StreamExecutionEnvironment
();
@SuppressWarnings
(
"unused"
)
DataStream
<
Tuple3
<
String
,
Integer
,
Long
>>
dataStream
=
context
.
addSource
(
new
WindowWordCountSource
())
.
flatMap
(
new
WindowWordCountSplitter
())
.
partitionBy
(
0
)
.
flatMap
(
new
WindowWordCountCounter
())
.
addSink
(
new
WindowWordCountSink
());
context
.
execute
();
LogUtils
.
initializeDefaultConsoleLogger
(
Level
.
DEBUG
,
Level
.
INFO
);
try
{
JobGraph
jG
=
getJobGraph
();
Configuration
configuration
=
jG
.
getJobConfiguration
();
if
(
args
.
length
==
0
)
{
args
=
new
String
[]
{
"local"
};
}
if
(
args
[
0
].
equals
(
"local"
))
{
System
.
out
.
println
(
"Running in Local mode"
);
NepheleMiniCluster
exec
=
new
NepheleMiniCluster
();
exec
.
start
();
Client
client
=
new
Client
(
new
InetSocketAddress
(
"localhost"
,
6498
),
configuration
);
client
.
run
(
jG
,
true
);
exec
.
stop
();
}
else
if
(
args
[
0
].
equals
(
"cluster"
))
{
System
.
out
.
println
(
"Running in Cluster2 mode"
);
Client
client
=
new
Client
(
new
InetSocketAddress
(
"hadoop02.ilab.sztaki.hu"
,
6123
),
configuration
);
client
.
run
(
jG
,
true
);
}
}
catch
(
Exception
e
)
{
System
.
out
.
println
(
e
);
}
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSink.java
浏览文件 @
28ba85b5
...
...
@@ -15,15 +15,26 @@
package
eu.stratosphere.streaming.examples.window.wordcount
;
import
eu.stratosphere.
api.java.tuple.Tuple3
;
import
eu.stratosphere.streaming.api.
SinkFunction
;
import
eu.stratosphere.
streaming.api.invokable.UserSinkInvokable
;
import
eu.stratosphere.streaming.api.
streamrecord.StreamRecord
;
public
class
WindowWordCountSink
extends
SinkFunction
<
Tuple3
<
String
,
Integer
,
Long
>>
{
public
class
WindowWordCountSink
extends
UserSinkInvokable
{
private
static
final
long
serialVersionUID
=
1L
;
private
String
word
=
""
;
private
Integer
count
=
0
;
private
Long
timestamp
=
0L
;
@Override
public
void
invoke
(
Tuple3
<
String
,
Integer
,
Long
>
inTuple
)
{
// TODO Auto-generated method stubs
System
.
out
.
println
(
inTuple
);
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
int
numTuple
=
record
.
getNumOfTuples
();
for
(
int
i
=
0
;
i
<
numTuple
;
++
i
)
{
word
=
record
.
getString
(
i
,
0
);
count
=
record
.
getInteger
(
i
,
1
);
timestamp
=
record
.
getLong
(
i
,
2
);
System
.
out
.
println
(
"============================================"
);
System
.
out
.
println
(
word
+
" "
+
count
+
" "
+
timestamp
);
System
.
out
.
println
(
"============================================"
);
}
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSource.java
浏览文件 @
28ba85b5
...
...
@@ -16,23 +16,32 @@
package
eu.stratosphere.streaming.examples.window.wordcount
;
import
java.io.BufferedReader
;
import
java.io.FileNotFoundException
;
import
java.io.FileReader
;
import
eu.stratosphere.api.java.tuple.Tuple2
;
import
eu.stratosphere.streaming.api.
SourceFunction
;
import
eu.stratosphere.
util.Collector
;
import
eu.stratosphere.streaming.api.
invokable.UserSourceInvokable
;
import
eu.stratosphere.
streaming.api.streamrecord.StreamRecord
;
public
class
WindowWordCountSource
extends
SourceFunction
<
Tuple2
<
String
,
Long
>>
{
public
class
WindowWordCountSource
extends
UserSourceInvokable
{
private
static
final
long
serialVersionUID
=
1L
;
private
BufferedReader
br
=
null
;
private
String
line
=
""
;
private
Tuple2
<
String
,
Long
>
outRecord
=
new
Tuple2
<
String
,
Long
>(
);
private
StreamRecord
outRecord
=
new
StreamRecord
(
new
Tuple2
<
String
,
Long
>()
);
private
Long
timestamp
=
0L
;
public
WindowWordCountSource
()
{
try
{
br
=
new
BufferedReader
(
new
FileReader
(
"src/test/resources/testdata/hamlet.txt"
));
}
catch
(
FileNotFoundException
e
)
{
e
.
printStackTrace
();
}
timestamp
=
0L
;
}
@Override
public
void
invoke
(
Collector
<
Tuple2
<
String
,
Long
>>
collector
)
throws
Exception
{
// TODO Auto-generated method stub
BufferedReader
br
=
new
BufferedReader
(
new
FileReader
(
"src/test/resources/testdata/hamlet.txt"
));
public
void
invoke
()
throws
Exception
{
while
(
true
){
line
=
br
.
readLine
();
if
(
line
==
null
){
...
...
@@ -40,11 +49,11 @@ public class WindowWordCountSource extends SourceFunction<Tuple2<String, Long>>
}
if
(
line
!=
""
)
{
line
=
line
.
replaceAll
(
"[\\-\\+\\.\\^:,]"
,
""
);
outRecord
.
f0
=
line
;
outRecord
.
f1
=
timestamp
;
collector
.
collect
(
outRecord
);
outRecord
.
setString
(
0
,
line
);
outRecord
.
setLong
(
1
,
timestamp
);
timestamp
++;
emit
(
outRecord
);
}
}
}
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSplitter.java
浏览文件 @
28ba85b5
...
...
@@ -15,26 +15,26 @@
package
eu.stratosphere.streaming.examples.window.wordcount
;
import
eu.stratosphere.api.java.tuple.Tuple
2
;
import
eu.stratosphere.
util.Collector
;
import
eu.stratosphere.
api.java.functions.FlatMapFunction
;
import
eu.stratosphere.api.java.tuple.Tuple
3
;
import
eu.stratosphere.
streaming.api.invokable.UserTaskInvokable
;
import
eu.stratosphere.
streaming.api.streamrecord.StreamRecord
;
public
class
WindowWordCountSplitter
extends
FlatMapFunction
<
Tuple2
<
String
,
Long
>,
Tuple2
<
String
,
Long
>>
{
public
class
WindowWordCountSplitter
extends
UserTaskInvokable
{
private
static
final
long
serialVersionUID
=
1L
;
private
String
[]
words
=
new
String
[]
{};
private
Long
timestamp
=
0L
;
private
Tuple2
<
String
,
Long
>
outTuple
=
new
Tuple2
<
String
,
Long
>(
);
private
StreamRecord
outputRecord
=
new
StreamRecord
(
3
);
@Override
public
void
flatMap
(
Tuple2
<
String
,
Long
>
inTuple
,
Collector
<
Tuple2
<
String
,
Long
>>
out
)
throws
Exception
{
words
=
inTuple
.
f0
.
split
(
" "
);
timestamp
=
inTuple
.
f1
;
for
(
String
word
:
words
){
outTuple
.
f0
=
word
;
outTuple
.
f1
=
timestamp
;
out
.
collect
(
outTuple
);
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
outputRecord
.
Clear
();
words
=
record
.
getString
(
0
).
split
(
" "
);
timestamp
=
record
.
getLong
(
1
);
for
(
String
word
:
words
)
{
Tuple3
<
String
,
Integer
,
Long
>
tuple
=
new
Tuple3
<
String
,
Integer
,
Long
>(
word
,
1
,
timestamp
);
outputRecord
.
addTuple
(
tuple
);
}
emit
(
outputRecord
);
}
}
\ No newline at end of file
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountLocal.java
浏览文件 @
28ba85b5
...
...
@@ -22,16 +22,16 @@ import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
public
class
WordCountLocal
{
public
static
void
main
(
String
[]
args
)
{
StreamExecutionEnvironment
context
=
new
StreamExecutionEnvironment
();
StreamExecutionEnvironment
env
=
new
StreamExecutionEnvironment
();
@SuppressWarnings
(
"unused"
)
DataStream
<
Tuple2
<
String
,
Integer
>>
dataStream
=
context
DataStream
<
Tuple2
<
String
,
Integer
>>
dataStream
=
env
.
readTextFile
(
"src/test/resources/testdata/hamlet.txt"
)
.
flatMap
(
new
WordCountSplitter
())
.
partitionBy
(
0
)
.
map
(
new
WordCountCounter
())
.
addSink
(
new
WordCountSink
());
context
.
execute
();
env
.
execute
();
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSplitter.java
浏览文件 @
28ba85b5
...
...
@@ -22,7 +22,6 @@ import eu.stratosphere.util.Collector;
public
class
WordCountSplitter
extends
FlatMapFunction
<
Tuple1
<
String
>,
Tuple1
<
String
>>
{
private
static
final
long
serialVersionUID
=
1L
;
private
String
[]
words
=
new
String
[]
{};
private
Tuple1
<
String
>
outTuple
=
new
Tuple1
<
String
>();
//TODO move the performance tracked version to a separate package and clean this
...
...
@@ -36,8 +35,7 @@ public class WordCountSplitter extends FlatMapFunction<Tuple1<String>, Tuple1<St
@Override
public
void
flatMap
(
Tuple1
<
String
>
inTuple
,
Collector
<
Tuple1
<
String
>>
out
)
throws
Exception
{
words
=
inTuple
.
f0
.
split
(
" "
);
for
(
String
word
:
words
)
{
for
(
String
word
:
inTuple
.
f0
.
split
(
" "
))
{
outTuple
.
f0
=
word
;
// pTimer.startTimer();
out
.
collect
(
outTuple
);
...
...
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/kafka/KafkaProducer.java
已删除
100644 → 0
浏览文件 @
839d46ce
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package
eu.stratosphere.streaming.kafka
;
import
java.io.BufferedReader
;
import
java.io.FileReader
;
import
java.util.Properties
;
import
kafka.producer.KeyedMessage
;
import
kafka.producer.ProducerConfig
;
public
class
KafkaProducer
{
static
kafka
.
javaapi
.
producer
.
Producer
<
Integer
,
String
>
producer
;
static
Properties
props
=
new
Properties
();
public
static
void
ProducerPrepare
(
String
brokerAddr
)
{
props
.
put
(
"serializer.class"
,
"kafka.serializer.StringEncoder"
);
props
.
put
(
"metadata.broker.list"
,
brokerAddr
);
producer
=
new
kafka
.
javaapi
.
producer
.
Producer
<
Integer
,
String
>(
new
ProducerConfig
(
props
));
}
public
static
void
main
(
String
[]
args
)
throws
Exception
{
if
(
args
.
length
==
1
)
{
String
infilename
=
args
[
0
];
String
topicId
=
args
[
1
];
String
brokerAddr
=
args
[
2
];
ProducerPrepare
(
brokerAddr
);
BufferedReader
reader
=
new
BufferedReader
(
new
FileReader
(
infilename
));
while
(
true
)
{
String
line
=
reader
.
readLine
();
if
(
line
==
null
){
reader
.
close
();
reader
=
new
BufferedReader
(
new
FileReader
(
infilename
));
continue
;
}
producer
.
send
(
new
KeyedMessage
<
Integer
,
String
>(
topicId
,
line
));
}
}
else
{
System
.
out
.
println
(
"please set filename!"
);
System
.
exit
(-
1
);
}
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/kafka/KafkaSource.java
浏览文件 @
28ba85b5
...
...
@@ -29,8 +29,9 @@ import eu.stratosphere.streaming.api.SourceFunction;
import
eu.stratosphere.util.Collector
;
/**
* Source for reading messages from a Kafka queue.
* The source currently only support string messages.
* Source for reading messages from a Kafka queue. The source currently only
* support string messages. Other types will be added soon.
*
*/
public
class
KafkaSource
extends
SourceFunction
<
Tuple1
<
String
>>
{
private
static
final
long
serialVersionUID
=
1L
;
...
...
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/
examples/iterative/sssp/SSSPSourc
e.java
→
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/
state/LogTableStat
e.java
浏览文件 @
28ba85b5
...
...
@@ -13,40 +13,80 @@
*
**********************************************************************************************************************/
package
eu.stratosphere.streaming.
examples.iterative.sssp
;
package
eu.stratosphere.streaming.
state
;
import
java.
io.BufferedReader
;
import
java.
io.FileReader
;
import
java.
util.ArrayList
;
import
java.
util.HashMap
;
import
eu.stratosphere.api.java.tuple.Tuple2
;
import
eu.stratosphere.streaming.api.invokable.UserSourceInvokable
;
import
eu.stratosphere.streaming.api.streamrecord.StreamRecord
;
import
eu.stratosphere.streaming.index.IndexPair
;
/**
* The log-structured key value store thats accept any modification operation by
* appending the value to the end of the state.
*/
public
class
LogTableState
<
K
,
V
>
implements
TableState
<
K
,
V
>
{
private
HashMap
<
K
,
IndexPair
>
hashMap
=
new
HashMap
<
K
,
IndexPair
>();
private
HashMap
<
Integer
,
ArrayList
<
V
>>
blockList
=
new
HashMap
<
Integer
,
ArrayList
<
V
>>();
private
final
int
perBlockEntryCount
=
1000
;
private
IndexPair
nextInsertPos
=
new
IndexPair
(-
1
,
-
1
);
public
LogTableState
()
{
blockList
.
put
(
0
,
new
ArrayList
<
V
>());
nextInsertPos
.
setIndexPair
(
0
,
0
);
}
public
class
SSSPSource
extends
UserSourceInvokable
{
private
static
final
long
serialVersionUID
=
1L
;
private
BufferedReader
br
=
null
;
private
StreamRecord
outRecord
=
new
StreamRecord
(
new
Tuple2
<
Integer
,
Integer
>());
@Override
public
void
invoke
()
throws
Exception
{
public
void
put
(
K
key
,
V
value
)
{
// TODO Auto-generated method stub
br
=
new
BufferedReader
(
new
FileReader
(
"src/test/resources/testdata/ASTopology.data"
));
while
(
true
)
{
String
line
=
br
.
readLine
();
if
(
line
==
null
)
{
break
;
}
if
(
line
!=
""
)
{
String
[]
link
=
line
.
split
(
":"
);
outRecord
.
setInteger
(
0
,
Integer
.
valueOf
(
link
[
0
]));
outRecord
.
setInteger
(
0
,
Integer
.
valueOf
(
link
[
1
]));
emit
(
outRecord
);
performanceCounter
.
count
();
}
line
=
br
.
readLine
();
if
(
nextInsertPos
.
entryId
==
perBlockEntryCount
)
{
blockList
.
put
(
nextInsertPos
.
blockId
+
1
,
new
ArrayList
<
V
>());
nextInsertPos
.
IncrementBlock
();
}
blockList
.
get
(
nextInsertPos
.
blockId
).
add
(
value
);
hashMap
.
put
(
key
,
new
IndexPair
(
nextInsertPos
));
nextInsertPos
.
entryId
+=
1
;
}
@Override
public
V
get
(
K
key
)
{
// TODO Auto-generated method stub
IndexPair
index
=
hashMap
.
get
(
key
);
if
(
index
==
null
)
{
return
null
;
}
else
{
return
blockList
.
get
(
index
.
blockId
).
get
(
index
.
entryId
);
}
}
@Override
public
void
delete
(
K
key
)
{
// TODO Auto-generated method stub
hashMap
.
remove
(
key
);
}
@Override
public
boolean
containsKey
(
K
key
)
{
// TODO Auto-generated method stub
return
hashMap
.
containsKey
(
key
);
}
@Override
public
String
serialize
()
{
// TODO Auto-generated method stub
return
null
;
}
@Override
public
void
deserialize
(
String
str
)
{
// TODO Auto-generated method stub
}
@Override
public
TableStateIterator
<
K
,
V
>
getIterator
()
{
// TODO Auto-generated method stub
return
new
LogTableStateIterator
<
K
,
V
>(
hashMap
.
entrySet
().
iterator
(),
blockList
);
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/
examples/iterative/kmeans/KMeansSink
.java
→
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/
state/LogTableStateIterator
.java
浏览文件 @
28ba85b5
...
...
@@ -13,20 +13,34 @@
*
**********************************************************************************************************************/
package
eu.stratosphere.streaming.
examples.iterative.kmeans
;
package
eu.stratosphere.streaming.
state
;
import
eu.stratosphere.streaming.api.invokable.UserSinkInvokable
;
import
eu.stratosphere.streaming.api.streamrecord.StreamRecord
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.Iterator
;
import
java.util.Map.Entry
;
public
class
KMeansSink
extends
UserSinkInvokable
{
private
static
final
long
serialVersionUID
=
1L
;
import
eu.stratosphere.api.java.tuple.Tuple2
;
import
eu.stratosphere.streaming.index.IndexPair
;
public
class
LogTableStateIterator
<
K
,
V
>
implements
TableStateIterator
<
K
,
V
>{
private
Iterator
<
Entry
<
K
,
IndexPair
>>
iterator
;
private
HashMap
<
Integer
,
ArrayList
<
V
>>
blockList
;
public
LogTableStateIterator
(
Iterator
<
Entry
<
K
,
IndexPair
>>
iter
,
HashMap
<
Integer
,
ArrayList
<
V
>>
blocks
){
iterator
=
iter
;
blockList
=
blocks
;
}
@Override
public
boolean
hasNext
()
{
// TODO Auto-generated method stub
return
false
;
}
@Override
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
public
Tuple2
<
K
,
V
>
next
()
{
// TODO Auto-generated method stub
//int tupleNum = record.getNumOfTuples();
System
.
out
.
println
(
"============================================"
);
System
.
out
.
println
(
"record="
+
record
.
getString
(
0
,
0
));
System
.
out
.
println
(
"============================================"
);
return
null
;
}
}
\ No newline at end of file
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/MutableTableState.java
浏览文件 @
28ba85b5
...
...
@@ -15,14 +15,13 @@
package
eu.stratosphere.streaming.state
;
import
java.io.Serializable
;
import
java.util.LinkedHashMap
;
import
java.util.Map
;
/**
* The most general internal state that stores data in a mutable map.
*/
public
class
MutableTableState
<
K
,
V
>
implements
TableState
<
K
,
V
>
,
Serializable
{
public
class
MutableTableState
<
K
,
V
>
implements
TableState
<
K
,
V
>
{
private
Map
<
K
,
V
>
state
=
new
LinkedHashMap
<
K
,
V
>();
@Override
...
...
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/SlidingWindowState.java
浏览文件 @
28ba85b5
...
...
@@ -15,19 +15,17 @@
package
eu.stratosphere.streaming.state
;
import
java.io.Serializable
;
import
java.util.ArrayList
;
import
org.apache.commons.collections.buffer.CircularFifoBuffer
;
import
eu.stratosphere.streaming.api.streamrecord.StreamRecord
;
/**
* The window state for window operator. To be general enough, this class
* implements a count based window operator. It is possible for the user to
* compose time based window operator by extending this class by splitting the
* stream into multiple mini batches.
*/
public
class
SlidingWindowState
implements
Serializable
{
private
static
final
long
serialVersionUID
=
-
2376149970115888901L
;
public
class
SlidingWindowState
<
K
>
{
private
int
currentRecordCount
;
private
int
fullRecordCount
;
private
int
slideRecordCount
;
...
...
@@ -43,13 +41,13 @@ public class SlidingWindowState implements Serializable{
this
.
buffer
=
new
CircularFifoBuffer
(
fullRecordCount
);
}
public
void
pushBack
(
ArrayList
tupleArray
)
{
buffer
.
add
(
tupleArray
);
public
void
pushBack
(
StreamRecord
record
)
{
buffer
.
add
(
record
);
currentRecordCount
+=
1
;
}
public
ArrayList
popFront
()
{
ArrayList
frontRecord
=
(
ArrayList
)
buffer
.
get
();
public
StreamRecord
popFront
()
{
StreamRecord
frontRecord
=
(
StreamRecord
)
buffer
.
get
();
buffer
.
remove
();
return
frontRecord
;
}
...
...
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/TableState.java
浏览文件 @
28ba85b5
...
...
@@ -18,7 +18,7 @@ package eu.stratosphere.streaming.state;
/**
* An internal state interface that supports stateful operator.
*/
public
interface
TableState
<
K
,
V
>{
public
interface
TableState
<
K
,
V
>
{
public
void
put
(
K
key
,
V
value
);
public
V
get
(
K
key
);
public
void
delete
(
K
key
);
...
...
flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/state/InternalStateTest.java
浏览文件 @
28ba85b5
...
...
@@ -18,6 +18,7 @@ package eu.stratosphere.streaming.state;
import
org.junit.Test
;
import
eu.stratosphere.api.java.tuple.Tuple2
;
import
eu.stratosphere.streaming.state.LogTableState
;
import
eu.stratosphere.streaming.state.MutableTableState
;
import
eu.stratosphere.streaming.state.TableStateIterator
;
import
eu.stratosphere.streaming.state.SlidingWindowState
;
...
...
@@ -52,6 +53,34 @@ public class InternalStateTest {
}
}
@Test
public
void
LogTableStateTest
(){
LogTableState
<
String
,
String
>
state
=
new
LogTableState
<
String
,
String
>();
state
.
put
(
"abc"
,
"hello"
);
state
.
put
(
"test"
,
"world"
);
state
.
put
(
"state"
,
"mutable"
);
state
.
put
(
"streaming"
,
"persist"
);
String
s
=
state
.
get
(
"streaming"
);
if
(
s
==
null
){
System
.
out
.
println
(
"key does not exist!"
);
}
else
{
System
.
out
.
println
(
"value="
+
s
);
}
s
=
state
.
get
(
"null"
);
if
(
s
==
null
){
System
.
out
.
println
(
"key does not exist!"
);
}
else
{
System
.
out
.
println
(
"value="
+
s
);
}
TableStateIterator
<
String
,
String
>
iterator
=
state
.
getIterator
();
while
(
iterator
.
hasNext
()){
Tuple2
<
String
,
String
>
tuple
=
iterator
.
next
();
System
.
out
.
println
(
tuple
.
getField
(
0
)+
", "
+
tuple
.
getField
(
1
));
}
}
@Test
public
void
WindowStateTest
(){
SlidingWindowState
<
String
>
state
=
new
SlidingWindowState
<
String
>(
100
,
20
,
10
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录