Commit 941ac6df
Authored Aug 03, 2015 by r-pogalz
Committed Aug 04, 2015 by Fabian Hueske
[FLINK-2105] Implement Sort-Merge Outer Join algorithm
This closes #907
Parent: df9f4819
Showing 11 changed files with 974 additions and 36 deletions (+974 -36)
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java (+29 -29)
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java (+189 -0)
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java (+60 -0)
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java (+63 -0)
flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java (+462 -0)
flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java (+2 -2)
flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java (+82 -0)
flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java (+2 -2)
flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java (+82 -0)
flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java (+1 -1)
flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingJoiner.java (+2 -2)
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java
@@ -115,20 +115,20 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
     }

     /**
-     * Calls the <code>JoinFunction#match()</code> method for all two key-value pairs that share the same key and come
-     * from different inputs. The output of the <code>match()</code> method is forwarded.
+     * Calls the <code>JoinFunction#join()</code> method for all two key-value pairs that share the same key and come
+     * from different inputs. The output of the <code>join()</code> method is forwarded.
      * <p>
      * This method first zig-zags between the two sorted inputs in order to find a common
-     * key, and then calls the match stub with the cross product of the values.
+     * key, and then calls the join stub with the cross product of the values.
      *
      * @throws Exception Forwards all exceptions from the user code and the I/O system.
      * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector)
      */
     @Override
-    public abstract boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
+    public abstract boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> joinFunction, final Collector<O> collector)
             throws Exception;

-    protected void crossMatchingGroup(Iterator<T1> values1, Iterator<T2> values2, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector) throws Exception {
+    protected void crossMatchingGroup(Iterator<T1> values1, Iterator<T2> values2, FlatJoinFunction<T1, T2, O> joinFunction, Collector<O> collector) throws Exception {
         final T1 firstV1 = values1.next();
         final T2 firstV2 = values2.next();
@@ -143,23 +143,23 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
             if (v2HasNext) {
                 // both sides contain more than one value
                 // TODO: Decide which side to spill and which to block!
-                crossMwithNValues(firstV1, values1, firstV2, values2, matchFunction, collector);
+                crossMwithNValues(firstV1, values1, firstV2, values2, joinFunction, collector);
             } else {
-                crossSecond1withNValues(firstV2, firstV1, values1, matchFunction, collector);
+                crossSecond1withNValues(firstV2, firstV1, values1, joinFunction, collector);
             }
         } else {
             if (v2HasNext) {
-                crossFirst1withNValues(firstV1, firstV2, values2, matchFunction, collector);
+                crossFirst1withNValues(firstV1, firstV2, values2, joinFunction, collector);
             } else {
                 // both sides contain only one value
-                matchFunction.join(firstV1, firstV2, collector);
+                joinFunction.join(firstV1, firstV2, collector);
             }
         }
     }

     /**
      * Crosses a single value from the first input with N values, all sharing a common key.
-     * Effectively realizes a <i>1:N</i> match (join).
+     * Effectively realizes a <i>1:N</i> join.
      *
      * @param val1 The value form the <i>1</i> side.
      * @param firstValN The first of the values from the <i>N</i> side.
@@ -167,21 +167,21 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
      * @throws Exception Forwards all exceptions thrown by the stub.
      */
     private void crossFirst1withNValues(final T1 val1, final T2 firstValN,
-            final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
+            final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> joinFunction, final Collector<O> collector)
             throws Exception {
         T1 copy1 = createCopy(serializer1, val1, this.copy1);
-        matchFunction.join(copy1, firstValN, collector);
+        joinFunction.join(copy1, firstValN, collector);

-        // set copy and match first element
+        // set copy and join first element
         boolean more = true;
         do {
             final T2 nRec = valsN.next();

             if (valsN.hasNext()) {
                 copy1 = createCopy(serializer1, val1, this.copy1);
-                matchFunction.join(copy1, nRec, collector);
+                joinFunction.join(copy1, nRec, collector);
             } else {
-                matchFunction.join(val1, nRec, collector);
+                joinFunction.join(val1, nRec, collector);
                 more = false;
             }
         }
@@ -190,7 +190,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
     /**
      * Crosses a single value from the second side with N values, all sharing a common key.
-     * Effectively realizes a <i>N:1</i> match (join).
+     * Effectively realizes a <i>N:1</i> join.
      *
      * @param val1 The value form the <i>1</i> side.
      * @param firstValN The first of the values from the <i>N</i> side.
@@ -198,20 +198,20 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
      * @throws Exception Forwards all exceptions thrown by the stub.
      */
     private void crossSecond1withNValues(T2 val1, T1 firstValN,
-            Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector) throws Exception {
+            Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> joinFunction, Collector<O> collector) throws Exception {
         T2 copy2 = createCopy(serializer2, val1, this.copy2);
-        matchFunction.join(firstValN, copy2, collector);
+        joinFunction.join(firstValN, copy2, collector);

-        // set copy and match first element
+        // set copy and join first element
         boolean more = true;
         do {
             final T1 nRec = valsN.next();

             if (valsN.hasNext()) {
                 copy2 = createCopy(serializer2, val1, this.copy2);
-                matchFunction.join(nRec, copy2, collector);
+                joinFunction.join(nRec, copy2, collector);
             } else {
-                matchFunction.join(nRec, val1, collector);
+                joinFunction.join(nRec, val1, collector);
                 more = false;
             }
         }
@@ -220,7 +220,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
     private void crossMwithNValues(final T1 firstV1, Iterator<T1> spillVals,
             final T2 firstV2, final Iterator<T2> blockVals,
-            final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) throws Exception {
+            final FlatJoinFunction<T1, T2, O> joinFunction, final Collector<O> collector) throws Exception {
         // ==================================================
         // We have one first (head) element from both inputs (firstV1 and firstV2)
         // We have an iterator for both inputs.
@@ -237,13 +237,13 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
         // 5) cross the head of the spilling side with the next block
         // 6) cross the spilling iterator with the next block.

-        // match the first values first
+        // join the first values first
         T1 copy1 = this.createCopy(serializer1, firstV1, this.copy1);
         T2 blockHeadCopy = this.createCopy(serializer2, firstV2, this.blockHeadCopy);
         T1 spillHeadCopy = null;

         // --------------- 1) Cross the heads -------------------
-        matchFunction.join(copy1, firstV2, collector);
+        joinFunction.join(copy1, firstV2, collector);

         // for the remaining values, we do a block-nested-loops join
         SpillingResettableIterator<T1> spillIt = null;
@@ -256,7 +256,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
             while (this.blockIt.hasNext()) {
                 final T2 nextBlockRec = this.blockIt.next();
                 copy1 = this.createCopy(serializer1, firstV1, this.copy1);
-                matchFunction.join(copy1, nextBlockRec, collector);
+                joinFunction.join(copy1, nextBlockRec, collector);
             }
             this.blockIt.reset();
@@ -286,7 +286,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
                 // -------- 3) cross the iterator of the spilling side with the head of the block side --------
                 T2 copy2 = this.createCopy(serializer2, blockHeadCopy, this.copy2);
-                matchFunction.join(copy1, copy2, collector);
+                joinFunction.join(copy1, copy2, collector);

                 // -------- 4) cross the iterator of the spilling side with the first block --------
                 while (this.blockIt.hasNext()) {
@@ -294,7 +294,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
                     // get instances of key and block value
                     copy1 = this.createCopy(serializer1, nextSpillVal, this.copy1);
-                    matchFunction.join(copy1, nextBlockRec, collector);
+                    joinFunction.join(copy1, nextBlockRec, collector);
                 }
                 // reset block iterator
                 this.blockIt.reset();
@@ -316,7 +316,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
                 while (this.blockIt.hasNext()) {
                     copy1 = this.createCopy(serializer1, spillHeadCopy, this.copy1);
                     final T2 nextBlockVal = blockIt.next();
-                    matchFunction.join(copy1, nextBlockVal, collector);
+                    joinFunction.join(copy1, nextBlockVal, collector);
                 }
                 this.blockIt.reset();
@@ -329,7 +329,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
                         // get instances of key and block value
                         final T2 nextBlockVal = this.blockIt.next();
                         copy1 = this.createCopy(serializer1, nextSpillVal, this.copy1);
-                        matchFunction.join(copy1, nextBlockVal, collector);
+                        joinFunction.join(copy1, nextBlockVal, collector);
                     }
                     // reset block iterator
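The hunks above touch the code that crosses two value groups sharing a key, dispatching on whether each side holds one or several values (1:1, 1:N, N:1, M:N). The following is a minimal sketch of that dispatch, not the Flink implementation: the class name `CrossGroupSketch`, the string values, and the in-memory list used for the M:N case are assumptions made for illustration. Flink instead spills one side and blocks the other, and its emission order may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: crosses two non-empty groups of values that share a key.
class CrossGroupSketch {

    static List<String> cross(Iterator<String> values1, Iterator<String> values2) {
        List<String> out = new ArrayList<>();
        String first1 = values1.next();
        String first2 = values2.next();
        boolean v1HasNext = values1.hasNext();
        boolean v2HasNext = values2.hasNext();

        if (!v1HasNext && !v2HasNext) {
            // 1:1, both sides contain only one value
            out.add(first1 + "-" + first2);
        } else if (!v1HasNext) {
            // 1:N, stream the second side past the single first value
            out.add(first1 + "-" + first2);
            while (values2.hasNext()) {
                out.add(first1 + "-" + values2.next());
            }
        } else if (!v2HasNext) {
            // N:1, stream the first side past the single second value
            out.add(first1 + "-" + first2);
            while (values1.hasNext()) {
                out.add(values1.next() + "-" + first2);
            }
        } else {
            // M:N, buffer one side (assumption: it fits in memory) and loop over it
            List<String> block = new ArrayList<>();
            block.add(first2);
            while (values2.hasNext()) {
                block.add(values2.next());
            }
            String v1 = first1;
            while (true) {
                for (String v2 : block) {
                    out.add(v1 + "-" + v2);
                }
                if (!values1.hasNext()) {
                    break;
                }
                v1 = values1.next();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(cross(Arrays.asList("a", "b").iterator(), Arrays.asList("x", "y").iterator()));
    }
}
```

Consuming the first element of each iterator before checking `hasNext()` is what lets the cheaper 1:N and N:1 paths avoid buffering entirely.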
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
new file mode 100644
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.operators.sort;

import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;

import java.util.Iterator;

/**
 * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
 * outer join through a sort-merge join strategy.
 */
public abstract class AbstractMergeOuterJoinIterator<T1, T2, O> extends AbstractMergeIterator<T1, T2, O> {

    public static enum OuterJoinType {LEFT, RIGHT, FULL}

    private final OuterJoinType outerJoinType;

    private boolean initialized = false;
    private boolean it1Empty = false;
    private boolean it2Empty = false;

    public AbstractMergeOuterJoinIterator(
            OuterJoinType outerJoinType,
            MutableObjectIterator<T1> input1,
            MutableObjectIterator<T2> input2,
            TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
            TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
            TypePairComparator<T1, T2> pairComparator,
            MemoryManager memoryManager,
            IOManager ioManager,
            int numMemoryPages,
            AbstractInvokable parentTask)
            throws MemoryAllocationException {
        super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask);

        this.outerJoinType = outerJoinType;
    }

    /**
     * Calls the <code>JoinFunction#join()</code> method for all two key-value pairs that share the same key and come
     * from different inputs. Furthermore, depending on the outer join type (LEFT, RIGHT, FULL), all key-value pairs where no
     * matching partner from the other input exists are joined with null.
     * The output of the <code>join()</code> method is forwarded.
     *
     * @throws Exception Forwards all exceptions from the user code and the I/O system.
     * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector)
     */
    @Override
    public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> joinFunction, final Collector<O> collector) throws Exception {
        if (!initialized) {
            //first run, set iterators to first elements
            it1Empty = !this.iterator1.nextKey();
            it2Empty = !this.iterator2.nextKey();
            initialized = true;
        }

        if (it1Empty && it2Empty) {
            return false;
        } else if (it2Empty) {
            if (outerJoinType == OuterJoinType.LEFT || outerJoinType == OuterJoinType.FULL) {
                joinLeftKeyValuesWithNull(iterator1.getValues(), joinFunction, collector);
                it1Empty = !iterator1.nextKey();
                return true;
            } else {
                //consume rest of left side
                while (iterator1.nextKey()) ;
                it1Empty = true;
                return false;
            }
        } else if (it1Empty) {
            if (outerJoinType == OuterJoinType.RIGHT || outerJoinType == OuterJoinType.FULL) {
                joinRightKeyValuesWithNull(iterator2.getValues(), joinFunction, collector);
                it2Empty = !iterator2.nextKey();
                return true;
            } else {
                //consume rest of right side
                while (iterator2.nextKey()) ;
                it2Empty = true;
                return false;
            }
        } else {
            final TypePairComparator<T1, T2> comparator = super.pairComparator;
            comparator.setReference(this.iterator1.getCurrent());
            T2 current2 = this.iterator2.getCurrent();

            // zig zag
            while (true) {
                // determine the relation between the (possibly composite) keys
                final int comp = comparator.compareToReference(current2);

                if (comp == 0) {
                    break;
                }

                if (comp < 0) {
                    //right key < left key
                    if (outerJoinType == OuterJoinType.RIGHT || outerJoinType == OuterJoinType.FULL) {
                        //join right key values with null in case of right or full outer join
                        joinRightKeyValuesWithNull(iterator2.getValues(), joinFunction, collector);
                        it2Empty = !iterator2.nextKey();
                        return true;
                    } else {
                        //skip this right key if it is a left outer join
                        if (!this.iterator2.nextKey()) {
                            //if right side is empty, join current left key values with null
                            joinLeftKeyValuesWithNull(iterator1.getValues(), joinFunction, collector);
                            it1Empty = !iterator1.nextKey();
                            it2Empty = true;
                            return true;
                        }
                        current2 = this.iterator2.getCurrent();
                    }
                } else {
                    //right key > left key
                    if (outerJoinType == OuterJoinType.LEFT || outerJoinType == OuterJoinType.FULL) {
                        //join left key values with null in case of left or full outer join
                        joinLeftKeyValuesWithNull(iterator1.getValues(), joinFunction, collector);
                        it1Empty = !iterator1.nextKey();
                        return true;
                    } else {
                        //skip this left key if it is a right outer join
                        if (!this.iterator1.nextKey()) {
                            //if right side is empty, join current right key values with null
                            joinRightKeyValuesWithNull(iterator2.getValues(), joinFunction, collector);
                            it1Empty = true;
                            it2Empty = !iterator2.nextKey();
                            return true;
                        }
                        comparator.setReference(this.iterator1.getCurrent());
                    }
                }
            }

            // here, we have a common key! call the join function with the cross product of the
            // values
            final Iterator<T1> values1 = this.iterator1.getValues();
            final Iterator<T2> values2 = this.iterator2.getValues();

            crossMatchingGroup(values1, values2, joinFunction, collector);
            it1Empty = !iterator1.nextKey();
            it2Empty = !iterator2.nextKey();
            return true;
        }
    }

    private void joinLeftKeyValuesWithNull(Iterator<T1> values, FlatJoinFunction<T1, T2, O> joinFunction, Collector<O> collector) throws Exception {
        while (values.hasNext()) {
            T1 next = values.next();
            this.copy1 = createCopy(serializer1, next, copy1);
            joinFunction.join(copy1, null, collector);
        }
    }

    private void joinRightKeyValuesWithNull(Iterator<T2> values, FlatJoinFunction<T1, T2, O> joinFunction, Collector<O> collector) throws Exception {
        while (values.hasNext()) {
            T2 next = values.next();
            this.copy2 = createCopy(serializer2, next, copy2);
            joinFunction.join(null, copy2, collector);
        }
    }
}
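The control flow of `callWithNextKey` can be easier to follow on plain data. Below is a minimal sketch of a sort-merge outer join over two key-sorted `int` arrays. It is an illustration under stated assumptions, not the Flink iterator: the class `SortMergeOuterJoinSketch`, the string output format, and the single-pass loop (instead of one key per call) are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: zig-zag over two sorted inputs, padding unmatched keys with "null".
class SortMergeOuterJoinSketch {

    enum OuterJoinType { LEFT, RIGHT, FULL }

    static List<String> join(int[] left, int[] right, OuterJoinType type) {
        List<String> out = new ArrayList<>();
        boolean keepLeft = type != OuterJoinType.RIGHT;  // LEFT or FULL
        boolean keepRight = type != OuterJoinType.LEFT;  // RIGHT or FULL
        int i = 0;
        int j = 0;

        while (i < left.length && j < right.length) {
            if (left[i] < right[j]) {
                // left key has no partner: emit with null or skip it
                if (keepLeft) {
                    out.add(left[i] + ",null");
                }
                i++;
            } else if (left[i] > right[j]) {
                // right key has no partner: emit with null or skip it
                if (keepRight) {
                    out.add("null," + right[j]);
                }
                j++;
            } else {
                // common key: emit the cross product of both key groups
                int key = left[i];
                int iEnd = i;
                int jEnd = j;
                while (iEnd < left.length && left[iEnd] == key) {
                    iEnd++;
                }
                while (jEnd < right.length && right[jEnd] == key) {
                    jEnd++;
                }
                for (int a = i; a < iEnd; a++) {
                    for (int b = j; b < jEnd; b++) {
                        out.add(left[a] + "," + right[b]);
                    }
                }
                i = iEnd;
                j = jEnd;
            }
        }

        // one side is exhausted: the rest of the other side has no partner
        for (; i < left.length; i++) {
            if (keepLeft) {
                out.add(left[i] + ",null");
            }
        }
        for (; j < right.length; j++) {
            if (keepRight) {
                out.add("null," + right[j]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(join(new int[]{1, 2, 2, 4}, new int[]{2, 3, 4}, OuterJoinType.FULL));
    }
}
```

With the inputs in `main`, a FULL join yields `1,null`, two `2,2` pairs, `null,3`, and `4,4`. A LEFT join drops `null,3` and a RIGHT join drops `1,null`.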
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java
new file mode 100644
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.operators.sort;

import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.util.KeyGroupedIterator;
import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
import org.apache.flink.util.MutableObjectIterator;

public class NonReusingMergeOuterJoinIterator<T1, T2, O> extends AbstractMergeOuterJoinIterator<T1, T2, O> {

    public NonReusingMergeOuterJoinIterator(
            OuterJoinType outerJoinType,
            MutableObjectIterator<T1> input1,
            MutableObjectIterator<T2> input2,
            TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
            TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
            TypePairComparator<T1, T2> pairComparator,
            MemoryManager memoryManager,
            IOManager ioManager,
            int numMemoryPages,
            AbstractInvokable parentTask)
            throws MemoryAllocationException {
        super(outerJoinType, input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask);
    }

    @Override
    protected <T> KeyGroupedIterator<T> createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> serializer, TypeComparator<T> comparator) {
        return new NonReusingKeyGroupedIterator<T>(input, comparator);
    }

    @Override
    protected <T> T createCopy(TypeSerializer<T> serializer, T value, T reuse) {
        return serializer.copy(value);
    }
}
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java
new file mode 100644
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.operators.sort;

import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.util.KeyGroupedIterator;
import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
import org.apache.flink.util.MutableObjectIterator;

public class ReusingMergeOuterJoinIterator<T1, T2, O> extends AbstractMergeOuterJoinIterator<T1, T2, O> {

    public ReusingMergeOuterJoinIterator(
            OuterJoinType outerJoinType,
            MutableObjectIterator<T1> input1,
            MutableObjectIterator<T2> input2,
            TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
            TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
            TypePairComparator<T1, T2> pairComparator,
            MemoryManager memoryManager,
            IOManager ioManager,
            int numMemoryPages,
            AbstractInvokable parentTask)
            throws MemoryAllocationException {
        super(outerJoinType, input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask);

        this.copy1 = serializer1.createInstance();
        this.spillHeadCopy = serializer1.createInstance();
        this.copy2 = serializer2.createInstance();
        this.blockHeadCopy = serializer2.createInstance();
    }

    @Override
    protected <T> KeyGroupedIterator<T> createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> serializer, TypeComparator<T> comparator) {
        return new ReusingKeyGroupedIterator<T>(input, serializer, comparator);
    }

    @Override
    protected <T> T createCopy(TypeSerializer<T> serializer, T value, T reuse) {
        return serializer.copy(value, reuse);
    }
}
\ No newline at end of file
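The two concrete iterators differ only in how `createCopy` behaves: the non-reusing variant calls `serializer.copy(value)`, the reusing variant calls `serializer.copy(value, reuse)` on instances pre-allocated in the constructor. A minimal sketch of that difference follows, assuming a hypothetical mutable record type `Rec` and hand-written copy helpers in place of a real `TypeSerializer`.

```java
// Hypothetical sketch of the two copy strategies; Rec and both copy helpers are
// illustrative stand-ins, not Flink classes.
class CopyStrategySketch {

    static final class Rec {
        int key;
        String val;

        Rec(int key, String val) {
            this.key = key;
            this.val = val;
        }
    }

    // Non-reusing: every call allocates a fresh object, so earlier results stay valid.
    static Rec copy(Rec from) {
        return new Rec(from.key, from.val);
    }

    // Reusing: overwrites and returns the supplied instance, avoiding an allocation.
    static Rec copy(Rec from, Rec reuse) {
        reuse.key = from.key;
        reuse.val = from.val;
        return reuse;
    }

    public static void main(String[] args) {
        Rec reuse = new Rec(0, "");
        Rec held = copy(new Rec(1, "a"), reuse);
        copy(new Rec(2, "b"), reuse);
        // held aliases reuse, so it now shows the second record
        System.out.println(held.key + " " + held.val);
    }
}
```

The reusing strategy saves allocations, but a reference kept from an earlier call is overwritten by the next one, so user code that holds on to records across calls would need the non-reusing variant.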
flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
new file mode 100644
This diff is collapsed.
flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
@@ -135,7 +135,7 @@ public class NonReusingSortMergeInnerJoinIteratorITCase {
                 collectData(input2));

             final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction =
-                    new MatchRemovingMatcher(expectedMatchesMap);
+                    new MatchRemovingJoiner(expectedMatchesMap);

             final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
@@ -226,7 +226,7 @@ public class NonReusingSortMergeInnerJoinIteratorITCase {
             input1 = new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate());
             input2 = new MergeIterator<Tuple2<Integer, String>>(inList2, comparator2.duplicate());

-            final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction = new MatchRemovingMatcher(expectedMatchesMap);
+            final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction = new MatchRemovingJoiner(expectedMatchesMap);

             final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
new file mode 100644
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.operators.sort;

import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.Test;

public class NonReusingSortMergeOuterJoinIteratorITCase extends AbstractSortMergeOuterJoinIteratorITCase {

    @Override
    protected <T1, T2> AbstractMergeOuterJoinIterator createOuterJoinIterator(OuterJoinType outerJoinType, MutableObjectIterator<T1> input1,
            MutableObjectIterator<T2> input2, TypeSerializer<T1> serializer1,
            TypeComparator<T1> comparator1, TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
            TypePairComparator<T1, T2> pairComparator, MemoryManager memoryManager, IOManager ioManager,
            int numMemoryPages, AbstractInvokable parentTask) throws Exception {
        return new NonReusingMergeOuterJoinIterator(outerJoinType, input1, input2, serializer1, comparator1,
                serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask);
    }

    @Test
    public void testFullOuterWithSample() throws Exception {
        super.testFullOuterWithSample();
    }

    @Test
    public void testLeftOuterWithSample() throws Exception {
        super.testLeftOuterWithSample();
    }

    @Test
    public void testRightOuterWithSample() throws Exception {
        super.testRightOuterWithSample();
    }

    @Test
    public void testRightSideEmpty() throws Exception {
        super.testRightSideEmpty();
    }

    @Test
    public void testLeftSideEmpty() throws Exception {
        super.testLeftSideEmpty();
    }

    @Test
    public void testFullOuterJoinWithHighNumberOfCommonKeys() {
        testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType.FULL, 200, 500, 2048, 0.02f, 200, 500, 2048, 0.02f);
    }

    @Test
    public void testLeftOuterJoinWithHighNumberOfCommonKeys() {
        testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType.LEFT, 200, 10, 4096, 0.02f, 100, 4000, 2048, 0.02f);
    }

    @Test
    public void testRightOuterJoinWithHighNumberOfCommonKeys() {
        testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType.RIGHT, 100, 10, 2048, 0.02f, 200, 4000, 4096, 0.02f);
    }
}
flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
@@ -135,7 +135,7 @@ public class ReusingSortMergeInnerJoinIteratorITCase {
                 collectData(input2));

             final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction =
-                    new MatchRemovingMatcher(expectedMatchesMap);
+                    new MatchRemovingJoiner(expectedMatchesMap);

             final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
@@ -226,7 +226,7 @@ public class ReusingSortMergeInnerJoinIteratorITCase {
             input1 = new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate());
             input2 = new MergeIterator<Tuple2<Integer, String>>(inList2, comparator2.duplicate());

-            final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new MatchRemovingMatcher(expectedMatchesMap);
+            final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new MatchRemovingJoiner(expectedMatchesMap);

             final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
new file mode 100644
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.operators.sort;

import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.Test;

public class ReusingSortMergeOuterJoinIteratorITCase extends AbstractSortMergeOuterJoinIteratorITCase {

    @Override
    protected <T1, T2> AbstractMergeOuterJoinIterator createOuterJoinIterator(OuterJoinType outerJoinType,
            MutableObjectIterator<T1> input1, MutableObjectIterator<T2> input2,
            TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
            TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
            TypePairComparator<T1, T2> pairComparator, MemoryManager memoryManager, IOManager ioManager,
            int numMemoryPages, AbstractInvokable parentTask) throws Exception {
        return new ReusingMergeOuterJoinIterator(outerJoinType, input1, input2, serializer1, comparator1,
                serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask);
    }

    @Test
    public void testFullOuterWithSample() throws Exception {
        super.testFullOuterWithSample();
    }

    @Test
    public void testLeftOuterWithSample() throws Exception {
        super.testLeftOuterWithSample();
    }

    @Test
    public void testRightOuterWithSample() throws Exception {
        super.testRightOuterWithSample();
    }

    @Test
    public void testRightSideEmpty() throws Exception {
        super.testRightSideEmpty();
    }

    @Test
    public void testLeftSideEmpty() throws Exception {
        super.testLeftSideEmpty();
    }

    @Test
    public void testFullOuterJoinWithHighNumberOfCommonKeys() {
        testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType.FULL, 200, 500, 2048, 0.02f, 200, 500, 2048, 0.02f);
    }

    @Test
    public void testLeftOuterJoinWithHighNumberOfCommonKeys() {
        testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType.LEFT, 200, 10, 4096, 0.02f, 100, 4000, 2048, 0.02f);
    }

    @Test
    public void testRightOuterJoinWithHighNumberOfCommonKeys() {
        testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType.RIGHT, 100, 10, 2048, 0.02f, 200, 4000, 4096, 0.02f);
    }
}
flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java
...
...
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators.testutils;
 /**
  * Utility class for keeping track of matches in join operator tests.
  *
- * @see org.apache.flink.runtime.operators.testutils.MatchRemovingMatcher
+ * @see MatchRemovingJoiner
  */
 public class Match {
     private final String left;
...
...
flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingMatcher.java → flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingJoiner.java
...
...
@@ -27,12 +27,12 @@ import java.util.Collection;
 import java.util.Map;
-public final class MatchRemovingMatcher implements FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> {
+public final class MatchRemovingJoiner implements FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> {
     private static final long serialVersionUID = 1L;
     private final Map<Integer, Collection<Match>> toRemoveFrom;
-    public MatchRemovingMatcher(Map<Integer, Collection<Match>> map) {
+    public MatchRemovingJoiner(Map<Integer, Collection<Match>> map) {
         this.toRemoveFrom = map;
     }
...
...