Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
0c771a42
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
0c771a42
编写于
7月 14, 2014
作者:
G
gaborhermann
提交者:
Stephan Ewen
8月 18, 2014
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[streaming] Performance test scripts added
上级
266a7e36
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
74 addition
and
33 deletion
+74
-33
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java
...sphere/streaming/examples/wordcount/WordCountCounter.java
+2
-2
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSink.java
...atosphere/streaming/examples/wordcount/WordCountSink.java
+1
-1
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSource.java
...osphere/streaming/examples/wordcount/WordCountSource.java
+1
-1
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSplitter.java
...phere/streaming/examples/wordcount/WordCountSplitter.java
+2
-2
flink-addons/flink-streaming/src/test/resources/Performance/PerformanceTracker.py
...ming/src/test/resources/Performance/PerformanceTracker.py
+31
-27
flink-addons/flink-streaming/src/test/resources/Performance/copy-files.sh
...nk-streaming/src/test/resources/Performance/copy-files.sh
+27
-0
flink-addons/flink-streaming/src/test/resources/Performance/remove-files.sh
...-streaming/src/test/resources/Performance/remove-files.sh
+10
-0
未找到文件。
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java
浏览文件 @
0c771a42
...
...
@@ -30,8 +30,8 @@ public class WordCountCounter extends UserTaskInvokable {
private
String
word
=
""
;
private
Integer
count
=
0
;
PerformanceCounter
pCounter
=
new
PerformanceCounter
(
"CounterEmitCounter"
,
1000
,
1000
0
);
PerformanceTimer
pTimer
=
new
PerformanceTimer
(
"CounterEmitTimer"
,
1000
,
1000
0
,
true
);
PerformanceCounter
pCounter
=
new
PerformanceCounter
(
"CounterEmitCounter"
,
1000
,
1000
);
PerformanceTimer
pTimer
=
new
PerformanceTimer
(
"CounterEmitTimer"
,
1000
,
1000
,
true
);
private
StreamRecord
outRecord
=
new
StreamRecord
(
new
Tuple2
<
String
,
Integer
>());
...
...
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSink.java
浏览文件 @
0c771a42
...
...
@@ -21,7 +21,7 @@ import eu.stratosphere.streaming.util.PerformanceCounter;
public
class
WordCountSink
extends
UserSinkInvokable
{
PerformanceCounter
perf
=
new
PerformanceCounter
(
"SinkEmitCounter"
,
1000
,
1000
0
);
PerformanceCounter
perf
=
new
PerformanceCounter
(
"SinkEmitCounter"
,
1000
,
1000
);
@Override
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
...
...
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSource.java
浏览文件 @
0c771a42
...
...
@@ -37,7 +37,7 @@ public class WordCountSource extends UserSourceInvokable {
@Override
public
void
invoke
()
throws
Exception
{
for
(
int
i
=
0
;
i
<
10
;
i
++)
{
for
(
int
i
=
0
;
i
<
2
;
i
++)
{
try
{
br
=
new
BufferedReader
(
new
FileReader
(
"/home/strato/strato-dist/resources/hamlet.txt"
));
...
...
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSplitter.java
浏览文件 @
0c771a42
...
...
@@ -25,8 +25,8 @@ public class WordCountSplitter extends UserTaskInvokable {
private
String
[]
words
=
new
String
[]
{};
private
StreamRecord
outputRecord
=
new
StreamRecord
(
new
Tuple1
<
String
>());
PerformanceCounter
pCounter
=
new
PerformanceCounter
(
"SplitterEmitCounter"
,
1000
,
1000
0
);
PerformanceTimer
pTimer
=
new
PerformanceTimer
(
"SplitterEmitTimer"
,
1000
,
1000
0
,
true
);
PerformanceCounter
pCounter
=
new
PerformanceCounter
(
"SplitterEmitCounter"
,
1000
,
1000
);
PerformanceTimer
pTimer
=
new
PerformanceTimer
(
"SplitterEmitTimer"
,
1000
,
1000
,
true
);
@Override
public
void
invoke
(
StreamRecord
record
)
throws
Exception
{
...
...
flink-addons/flink-streaming/src/test/resources/Performance/PerformanceTracker.py
浏览文件 @
0c771a42
...
...
@@ -12,21 +12,20 @@ import os
linestyles
=
[
'_'
,
'-'
,
'--'
,
':'
]
markers
=
[
'x'
,
'o'
,
'^'
,
'+'
]
def
readFiles
(
csv_dir
):
dataframes
=
{}
machine
=
[]
dataframes
=
[]
for
fname
in
os
.
listdir
(
csv_dir
):
if
'.csv'
in
fname
:
dataframes
[
fname
.
rstrip
(
'.csv'
)]
=
pd
.
read_csv
(
os
.
path
.
join
(
csv_dir
,
fname
),
index_col
=
'Time'
)
machine
.
append
(
int
(
fname
.
rstrip
(
'.csv'
)[
-
1
]))
return
dataframes
,
machine
dataframes
.
append
((
fname
.
rstrip
(
'.csv'
),
int
(
fname
.
rstrip
(
'.csv'
).
split
(
'-'
)[
-
1
])
-
1
,
pd
.
read_csv
(
os
.
path
.
join
(
csv_dir
,
fname
),
index_col
=
'Time'
)))
return
dataframes
def
plotCounter
(
csv_dir
,
smooth
=
5
):
dataframes
,
machine
=
readFiles
(
csv_dir
)
dataframes
=
readFiles
(
csv_dir
)
for
n
ame
in
dataframes
:
df
=
dataframe
s
[
name
]
for
datafr
ame
in
dataframes
:
df
=
dataframe
[
2
]
speed
=
[
0
]
values
=
list
(
df
.
ix
[:,
0
])
for
i
in
range
(
1
,
len
(
values
)):
...
...
@@ -36,39 +35,44 @@ def plotCounter(csv_dir, smooth=5):
plt
.
figure
(
figsize
=
(
12
,
8
),
dpi
=
80
)
plt
.
title
(
'Counter'
)
for
name
in
enumerate
(
dataframes
)
:
if
len
(
markers
)
>
machine
[
name
[
0
]
]:
m
=
markers
[
machine
[
name
[
0
]
]]
for
dataframe
in
dataframes
:
if
len
(
markers
)
>
dataframe
[
1
]:
m
=
markers
[
dataframe
[
1
]]
else
:
m
=
'*'
dataframes
[
name
[
1
]].
ix
[:,
0
].
plot
(
marker
=
m
,
markevery
=
10
,
markersize
=
10
)
plt
.
legend
(
dataframes
.
keys
())
dataframe
[
2
].
ix
[:,
0
].
plot
(
marker
=
m
,
markevery
=
10
,
markersize
=
10
)
plt
.
legend
([
x
[
0
]
for
x
in
dataframes
])
plt
.
figure
(
figsize
=
(
12
,
8
),
dpi
=
80
)
plt
.
title
(
'dC/dT'
)
for
name
in
enumerate
(
dataframes
)
:
if
len
(
markers
)
>
machine
[
name
[
0
]
]:
m
=
markers
[
machine
[
name
[
0
]
]]
else
:
m
=
'*'
pd
.
rolling_mean
(
dataframe
s
[
name
[
1
]
].
speed
,
smooth
).
plot
(
marker
=
m
,
markevery
=
10
,
markersize
=
10
)
plt
.
legend
(
dataframes
.
keys
()
)
for
dataframe
in
dataframes
:
if
len
(
markers
)
>
dataframe
[
1
]:
m
=
markers
[
dataframe
[
1
]]
else
:
m
=
'*'
pd
.
rolling_mean
(
dataframe
[
2
].
speed
,
smooth
).
plot
(
marker
=
m
,
markevery
=
10
,
markersize
=
10
)
plt
.
legend
(
[
x
[
0
]
for
x
in
dataframes
]
)
def
plotTimer
(
csv_dir
,
smooth
=
5
,
std
=
50
):
dataframes
,
machine
=
readFiles
(
csv_dir
)
dataframes
=
readFiles
(
csv_dir
)
plt
.
figure
(
figsize
=
(
12
,
8
),
dpi
=
80
)
plt
.
title
(
'Timer'
)
for
name
in
dataframes
:
pd
.
rolling_mean
(
dataframes
[
name
].
ix
[:,
0
],
smooth
).
plot
()
plt
.
legend
(
dataframes
.
keys
())
for
dataframe
in
dataframes
:
if
len
(
markers
)
>
dataframe
[
1
]:
m
=
markers
[
dataframe
[
1
]]
else
:
m
=
'*'
pd
.
rolling_mean
(
dataframe
[
2
].
ix
[:,
0
],
smooth
).
plot
(
marker
=
m
,
markevery
=
10
,
markersize
=
10
)
plt
.
legend
([
x
[
0
]
for
x
in
dataframes
])
plt
.
figure
(
figsize
=
(
12
,
8
),
dpi
=
80
)
plt
.
title
(
'Standard deviance'
)
for
name
in
dataframes
:
pd
.
rolling_std
(
dataframes
[
name
].
ix
[:,
0
],
std
).
plot
()
plt
.
legend
(
dataframes
.
keys
())
for
dataframe
in
dataframes
:
if
len
(
markers
)
>
dataframe
[
1
]:
m
=
markers
[
dataframe
[
1
]]
else
:
m
=
'*'
pd
.
rolling_std
(
dataframe
[
2
].
ix
[:,
0
],
std
).
plot
(
marker
=
m
,
markevery
=
10
,
markersize
=
10
)
plt
.
legend
([
x
[
0
]
for
x
in
dataframes
])
flink-addons/flink-streaming/src/test/resources/Performance/copy-files.sh
0 → 100644
浏览文件 @
0c771a42
#!/bin/bash
toDir
=
$1
if
[
-d
"
${
toDir
}
"
]
;
then
ssh strato@dell150.ilab.sztaki.hu
'
for j in {101..142} 144 145;
do
for i in $(ssh dell$j "ls stratosphere-distrib/log/counter/");
do scp strato@dell$j:stratosphere-distrib/log/counter/$i stratosphere-distrib/log/all_tests/counter/$i-$j.csv;
done
for i in $(ssh dell$j "ls stratosphere-distrib/log/timer/");
do scp strato@dell$j:stratosphere-distrib/log/timer/$i stratosphere-distrib/log/all_tests/timer/$i-$j.csv;
done
for i in $(ls stratosphere-distrib/log/counter/);
do cp stratosphere-distrib/log/counter/$i stratosphere-distrib/log/all_tests/counter/$i-150.csv;
done
for i in $(ls stratosphere-distrib/log/timer/);
do cp stratosphere-distrib/log/timer/$i stratosphere-distrib/log/all_tests/timer/$i-150.csv;
done
done
'
scp strato@dell150.ilab.sztaki.hu:stratosphere-distrib/log/counter/
*
$toDir
/counter/
scp strato@dell150.ilab.sztaki.hu:stratosphere-distrib/log/timer/
*
$toDir
/timer/
else
echo
"USAGE:"
echo
"run <directory>"
fi
\ No newline at end of file
flink-addons/flink-streaming/src/test/resources/Performance/remove-files.sh
0 → 100644
浏览文件 @
0c771a42
#!/bin/bash
ssh strato@dell150.ilab.sztaki.hu
'
for j in {101..142} 144 145;
do
$(ssh dell$j "rm stratosphere-distrib/log/counter/*");
$(ssh dell$j "rm stratosphere-distrib/log/timer/*");
rm stratosphere-distrib/log/counter/*
rm stratosphere-distrib/log/timer/*
done
'
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录