Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
7b6295d9
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
7b6295d9
编写于
7月 07, 2014
作者:
T
Till Rohrmann
提交者:
Stephan Ewen
7月 08, 2014
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[FLINK-994] Added test cases for AbstractPagedInputView, fixed problem with hbase.
This closes #53
上级
98e659e5
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
273 addition
and
15 deletion
+273
-15
stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/TableInputSplit.java
...in/java/eu/stratosphere/addons/hbase/TableInputSplit.java
+4
-4
stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/common/HBaseKey.java
...in/java/eu/stratosphere/addons/hbase/common/HBaseKey.java
+4
-4
stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/common/HBaseResult.java
...java/eu/stratosphere/addons/hbase/common/HBaseResult.java
+4
-4
stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/AbstractPagedInputView.java
...ephele/services/memorymanager/AbstractPagedInputView.java
+3
-2
stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/PagedViewsTest.java
...stratosphere/runtime/io/serialization/PagedViewsTest.java
+258
-1
未找到文件。
stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/TableInputSplit.java
浏览文件 @
7b6295d9
...
...
@@ -13,10 +13,10 @@
package
eu.stratosphere.addons.hbase
;
import
java.io.DataInput
;
import
java.io.DataOutput
;
import
java.io.IOException
;
import
eu.stratosphere.core.memory.DataOutputView
;
import
eu.stratosphere.core.memory.DataInputView
;
import
eu.stratosphere.core.io.LocatableInputSplit
;
/**
...
...
@@ -103,7 +103,7 @@ public class TableInputSplit extends LocatableInputSplit {
@Override
public
void
write
(
final
DataOutput
out
)
throws
IOException
{
public
void
write
(
final
DataOutput
View
out
)
throws
IOException
{
super
.
write
(
out
);
...
...
@@ -134,7 +134,7 @@ public class TableInputSplit extends LocatableInputSplit {
@Override
public
void
read
(
final
DataInput
in
)
throws
IOException
{
public
void
read
(
final
DataInput
View
in
)
throws
IOException
{
super
.
read
(
in
);
...
...
stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/common/HBaseKey.java
浏览文件 @
7b6295d9
...
...
@@ -13,12 +13,12 @@
package
eu.stratosphere.addons.hbase.common
;
import
java.io.DataInput
;
import
java.io.DataOutput
;
import
java.io.IOException
;
import
org.apache.hadoop.hbase.io.ImmutableBytesWritable
;
import
eu.stratosphere.core.memory.DataInputView
;
import
eu.stratosphere.core.memory.DataOutputView
;
import
eu.stratosphere.types.Key
;
/**
...
...
@@ -52,12 +52,12 @@ public class HBaseKey implements Key<HBaseKey> {
// --------------------------------------------------------------------------------------------
@Override
public
void
write
(
DataOutput
out
)
throws
IOException
{
public
void
write
(
DataOutput
View
out
)
throws
IOException
{
this
.
writable
.
write
(
out
);
}
@Override
public
void
read
(
DataInput
in
)
throws
IOException
{
public
void
read
(
DataInput
View
in
)
throws
IOException
{
this
.
writable
.
readFields
(
in
);
}
...
...
stratosphere-addons/hbase/src/main/java/eu/stratosphere/addons/hbase/common/HBaseResult.java
浏览文件 @
7b6295d9
...
...
@@ -13,12 +13,12 @@
package
eu.stratosphere.addons.hbase.common
;
import
java.io.DataInput
;
import
java.io.DataOutput
;
import
java.io.IOException
;
import
org.apache.hadoop.hbase.client.Result
;
import
eu.stratosphere.core.memory.DataInputView
;
import
eu.stratosphere.core.memory.DataOutputView
;
import
eu.stratosphere.types.Value
;
public
class
HBaseResult
implements
Value
{
...
...
@@ -53,12 +53,12 @@ public class HBaseResult implements Value {
}
@Override
public
void
read
(
DataInput
in
)
throws
IOException
{
public
void
read
(
DataInput
View
in
)
throws
IOException
{
this
.
result
.
readFields
(
in
);
}
@Override
public
void
write
(
DataOutput
out
)
throws
IOException
{
public
void
write
(
DataOutput
View
out
)
throws
IOException
{
this
.
result
.
write
(
out
);
}
}
stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/AbstractPagedInputView.java
浏览文件 @
7b6295d9
...
...
@@ -222,6 +222,7 @@ public abstract class AbstractPagedInputView implements DataInputView {
try
{
advance
();
}
catch
(
EOFException
eof
){
this
.
positionInSegment
+=
toRead
;
return
bytesRead
;
}
remaining
=
this
.
limitInSegment
-
this
.
positionInSegment
;
...
...
@@ -244,8 +245,8 @@ public abstract class AbstractPagedInputView implements DataInputView {
public
void
readFully
(
byte
[]
b
,
int
off
,
int
len
)
throws
IOException
{
int
bytesRead
=
read
(
b
,
off
,
len
);
if
(
bytesRead
==
-
1
){
throw
new
EOFException
(
"There is no
more
data left in the DataInputView."
);
if
(
bytesRead
<
len
){
throw
new
EOFException
(
"There is no
enough
data left in the DataInputView."
);
}
}
...
...
stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/PagedViewsTest.java
浏览文件 @
7b6295d9
...
...
@@ -21,10 +21,13 @@ import eu.stratosphere.runtime.io.serialization.types.SerializationTestTypeFacto
import
eu.stratosphere.runtime.io.serialization.types.Util
;
import
org.junit.Test
;
import
java.io.EOFException
;
import
java.io.IOException
;
import
java.util.*
;
import
static
org
.
junit
.
Assert
.
assertArrayEquals
;
import
static
org
.
junit
.
Assert
.
assertEquals
;
import
static
org
.
junit
.
Assert
.
assertTrue
;
import
static
org
.
junit
.
Assert
.
fail
;
public
class
PagedViewsTest
{
...
...
@@ -69,6 +72,260 @@ public class PagedViewsTest {
}
}
@Test
public
void
testReadFully
()
{
int
bufferSize
=
100
;
byte
[]
expected
=
new
byte
[
bufferSize
];
new
Random
().
nextBytes
(
expected
);
TestOutputView
outputView
=
new
TestOutputView
(
bufferSize
);
try
{
outputView
.
write
(
expected
);
}
catch
(
Exception
e
){
e
.
printStackTrace
();
fail
(
"Unexpected exception: Could not write to TestOutputView."
);
}
outputView
.
close
();
TestInputView
inputView
=
new
TestInputView
(
outputView
.
segments
);
byte
[]
buffer
=
new
byte
[
bufferSize
];
try
{
inputView
.
readFully
(
buffer
);
}
catch
(
IOException
e
)
{
e
.
printStackTrace
();
fail
(
"Unexpected exception: Could not read TestInputView."
);
}
assertEquals
(
inputView
.
getCurrentPositionInSegment
(),
bufferSize
);
assertArrayEquals
(
expected
,
buffer
);
}
@Test
public
void
testReadFullyAcrossSegments
()
{
int
bufferSize
=
100
;
int
segmentSize
=
30
;
byte
[]
expected
=
new
byte
[
bufferSize
];
new
Random
().
nextBytes
(
expected
);
TestOutputView
outputView
=
new
TestOutputView
(
segmentSize
);
try
{
outputView
.
write
(
expected
);
}
catch
(
Exception
e
){
e
.
printStackTrace
();
fail
(
"Unexpected exception: Could not write to TestOutputView."
);
}
outputView
.
close
();
TestInputView
inputView
=
new
TestInputView
(
outputView
.
segments
);
byte
[]
buffer
=
new
byte
[
bufferSize
];
try
{
inputView
.
readFully
(
buffer
);
}
catch
(
IOException
e
)
{
e
.
printStackTrace
();
fail
(
"Unexpected exception: Could not read TestInputView."
);
}
assertEquals
(
inputView
.
getCurrentPositionInSegment
(),
bufferSize
%
segmentSize
);
assertArrayEquals
(
expected
,
buffer
);
}
@Test
public
void
testReadAcrossSegments
()
{
int
bufferSize
=
100
;
int
bytes2Write
=
75
;
int
segmentSize
=
30
;
byte
[]
expected
=
new
byte
[
bytes2Write
];
new
Random
().
nextBytes
(
expected
);
TestOutputView
outputView
=
new
TestOutputView
(
segmentSize
);
try
{
outputView
.
write
(
expected
);
}
catch
(
Exception
e
){
e
.
printStackTrace
();
fail
(
"Unexpected exception: Could not write to TestOutputView."
);
}
outputView
.
close
();
TestInputView
inputView
=
new
TestInputView
(
outputView
.
segments
);
byte
[]
buffer
=
new
byte
[
bufferSize
];
int
bytesRead
=
0
;
try
{
bytesRead
=
inputView
.
read
(
buffer
);
}
catch
(
IOException
e
)
{
e
.
printStackTrace
();
fail
(
"Unexpected exception: Could not read TestInputView."
);
}
assertEquals
(
bytes2Write
,
bytesRead
);
assertEquals
(
inputView
.
getCurrentPositionInSegment
(),
bytes2Write
%
segmentSize
);
byte
[]
tempBuffer
=
new
byte
[
bytesRead
];
System
.
arraycopy
(
buffer
,
0
,
tempBuffer
,
0
,
bytesRead
);
assertArrayEquals
(
expected
,
tempBuffer
);
}
@Test
public
void
testEmptyingInputView
()
{
int
bufferSize
=
100
;
int
bytes2Write
=
75
;
int
segmentSize
=
30
;
byte
[]
expected
=
new
byte
[
bytes2Write
];
new
Random
().
nextBytes
(
expected
);
TestOutputView
outputView
=
new
TestOutputView
(
segmentSize
);
try
{
outputView
.
write
(
expected
);
}
catch
(
Exception
e
){
e
.
printStackTrace
();
fail
(
"Unexpected exception: Could not write to TestOutputView."
);
}
outputView
.
close
();
TestInputView
inputView
=
new
TestInputView
(
outputView
.
segments
);
byte
[]
buffer
=
new
byte
[
bufferSize
];
int
bytesRead
=
0
;
try
{
bytesRead
=
inputView
.
read
(
buffer
);
}
catch
(
IOException
e
)
{
e
.
printStackTrace
();
fail
(
"Unexpected exception: Could not read TestInputView."
);
}
assertEquals
(
bytes2Write
,
bytesRead
);
byte
[]
tempBuffer
=
new
byte
[
bytesRead
];
System
.
arraycopy
(
buffer
,
0
,
tempBuffer
,
0
,
bytesRead
);
assertArrayEquals
(
expected
,
tempBuffer
);
try
{
bytesRead
=
inputView
.
read
(
buffer
);
}
catch
(
IOException
e
){
e
.
printStackTrace
();
fail
(
"Unexpected exception: Input view should be empty and thus return -1."
);
}
assertEquals
(-
1
,
bytesRead
);
assertEquals
(
inputView
.
getCurrentPositionInSegment
(),
bytes2Write
%
segmentSize
);
}
@Test
public
void
testReadFullyWithNotEnoughData
()
{
int
bufferSize
=
100
;
int
bytes2Write
=
99
;
int
segmentSize
=
30
;
byte
[]
expected
=
new
byte
[
bytes2Write
];
new
Random
().
nextBytes
(
expected
);
TestOutputView
outputView
=
new
TestOutputView
(
segmentSize
);
try
{
outputView
.
write
(
expected
);
}
catch
(
Exception
e
){
e
.
printStackTrace
();
fail
(
"Unexpected exception: Could not write to TestOutputView."
);
}
outputView
.
close
();
TestInputView
inputView
=
new
TestInputView
(
outputView
.
segments
);
byte
[]
buffer
=
new
byte
[
bufferSize
];
boolean
eofException
=
false
;
try
{
inputView
.
readFully
(
buffer
);
}
catch
(
EOFException
e
){
//Expected exception
eofException
=
true
;
}
catch
(
IOException
e
)
{
e
.
printStackTrace
();
fail
(
"Unexpected exception: Could not read TestInputView."
);
}
assertTrue
(
"EOFException should have occurred."
,
eofException
);
int
bytesRead
=
0
;
try
{
bytesRead
=
inputView
.
read
(
buffer
);
}
catch
(
Exception
e
){
e
.
printStackTrace
();
fail
(
"Unexpected exception: Could not read TestInputView."
);
}
assertEquals
(-
1
,
bytesRead
);
}
@Test
public
void
testReadFullyWithOffset
(){
int
bufferSize
=
100
;
int
segmentSize
=
30
;
byte
[]
expected
=
new
byte
[
bufferSize
];
new
Random
().
nextBytes
(
expected
);
TestOutputView
outputView
=
new
TestOutputView
(
segmentSize
);
try
{
outputView
.
write
(
expected
);
}
catch
(
Exception
e
){
e
.
printStackTrace
();
fail
(
"Unexpected exception: Could not write to TestOutputView."
);
}
outputView
.
close
();
TestInputView
inputView
=
new
TestInputView
(
outputView
.
segments
);
byte
[]
buffer
=
new
byte
[
2
*
bufferSize
];
try
{
inputView
.
readFully
(
buffer
,
bufferSize
,
bufferSize
);
}
catch
(
IOException
e
)
{
e
.
printStackTrace
();
fail
(
"Unexpected exception: Could not read TestInputView."
);
}
assertEquals
(
inputView
.
getCurrentPositionInSegment
(),
bufferSize
%
segmentSize
);
byte
[]
tempBuffer
=
new
byte
[
bufferSize
];
System
.
arraycopy
(
buffer
,
bufferSize
,
tempBuffer
,
0
,
bufferSize
);
assertArrayEquals
(
expected
,
tempBuffer
);
}
@Test
public
void
testReadFullyEmptyView
(){
int
segmentSize
=
30
;
TestOutputView
outputView
=
new
TestOutputView
(
segmentSize
);
outputView
.
close
();
TestInputView
inputView
=
new
TestInputView
(
outputView
.
segments
);
byte
[]
buffer
=
new
byte
[
segmentSize
];
boolean
eofException
=
false
;
try
{
inputView
.
readFully
(
buffer
);
}
catch
(
EOFException
e
){
//expected Exception
eofException
=
true
;
}
catch
(
Exception
e
){
e
.
printStackTrace
();
fail
(
"Unexpected exception: Could not read TestInputView."
);
}
assertTrue
(
"EOFException expected."
,
eofException
);
}
private
static
void
testSequenceOfTypes
(
Iterable
<
SerializationTestType
>
sequence
,
int
segmentSize
)
throws
Exception
{
List
<
SerializationTestType
>
elements
=
new
ArrayList
<
SerializationTestType
>(
512
);
...
...
@@ -148,7 +405,7 @@ public class PagedViewsTest {
if
(
num
<
segments
.
size
())
{
return
segments
.
get
(
num
).
segment
;
}
else
{
return
null
;
throw
new
EOFException
()
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录