doujutun3207 / flink
Commit 2e6e4de5
Authored Aug 27, 2015 by chengxiang li
Committed by Stephan Ewen, Sep 02, 2015

[FLINK-2545] add bucket member verification while build bloom filter.

This closes #1067

Parent: dd9979f8
2 changed files with 71 additions and 3 deletions (+71 −3):

flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java (+12 −3)
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java (+59 −0)
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
```diff
@@ -1120,9 +1120,14 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	final protected void buildBloomFilterForBucketsInPartition(int partNum, HashPartition<BT, PT> partition) {
 		// Find all the buckets which belongs to this partition, and build bloom filter for each bucket(include its overflow buckets).
 		final int bucketsPerSegment = this.bucketsPerSegmentMask + 1;
-		for (MemorySegment segment : this.buckets) {
-			for (int i = 0; i < bucketsPerSegment; i++) {
-				final int bucketInSegmentOffset = i * HASH_BUCKET_SIZE;
+		
+		int numSegs = this.buckets.length;
+		// go over all segments that are part of the table
+		for (int i = 0, bucket = 0; i < numSegs && bucket < numBuckets; i++) {
+			final MemorySegment segment = this.buckets[i];
+			// go over all buckets in the segment
+			for (int k = 0; k < bucketsPerSegment && bucket < numBuckets; k++, bucket++) {
+				final int bucketInSegmentOffset = k * HASH_BUCKET_SIZE;
 				byte partitionNumber = segment.get(bucketInSegmentOffset + HEADER_PARTITION_OFFSET);
 				if (partitionNumber == partNum) {
 					byte status = segment.get(bucketInSegmentOffset + HEADER_STATUS_OFFSET);
```
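The core of the fix in this hunk is replacing a for-each over every bucket slot in every segment with a two-level loop that stops once `numBuckets` valid buckets have been visited, so uninitialized slots at the tail of the last segment are never touched. A minimal, self-contained sketch of that iteration pattern (the class and method names here are hypothetical, not part of the commit):

```java
// Sketch of the bounded two-level bucket iteration used in the fix.
// Each segment holds bucketsPerSegment slots, but only numBuckets slots
// overall are valid; tail slots of the last segment may hold arbitrary bytes.
public class BucketIteration {

    public static int countVisited(int numSegs, int bucketsPerSegment, int numBuckets) {
        int visited = 0;
        // go over all segments, stopping once all valid buckets have been seen
        for (int i = 0, bucket = 0; i < numSegs && bucket < numBuckets; i++) {
            // go over all bucket slots in the segment, never past numBuckets
            for (int k = 0; k < bucketsPerSegment && bucket < numBuckets; k++, bucket++) {
                visited++;
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        // 4 segments * 256 slots = 1024 slots, but only 1000 buckets are in use
        System.out.println(countVisited(4, 256, 1000)); // prints 1000, not 1024
    }
}
```

The unbounded variant would visit all 1024 slots and read whatever stale bytes sit in the unused 24, which is exactly the failure mode FLINK-2545 describes.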
```diff
@@ -1140,6 +1145,10 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	 */
 	final void buildBloomFilterForBucket(int bucketInSegmentPos, MemorySegment bucket, HashPartition<BT, PT> p) {
 		final int count = bucket.getShort(bucketInSegmentPos + HEADER_COUNT_OFFSET);
+		if (count <= 0) {
+			return;
+		}
+		
 		int[] hashCodes = new int[count];
 		// As the hashcode and bloom filter occupy same bytes, so we read all hashcode out at first and then write back to bloom filter.
 		for (int i = 0; i < count; i++) {
```
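The added guard returns early when the bucket header's count field is not positive, which protects against exactly the poisoned header the new test plants (count = -1). A standalone sketch of that defensive read, using `java.nio.ByteBuffer` in place of Flink's `MemorySegment` (the class name and the simplified header layout here are assumptions for illustration, mirroring the offsets the test writes):

```java
import java.nio.ByteBuffer;

// Sketch of the defensive count check added by the commit.
// Assumed header layout (as written by the test): byte 0 = partition,
// byte 1 = status, bytes 2-3 = record count as a short.
public class BucketGuard {

    static final int HEADER_COUNT_OFFSET = 2;

    // returns true only if the bucket at this offset holds records worth processing
    static boolean hasMembers(ByteBuffer segment, int bucketOffset) {
        short count = segment.getShort(bucketOffset + HEADER_COUNT_OFFSET);
        return count > 0; // a stale slot may carry count = -1 (or 0); skip it
    }

    public static void main(String[] args) {
        ByteBuffer segment = ByteBuffer.allocate(256);
        segment.putShort(0 + HEADER_COUNT_OFFSET, (short) 5);    // valid bucket
        segment.putShort(128 + HEADER_COUNT_OFFSET, (short) -1); // stale tail slot
        System.out.println(hasMembers(segment, 0));   // true
        System.out.println(hasMembers(segment, 128)); // false
    }
}
```

Without the guard, `new int[count]` with count = -1 would throw a `NegativeArraySizeException` while building the bloom filter.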
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
```diff
@@ -1477,6 +1477,65 @@ public class HashTableITCase {
 		this.memManager.release(join.getFreedMemory());
 	}
 	
+	@Test
+	public void testBucketsNotFulfillSegment() throws Exception {
+		final int NUM_KEYS = 10000;
+		final int BUILD_VALS_PER_KEY = 3;
+		final int PROBE_VALS_PER_KEY = 10;
+		
+		// create a build input that gives 30000 pairs with 3 values sharing the same key
+		MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+		
+		// create a probe input that gives 100000 pairs with 10 values sharing a key
+		MutableObjectIterator<IntPair> probeInput = new UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);
+		
+		// allocate the memory for the HashTable
+		List<MemorySegment> memSegments;
+		try {
+			// 33 is minimum number of pages required to perform hash join this inputs
+			memSegments = this.memManager.allocatePages(MEM_OWNER, 33);
+		}
+		catch (MemoryAllocationException maex) {
+			fail("Memory for the Join could not be provided.");
+			return;
+		}
+		
+		// For FLINK-2545, the buckets data may not fulfill it's buffer, for example, the buffer may contains 256 buckets,
+		// while hash table only assign 250 bucket on it. The unused buffer bytes may contains arbitrary data, which may
+		// influence hash table if forget to skip it. To mock this, put the invalid bucket data(partition=1, inMemory=true, count=-1) at the end of buffer.
+		for (MemorySegment segment : memSegments) {
+			int newBucketOffset = segment.size() - 128;
+			// initialize the header fields
+			segment.put(newBucketOffset + 0, (byte)0);
+			segment.put(newBucketOffset + 1, (byte)0);
+			segment.putShort(newBucketOffset + 2, (short) -1);
+			segment.putLong(newBucketOffset + 4, ~0x0L);
+		}
+		
+		// ----------------------------------------------------------------------------------------
+		
+		final MutableHashTable<IntPair, IntPair> join = new MutableHashTable<IntPair, IntPair>(
+			this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
+			this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
+			memSegments, ioManager);
+		join.open(buildInput, probeInput);
+		
+		final IntPair recordReuse = new IntPair();
+		int numRecordsInJoinResult = 0;
+		
+		while (join.nextRecord()) {
+			HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
+			while (buildSide.next(recordReuse) != null) {
+				numRecordsInJoinResult++;
+			}
+		}
+		Assert.assertEquals("Wrong number of records in join result.", NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult);
+		
+		join.close();
+		this.memManager.release(join.getFreedMemory());
+	}
+	
 	// ============================================================================================
 	
 	/**
```