Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
d6c56edd
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
d6c56edd
编写于
7月 14, 2014
作者:
G
gyfora
提交者:
Stephan Ewen
8月 18, 2014
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[streaming] Incremental OLS fix
上级
7b780ca6
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
150 addition
and
461 deletion
+150
-461
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java
...stratosphere/streaming/api/streamrecord/StreamRecord.java
+63
-66
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalOLS.java
...eu/stratosphere/streaming/examples/ml/IncrementalOLS.java
+2
-2
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumAggregate.java
...ere/streaming/examples/window/sum/WindowSumAggregate.java
+0
-88
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumLocal.java
...osphere/streaming/examples/window/sum/WindowSumLocal.java
+0
-86
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumMultiple.java
...here/streaming/examples/window/sum/WindowSumMultiple.java
+0
-39
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumSink.java
...tosphere/streaming/examples/window/sum/WindowSumSink.java
+0
-34
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumSource.java
...sphere/streaming/examples/window/sum/WindowSumSource.java
+0
-37
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountCounter.java
...ing/examples/window/wordcount/WindowWordCountCounter.java
+1
-1
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSplitter.java
...ng/examples/window/wordcount/WindowWordCountSplitter.java
+1
-1
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSink.java
...atosphere/streaming/examples/wordcount/WordCountSink.java
+2
-13
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowState.java
...ain/java/eu/stratosphere/streaming/state/WindowState.java
+0
-8
flink-addons/flink-streaming/src/test/resources/Performance/PerformanceTracker.py
...ming/src/test/resources/Performance/PerformanceTracker.py
+81
-86
未找到文件。
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java
浏览文件 @
d6c56edd
...
...
@@ -86,16 +86,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
public
StreamRecord
()
{
}
public
StreamRecord
(
StreamRecord
record
)
{
this
.
numOfFields
=
record
.
getNumOfFields
();
this
.
numOfTuples
=
0
;
tupleBatch
=
new
ArrayList
<
Tuple
>();
this
.
uid
=
new
UID
(
Arrays
.
copyOf
(
record
.
getId
().
getId
(),
20
));
for
(
int
i
=
0
;
i
<
record
.
getNumOfTuples
();
++
i
)
{
this
.
tupleBatch
.
add
(
copyTuple
(
record
.
getTuple
(
i
)));
}
}
/**
* Creates empty StreamRecord with number of fields set
*
...
...
@@ -105,8 +95,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
public
StreamRecord
(
int
numOfFields
)
{
this
.
numOfFields
=
numOfFields
;
this
.
numOfTuples
=
0
;
this
.
batchSize
=
1
;
tupleBatch
=
new
ArrayList
<
Tuple
>(
batchSize
);
tupleBatch
=
new
ArrayList
<
Tuple
>();
}
/**
...
...
@@ -124,6 +113,16 @@ public class StreamRecord implements IOReadableWritable, Serializable {
tupleBatch
=
new
ArrayList
<
Tuple
>(
batchSize
);
}
public
StreamRecord
(
StreamRecord
record
)
{
this
.
numOfFields
=
record
.
getNumOfFields
();
this
.
numOfTuples
=
0
;
tupleBatch
=
new
ArrayList
<
Tuple
>();
this
.
uid
=
new
UID
(
Arrays
.
copyOf
(
record
.
getId
().
getId
(),
20
));
for
(
int
i
=
0
;
i
<
record
.
getNumOfTuples
();
++
i
)
{
this
.
tupleBatch
.
add
(
copyTuple
(
record
.
getTuple
(
i
)));
}
}
/**
* Creates a new batch of records containing only the given Tuple as element
* and sets desired batch size.
...
...
@@ -139,7 +138,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
this
.
batchSize
=
batchSize
;
tupleBatch
=
new
ArrayList
<
Tuple
>(
batchSize
);
tupleBatch
.
add
(
tuple
);
System
.
out
.
println
(
"here, the tuple batch size is"
+
tupleBatch
.
size
());
}
/**
...
...
@@ -149,6 +147,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param tupleList
* Tuples to bes stored in the StreamRecord
*/
public
StreamRecord
(
List
<
Tuple
>
tupleList
)
{
numOfFields
=
tupleList
.
get
(
0
).
getArity
();
numOfTuples
=
tupleList
.
size
();
...
...
@@ -167,59 +166,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
this
(
tuple
,
1
);
}
/**
* Checks if the number of fields are equal to the batch field size then
* adds the Tuple to the end of the batch
*
* @param tuple
* Tuple to be added as the next record of the batch
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public
void
addTuple
(
Tuple
tuple
)
throws
TupleSizeMismatchException
{
addTuple
(
numOfTuples
,
tuple
);
}
/**
* Checks if the number of fields are equal to the batch field size then
* inserts the Tuple to the given position into the recordbatch
*
* @param index
* Position of the added tuple
* @param tuple
* Tuple to be added as the next record of the batch
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public
void
addTuple
(
int
index
,
Tuple
tuple
)
throws
TupleSizeMismatchException
{
if
(
tuple
.
getArity
()
==
numOfFields
)
{
tupleBatch
.
add
(
index
,
tuple
);
numOfTuples
++;
}
else
{
throw
new
TupleSizeMismatchException
();
}
}
/**
* Removes the tuple at the given position from the batch and returns it
*
* @param index
* Index of tuple to remove
* @return Removed tuple
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public
Tuple
removeTuple
(
int
index
)
throws
TupleSizeMismatchException
{
if
(
index
<
numOfTuples
)
{
numOfTuples
--;
return
tupleBatch
.
remove
(
index
);
}
else
{
throw
new
TupleSizeMismatchException
();
}
}
public
boolean
isEmpty
()
{
return
(
this
.
numOfTuples
==
0
);
}
...
...
@@ -995,6 +941,57 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
}
/**
* Checks if the number of fields are equal to the batch field size then
* adds the Tuple to the end of the batch
*
* @param tuple
* Tuple to be added as the next record of the batch
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public
void
addTuple
(
Tuple
tuple
)
throws
TupleSizeMismatchException
{
addTuple
(
numOfTuples
,
tuple
);
}
/**
* Checks if the number of fields are equal to the batch field size then
* inserts the Tuple to the given position into the recordbatch
*
* @param index
* Position of the added tuple
* @param tuple
* Tuple to be added as the next record of the batch
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public
void
addTuple
(
int
index
,
Tuple
tuple
)
throws
TupleSizeMismatchException
{
if
(
tuple
.
getArity
()
==
numOfFields
)
{
tupleBatch
.
add
(
index
,
tuple
);
numOfTuples
++;
}
else
{
throw
new
TupleSizeMismatchException
();
}
}
/**
* Removes the tuple at the given position from the batch and returns it
*
* @param index
* Index of tuple to remove
* @return Removed tuple
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public
Tuple
removeTuple
(
int
index
)
throws
TupleSizeMismatchException
{
if
(
index
<
numOfTuples
)
{
numOfTuples
--;
return
tupleBatch
.
remove
(
index
);
}
else
{
throw
new
TupleSizeMismatchException
();
}
}
/**
* Creates a copy of the StreamRecord object by Serializing and
* deserializing it
...
...
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/ml/IncrementalOLS.java
浏览文件 @
d6c56edd
...
...
@@ -109,8 +109,8 @@ public class IncrementalOLS {
for
(
int
i
=
0
;
i
<
numOfTuples
;
i
++)
{
Tuple
t
=
record
.
getTuple
(
i
);
Double
[]
x_i
=
t
.
getField
(
1
);
y
[
i
]
=
t
.
getField
(
0
);
Double
[]
x_i
=
(
Double
[])
t
.
getField
(
1
);
y
[
i
]
=
(
Double
)
t
.
getField
(
0
);
for
(
int
j
=
0
;
j
<
numOfFeatures
;
j
++)
{
x
[
i
][
j
]
=
x_i
[
j
];
}
...
...
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumAggregate.java
已删除
100644 → 0
浏览文件 @
7b780ca6
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package
eu.stratosphere.streaming.examples.window.sum
;
import
eu.stratosphere.api.java.tuple.Tuple2
;
import
eu.stratosphere.streaming.api.invokable.UserTaskInvokable
;
import
eu.stratosphere.streaming.api.streamrecord.StreamRecord
;
import
eu.stratosphere.streaming.state.MutableTableState
;
import
eu.stratosphere.streaming.state.WindowState
;
public
class
WindowSumAggregate
extends
UserTaskInvokable
{
private
int
windowSize
;
private
int
slidingStep
;
private
int
computeGranularity
;
private
int
windowFieldId
;
private
WindowState
<
Integer
>
window
;
private
MutableTableState
<
String
,
Integer
>
sum
;
private
Integer
number
=
0
;
private
Integer
timestamp
=
0
;
private
StreamRecord
outRecord
=
new
StreamRecord
(
new
Tuple2
<
Integer
,
Integer
>());
public
WindowSumAggregate
()
{
windowSize
=
100
;
slidingStep
=
20
;
computeGranularity
=
10
;
windowFieldId
=
1
;
window
=
new
WindowState
<
Integer
>(
windowSize
,
slidingStep
,
computeGranularity
,
windowFieldId
);
sum
=
new
MutableTableState
<
String
,
Integer
>();
sum
.
put
(
"sum"
,
0
);
}
private
void
incrementCompute
(
StreamRecord
record
)
{
int
numTuple
=
record
.
getNumOfTuples
();
for
(
int
i
=
0
;
i
<
numTuple
;
++
i
)
{
number
=
record
.
getInteger
(
i
,
0
);
sum
.
put
(
"sum"
,
sum
.
get
(
"sum"
)+
number
);
}
}
private
void
decrementCompute
(
StreamRecord
record
)
{
int
numTuple
=
record
.
getNumOfTuples
();
for
(
int
i
=
0
;
i
<
numTuple
;
++
i
)
{
number
=
record
.
getInteger
(
i
,
0
);
sum
.
put
(
"sum"
,
sum
.
get
(
"sum"
)-
number
);
}
}
@Override
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
if
(
window
.
isFull
())
{
StreamRecord
expiredRecord
=
window
.
popFront
();
incrementCompute
(
record
);
decrementCompute
(
expiredRecord
);
window
.
pushBack
(
record
);
if
(
window
.
isComputable
())
{
outRecord
.
setInteger
(
0
,
sum
.
get
(
"sum"
));
outRecord
.
setInteger
(
1
,
record
.
getInteger
(
1
));
emit
(
outRecord
);
}
}
else
{
incrementCompute
(
record
);
window
.
pushBack
(
record
);
if
(
window
.
isFull
())
{
outRecord
.
setInteger
(
0
,
sum
.
get
(
"sum"
));
outRecord
.
setInteger
(
1
,
record
.
getInteger
(
1
));
emit
(
outRecord
);
}
}
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumLocal.java
已删除
100644 → 0
浏览文件 @
7b780ca6
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package
eu.stratosphere.streaming.examples.window.sum
;
import
java.net.InetSocketAddress
;
import
org.apache.log4j.Level
;
import
eu.stratosphere.client.minicluster.NepheleMiniCluster
;
import
eu.stratosphere.client.program.Client
;
import
eu.stratosphere.configuration.Configuration
;
import
eu.stratosphere.nephele.jobgraph.JobGraph
;
import
eu.stratosphere.streaming.api.JobGraphBuilder
;
import
eu.stratosphere.streaming.util.LogUtils
;
//TODO: window operator remains unfinished.
public
class
WindowSumLocal
{
public
static
JobGraph
getJobGraph
()
{
JobGraphBuilder
graphBuilder
=
new
JobGraphBuilder
(
"testGraph"
);
graphBuilder
.
setSource
(
"WindowSumSource"
,
WindowSumSource
.
class
);
graphBuilder
.
setTask
(
"WindowSumMultiple"
,
WindowSumMultiple
.
class
,
1
,
1
);
graphBuilder
.
setTask
(
"WindowSumAggregate"
,
WindowSumAggregate
.
class
,
1
,
1
);
graphBuilder
.
setSink
(
"WindowSumSink"
,
WindowSumSink
.
class
);
graphBuilder
.
shuffleConnect
(
"WindowSumSource"
,
"WindowSumMultiple"
);
graphBuilder
.
shuffleConnect
(
"WindowSumMultiple"
,
"WindowSumAggregate"
);
graphBuilder
.
shuffleConnect
(
"WindowSumAggregate"
,
"WindowSumSink"
);
return
graphBuilder
.
getJobGraph
();
}
public
static
void
main
(
String
[]
args
)
{
LogUtils
.
initializeDefaultConsoleLogger
(
Level
.
DEBUG
,
Level
.
INFO
);
try
{
JobGraph
jG
=
getJobGraph
();
Configuration
configuration
=
jG
.
getJobConfiguration
();
if
(
args
.
length
==
0
)
{
args
=
new
String
[]
{
"local"
};
}
if
(
args
[
0
].
equals
(
"local"
))
{
System
.
out
.
println
(
"Running in Local mode"
);
NepheleMiniCluster
exec
=
new
NepheleMiniCluster
();
exec
.
start
();
Client
client
=
new
Client
(
new
InetSocketAddress
(
"localhost"
,
6498
),
configuration
);
client
.
run
(
jG
,
true
);
exec
.
stop
();
}
else
if
(
args
[
0
].
equals
(
"cluster"
))
{
System
.
out
.
println
(
"Running in Cluster2 mode"
);
Client
client
=
new
Client
(
new
InetSocketAddress
(
"hadoop02.ilab.sztaki.hu"
,
6123
),
configuration
);
client
.
run
(
jG
,
true
);
}
}
catch
(
Exception
e
)
{
System
.
out
.
println
(
e
);
}
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumMultiple.java
已删除
100644 → 0
浏览文件 @
7b780ca6
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package
eu.stratosphere.streaming.examples.window.sum
;
import
eu.stratosphere.api.java.tuple.Tuple2
;
import
eu.stratosphere.streaming.api.invokable.UserTaskInvokable
;
import
eu.stratosphere.streaming.api.streamrecord.StreamRecord
;
public
class
WindowSumMultiple
extends
UserTaskInvokable
{
private
StreamRecord
outputRecord
=
new
StreamRecord
(
new
Tuple2
<
Integer
,
Integer
>());
private
Integer
number
=
0
;
private
Integer
timestamp
=
0
;
@Override
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
number
=
record
.
getInteger
(
0
);
timestamp
=
record
.
getInteger
(
1
);
System
.
out
.
println
(
"number="
+
number
+
", timestamp="
+
timestamp
);
outputRecord
.
setInteger
(
0
,
number
);
outputRecord
.
setInteger
(
1
,
timestamp
);
emit
(
outputRecord
);
}
}
\ No newline at end of file
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumSink.java
已删除
100644 → 0
浏览文件 @
7b780ca6
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package
eu.stratosphere.streaming.examples.window.sum
;
import
eu.stratosphere.streaming.api.invokable.UserSinkInvokable
;
import
eu.stratosphere.streaming.api.streamrecord.StreamRecord
;
public
class
WindowSumSink
extends
UserSinkInvokable
{
private
Integer
sum
=
0
;
private
Integer
timestamp
=
0
;
@Override
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
sum
=
record
.
getInteger
(
0
,
0
);
timestamp
=
record
.
getInteger
(
0
,
1
);
System
.
out
.
println
(
"============================================"
);
System
.
out
.
println
(
sum
+
" "
+
timestamp
);
System
.
out
.
println
(
"============================================"
);
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumSource.java
已删除
100644 → 0
浏览文件 @
7b780ca6
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package
eu.stratosphere.streaming.examples.window.sum
;
import
eu.stratosphere.api.java.tuple.Tuple2
;
import
eu.stratosphere.streaming.api.invokable.UserSourceInvokable
;
import
eu.stratosphere.streaming.api.streamrecord.StreamRecord
;
public
class
WindowSumSource
extends
UserSourceInvokable
{
private
StreamRecord
outRecord
=
new
StreamRecord
(
new
Tuple2
<
Integer
,
Integer
>());
private
Integer
timestamp
=
0
;
@Override
public
void
invoke
()
throws
Exception
{
for
(
int
i
=
0
;
i
<
1000
;
++
i
)
{
outRecord
.
setInteger
(
0
,
i
);
outRecord
.
setInteger
(
1
,
timestamp
);
timestamp
++;
emit
(
outRecord
);
}
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountCounter.java
浏览文件 @
d6c56edd
...
...
@@ -36,6 +36,7 @@ public class WindowWordCountCounter extends UserTaskInvokable {
private
String
word
=
""
;
private
Integer
count
=
0
;
private
Long
timestamp
=
0L
;
private
StreamRecord
outRecord
=
new
StreamRecord
(
3
);
public
WindowWordCountCounter
()
{
windowSize
=
100
;
...
...
@@ -76,7 +77,6 @@ public class WindowWordCountCounter extends UserTaskInvokable {
@Override
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
StreamRecord
outRecord
=
new
StreamRecord
(
3
);
if
(
window
.
isFull
())
{
StreamRecord
expiredRecord
=
window
.
popFront
();
incrementCompute
(
record
);
...
...
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSplitter.java
浏览文件 @
d6c56edd
...
...
@@ -21,12 +21,12 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public
class
WindowWordCountSplitter
extends
UserTaskInvokable
{
private
String
[]
words
=
new
String
[]
{};
private
StreamRecord
outputRecord
=
new
StreamRecord
(
3
);
private
Long
timestamp
=
0L
;
@Override
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
StreamRecord
outputRecord
=
new
StreamRecord
(
3
);
words
=
record
.
getString
(
0
).
split
(
" "
);
timestamp
=
record
.
getLong
(
1
);
System
.
out
.
println
(
"sentence="
+
record
.
getString
(
0
)
+
", timestamp="
...
...
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSink.java
浏览文件 @
d6c56edd
...
...
@@ -20,20 +20,9 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public
class
WordCountSink
extends
UserSinkInvokable
{
private
String
word
=
""
;
private
Integer
count
=
0
;
private
Long
timestamp
=
0L
;
@Override
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
int
numTuple
=
record
.
getNumOfTuples
();
for
(
int
i
=
0
;
i
<
numTuple
;
++
i
)
{
word
=
record
.
getString
(
i
,
0
);
count
=
record
.
getInteger
(
i
,
1
);
timestamp
=
record
.
getLong
(
i
,
2
);
System
.
out
.
println
(
"============================================"
);
System
.
out
.
println
(
word
+
" "
+
count
+
" "
+
timestamp
);
System
.
out
.
println
(
"============================================"
);
}
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowState.java
浏览文件 @
d6c56edd
...
...
@@ -62,14 +62,6 @@ public class WindowState<K> {
this
.
windowIndex
=
new
HashMap
<
K
,
IndexPair
>();
this
.
buffer
=
new
CircularFifoBuffer
(
fullRecordCount
);
}
public
int
getWindowSize
(){
return
windowSize
;
}
public
int
getSlidingStep
(){
return
slidingStep
;
}
public
void
pushBack
(
StreamRecord
record
)
{
if
(
initTimestamp
==
-
1
)
{
...
...
flink-addons/flink-streaming/src/test/resources/Performance/PerformanceTracker.py
浏览文件 @
d6c56edd
...
...
@@ -10,112 +10,107 @@ import pandas as pd
import
os
import
operator
linestyles
=
[
'_'
,
'-'
,
'--'
,
':'
]
markers
=
[
'D'
,
's'
,
'|'
,
''
,
'x'
,
'_'
,
'^'
,
' '
,
'd'
,
'h'
,
'+'
,
'*'
,
','
,
'o'
,
'.'
,
'1'
,
'p'
,
'H'
,
'v'
,
'>'
];
colors
=
[
'b'
,
'g'
,
'r'
,
'c'
,
'm'
,
'y'
,
'k'
]
def
readFiles
(
csv_dir
):
dataframes
=
[]
counters
=
[]
for
fname
in
os
.
listdir
(
csv_dir
):
if
'.csv'
in
fname
:
dataframe
s
.
append
((
fname
.
rstrip
(
'.csv'
),
int
(
fname
.
rstrip
(
'.csv'
).
split
(
'-'
)[
-
1
])
-
1
,
pd
.
read_csv
(
os
.
path
.
join
(
csv_dir
,
fname
),
index_col
=
'Time'
)))
return
dataframe
s
counter
s
.
append
((
fname
.
rstrip
(
'.csv'
),
int
(
fname
.
rstrip
(
'.csv'
).
split
(
'-'
)[
-
1
])
-
1
,
pd
.
read_csv
(
os
.
path
.
join
(
csv_dir
,
fname
),
index_col
=
'Time'
)))
return
counter
s
def
plotCounter
(
csv_dir
,
name
=
''
,
smooth
=
5
):
dataframe
s
=
readFiles
(
csv_dir
)
def
plotCounter
(
csv_dir
,
sname
=
''
,
smooth
=
5
,
savePath
=
''
):
counter
s
=
readFiles
(
csv_dir
)
addSpeed
(
counters
)
for
dataframe
in
dataframes
:
df
=
dataframe
[
2
]
speed
=
[
0
]
values
=
list
(
df
.
ix
[:,
0
])
for
i
in
range
(
1
,
len
(
values
)):
speed
.
append
(
float
(
values
[
i
]
-
values
[
i
-
1
])
/
float
(
df
.
index
[
i
]
-
df
.
index
[
i
-
1
]
+
0.01
))
df
[
'speed'
]
=
speed
plt
.
figure
(
figsize
=
(
12
,
8
),
dpi
=
80
)
plt
.
title
(
'Counter'
)
if
name
==
''
:
for
dataframe
in
dataframes
:
m
=
markers
[
dataframe
[
1
]
%
len
(
markers
)]
dataframe
[
2
].
ix
[:,
0
].
plot
(
marker
=
m
,
markevery
=
10
,
markersize
=
10
)
plt
.
legend
([
x
[
0
]
for
x
in
dataframes
])
selectedCounters
=
[]
for
(
name
,
number
,
df
)
in
counters
:
if
sname
in
name
:
selectedCounters
.
append
((
name
,
number
,
df
))
if
sname
==
''
:
sname
=
'counters'
save
=
savePath
!=
''
plotDfs
(
selectedCounters
,
smooth
,
save
,
savePath
+
'/'
+
sname
)
def
plotDfs
(
counters
,
smooth
,
save
,
saveFile
):
plt
.
figure
(
figsize
=
(
12
,
8
),
dpi
=
80
)
plt
.
title
(
'dC/dT'
)
for
dataframe
in
dataframes
:
m
=
markers
[
dataframe
[
1
]
%
len
(
markers
)]
plt
.
title
(
'Counter'
)
for
(
name
,
number
,
df
)
in
counters
:
pd
.
rolling_mean
(
dataframe
[
2
].
speed
,
smooth
).
plot
(
marker
=
m
,
markevery
=
10
,
markersize
=
10
)
plt
.
legend
([
x
[
0
]
for
x
in
dataframes
])
else
:
df2
=
[]
for
dataframe
in
dataframes
:
if
name
in
dataframe
[
0
]:
df2
.
append
(
dataframe
)
for
dataframe
in
df2
:
m
=
markers
[
dataframe
[
1
]
%
len
(
markers
)]
m
=
markers
[
number
%
len
(
markers
)]
dataframe
[
2
].
ix
[:,
0
].
plot
(
marker
=
m
,
markevery
=
10
,
markersize
=
10
)
plt
.
legend
([
x
[
0
]
for
x
in
df2
])
df
.
ix
[:,
0
].
plot
(
marker
=
m
,
markevery
=
10
,
markersize
=
10
)
plt
.
legend
([
x
[
0
]
for
x
in
counters
])
if
save
:
plt
.
savefig
(
saveFile
+
'C.png'
)
plt
.
figure
(
figsize
=
(
12
,
8
),
dpi
=
80
)
plt
.
title
(
'dC/dT'
)
for
dataframe
in
df2
:
for
(
name
,
number
,
df
)
in
counters
:
m
=
markers
[
dataframe
[
1
]
%
len
(
markers
)]
m
=
markers
[
number
%
len
(
markers
)]
pd
.
rolling_mean
(
dataframe
[
2
].
speed
,
smooth
).
plot
(
marker
=
m
,
markevery
=
10
,
markersize
=
10
)
plt
.
legend
([
x
[
0
]
for
x
in
df2
])
def
plotThroughput
(
csv_dir
,
taskname
,
smooth
=
5
):
dataframes
=
readFiles
(
csv_dir
)
for
dataframe
in
dataframes
:
df
=
dataframe
[
2
]
speed
=
[
0
]
values
=
list
(
df
.
ix
[:,
0
])
for
i
in
range
(
1
,
len
(
values
)):
speed
.
append
(
float
(
values
[
i
]
-
values
[
i
-
1
])
/
float
(
df
.
index
[
i
]
-
df
.
index
[
i
-
1
]
+
0.01
))
df
[
'speed'
]
=
speed
selected
=
{}
for
df
in
dataframes
:
if
taskname
in
df
[
0
]:
if
df
[
1
]
in
selected
:
selected
[
df
[
1
]].
append
(
df
[
2
])
else
:
selected
[
df
[
1
]]
=
[
df
[
2
]]
plt
.
figure
()
plt
.
title
(
taskname
)
for
i
in
selected
:
selected
[
i
]
=
reduce
(
operator
.
add
,
selected
[
i
])
m
=
markers
[
i
%
len
(
markers
)]
selected
[
i
].
ix
[:,
0
].
plot
(
marker
=
m
,
markevery
=
10
,
markersize
=
10
)
plt
.
legend
(
selected
.
keys
())
plt
.
figure
()
plt
.
title
(
taskname
+
" - dC/dT"
)
for
i
in
selected
:
m
=
markers
[
i
%
len
(
markers
)]
pd
.
rolling_mean
(
selected
[
i
].
speed
,
smooth
).
plot
(
marker
=
m
,
markevery
=
10
,
markersize
=
10
)
pd
.
rolling_mean
(
df
.
speed
,
smooth
).
plot
(
marker
=
m
,
markevery
=
10
,
markersize
=
10
)
plt
.
legend
([
x
[
0
]
for
x
in
counters
])
if
save
:
plt
.
savefig
(
saveFile
+
'D.png'
)
def
addSpeed
(
counters
):
for
(
tname
,
number
,
df
)
in
counters
:
speed
=
[
0
]
values
=
list
(
df
.
ix
[:,
0
])
for
i
in
range
(
1
,
len
(
values
)):
speed
.
append
(
float
(
values
[
i
]
-
values
[
i
-
1
])
/
float
(
df
.
index
[
i
]
-
df
.
index
[
i
-
1
]
+
0.01
))
df
[
'speed'
]
=
speed
return
counters
plt
.
legend
(
selected
.
keys
())
def
plotThroughput
(
csv_dir
,
tasknames
,
smooth
=
5
,
savePath
=
''
):
if
type
(
tasknames
)
!=
list
:
tasknames
=
[
tasknames
]
for
taskname
in
tasknames
:
counters
=
readFiles
(
csv_dir
)
addSpeed
(
counters
)
selected
=
{}
for
(
tname
,
number
,
df
)
in
counters
:
if
taskname
in
tname
:
if
number
in
selected
:
selected
[
number
].
append
(
df
)
else
:
selected
[
number
]
=
[
df
]
plt
.
figure
()
plt
.
title
(
taskname
)
for
i
in
selected
:
if
len
(
selected
[
i
])
>
1
:
selected
[
i
]
=
reduce
(
operator
.
add
,
selected
[
i
])
else
:
selected
[
i
]
=
selected
[
i
][
0
]
m
=
markers
[
i
%
len
(
markers
)]
selected
[
i
].
ix
[:,
0
].
plot
(
marker
=
m
,
markevery
=
10
,
markersize
=
10
)
plt
.
legend
(
selected
.
keys
())
if
savePath
!=
''
:
plt
.
savefig
(
savePath
+
'/'
+
taskname
+
'C.png'
)
plt
.
figure
()
plt
.
title
(
taskname
+
" - dC/dT"
)
for
i
in
selected
:
m
=
markers
[
i
%
len
(
markers
)]
pd
.
rolling_mean
(
selected
[
i
].
speed
,
smooth
).
plot
(
marker
=
m
,
markevery
=
10
,
markersize
=
10
)
plt
.
legend
(
selected
.
keys
())
if
savePath
!=
''
:
plt
.
savefig
(
savePath
+
'/'
+
taskname
+
'D.png'
)
def
plotTimer
(
csv_dir
,
smooth
=
5
,
std
=
50
):
dataframes
=
readFiles
(
csv_dir
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录