Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
openanolis
dragonwell8_jdk
提交
e4ca9b16
D
dragonwell8_jdk
项目概览
openanolis
/
dragonwell8_jdk
通知
4
Star
2
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
D
dragonwell8_jdk
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e4ca9b16
编写于
1月 16, 2014
作者:
P
psandoz
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
8029452: Fork/Join task ForEachOps.ForEachOrderedTask clarifications and minor improvements
Reviewed-by: mduigou, briangoetz
上级
bad5624a
变更
1
显示空白变更内容
内联
并排
Showing
1 changed file
with
109 addition
and
25 deletion
+109
-25
src/share/classes/java/util/stream/ForEachOps.java
src/share/classes/java/util/stream/ForEachOps.java
+109
-25
未找到文件。
src/share/classes/java/util/stream/ForEachOps.java
浏览文件 @
e4ca9b16
...
...
@@ -317,12 +317,55 @@ final class ForEachOps {
*/
@SuppressWarnings
(
"serial"
)
static
final
class
ForEachOrderedTask
<
S
,
T
>
extends
CountedCompleter
<
Void
>
{
/*
* Our goal is to ensure that the elements associated with a task are
* processed according to an in-order traversal of the computation tree.
* We use completion counts for representing these dependencies, so that
* a task does not complete until all the tasks preceding it in this
* order complete. We use the "completion map" to associate the next
* task in this order for any left child. We increase the pending count
* of any node on the right side of such a mapping by one to indicate
* its dependency, and when a node on the left side of such a mapping
* completes, it decrements the pending count of its corresponding right
* side. As the computation tree is expanded by splitting, we must
* atomically update the mappings to maintain the invariant that the
* completion map maps left children to the next node in the in-order
* traversal.
*
* Take, for example, the following computation tree of tasks:
*
* a
* / \
* b c
* / \ / \
* d e f g
*
* The complete map will contain (not necessarily all at the same time)
* the following associations:
*
* d -> e
* b -> f
* f -> g
*
* Tasks e, f, g will have their pending counts increased by 1.
*
* The following relationships hold:
*
* - completion of d "happens-before" e;
* - completion of d and e "happens-before b;
* - completion of b "happens-before" f; and
* - completion of f "happens-before" g
*
* Thus overall the "happens-before" relationship holds for the
* reporting of elements, covered by tasks d, e, f and g, as specified
* by the forEachOrdered operation.
*/
private
final
PipelineHelper
<
T
>
helper
;
private
Spliterator
<
S
>
spliterator
;
private
final
long
targetSize
;
private
final
ConcurrentHashMap
<
ForEachOrderedTask
<
S
,
T
>,
ForEachOrderedTask
<
S
,
T
>>
completionMap
;
private
final
Sink
<
T
>
action
;
private
final
Object
lock
;
private
final
ForEachOrderedTask
<
S
,
T
>
leftPredecessor
;
private
Node
<
T
>
node
;
...
...
@@ -333,9 +376,9 @@ final class ForEachOps {
this
.
helper
=
helper
;
this
.
spliterator
=
spliterator
;
this
.
targetSize
=
AbstractTask
.
suggestTargetSize
(
spliterator
.
estimateSize
());
this
.
completionMap
=
new
ConcurrentHashMap
<>();
// Size map to avoid concurrent re-sizes
this
.
completionMap
=
new
ConcurrentHashMap
<>(
Math
.
max
(
16
,
AbstractTask
.
LEAF_TARGET
<<
1
));
this
.
action
=
action
;
this
.
lock
=
new
Object
();
this
.
leftPredecessor
=
null
;
}
...
...
@@ -348,7 +391,6 @@ final class ForEachOps {
this
.
targetSize
=
parent
.
targetSize
;
this
.
completionMap
=
parent
.
completionMap
;
this
.
action
=
parent
.
action
;
this
.
lock
=
parent
.
lock
;
this
.
leftPredecessor
=
leftPredecessor
;
}
...
...
@@ -367,16 +409,42 @@ final class ForEachOps {
new
ForEachOrderedTask
<>(
task
,
leftSplit
,
task
.
leftPredecessor
);
ForEachOrderedTask
<
S
,
T
>
rightChild
=
new
ForEachOrderedTask
<>(
task
,
rightSplit
,
leftChild
);
// Fork the parent task
// Completion of the left and right children "happens-before"
// completion of the parent
task
.
addToPendingCount
(
1
);
// Completion of the left child "happens-before" completion of
// the right child
rightChild
.
addToPendingCount
(
1
);
task
.
completionMap
.
put
(
leftChild
,
rightChild
);
task
.
addToPendingCount
(
1
);
// forking
rightChild
.
addToPendingCount
(
1
);
// right pending on left child
// If task is not on the left spine
if
(
task
.
leftPredecessor
!=
null
)
{
leftChild
.
addToPendingCount
(
1
);
// left pending on previous subtree, except left spine
if
(
task
.
completionMap
.
replace
(
task
.
leftPredecessor
,
task
,
leftChild
))
task
.
addToPendingCount
(-
1
);
// transfer my "right child" count to my left child
else
leftChild
.
addToPendingCount
(-
1
);
// left child is ready to go when ready
/*
* Completion of left-predecessor, or left subtree,
* "happens-before" completion of left-most leaf node of
* right subtree.
* The left child's pending count needs to be updated before
* it is associated in the completion map, otherwise the
* left child can complete prematurely and violate the
* "happens-before" constraint.
*/
leftChild
.
addToPendingCount
(
1
);
// Update association of left-predecessor to left-most
// leaf node of right subtree
if
(
task
.
completionMap
.
replace
(
task
.
leftPredecessor
,
task
,
leftChild
))
{
// If replaced, adjust the pending count of the parent
// to complete when its children complete
task
.
addToPendingCount
(-
1
);
}
else
{
// Left-predecessor has already completed, parent's
// pending count is adjusted by left-predecessor;
// left child is ready to complete
leftChild
.
addToPendingCount
(-
1
);
}
}
ForEachOrderedTask
<
S
,
T
>
taskToFork
;
if
(
forkRight
)
{
forkRight
=
false
;
...
...
@@ -391,31 +459,47 @@ final class ForEachOps {
}
taskToFork
.
fork
();
}
if
(
task
.
getPendingCount
()
==
0
)
{
task
.
helper
.
wrapAndCopyInto
(
task
.
action
,
rightSplit
);
}
else
{
/*
* Task's pending count is either 0 or 1. If 1 then the completion
* map will contain a value that is task, and two calls to
* tryComplete are required for completion, one below and one
* triggered by the completion of task's left-predecessor in
* onCompletion. Therefore there is no data race within the if
* block.
*/
if
(
task
.
getPendingCount
()
>
0
)
{
// Cannot complete just yet so buffer elements into a Node
// for use when completion occurs
Node
.
Builder
<
T
>
nb
=
task
.
helper
.
makeNodeBuilder
(
task
.
helper
.
exactOutputSizeIfKnown
(
rightSplit
),
size
->
(
T
[])
new
Object
[
size
]);
task
.
node
=
task
.
helper
.
wrapAndCopyInto
(
nb
,
rightSplit
).
build
();
task
.
spliterator
=
null
;
}
task
.
tryComplete
();
}
@Override
public
void
onCompletion
(
CountedCompleter
<?>
caller
)
{
spliterator
=
null
;
if
(
node
!=
null
)
{
// Dump any data from this leaf into the sink
synchronized
(
lock
)
{
// Dump buffered elements from this leaf into the sink
node
.
forEach
(
action
);
}
node
=
null
;
}
ForEachOrderedTask
<
S
,
T
>
victim
=
completionMap
.
remove
(
this
);
if
(
victim
!=
null
)
victim
.
tryComplete
();
else
if
(
spliterator
!=
null
)
{
// Dump elements output from this leaf's pipeline into the sink
helper
.
wrapAndCopyInto
(
action
,
spliterator
);
spliterator
=
null
;
}
// The completion of this task *and* the dumping of elements
// "happens-before" completion of the associated left-most leaf task
// of right subtree (if any, which can be this task's right sibling)
//
ForEachOrderedTask
<
S
,
T
>
leftDescendant
=
completionMap
.
remove
(
this
);
if
(
leftDescendant
!=
null
)
leftDescendant
.
tryComplete
();
}
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录