Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
openanolis
dragonwell8_jdk
提交
4efead92
D
dragonwell8_jdk
项目概览
openanolis
/
dragonwell8_jdk
通知
4
Star
2
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
D
dragonwell8_jdk
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4efead92
编写于
8月 02, 2013
作者:
M
mullan
浏览文件
操作
浏览文件
下载
差异文件
Merge
上级
4eb69437
8c2a5a5d
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
1799 addition
and
1338 deletion
+1799
-1338
src/share/classes/java/util/concurrent/CompletableFuture.java
...share/classes/java/util/concurrent/CompletableFuture.java
+1037
-1332
src/share/classes/java/util/concurrent/CompletionStage.java
src/share/classes/java/util/concurrent/CompletionStage.java
+760
-0
test/ProblemList.txt
test/ProblemList.txt
+0
-6
test/java/util/concurrent/CompletableFuture/Basic.java
test/java/util/concurrent/CompletableFuture/Basic.java
+2
-0
未找到文件。
src/share/classes/java/util/concurrent/CompletableFuture.java
浏览文件 @
4efead92
...
...
@@ -48,13 +48,16 @@ import java.util.concurrent.ThreadLocalRandom;
import
java.util.concurrent.ExecutionException
;
import
java.util.concurrent.TimeoutException
;
import
java.util.concurrent.CancellationException
;
import
java.util.concurrent.CompletionException
;
import
java.util.concurrent.CompletionStage
;
import
java.util.concurrent.atomic.AtomicInteger
;
import
java.util.concurrent.locks.LockSupport
;
/**
* A {@link Future} that may be explicitly completed (setting its
* value and status), and may include dependent functions and actions
* that trigger upon its completion.
* value and status), and may be used as a {@link CompletionStage},
* supporting dependent functions and actions that trigger upon its
* completion.
*
* <p>When two or more threads attempt to
* {@link #complete complete},
...
...
@@ -62,64 +65,50 @@ import java.util.concurrent.locks.LockSupport;
* {@link #cancel cancel}
* a CompletableFuture, only one of them succeeds.
*
* <p>Methods are available for adding dependents based on
* user-provided Functions, Consumers, or Runnables. The appropriate
* form to use depends on whether actions require arguments and/or
* produce results. Completion of a dependent action will trigger the
* completion of another CompletableFuture. Actions may also be
* triggered after either or both the current and another
* CompletableFuture complete. Multiple CompletableFutures may also
* be grouped as one using {@link #anyOf(CompletableFuture...)} and
* {@link #allOf(CompletableFuture...)}.
* <p>In addition to these and related methods for directly
* manipulating status and results, CompletableFuture implements
* interface {@link CompletionStage} with the following policies: <ul>
*
* <p>CompletableFutures themselves do not execute asynchronously.
* However, actions supplied for dependent completions of another
* CompletableFuture may do so, depending on whether they are provided
* via one of the <em>async</em> methods (that is, methods with names
* of the form <tt><var>xxx</var>Async</tt>). The <em>async</em>
* methods provide a way to commence asynchronous processing of an
* action using either a given {@link Executor} or by default the
* {@link ForkJoinPool#commonPool()}. To simplify monitoring,
* <li>Actions supplied for dependent completions of
* <em>non-async</em> methods may be performed by the thread that
* completes the current CompletableFuture, or by any other caller of
* a completion method.</li>
*
* <li>All <em>async</em> methods without an explicit Executor
* argument are performed using the {@link ForkJoinPool#commonPool()}
* (unless it does not support a parallelism level of at least two, in
* which case, a new Thread is used). To simplify monitoring,
* debugging, and tracking, all generated asynchronous tasks are
* instances of the marker interface {@link AsynchronousCompletionTask}.
* instances of the marker interface {@link
* AsynchronousCompletionTask}. </li>
*
* <p>Actions supplied for dependent completions of <em>non-async</em>
* methods may be performed by the thread that completes the current
* CompletableFuture, or by any other caller of these methods. There
* are no guarantees about the order of processing completions unless
* constrained by these methods.
* <li>All CompletionStage methods are implemented independently of
* other public methods, so the behavior of one method is not impacted
* by overrides of others in subclasses. </li> </ul>
*
* <p>Since (unlike {@link FutureTask}) this class has no direct
* control over the computation that causes it to be completed,
* cancellation is treated as just another form of exceptional completion.
* Method {@link #cancel cancel} has the same effect as
* {@code completeExceptionally(new CancellationException())}.
* <p>CompletableFuture also implements {@link Future} with the following
* policies: <ul>
*
* <p>Upon exceptional completion (including cancellation), or when a
* completion entails an additional computation which terminates
* abruptly with an (unchecked) exception or error, then all of their
* dependent completions (and their dependents in turn) generally act
* as {@code completeExceptionally} with a {@link CompletionException}
* holding that exception as its cause. However, the {@link
* #exceptionally exceptionally} and {@link #handle handle}
* completions <em>are</em> able to handle exceptional completions of
* the CompletableFutures they depend on.
* <li>Since (unlike {@link FutureTask}) this class has no direct
* control over the computation that causes it to be completed,
* cancellation is treated as just another form of exceptional
* completion. Method {@link #cancel cancel} has the same effect as
* {@code completeExceptionally(new CancellationException())}. Method
* {@link #isCompletedExceptionally} can be used to determine if a
* CompletableFuture completed in any exceptional fashion.</li>
*
* <
p
>In case of exceptional completion with a CompletionException,
* <
li
>In case of exceptional completion with a CompletionException,
* methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
* {@link ExecutionException} with the same cause as held in the
* corresponding CompletionException. However, in these cases,
* methods {@link #join()} and {@link #getNow} throw the
* CompletionException, which simplifies usage.
*
* <p>Arguments used to pass a completion result (that is, for parameters
* of type {@code T}) may be null, but passing a null value for any other
* parameter will result in a {@link NullPointerException} being thrown.
* corresponding CompletionException. To simplify usage in most
* contexts, this class also defines methods {@link #join()} and
* {@link #getNow} that instead throw the CompletionException directly
* in these cases.</li> </ul>
*
* @author Doug Lea
* @since 1.8
*/
public
class
CompletableFuture
<
T
>
implements
Future
<
T
>
{
public
class
CompletableFuture
<
T
>
implements
Future
<
T
>
,
CompletionStage
<
T
>
{
/*
* Overview:
...
...
@@ -438,6 +427,19 @@ public class CompletableFuture<T> implements Future<T> {
public
final
void
run
()
{
exec
();
}
}
/**
* Starts the given async task using the given executor, unless
* the executor is ForkJoinPool.commonPool and it has been
* disabled, in which case starts a new thread.
*/
static
void
execAsync
(
Executor
e
,
Async
r
)
{
if
(
e
==
ForkJoinPool
.
commonPool
()
&&
ForkJoinPool
.
getCommonPoolParallelism
()
<=
1
)
new
Thread
(
r
).
start
();
else
e
.
execute
(
r
);
}
static
final
class
AsyncRun
extends
Async
{
final
Runnable
fn
;
final
CompletableFuture
<
Void
>
dst
;
...
...
@@ -538,13 +540,13 @@ public class CompletableFuture<T> implements Future<T> {
static
final
class
AsyncAccept
<
T
>
extends
Async
{
final
T
arg
;
final
Consumer
<?
super
T
>
fn
;
final
CompletableFuture
<
Void
>
dst
;
final
CompletableFuture
<
?
>
dst
;
AsyncAccept
(
T
arg
,
Consumer
<?
super
T
>
fn
,
CompletableFuture
<
Void
>
dst
)
{
CompletableFuture
<
?
>
dst
)
{
this
.
arg
=
arg
;
this
.
fn
=
fn
;
this
.
dst
=
dst
;
}
public
final
boolean
exec
()
{
CompletableFuture
<
Void
>
d
;
Throwable
ex
;
CompletableFuture
<
?
>
d
;
Throwable
ex
;
if
((
d
=
this
.
dst
)
!=
null
&&
d
.
result
==
null
)
{
try
{
fn
.
accept
(
arg
);
...
...
@@ -563,14 +565,14 @@ public class CompletableFuture<T> implements Future<T> {
final
T
arg1
;
final
U
arg2
;
final
BiConsumer
<?
super
T
,?
super
U
>
fn
;
final
CompletableFuture
<
Void
>
dst
;
final
CompletableFuture
<
?
>
dst
;
AsyncAcceptBoth
(
T
arg1
,
U
arg2
,
BiConsumer
<?
super
T
,?
super
U
>
fn
,
CompletableFuture
<
Void
>
dst
)
{
CompletableFuture
<
?
>
dst
)
{
this
.
arg1
=
arg1
;
this
.
arg2
=
arg2
;
this
.
fn
=
fn
;
this
.
dst
=
dst
;
}
public
final
boolean
exec
()
{
CompletableFuture
<
Void
>
d
;
Throwable
ex
;
CompletableFuture
<
?
>
d
;
Throwable
ex
;
if
((
d
=
this
.
dst
)
!=
null
&&
d
.
result
==
null
)
{
try
{
fn
.
accept
(
arg1
,
arg2
);
...
...
@@ -587,10 +589,10 @@ public class CompletableFuture<T> implements Future<T> {
static
final
class
AsyncCompose
<
T
,
U
>
extends
Async
{
final
T
arg
;
final
Function
<?
super
T
,
CompletableFutur
e
<
U
>>
fn
;
final
Function
<?
super
T
,
?
extends
CompletionStag
e
<
U
>>
fn
;
final
CompletableFuture
<
U
>
dst
;
AsyncCompose
(
T
arg
,
Function
<?
super
T
,
CompletableFutur
e
<
U
>>
fn
,
Function
<?
super
T
,
?
extends
CompletionStag
e
<
U
>>
fn
,
CompletableFuture
<
U
>
dst
)
{
this
.
arg
=
arg
;
this
.
fn
=
fn
;
this
.
dst
=
dst
;
}
...
...
@@ -598,7 +600,8 @@ public class CompletableFuture<T> implements Future<T> {
CompletableFuture
<
U
>
d
,
fr
;
U
u
;
Throwable
ex
;
if
((
d
=
this
.
dst
)
!=
null
&&
d
.
result
==
null
)
{
try
{
fr
=
fn
.
apply
(
arg
);
CompletionStage
<
U
>
cs
=
fn
.
apply
(
arg
);
fr
=
(
cs
==
null
)
?
null
:
cs
.
toCompletableFuture
();
ex
=
(
fr
==
null
)
?
new
NullPointerException
()
:
null
;
}
catch
(
Throwable
rex
)
{
ex
=
rex
;
...
...
@@ -626,6 +629,33 @@ public class CompletableFuture<T> implements Future<T> {
private
static
final
long
serialVersionUID
=
5232453952276885070L
;
}
static
final
class
AsyncWhenComplete
<
T
>
extends
Async
{
final
T
arg1
;
final
Throwable
arg2
;
final
BiConsumer
<?
super
T
,?
super
Throwable
>
fn
;
final
CompletableFuture
<
T
>
dst
;
AsyncWhenComplete
(
T
arg1
,
Throwable
arg2
,
BiConsumer
<?
super
T
,?
super
Throwable
>
fn
,
CompletableFuture
<
T
>
dst
)
{
this
.
arg1
=
arg1
;
this
.
arg2
=
arg2
;
this
.
fn
=
fn
;
this
.
dst
=
dst
;
}
public
final
boolean
exec
()
{
CompletableFuture
<
T
>
d
;
if
((
d
=
this
.
dst
)
!=
null
&&
d
.
result
==
null
)
{
Throwable
ex
=
arg2
;
try
{
fn
.
accept
(
arg1
,
ex
);
}
catch
(
Throwable
rex
)
{
if
(
ex
==
null
)
ex
=
rex
;
}
d
.
internalComplete
(
arg1
,
ex
);
}
return
true
;
}
private
static
final
long
serialVersionUID
=
5232453952276885070L
;
}
/* ------------- Completions -------------- */
/**
...
...
@@ -680,7 +710,7 @@ public class CompletableFuture<T> implements Future<T> {
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
e
.
execute
(
new
AsyncApply
<
T
,
U
>(
t
,
fn
,
dst
));
e
xecAsync
(
e
,
new
AsyncApply
<
T
,
U
>(
t
,
fn
,
dst
));
else
u
=
fn
.
apply
(
t
);
}
catch
(
Throwable
rex
)
{
...
...
@@ -697,11 +727,11 @@ public class CompletableFuture<T> implements Future<T> {
static
final
class
ThenAccept
<
T
>
extends
Completion
{
final
CompletableFuture
<?
extends
T
>
src
;
final
Consumer
<?
super
T
>
fn
;
final
CompletableFuture
<
Void
>
dst
;
final
CompletableFuture
<
?
>
dst
;
final
Executor
executor
;
ThenAccept
(
CompletableFuture
<?
extends
T
>
src
,
Consumer
<?
super
T
>
fn
,
CompletableFuture
<
Void
>
dst
,
CompletableFuture
<
?
>
dst
,
Executor
executor
)
{
this
.
src
=
src
;
this
.
fn
=
fn
;
this
.
dst
=
dst
;
this
.
executor
=
executor
;
...
...
@@ -709,7 +739,7 @@ public class CompletableFuture<T> implements Future<T> {
public
final
void
run
()
{
final
CompletableFuture
<?
extends
T
>
a
;
final
Consumer
<?
super
T
>
fn
;
final
CompletableFuture
<
Void
>
dst
;
final
CompletableFuture
<
?
>
dst
;
Object
r
;
T
t
;
Throwable
ex
;
if
((
dst
=
this
.
dst
)
!=
null
&&
(
fn
=
this
.
fn
)
!=
null
&&
...
...
@@ -729,7 +759,7 @@ public class CompletableFuture<T> implements Future<T> {
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
e
.
execute
(
new
AsyncAccept
<
T
>(
t
,
fn
,
dst
));
e
xecAsync
(
e
,
new
AsyncAccept
<
T
>(
t
,
fn
,
dst
));
else
fn
.
accept
(
t
);
}
catch
(
Throwable
rex
)
{
...
...
@@ -773,7 +803,7 @@ public class CompletableFuture<T> implements Future<T> {
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
e
.
execute
(
new
AsyncRun
(
fn
,
dst
));
e
xecAsync
(
e
,
new
AsyncRun
(
fn
,
dst
));
else
fn
.
run
();
}
catch
(
Throwable
rex
)
{
...
...
@@ -839,7 +869,7 @@ public class CompletableFuture<T> implements Future<T> {
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
e
.
execute
(
new
AsyncCombine
<
T
,
U
,
V
>(
t
,
u
,
fn
,
dst
));
e
xecAsync
(
e
,
new
AsyncCombine
<
T
,
U
,
V
>(
t
,
u
,
fn
,
dst
));
else
v
=
fn
.
apply
(
t
,
u
);
}
catch
(
Throwable
rex
)
{
...
...
@@ -904,7 +934,7 @@ public class CompletableFuture<T> implements Future<T> {
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
e
.
execute
(
new
AsyncAcceptBoth
<
T
,
U
>(
t
,
u
,
fn
,
dst
));
e
xecAsync
(
e
,
new
AsyncAcceptBoth
<
T
,
U
>(
t
,
u
,
fn
,
dst
));
else
fn
.
accept
(
t
,
u
);
}
catch
(
Throwable
rex
)
{
...
...
@@ -956,7 +986,7 @@ public class CompletableFuture<T> implements Future<T> {
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
e
.
execute
(
new
AsyncRun
(
fn
,
dst
));
e
xecAsync
(
e
,
new
AsyncRun
(
fn
,
dst
));
else
fn
.
run
();
}
catch
(
Throwable
rex
)
{
...
...
@@ -1042,7 +1072,7 @@ public class CompletableFuture<T> implements Future<T> {
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
e
.
execute
(
new
AsyncApply
<
T
,
U
>(
t
,
fn
,
dst
));
e
xecAsync
(
e
,
new
AsyncApply
<
T
,
U
>(
t
,
fn
,
dst
));
else
u
=
fn
.
apply
(
t
);
}
catch
(
Throwable
rex
)
{
...
...
@@ -1095,7 +1125,7 @@ public class CompletableFuture<T> implements Future<T> {
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
e
.
execute
(
new
AsyncAccept
<
T
>(
t
,
fn
,
dst
));
e
xecAsync
(
e
,
new
AsyncAccept
<
T
>(
t
,
fn
,
dst
));
else
fn
.
accept
(
t
);
}
catch
(
Throwable
rex
)
{
...
...
@@ -1143,7 +1173,7 @@ public class CompletableFuture<T> implements Future<T> {
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
e
.
execute
(
new
AsyncRun
(
fn
,
dst
));
e
xecAsync
(
e
,
new
AsyncRun
(
fn
,
dst
));
else
fn
.
run
();
}
catch
(
Throwable
rex
)
{
...
...
@@ -1226,6 +1256,54 @@ public class CompletableFuture<T> implements Future<T> {
private
static
final
long
serialVersionUID
=
5232453952276885070L
;
}
static
final
class
WhenCompleteCompletion
<
T
>
extends
Completion
{
final
CompletableFuture
<?
extends
T
>
src
;
final
BiConsumer
<?
super
T
,
?
super
Throwable
>
fn
;
final
CompletableFuture
<
T
>
dst
;
final
Executor
executor
;
WhenCompleteCompletion
(
CompletableFuture
<?
extends
T
>
src
,
BiConsumer
<?
super
T
,
?
super
Throwable
>
fn
,
CompletableFuture
<
T
>
dst
,
Executor
executor
)
{
this
.
src
=
src
;
this
.
fn
=
fn
;
this
.
dst
=
dst
;
this
.
executor
=
executor
;
}
public
final
void
run
()
{
final
CompletableFuture
<?
extends
T
>
a
;
final
BiConsumer
<?
super
T
,
?
super
Throwable
>
fn
;
final
CompletableFuture
<
T
>
dst
;
Object
r
;
T
t
;
Throwable
ex
;
if
((
dst
=
this
.
dst
)
!=
null
&&
(
fn
=
this
.
fn
)
!=
null
&&
(
a
=
this
.
src
)
!=
null
&&
(
r
=
a
.
result
)
!=
null
&&
compareAndSet
(
0
,
1
))
{
if
(
r
instanceof
AltResult
)
{
ex
=
((
AltResult
)
r
).
ex
;
t
=
null
;
}
else
{
ex
=
null
;
@SuppressWarnings
(
"unchecked"
)
T
tr
=
(
T
)
r
;
t
=
tr
;
}
Executor
e
=
executor
;
Throwable
dx
=
null
;
try
{
if
(
e
!=
null
)
execAsync
(
e
,
new
AsyncWhenComplete
<
T
>(
t
,
ex
,
fn
,
dst
));
else
fn
.
accept
(
t
,
ex
);
}
catch
(
Throwable
rex
)
{
dx
=
rex
;
}
if
(
e
==
null
||
dx
!=
null
)
dst
.
internalComplete
(
t
,
ex
!=
null
?
ex
:
dx
);
}
}
private
static
final
long
serialVersionUID
=
5232453952276885070L
;
}
static
final
class
ThenCopy
<
T
>
extends
Completion
{
final
CompletableFuture
<?>
src
;
final
CompletableFuture
<
T
>
dst
;
...
...
@@ -1286,10 +1364,13 @@ public class CompletableFuture<T> implements Future<T> {
final
CompletableFuture
<?
extends
T
>
src
;
final
BiFunction
<?
super
T
,
Throwable
,
?
extends
U
>
fn
;
final
CompletableFuture
<
U
>
dst
;
final
Executor
executor
;
HandleCompletion
(
CompletableFuture
<?
extends
T
>
src
,
BiFunction
<?
super
T
,
Throwable
,
?
extends
U
>
fn
,
CompletableFuture
<
U
>
dst
)
{
CompletableFuture
<
U
>
dst
,
Executor
executor
)
{
this
.
src
=
src
;
this
.
fn
=
fn
;
this
.
dst
=
dst
;
this
.
executor
=
executor
;
}
public
final
void
run
()
{
final
CompletableFuture
<?
extends
T
>
a
;
...
...
@@ -1310,13 +1391,19 @@ public class CompletableFuture<T> implements Future<T> {
@SuppressWarnings
(
"unchecked"
)
T
tr
=
(
T
)
r
;
t
=
tr
;
}
U
u
=
null
;
Throwable
dx
=
null
;
Executor
e
=
executor
;
U
u
=
null
;
Throwable
dx
=
null
;
try
{
u
=
fn
.
apply
(
t
,
ex
);
if
(
e
!=
null
)
execAsync
(
e
,
new
AsyncCombine
<
T
,
Throwable
,
U
>(
t
,
ex
,
fn
,
dst
));
else
u
=
fn
.
apply
(
t
,
ex
);
}
catch
(
Throwable
rex
)
{
dx
=
rex
;
}
dst
.
internalComplete
(
u
,
dx
);
if
(
e
==
null
||
dx
!=
null
)
dst
.
internalComplete
(
u
,
dx
);
}
}
private
static
final
long
serialVersionUID
=
5232453952276885070L
;
...
...
@@ -1324,11 +1411,11 @@ public class CompletableFuture<T> implements Future<T> {
static
final
class
ThenCompose
<
T
,
U
>
extends
Completion
{
final
CompletableFuture
<?
extends
T
>
src
;
final
Function
<?
super
T
,
CompletableFutur
e
<
U
>>
fn
;
final
Function
<?
super
T
,
?
extends
CompletionStag
e
<
U
>>
fn
;
final
CompletableFuture
<
U
>
dst
;
final
Executor
executor
;
ThenCompose
(
CompletableFuture
<?
extends
T
>
src
,
Function
<?
super
T
,
CompletableFutur
e
<
U
>>
fn
,
Function
<?
super
T
,
?
extends
CompletionStag
e
<
U
>>
fn
,
CompletableFuture
<
U
>
dst
,
Executor
executor
)
{
this
.
src
=
src
;
this
.
fn
=
fn
;
this
.
dst
=
dst
;
...
...
@@ -1336,7 +1423,7 @@ public class CompletableFuture<T> implements Future<T> {
}
public
final
void
run
()
{
final
CompletableFuture
<?
extends
T
>
a
;
final
Function
<?
super
T
,
CompletableFutur
e
<
U
>>
fn
;
final
Function
<?
super
T
,
?
extends
CompletionStag
e
<
U
>>
fn
;
final
CompletableFuture
<
U
>
dst
;
Object
r
;
T
t
;
Throwable
ex
;
Executor
e
;
if
((
dst
=
this
.
dst
)
!=
null
&&
...
...
@@ -1358,10 +1445,12 @@ public class CompletableFuture<T> implements Future<T> {
boolean
complete
=
false
;
if
(
ex
==
null
)
{
if
((
e
=
executor
)
!=
null
)
e
.
execute
(
new
AsyncCompose
<
T
,
U
>(
t
,
fn
,
dst
));
e
xecAsync
(
e
,
new
AsyncCompose
<
T
,
U
>(
t
,
fn
,
dst
));
else
{
try
{
if
((
c
=
fn
.
apply
(
t
))
==
null
)
CompletionStage
<
U
>
cs
=
fn
.
apply
(
t
);
c
=
(
cs
==
null
)
?
null
:
cs
.
toCompletableFuture
();
if
(
c
==
null
)
ex
=
new
NullPointerException
();
}
catch
(
Throwable
rex
)
{
ex
=
rex
;
...
...
@@ -1401,84 +1490,697 @@ public class CompletableFuture<T> implements Future<T> {
private
static
final
long
serialVersionUID
=
5232453952276885070L
;
}
// public methods
/**
* Creates a new incomplete CompletableFuture.
*/
public
CompletableFuture
()
{
}
// Implementations of stage methods with (plain, async, Executor) forms
/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the {@link ForkJoinPool#commonPool()} with
* the value obtained by calling the given Supplier.
*
* @param supplier a function returning the value to be used
* to complete the returned CompletableFuture
* @return the new CompletableFuture
*/
public
static
<
U
>
CompletableFuture
<
U
>
supplyAsync
(
Supplier
<
U
>
supplier
)
{
if
(
supplier
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
U
>
f
=
new
CompletableFuture
<
U
>();
ForkJoinPool
.
commonPool
().
execute
((
ForkJoinTask
<?>)
new
AsyncSupply
<
U
>(
supplier
,
f
));
return
f
;
private
<
U
>
CompletableFuture
<
U
>
doThenApply
(
Function
<?
super
T
,?
extends
U
>
fn
,
Executor
e
)
{
if
(
fn
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
U
>
dst
=
new
CompletableFuture
<
U
>();
ThenApply
<
T
,
U
>
d
=
null
;
Object
r
;
if
((
r
=
result
)
==
null
)
{
CompletionNode
p
=
new
CompletionNode
(
d
=
new
ThenApply
<
T
,
U
>(
this
,
fn
,
dst
,
e
));
while
((
r
=
result
)
==
null
)
{
if
(
UNSAFE
.
compareAndSwapObject
(
this
,
COMPLETIONS
,
p
.
next
=
completions
,
p
))
break
;
}
}
if
(
r
!=
null
&&
(
d
==
null
||
d
.
compareAndSet
(
0
,
1
)))
{
T
t
;
Throwable
ex
;
if
(
r
instanceof
AltResult
)
{
ex
=
((
AltResult
)
r
).
ex
;
t
=
null
;
}
else
{
ex
=
null
;
@SuppressWarnings
(
"unchecked"
)
T
tr
=
(
T
)
r
;
t
=
tr
;
}
U
u
=
null
;
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
execAsync
(
e
,
new
AsyncApply
<
T
,
U
>(
t
,
fn
,
dst
));
else
u
=
fn
.
apply
(
t
);
}
catch
(
Throwable
rex
)
{
ex
=
rex
;
}
}
if
(
e
==
null
||
ex
!=
null
)
dst
.
internalComplete
(
u
,
ex
);
}
helpPostComplete
();
return
dst
;
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the given executor with the value obtained
* by calling the given Supplier.
*
* @param supplier a function returning the value to be used
* to complete the returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @return the new CompletableFuture
*/
public
static
<
U
>
CompletableFuture
<
U
>
supplyAsync
(
Supplier
<
U
>
supplier
,
Executor
executor
)
{
if
(
executor
==
null
||
supplier
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
U
>
f
=
new
CompletableFuture
<
U
>();
executor
.
execute
(
new
AsyncSupply
<
U
>(
supplier
,
f
));
return
f
;
private
CompletableFuture
<
Void
>
doThenAccept
(
Consumer
<?
super
T
>
fn
,
Executor
e
)
{
if
(
fn
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
Void
>
dst
=
new
CompletableFuture
<
Void
>();
ThenAccept
<
T
>
d
=
null
;
Object
r
;
if
((
r
=
result
)
==
null
)
{
CompletionNode
p
=
new
CompletionNode
(
d
=
new
ThenAccept
<
T
>(
this
,
fn
,
dst
,
e
));
while
((
r
=
result
)
==
null
)
{
if
(
UNSAFE
.
compareAndSwapObject
(
this
,
COMPLETIONS
,
p
.
next
=
completions
,
p
))
break
;
}
}
if
(
r
!=
null
&&
(
d
==
null
||
d
.
compareAndSet
(
0
,
1
)))
{
T
t
;
Throwable
ex
;
if
(
r
instanceof
AltResult
)
{
ex
=
((
AltResult
)
r
).
ex
;
t
=
null
;
}
else
{
ex
=
null
;
@SuppressWarnings
(
"unchecked"
)
T
tr
=
(
T
)
r
;
t
=
tr
;
}
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
execAsync
(
e
,
new
AsyncAccept
<
T
>(
t
,
fn
,
dst
));
else
fn
.
accept
(
t
);
}
catch
(
Throwable
rex
)
{
ex
=
rex
;
}
}
if
(
e
==
null
||
ex
!=
null
)
dst
.
internalComplete
(
null
,
ex
);
}
helpPostComplete
();
return
dst
;
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the {@link ForkJoinPool#commonPool()} after
* it runs the given action.
*
* @param runnable the action to run before completing the
* returned CompletableFuture
* @return the new CompletableFuture
*/
public
static
CompletableFuture
<
Void
>
runAsync
(
Runnable
runnable
)
{
if
(
runnable
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
Void
>
f
=
new
CompletableFuture
<
Void
>();
ForkJoinPool
.
commonPool
().
execute
((
ForkJoinTask
<?>)
new
AsyncRun
(
runnable
,
f
));
return
f
;
private
CompletableFuture
<
Void
>
doThenRun
(
Runnable
action
,
Executor
e
)
{
if
(
action
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
Void
>
dst
=
new
CompletableFuture
<
Void
>();
ThenRun
d
=
null
;
Object
r
;
if
((
r
=
result
)
==
null
)
{
CompletionNode
p
=
new
CompletionNode
(
d
=
new
ThenRun
(
this
,
action
,
dst
,
e
));
while
((
r
=
result
)
==
null
)
{
if
(
UNSAFE
.
compareAndSwapObject
(
this
,
COMPLETIONS
,
p
.
next
=
completions
,
p
))
break
;
}
}
if
(
r
!=
null
&&
(
d
==
null
||
d
.
compareAndSet
(
0
,
1
)))
{
Throwable
ex
;
if
(
r
instanceof
AltResult
)
ex
=
((
AltResult
)
r
).
ex
;
else
ex
=
null
;
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
execAsync
(
e
,
new
AsyncRun
(
action
,
dst
));
else
action
.
run
();
}
catch
(
Throwable
rex
)
{
ex
=
rex
;
}
}
if
(
e
==
null
||
ex
!=
null
)
dst
.
internalComplete
(
null
,
ex
);
}
helpPostComplete
();
return
dst
;
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the given executor after it runs the given
* action.
*
* @param runnable the action to run before completing the
* returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @return the new CompletableFuture
*/
public
static
CompletableFuture
<
Void
>
runAsync
(
Runnable
runnable
,
Executor
executor
)
{
if
(
executor
==
null
||
runnable
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
Void
>
f
=
new
CompletableFuture
<
Void
>();
executor
.
execute
(
new
AsyncRun
(
runnable
,
f
));
return
f
;
private
<
U
,
V
>
CompletableFuture
<
V
>
doThenCombine
(
CompletableFuture
<?
extends
U
>
other
,
BiFunction
<?
super
T
,?
super
U
,?
extends
V
>
fn
,
Executor
e
)
{
if
(
other
==
null
||
fn
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
V
>
dst
=
new
CompletableFuture
<
V
>();
ThenCombine
<
T
,
U
,
V
>
d
=
null
;
Object
r
,
s
=
null
;
if
((
r
=
result
)
==
null
||
(
s
=
other
.
result
)
==
null
)
{
d
=
new
ThenCombine
<
T
,
U
,
V
>(
this
,
other
,
fn
,
dst
,
e
);
CompletionNode
q
=
null
,
p
=
new
CompletionNode
(
d
);
while
((
r
==
null
&&
(
r
=
result
)
==
null
)
||
(
s
==
null
&&
(
s
=
other
.
result
)
==
null
))
{
if
(
q
!=
null
)
{
if
(
s
!=
null
||
UNSAFE
.
compareAndSwapObject
(
other
,
COMPLETIONS
,
q
.
next
=
other
.
completions
,
q
))
break
;
}
else
if
(
r
!=
null
||
UNSAFE
.
compareAndSwapObject
(
this
,
COMPLETIONS
,
p
.
next
=
completions
,
p
))
{
if
(
s
!=
null
)
break
;
q
=
new
CompletionNode
(
d
);
}
}
}
if
(
r
!=
null
&&
s
!=
null
&&
(
d
==
null
||
d
.
compareAndSet
(
0
,
1
)))
{
T
t
;
U
u
;
Throwable
ex
;
if
(
r
instanceof
AltResult
)
{
ex
=
((
AltResult
)
r
).
ex
;
t
=
null
;
}
else
{
ex
=
null
;
@SuppressWarnings
(
"unchecked"
)
T
tr
=
(
T
)
r
;
t
=
tr
;
}
if
(
ex
!=
null
)
u
=
null
;
else
if
(
s
instanceof
AltResult
)
{
ex
=
((
AltResult
)
s
).
ex
;
u
=
null
;
}
else
{
@SuppressWarnings
(
"unchecked"
)
U
us
=
(
U
)
s
;
u
=
us
;
}
V
v
=
null
;
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
execAsync
(
e
,
new
AsyncCombine
<
T
,
U
,
V
>(
t
,
u
,
fn
,
dst
));
else
v
=
fn
.
apply
(
t
,
u
);
}
catch
(
Throwable
rex
)
{
ex
=
rex
;
}
}
if
(
e
==
null
||
ex
!=
null
)
dst
.
internalComplete
(
v
,
ex
);
}
helpPostComplete
();
other
.
helpPostComplete
();
return
dst
;
}
private
<
U
>
CompletableFuture
<
Void
>
doThenAcceptBoth
(
CompletableFuture
<?
extends
U
>
other
,
BiConsumer
<?
super
T
,?
super
U
>
fn
,
Executor
e
)
{
if
(
other
==
null
||
fn
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
Void
>
dst
=
new
CompletableFuture
<
Void
>();
ThenAcceptBoth
<
T
,
U
>
d
=
null
;
Object
r
,
s
=
null
;
if
((
r
=
result
)
==
null
||
(
s
=
other
.
result
)
==
null
)
{
d
=
new
ThenAcceptBoth
<
T
,
U
>(
this
,
other
,
fn
,
dst
,
e
);
CompletionNode
q
=
null
,
p
=
new
CompletionNode
(
d
);
while
((
r
==
null
&&
(
r
=
result
)
==
null
)
||
(
s
==
null
&&
(
s
=
other
.
result
)
==
null
))
{
if
(
q
!=
null
)
{
if
(
s
!=
null
||
UNSAFE
.
compareAndSwapObject
(
other
,
COMPLETIONS
,
q
.
next
=
other
.
completions
,
q
))
break
;
}
else
if
(
r
!=
null
||
UNSAFE
.
compareAndSwapObject
(
this
,
COMPLETIONS
,
p
.
next
=
completions
,
p
))
{
if
(
s
!=
null
)
break
;
q
=
new
CompletionNode
(
d
);
}
}
}
if
(
r
!=
null
&&
s
!=
null
&&
(
d
==
null
||
d
.
compareAndSet
(
0
,
1
)))
{
T
t
;
U
u
;
Throwable
ex
;
if
(
r
instanceof
AltResult
)
{
ex
=
((
AltResult
)
r
).
ex
;
t
=
null
;
}
else
{
ex
=
null
;
@SuppressWarnings
(
"unchecked"
)
T
tr
=
(
T
)
r
;
t
=
tr
;
}
if
(
ex
!=
null
)
u
=
null
;
else
if
(
s
instanceof
AltResult
)
{
ex
=
((
AltResult
)
s
).
ex
;
u
=
null
;
}
else
{
@SuppressWarnings
(
"unchecked"
)
U
us
=
(
U
)
s
;
u
=
us
;
}
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
execAsync
(
e
,
new
AsyncAcceptBoth
<
T
,
U
>(
t
,
u
,
fn
,
dst
));
else
fn
.
accept
(
t
,
u
);
}
catch
(
Throwable
rex
)
{
ex
=
rex
;
}
}
if
(
e
==
null
||
ex
!=
null
)
dst
.
internalComplete
(
null
,
ex
);
}
helpPostComplete
();
other
.
helpPostComplete
();
return
dst
;
}
private
CompletableFuture
<
Void
>
doRunAfterBoth
(
CompletableFuture
<?>
other
,
Runnable
action
,
Executor
e
)
{
if
(
other
==
null
||
action
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
Void
>
dst
=
new
CompletableFuture
<
Void
>();
RunAfterBoth
d
=
null
;
Object
r
,
s
=
null
;
if
((
r
=
result
)
==
null
||
(
s
=
other
.
result
)
==
null
)
{
d
=
new
RunAfterBoth
(
this
,
other
,
action
,
dst
,
e
);
CompletionNode
q
=
null
,
p
=
new
CompletionNode
(
d
);
while
((
r
==
null
&&
(
r
=
result
)
==
null
)
||
(
s
==
null
&&
(
s
=
other
.
result
)
==
null
))
{
if
(
q
!=
null
)
{
if
(
s
!=
null
||
UNSAFE
.
compareAndSwapObject
(
other
,
COMPLETIONS
,
q
.
next
=
other
.
completions
,
q
))
break
;
}
else
if
(
r
!=
null
||
UNSAFE
.
compareAndSwapObject
(
this
,
COMPLETIONS
,
p
.
next
=
completions
,
p
))
{
if
(
s
!=
null
)
break
;
q
=
new
CompletionNode
(
d
);
}
}
}
if
(
r
!=
null
&&
s
!=
null
&&
(
d
==
null
||
d
.
compareAndSet
(
0
,
1
)))
{
Throwable
ex
;
if
(
r
instanceof
AltResult
)
ex
=
((
AltResult
)
r
).
ex
;
else
ex
=
null
;
if
(
ex
==
null
&&
(
s
instanceof
AltResult
))
ex
=
((
AltResult
)
s
).
ex
;
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
execAsync
(
e
,
new
AsyncRun
(
action
,
dst
));
else
action
.
run
();
}
catch
(
Throwable
rex
)
{
ex
=
rex
;
}
}
if
(
e
==
null
||
ex
!=
null
)
dst
.
internalComplete
(
null
,
ex
);
}
helpPostComplete
();
other
.
helpPostComplete
();
return
dst
;
}
private
<
U
>
CompletableFuture
<
U
>
doApplyToEither
(
CompletableFuture
<?
extends
T
>
other
,
Function
<?
super
T
,
U
>
fn
,
Executor
e
)
{
if
(
other
==
null
||
fn
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
U
>
dst
=
new
CompletableFuture
<
U
>();
ApplyToEither
<
T
,
U
>
d
=
null
;
Object
r
;
if
((
r
=
result
)
==
null
&&
(
r
=
other
.
result
)
==
null
)
{
d
=
new
ApplyToEither
<
T
,
U
>(
this
,
other
,
fn
,
dst
,
e
);
CompletionNode
q
=
null
,
p
=
new
CompletionNode
(
d
);
while
((
r
=
result
)
==
null
&&
(
r
=
other
.
result
)
==
null
)
{
if
(
q
!=
null
)
{
if
(
UNSAFE
.
compareAndSwapObject
(
other
,
COMPLETIONS
,
q
.
next
=
other
.
completions
,
q
))
break
;
}
else
if
(
UNSAFE
.
compareAndSwapObject
(
this
,
COMPLETIONS
,
p
.
next
=
completions
,
p
))
q
=
new
CompletionNode
(
d
);
}
}
if
(
r
!=
null
&&
(
d
==
null
||
d
.
compareAndSet
(
0
,
1
)))
{
T
t
;
Throwable
ex
;
if
(
r
instanceof
AltResult
)
{
ex
=
((
AltResult
)
r
).
ex
;
t
=
null
;
}
else
{
ex
=
null
;
@SuppressWarnings
(
"unchecked"
)
T
tr
=
(
T
)
r
;
t
=
tr
;
}
U
u
=
null
;
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
execAsync
(
e
,
new
AsyncApply
<
T
,
U
>(
t
,
fn
,
dst
));
else
u
=
fn
.
apply
(
t
);
}
catch
(
Throwable
rex
)
{
ex
=
rex
;
}
}
if
(
e
==
null
||
ex
!=
null
)
dst
.
internalComplete
(
u
,
ex
);
}
helpPostComplete
();
other
.
helpPostComplete
();
return
dst
;
}
private
CompletableFuture
<
Void
>
doAcceptEither
(
CompletableFuture
<?
extends
T
>
other
,
Consumer
<?
super
T
>
fn
,
Executor
e
)
{
if
(
other
==
null
||
fn
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
Void
>
dst
=
new
CompletableFuture
<
Void
>();
AcceptEither
<
T
>
d
=
null
;
Object
r
;
if
((
r
=
result
)
==
null
&&
(
r
=
other
.
result
)
==
null
)
{
d
=
new
AcceptEither
<
T
>(
this
,
other
,
fn
,
dst
,
e
);
CompletionNode
q
=
null
,
p
=
new
CompletionNode
(
d
);
while
((
r
=
result
)
==
null
&&
(
r
=
other
.
result
)
==
null
)
{
if
(
q
!=
null
)
{
if
(
UNSAFE
.
compareAndSwapObject
(
other
,
COMPLETIONS
,
q
.
next
=
other
.
completions
,
q
))
break
;
}
else
if
(
UNSAFE
.
compareAndSwapObject
(
this
,
COMPLETIONS
,
p
.
next
=
completions
,
p
))
q
=
new
CompletionNode
(
d
);
}
}
if
(
r
!=
null
&&
(
d
==
null
||
d
.
compareAndSet
(
0
,
1
)))
{
T
t
;
Throwable
ex
;
if
(
r
instanceof
AltResult
)
{
ex
=
((
AltResult
)
r
).
ex
;
t
=
null
;
}
else
{
ex
=
null
;
@SuppressWarnings
(
"unchecked"
)
T
tr
=
(
T
)
r
;
t
=
tr
;
}
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
execAsync
(
e
,
new
AsyncAccept
<
T
>(
t
,
fn
,
dst
));
else
fn
.
accept
(
t
);
}
catch
(
Throwable
rex
)
{
ex
=
rex
;
}
}
if
(
e
==
null
||
ex
!=
null
)
dst
.
internalComplete
(
null
,
ex
);
}
helpPostComplete
();
other
.
helpPostComplete
();
return
dst
;
}
private
CompletableFuture
<
Void
>
doRunAfterEither
(
CompletableFuture
<?>
other
,
Runnable
action
,
Executor
e
)
{
if
(
other
==
null
||
action
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
Void
>
dst
=
new
CompletableFuture
<
Void
>();
RunAfterEither
d
=
null
;
Object
r
;
if
((
r
=
result
)
==
null
&&
(
r
=
other
.
result
)
==
null
)
{
d
=
new
RunAfterEither
(
this
,
other
,
action
,
dst
,
e
);
CompletionNode
q
=
null
,
p
=
new
CompletionNode
(
d
);
while
((
r
=
result
)
==
null
&&
(
r
=
other
.
result
)
==
null
)
{
if
(
q
!=
null
)
{
if
(
UNSAFE
.
compareAndSwapObject
(
other
,
COMPLETIONS
,
q
.
next
=
other
.
completions
,
q
))
break
;
}
else
if
(
UNSAFE
.
compareAndSwapObject
(
this
,
COMPLETIONS
,
p
.
next
=
completions
,
p
))
q
=
new
CompletionNode
(
d
);
}
}
if
(
r
!=
null
&&
(
d
==
null
||
d
.
compareAndSet
(
0
,
1
)))
{
Throwable
ex
;
if
(
r
instanceof
AltResult
)
ex
=
((
AltResult
)
r
).
ex
;
else
ex
=
null
;
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
execAsync
(
e
,
new
AsyncRun
(
action
,
dst
));
else
action
.
run
();
}
catch
(
Throwable
rex
)
{
ex
=
rex
;
}
}
if
(
e
==
null
||
ex
!=
null
)
dst
.
internalComplete
(
null
,
ex
);
}
helpPostComplete
();
other
.
helpPostComplete
();
return
dst
;
}
private
<
U
>
CompletableFuture
<
U
>
doThenCompose
(
Function
<?
super
T
,
?
extends
CompletionStage
<
U
>>
fn
,
Executor
e
)
{
if
(
fn
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
U
>
dst
=
null
;
ThenCompose
<
T
,
U
>
d
=
null
;
Object
r
;
if
((
r
=
result
)
==
null
)
{
dst
=
new
CompletableFuture
<
U
>();
CompletionNode
p
=
new
CompletionNode
(
d
=
new
ThenCompose
<
T
,
U
>(
this
,
fn
,
dst
,
e
));
while
((
r
=
result
)
==
null
)
{
if
(
UNSAFE
.
compareAndSwapObject
(
this
,
COMPLETIONS
,
p
.
next
=
completions
,
p
))
break
;
}
}
if
(
r
!=
null
&&
(
d
==
null
||
d
.
compareAndSet
(
0
,
1
)))
{
T
t
;
Throwable
ex
;
if
(
r
instanceof
AltResult
)
{
ex
=
((
AltResult
)
r
).
ex
;
t
=
null
;
}
else
{
ex
=
null
;
@SuppressWarnings
(
"unchecked"
)
T
tr
=
(
T
)
r
;
t
=
tr
;
}
if
(
ex
==
null
)
{
if
(
e
!=
null
)
{
if
(
dst
==
null
)
dst
=
new
CompletableFuture
<
U
>();
execAsync
(
e
,
new
AsyncCompose
<
T
,
U
>(
t
,
fn
,
dst
));
}
else
{
try
{
CompletionStage
<
U
>
cs
=
fn
.
apply
(
t
);
if
(
cs
==
null
||
(
dst
=
cs
.
toCompletableFuture
())
==
null
)
ex
=
new
NullPointerException
();
}
catch
(
Throwable
rex
)
{
ex
=
rex
;
}
}
}
if
(
dst
==
null
)
dst
=
new
CompletableFuture
<
U
>();
if
(
e
==
null
||
ex
!=
null
)
dst
.
internalComplete
(
null
,
ex
);
}
helpPostComplete
();
dst
.
helpPostComplete
();
return
dst
;
}
private
CompletableFuture
<
T
>
doWhenComplete
(
BiConsumer
<?
super
T
,
?
super
Throwable
>
fn
,
Executor
e
)
{
if
(
fn
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
T
>
dst
=
new
CompletableFuture
<
T
>();
WhenCompleteCompletion
<
T
>
d
=
null
;
Object
r
;
if
((
r
=
result
)
==
null
)
{
CompletionNode
p
=
new
CompletionNode
(
d
=
new
WhenCompleteCompletion
<
T
>
(
this
,
fn
,
dst
,
e
));
while
((
r
=
result
)
==
null
)
{
if
(
UNSAFE
.
compareAndSwapObject
(
this
,
COMPLETIONS
,
p
.
next
=
completions
,
p
))
break
;
}
}
if
(
r
!=
null
&&
(
d
==
null
||
d
.
compareAndSet
(
0
,
1
)))
{
T
t
;
Throwable
ex
;
if
(
r
instanceof
AltResult
)
{
ex
=
((
AltResult
)
r
).
ex
;
t
=
null
;
}
else
{
ex
=
null
;
@SuppressWarnings
(
"unchecked"
)
T
tr
=
(
T
)
r
;
t
=
tr
;
}
Throwable
dx
=
null
;
try
{
if
(
e
!=
null
)
execAsync
(
e
,
new
AsyncWhenComplete
<
T
>(
t
,
ex
,
fn
,
dst
));
else
fn
.
accept
(
t
,
ex
);
}
catch
(
Throwable
rex
)
{
dx
=
rex
;
}
if
(
e
==
null
||
dx
!=
null
)
dst
.
internalComplete
(
t
,
ex
!=
null
?
ex
:
dx
);
}
helpPostComplete
();
return
dst
;
}
private
<
U
>
CompletableFuture
<
U
>
doHandle
(
BiFunction
<?
super
T
,
Throwable
,
?
extends
U
>
fn
,
Executor
e
)
{
if
(
fn
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
U
>
dst
=
new
CompletableFuture
<
U
>();
HandleCompletion
<
T
,
U
>
d
=
null
;
Object
r
;
if
((
r
=
result
)
==
null
)
{
CompletionNode
p
=
new
CompletionNode
(
d
=
new
HandleCompletion
<
T
,
U
>
(
this
,
fn
,
dst
,
e
));
while
((
r
=
result
)
==
null
)
{
if
(
UNSAFE
.
compareAndSwapObject
(
this
,
COMPLETIONS
,
p
.
next
=
completions
,
p
))
break
;
}
}
if
(
r
!=
null
&&
(
d
==
null
||
d
.
compareAndSet
(
0
,
1
)))
{
T
t
;
Throwable
ex
;
if
(
r
instanceof
AltResult
)
{
ex
=
((
AltResult
)
r
).
ex
;
t
=
null
;
}
else
{
ex
=
null
;
@SuppressWarnings
(
"unchecked"
)
T
tr
=
(
T
)
r
;
t
=
tr
;
}
U
u
=
null
;
Throwable
dx
=
null
;
try
{
if
(
e
!=
null
)
execAsync
(
e
,
new
AsyncCombine
<
T
,
Throwable
,
U
>(
t
,
ex
,
fn
,
dst
));
else
{
u
=
fn
.
apply
(
t
,
ex
);
dx
=
null
;
}
}
catch
(
Throwable
rex
)
{
dx
=
rex
;
u
=
null
;
}
if
(
e
==
null
||
dx
!=
null
)
dst
.
internalComplete
(
u
,
dx
);
}
helpPostComplete
();
return
dst
;
}
// public methods
/**
* Creates a new incomplete CompletableFuture.
*/
public
CompletableFuture
()
{
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the {@link ForkJoinPool#commonPool()} with
* the value obtained by calling the given Supplier.
*
* @param supplier a function returning the value to be used
* to complete the returned CompletableFuture
* @param <U> the function's return type
* @return the new CompletableFuture
*/
public
static
<
U
>
CompletableFuture
<
U
>
supplyAsync
(
Supplier
<
U
>
supplier
)
{
if
(
supplier
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
U
>
f
=
new
CompletableFuture
<
U
>();
execAsync
(
ForkJoinPool
.
commonPool
(),
new
AsyncSupply
<
U
>(
supplier
,
f
));
return
f
;
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the given executor with the value obtained
* by calling the given Supplier.
*
* @param supplier a function returning the value to be used
* to complete the returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @param <U> the function's return type
* @return the new CompletableFuture
*/
public
static
<
U
>
CompletableFuture
<
U
>
supplyAsync
(
Supplier
<
U
>
supplier
,
Executor
executor
)
{
if
(
executor
==
null
||
supplier
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
U
>
f
=
new
CompletableFuture
<
U
>();
execAsync
(
executor
,
new
AsyncSupply
<
U
>(
supplier
,
f
));
return
f
;
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the {@link ForkJoinPool#commonPool()} after
* it runs the given action.
*
* @param runnable the action to run before completing the
* returned CompletableFuture
* @return the new CompletableFuture
*/
public
static
CompletableFuture
<
Void
>
runAsync
(
Runnable
runnable
)
{
if
(
runnable
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
Void
>
f
=
new
CompletableFuture
<
Void
>();
execAsync
(
ForkJoinPool
.
commonPool
(),
new
AsyncRun
(
runnable
,
f
));
return
f
;
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the given executor after it runs the given
* action.
*
* @param runnable the action to run before completing the
* returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @return the new CompletableFuture
*/
public
static
CompletableFuture
<
Void
>
runAsync
(
Runnable
runnable
,
Executor
executor
)
{
if
(
executor
==
null
||
runnable
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
Void
>
f
=
new
CompletableFuture
<
Void
>();
execAsync
(
executor
,
new
AsyncRun
(
runnable
,
f
));
return
f
;
}
/**
...
...
@@ -1486,6 +2188,7 @@ public class CompletableFuture<T> implements Future<T> {
* the given value.
*
* @param value the value
* @param <U> the type of the value
* @return the completed CompletableFuture
*/
public
static
<
U
>
CompletableFuture
<
U
>
completedFuture
(
U
value
)
{
...
...
@@ -1620,1247 +2323,284 @@ public class CompletableFuture<T> implements Future<T> {
return
null
;
if
(
ex
instanceof
CancellationException
)
throw
(
CancellationException
)
ex
;
if
(
ex
instanceof
CompletionException
)
throw
(
CompletionException
)
ex
;
throw
new
CompletionException
(
ex
);
}
/**
* If not already completed, sets the value returned by {@link
* #get()} and related methods to the given value.
*
* @param value the result value
* @return {@code true} if this invocation caused this CompletableFuture
* to transition to a completed state, else {@code false}
*/
public
boolean
complete
(
T
value
)
{
boolean
triggered
=
result
==
null
&&
UNSAFE
.
compareAndSwapObject
(
this
,
RESULT
,
null
,
value
==
null
?
NIL
:
value
);
postComplete
();
return
triggered
;
}
/**
* If not already completed, causes invocations of {@link #get()}
* and related methods to throw the given exception.
*
* @param ex the exception
* @return {@code true} if this invocation caused this CompletableFuture
* to transition to a completed state, else {@code false}
*/
public
boolean
completeExceptionally
(
Throwable
ex
)
{
if
(
ex
==
null
)
throw
new
NullPointerException
();
boolean
triggered
=
result
==
null
&&
UNSAFE
.
compareAndSwapObject
(
this
,
RESULT
,
null
,
new
AltResult
(
ex
));
postComplete
();
return
triggered
;
}
/**
* Returns a new CompletableFuture that is completed
* when this CompletableFuture completes, with the result of the
* given function of this CompletableFuture's result.
*
* <p>If this CompletableFuture completes exceptionally, or the
* supplied function throws an exception, then the returned
* CompletableFuture completes exceptionally with a
* CompletionException holding the exception as its cause.
*
* @param fn the function to use to compute the value of
* the returned CompletableFuture
* @return the new CompletableFuture
*/
public
<
U
>
CompletableFuture
<
U
>
thenApply
(
Function
<?
super
T
,?
extends
U
>
fn
)
{
return
doThenApply
(
fn
,
null
);
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* when this CompletableFuture completes, with the result of the
* given function of this CompletableFuture's result from a
* task running in the {@link ForkJoinPool#commonPool()}.
*
* <p>If this CompletableFuture completes exceptionally, or the
* supplied function throws an exception, then the returned
* CompletableFuture completes exceptionally with a
* CompletionException holding the exception as its cause.
*
* @param fn the function to use to compute the value of
* the returned CompletableFuture
* @return the new CompletableFuture
*/
public
<
U
>
CompletableFuture
<
U
>
thenApplyAsync
(
Function
<?
super
T
,?
extends
U
>
fn
)
{
return
doThenApply
(
fn
,
ForkJoinPool
.
commonPool
());
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* when this CompletableFuture completes, with the result of the
* given function of this CompletableFuture's result from a
* task running in the given executor.
*
* <p>If this CompletableFuture completes exceptionally, or the
* supplied function throws an exception, then the returned
* CompletableFuture completes exceptionally with a
* CompletionException holding the exception as its cause.
*
* @param fn the function to use to compute the value of
* the returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @return the new CompletableFuture
*/
public
<
U
>
CompletableFuture
<
U
>
thenApplyAsync
(
Function
<?
super
T
,?
extends
U
>
fn
,
Executor
executor
)
{
if
(
executor
==
null
)
throw
new
NullPointerException
();
return
doThenApply
(
fn
,
executor
);
}
private
<
U
>
CompletableFuture
<
U
>
doThenApply
(
Function
<?
super
T
,?
extends
U
>
fn
,
Executor
e
)
{
if
(
fn
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
U
>
dst
=
new
CompletableFuture
<
U
>();
ThenApply
<
T
,
U
>
d
=
null
;
Object
r
;
if
((
r
=
result
)
==
null
)
{
CompletionNode
p
=
new
CompletionNode
(
d
=
new
ThenApply
<
T
,
U
>(
this
,
fn
,
dst
,
e
));
while
((
r
=
result
)
==
null
)
{
if
(
UNSAFE
.
compareAndSwapObject
(
this
,
COMPLETIONS
,
p
.
next
=
completions
,
p
))
break
;
}
}
if
(
r
!=
null
&&
(
d
==
null
||
d
.
compareAndSet
(
0
,
1
)))
{
T
t
;
Throwable
ex
;
if
(
r
instanceof
AltResult
)
{
ex
=
((
AltResult
)
r
).
ex
;
t
=
null
;
}
else
{
ex
=
null
;
@SuppressWarnings
(
"unchecked"
)
T
tr
=
(
T
)
r
;
t
=
tr
;
}
U
u
=
null
;
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
e
.
execute
(
new
AsyncApply
<
T
,
U
>(
t
,
fn
,
dst
));
else
u
=
fn
.
apply
(
t
);
}
catch
(
Throwable
rex
)
{
ex
=
rex
;
}
}
if
(
e
==
null
||
ex
!=
null
)
dst
.
internalComplete
(
u
,
ex
);
}
helpPostComplete
();
return
dst
;
if
(
ex
instanceof
CompletionException
)
throw
(
CompletionException
)
ex
;
throw
new
CompletionException
(
ex
);
}
/**
* Returns a new CompletableFuture that is completed
* when this CompletableFuture completes, after performing the given
* action with this CompletableFuture's result.
*
* <p>If this CompletableFuture completes exceptionally, or the
* supplied action throws an exception, then the returned
* CompletableFuture completes exceptionally with a
* CompletionException holding the exception as its cause.
* If not already completed, sets the value returned by {@link
* #get()} and related methods to the given value.
*
* @param
block the action to perform before completing th
e
*
returned
CompletableFuture
*
@return the new CompletableFuture
* @param
value the result valu
e
*
@return {@code true} if this invocation caused this
CompletableFuture
*
to transition to a completed state, else {@code false}
*/
public
CompletableFuture
<
Void
>
thenAccept
(
Consumer
<?
super
T
>
block
)
{
return
doThenAccept
(
block
,
null
);
public
boolean
complete
(
T
value
)
{
boolean
triggered
=
result
==
null
&&
UNSAFE
.
compareAndSwapObject
(
this
,
RESULT
,
null
,
value
==
null
?
NIL
:
value
);
postComplete
();
return
triggered
;
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* when this CompletableFuture completes, after performing the given
* action with this CompletableFuture's result from a task running
* in the {@link ForkJoinPool#commonPool()}.
*
* <p>If this CompletableFuture completes exceptionally, or the
* supplied action throws an exception, then the returned
* CompletableFuture completes exceptionally with a
* CompletionException holding the exception as its cause.
* If not already completed, causes invocations of {@link #get()}
* and related methods to throw the given exception.
*
* @param
block the action to perform before completing the
*
returned
CompletableFuture
*
@return the new CompletableFuture
* @param
ex the exception
*
@return {@code true} if this invocation caused this
CompletableFuture
*
to transition to a completed state, else {@code false}
*/
public
CompletableFuture
<
Void
>
thenAcceptAsync
(
Consumer
<?
super
T
>
block
)
{
return
doThenAccept
(
block
,
ForkJoinPool
.
commonPool
());
public
boolean
completeExceptionally
(
Throwable
ex
)
{
if
(
ex
==
null
)
throw
new
NullPointerException
();
boolean
triggered
=
result
==
null
&&
UNSAFE
.
compareAndSwapObject
(
this
,
RESULT
,
null
,
new
AltResult
(
ex
));
postComplete
();
return
triggered
;
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* when this CompletableFuture completes, after performing the given
* action with this CompletableFuture's result from a task running
* in the given executor.
*
* <p>If this CompletableFuture completes exceptionally, or the
* supplied action throws an exception, then the returned
* CompletableFuture completes exceptionally with a
* CompletionException holding the exception as its cause.
*
* @param block the action to perform before completing the
* returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @return the new CompletableFuture
*/
public
CompletableFuture
<
Void
>
thenAcceptAsync
(
Consumer
<?
super
T
>
block
,
Executor
executor
)
{
// CompletionStage methods
public
<
U
>
CompletableFuture
<
U
>
thenApply
(
Function
<?
super
T
,?
extends
U
>
fn
)
{
return
doThenApply
(
fn
,
null
);
}
public
<
U
>
CompletableFuture
<
U
>
thenApplyAsync
(
Function
<?
super
T
,?
extends
U
>
fn
)
{
return
doThenApply
(
fn
,
ForkJoinPool
.
commonPool
());
}
public
<
U
>
CompletableFuture
<
U
>
thenApplyAsync
(
Function
<?
super
T
,?
extends
U
>
fn
,
Executor
executor
)
{
if
(
executor
==
null
)
throw
new
NullPointerException
();
return
doThenA
ccept
(
block
,
executor
);
return
doThenA
pply
(
fn
,
executor
);
}
private
CompletableFuture
<
Void
>
doThenAccept
(
Consumer
<?
super
T
>
fn
,
Executor
e
)
{
if
(
fn
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
Void
>
dst
=
new
CompletableFuture
<
Void
>();
ThenAccept
<
T
>
d
=
null
;
Object
r
;
if
((
r
=
result
)
==
null
)
{
CompletionNode
p
=
new
CompletionNode
(
d
=
new
ThenAccept
<
T
>(
this
,
fn
,
dst
,
e
));
while
((
r
=
result
)
==
null
)
{
if
(
UNSAFE
.
compareAndSwapObject
(
this
,
COMPLETIONS
,
p
.
next
=
completions
,
p
))
break
;
}
}
if
(
r
!=
null
&&
(
d
==
null
||
d
.
compareAndSet
(
0
,
1
)))
{
T
t
;
Throwable
ex
;
if
(
r
instanceof
AltResult
)
{
ex
=
((
AltResult
)
r
).
ex
;
t
=
null
;
}
else
{
ex
=
null
;
@SuppressWarnings
(
"unchecked"
)
T
tr
=
(
T
)
r
;
t
=
tr
;
}
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
e
.
execute
(
new
AsyncAccept
<
T
>(
t
,
fn
,
dst
));
else
fn
.
accept
(
t
);
}
catch
(
Throwable
rex
)
{
ex
=
rex
;
}
}
if
(
e
==
null
||
ex
!=
null
)
dst
.
internalComplete
(
null
,
ex
);
}
helpPostComplete
();
return
dst
;
public
CompletableFuture
<
Void
>
thenAccept
(
Consumer
<?
super
T
>
action
)
{
return
doThenAccept
(
action
,
null
);
}
/**
* Returns a new CompletableFuture that is completed
* when this CompletableFuture completes, after performing the given
* action.
*
* <p>If this CompletableFuture completes exceptionally, or the
* supplied action throws an exception, then the returned
* CompletableFuture completes exceptionally with a
* CompletionException holding the exception as its cause.
*
* @param action the action to perform before completing the
* returned CompletableFuture
* @return the new CompletableFuture
*/
public
CompletableFuture
<
Void
>
thenRun
(
Runnable
action
)
{
public
CompletableFuture
<
Void
>
thenAcceptAsync
(
Consumer
<?
super
T
>
action
)
{
return
doThenAccept
(
action
,
ForkJoinPool
.
commonPool
());
}
public
CompletableFuture
<
Void
>
thenAcceptAsync
(
Consumer
<?
super
T
>
action
,
Executor
executor
)
{
if
(
executor
==
null
)
throw
new
NullPointerException
();
return
doThenAccept
(
action
,
executor
);
}
public
CompletableFuture
<
Void
>
thenRun
(
Runnable
action
)
{
return
doThenRun
(
action
,
null
);
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* when this CompletableFuture completes, after performing the given
* action from a task running in the {@link ForkJoinPool#commonPool()}.
*
* <p>If this CompletableFuture completes exceptionally, or the
* supplied action throws an exception, then the returned
* CompletableFuture completes exceptionally with a
* CompletionException holding the exception as its cause.
*
* @param action the action to perform before completing the
* returned CompletableFuture
* @return the new CompletableFuture
*/
public
CompletableFuture
<
Void
>
thenRunAsync
(
Runnable
action
)
{
public
CompletableFuture
<
Void
>
thenRunAsync
(
Runnable
action
)
{
return
doThenRun
(
action
,
ForkJoinPool
.
commonPool
());
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* when this CompletableFuture completes, after performing the given
* action from a task running in the given executor.
*
* <p>If this CompletableFuture completes exceptionally, or the
* supplied action throws an exception, then the returned
* CompletableFuture completes exceptionally with a
* CompletionException holding the exception as its cause.
*
* @param action the action to perform before completing the
* returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @return the new CompletableFuture
*/
public
CompletableFuture
<
Void
>
thenRunAsync
(
Runnable
action
,
Executor
executor
)
{
public
CompletableFuture
<
Void
>
thenRunAsync
(
Runnable
action
,
Executor
executor
)
{
if
(
executor
==
null
)
throw
new
NullPointerException
();
return
doThenRun
(
action
,
executor
);
}
private
CompletableFuture
<
Void
>
doThenRun
(
Runnable
action
,
Executor
e
)
{
if
(
action
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
Void
>
dst
=
new
CompletableFuture
<
Void
>();
ThenRun
d
=
null
;
Object
r
;
if
((
r
=
result
)
==
null
)
{
CompletionNode
p
=
new
CompletionNode
(
d
=
new
ThenRun
(
this
,
action
,
dst
,
e
));
while
((
r
=
result
)
==
null
)
{
if
(
UNSAFE
.
compareAndSwapObject
(
this
,
COMPLETIONS
,
p
.
next
=
completions
,
p
))
break
;
}
}
if
(
r
!=
null
&&
(
d
==
null
||
d
.
compareAndSet
(
0
,
1
)))
{
Throwable
ex
;
if
(
r
instanceof
AltResult
)
ex
=
((
AltResult
)
r
).
ex
;
else
ex
=
null
;
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
e
.
execute
(
new
AsyncRun
(
action
,
dst
));
else
action
.
run
();
}
catch
(
Throwable
rex
)
{
ex
=
rex
;
}
}
if
(
e
==
null
||
ex
!=
null
)
dst
.
internalComplete
(
null
,
ex
);
}
helpPostComplete
();
return
dst
;
}
/**
* Returns a new CompletableFuture that is completed
* when both this and the other given CompletableFuture complete,
* with the result of the given function of the results of the two
* CompletableFutures.
*
* <p>If this and/or the other CompletableFuture complete
* exceptionally, or the supplied function throws an exception,
* then the returned CompletableFuture completes exceptionally
* with a CompletionException holding the exception as its cause.
*
* @param other the other CompletableFuture
* @param fn the function to use to compute the value of
* the returned CompletableFuture
* @return the new CompletableFuture
*/
public
<
U
,
V
>
CompletableFuture
<
V
>
thenCombine
(
Complet
ableFutur
e
<?
extends
U
>
other
,
(
Complet
ionStag
e
<?
extends
U
>
other
,
BiFunction
<?
super
T
,?
super
U
,?
extends
V
>
fn
)
{
return
doThenCombine
(
other
,
fn
,
null
);
return
doThenCombine
(
other
.
toCompletableFuture
()
,
fn
,
null
);
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* when both this and the other given CompletableFuture complete,
* with the result of the given function of the results of the two
* CompletableFutures from a task running in the
* {@link ForkJoinPool#commonPool()}.
*
* <p>If this and/or the other CompletableFuture complete
* exceptionally, or the supplied function throws an exception,
* then the returned CompletableFuture completes exceptionally
* with a CompletionException holding the exception as its cause.
*
* @param other the other CompletableFuture
* @param fn the function to use to compute the value of
* the returned CompletableFuture
* @return the new CompletableFuture
*/
public
<
U
,
V
>
CompletableFuture
<
V
>
thenCombineAsync
(
Complet
ableFutur
e
<?
extends
U
>
other
,
(
Complet
ionStag
e
<?
extends
U
>
other
,
BiFunction
<?
super
T
,?
super
U
,?
extends
V
>
fn
)
{
return
doThenCombine
(
other
,
fn
,
ForkJoinPool
.
commonPool
());
return
doThenCombine
(
other
.
toCompletableFuture
(),
fn
,
ForkJoinPool
.
commonPool
());
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* when both this and the other given CompletableFuture complete,
* with the result of the given function of the results of the two
* CompletableFutures from a task running in the given executor.
*
* <p>If this and/or the other CompletableFuture complete
* exceptionally, or the supplied function throws an exception,
* then the returned CompletableFuture completes exceptionally
* with a CompletionException holding the exception as its cause.
*
* @param other the other CompletableFuture
* @param fn the function to use to compute the value of
* the returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @return the new CompletableFuture
*/
public
<
U
,
V
>
CompletableFuture
<
V
>
thenCombineAsync
(
Complet
ableFutur
e
<?
extends
U
>
other
,
(
Complet
ionStag
e
<?
extends
U
>
other
,
BiFunction
<?
super
T
,?
super
U
,?
extends
V
>
fn
,
Executor
executor
)
{
if
(
executor
==
null
)
throw
new
NullPointerException
();
return
doThenCombine
(
other
,
fn
,
executor
);
}
private
<
U
,
V
>
CompletableFuture
<
V
>
doThenCombine
(
CompletableFuture
<?
extends
U
>
other
,
BiFunction
<?
super
T
,?
super
U
,?
extends
V
>
fn
,
Executor
e
)
{
if
(
other
==
null
||
fn
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
V
>
dst
=
new
CompletableFuture
<
V
>();
ThenCombine
<
T
,
U
,
V
>
d
=
null
;
Object
r
,
s
=
null
;
if
((
r
=
result
)
==
null
||
(
s
=
other
.
result
)
==
null
)
{
d
=
new
ThenCombine
<
T
,
U
,
V
>(
this
,
other
,
fn
,
dst
,
e
);
CompletionNode
q
=
null
,
p
=
new
CompletionNode
(
d
);
while
((
r
==
null
&&
(
r
=
result
)
==
null
)
||
(
s
==
null
&&
(
s
=
other
.
result
)
==
null
))
{
if
(
q
!=
null
)
{
if
(
s
!=
null
||
UNSAFE
.
compareAndSwapObject
(
other
,
COMPLETIONS
,
q
.
next
=
other
.
completions
,
q
))
break
;
}
else
if
(
r
!=
null
||
UNSAFE
.
compareAndSwapObject
(
this
,
COMPLETIONS
,
p
.
next
=
completions
,
p
))
{
if
(
s
!=
null
)
break
;
q
=
new
CompletionNode
(
d
);
}
}
}
if
(
r
!=
null
&&
s
!=
null
&&
(
d
==
null
||
d
.
compareAndSet
(
0
,
1
)))
{
T
t
;
U
u
;
Throwable
ex
;
if
(
r
instanceof
AltResult
)
{
ex
=
((
AltResult
)
r
).
ex
;
t
=
null
;
}
else
{
ex
=
null
;
@SuppressWarnings
(
"unchecked"
)
T
tr
=
(
T
)
r
;
t
=
tr
;
}
if
(
ex
!=
null
)
u
=
null
;
else
if
(
s
instanceof
AltResult
)
{
ex
=
((
AltResult
)
s
).
ex
;
u
=
null
;
}
else
{
@SuppressWarnings
(
"unchecked"
)
U
us
=
(
U
)
s
;
u
=
us
;
}
V
v
=
null
;
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
e
.
execute
(
new
AsyncCombine
<
T
,
U
,
V
>(
t
,
u
,
fn
,
dst
));
else
v
=
fn
.
apply
(
t
,
u
);
}
catch
(
Throwable
rex
)
{
ex
=
rex
;
}
}
if
(
e
==
null
||
ex
!=
null
)
dst
.
internalComplete
(
v
,
ex
);
}
helpPostComplete
();
other
.
helpPostComplete
();
return
dst
;
return
doThenCombine
(
other
.
toCompletableFuture
(),
fn
,
executor
);
}
/**
* Returns a new CompletableFuture that is completed
* when both this and the other given CompletableFuture complete,
* after performing the given action with the results of the two
* CompletableFutures.
*
* <p>If this and/or the other CompletableFuture complete
* exceptionally, or the supplied action throws an exception,
* then the returned CompletableFuture completes exceptionally
* with a CompletionException holding the exception as its cause.
*
* @param other the other CompletableFuture
* @param block the action to perform before completing the
* returned CompletableFuture
* @return the new CompletableFuture
*/
public
<
U
>
CompletableFuture
<
Void
>
thenAcceptBoth
(
Complet
ableFutur
e
<?
extends
U
>
other
,
BiConsumer
<?
super
T
,
?
super
U
>
block
)
{
return
doThenAcceptBoth
(
other
,
block
,
null
);
(
Complet
ionStag
e
<?
extends
U
>
other
,
BiConsumer
<?
super
T
,
?
super
U
>
action
)
{
return
doThenAcceptBoth
(
other
.
toCompletableFuture
(),
action
,
null
);
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* when both this and the other given CompletableFuture complete,
* after performing the given action with the results of the two
* CompletableFutures from a task running in the {@link
* ForkJoinPool#commonPool()}.
*
* <p>If this and/or the other CompletableFuture complete
* exceptionally, or the supplied action throws an exception,
* then the returned CompletableFuture completes exceptionally
* with a CompletionException holding the exception as its cause.
*
* @param other the other CompletableFuture
* @param block the action to perform before completing the
* returned CompletableFuture
* @return the new CompletableFuture
*/
public
<
U
>
CompletableFuture
<
Void
>
thenAcceptBothAsync
(
CompletableFuture
<?
extends
U
>
other
,
BiConsumer
<?
super
T
,
?
super
U
>
block
)
{
return
doThenAcceptBoth
(
other
,
block
,
ForkJoinPool
.
commonPool
());
(
CompletionStage
<?
extends
U
>
other
,
BiConsumer
<?
super
T
,
?
super
U
>
action
)
{
return
doThenAcceptBoth
(
other
.
toCompletableFuture
(),
action
,
ForkJoinPool
.
commonPool
());
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* when both this and the other given CompletableFuture complete,
* after performing the given action with the results of the two
* CompletableFutures from a task running in the given executor.
*
* <p>If this and/or the other CompletableFuture complete
* exceptionally, or the supplied action throws an exception,
* then the returned CompletableFuture completes exceptionally
* with a CompletionException holding the exception as its cause.
*
* @param other the other CompletableFuture
* @param block the action to perform before completing the
* returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @return the new CompletableFuture
*/
public
<
U
>
CompletableFuture
<
Void
>
thenAcceptBothAsync
(
Complet
ableFutur
e
<?
extends
U
>
other
,
BiConsumer
<?
super
T
,
?
super
U
>
block
,
(
Complet
ionStag
e
<?
extends
U
>
other
,
BiConsumer
<?
super
T
,
?
super
U
>
action
,
Executor
executor
)
{
if
(
executor
==
null
)
throw
new
NullPointerException
();
return
doThenAcceptBoth
(
other
,
block
,
executor
);
}
private
<
U
>
CompletableFuture
<
Void
>
doThenAcceptBoth
(
CompletableFuture
<?
extends
U
>
other
,
BiConsumer
<?
super
T
,?
super
U
>
fn
,
Executor
e
)
{
if
(
other
==
null
||
fn
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
Void
>
dst
=
new
CompletableFuture
<
Void
>();
ThenAcceptBoth
<
T
,
U
>
d
=
null
;
Object
r
,
s
=
null
;
if
((
r
=
result
)
==
null
||
(
s
=
other
.
result
)
==
null
)
{
d
=
new
ThenAcceptBoth
<
T
,
U
>(
this
,
other
,
fn
,
dst
,
e
);
CompletionNode
q
=
null
,
p
=
new
CompletionNode
(
d
);
while
((
r
==
null
&&
(
r
=
result
)
==
null
)
||
(
s
==
null
&&
(
s
=
other
.
result
)
==
null
))
{
if
(
q
!=
null
)
{
if
(
s
!=
null
||
UNSAFE
.
compareAndSwapObject
(
other
,
COMPLETIONS
,
q
.
next
=
other
.
completions
,
q
))
break
;
}
else
if
(
r
!=
null
||
UNSAFE
.
compareAndSwapObject
(
this
,
COMPLETIONS
,
p
.
next
=
completions
,
p
))
{
if
(
s
!=
null
)
break
;
q
=
new
CompletionNode
(
d
);
}
}
}
if
(
r
!=
null
&&
s
!=
null
&&
(
d
==
null
||
d
.
compareAndSet
(
0
,
1
)))
{
T
t
;
U
u
;
Throwable
ex
;
if
(
r
instanceof
AltResult
)
{
ex
=
((
AltResult
)
r
).
ex
;
t
=
null
;
}
else
{
ex
=
null
;
@SuppressWarnings
(
"unchecked"
)
T
tr
=
(
T
)
r
;
t
=
tr
;
}
if
(
ex
!=
null
)
u
=
null
;
else
if
(
s
instanceof
AltResult
)
{
ex
=
((
AltResult
)
s
).
ex
;
u
=
null
;
}
else
{
@SuppressWarnings
(
"unchecked"
)
U
us
=
(
U
)
s
;
u
=
us
;
}
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
e
.
execute
(
new
AsyncAcceptBoth
<
T
,
U
>(
t
,
u
,
fn
,
dst
));
else
fn
.
accept
(
t
,
u
);
}
catch
(
Throwable
rex
)
{
ex
=
rex
;
}
}
if
(
e
==
null
||
ex
!=
null
)
dst
.
internalComplete
(
null
,
ex
);
}
helpPostComplete
();
other
.
helpPostComplete
();
return
dst
;
return
doThenAcceptBoth
(
other
.
toCompletableFuture
(),
action
,
executor
);
}
/**
* Returns a new CompletableFuture that is completed
* when both this and the other given CompletableFuture complete,
* after performing the given action.
*
* <p>If this and/or the other CompletableFuture complete
* exceptionally, or the supplied action throws an exception,
* then the returned CompletableFuture completes exceptionally
* with a CompletionException holding the exception as its cause.
*
* @param other the other CompletableFuture
* @param action the action to perform before completing the
* returned CompletableFuture
* @return the new CompletableFuture
*/
public
CompletableFuture
<
Void
>
runAfterBoth
(
CompletableFuture
<?>
other
,
Runnable
action
)
{
return
doRunAfterBoth
(
other
,
action
,
null
);
public
CompletableFuture
<
Void
>
runAfterBoth
(
CompletionStage
<?>
other
,
Runnable
action
)
{
return
doRunAfterBoth
(
other
.
toCompletableFuture
(),
action
,
null
);
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* when both this and the other given CompletableFuture complete,
* after performing the given action from a task running in the
* {@link ForkJoinPool#commonPool()}.
*
* <p>If this and/or the other CompletableFuture complete
* exceptionally, or the supplied action throws an exception,
* then the returned CompletableFuture completes exceptionally
* with a CompletionException holding the exception as its cause.
*
* @param other the other CompletableFuture
* @param action the action to perform before completing the
* returned CompletableFuture
* @return the new CompletableFuture
*/
public
CompletableFuture
<
Void
>
runAfterBothAsync
(
CompletableFuture
<?>
other
,
Runnable
action
)
{
return
doRunAfterBoth
(
other
,
action
,
ForkJoinPool
.
commonPool
());
public
CompletableFuture
<
Void
>
runAfterBothAsync
(
CompletionStage
<?>
other
,
Runnable
action
)
{
return
doRunAfterBoth
(
other
.
toCompletableFuture
(),
action
,
ForkJoinPool
.
commonPool
());
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* when both this and the other given CompletableFuture complete,
* after performing the given action from a task running in the
* given executor.
*
* <p>If this and/or the other CompletableFuture complete
* exceptionally, or the supplied action throws an exception,
* then the returned CompletableFuture completes exceptionally
* with a CompletionException holding the exception as its cause.
*
* @param other the other CompletableFuture
* @param action the action to perform before completing the
* returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @return the new CompletableFuture
*/
public
CompletableFuture
<
Void
>
runAfterBothAsync
(
CompletableFuture
<?>
other
,
Runnable
action
,
Executor
executor
)
{
public
CompletableFuture
<
Void
>
runAfterBothAsync
(
CompletionStage
<?>
other
,
Runnable
action
,
Executor
executor
)
{
if
(
executor
==
null
)
throw
new
NullPointerException
();
return
doRunAfterBoth
(
other
,
action
,
executor
);
return
doRunAfterBoth
(
other
.
toCompletableFuture
()
,
action
,
executor
);
}
private
CompletableFuture
<
Void
>
doRunAfterBoth
(
CompletableFuture
<?>
other
,
Runnable
action
,
Executor
e
)
{
if
(
other
==
null
||
action
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
Void
>
dst
=
new
CompletableFuture
<
Void
>();
RunAfterBoth
d
=
null
;
Object
r
,
s
=
null
;
if
((
r
=
result
)
==
null
||
(
s
=
other
.
result
)
==
null
)
{
d
=
new
RunAfterBoth
(
this
,
other
,
action
,
dst
,
e
);
CompletionNode
q
=
null
,
p
=
new
CompletionNode
(
d
);
while
((
r
==
null
&&
(
r
=
result
)
==
null
)
||
(
s
==
null
&&
(
s
=
other
.
result
)
==
null
))
{
if
(
q
!=
null
)
{
if
(
s
!=
null
||
UNSAFE
.
compareAndSwapObject
(
other
,
COMPLETIONS
,
q
.
next
=
other
.
completions
,
q
))
break
;
}
else
if
(
r
!=
null
||
UNSAFE
.
compareAndSwapObject
(
this
,
COMPLETIONS
,
p
.
next
=
completions
,
p
))
{
if
(
s
!=
null
)
break
;
q
=
new
CompletionNode
(
d
);
}
}
}
if
(
r
!=
null
&&
s
!=
null
&&
(
d
==
null
||
d
.
compareAndSet
(
0
,
1
)))
{
Throwable
ex
;
if
(
r
instanceof
AltResult
)
ex
=
((
AltResult
)
r
).
ex
;
else
ex
=
null
;
if
(
ex
==
null
&&
(
s
instanceof
AltResult
))
ex
=
((
AltResult
)
s
).
ex
;
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
e
.
execute
(
new
AsyncRun
(
action
,
dst
));
else
action
.
run
();
}
catch
(
Throwable
rex
)
{
ex
=
rex
;
}
}
if
(
e
==
null
||
ex
!=
null
)
dst
.
internalComplete
(
null
,
ex
);
}
helpPostComplete
();
other
.
helpPostComplete
();
return
dst
;
}
/**
* Returns a new CompletableFuture that is completed
* when either this or the other given CompletableFuture completes,
* with the result of the given function of either this or the other
* CompletableFuture's result.
*
* <p>If this and/or the other CompletableFuture complete
* exceptionally, then the returned CompletableFuture may also do so,
* with a CompletionException holding one of these exceptions as its
* cause. No guarantees are made about which result or exception is
* used in the returned CompletableFuture. If the supplied function
* throws an exception, then the returned CompletableFuture completes
* exceptionally with a CompletionException holding the exception as
* its cause.
*
* @param other the other CompletableFuture
* @param fn the function to use to compute the value of
* the returned CompletableFuture
* @return the new CompletableFuture
*/
public
<
U
>
CompletableFuture
<
U
>
applyToEither
(
Complet
ableFutur
e
<?
extends
T
>
other
,
(
Complet
ionStag
e
<?
extends
T
>
other
,
Function
<?
super
T
,
U
>
fn
)
{
return
doApplyToEither
(
other
,
fn
,
null
);
return
doApplyToEither
(
other
.
toCompletableFuture
()
,
fn
,
null
);
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* when either this or the other given CompletableFuture completes,
* with the result of the given function of either this or the other
* CompletableFuture's result from a task running in the
* {@link ForkJoinPool#commonPool()}.
*
* <p>If this and/or the other CompletableFuture complete
* exceptionally, then the returned CompletableFuture may also do so,
* with a CompletionException holding one of these exceptions as its
* cause. No guarantees are made about which result or exception is
* used in the returned CompletableFuture. If the supplied function
* throws an exception, then the returned CompletableFuture completes
* exceptionally with a CompletionException holding the exception as
* its cause.
*
* @param other the other CompletableFuture
* @param fn the function to use to compute the value of
* the returned CompletableFuture
* @return the new CompletableFuture
*/
public
<
U
>
CompletableFuture
<
U
>
applyToEitherAsync
(
Complet
ableFutur
e
<?
extends
T
>
other
,
(
Complet
ionStag
e
<?
extends
T
>
other
,
Function
<?
super
T
,
U
>
fn
)
{
return
doApplyToEither
(
other
,
fn
,
ForkJoinPool
.
commonPool
());
return
doApplyToEither
(
other
.
toCompletableFuture
(),
fn
,
ForkJoinPool
.
commonPool
());
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* when either this or the other given CompletableFuture completes,
* with the result of the given function of either this or the other
* CompletableFuture's result from a task running in the
* given executor.
*
* <p>If this and/or the other CompletableFuture complete
* exceptionally, then the returned CompletableFuture may also do so,
* with a CompletionException holding one of these exceptions as its
* cause. No guarantees are made about which result or exception is
* used in the returned CompletableFuture. If the supplied function
* throws an exception, then the returned CompletableFuture completes
* exceptionally with a CompletionException holding the exception as
* its cause.
*
* @param other the other CompletableFuture
* @param fn the function to use to compute the value of
* the returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @return the new CompletableFuture
*/
public
<
U
>
CompletableFuture
<
U
>
applyToEitherAsync
(
Complet
ableFutur
e
<?
extends
T
>
other
,
(
Complet
ionStag
e
<?
extends
T
>
other
,
Function
<?
super
T
,
U
>
fn
,
Executor
executor
)
{
if
(
executor
==
null
)
throw
new
NullPointerException
();
return
doApplyToEither
(
other
,
fn
,
executor
);
}
private
<
U
>
CompletableFuture
<
U
>
doApplyToEither
(
CompletableFuture
<?
extends
T
>
other
,
Function
<?
super
T
,
U
>
fn
,
Executor
e
)
{
if
(
other
==
null
||
fn
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
U
>
dst
=
new
CompletableFuture
<
U
>();
ApplyToEither
<
T
,
U
>
d
=
null
;
Object
r
;
if
((
r
=
result
)
==
null
&&
(
r
=
other
.
result
)
==
null
)
{
d
=
new
ApplyToEither
<
T
,
U
>(
this
,
other
,
fn
,
dst
,
e
);
CompletionNode
q
=
null
,
p
=
new
CompletionNode
(
d
);
while
((
r
=
result
)
==
null
&&
(
r
=
other
.
result
)
==
null
)
{
if
(
q
!=
null
)
{
if
(
UNSAFE
.
compareAndSwapObject
(
other
,
COMPLETIONS
,
q
.
next
=
other
.
completions
,
q
))
break
;
}
else
if
(
UNSAFE
.
compareAndSwapObject
(
this
,
COMPLETIONS
,
p
.
next
=
completions
,
p
))
q
=
new
CompletionNode
(
d
);
}
}
if
(
r
!=
null
&&
(
d
==
null
||
d
.
compareAndSet
(
0
,
1
)))
{
T
t
;
Throwable
ex
;
if
(
r
instanceof
AltResult
)
{
ex
=
((
AltResult
)
r
).
ex
;
t
=
null
;
}
else
{
ex
=
null
;
@SuppressWarnings
(
"unchecked"
)
T
tr
=
(
T
)
r
;
t
=
tr
;
}
U
u
=
null
;
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
e
.
execute
(
new
AsyncApply
<
T
,
U
>(
t
,
fn
,
dst
));
else
u
=
fn
.
apply
(
t
);
}
catch
(
Throwable
rex
)
{
ex
=
rex
;
}
}
if
(
e
==
null
||
ex
!=
null
)
dst
.
internalComplete
(
u
,
ex
);
}
helpPostComplete
();
other
.
helpPostComplete
();
return
dst
;
return
doApplyToEither
(
other
.
toCompletableFuture
(),
fn
,
executor
);
}
/**
* Returns a new CompletableFuture that is completed
* when either this or the other given CompletableFuture completes,
* after performing the given action with the result of either this
* or the other CompletableFuture's result.
*
* <p>If this and/or the other CompletableFuture complete
* exceptionally, then the returned CompletableFuture may also do so,
* with a CompletionException holding one of these exceptions as its
* cause. No guarantees are made about which result or exception is
* used in the returned CompletableFuture. If the supplied action
* throws an exception, then the returned CompletableFuture completes
* exceptionally with a CompletionException holding the exception as
* its cause.
*
* @param other the other CompletableFuture
* @param block the action to perform before completing the
* returned CompletableFuture
* @return the new CompletableFuture
*/
public
CompletableFuture
<
Void
>
acceptEither
(
Complet
ableFutur
e
<?
extends
T
>
other
,
Consumer
<?
super
T
>
block
)
{
return
doAcceptEither
(
other
,
block
,
null
);
(
Complet
ionStag
e
<?
extends
T
>
other
,
Consumer
<?
super
T
>
action
)
{
return
doAcceptEither
(
other
.
toCompletableFuture
(),
action
,
null
);
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* when either this or the other given CompletableFuture completes,
* after performing the given action with the result of either this
* or the other CompletableFuture's result from a task running in
* the {@link ForkJoinPool#commonPool()}.
*
* <p>If this and/or the other CompletableFuture complete
* exceptionally, then the returned CompletableFuture may also do so,
* with a CompletionException holding one of these exceptions as its
* cause. No guarantees are made about which result or exception is
* used in the returned CompletableFuture. If the supplied action
* throws an exception, then the returned CompletableFuture completes
* exceptionally with a CompletionException holding the exception as
* its cause.
*
* @param other the other CompletableFuture
* @param block the action to perform before completing the
* returned CompletableFuture
* @return the new CompletableFuture
*/
public
CompletableFuture
<
Void
>
acceptEitherAsync
(
CompletableFuture
<?
extends
T
>
other
,
Consumer
<?
super
T
>
block
)
{
return
doAcceptEither
(
other
,
block
,
ForkJoinPool
.
commonPool
());
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* when either this or the other given CompletableFuture completes,
* after performing the given action with the result of either this
* or the other CompletableFuture's result from a task running in
* the given executor.
*
* <p>If this and/or the other CompletableFuture complete
* exceptionally, then the returned CompletableFuture may also do so,
* with a CompletionException holding one of these exceptions as its
* cause. No guarantees are made about which result or exception is
* used in the returned CompletableFuture. If the supplied action
* throws an exception, then the returned CompletableFuture completes
* exceptionally with a CompletionException holding the exception as
* its cause.
*
* @param other the other CompletableFuture
* @param block the action to perform before completing the
* returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @return the new CompletableFuture
*/
(
CompletionStage
<?
extends
T
>
other
,
Consumer
<?
super
T
>
action
)
{
return
doAcceptEither
(
other
.
toCompletableFuture
(),
action
,
ForkJoinPool
.
commonPool
());
}
public
CompletableFuture
<
Void
>
acceptEitherAsync
(
Complet
ableFutur
e
<?
extends
T
>
other
,
Consumer
<?
super
T
>
block
,
(
Complet
ionStag
e
<?
extends
T
>
other
,
Consumer
<?
super
T
>
action
,
Executor
executor
)
{
if
(
executor
==
null
)
throw
new
NullPointerException
();
return
doAcceptEither
(
other
,
block
,
executor
);
}
private
CompletableFuture
<
Void
>
doAcceptEither
(
CompletableFuture
<?
extends
T
>
other
,
Consumer
<?
super
T
>
fn
,
Executor
e
)
{
if
(
other
==
null
||
fn
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
Void
>
dst
=
new
CompletableFuture
<
Void
>();
AcceptEither
<
T
>
d
=
null
;
Object
r
;
if
((
r
=
result
)
==
null
&&
(
r
=
other
.
result
)
==
null
)
{
d
=
new
AcceptEither
<
T
>(
this
,
other
,
fn
,
dst
,
e
);
CompletionNode
q
=
null
,
p
=
new
CompletionNode
(
d
);
while
((
r
=
result
)
==
null
&&
(
r
=
other
.
result
)
==
null
)
{
if
(
q
!=
null
)
{
if
(
UNSAFE
.
compareAndSwapObject
(
other
,
COMPLETIONS
,
q
.
next
=
other
.
completions
,
q
))
break
;
}
else
if
(
UNSAFE
.
compareAndSwapObject
(
this
,
COMPLETIONS
,
p
.
next
=
completions
,
p
))
q
=
new
CompletionNode
(
d
);
}
}
if
(
r
!=
null
&&
(
d
==
null
||
d
.
compareAndSet
(
0
,
1
)))
{
T
t
;
Throwable
ex
;
if
(
r
instanceof
AltResult
)
{
ex
=
((
AltResult
)
r
).
ex
;
t
=
null
;
}
else
{
ex
=
null
;
@SuppressWarnings
(
"unchecked"
)
T
tr
=
(
T
)
r
;
t
=
tr
;
}
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
e
.
execute
(
new
AsyncAccept
<
T
>(
t
,
fn
,
dst
));
else
fn
.
accept
(
t
);
}
catch
(
Throwable
rex
)
{
ex
=
rex
;
}
}
if
(
e
==
null
||
ex
!=
null
)
dst
.
internalComplete
(
null
,
ex
);
}
helpPostComplete
();
other
.
helpPostComplete
();
return
dst
;
return
doAcceptEither
(
other
.
toCompletableFuture
(),
action
,
executor
);
}
/**
* Returns a new CompletableFuture that is completed
* when either this or the other given CompletableFuture completes,
* after performing the given action.
*
* <p>If this and/or the other CompletableFuture complete
* exceptionally, then the returned CompletableFuture may also do so,
* with a CompletionException holding one of these exceptions as its
* cause. No guarantees are made about which result or exception is
* used in the returned CompletableFuture. If the supplied action
* throws an exception, then the returned CompletableFuture completes
* exceptionally with a CompletionException holding the exception as
* its cause.
*
* @param other the other CompletableFuture
* @param action the action to perform before completing the
* returned CompletableFuture
* @return the new CompletableFuture
*/
public
CompletableFuture
<
Void
>
runAfterEither
(
CompletableFuture
<?>
other
,
public
CompletableFuture
<
Void
>
runAfterEither
(
CompletionStage
<?>
other
,
Runnable
action
)
{
return
doRunAfterEither
(
other
,
action
,
null
);
return
doRunAfterEither
(
other
.
toCompletableFuture
()
,
action
,
null
);
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* when either this or the other given CompletableFuture completes,
* after performing the given action from a task running in the
* {@link ForkJoinPool#commonPool()}.
*
* <p>If this and/or the other CompletableFuture complete
* exceptionally, then the returned CompletableFuture may also do so,
* with a CompletionException holding one of these exceptions as its
* cause. No guarantees are made about which result or exception is
* used in the returned CompletableFuture. If the supplied action
* throws an exception, then the returned CompletableFuture completes
* exceptionally with a CompletionException holding the exception as
* its cause.
*
* @param other the other CompletableFuture
* @param action the action to perform before completing the
* returned CompletableFuture
* @return the new CompletableFuture
*/
public
CompletableFuture
<
Void
>
runAfterEitherAsync
(
Complet
ableFutur
e
<?>
other
,
(
Complet
ionStag
e
<?>
other
,
Runnable
action
)
{
return
doRunAfterEither
(
other
,
action
,
ForkJoinPool
.
commonPool
());
return
doRunAfterEither
(
other
.
toCompletableFuture
(),
action
,
ForkJoinPool
.
commonPool
());
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* when either this or the other given CompletableFuture completes,
* after performing the given action from a task running in the
* given executor.
*
* <p>If this and/or the other CompletableFuture complete
* exceptionally, then the returned CompletableFuture may also do so,
* with a CompletionException holding one of these exceptions as its
* cause. No guarantees are made about which result or exception is
* used in the returned CompletableFuture. If the supplied action
* throws an exception, then the returned CompletableFuture completes
* exceptionally with a CompletionException holding the exception as
* its cause.
*
* @param other the other CompletableFuture
* @param action the action to perform before completing the
* returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @return the new CompletableFuture
*/
public
CompletableFuture
<
Void
>
runAfterEitherAsync
(
Complet
ableFutur
e
<?>
other
,
(
Complet
ionStag
e
<?>
other
,
Runnable
action
,
Executor
executor
)
{
if
(
executor
==
null
)
throw
new
NullPointerException
();
return
doRunAfterEither
(
other
,
action
,
executor
);
}
private
CompletableFuture
<
Void
>
doRunAfterEither
(
CompletableFuture
<?>
other
,
Runnable
action
,
Executor
e
)
{
if
(
other
==
null
||
action
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
Void
>
dst
=
new
CompletableFuture
<
Void
>();
RunAfterEither
d
=
null
;
Object
r
;
if
((
r
=
result
)
==
null
&&
(
r
=
other
.
result
)
==
null
)
{
d
=
new
RunAfterEither
(
this
,
other
,
action
,
dst
,
e
);
CompletionNode
q
=
null
,
p
=
new
CompletionNode
(
d
);
while
((
r
=
result
)
==
null
&&
(
r
=
other
.
result
)
==
null
)
{
if
(
q
!=
null
)
{
if
(
UNSAFE
.
compareAndSwapObject
(
other
,
COMPLETIONS
,
q
.
next
=
other
.
completions
,
q
))
break
;
}
else
if
(
UNSAFE
.
compareAndSwapObject
(
this
,
COMPLETIONS
,
p
.
next
=
completions
,
p
))
q
=
new
CompletionNode
(
d
);
}
}
if
(
r
!=
null
&&
(
d
==
null
||
d
.
compareAndSet
(
0
,
1
)))
{
Throwable
ex
;
if
(
r
instanceof
AltResult
)
ex
=
((
AltResult
)
r
).
ex
;
else
ex
=
null
;
if
(
ex
==
null
)
{
try
{
if
(
e
!=
null
)
e
.
execute
(
new
AsyncRun
(
action
,
dst
));
else
action
.
run
();
}
catch
(
Throwable
rex
)
{
ex
=
rex
;
}
}
if
(
e
==
null
||
ex
!=
null
)
dst
.
internalComplete
(
null
,
ex
);
}
helpPostComplete
();
other
.
helpPostComplete
();
return
dst
;
return
doRunAfterEither
(
other
.
toCompletableFuture
(),
action
,
executor
);
}
/**
* Returns a CompletableFuture that upon completion, has the same
* value as produced by the given function of the result of this
* CompletableFuture.
*
* <p>If this CompletableFuture completes exceptionally, then the
* returned CompletableFuture also does so, with a
* CompletionException holding this exception as its cause.
* Similarly, if the computed CompletableFuture completes
* exceptionally, then so does the returned CompletableFuture.
*
* @param fn the function returning a new CompletableFuture
* @return the CompletableFuture
*/
public
<
U
>
CompletableFuture
<
U
>
thenCompose
(
Function
<?
super
T
,
CompletableFutur
e
<
U
>>
fn
)
{
(
Function
<?
super
T
,
?
extends
CompletionStag
e
<
U
>>
fn
)
{
return
doThenCompose
(
fn
,
null
);
}
/**
* Returns a CompletableFuture that upon completion, has the same
* value as that produced asynchronously using the {@link
* ForkJoinPool#commonPool()} by the given function of the result
* of this CompletableFuture.
*
* <p>If this CompletableFuture completes exceptionally, then the
* returned CompletableFuture also does so, with a
* CompletionException holding this exception as its cause.
* Similarly, if the computed CompletableFuture completes
* exceptionally, then so does the returned CompletableFuture.
*
* @param fn the function returning a new CompletableFuture
* @return the CompletableFuture
*/
public
<
U
>
CompletableFuture
<
U
>
thenComposeAsync
(
Function
<?
super
T
,
CompletableFutur
e
<
U
>>
fn
)
{
(
Function
<?
super
T
,
?
extends
CompletionStag
e
<
U
>>
fn
)
{
return
doThenCompose
(
fn
,
ForkJoinPool
.
commonPool
());
}
/**
* Returns a CompletableFuture that upon completion, has the same
* value as that produced asynchronously using the given executor
* by the given function of this CompletableFuture.
*
* <p>If this CompletableFuture completes exceptionally, then the
* returned CompletableFuture also does so, with a
* CompletionException holding this exception as its cause.
* Similarly, if the computed CompletableFuture completes
* exceptionally, then so does the returned CompletableFuture.
*
* @param fn the function returning a new CompletableFuture
* @param executor the executor to use for asynchronous execution
* @return the CompletableFuture
*/
public
<
U
>
CompletableFuture
<
U
>
thenComposeAsync
(
Function
<?
super
T
,
CompletableFutur
e
<
U
>>
fn
,
(
Function
<?
super
T
,
?
extends
CompletionStag
e
<
U
>>
fn
,
Executor
executor
)
{
if
(
executor
==
null
)
throw
new
NullPointerException
();
return
doThenCompose
(
fn
,
executor
);
}
private
<
U
>
CompletableFuture
<
U
>
doThenCompose
(
Function
<?
super
T
,
CompletableFuture
<
U
>>
fn
,
Executor
e
)
{
if
(
fn
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
U
>
dst
=
null
;
ThenCompose
<
T
,
U
>
d
=
null
;
Object
r
;
if
((
r
=
result
)
==
null
)
{
dst
=
new
CompletableFuture
<
U
>();
CompletionNode
p
=
new
CompletionNode
(
d
=
new
ThenCompose
<
T
,
U
>(
this
,
fn
,
dst
,
e
));
while
((
r
=
result
)
==
null
)
{
if
(
UNSAFE
.
compareAndSwapObject
(
this
,
COMPLETIONS
,
p
.
next
=
completions
,
p
))
break
;
}
}
if
(
r
!=
null
&&
(
d
==
null
||
d
.
compareAndSet
(
0
,
1
)))
{
T
t
;
Throwable
ex
;
if
(
r
instanceof
AltResult
)
{
ex
=
((
AltResult
)
r
).
ex
;
t
=
null
;
}
else
{
ex
=
null
;
@SuppressWarnings
(
"unchecked"
)
T
tr
=
(
T
)
r
;
t
=
tr
;
}
if
(
ex
==
null
)
{
if
(
e
!=
null
)
{
if
(
dst
==
null
)
dst
=
new
CompletableFuture
<
U
>();
e
.
execute
(
new
AsyncCompose
<
T
,
U
>(
t
,
fn
,
dst
));
}
else
{
try
{
if
((
dst
=
fn
.
apply
(
t
))
==
null
)
ex
=
new
NullPointerException
();
}
catch
(
Throwable
rex
)
{
ex
=
rex
;
}
}
}
if
(
dst
==
null
)
dst
=
new
CompletableFuture
<
U
>();
if
(
e
==
null
||
ex
!=
null
)
dst
.
internalComplete
(
null
,
ex
);
}
helpPostComplete
();
dst
.
helpPostComplete
();
return
dst
;
public
CompletableFuture
<
T
>
whenComplete
(
BiConsumer
<?
super
T
,
?
super
Throwable
>
action
)
{
return
doWhenComplete
(
action
,
null
);
}
public
CompletableFuture
<
T
>
whenCompleteAsync
(
BiConsumer
<?
super
T
,
?
super
Throwable
>
action
)
{
return
doWhenComplete
(
action
,
ForkJoinPool
.
commonPool
());
}
public
CompletableFuture
<
T
>
whenCompleteAsync
(
BiConsumer
<?
super
T
,
?
super
Throwable
>
action
,
Executor
executor
)
{
if
(
executor
==
null
)
throw
new
NullPointerException
();
return
doWhenComplete
(
action
,
executor
);
}
public
<
U
>
CompletableFuture
<
U
>
handle
(
BiFunction
<?
super
T
,
Throwable
,
?
extends
U
>
fn
)
{
return
doHandle
(
fn
,
null
);
}
public
<
U
>
CompletableFuture
<
U
>
handleAsync
(
BiFunction
<?
super
T
,
Throwable
,
?
extends
U
>
fn
)
{
return
doHandle
(
fn
,
ForkJoinPool
.
commonPool
());
}
public
<
U
>
CompletableFuture
<
U
>
handleAsync
(
BiFunction
<?
super
T
,
Throwable
,
?
extends
U
>
fn
,
Executor
executor
)
{
if
(
executor
==
null
)
throw
new
NullPointerException
();
return
doHandle
(
fn
,
executor
);
}
/**
* Returns this CompletableFuture
*
* @return this CompletableFuture
*/
public
CompletableFuture
<
T
>
toCompletableFuture
()
{
return
this
;
}
// not in interface CompletionStage
/**
* Returns a new CompletableFuture that is completed when this
* CompletableFuture completes, with the result of the given
...
...
@@ -2868,6 +2608,8 @@ public class CompletableFuture<T> implements Future<T> {
* completion when it completes exceptionally; otherwise, if this
* CompletableFuture completes normally, then the returned
* CompletableFuture also completes normally with the same value.
* Note: More flexible versions of this functionality are
* available using methods {@code whenComplete} and {@code handle}.
*
* @param fn the function to use to compute the value of the
* returned CompletableFuture if this CompletableFuture completed
...
...
@@ -2882,7 +2624,8 @@ public class CompletableFuture<T> implements Future<T> {
Object
r
;
if
((
r
=
result
)
==
null
)
{
CompletionNode
p
=
new
CompletionNode
(
d
=
new
ExceptionCompletion
<
T
>(
this
,
fn
,
dst
));
new
CompletionNode
(
d
=
new
ExceptionCompletion
<
T
>
(
this
,
fn
,
dst
));
while
((
r
=
result
)
==
null
)
{
if
(
UNSAFE
.
compareAndSwapObject
(
this
,
COMPLETIONS
,
p
.
next
=
completions
,
p
))
...
...
@@ -2910,59 +2653,6 @@ public class CompletableFuture<T> implements Future<T> {
return
dst
;
}
/**
* Returns a new CompletableFuture that is completed when this
* CompletableFuture completes, with the result of the given
* function of the result and exception of this CompletableFuture's
* completion. The given function is invoked with the result (or
* {@code null} if none) and the exception (or {@code null} if none)
* of this CompletableFuture when complete.
*
* @param fn the function to use to compute the value of the
* returned CompletableFuture
* @return the new CompletableFuture
*/
public
<
U
>
CompletableFuture
<
U
>
handle
(
BiFunction
<?
super
T
,
Throwable
,
?
extends
U
>
fn
)
{
if
(
fn
==
null
)
throw
new
NullPointerException
();
CompletableFuture
<
U
>
dst
=
new
CompletableFuture
<
U
>();
HandleCompletion
<
T
,
U
>
d
=
null
;
Object
r
;
if
((
r
=
result
)
==
null
)
{
CompletionNode
p
=
new
CompletionNode
(
d
=
new
HandleCompletion
<
T
,
U
>(
this
,
fn
,
dst
));
while
((
r
=
result
)
==
null
)
{
if
(
UNSAFE
.
compareAndSwapObject
(
this
,
COMPLETIONS
,
p
.
next
=
completions
,
p
))
break
;
}
}
if
(
r
!=
null
&&
(
d
==
null
||
d
.
compareAndSet
(
0
,
1
)))
{
T
t
;
Throwable
ex
;
if
(
r
instanceof
AltResult
)
{
ex
=
((
AltResult
)
r
).
ex
;
t
=
null
;
}
else
{
ex
=
null
;
@SuppressWarnings
(
"unchecked"
)
T
tr
=
(
T
)
r
;
t
=
tr
;
}
U
u
;
Throwable
dx
;
try
{
u
=
fn
.
apply
(
t
,
ex
);
dx
=
null
;
}
catch
(
Throwable
rex
)
{
dx
=
rex
;
u
=
null
;
}
dst
.
internalComplete
(
u
,
dx
);
}
helpPostComplete
();
return
dst
;
}
/* ------------- Arbitrary-arity constructions -------------- */
/*
...
...
@@ -3214,6 +2904,21 @@ public class CompletableFuture<T> implements Future<T> {
(((
AltResult
)
r
).
ex
instanceof
CancellationException
);
}
/**
* Returns {@code true} if this CompletableFuture completed
* exceptionally, in any way. Possible causes include
* cancellation, explicit invocation of {@code
* completeExceptionally}, and abrupt termination of a
* CompletionStage action.
*
* @return {@code true} if this CompletableFuture completed
* exceptionally
*/
public
boolean
isCompletedExceptionally
()
{
Object
r
;
return
((
r
=
result
)
instanceof
AltResult
)
&&
r
!=
NIL
;
}
/**
* Forcibly sets or resets the value subsequently returned by
* method {@link #get()} and related methods, whether or not
...
...
src/share/classes/java/util/concurrent/CompletionStage.java
0 → 100644
浏览文件 @
4efead92
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* This file is available under and governed by the GNU General Public
* License version 2 only, as published by the Free Software Foundation.
* However, the following notice accompanied the original version of this
* file:
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package
java.util.concurrent
;
import
java.util.function.Supplier
;
import
java.util.function.Consumer
;
import
java.util.function.BiConsumer
;
import
java.util.function.Function
;
import
java.util.function.BiFunction
;
import
java.util.concurrent.Executor
;
/**
* A stage of a possibly asynchronous computation, that performs an
* action or computes a value when another CompletionStage completes.
* A stage completes upon termination of its computation, but this may
* in turn trigger other dependent stages. The functionality defined
* in this interface takes only a few basic forms, which expand out to
* a larger set of methods to capture a range of usage styles: <ul>
*
* <li>The computation performed by a stage may be expressed as a
* Function, Consumer, or Runnable (using methods with names including
* <em>apply</em>, <em>accept</em>, or <em>run</em>, respectively)
* depending on whether it requires arguments and/or produces results.
* For example, {@code stage.thenApply(x -> square(x)).thenAccept(x ->
* System.out.print(x)).thenRun(() -> System.out.println())}. An
* additional form (<em>compose</em>) applies functions of stages
* themselves, rather than their results. </li>
*
* <li> One stage's execution may be triggered by completion of a
* single stage, or both of two stages, or either of two stages.
* Dependencies on a single stage are arranged using methods with
* prefix <em>then</em>. Those triggered by completion of
* <em>both</em> of two stages may <em>combine</em> their results or
* effects, using correspondingly named methods. Those triggered by
* <em>either</em> of two stages make no guarantees about which of the
* results or effects are used for the dependent stage's
* computation.</li>
*
* <li> Dependencies among stages control the triggering of
* computations, but do not otherwise guarantee any particular
* ordering. Additionally, execution of a new stage's computations may
* be arranged in any of three ways: default execution, default
* asynchronous execution (using methods with suffix <em>async</em>
* that employ the stage's default asynchronous execution facility),
* or custom (via a supplied {@link Executor}). The execution
* properties of default and async modes are specified by
* CompletionStage implementations, not this interface. Methods with
* explicit Executor arguments may have arbitrary execution
* properties, and might not even support concurrent execution, but
* are arranged for processing in a way that accommodates asynchrony.
*
* <li> Two method forms support processing whether the triggering
* stage completed normally or exceptionally: Method {@link
* #whenComplete whenComplete} allows injection of an action
* regardless of outcome, otherwise preserving the outcome in its
* completion. Method {@link #handle handle} additionally allows the
* stage to compute a replacement result that may enable further
* processing by other dependent stages. In all other cases, if a
* stage's computation terminates abruptly with an (unchecked)
* exception or error, then all dependent stages requiring its
* completion complete exceptionally as well, with a {@link
* CompletionException} holding the exception as its cause. If a
* stage is dependent on <em>both</em> of two stages, and both
* complete exceptionally, then the CompletionException may correspond
* to either one of these exceptions. If a stage is dependent on
* <em>either</em> of two others, and only one of them completes
* exceptionally, no guarantees are made about whether the dependent
* stage completes normally or exceptionally. In the case of method
* {@code whenComplete}, when the supplied action itself encounters an
* exception, then the stage exceptionally completes with this
* exception if not already completed exceptionally.</li>
*
* </ul>
*
* <p>All methods adhere to the above triggering, execution, and
* exceptional completion specifications (which are not repeated in
* individual method specifications). Additionally, while arguments
* used to pass a completion result (that is, for parameters of type
* {@code T}) for methods accepting them may be null, passing a null
* value for any other parameter will result in a {@link
* NullPointerException} being thrown.
*
* <p>This interface does not define methods for initially creating,
* forcibly completing normally or exceptionally, probing completion
* status or results, or awaiting completion of a stage.
* Implementations of CompletionStage may provide means of achieving
* such effects, as appropriate. Method {@link #toCompletableFuture}
* enables interoperability among different implementations of this
* interface by providing a common conversion type.
*
* @author Doug Lea
* @since 1.8
*/
public
interface
CompletionStage
<
T
>
{
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed with this stage's result as the argument
* to the supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param fn the function to use to compute the value of
* the returned CompletionStage
* @param <U> the function's return type
* @return the new CompletionStage
*/
public
<
U
>
CompletionStage
<
U
>
thenApply
(
Function
<?
super
T
,?
extends
U
>
fn
);
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed using this stage's default asynchronous
* execution facility, with this stage's result as the argument to
* the supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param fn the function to use to compute the value of
* the returned CompletionStage
* @param <U> the function's return type
* @return the new CompletionStage
*/
public
<
U
>
CompletionStage
<
U
>
thenApplyAsync
(
Function
<?
super
T
,?
extends
U
>
fn
);
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed using the supplied Executor, with this
* stage's result as the argument to the supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param fn the function to use to compute the value of
* the returned CompletionStage
* @param executor the executor to use for asynchronous execution
* @param <U> the function's return type
* @return the new CompletionStage
*/
public
<
U
>
CompletionStage
<
U
>
thenApplyAsync
(
Function
<?
super
T
,?
extends
U
>
fn
,
Executor
executor
);
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed with this stage's result as the argument
* to the supplied action.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param action the action to perform before completing the
* returned CompletionStage
* @return the new CompletionStage
*/
public
CompletionStage
<
Void
>
thenAccept
(
Consumer
<?
super
T
>
action
);
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed using this stage's default asynchronous
* execution facility, with this stage's result as the argument to
* the supplied action.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param action the action to perform before completing the
* returned CompletionStage
* @return the new CompletionStage
*/
public
CompletionStage
<
Void
>
thenAcceptAsync
(
Consumer
<?
super
T
>
action
);
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed using the supplied Executor, with this
* stage's result as the argument to the supplied action.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param action the action to perform before completing the
* returned CompletionStage
* @param executor the executor to use for asynchronous execution
* @return the new CompletionStage
*/
public
CompletionStage
<
Void
>
thenAcceptAsync
(
Consumer
<?
super
T
>
action
,
Executor
executor
);
/**
* Returns a new CompletionStage that, when this stage completes
* normally, executes the given action.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param action the action to perform before completing the
* returned CompletionStage
* @return the new CompletionStage
*/
public
CompletionStage
<
Void
>
thenRun
(
Runnable
action
);
/**
* Returns a new CompletionStage that, when this stage completes
* normally, executes the given action using this stage's default
* asynchronous execution facility.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param action the action to perform before completing the
* returned CompletionStage
* @return the new CompletionStage
*/
public
CompletionStage
<
Void
>
thenRunAsync
(
Runnable
action
);
/**
* Returns a new CompletionStage that, when this stage completes
* normally, executes the given action using the supplied Executor.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param action the action to perform before completing the
* returned CompletionStage
* @param executor the executor to use for asynchronous execution
* @return the new CompletionStage
*/
public
CompletionStage
<
Void
>
thenRunAsync
(
Runnable
action
,
Executor
executor
);
/**
* Returns a new CompletionStage that, when this and the other
* given stage both complete normally, is executed with the two
* results as arguments to the supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param fn the function to use to compute the value of
* the returned CompletionStage
* @param <U> the type of the other CompletionStage's result
* @param <V> the function's return type
* @return the new CompletionStage
*/
public
<
U
,
V
>
CompletionStage
<
V
>
thenCombine
(
CompletionStage
<?
extends
U
>
other
,
BiFunction
<?
super
T
,?
super
U
,?
extends
V
>
fn
);
/**
* Returns a new CompletionStage that, when this and the other
* given stage complete normally, is executed using this stage's
* default asynchronous execution facility, with the two results
* as arguments to the supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param fn the function to use to compute the value of
* the returned CompletionStage
* @param <U> the type of the other CompletionStage's result
* @param <V> the function's return type
* @return the new CompletionStage
*/
public
<
U
,
V
>
CompletionStage
<
V
>
thenCombineAsync
(
CompletionStage
<?
extends
U
>
other
,
BiFunction
<?
super
T
,?
super
U
,?
extends
V
>
fn
);
/**
* Returns a new CompletionStage that, when this and the other
* given stage complete normally, is executed using the supplied
* executor, with the two results as arguments to the supplied
* function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param fn the function to use to compute the value of
* the returned CompletionStage
* @param executor the executor to use for asynchronous execution
* @param <U> the type of the other CompletionStage's result
* @param <V> the function's return type
* @return the new CompletionStage
*/
public
<
U
,
V
>
CompletionStage
<
V
>
thenCombineAsync
(
CompletionStage
<?
extends
U
>
other
,
BiFunction
<?
super
T
,?
super
U
,?
extends
V
>
fn
,
Executor
executor
);
/**
* Returns a new CompletionStage that, when this and the other
* given stage both complete normally, is executed with the two
* results as arguments to the supplied action.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param action the action to perform before completing the
* returned CompletionStage
* @param <U> the type of the other CompletionStage's result
* @return the new CompletionStage
*/
public
<
U
>
CompletionStage
<
Void
>
thenAcceptBoth
(
CompletionStage
<?
extends
U
>
other
,
BiConsumer
<?
super
T
,
?
super
U
>
action
);
/**
* Returns a new CompletionStage that, when this and the other
* given stage complete normally, is executed using this stage's
* default asynchronous execution facility, with the two results
* as arguments to the supplied action.
*
* @param other the other CompletionStage
* @param action the action to perform before completing the
* returned CompletionStage
* @param <U> the type of the other CompletionStage's result
* @return the new CompletionStage
*/
public
<
U
>
CompletionStage
<
Void
>
thenAcceptBothAsync
(
CompletionStage
<?
extends
U
>
other
,
BiConsumer
<?
super
T
,
?
super
U
>
action
);
/**
* Returns a new CompletionStage that, when this and the other
* given stage complete normally, is executed using the supplied
* executor, with the two results as arguments to the supplied
* function.
*
* @param other the other CompletionStage
* @param action the action to perform before completing the
* returned CompletionStage
* @param executor the executor to use for asynchronous execution
* @param <U> the type of the other CompletionStage's result
* @return the new CompletionStage
*/
public
<
U
>
CompletionStage
<
Void
>
thenAcceptBothAsync
(
CompletionStage
<?
extends
U
>
other
,
BiConsumer
<?
super
T
,
?
super
U
>
action
,
Executor
executor
);
/**
* Returns a new CompletionStage that, when this and the other
* given stage both complete normally, executes the given action.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param action the action to perform before completing the
* returned CompletionStage
* @return the new CompletionStage
*/
public
CompletionStage
<
Void
>
runAfterBoth
(
CompletionStage
<?>
other
,
Runnable
action
);
/**
* Returns a new CompletionStage that, when this and the other
* given stage complete normally, executes the given action using
* this stage's default asynchronous execution facility.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param action the action to perform before completing the
* returned CompletionStage
* @return the new CompletionStage
*/
public
CompletionStage
<
Void
>
runAfterBothAsync
(
CompletionStage
<?>
other
,
Runnable
action
);
/**
* Returns a new CompletionStage that, when this and the other
* given stage complete normally, executes the given action using
* the supplied executor
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param action the action to perform before completing the
* returned CompletionStage
* @param executor the executor to use for asynchronous execution
* @return the new CompletionStage
*/
public
CompletionStage
<
Void
>
runAfterBothAsync
(
CompletionStage
<?>
other
,
Runnable
action
,
Executor
executor
);
/**
* Returns a new CompletionStage that, when either this or the
* other given stage complete normally, is executed with the
* corresponding result as argument to the supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param fn the function to use to compute the value of
* the returned CompletionStage
* @param <U> the function's return type
* @return the new CompletionStage
*/
public
<
U
>
CompletionStage
<
U
>
applyToEither
(
CompletionStage
<?
extends
T
>
other
,
Function
<?
super
T
,
U
>
fn
);
/**
* Returns a new CompletionStage that, when either this or the
* other given stage complete normally, is executed using this
* stage's default asynchronous execution facility, with the
* corresponding result as argument to the supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param fn the function to use to compute the value of
* the returned CompletionStage
* @param <U> the function's return type
* @return the new CompletionStage
*/
public
<
U
>
CompletionStage
<
U
>
applyToEitherAsync
(
CompletionStage
<?
extends
T
>
other
,
Function
<?
super
T
,
U
>
fn
);
/**
* Returns a new CompletionStage that, when either this or the
* other given stage complete normally, is executed using the
* supplied executor, with the corresponding result as argument to
* the supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param fn the function to use to compute the value of
* the returned CompletionStage
* @param executor the executor to use for asynchronous execution
* @param <U> the function's return type
* @return the new CompletionStage
*/
public
<
U
>
CompletionStage
<
U
>
applyToEitherAsync
(
CompletionStage
<?
extends
T
>
other
,
Function
<?
super
T
,
U
>
fn
,
Executor
executor
);
/**
* Returns a new CompletionStage that, when either this or the
* other given stage complete normally, is executed with the
* corresponding result as argument to the supplied action.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param action the action to perform before completing the
* returned CompletionStage
* @return the new CompletionStage
*/
public
CompletionStage
<
Void
>
acceptEither
(
CompletionStage
<?
extends
T
>
other
,
Consumer
<?
super
T
>
action
);
/**
* Returns a new CompletionStage that, when either this or the
* other given stage complete normally, is executed using this
* stage's default asynchronous execution facility, with the
* corresponding result as argument to the supplied action.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param action the action to perform before completing the
* returned CompletionStage
* @return the new CompletionStage
*/
public
CompletionStage
<
Void
>
acceptEitherAsync
(
CompletionStage
<?
extends
T
>
other
,
Consumer
<?
super
T
>
action
);
/**
* Returns a new CompletionStage that, when either this or the
* other given stage complete normally, is executed using the
* supplied executor, with the corresponding result as argument to
* the supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param action the action to perform before completing the
* returned CompletionStage
* @param executor the executor to use for asynchronous execution
* @return the new CompletionStage
*/
public
CompletionStage
<
Void
>
acceptEitherAsync
(
CompletionStage
<?
extends
T
>
other
,
Consumer
<?
super
T
>
action
,
Executor
executor
);
/**
* Returns a new CompletionStage that, when either this or the
* other given stage complete normally, executes the given action.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param action the action to perform before completing the
* returned CompletionStage
* @return the new CompletionStage
*/
public
CompletionStage
<
Void
>
runAfterEither
(
CompletionStage
<?>
other
,
Runnable
action
);
/**
* Returns a new CompletionStage that, when either this or the
* other given stage complete normally, executes the given action
* using this stage's default asynchronous execution facility.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param action the action to perform before completing the
* returned CompletionStage
* @return the new CompletionStage
*/
public
CompletionStage
<
Void
>
runAfterEitherAsync
(
CompletionStage
<?>
other
,
Runnable
action
);
/**
* Returns a new CompletionStage that, when either this or the
* other given stage complete normally, executes the given action
* using supplied executor.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param action the action to perform before completing the
* returned CompletionStage
* @param executor the executor to use for asynchronous execution
* @return the new CompletionStage
*/
public
CompletionStage
<
Void
>
runAfterEitherAsync
(
CompletionStage
<?>
other
,
Runnable
action
,
Executor
executor
);
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed with this stage as the argument
* to the supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param fn the function returning a new CompletionStage
* @param <U> the type of the returned CompletionStage's result
* @return the CompletionStage
*/
public
<
U
>
CompletionStage
<
U
>
thenCompose
(
Function
<?
super
T
,
?
extends
CompletionStage
<
U
>>
fn
);
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed using this stage's default asynchronous
* execution facility, with this stage as the argument to the
* supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param fn the function returning a new CompletionStage
* @param <U> the type of the returned CompletionStage's result
* @return the CompletionStage
*/
public
<
U
>
CompletionStage
<
U
>
thenComposeAsync
(
Function
<?
super
T
,
?
extends
CompletionStage
<
U
>>
fn
);
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed using the supplied Executor, with this
* stage's result as the argument to the supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param fn the function returning a new CompletionStage
* @param executor the executor to use for asynchronous execution
* @param <U> the type of the returned CompletionStage's result
* @return the CompletionStage
*/
public
<
U
>
CompletionStage
<
U
>
thenComposeAsync
(
Function
<?
super
T
,
?
extends
CompletionStage
<
U
>>
fn
,
Executor
executor
);
/**
* Returns a new CompletionStage that, when this stage completes
* exceptionally, is executed with this stage's exception as the
* argument to the supplied function. Otherwise, if this stage
* completes normally, then the returned stage also completes
* normally with the same value.
*
* @param fn the function to use to compute the value of the
* returned CompletionStage if this CompletionStage completed
* exceptionally
* @return the new CompletionStage
*/
public
CompletionStage
<
T
>
exceptionally
(
Function
<
Throwable
,
?
extends
T
>
fn
);
/**
* Returns a new CompletionStage with the same result or exception
* as this stage, and when this stage completes, executes the
* given action with the result (or {@code null} if none) and the
* exception (or {@code null} if none) of this stage.
*
* @param action the action to perform
* @return the new CompletionStage
*/
public
CompletionStage
<
T
>
whenComplete
(
BiConsumer
<?
super
T
,
?
super
Throwable
>
action
);
/**
* Returns a new CompletionStage with the same result or exception
* as this stage, and when this stage completes, executes the
* given action executes the given action using this stage's
* default asynchronous execution facility, with the result (or
* {@code null} if none) and the exception (or {@code null} if
* none) of this stage as arguments.
*
* @param action the action to perform
* @return the new CompletionStage
*/
public
CompletionStage
<
T
>
whenCompleteAsync
(
BiConsumer
<?
super
T
,
?
super
Throwable
>
action
);
/**
* Returns a new CompletionStage with the same result or exception
* as this stage, and when this stage completes, executes using
* the supplied Executor, the given action with the result (or
* {@code null} if none) and the exception (or {@code null} if
* none) of this stage as arguments.
*
* @param action the action to perform
* @param executor the executor to use for asynchronous execution
* @return the new CompletionStage
*/
public
CompletionStage
<
T
>
whenCompleteAsync
(
BiConsumer
<?
super
T
,
?
super
Throwable
>
action
,
Executor
executor
);
/**
* Returns a new CompletionStage that, when this stage completes
* either normally or exceptionally, is executed with this stage's
* result and exception as arguments to the supplied function.
* The given function is invoked with the result (or {@code null}
* if none) and the exception (or {@code null} if none) of this
* stage when complete as arguments.
*
* @param fn the function to use to compute the value of the
* returned CompletionStage
* @param <U> the function's return type
* @return the new CompletionStage
*/
public
<
U
>
CompletionStage
<
U
>
handle
(
BiFunction
<?
super
T
,
Throwable
,
?
extends
U
>
fn
);
/**
* Returns a new CompletionStage that, when this stage completes
* either normally or exceptionally, is executed using this stage's
* default asynchronous execution facility, with this stage's
* result and exception as arguments to the supplied function.
* The given function is invoked with the result (or {@code null}
* if none) and the exception (or {@code null} if none) of this
* stage when complete as arguments.
*
* @param fn the function to use to compute the value of the
* returned CompletionStage
* @param <U> the function's return type
* @return the new CompletionStage
*/
public
<
U
>
CompletionStage
<
U
>
handleAsync
(
BiFunction
<?
super
T
,
Throwable
,
?
extends
U
>
fn
);
/**
* Returns a new CompletionStage that, when this stage completes
* either normally or exceptionally, is executed using the
* supplied executor, with this stage's result and exception as
* arguments to the supplied function. The given function is
* invoked with the result (or {@code null} if none) and the
* exception (or {@code null} if none) of this stage when complete
* as arguments.
*
* @param fn the function to use to compute the value of the
* returned CompletionStage
* @param executor the executor to use for asynchronous execution
* @param <U> the function's return type
* @return the new CompletionStage
*/
public
<
U
>
CompletionStage
<
U
>
handleAsync
(
BiFunction
<?
super
T
,
Throwable
,
?
extends
U
>
fn
,
Executor
executor
);
/**
* Returns a {@link CompletableFuture} maintaining the same
* completion properties as this stage. If this stage is already a
* CompletableFuture, this method may return this stage itself.
* Otherwise, invocation of this method may be equivalent in
* effect to {@code thenApply(x -> x)}, but returning an instance
* of type {@code CompletableFuture}. A CompletionStage
* implementation that does not choose to interoperate with others
* may throw {@code UnsupportedOperationException}.
*
* @return the CompletableFuture
* @throws UnsupportedOperationException if this implementation
* does not interoperate with CompletableFuture
*/
public
CompletableFuture
<
T
>
toCompletableFuture
();
}
test/ProblemList.txt
浏览文件 @
4efead92
...
...
@@ -370,12 +370,6 @@ java/util/concurrent/ThreadPoolExecutor/CoreThreadTimeOut.java generic-all
# Filed 6772009
java/util/concurrent/locks/ReentrantLock/CancelledLockLoops.java generic-all
# 8020435
java/util/concurrent/CompletableFuture/Basic.java generic-all
# 8020291
java/util/Random/RandomStreamTest.java generic-all
# 7041639, Solaris DSA keypair generation bug
java/util/TimeZone/TimeZoneDatePermissionCheck.sh solaris-all
...
...
test/java/util/concurrent/CompletableFuture/Basic.java
浏览文件 @
4efead92
...
...
@@ -34,6 +34,8 @@
/*
* @test
* @bug 8005696
* @run main Basic
* @run main/othervm -Djava.util.concurrent.ForkJoinPool.common.parallelism=0 Basic
* @summary Basic tests for CompletableFuture
* @author Chris Hegarty
*/
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录