Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
a6d09db3
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
a6d09db3
编写于
7月 14, 2014
作者:
M
Márton Balassi
提交者:
Stephan Ewen
8月 18, 2014
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[streaming] getFieldSpeedTest
上级
cc7d7ef2
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
83 addition
and
28 deletion
+83
-28
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java
...stratosphere/streaming/api/streamrecord/StreamRecord.java
+30
-14
flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java
...tosphere/streaming/api/streamrecord/StreamRecordTest.java
+53
-14
未找到文件。
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java
浏览文件 @
a6d09db3
...
...
@@ -95,7 +95,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
tupleBatch
=
new
ArrayList
<
Tuple
>();
}
public
StreamRecord
(
int
numOfFields
,
int
batchSize
)
{
this
.
numOfFields
=
numOfFields
;
this
.
numOfTuples
=
0
;
...
...
@@ -208,6 +208,21 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
}
public
Object
getFieldFast
(
int
tupleNumber
,
int
fieldNumber
)
throws
NoSuchTupleException
,
NoSuchFieldException
{
Tuple
tuple
;
try
{
tuple
=
tupleBatch
.
get
(
tupleNumber
);
}
catch
(
IndexOutOfBoundsException
e
)
{
throw
(
new
NoSuchTupleException
());
}
try
{
return
tuple
.
getFieldFast
(
fieldNumber
);
}
catch
(
IndexOutOfBoundsException
e
)
{
throw
(
new
NoSuchFieldException
());
}
}
/**
* Get a Boolean from the given field of the first Tuple of the batch
*
...
...
@@ -244,8 +259,9 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*
* @param fieldNumber
* Position of the field in the tuple
* @return value of the field as Double * @throws NoSuchTupleException ,
* NoSuchFieldException
* @return value of the field as Double
* @throws NoSuchTupleException
* , NoSuchFieldException
*/
public
Double
getDouble
(
int
fieldNumber
)
throws
NoSuchTupleException
,
NoSuchFieldException
{
...
...
@@ -259,8 +275,9 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* Position of the tuple in the batch
* @param fieldNumber
* Position of the field in the tuple
* @return value of the field as Double * @throws NoSuchTupleException ,
* NoSuchFieldException
* @return value of the field as Double
* @throws NoSuchTupleException
* , NoSuchFieldException
*/
public
Double
getDouble
(
int
tupleNumber
,
int
fieldNumber
)
throws
NoSuchTupleException
,
NoSuchFieldException
{
...
...
@@ -464,7 +481,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public
void
setInteger
(
int
fieldNumber
,
Integer
i
)
throws
NoSuchFieldException
{
setInteger
(
0
,
fieldNumber
,
i
);
setInteger
(
0
,
fieldNumber
,
i
);
}
/**
...
...
@@ -669,7 +686,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
}
public
StreamRecord
copySerialized
()
{
ByteArrayOutputStream
buff
=
new
ByteArrayOutputStream
();
...
...
@@ -686,7 +702,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
return
newRecord
;
}
/**
* Creates a deep copy of the StreamRecord
*
...
...
@@ -694,13 +710,13 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*
*/
public
StreamRecord
copy
()
{
StreamRecord
newRecord
=
new
StreamRecord
(
numOfFields
,
numOfTuples
);
newRecord
.
uid
=
new
StringValue
(
uid
.
getValue
());
for
(
Tuple
tuple:
tupleBatch
)
{
newRecord
.
tupleBatch
.
add
(
StreamRecord
.
copyTuple
(
tuple
));
StreamRecord
newRecord
=
new
StreamRecord
(
numOfFields
,
numOfTuples
);
newRecord
.
uid
=
new
StringValue
(
uid
.
getValue
());
for
(
Tuple
tuple
:
tupleBatch
)
{
newRecord
.
tupleBatch
.
add
(
copyTuple
(
tuple
));
}
return
newRecord
;
}
...
...
flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java
浏览文件 @
a6d09db3
...
...
@@ -30,12 +30,14 @@ import org.junit.Test;
import
eu.stratosphere.api.java.tuple.Tuple1
;
import
eu.stratosphere.api.java.tuple.Tuple2
;
import
eu.stratosphere.api.java.tuple.Tuple4
;
public
class
StreamRecordTest
{
@Test
public
void
singleRecordSetGetTest
()
{
StreamRecord
record
=
new
StreamRecord
(
new
Tuple2
<
String
,
Integer
>(
"Stratosphere"
,
1
));
StreamRecord
record
=
new
StreamRecord
(
new
Tuple2
<
String
,
Integer
>(
"Stratosphere"
,
1
));
assertEquals
(
2
,
record
.
getNumOfFields
());
assertEquals
(
1
,
record
.
getNumOfTuples
());
...
...
@@ -46,19 +48,21 @@ public class StreamRecordTest {
record
.
setInteger
(
1
,
2
);
assertEquals
(
"Big Data"
,
record
.
getString
(
0
));
assertEquals
((
Integer
)
2
,
record
.
getInteger
(
1
));
record
.
setTuple
(
new
Tuple2
<
String
,
Long
>(
"Big Data looks tiny from here."
,
2L
));
record
.
setTuple
(
new
Tuple2
<
String
,
Long
>(
"Big Data looks tiny from here."
,
2L
));
assertEquals
(
2
,
record
.
getNumOfFields
());
assertEquals
(
1
,
record
.
getNumOfTuples
());
assertEquals
((
Long
)
2L
,
record
.
getLong
(
1
));
record
.
setTuple
(
new
Tuple2
<
String
,
Boolean
>(
"Big Data looks tiny from here."
,
true
));
record
.
setTuple
(
new
Tuple2
<
String
,
Boolean
>(
"Big Data looks tiny from here."
,
true
));
assertEquals
(
2
,
record
.
getNumOfFields
());
assertEquals
(
1
,
record
.
getNumOfTuples
());
assertEquals
(
true
,
record
.
getBoolean
(
1
));
record
.
setTuple
(
new
Tuple2
<
String
,
Double
>(
"Big Data looks tiny from here."
,
2.5
));
record
.
setTuple
(
new
Tuple2
<
String
,
Double
>(
"Big Data looks tiny from here."
,
2.5
));
assertEquals
(
2
,
record
.
getNumOfFields
());
assertEquals
(
1
,
record
.
getNumOfTuples
());
assertEquals
((
Double
)
2.5
,
record
.
getDouble
(
1
));
...
...
@@ -78,7 +82,8 @@ public class StreamRecordTest {
@Test
public
void
batchRecordSetGetTest
()
{
StreamRecord
record
=
new
StreamRecord
(
new
Tuple2
<
Integer
,
Integer
>(
1
,
2
));
StreamRecord
record
=
new
StreamRecord
(
new
Tuple2
<
Integer
,
Integer
>(
1
,
2
));
record
.
addTuple
(
new
Tuple2
<
Integer
,
Integer
>(
2
,
2
));
try
{
record
.
addTuple
(
new
Tuple1
<
String
>(
"4"
));
...
...
@@ -100,7 +105,6 @@ public class StreamRecordTest {
@Test
public
void
copyTest
()
{
// TODO:test ID copy
StreamRecord
a
=
new
StreamRecord
(
new
Tuple1
<
String
>(
"Big"
));
StreamRecord
b
=
a
.
copy
();
assertTrue
(
a
.
getField
(
0
).
equals
(
b
.
getField
(
0
)));
...
...
@@ -109,16 +113,48 @@ public class StreamRecordTest {
b
.
setTuple
(
new
Tuple1
<
String
>(
"Data"
));
assertFalse
(
a
.
getId
().
equals
(
b
.
getId
()));
assertFalse
(
a
.
getField
(
0
).
equals
(
b
.
getField
(
0
)));
final
int
ITERATION
=
10000
;
StreamRecord
c
=
new
StreamRecord
(
new
Tuple1
<
String
>(
"Big"
));
long
t
=
System
.
nanoTime
();
c
.
copySerialized
();
System
.
out
.
println
(
"Serialized copy:\t"
+
(
System
.
nanoTime
()
-
t
));
for
(
int
i
=
0
;
i
<
ITERATION
;
i
++)
{
c
.
copySerialized
();
}
long
t2
=
System
.
nanoTime
()
-
t
;
System
.
out
.
println
(
"Serialized copy:\t"
+
t2
+
" ns"
);
t
=
System
.
nanoTime
();
for
(
int
i
=
0
;
i
<
ITERATION
;
i
++)
{
c
.
copy
();
}
t2
=
System
.
nanoTime
()
-
t
;
System
.
out
.
println
(
"Copy:\t"
+
t2
+
" ns"
);
}
@Test
public
void
getFieldSpeedTest
()
{
final
int
ITERATION
=
10000
;
StreamRecord
record
=
new
StreamRecord
(
new
Tuple4
<
Integer
,
Long
,
String
,
String
>(
0
,
42L
,
"Stratosphere"
,
"Streaming"
));
long
t
=
System
.
nanoTime
();
for
(
int
i
=
0
;
i
<
ITERATION
;
i
++)
{
record
.
getField
(
0
,
3
);
}
long
t2
=
System
.
nanoTime
()
-
t
;
System
.
out
.
println
(
"getField:\t"
+
t2
+
" ns"
);
t
=
System
.
nanoTime
();
c
.
copy
();
System
.
out
.
println
(
"New copy:\t"
+
(
System
.
nanoTime
()
-
t
));
for
(
int
i
=
0
;
i
<
ITERATION
;
i
++)
{
record
.
getFieldFast
(
0
,
3
);
}
t2
=
System
.
nanoTime
()
-
t
;
System
.
out
.
println
(
"getFieldFast:\t"
+
t2
+
" ns"
);
}
...
...
@@ -158,16 +194,19 @@ public class StreamRecordTest {
int
num
=
42
;
String
str
=
"above clouds"
;
StreamRecord
rec
=
new
StreamRecord
(
new
Tuple2
<
Integer
,
String
>(
num
,
str
));
StreamRecord
rec
=
new
StreamRecord
(
new
Tuple2
<
Integer
,
String
>(
num
,
str
));
try
{
rec
.
write
(
out
);
DataInputStream
in
=
new
DataInputStream
(
new
ByteArrayInputStream
(
buff
.
toByteArray
()));
DataInputStream
in
=
new
DataInputStream
(
new
ByteArrayInputStream
(
buff
.
toByteArray
()));
StreamRecord
newRec
=
new
StreamRecord
();
newRec
.
read
(
in
);
@SuppressWarnings
(
"unchecked"
)
Tuple2
<
Integer
,
String
>
tupleOut
=
(
Tuple2
<
Integer
,
String
>)
newRec
.
getTuple
(
0
);
Tuple2
<
Integer
,
String
>
tupleOut
=
(
Tuple2
<
Integer
,
String
>)
newRec
.
getTuple
(
0
);
assertEquals
(
tupleOut
.
getField
(
0
),
42
);
}
catch
(
IOException
e
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录