diff --git a/src/share/classes/java/util/concurrent/CompletableFuture.java b/src/share/classes/java/util/concurrent/CompletableFuture.java index 9e89169e15f08a0f159691b46395c518ff29c07a..8476dcc12379bc6b0ac6050bd6a14b826bbb66f9 100644 --- a/src/share/classes/java/util/concurrent/CompletableFuture.java +++ b/src/share/classes/java/util/concurrent/CompletableFuture.java @@ -50,7 +50,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.LockSupport; /** @@ -77,9 +76,9 @@ import java.util.concurrent.locks.LockSupport; *
  • All async methods without an explicit Executor * argument are performed using the {@link ForkJoinPool#commonPool()} * (unless it does not support a parallelism level of at least two, in - * which case, a new Thread is used). To simplify monitoring, - * debugging, and tracking, all generated asynchronous tasks are - * instances of the marker interface {@link + * which case, a new Thread is created to run each task). To simplify + * monitoring, debugging, and tracking, all generated asynchronous + * tasks are instances of the marker interface {@link * AsynchronousCompletionTask}.
  • * *
  • All CompletionStage methods are implemented independently of @@ -113,301 +112,273 @@ public class CompletableFuture implements Future, CompletionStage { /* * Overview: * - * 1. Non-nullness of field result (set via CAS) indicates done. - * An AltResult is used to box null as a result, as well as to - * hold exceptions. Using a single field makes completion fast - * and simple to detect and trigger, at the expense of a lot of - * encoding and decoding that infiltrates many methods. One minor - * simplification relies on the (static) NIL (to box null results) - * being the only AltResult with a null exception field, so we - * don't usually need explicit comparisons with NIL. The CF - * exception propagation mechanics surrounding decoding rely on - * unchecked casts of decoded results really being unchecked, - * where user type errors are caught at point of use, as is - * currently the case in Java. These are highlighted by using - * SuppressWarnings-annotated temporaries. + * A CompletableFuture may have dependent completion actions, + * collected in a linked stack. It atomically completes by CASing + * a result field, and then pops off and runs those actions. This + * applies across normal vs exceptional outcomes, sync vs async + * actions, binary triggers, and various forms of completions. * - * 2. Waiters are held in a Treiber stack similar to the one used - * in FutureTask, Phaser, and SynchronousQueue. See their - * internal documentation for algorithmic details. + * Non-nullness of field result (set via CAS) indicates done. An + * AltResult is used to box null as a result, as well as to hold + * exceptions. Using a single field makes completion simple to + * detect and trigger. Encoding and decoding is straightforward + * but adds to the sprawl of trapping and associating exceptions + * with targets. Minor simplifications rely on (static) NIL (to + * box null results) being the only AltResult with a null + * exception field, so we don't usually need explicit comparisons. + * Even though some of the generics casts are unchecked (see + * SuppressWarnings annotations), they are placed to be + * appropriate even if checked. * - * 3. Completions are also kept in a list/stack, and pulled off - * and run when completion is triggered. (We could even use the - * same stack as for waiters, but would give up the potential - * parallelism obtained because woken waiters help release/run - * others -- see method postComplete). Because post-processing - * may race with direct calls, class Completion opportunistically - * extends AtomicInteger so callers can claim the action via - * compareAndSet(0, 1). The Completion.run methods are all - * written a boringly similar uniform way (that sometimes includes - * unnecessary-looking checks, kept to maintain uniformity). - * There are enough dimensions upon which they differ that - * attempts to factor commonalities while maintaining efficiency - * require more lines of code than they would save. + * Dependent actions are represented by Completion objects linked + * as Treiber stacks headed by field "stack". There are Completion + * classes for each kind of action, grouped into single-input + * (UniCompletion), two-input (BiCompletion), projected + * (BiCompletions using either (not both) of two inputs), shared + * (CoCompletion, used by the second of two sources), zero-input + * source actions, and Signallers that unblock waiters. Class + * Completion extends ForkJoinTask to enable async execution + * (adding no space overhead because we exploit its "tag" methods + * to maintain claims). It is also declared as Runnable to allow + * usage with arbitrary executors. * - * 4. The exported then/and/or methods do support a bit of - * factoring (see doThenApply etc). They must cope with the - * intrinsic races surrounding addition of a dependent action - * versus performing the action directly because the task is - * already complete. For example, a CF may not be complete upon - * entry, so a dependent completion is added, but by the time it - * is added, the target CF is complete, so must be directly - * executed. This is all done while avoiding unnecessary object - * construction in safe-bypass cases. + * Support for each kind of CompletionStage relies on a separate + * class, along with two CompletableFuture methods: + * + * * A Completion class with name X corresponding to function, + * prefaced with "Uni", "Bi", or "Or". Each class contains + * fields for source(s), actions, and dependent. They are + * boringly similar, differing from others only with respect to + * underlying functional forms. We do this so that users don't + * encounter layers of adaptors in common usages. We also + * include "Relay" classes/methods that don't correspond to user + * methods; they copy results from one stage to another. + * + * * Boolean CompletableFuture method x(...) (for example + * uniApply) takes all of the arguments needed to check that an + * action is triggerable, and then either runs the action or + * arranges its async execution by executing its Completion + * argument, if present. The method returns true if known to be + * complete. + * + * * Completion method tryFire(int mode) invokes the associated x + * method with its held arguments, and on success cleans up. + * The mode argument allows tryFire to be called twice (SYNC, + * then ASYNC); the first to screen and trap exceptions while + * arranging to execute, and the second when called from a + * task. (A few classes are not used async so take slightly + * different forms.) The claim() callback suppresses function + * invocation if already claimed by another thread. + * + * * CompletableFuture method xStage(...) is called from a public + * stage method of CompletableFuture x. It screens user + * arguments and invokes and/or creates the stage object. If + * not async and x is already complete, the action is run + * immediately. Otherwise a Completion c is created, pushed to + * x's stack (unless done), and started or triggered via + * c.tryFire. This also covers races possible if x completes + * while pushing. Classes with two inputs (for example BiApply) + * deal with races across both while pushing actions. The + * second completion is a CoCompletion pointing to the first, + * shared so that at most one performs the action. The + * multiple-arity methods allOf and anyOf do this pairwise to + * form trees of completions. + * + * Note that the generic type parameters of methods vary according + * to whether "this" is a source, dependent, or completion. + * + * Method postComplete is called upon completion unless the target + * is guaranteed not to be observable (i.e., not yet returned or + * linked). Multiple threads can call postComplete, which + * atomically pops each dependent action, and tries to trigger it + * via method tryFire, in NESTED mode. Triggering can propagate + * recursively, so NESTED mode returns its completed dependent (if + * one exists) for further processing by its caller (see method + * postFire). + * + * Blocking methods get() and join() rely on Signaller Completions + * that wake up waiting threads. The mechanics are similar to + * Treiber stack wait-nodes used in FutureTask, Phaser, and + * SynchronousQueue. See their internal documentation for + * algorithmic details. + * + * Without precautions, CompletableFutures would be prone to + * garbage accumulation as chains of Completions build up, each + * pointing back to its sources. So we null out fields as soon as + * possible (see especially method Completion.detach). The + * screening checks needed anyway harmlessly ignore null arguments + * that may have been obtained during races with threads nulling + * out fields. We also try to unlink fired Completions from + * stacks that might never be popped (see method postFire). + * Completion fields need not be declared as final or volatile + * because they are only visible to other threads upon safe + * publication. */ - // preliminaries + volatile Object result; // Either the result or boxed AltResult + volatile Completion stack; // Top of Treiber stack of dependent actions + + final boolean internalComplete(Object r) { // CAS from null to r + return UNSAFE.compareAndSwapObject(this, RESULT, null, r); + } + + final boolean casStack(Completion cmp, Completion val) { + return UNSAFE.compareAndSwapObject(this, STACK, cmp, val); + } + + /** Returns true if successfully pushed c onto stack. */ + final boolean tryPushStack(Completion c) { + Completion h = stack; + lazySetNext(c, h); + return UNSAFE.compareAndSwapObject(this, STACK, h, c); + } + + /** Unconditionally pushes c onto stack, retrying if necessary. */ + final void pushStack(Completion c) { + do {} while (!tryPushStack(c)); + } + + /* ------------- Encoding and decoding outcomes -------------- */ - static final class AltResult { - final Throwable ex; // null only for NIL - AltResult(Throwable ex) { this.ex = ex; } + static final class AltResult { // See above + final Throwable ex; // null only for NIL + AltResult(Throwable x) { this.ex = x; } } + /** The encoding of the null value. */ static final AltResult NIL = new AltResult(null); - // Fields + /** Completes with the null value, unless already completed. */ + final boolean completeNull() { + return UNSAFE.compareAndSwapObject(this, RESULT, null, + NIL); + } - volatile Object result; // Either the result or boxed AltResult - volatile WaitNode waiters; // Treiber stack of threads blocked on get() - volatile CompletionNode completions; // list (Treiber stack) of completions + /** Returns the encoding of the given non-exceptional value. */ + final Object encodeValue(T t) { + return (t == null) ? NIL : t; + } - // Basic utilities for triggering and processing completions + /** Completes with a non-exceptional result, unless already completed. */ + final boolean completeValue(T t) { + return UNSAFE.compareAndSwapObject(this, RESULT, null, + (t == null) ? NIL : t); + } /** - * Removes and signals all waiting threads and runs all completions. + * Returns the encoding of the given (non-null) exception as a + * wrapped CompletionException unless it is one already. */ - final void postComplete() { - WaitNode q; Thread t; - while ((q = waiters) != null) { - if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) && - (t = q.thread) != null) { - q.thread = null; - LockSupport.unpark(t); - } - } + static AltResult encodeThrowable(Throwable x) { + return new AltResult((x instanceof CompletionException) ? x : + new CompletionException(x)); + } - CompletionNode h; Completion c; - while ((h = completions) != null) { - if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) && - (c = h.completion) != null) - c.run(); - } + /** Completes with an exceptional result, unless already completed. */ + final boolean completeThrowable(Throwable x) { + return UNSAFE.compareAndSwapObject(this, RESULT, null, + encodeThrowable(x)); } /** - * Triggers completion with the encoding of the given arguments: - * if the exception is non-null, encodes it as a wrapped - * CompletionException unless it is one already. Otherwise uses - * the given result, boxed as NIL if null. + * Returns the encoding of the given (non-null) exception as a + * wrapped CompletionException unless it is one already. May + * return the given Object r (which must have been the result of a + * source future) if it is equivalent, i.e. if this is a simple + * relay of an existing CompletionException. */ - final void internalComplete(T v, Throwable ex) { - if (result == null) - UNSAFE.compareAndSwapObject - (this, RESULT, null, - (ex == null) ? (v == null) ? NIL : v : - new AltResult((ex instanceof CompletionException) ? ex : - new CompletionException(ex))); - postComplete(); // help out even if not triggered + static Object encodeThrowable(Throwable x, Object r) { + if (!(x instanceof CompletionException)) + x = new CompletionException(x); + else if (r instanceof AltResult && x == ((AltResult)r).ex) + return r; + return new AltResult(x); } /** - * If triggered, helps release and/or process completions. + * Completes with the given (non-null) exceptional result as a + * wrapped CompletionException unless it is one already, unless + * already completed. May complete with the given Object r + * (which must have been the result of a source future) if it is + * equivalent, i.e. if this is a simple propagation of an + * existing CompletionException. */ - final void helpPostComplete() { - if (result != null) - postComplete(); + final boolean completeThrowable(Throwable x, Object r) { + return UNSAFE.compareAndSwapObject(this, RESULT, null, + encodeThrowable(x, r)); } - /* ------------- waiting for completions -------------- */ - - /** Number of processors, for spin control */ - static final int NCPU = Runtime.getRuntime().availableProcessors(); - /** - * Heuristic spin value for waitingGet() before blocking on - * multiprocessors + * Returns the encoding of the given arguments: if the exception + * is non-null, encodes as AltResult. Otherwise uses the given + * value, boxed as NIL if null. */ - static final int SPINS = (NCPU > 1) ? 1 << 8 : 0; + Object encodeOutcome(T t, Throwable x) { + return (x == null) ? (t == null) ? NIL : t : encodeThrowable(x); + } /** - * Linked nodes to record waiting threads in a Treiber stack. See - * other classes such as Phaser and SynchronousQueue for more - * detailed explanation. This class implements ManagedBlocker to - * avoid starvation when blocking actions pile up in - * ForkJoinPools. + * Returns the encoding of a copied outcome; if exceptional, + * rewraps as a CompletionException, else returns argument. */ - static final class WaitNode implements ForkJoinPool.ManagedBlocker { - long nanos; // wait time if timed - final long deadline; // non-zero if timed - volatile int interruptControl; // > 0: interruptible, < 0: interrupted - volatile Thread thread; - volatile WaitNode next; - WaitNode(boolean interruptible, long nanos, long deadline) { - this.thread = Thread.currentThread(); - this.interruptControl = interruptible ? 1 : 0; - this.nanos = nanos; - this.deadline = deadline; - } - public boolean isReleasable() { - if (thread == null) - return true; - if (Thread.interrupted()) { - int i = interruptControl; - interruptControl = -1; - if (i > 0) - return true; - } - if (deadline != 0L && - (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) { - thread = null; - return true; - } - return false; - } - public boolean block() { - if (isReleasable()) - return true; - else if (deadline == 0L) - LockSupport.park(this); - else if (nanos > 0L) - LockSupport.parkNanos(this, nanos); - return isReleasable(); - } + static Object encodeRelay(Object r) { + Throwable x; + return (((r instanceof AltResult) && + (x = ((AltResult)r).ex) != null && + !(x instanceof CompletionException)) ? + new AltResult(new CompletionException(x)) : r); } /** - * Returns raw result after waiting, or null if interruptible and - * interrupted. + * Completes with r or a copy of r, unless already completed. + * If exceptional, r is first coerced to a CompletionException. */ - private Object waitingGet(boolean interruptible) { - WaitNode q = null; - boolean queued = false; - int spins = SPINS; - for (Object r;;) { - if ((r = result) != null) { - if (q != null) { // suppress unpark - q.thread = null; - if (q.interruptControl < 0) { - if (interruptible) { - removeWaiter(q); - return null; - } - Thread.currentThread().interrupt(); - } - } - postComplete(); // help release others - return r; - } - else if (spins > 0) { - int rnd = ThreadLocalRandom.nextSecondarySeed(); - if (rnd == 0) - rnd = ThreadLocalRandom.current().nextInt(); - if (rnd >= 0) - --spins; - } - else if (q == null) - q = new WaitNode(interruptible, 0L, 0L); - else if (!queued) - queued = UNSAFE.compareAndSwapObject(this, WAITERS, - q.next = waiters, q); - else if (interruptible && q.interruptControl < 0) { - removeWaiter(q); - return null; - } - else if (q.thread != null && result == null) { - try { - ForkJoinPool.managedBlock(q); - } catch (InterruptedException ex) { - q.interruptControl = -1; - } - } - } + final boolean completeRelay(Object r) { + return UNSAFE.compareAndSwapObject(this, RESULT, null, + encodeRelay(r)); } /** - * Awaits completion or aborts on interrupt or timeout. - * - * @param nanos time to wait - * @return raw result + * Reports result using Future.get conventions. */ - private Object timedAwaitDone(long nanos) - throws InterruptedException, TimeoutException { - WaitNode q = null; - boolean queued = false; - for (Object r;;) { - if ((r = result) != null) { - if (q != null) { - q.thread = null; - if (q.interruptControl < 0) { - removeWaiter(q); - throw new InterruptedException(); - } - } - postComplete(); - return r; - } - else if (q == null) { - if (nanos <= 0L) - throw new TimeoutException(); - long d = System.nanoTime() + nanos; - q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0 - } - else if (!queued) - queued = UNSAFE.compareAndSwapObject(this, WAITERS, - q.next = waiters, q); - else if (q.interruptControl < 0) { - removeWaiter(q); - throw new InterruptedException(); - } - else if (q.nanos <= 0L) { - if (result == null) { - removeWaiter(q); - throw new TimeoutException(); - } - } - else if (q.thread != null && result == null) { - try { - ForkJoinPool.managedBlock(q); - } catch (InterruptedException ex) { - q.interruptControl = -1; - } - } + private static T reportGet(Object r) + throws InterruptedException, ExecutionException { + if (r == null) // by convention below, null means interrupted + throw new InterruptedException(); + if (r instanceof AltResult) { + Throwable x, cause; + if ((x = ((AltResult)r).ex) == null) + return null; + if (x instanceof CancellationException) + throw (CancellationException)x; + if ((x instanceof CompletionException) && + (cause = x.getCause()) != null) + x = cause; + throw new ExecutionException(x); } + @SuppressWarnings("unchecked") T t = (T) r; + return t; } /** - * Tries to unlink a timed-out or interrupted wait node to avoid - * accumulating garbage. Internal nodes are simply unspliced - * without CAS since it is harmless if they are traversed anyway - * by releasers. To avoid effects of unsplicing from already - * removed nodes, the list is retraversed in case of an apparent - * race. This is slow when there are a lot of nodes, but we don't - * expect lists to be long enough to outweigh higher-overhead - * schemes. + * Decodes outcome to return result or throw unchecked exception. */ - private void removeWaiter(WaitNode node) { - if (node != null) { - node.thread = null; - retry: - for (;;) { // restart on removeWaiter race - for (WaitNode pred = null, q = waiters, s; q != null; q = s) { - s = q.next; - if (q.thread != null) - pred = q; - else if (pred != null) { - pred.next = s; - if (pred.thread == null) // check for race - continue retry; - } - else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s)) - continue retry; - } - break; - } + private static T reportJoin(Object r) { + if (r instanceof AltResult) { + Throwable x; + if ((x = ((AltResult)r).ex) == null) + return null; + if (x instanceof CancellationException) + throw (CancellationException)x; + if (x instanceof CompletionException) + throw (CompletionException)x; + throw new CompletionException(x); } + @SuppressWarnings("unchecked") T t = (T) r; + return t; } - /* ------------- Async tasks -------------- */ + /* ------------- Async task preliminaries -------------- */ /** * A marker interface identifying asynchronous tasks produced by @@ -419,1693 +390,1394 @@ public class CompletableFuture implements Future, CompletionStage { public static interface AsynchronousCompletionTask { } - /** Base class can act as either FJ or plain Runnable */ + private static final boolean useCommonPool = + (ForkJoinPool.getCommonPoolParallelism() > 1); + + /** + * Default executor -- ForkJoinPool.commonPool() unless it cannot + * support parallelism. + */ + private static final Executor asyncPool = useCommonPool ? + ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); + + /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ + static final class ThreadPerTaskExecutor implements Executor { + public void execute(Runnable r) { new Thread(r).start(); } + } + + /** + * Null-checks user executor argument, and translates uses of + * commonPool to asyncPool in case parallelism disabled. + */ + static Executor screenExecutor(Executor e) { + if (!useCommonPool && e == ForkJoinPool.commonPool()) + return asyncPool; + if (e == null) throw new NullPointerException(); + return e; + } + + // Modes for Completion.tryFire. Signedness matters. + static final int SYNC = 0; + static final int ASYNC = 1; + static final int NESTED = -1; + + /* ------------- Base Completion classes and operations -------------- */ + @SuppressWarnings("serial") - abstract static class Async extends ForkJoinTask + abstract static class Completion extends ForkJoinTask implements Runnable, AsynchronousCompletionTask { - public final Void getRawResult() { return null; } - public final void setRawResult(Void v) { } - public final void run() { exec(); } + volatile Completion next; // Treiber stack link + + /** + * Performs completion action if triggered, returning a + * dependent that may need propagation, if one exists. + * + * @param mode SYNC, ASYNC, or NESTED + */ + abstract CompletableFuture tryFire(int mode); + + /** Returns true if possibly still triggerable. Used by cleanStack. */ + abstract boolean isLive(); + + public final void run() { tryFire(ASYNC); } + public final boolean exec() { tryFire(ASYNC); return true; } + public final Void getRawResult() { return null; } + public final void setRawResult(Void v) {} + } + + static void lazySetNext(Completion c, Completion next) { + UNSAFE.putOrderedObject(c, NEXT, next); } /** - * Starts the given async task using the given executor, unless - * the executor is ForkJoinPool.commonPool and it has been - * disabled, in which case starts a new thread. + * Pops and tries to trigger all reachable dependents. Call only + * when known to be done. */ - static void execAsync(Executor e, Async r) { - if (e == ForkJoinPool.commonPool() && - ForkJoinPool.getCommonPoolParallelism() <= 1) - new Thread(r).start(); - else - e.execute(r); - } - - static final class AsyncRun extends Async { - final Runnable fn; - final CompletableFuture dst; - AsyncRun(Runnable fn, CompletableFuture dst) { - this.fn = fn; this.dst = dst; - } - public final boolean exec() { - CompletableFuture d; Throwable ex; - if ((d = this.dst) != null && d.result == null) { - try { - fn.run(); - ex = null; - } catch (Throwable rex) { - ex = rex; + final void postComplete() { + /* + * On each step, variable f holds current dependents to pop + * and run. It is extended along only one path at a time, + * pushing others to avoid unbounded recursion. + */ + CompletableFuture f = this; Completion h; + while ((h = f.stack) != null || + (f != this && (h = (f = this).stack) != null)) { + CompletableFuture d; Completion t; + if (f.casStack(h, t = h.next)) { + if (t != null) { + if (f != this) { + pushStack(h); + continue; + } + h.next = null; // detach } - d.internalComplete(null, ex); + f = (d = h.tryFire(NESTED)) == null ? this : d; } - return true; } - private static final long serialVersionUID = 5232453952276885070L; } - static final class AsyncSupply extends Async { - final Supplier fn; - final CompletableFuture dst; - AsyncSupply(Supplier fn, CompletableFuture dst) { - this.fn = fn; this.dst = dst; - } - public final boolean exec() { - CompletableFuture d; U u; Throwable ex; - if ((d = this.dst) != null && d.result == null) { - try { - u = fn.get(); - ex = null; - } catch (Throwable rex) { - ex = rex; - u = null; + /** Traverses stack and unlinks dead Completions. */ + final void cleanStack() { + for (Completion p = null, q = stack; q != null;) { + Completion s = q.next; + if (q.isLive()) { + p = q; + q = s; + } + else if (p == null) { + casStack(q, s); + q = stack; + } + else { + p.next = s; + if (p.isLive()) + q = s; + else { + p = null; // restart + q = stack; } - d.internalComplete(u, ex); } - return true; } - private static final long serialVersionUID = 5232453952276885070L; } - static final class AsyncApply extends Async { - final T arg; - final Function fn; - final CompletableFuture dst; - AsyncApply(T arg, Function fn, - CompletableFuture dst) { - this.arg = arg; this.fn = fn; this.dst = dst; - } - public final boolean exec() { - CompletableFuture d; U u; Throwable ex; - if ((d = this.dst) != null && d.result == null) { - try { - u = fn.apply(arg); - ex = null; - } catch (Throwable rex) { - ex = rex; - u = null; - } - d.internalComplete(u, ex); + /* ------------- One-input Completions -------------- */ + + /** A Completion with a source, dependent, and executor. */ + @SuppressWarnings("serial") + abstract static class UniCompletion extends Completion { + Executor executor; // executor to use (null if none) + CompletableFuture dep; // the dependent to complete + CompletableFuture src; // source for action + + UniCompletion(Executor executor, CompletableFuture dep, + CompletableFuture src) { + this.executor = executor; this.dep = dep; this.src = src; + } + + /** + * Returns true if action can be run. Call only when known to + * be triggerable. Uses FJ tag bit to ensure that only one + * thread claims ownership. If async, starts as task -- a + * later call to tryFire will run action. + */ + final boolean claim() { + Executor e = executor; + if (compareAndSetForkJoinTaskTag((short)0, (short)1)) { + if (e == null) + return true; + executor = null; // disable + e.execute(this); } - return true; + return false; } - private static final long serialVersionUID = 5232453952276885070L; + + final boolean isLive() { return dep != null; } } - static final class AsyncCombine extends Async { - final T arg1; - final U arg2; - final BiFunction fn; - final CompletableFuture dst; - AsyncCombine(T arg1, U arg2, - BiFunction fn, - CompletableFuture dst) { - this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst; + /** Pushes the given completion (if it exists) unless done. */ + final void push(UniCompletion c) { + if (c != null) { + while (result == null && !tryPushStack(c)) + lazySetNext(c, null); // clear on failure } - public final boolean exec() { - CompletableFuture d; V v; Throwable ex; - if ((d = this.dst) != null && d.result == null) { - try { - v = fn.apply(arg1, arg2); - ex = null; - } catch (Throwable rex) { - ex = rex; - v = null; - } - d.internalComplete(v, ex); - } - return true; - } - private static final long serialVersionUID = 5232453952276885070L; } - static final class AsyncAccept extends Async { - final T arg; - final Consumer fn; - final CompletableFuture dst; - AsyncAccept(T arg, Consumer fn, - CompletableFuture dst) { - this.arg = arg; this.fn = fn; this.dst = dst; + /** + * Post-processing by dependent after successful UniCompletion + * tryFire. Tries to clean stack of source a, and then either runs + * postComplete or returns this to caller, depending on mode. + */ + final CompletableFuture postFire(CompletableFuture a, int mode) { + if (a != null && a.stack != null) { + if (mode < 0 || a.result == null) + a.cleanStack(); + else + a.postComplete(); } - public final boolean exec() { - CompletableFuture d; Throwable ex; - if ((d = this.dst) != null && d.result == null) { - try { - fn.accept(arg); - ex = null; - } catch (Throwable rex) { - ex = rex; - } - d.internalComplete(null, ex); - } - return true; + if (result != null && stack != null) { + if (mode < 0) + return this; + else + postComplete(); } - private static final long serialVersionUID = 5232453952276885070L; + return null; } - static final class AsyncAcceptBoth extends Async { - final T arg1; - final U arg2; - final BiConsumer fn; - final CompletableFuture dst; - AsyncAcceptBoth(T arg1, U arg2, - BiConsumer fn, - CompletableFuture dst) { - this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst; + @SuppressWarnings("serial") + static final class UniApply extends UniCompletion { + Function fn; + UniApply(Executor executor, CompletableFuture dep, + CompletableFuture src, + Function fn) { + super(executor, dep, src); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; CompletableFuture a; + if ((d = dep) == null || + !d.uniApply(a = src, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; fn = null; + return d.postFire(a, mode); } - public final boolean exec() { - CompletableFuture d; Throwable ex; - if ((d = this.dst) != null && d.result == null) { - try { - fn.accept(arg1, arg2); - ex = null; - } catch (Throwable rex) { - ex = rex; + } + + final boolean uniApply(CompletableFuture a, + Function f, + UniApply c) { + Object r; Throwable x; + if (a == null || (r = a.result) == null || f == null) + return false; + tryComplete: if (result == null) { + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + completeThrowable(x, r); + break tryComplete; } - d.internalComplete(null, ex); + r = null; + } + try { + if (c != null && !c.claim()) + return false; + @SuppressWarnings("unchecked") S s = (S) r; + completeValue(f.apply(s)); + } catch (Throwable ex) { + completeThrowable(ex); } - return true; } - private static final long serialVersionUID = 5232453952276885070L; + return true; } - static final class AsyncCompose extends Async { - final T arg; - final Function> fn; - final CompletableFuture dst; - AsyncCompose(T arg, - Function> fn, - CompletableFuture dst) { - this.arg = arg; this.fn = fn; this.dst = dst; + private CompletableFuture uniApplyStage( + Executor e, Function f) { + if (f == null) throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + if (e != null || !d.uniApply(this, f, null)) { + UniApply c = new UniApply(e, d, this, f); + push(c); + c.tryFire(SYNC); } - public final boolean exec() { - CompletableFuture d, fr; U u; Throwable ex; - if ((d = this.dst) != null && d.result == null) { - try { - CompletionStage cs = fn.apply(arg); - fr = (cs == null) ? null : cs.toCompletableFuture(); - ex = (fr == null) ? new NullPointerException() : null; - } catch (Throwable rex) { - ex = rex; - fr = null; - } - if (ex != null) - u = null; - else { - Object r = fr.result; - if (r == null) - r = fr.waitingGet(false); - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - u = null; - } - else { - @SuppressWarnings("unchecked") U ur = (U) r; - u = ur; - } + return d; + } + + @SuppressWarnings("serial") + static final class UniAccept extends UniCompletion { + Consumer fn; + UniAccept(Executor executor, CompletableFuture dep, + CompletableFuture src, Consumer fn) { + super(executor, dep, src); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; CompletableFuture a; + if ((d = dep) == null || + !d.uniAccept(a = src, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; fn = null; + return d.postFire(a, mode); + } + } + + final boolean uniAccept(CompletableFuture a, + Consumer f, UniAccept c) { + Object r; Throwable x; + if (a == null || (r = a.result) == null || f == null) + return false; + tryComplete: if (result == null) { + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + completeThrowable(x, r); + break tryComplete; } - d.internalComplete(u, ex); + r = null; } - return true; + try { + if (c != null && !c.claim()) + return false; + @SuppressWarnings("unchecked") S s = (S) r; + f.accept(s); + completeNull(); + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private CompletableFuture uniAcceptStage(Executor e, + Consumer f) { + if (f == null) throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + if (e != null || !d.uniAccept(this, f, null)) { + UniAccept c = new UniAccept(e, d, this, f); + push(c); + c.tryFire(SYNC); } - private static final long serialVersionUID = 5232453952276885070L; + return d; } - static final class AsyncWhenComplete extends Async { - final T arg1; - final Throwable arg2; - final BiConsumer fn; - final CompletableFuture dst; - AsyncWhenComplete(T arg1, Throwable arg2, - BiConsumer fn, - CompletableFuture dst) { - this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst; + @SuppressWarnings("serial") + static final class UniRun extends UniCompletion { + Runnable fn; + UniRun(Executor executor, CompletableFuture dep, + CompletableFuture src, Runnable fn) { + super(executor, dep, src); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; CompletableFuture a; + if ((d = dep) == null || + !d.uniRun(a = src, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; fn = null; + return d.postFire(a, mode); } - public final boolean exec() { - CompletableFuture d; - if ((d = this.dst) != null && d.result == null) { - Throwable ex = arg2; + } + + final boolean uniRun(CompletableFuture a, Runnable f, UniRun c) { + Object r; Throwable x; + if (a == null || (r = a.result) == null || f == null) + return false; + if (result == null) { + if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) + completeThrowable(x, r); + else try { - fn.accept(arg1, ex); - } catch (Throwable rex) { - if (ex == null) - ex = rex; + if (c != null && !c.claim()) + return false; + f.run(); + completeNull(); + } catch (Throwable ex) { + completeThrowable(ex); } - d.internalComplete(arg1, ex); - } - return true; } - private static final long serialVersionUID = 5232453952276885070L; + return true; } - /* ------------- Completions -------------- */ - - /** - * Simple linked list nodes to record completions, used in - * basically the same way as WaitNodes. (We separate nodes from - * the Completions themselves mainly because for the And and Or - * methods, the same Completion object resides in two lists.) - */ - static final class CompletionNode { - final Completion completion; - volatile CompletionNode next; - CompletionNode(Completion completion) { this.completion = completion; } + private CompletableFuture uniRunStage(Executor e, Runnable f) { + if (f == null) throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + if (e != null || !d.uniRun(this, f, null)) { + UniRun c = new UniRun(e, d, this, f); + push(c); + c.tryFire(SYNC); + } + return d; } - // Opportunistically subclass AtomicInteger to use compareAndSet to claim. @SuppressWarnings("serial") - abstract static class Completion extends AtomicInteger implements Runnable { - } - - static final class ThenApply extends Completion { - final CompletableFuture src; - final Function fn; - final CompletableFuture dst; - final Executor executor; - ThenApply(CompletableFuture src, - Function fn, - CompletableFuture dst, - Executor executor) { - this.src = src; this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture a; - final Function fn; - final CompletableFuture dst; - Object r; T t; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - Executor e = executor; - U u = null; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncApply(t, fn, dst)); - else - u = fn.apply(t); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(u, ex); - } + static final class UniWhenComplete extends UniCompletion { + BiConsumer fn; + UniWhenComplete(Executor executor, CompletableFuture dep, + CompletableFuture src, + BiConsumer fn) { + super(executor, dep, src); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; CompletableFuture a; + if ((d = dep) == null || + !d.uniWhenComplete(a = src, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; fn = null; + return d.postFire(a, mode); } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class ThenAccept extends Completion { - final CompletableFuture src; - final Consumer fn; - final CompletableFuture dst; - final Executor executor; - ThenAccept(CompletableFuture src, - Consumer fn, - CompletableFuture dst, - Executor executor) { - this.src = src; this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture a; - final Consumer fn; - final CompletableFuture dst; - Object r; T t; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { + } + + final boolean uniWhenComplete(CompletableFuture a, + BiConsumer f, + UniWhenComplete c) { + Object r; T t; Throwable x = null; + if (a == null || (r = a.result) == null || f == null) + return false; + if (result == null) { + try { + if (c != null && !c.claim()) + return false; if (r instanceof AltResult) { - ex = ((AltResult)r).ex; + x = ((AltResult)r).ex; t = null; - } - else { - ex = null; + } else { @SuppressWarnings("unchecked") T tr = (T) r; t = tr; } - Executor e = executor; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncAccept(t, fn, dst)); - else - fn.accept(t); - } catch (Throwable rex) { - ex = rex; - } + f.accept(t, x); + if (x == null) { + internalComplete(r); + return true; } - if (e == null || ex != null) - dst.internalComplete(null, ex); + } catch (Throwable ex) { + if (x == null) + x = ex; } + completeThrowable(x, r); } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class ThenRun extends Completion { - final CompletableFuture src; - final Runnable fn; - final CompletableFuture dst; - final Executor executor; - ThenRun(CompletableFuture src, - Runnable fn, - CompletableFuture dst, - Executor executor) { - this.src = src; this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture a; - final Runnable fn; - final CompletableFuture dst; - Object r; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - Executor e = executor; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncRun(fn, dst)); - else - fn.run(); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } + return true; + } + + private CompletableFuture uniWhenCompleteStage( + Executor e, BiConsumer f) { + if (f == null) throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + if (e != null || !d.uniWhenComplete(this, f, null)) { + UniWhenComplete c = new UniWhenComplete(e, d, this, f); + push(c); + c.tryFire(SYNC); } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class ThenCombine extends Completion { - final CompletableFuture src; - final CompletableFuture snd; - final BiFunction fn; - final CompletableFuture dst; - final Executor executor; - ThenCombine(CompletableFuture src, - CompletableFuture snd, - BiFunction fn, - CompletableFuture dst, - Executor executor) { - this.src = src; this.snd = snd; - this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture a; - final CompletableFuture b; - final BiFunction fn; - final CompletableFuture dst; - Object r, s; T t; U u; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - (b = this.snd) != null && - (s = b.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - if (ex != null) - u = null; - else if (s instanceof AltResult) { - ex = ((AltResult)s).ex; - u = null; - } - else { - @SuppressWarnings("unchecked") U us = (U) s; - u = us; - } - Executor e = executor; - V v = null; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncCombine(t, u, fn, dst)); - else - v = fn.apply(t, u); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(v, ex); - } + return d; + } + + @SuppressWarnings("serial") + static final class UniHandle extends UniCompletion { + BiFunction fn; + UniHandle(Executor executor, CompletableFuture dep, + CompletableFuture src, + BiFunction fn) { + super(executor, dep, src); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; CompletableFuture a; + if ((d = dep) == null || + !d.uniHandle(a = src, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; fn = null; + return d.postFire(a, mode); } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class ThenAcceptBoth extends Completion { - final CompletableFuture src; - final CompletableFuture snd; - final BiConsumer fn; - final CompletableFuture dst; - final Executor executor; - ThenAcceptBoth(CompletableFuture src, - CompletableFuture snd, - BiConsumer fn, - CompletableFuture dst, - Executor executor) { - this.src = src; this.snd = snd; - this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture a; - final CompletableFuture b; - final BiConsumer fn; - final CompletableFuture dst; - Object r, s; T t; U u; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - (b = this.snd) != null && - (s = b.result) != null && - compareAndSet(0, 1)) { + } + + final boolean uniHandle(CompletableFuture a, + BiFunction f, + UniHandle c) { + Object r; S s; Throwable x; + if (a == null || (r = a.result) == null || f == null) + return false; + if (result == null) { + try { + if (c != null && !c.claim()) + return false; if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; + x = ((AltResult)r).ex; + s = null; + } else { + x = null; + @SuppressWarnings("unchecked") S ss = (S) r; + s = ss; } - if (ex != null) - u = null; - else if (s instanceof AltResult) { - ex = ((AltResult)s).ex; - u = null; - } - else { - @SuppressWarnings("unchecked") U us = (U) s; - u = us; - } - Executor e = executor; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncAcceptBoth(t, u, fn, dst)); - else - fn.accept(t, u); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); + completeValue(f.apply(s, x)); + } catch (Throwable ex) { + completeThrowable(ex); } } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class RunAfterBoth extends Completion { - final CompletableFuture src; - final CompletableFuture snd; - final Runnable fn; - final CompletableFuture dst; - final Executor executor; - RunAfterBoth(CompletableFuture src, - CompletableFuture snd, - Runnable fn, - CompletableFuture dst, - Executor executor) { - this.src = src; this.snd = snd; - this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture a; - final CompletableFuture b; - final Runnable fn; - final CompletableFuture dst; - Object r, s; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - (b = this.snd) != null && - (s = b.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - if (ex == null && (s instanceof AltResult)) - ex = ((AltResult)s).ex; - Executor e = executor; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncRun(fn, dst)); - else - fn.run(); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } + return true; + } + + private CompletableFuture uniHandleStage( + Executor e, BiFunction f) { + if (f == null) throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + if (e != null || !d.uniHandle(this, f, null)) { + UniHandle c = new UniHandle(e, d, this, f); + push(c); + c.tryFire(SYNC); } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class AndCompletion extends Completion { - final CompletableFuture src; - final CompletableFuture snd; - final CompletableFuture dst; - AndCompletion(CompletableFuture src, - CompletableFuture snd, - CompletableFuture dst) { - this.src = src; this.snd = snd; this.dst = dst; - } - public final void run() { - final CompletableFuture a; - final CompletableFuture b; - final CompletableFuture dst; - Object r, s; Throwable ex; - if ((dst = this.dst) != null && - (a = this.src) != null && - (r = a.result) != null && - (b = this.snd) != null && - (s = b.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - if (ex == null && (s instanceof AltResult)) - ex = ((AltResult)s).ex; - dst.internalComplete(null, ex); - } + return d; + } + + @SuppressWarnings("serial") + static final class UniExceptionally extends UniCompletion { + Function fn; + UniExceptionally(CompletableFuture dep, CompletableFuture src, + Function fn) { + super(null, dep, src); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { // never ASYNC + // assert mode != ASYNC; + CompletableFuture d; CompletableFuture a; + if ((d = dep) == null || !d.uniExceptionally(a = src, fn, this)) + return null; + dep = null; src = null; fn = null; + return d.postFire(a, mode); } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class ApplyToEither extends Completion { - final CompletableFuture src; - final CompletableFuture snd; - final Function fn; - final CompletableFuture dst; - final Executor executor; - ApplyToEither(CompletableFuture src, - CompletableFuture snd, - Function fn, - CompletableFuture dst, - Executor executor) { - this.src = src; this.snd = snd; - this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture a; - final CompletableFuture b; - final Function fn; - final CompletableFuture dst; - Object r; T t; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (((a = this.src) != null && (r = a.result) != null) || - ((b = this.snd) != null && (r = b.result) != null)) && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - Executor e = executor; - U u = null; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncApply(t, fn, dst)); - else - u = fn.apply(t); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(u, ex); + } + + final boolean uniExceptionally(CompletableFuture a, + Function f, + UniExceptionally c) { + Object r; Throwable x; + if (a == null || (r = a.result) == null || f == null) + return false; + if (result == null) { + try { + if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) { + if (c != null && !c.claim()) + return false; + completeValue(f.apply(x)); + } else + internalComplete(r); + } catch (Throwable ex) { + completeThrowable(ex); } } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class AcceptEither extends Completion { - final CompletableFuture src; - final CompletableFuture snd; - final Consumer fn; - final CompletableFuture dst; - final Executor executor; - AcceptEither(CompletableFuture src, - CompletableFuture snd, - Consumer fn, - CompletableFuture dst, - Executor executor) { - this.src = src; this.snd = snd; - this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture a; - final CompletableFuture b; - final Consumer fn; - final CompletableFuture dst; - Object r; T t; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (((a = this.src) != null && (r = a.result) != null) || - ((b = this.snd) != null && (r = b.result) != null)) && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - Executor e = executor; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncAccept(t, fn, dst)); - else - fn.accept(t); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } + return true; + } + + private CompletableFuture uniExceptionallyStage( + Function f) { + if (f == null) throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + if (!d.uniExceptionally(this, f, null)) { + UniExceptionally c = new UniExceptionally(d, this, f); + push(c); + c.tryFire(SYNC); } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class RunAfterEither extends Completion { - final CompletableFuture src; - final CompletableFuture snd; - final Runnable fn; - final CompletableFuture dst; - final Executor executor; - RunAfterEither(CompletableFuture src, - CompletableFuture snd, - Runnable fn, - CompletableFuture dst, - Executor executor) { - this.src = src; this.snd = snd; - this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture a; - final CompletableFuture b; - final Runnable fn; - final CompletableFuture dst; - Object r; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (((a = this.src) != null && (r = a.result) != null) || - ((b = this.snd) != null && (r = b.result) != null)) && - compareAndSet(0, 1)) { - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - Executor e = executor; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncRun(fn, dst)); - else - fn.run(); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } + return d; + } + + @SuppressWarnings("serial") + static final class UniRelay extends UniCompletion { // for Compose + UniRelay(CompletableFuture dep, CompletableFuture src) { + super(null, dep, src); } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class OrCompletion extends Completion { - final CompletableFuture src; - final CompletableFuture snd; - final CompletableFuture dst; - OrCompletion(CompletableFuture src, - CompletableFuture snd, - CompletableFuture dst) { - this.src = src; this.snd = snd; this.dst = dst; - } - public final void run() { - final CompletableFuture a; - final CompletableFuture b; - final CompletableFuture dst; - Object r, t; Throwable ex; - if ((dst = this.dst) != null && - (((a = this.src) != null && (r = a.result) != null) || - ((b = this.snd) != null && (r = b.result) != null)) && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - t = r; - } - dst.internalComplete(t, ex); - } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; CompletableFuture a; + if ((d = dep) == null || !d.uniRelay(a = src)) + return null; + src = null; dep = null; + return d.postFire(a, mode); } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class ExceptionCompletion extends Completion { - final CompletableFuture src; - final Function fn; - final CompletableFuture dst; - ExceptionCompletion(CompletableFuture src, - Function fn, - CompletableFuture dst) { - this.src = src; this.fn = fn; this.dst = dst; - } - public final void run() { - final CompletableFuture a; - final Function fn; - final CompletableFuture dst; - Object r; T t = null; Throwable ex, dx = null; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if ((r instanceof AltResult) && - (ex = ((AltResult)r).ex) != null) { - try { - t = fn.apply(ex); - } catch (Throwable rex) { - dx = rex; - } - } - else { - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - dst.internalComplete(t, dx); - } + } + + final boolean uniRelay(CompletableFuture a) { + Object r; + if (a == null || (r = a.result) == null) + return false; + if (result == null) // no need to claim + completeRelay(r); + return true; + } + + @SuppressWarnings("serial") + static final class UniCompose extends UniCompletion { + Function> fn; + UniCompose(Executor executor, CompletableFuture dep, + CompletableFuture src, + Function> fn) { + super(executor, dep, src); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; CompletableFuture a; + if ((d = dep) == null || + !d.uniCompose(a = src, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; fn = null; + return d.postFire(a, mode); } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class WhenCompleteCompletion extends Completion { - final CompletableFuture src; - final BiConsumer fn; - final CompletableFuture dst; - final Executor executor; - WhenCompleteCompletion(CompletableFuture src, - BiConsumer fn, - CompletableFuture dst, - Executor executor) { - this.src = src; this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture a; - final BiConsumer fn; - final CompletableFuture dst; - Object r; T t; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; + } + + final boolean uniCompose( + CompletableFuture a, + Function> f, + UniCompose c) { + Object r; Throwable x; + if (a == null || (r = a.result) == null || f == null) + return false; + tryComplete: if (result == null) { + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + completeThrowable(x, r); + break tryComplete; } - Executor e = executor; - Throwable dx = null; - try { - if (e != null) - execAsync(e, new AsyncWhenComplete(t, ex, fn, dst)); - else - fn.accept(t, ex); - } catch (Throwable rex) { - dx = rex; + r = null; + } + try { + if (c != null && !c.claim()) + return false; + @SuppressWarnings("unchecked") S s = (S) r; + CompletableFuture g = f.apply(s).toCompletableFuture(); + if (g.result == null || !uniRelay(g)) { + UniRelay copy = new UniRelay(this, g); + g.push(copy); + copy.tryFire(SYNC); + if (result == null) + return false; + } + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private CompletableFuture uniComposeStage( + Executor e, Function> f) { + if (f == null) throw new NullPointerException(); + Object r; Throwable x; + if (e == null && (r = result) != null) { + // try to return function result directly + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + return new CompletableFuture(encodeThrowable(x, r)); } - if (e == null || dx != null) - dst.internalComplete(t, ex != null ? ex : dx); + r = null; + } + try { + @SuppressWarnings("unchecked") T t = (T) r; + return f.apply(t).toCompletableFuture(); + } catch (Throwable ex) { + return new CompletableFuture(encodeThrowable(ex)); } } - private static final long serialVersionUID = 5232453952276885070L; + CompletableFuture d = new CompletableFuture(); + UniCompose c = new UniCompose(e, d, this, f); + push(c); + c.tryFire(SYNC); + return d; } - static final class ThenCopy extends Completion { - final CompletableFuture src; - final CompletableFuture dst; - ThenCopy(CompletableFuture src, - CompletableFuture dst) { - this.src = src; this.dst = dst; + /* ------------- Two-input Completions -------------- */ + + /** A Completion for an action with two sources */ + @SuppressWarnings("serial") + abstract static class BiCompletion extends UniCompletion { + CompletableFuture snd; // second source for action + BiCompletion(Executor executor, CompletableFuture dep, + CompletableFuture src, CompletableFuture snd) { + super(executor, dep, src); this.snd = snd; } - public final void run() { - final CompletableFuture a; - final CompletableFuture dst; - Object r; T t; Throwable ex; - if ((dst = this.dst) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - dst.internalComplete(t, ex); - } + } + + /** A Completion delegating to a BiCompletion */ + @SuppressWarnings("serial") + static final class CoCompletion extends Completion { + BiCompletion base; + CoCompletion(BiCompletion base) { this.base = base; } + final CompletableFuture tryFire(int mode) { + BiCompletion c; CompletableFuture d; + if ((c = base) == null || (d = c.tryFire(mode)) == null) + return null; + base = null; // detach + return d; } - private static final long serialVersionUID = 5232453952276885070L; - } - - // version of ThenCopy for CompletableFuture dst - static final class ThenPropagate extends Completion { - final CompletableFuture src; - final CompletableFuture dst; - ThenPropagate(CompletableFuture src, - CompletableFuture dst) { - this.src = src; this.dst = dst; - } - public final void run() { - final CompletableFuture a; - final CompletableFuture dst; - Object r; Throwable ex; - if ((dst = this.dst) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - dst.internalComplete(null, ex); - } + final boolean isLive() { + BiCompletion c; + return (c = base) != null && c.dep != null; } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class HandleCompletion extends Completion { - final CompletableFuture src; - final BiFunction fn; - final CompletableFuture dst; - final Executor executor; - HandleCompletion(CompletableFuture src, - BiFunction fn, - CompletableFuture dst, - Executor executor) { - this.src = src; this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture a; - final BiFunction fn; - final CompletableFuture dst; - Object r; T t; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - Executor e = executor; - U u = null; - Throwable dx = null; - try { - if (e != null) - execAsync(e, new AsyncCombine(t, ex, fn, dst)); - else - u = fn.apply(t, ex); - } catch (Throwable rex) { - dx = rex; - } - if (e == null || dx != null) - dst.internalComplete(u, dx); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class ThenCompose extends Completion { - final CompletableFuture src; - final Function> fn; - final CompletableFuture dst; - final Executor executor; - ThenCompose(CompletableFuture src, - Function> fn, - CompletableFuture dst, - Executor executor) { - this.src = src; this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture a; - final Function> fn; - final CompletableFuture dst; - Object r; T t; Throwable ex; Executor e; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - CompletableFuture c = null; - U u = null; - boolean complete = false; - if (ex == null) { - if ((e = executor) != null) - execAsync(e, new AsyncCompose(t, fn, dst)); - else { - try { - CompletionStage cs = fn.apply(t); - c = (cs == null) ? null : cs.toCompletableFuture(); - if (c == null) - ex = new NullPointerException(); - } catch (Throwable rex) { - ex = rex; - } - } - } - if (c != null) { - ThenCopy d = null; - Object s; - if ((s = c.result) == null) { - CompletionNode p = new CompletionNode - (d = new ThenCopy(c, dst)); - while ((s = c.result) == null) { - if (UNSAFE.compareAndSwapObject - (c, COMPLETIONS, p.next = c.completions, p)) - break; - } - } - if (s != null && (d == null || d.compareAndSet(0, 1))) { - complete = true; - if (s instanceof AltResult) { - ex = ((AltResult)s).ex; // no rewrap - u = null; - } - else { - @SuppressWarnings("unchecked") U us = (U) s; - u = us; - } - } - } - if (complete || ex != null) - dst.internalComplete(u, ex); - if (c != null) - c.helpPostComplete(); + } + + /** Pushes completion to this and b unless both done. */ + final void bipush(CompletableFuture b, BiCompletion c) { + if (c != null) { + Object r; + while ((r = result) == null && !tryPushStack(c)) + lazySetNext(c, null); // clear on failure + if (b != null && b != this && b.result == null) { + Completion q = (r != null) ? c : new CoCompletion(c); + while (b.result == null && !b.tryPushStack(q)) + lazySetNext(q, null); // clear on failure } } - private static final long serialVersionUID = 5232453952276885070L; } - // Implementations of stage methods with (plain, async, Executor) forms + /** Post-processing after successful BiCompletion tryFire. */ + final CompletableFuture postFire(CompletableFuture a, + CompletableFuture b, int mode) { + if (b != null && b.stack != null) { // clean second source + if (mode < 0 || b.result == null) + b.cleanStack(); + else + b.postComplete(); + } + return postFire(a, mode); + } - private CompletableFuture doThenApply - (Function fn, - Executor e) { - if (fn == null) throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - ThenApply d = null; - Object r; - if ((r = result) == null) { - CompletionNode p = new CompletionNode - (d = new ThenApply(this, fn, dst, e)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - break; - } + @SuppressWarnings("serial") + static final class BiApply extends BiCompletion { + BiFunction fn; + BiApply(Executor executor, CompletableFuture dep, + CompletableFuture src, CompletableFuture snd, + BiFunction fn) { + super(executor, dep, src, snd); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; + CompletableFuture a; + CompletableFuture b; + if ((d = dep) == null || + !d.biApply(a = src, b = snd, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; snd = null; fn = null; + return d.postFire(a, b, mode); } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; + } + + final boolean biApply(CompletableFuture a, + CompletableFuture b, + BiFunction f, + BiApply c) { + Object r, s; Throwable x; + if (a == null || (r = a.result) == null || + b == null || (s = b.result) == null || f == null) + return false; + tryComplete: if (result == null) { if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; + if ((x = ((AltResult)r).ex) != null) { + completeThrowable(x, r); + break tryComplete; + } + r = null; } - U u = null; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncApply(t, fn, dst)); - else - u = fn.apply(t); - } catch (Throwable rex) { - ex = rex; + if (s instanceof AltResult) { + if ((x = ((AltResult)s).ex) != null) { + completeThrowable(x, s); + break tryComplete; } + s = null; + } + try { + if (c != null && !c.claim()) + return false; + @SuppressWarnings("unchecked") R rr = (R) r; + @SuppressWarnings("unchecked") S ss = (S) s; + completeValue(f.apply(rr, ss)); + } catch (Throwable ex) { + completeThrowable(ex); } - if (e == null || ex != null) - dst.internalComplete(u, ex); } - helpPostComplete(); - return dst; + return true; } - private CompletableFuture doThenAccept(Consumer fn, - Executor e) { - if (fn == null) throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - ThenAccept d = null; - Object r; - if ((r = result) == null) { - CompletionNode p = new CompletionNode - (d = new ThenAccept(this, fn, dst, e)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - break; - } + private CompletableFuture biApplyStage( + Executor e, CompletionStage o, + BiFunction f) { + CompletableFuture b; + if (f == null || (b = o.toCompletableFuture()) == null) + throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + if (e != null || !d.biApply(this, b, f, null)) { + BiApply c = new BiApply(e, d, this, b, f); + bipush(b, c); + c.tryFire(SYNC); + } + return d; + } + + @SuppressWarnings("serial") + static final class BiAccept extends BiCompletion { + BiConsumer fn; + BiAccept(Executor executor, CompletableFuture dep, + CompletableFuture src, CompletableFuture snd, + BiConsumer fn) { + super(executor, dep, src, snd); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; + CompletableFuture a; + CompletableFuture b; + if ((d = dep) == null || + !d.biAccept(a = src, b = snd, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; snd = null; fn = null; + return d.postFire(a, b, mode); } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; + } + + final boolean biAccept(CompletableFuture a, + CompletableFuture b, + BiConsumer f, + BiAccept c) { + Object r, s; Throwable x; + if (a == null || (r = a.result) == null || + b == null || (s = b.result) == null || f == null) + return false; + tryComplete: if (result == null) { if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; + if ((x = ((AltResult)r).ex) != null) { + completeThrowable(x, r); + break tryComplete; + } + r = null; } - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncAccept(t, fn, dst)); - else - fn.accept(t); - } catch (Throwable rex) { - ex = rex; + if (s instanceof AltResult) { + if ((x = ((AltResult)s).ex) != null) { + completeThrowable(x, s); + break tryComplete; } + s = null; + } + try { + if (c != null && !c.claim()) + return false; + @SuppressWarnings("unchecked") R rr = (R) r; + @SuppressWarnings("unchecked") S ss = (S) s; + f.accept(rr, ss); + completeNull(); + } catch (Throwable ex) { + completeThrowable(ex); } - if (e == null || ex != null) - dst.internalComplete(null, ex); } - helpPostComplete(); - return dst; + return true; } - private CompletableFuture doThenRun(Runnable action, - Executor e) { - if (action == null) throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - ThenRun d = null; - Object r; - if ((r = result) == null) { - CompletionNode p = new CompletionNode - (d = new ThenRun(this, action, dst, e)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - break; - } + private CompletableFuture biAcceptStage( + Executor e, CompletionStage o, + BiConsumer f) { + CompletableFuture b; + if (f == null || (b = o.toCompletableFuture()) == null) + throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + if (e != null || !d.biAccept(this, b, f, null)) { + BiAccept c = new BiAccept(e, d, this, b, f); + bipush(b, c); + c.tryFire(SYNC); + } + return d; + } + + @SuppressWarnings("serial") + static final class BiRun extends BiCompletion { + Runnable fn; + BiRun(Executor executor, CompletableFuture dep, + CompletableFuture src, + CompletableFuture snd, + Runnable fn) { + super(executor, dep, src, snd); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; + CompletableFuture a; + CompletableFuture b; + if ((d = dep) == null || + !d.biRun(a = src, b = snd, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; snd = null; fn = null; + return d.postFire(a, b, mode); } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - Throwable ex; - if (r instanceof AltResult) - ex = ((AltResult)r).ex; + } + + final boolean biRun(CompletableFuture a, CompletableFuture b, + Runnable f, BiRun c) { + Object r, s; Throwable x; + if (a == null || (r = a.result) == null || + b == null || (s = b.result) == null || f == null) + return false; + if (result == null) { + if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) + completeThrowable(x, r); + else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null) + completeThrowable(x, s); else - ex = null; - if (ex == null) { try { - if (e != null) - execAsync(e, new AsyncRun(action, dst)); - else - action.run(); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - helpPostComplete(); - return dst; - } - - private CompletableFuture doThenCombine - (CompletableFuture other, - BiFunction fn, - Executor e) { - if (other == null || fn == null) throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - ThenCombine d = null; - Object r, s = null; - if ((r = result) == null || (s = other.result) == null) { - d = new ThenCombine(this, other, fn, dst, e); - CompletionNode q = null, p = new CompletionNode(d); - while ((r == null && (r = result) == null) || - (s == null && (s = other.result) == null)) { - if (q != null) { - if (s != null || - UNSAFE.compareAndSwapObject - (other, COMPLETIONS, q.next = other.completions, q)) - break; - } - else if (r != null || - UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) { - if (s != null) - break; - q = new CompletionNode(d); + if (c != null && !c.claim()) + return false; + f.run(); + completeNull(); + } catch (Throwable ex) { + completeThrowable(ex); } - } } - if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { - T t; U u; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - if (ex != null) - u = null; - else if (s instanceof AltResult) { - ex = ((AltResult)s).ex; - u = null; - } - else { - @SuppressWarnings("unchecked") U us = (U) s; - u = us; - } - V v = null; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncCombine(t, u, fn, dst)); - else - v = fn.apply(t, u); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(v, ex); - } - helpPostComplete(); - other.helpPostComplete(); - return dst; - } - - private CompletableFuture doThenAcceptBoth - (CompletableFuture other, - BiConsumer fn, - Executor e) { - if (other == null || fn == null) throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - ThenAcceptBoth d = null; - Object r, s = null; - if ((r = result) == null || (s = other.result) == null) { - d = new ThenAcceptBoth(this, other, fn, dst, e); - CompletionNode q = null, p = new CompletionNode(d); - while ((r == null && (r = result) == null) || - (s == null && (s = other.result) == null)) { - if (q != null) { - if (s != null || - UNSAFE.compareAndSwapObject - (other, COMPLETIONS, q.next = other.completions, q)) - break; - } - else if (r != null || - UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) { - if (s != null) - break; - q = new CompletionNode(d); - } - } + return true; + } + + private CompletableFuture biRunStage(Executor e, CompletionStage o, + Runnable f) { + CompletableFuture b; + if (f == null || (b = o.toCompletableFuture()) == null) + throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + if (e != null || !d.biRun(this, b, f, null)) { + BiRun c = new BiRun<>(e, d, this, b, f); + bipush(b, c); + c.tryFire(SYNC); } - if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { - T t; U u; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - if (ex != null) - u = null; - else if (s instanceof AltResult) { - ex = ((AltResult)s).ex; - u = null; - } - else { - @SuppressWarnings("unchecked") U us = (U) s; - u = us; - } - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncAcceptBoth(t, u, fn, dst)); - else - fn.accept(t, u); - } catch (Throwable rex) { - ex = rex; - } + return d; + } + + @SuppressWarnings("serial") + static final class BiRelay extends BiCompletion { // for And + BiRelay(CompletableFuture dep, + CompletableFuture src, + CompletableFuture snd) { + super(null, dep, src, snd); + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; + CompletableFuture a; + CompletableFuture b; + if ((d = dep) == null || !d.biRelay(a = src, b = snd)) + return null; + src = null; snd = null; dep = null; + return d.postFire(a, b, mode); + } + } + + boolean biRelay(CompletableFuture a, CompletableFuture b) { + Object r, s; Throwable x; + if (a == null || (r = a.result) == null || + b == null || (s = b.result) == null) + return false; + if (result == null) { + if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) + completeThrowable(x, r); + else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null) + completeThrowable(x, s); + else + completeNull(); + } + return true; + } + + /** Recursively constructs a tree of completions. */ + static CompletableFuture andTree(CompletableFuture[] cfs, + int lo, int hi) { + CompletableFuture d = new CompletableFuture(); + if (lo > hi) // empty + d.result = NIL; + else { + CompletableFuture a, b; + int mid = (lo + hi) >>> 1; + if ((a = (lo == mid ? cfs[lo] : + andTree(cfs, lo, mid))) == null || + (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : + andTree(cfs, mid+1, hi))) == null) + throw new NullPointerException(); + if (!d.biRelay(a, b)) { + BiRelay c = new BiRelay<>(d, a, b); + a.bipush(b, c); + c.tryFire(SYNC); } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - helpPostComplete(); - other.helpPostComplete(); - return dst; - } - - private CompletableFuture doRunAfterBoth(CompletableFuture other, - Runnable action, - Executor e) { - if (other == null || action == null) throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - RunAfterBoth d = null; - Object r, s = null; - if ((r = result) == null || (s = other.result) == null) { - d = new RunAfterBoth(this, other, action, dst, e); - CompletionNode q = null, p = new CompletionNode(d); - while ((r == null && (r = result) == null) || - (s == null && (s = other.result) == null)) { - if (q != null) { - if (s != null || - UNSAFE.compareAndSwapObject - (other, COMPLETIONS, q.next = other.completions, q)) - break; - } - else if (r != null || - UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) { - if (s != null) - break; - q = new CompletionNode(d); + } + return d; + } + + /* ------------- Projected (Ored) BiCompletions -------------- */ + + /** Pushes completion to this and b unless either done. */ + final void orpush(CompletableFuture b, BiCompletion c) { + if (c != null) { + while ((b == null || b.result == null) && result == null) { + if (tryPushStack(c)) { + if (b != null && b != this && b.result == null) { + Completion q = new CoCompletion(c); + while (result == null && b.result == null && + !b.tryPushStack(q)) + lazySetNext(q, null); // clear on failure + } + break; } + lazySetNext(c, null); // clear on failure } } - if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { - Throwable ex; - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - if (ex == null && (s instanceof AltResult)) - ex = ((AltResult)s).ex; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncRun(action, dst)); - else - action.run(); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); + } + + @SuppressWarnings("serial") + static final class OrApply extends BiCompletion { + Function fn; + OrApply(Executor executor, CompletableFuture dep, + CompletableFuture src, + CompletableFuture snd, + Function fn) { + super(executor, dep, src, snd); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; + CompletableFuture a; + CompletableFuture b; + if ((d = dep) == null || + !d.orApply(a = src, b = snd, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; snd = null; fn = null; + return d.postFire(a, b, mode); } - helpPostComplete(); - other.helpPostComplete(); - return dst; } - private CompletableFuture doApplyToEither - (CompletableFuture other, - Function fn, - Executor e) { - if (other == null || fn == null) throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - ApplyToEither d = null; - Object r; - if ((r = result) == null && (r = other.result) == null) { - d = new ApplyToEither(this, other, fn, dst, e); - CompletionNode q = null, p = new CompletionNode(d); - while ((r = result) == null && (r = other.result) == null) { - if (q != null) { - if (UNSAFE.compareAndSwapObject - (other, COMPLETIONS, q.next = other.completions, q)) - break; + final boolean orApply(CompletableFuture a, + CompletableFuture b, + Function f, + OrApply c) { + Object r; Throwable x; + if (a == null || b == null || + ((r = a.result) == null && (r = b.result) == null) || f == null) + return false; + tryComplete: if (result == null) { + try { + if (c != null && !c.claim()) + return false; + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + completeThrowable(x, r); + break tryComplete; + } + r = null; } - else if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - q = new CompletionNode(d); + @SuppressWarnings("unchecked") R rr = (R) r; + completeValue(f.apply(rr)); + } catch (Throwable ex) { + completeThrowable(ex); } } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - U u = null; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncApply(t, fn, dst)); - else - u = fn.apply(t); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(u, ex); + return true; + } + + private CompletableFuture orApplyStage( + Executor e, CompletionStage o, + Function f) { + CompletableFuture b; + if (f == null || (b = o.toCompletableFuture()) == null) + throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + if (e != null || !d.orApply(this, b, f, null)) { + OrApply c = new OrApply(e, d, this, b, f); + orpush(b, c); + c.tryFire(SYNC); } - helpPostComplete(); - other.helpPostComplete(); - return dst; + return d; } - private CompletableFuture doAcceptEither - (CompletableFuture other, - Consumer fn, - Executor e) { - if (other == null || fn == null) throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - AcceptEither d = null; - Object r; - if ((r = result) == null && (r = other.result) == null) { - d = new AcceptEither(this, other, fn, dst, e); - CompletionNode q = null, p = new CompletionNode(d); - while ((r = result) == null && (r = other.result) == null) { - if (q != null) { - if (UNSAFE.compareAndSwapObject - (other, COMPLETIONS, q.next = other.completions, q)) - break; + @SuppressWarnings("serial") + static final class OrAccept extends BiCompletion { + Consumer fn; + OrAccept(Executor executor, CompletableFuture dep, + CompletableFuture src, + CompletableFuture snd, + Consumer fn) { + super(executor, dep, src, snd); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; + CompletableFuture a; + CompletableFuture b; + if ((d = dep) == null || + !d.orAccept(a = src, b = snd, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; snd = null; fn = null; + return d.postFire(a, b, mode); + } + } + + final boolean orAccept(CompletableFuture a, + CompletableFuture b, + Consumer f, + OrAccept c) { + Object r; Throwable x; + if (a == null || b == null || + ((r = a.result) == null && (r = b.result) == null) || f == null) + return false; + tryComplete: if (result == null) { + try { + if (c != null && !c.claim()) + return false; + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + completeThrowable(x, r); + break tryComplete; + } + r = null; } - else if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - q = new CompletionNode(d); + @SuppressWarnings("unchecked") R rr = (R) r; + f.accept(rr); + completeNull(); + } catch (Throwable ex) { + completeThrowable(ex); } } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncAccept(t, fn, dst)); - else - fn.accept(t); - } catch (Throwable rex) { - ex = rex; + return true; + } + + private CompletableFuture orAcceptStage( + Executor e, CompletionStage o, Consumer f) { + CompletableFuture b; + if (f == null || (b = o.toCompletableFuture()) == null) + throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + if (e != null || !d.orAccept(this, b, f, null)) { + OrAccept c = new OrAccept(e, d, this, b, f); + orpush(b, c); + c.tryFire(SYNC); + } + return d; + } + + @SuppressWarnings("serial") + static final class OrRun extends BiCompletion { + Runnable fn; + OrRun(Executor executor, CompletableFuture dep, + CompletableFuture src, + CompletableFuture snd, + Runnable fn) { + super(executor, dep, src, snd); this.fn = fn; + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; + CompletableFuture a; + CompletableFuture b; + if ((d = dep) == null || + !d.orRun(a = src, b = snd, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; snd = null; fn = null; + return d.postFire(a, b, mode); + } + } + + final boolean orRun(CompletableFuture a, CompletableFuture b, + Runnable f, OrRun c) { + Object r; Throwable x; + if (a == null || b == null || + ((r = a.result) == null && (r = b.result) == null) || f == null) + return false; + if (result == null) { + try { + if (c != null && !c.claim()) + return false; + if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) + completeThrowable(x, r); + else { + f.run(); + completeNull(); } + } catch (Throwable ex) { + completeThrowable(ex); } - if (e == null || ex != null) - dst.internalComplete(null, ex); } - helpPostComplete(); - other.helpPostComplete(); - return dst; + return true; + } + + private CompletableFuture orRunStage(Executor e, CompletionStage o, + Runnable f) { + CompletableFuture b; + if (f == null || (b = o.toCompletableFuture()) == null) + throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + if (e != null || !d.orRun(this, b, f, null)) { + OrRun c = new OrRun<>(e, d, this, b, f); + orpush(b, c); + c.tryFire(SYNC); + } + return d; } - private CompletableFuture doRunAfterEither - (CompletableFuture other, - Runnable action, - Executor e) { - if (other == null || action == null) throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - RunAfterEither d = null; + @SuppressWarnings("serial") + static final class OrRelay extends BiCompletion { // for Or + OrRelay(CompletableFuture dep, CompletableFuture src, + CompletableFuture snd) { + super(null, dep, src, snd); + } + final CompletableFuture tryFire(int mode) { + CompletableFuture d; + CompletableFuture a; + CompletableFuture b; + if ((d = dep) == null || !d.orRelay(a = src, b = snd)) + return null; + src = null; snd = null; dep = null; + return d.postFire(a, b, mode); + } + } + + final boolean orRelay(CompletableFuture a, CompletableFuture b) { Object r; - if ((r = result) == null && (r = other.result) == null) { - d = new RunAfterEither(this, other, action, dst, e); - CompletionNode q = null, p = new CompletionNode(d); - while ((r = result) == null && (r = other.result) == null) { - if (q != null) { - if (UNSAFE.compareAndSwapObject - (other, COMPLETIONS, q.next = other.completions, q)) - break; - } - else if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - q = new CompletionNode(d); + if (a == null || b == null || + ((r = a.result) == null && (r = b.result) == null)) + return false; + if (result == null) + completeRelay(r); + return true; + } + + /** Recursively constructs a tree of completions. */ + static CompletableFuture orTree(CompletableFuture[] cfs, + int lo, int hi) { + CompletableFuture d = new CompletableFuture(); + if (lo <= hi) { + CompletableFuture a, b; + int mid = (lo + hi) >>> 1; + if ((a = (lo == mid ? cfs[lo] : + orTree(cfs, lo, mid))) == null || + (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : + orTree(cfs, mid+1, hi))) == null) + throw new NullPointerException(); + if (!d.orRelay(a, b)) { + OrRelay c = new OrRelay<>(d, a, b); + a.orpush(b, c); + c.tryFire(SYNC); } } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - Throwable ex; - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncRun(action, dst)); - else - action.run(); - } catch (Throwable rex) { - ex = rex; + return d; + } + + /* ------------- Zero-input Async forms -------------- */ + + @SuppressWarnings("serial") + static final class AsyncSupply extends ForkJoinTask + implements Runnable, AsynchronousCompletionTask { + CompletableFuture dep; Supplier fn; + AsyncSupply(CompletableFuture dep, Supplier fn) { + this.dep = dep; this.fn = fn; + } + + public final Void getRawResult() { return null; } + public final void setRawResult(Void v) {} + public final boolean exec() { run(); return true; } + + public void run() { + CompletableFuture d; Supplier f; + if ((d = dep) != null && (f = fn) != null) { + dep = null; fn = null; + if (d.result == null) { + try { + d.completeValue(f.get()); + } catch (Throwable ex) { + d.completeThrowable(ex); + } } + d.postComplete(); } - if (e == null || ex != null) - dst.internalComplete(null, ex); } - helpPostComplete(); - other.helpPostComplete(); - return dst; } - private CompletableFuture doThenCompose - (Function> fn, - Executor e) { - if (fn == null) throw new NullPointerException(); - CompletableFuture dst = null; - ThenCompose d = null; - Object r; - if ((r = result) == null) { - dst = new CompletableFuture(); - CompletionNode p = new CompletionNode - (d = new ThenCompose(this, fn, dst, e)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - break; - } + static CompletableFuture asyncSupplyStage(Executor e, + Supplier f) { + if (f == null) throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + e.execute(new AsyncSupply(d, f)); + return d; + } + + @SuppressWarnings("serial") + static final class AsyncRun extends ForkJoinTask + implements Runnable, AsynchronousCompletionTask { + CompletableFuture dep; Runnable fn; + AsyncRun(CompletableFuture dep, Runnable fn) { + this.dep = dep; this.fn = fn; } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - if (ex == null) { - if (e != null) { - if (dst == null) - dst = new CompletableFuture(); - execAsync(e, new AsyncCompose(t, fn, dst)); - } - else { + + public final Void getRawResult() { return null; } + public final void setRawResult(Void v) {} + public final boolean exec() { run(); return true; } + + public void run() { + CompletableFuture d; Runnable f; + if ((d = dep) != null && (f = fn) != null) { + dep = null; fn = null; + if (d.result == null) { try { - CompletionStage cs = fn.apply(t); - if (cs == null || - (dst = cs.toCompletableFuture()) == null) - ex = new NullPointerException(); - } catch (Throwable rex) { - ex = rex; + f.run(); + d.completeNull(); + } catch (Throwable ex) { + d.completeThrowable(ex); } } - } - if (dst == null) - dst = new CompletableFuture(); - if (ex != null) - dst.internalComplete(null, ex); - } - helpPostComplete(); - dst.helpPostComplete(); - return dst; - } - - private CompletableFuture doWhenComplete - (BiConsumer fn, - Executor e) { - if (fn == null) throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - WhenCompleteCompletion d = null; - Object r; - if ((r = result) == null) { - CompletionNode p = - new CompletionNode(d = new WhenCompleteCompletion - (this, fn, dst, e)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, - p.next = completions, p)) - break; + d.postComplete(); } } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; + } + + static CompletableFuture asyncRunStage(Executor e, Runnable f) { + if (f == null) throw new NullPointerException(); + CompletableFuture d = new CompletableFuture(); + e.execute(new AsyncRun(d, f)); + return d; + } + + /* ------------- Signallers -------------- */ + + /** + * Completion for recording and releasing a waiting thread. This + * class implements ManagedBlocker to avoid starvation when + * blocking actions pile up in ForkJoinPools. + */ + @SuppressWarnings("serial") + static final class Signaller extends Completion + implements ForkJoinPool.ManagedBlocker { + long nanos; // wait time if timed + final long deadline; // non-zero if timed + volatile int interruptControl; // > 0: interruptible, < 0: interrupted + volatile Thread thread; + + Signaller(boolean interruptible, long nanos, long deadline) { + this.thread = Thread.currentThread(); + this.interruptControl = interruptible ? 1 : 0; + this.nanos = nanos; + this.deadline = deadline; + } + final CompletableFuture tryFire(int ignore) { + Thread w; // no need to atomically claim + if ((w = thread) != null) { + thread = null; + LockSupport.unpark(w); } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; + return null; + } + public boolean isReleasable() { + if (thread == null) + return true; + if (Thread.interrupted()) { + int i = interruptControl; + interruptControl = -1; + if (i > 0) + return true; } - Throwable dx = null; - try { - if (e != null) - execAsync(e, new AsyncWhenComplete(t, ex, fn, dst)); - else - fn.accept(t, ex); - } catch (Throwable rex) { - dx = rex; + if (deadline != 0L && + (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) { + thread = null; + return true; } - if (e == null || dx != null) - dst.internalComplete(t, ex != null ? ex : dx); + return false; + } + public boolean block() { + if (isReleasable()) + return true; + else if (deadline == 0L) + LockSupport.park(this); + else if (nanos > 0L) + LockSupport.parkNanos(this, nanos); + return isReleasable(); } - helpPostComplete(); - return dst; + final boolean isLive() { return thread != null; } } - private CompletableFuture doHandle - (BiFunction fn, - Executor e) { - if (fn == null) throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - HandleCompletion d = null; + /** + * Returns raw result after waiting, or null if interruptible and + * interrupted. + */ + private Object waitingGet(boolean interruptible) { + Signaller q = null; + boolean queued = false; + int spins = -1; Object r; - if ((r = result) == null) { - CompletionNode p = - new CompletionNode(d = new HandleCompletion - (this, fn, dst, e)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, - p.next = completions, p)) - break; + while ((r = result) == null) { + if (spins < 0) + spins = (Runtime.getRuntime().availableProcessors() > 1) ? + 1 << 8 : 0; // Use brief spin-wait on multiprocessors + else if (spins > 0) { + if (ThreadLocalRandom.nextSecondarySeed() >= 0) + --spins; + } + else if (q == null) + q = new Signaller(interruptible, 0L, 0L); + else if (!queued) + queued = tryPushStack(q); + else if (interruptible && q.interruptControl < 0) { + q.thread = null; + cleanStack(); + return null; + } + else if (q.thread != null && result == null) { + try { + ForkJoinPool.managedBlock(q); + } catch (InterruptedException ie) { + q.interruptControl = -1; + } } } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; + if (q != null) { + q.thread = null; + if (q.interruptControl < 0) { + if (interruptible) + r = null; // report interruption + else + Thread.currentThread().interrupt(); } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; + } + postComplete(); + return r; + } + + /** + * Returns raw result after waiting, or null if interrupted, or + * throws TimeoutException on timeout. + */ + private Object timedGet(long nanos) throws TimeoutException { + if (Thread.interrupted()) + return null; + if (nanos <= 0L) + throw new TimeoutException(); + long d = System.nanoTime() + nanos; + Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0 + boolean queued = false; + Object r; + // We intentionally don't spin here (as waitingGet does) because + // the call to nanoTime() above acts much like a spin. + while ((r = result) == null) { + if (!queued) + queued = tryPushStack(q); + else if (q.interruptControl < 0 || q.nanos <= 0L) { + q.thread = null; + cleanStack(); + if (q.interruptControl < 0) + return null; + throw new TimeoutException(); } - U u = null; - Throwable dx = null; - try { - if (e != null) - execAsync(e, new AsyncCombine(t, ex, fn, dst)); - else { - u = fn.apply(t, ex); - dx = null; + else if (q.thread != null && result == null) { + try { + ForkJoinPool.managedBlock(q); + } catch (InterruptedException ie) { + q.interruptControl = -1; } - } catch (Throwable rex) { - dx = rex; - u = null; } - if (e == null || dx != null) - dst.internalComplete(u, dx); } - helpPostComplete(); - return dst; + if (q.interruptControl < 0) + r = null; + q.thread = null; + postComplete(); + return r; } - - // public methods + /* ------------- public methods -------------- */ /** * Creates a new incomplete CompletableFuture. @@ -2113,6 +1785,13 @@ public class CompletableFuture implements Future, CompletionStage { public CompletableFuture() { } + /** + * Creates a new complete CompletableFuture with given encoded result. + */ + private CompletableFuture(Object r) { + this.result = r; + } + /** * Returns a new CompletableFuture that is asynchronously completed * by a task running in the {@link ForkJoinPool#commonPool()} with @@ -2124,10 +1803,7 @@ public class CompletableFuture implements Future, CompletionStage { * @return the new CompletableFuture */ public static CompletableFuture supplyAsync(Supplier supplier) { - if (supplier == null) throw new NullPointerException(); - CompletableFuture f = new CompletableFuture(); - execAsync(ForkJoinPool.commonPool(), new AsyncSupply(supplier, f)); - return f; + return asyncSupplyStage(asyncPool, supplier); } /** @@ -2143,11 +1819,7 @@ public class CompletableFuture implements Future, CompletionStage { */ public static CompletableFuture supplyAsync(Supplier supplier, Executor executor) { - if (executor == null || supplier == null) - throw new NullPointerException(); - CompletableFuture f = new CompletableFuture(); - execAsync(executor, new AsyncSupply(supplier, f)); - return f; + return asyncSupplyStage(screenExecutor(executor), supplier); } /** @@ -2160,10 +1832,7 @@ public class CompletableFuture implements Future, CompletionStage { * @return the new CompletableFuture */ public static CompletableFuture runAsync(Runnable runnable) { - if (runnable == null) throw new NullPointerException(); - CompletableFuture f = new CompletableFuture(); - execAsync(ForkJoinPool.commonPool(), new AsyncRun(runnable, f)); - return f; + return asyncRunStage(asyncPool, runnable); } /** @@ -2178,11 +1847,7 @@ public class CompletableFuture implements Future, CompletionStage { */ public static CompletableFuture runAsync(Runnable runnable, Executor executor) { - if (executor == null || runnable == null) - throw new NullPointerException(); - CompletableFuture f = new CompletableFuture(); - execAsync(executor, new AsyncRun(runnable, f)); - return f; + return asyncRunStage(screenExecutor(executor), runnable); } /** @@ -2194,9 +1859,7 @@ public class CompletableFuture implements Future, CompletionStage { * @return the completed CompletableFuture */ public static CompletableFuture completedFuture(U value) { - CompletableFuture f = new CompletableFuture(); - f.result = (value == null) ? NIL : value; - return f; + return new CompletableFuture((value == null) ? NIL : value); } /** @@ -2220,21 +1883,8 @@ public class CompletableFuture implements Future, CompletionStage { * while waiting */ public T get() throws InterruptedException, ExecutionException { - Object r; Throwable ex, cause; - if ((r = result) == null && (r = waitingGet(true)) == null) - throw new InterruptedException(); - if (!(r instanceof AltResult)) { - @SuppressWarnings("unchecked") T tr = (T) r; - return tr; - } - if ((ex = ((AltResult)r).ex) == null) - return null; - if (ex instanceof CancellationException) - throw (CancellationException)ex; - if ((ex instanceof CompletionException) && - (cause = ex.getCause()) != null) - ex = cause; - throw new ExecutionException(ex); + Object r; + return reportGet((r = result) == null ? waitingGet(true) : r); } /** @@ -2252,24 +1902,9 @@ public class CompletableFuture implements Future, CompletionStage { */ public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - Object r; Throwable ex, cause; + Object r; long nanos = unit.toNanos(timeout); - if (Thread.interrupted()) - throw new InterruptedException(); - if ((r = result) == null) - r = timedAwaitDone(nanos); - if (!(r instanceof AltResult)) { - @SuppressWarnings("unchecked") T tr = (T) r; - return tr; - } - if ((ex = ((AltResult)r).ex) == null) - return null; - if (ex instanceof CancellationException) - throw (CancellationException)ex; - if ((ex instanceof CompletionException) && - (cause = ex.getCause()) != null) - ex = cause; - throw new ExecutionException(ex); + return reportGet((r = result) == null ? timedGet(nanos) : r); } /** @@ -2287,20 +1922,8 @@ public class CompletableFuture implements Future, CompletionStage { * exceptionally or a completion computation threw an exception */ public T join() { - Object r; Throwable ex; - if ((r = result) == null) - r = waitingGet(false); - if (!(r instanceof AltResult)) { - @SuppressWarnings("unchecked") T tr = (T) r; - return tr; - } - if ((ex = ((AltResult)r).ex) == null) - return null; - if (ex instanceof CancellationException) - throw (CancellationException)ex; - if (ex instanceof CompletionException) - throw (CompletionException)ex; - throw new CompletionException(ex); + Object r; + return reportJoin((r = result) == null ? waitingGet(false) : r); } /** @@ -2314,20 +1937,8 @@ public class CompletableFuture implements Future, CompletionStage { * exceptionally or a completion computation threw an exception */ public T getNow(T valueIfAbsent) { - Object r; Throwable ex; - if ((r = result) == null) - return valueIfAbsent; - if (!(r instanceof AltResult)) { - @SuppressWarnings("unchecked") T tr = (T) r; - return tr; - } - if ((ex = ((AltResult)r).ex) == null) - return null; - if (ex instanceof CancellationException) - throw (CancellationException)ex; - if (ex instanceof CompletionException) - throw (CompletionException)ex; - throw new CompletionException(ex); + Object r; + return ((r = result) == null) ? valueIfAbsent : reportJoin(r); } /** @@ -2339,9 +1950,7 @@ public class CompletableFuture implements Future, CompletionStage { * to transition to a completed state, else {@code false} */ public boolean complete(T value) { - boolean triggered = result == null && - UNSAFE.compareAndSwapObject(this, RESULT, null, - value == null ? NIL : value); + boolean triggered = completeValue(value); postComplete(); return triggered; } @@ -2356,244 +1965,200 @@ public class CompletableFuture implements Future, CompletionStage { */ public boolean completeExceptionally(Throwable ex) { if (ex == null) throw new NullPointerException(); - boolean triggered = result == null && - UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex)); + boolean triggered = internalComplete(new AltResult(ex)); postComplete(); return triggered; } - // CompletionStage methods - - public CompletableFuture thenApply - (Function fn) { - return doThenApply(fn, null); + public CompletableFuture thenApply( + Function fn) { + return uniApplyStage(null, fn); } - public CompletableFuture thenApplyAsync - (Function fn) { - return doThenApply(fn, ForkJoinPool.commonPool()); + public CompletableFuture thenApplyAsync( + Function fn) { + return uniApplyStage(asyncPool, fn); } - public CompletableFuture thenApplyAsync - (Function fn, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doThenApply(fn, executor); + public CompletableFuture thenApplyAsync( + Function fn, Executor executor) { + return uniApplyStage(screenExecutor(executor), fn); } - public CompletableFuture thenAccept - (Consumer action) { - return doThenAccept(action, null); + public CompletableFuture thenAccept(Consumer action) { + return uniAcceptStage(null, action); } - public CompletableFuture thenAcceptAsync - (Consumer action) { - return doThenAccept(action, ForkJoinPool.commonPool()); + public CompletableFuture thenAcceptAsync(Consumer action) { + return uniAcceptStage(asyncPool, action); } - public CompletableFuture thenAcceptAsync - (Consumer action, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doThenAccept(action, executor); + public CompletableFuture thenAcceptAsync(Consumer action, + Executor executor) { + return uniAcceptStage(screenExecutor(executor), action); } - public CompletableFuture thenRun - (Runnable action) { - return doThenRun(action, null); + public CompletableFuture thenRun(Runnable action) { + return uniRunStage(null, action); } - public CompletableFuture thenRunAsync - (Runnable action) { - return doThenRun(action, ForkJoinPool.commonPool()); + public CompletableFuture thenRunAsync(Runnable action) { + return uniRunStage(asyncPool, action); } - public CompletableFuture thenRunAsync - (Runnable action, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doThenRun(action, executor); + public CompletableFuture thenRunAsync(Runnable action, + Executor executor) { + return uniRunStage(screenExecutor(executor), action); } - public CompletableFuture thenCombine - (CompletionStage other, - BiFunction fn) { - return doThenCombine(other.toCompletableFuture(), fn, null); + public CompletableFuture thenCombine( + CompletionStage other, + BiFunction fn) { + return biApplyStage(null, other, fn); } - public CompletableFuture thenCombineAsync - (CompletionStage other, - BiFunction fn) { - return doThenCombine(other.toCompletableFuture(), fn, - ForkJoinPool.commonPool()); + public CompletableFuture thenCombineAsync( + CompletionStage other, + BiFunction fn) { + return biApplyStage(asyncPool, other, fn); } - public CompletableFuture thenCombineAsync - (CompletionStage other, - BiFunction fn, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doThenCombine(other.toCompletableFuture(), fn, executor); + public CompletableFuture thenCombineAsync( + CompletionStage other, + BiFunction fn, Executor executor) { + return biApplyStage(screenExecutor(executor), other, fn); } - public CompletableFuture thenAcceptBoth - (CompletionStage other, - BiConsumer action) { - return doThenAcceptBoth(other.toCompletableFuture(), action, null); + public CompletableFuture thenAcceptBoth( + CompletionStage other, + BiConsumer action) { + return biAcceptStage(null, other, action); } - public CompletableFuture thenAcceptBothAsync - (CompletionStage other, - BiConsumer action) { - return doThenAcceptBoth(other.toCompletableFuture(), action, - ForkJoinPool.commonPool()); + public CompletableFuture thenAcceptBothAsync( + CompletionStage other, + BiConsumer action) { + return biAcceptStage(asyncPool, other, action); } - public CompletableFuture thenAcceptBothAsync - (CompletionStage other, - BiConsumer action, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doThenAcceptBoth(other.toCompletableFuture(), action, executor); + public CompletableFuture thenAcceptBothAsync( + CompletionStage other, + BiConsumer action, Executor executor) { + return biAcceptStage(screenExecutor(executor), other, action); } - public CompletableFuture runAfterBoth - (CompletionStage other, - Runnable action) { - return doRunAfterBoth(other.toCompletableFuture(), action, null); + public CompletableFuture runAfterBoth(CompletionStage other, + Runnable action) { + return biRunStage(null, other, action); } - public CompletableFuture runAfterBothAsync - (CompletionStage other, - Runnable action) { - return doRunAfterBoth(other.toCompletableFuture(), action, - ForkJoinPool.commonPool()); + public CompletableFuture runAfterBothAsync(CompletionStage other, + Runnable action) { + return biRunStage(asyncPool, other, action); } - public CompletableFuture runAfterBothAsync - (CompletionStage other, - Runnable action, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doRunAfterBoth(other.toCompletableFuture(), action, executor); + public CompletableFuture runAfterBothAsync(CompletionStage other, + Runnable action, + Executor executor) { + return biRunStage(screenExecutor(executor), other, action); } - - public CompletableFuture applyToEither - (CompletionStage other, - Function fn) { - return doApplyToEither(other.toCompletableFuture(), fn, null); + public CompletableFuture applyToEither( + CompletionStage other, Function fn) { + return orApplyStage(null, other, fn); } - public CompletableFuture applyToEitherAsync - (CompletionStage other, - Function fn) { - return doApplyToEither(other.toCompletableFuture(), fn, - ForkJoinPool.commonPool()); + public CompletableFuture applyToEitherAsync( + CompletionStage other, Function fn) { + return orApplyStage(asyncPool, other, fn); } - public CompletableFuture applyToEitherAsync - (CompletionStage other, - Function fn, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doApplyToEither(other.toCompletableFuture(), fn, executor); + public CompletableFuture applyToEitherAsync( + CompletionStage other, Function fn, + Executor executor) { + return orApplyStage(screenExecutor(executor), other, fn); } - public CompletableFuture acceptEither - (CompletionStage other, - Consumer action) { - return doAcceptEither(other.toCompletableFuture(), action, null); + public CompletableFuture acceptEither( + CompletionStage other, Consumer action) { + return orAcceptStage(null, other, action); } - public CompletableFuture acceptEitherAsync - (CompletionStage other, - Consumer action) { - return doAcceptEither(other.toCompletableFuture(), action, - ForkJoinPool.commonPool()); + public CompletableFuture acceptEitherAsync( + CompletionStage other, Consumer action) { + return orAcceptStage(asyncPool, other, action); } - public CompletableFuture acceptEitherAsync - (CompletionStage other, - Consumer action, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doAcceptEither(other.toCompletableFuture(), action, executor); + public CompletableFuture acceptEitherAsync( + CompletionStage other, Consumer action, + Executor executor) { + return orAcceptStage(screenExecutor(executor), other, action); } public CompletableFuture runAfterEither(CompletionStage other, Runnable action) { - return doRunAfterEither(other.toCompletableFuture(), action, null); + return orRunStage(null, other, action); } - public CompletableFuture runAfterEitherAsync - (CompletionStage other, - Runnable action) { - return doRunAfterEither(other.toCompletableFuture(), action, - ForkJoinPool.commonPool()); + public CompletableFuture runAfterEitherAsync(CompletionStage other, + Runnable action) { + return orRunStage(asyncPool, other, action); } - public CompletableFuture runAfterEitherAsync - (CompletionStage other, - Runnable action, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doRunAfterEither(other.toCompletableFuture(), action, executor); + public CompletableFuture runAfterEitherAsync(CompletionStage other, + Runnable action, + Executor executor) { + return orRunStage(screenExecutor(executor), other, action); } - public CompletableFuture thenCompose - (Function> fn) { - return doThenCompose(fn, null); + public CompletableFuture thenCompose( + Function> fn) { + return uniComposeStage(null, fn); } - public CompletableFuture thenComposeAsync - (Function> fn) { - return doThenCompose(fn, ForkJoinPool.commonPool()); + public CompletableFuture thenComposeAsync( + Function> fn) { + return uniComposeStage(asyncPool, fn); } - public CompletableFuture thenComposeAsync - (Function> fn, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doThenCompose(fn, executor); + public CompletableFuture thenComposeAsync( + Function> fn, + Executor executor) { + return uniComposeStage(screenExecutor(executor), fn); } - public CompletableFuture whenComplete - (BiConsumer action) { - return doWhenComplete(action, null); + public CompletableFuture whenComplete( + BiConsumer action) { + return uniWhenCompleteStage(null, action); } - public CompletableFuture whenCompleteAsync - (BiConsumer action) { - return doWhenComplete(action, ForkJoinPool.commonPool()); + public CompletableFuture whenCompleteAsync( + BiConsumer action) { + return uniWhenCompleteStage(asyncPool, action); } - public CompletableFuture whenCompleteAsync - (BiConsumer action, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doWhenComplete(action, executor); + public CompletableFuture whenCompleteAsync( + BiConsumer action, Executor executor) { + return uniWhenCompleteStage(screenExecutor(executor), action); } - public CompletableFuture handle - (BiFunction fn) { - return doHandle(fn, null); + public CompletableFuture handle( + BiFunction fn) { + return uniHandleStage(null, fn); } - public CompletableFuture handleAsync - (BiFunction fn) { - return doHandle(fn, ForkJoinPool.commonPool()); + public CompletableFuture handleAsync( + BiFunction fn) { + return uniHandleStage(asyncPool, fn); } - public CompletableFuture handleAsync - (BiFunction fn, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doHandle(fn, executor); + public CompletableFuture handleAsync( + BiFunction fn, Executor executor) { + return uniHandleStage(screenExecutor(executor), fn); } /** - * Returns this CompletableFuture + * Returns this CompletableFuture. * * @return this CompletableFuture */ @@ -2618,52 +2183,13 @@ public class CompletableFuture implements Future, CompletionStage { * exceptionally * @return the new CompletableFuture */ - public CompletableFuture exceptionally - (Function fn) { - if (fn == null) throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - ExceptionCompletion d = null; - Object r; - if ((r = result) == null) { - CompletionNode p = - new CompletionNode(d = new ExceptionCompletion - (this, fn, dst)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, - p.next = completions, p)) - break; - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t = null; Throwable ex, dx = null; - if (r instanceof AltResult) { - if ((ex = ((AltResult)r).ex) != null) { - try { - t = fn.apply(ex); - } catch (Throwable rex) { - dx = rex; - } - } - } - else { - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - dst.internalComplete(t, dx); - } - helpPostComplete(); - return dst; + public CompletableFuture exceptionally( + Function fn) { + return uniExceptionallyStage(fn); } /* ------------- Arbitrary-arity constructions -------------- */ - /* - * The basic plan of attack is to recursively form binary - * completion trees of elements. This can be overkill for small - * sets, but scales nicely. The And/All vs Or/Any forms use the - * same idea, but details differ. - */ - /** * Returns a new CompletableFuture that is completed when all of * the given CompletableFutures complete. If any of the given @@ -2688,82 +2214,7 @@ public class CompletableFuture implements Future, CompletionStage { * {@code null} */ public static CompletableFuture allOf(CompletableFuture... cfs) { - int len = cfs.length; // Directly handle empty and singleton cases - if (len > 1) - return allTree(cfs, 0, len - 1); - else { - CompletableFuture dst = new CompletableFuture(); - CompletableFuture f; - if (len == 0) - dst.result = NIL; - else if ((f = cfs[0]) == null) - throw new NullPointerException(); - else { - ThenPropagate d = null; - CompletionNode p = null; - Object r; - while ((r = f.result) == null) { - if (d == null) - d = new ThenPropagate(f, dst); - else if (p == null) - p = new CompletionNode(d); - else if (UNSAFE.compareAndSwapObject - (f, COMPLETIONS, p.next = f.completions, p)) - break; - } - if (r != null && (d == null || d.compareAndSet(0, 1))) - dst.internalComplete(null, (r instanceof AltResult) ? - ((AltResult)r).ex : null); - f.helpPostComplete(); - } - return dst; - } - } - - /** - * Recursively constructs an And'ed tree of CompletableFutures. - * Called only when array known to have at least two elements. - */ - private static CompletableFuture allTree(CompletableFuture[] cfs, - int lo, int hi) { - CompletableFuture fst, snd; - int mid = (lo + hi) >>> 1; - if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null || - (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null) - throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - AndCompletion d = null; - CompletionNode p = null, q = null; - Object r = null, s = null; - while ((r = fst.result) == null || (s = snd.result) == null) { - if (d == null) - d = new AndCompletion(fst, snd, dst); - else if (p == null) - p = new CompletionNode(d); - else if (q == null) { - if (UNSAFE.compareAndSwapObject - (fst, COMPLETIONS, p.next = fst.completions, p)) - q = new CompletionNode(d); - } - else if (UNSAFE.compareAndSwapObject - (snd, COMPLETIONS, q.next = snd.completions, q)) - break; - } - if ((r != null || (r = fst.result) != null) && - (s != null || (s = snd.result) != null) && - (d == null || d.compareAndSet(0, 1))) { - Throwable ex; - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - if (ex == null && (s instanceof AltResult)) - ex = ((AltResult)s).ex; - dst.internalComplete(null, ex); - } - fst.helpPostComplete(); - snd.helpPostComplete(); - return dst; + return andTree(cfs, 0, cfs.length - 1); } /** @@ -2782,92 +2233,7 @@ public class CompletableFuture implements Future, CompletionStage { * {@code null} */ public static CompletableFuture anyOf(CompletableFuture... cfs) { - int len = cfs.length; // Same idea as allOf - if (len > 1) - return anyTree(cfs, 0, len - 1); - else { - CompletableFuture dst = new CompletableFuture(); - CompletableFuture f; - if (len == 0) - ; // skip - else if ((f = cfs[0]) == null) - throw new NullPointerException(); - else { - ThenCopy d = null; - CompletionNode p = null; - Object r; - while ((r = f.result) == null) { - if (d == null) - d = new ThenCopy(f, dst); - else if (p == null) - p = new CompletionNode(d); - else if (UNSAFE.compareAndSwapObject - (f, COMPLETIONS, p.next = f.completions, p)) - break; - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - Throwable ex; Object t; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - t = r; - } - dst.internalComplete(t, ex); - } - f.helpPostComplete(); - } - return dst; - } - } - - /** - * Recursively constructs an Or'ed tree of CompletableFutures. - */ - private static CompletableFuture anyTree(CompletableFuture[] cfs, - int lo, int hi) { - CompletableFuture fst, snd; - int mid = (lo + hi) >>> 1; - if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null || - (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null) - throw new NullPointerException(); - CompletableFuture dst = new CompletableFuture(); - OrCompletion d = null; - CompletionNode p = null, q = null; - Object r; - while ((r = fst.result) == null && (r = snd.result) == null) { - if (d == null) - d = new OrCompletion(fst, snd, dst); - else if (p == null) - p = new CompletionNode(d); - else if (q == null) { - if (UNSAFE.compareAndSwapObject - (fst, COMPLETIONS, p.next = fst.completions, p)) - q = new CompletionNode(d); - } - else if (UNSAFE.compareAndSwapObject - (snd, COMPLETIONS, q.next = snd.completions, q)) - break; - } - if ((r != null || (r = fst.result) != null || - (r = snd.result) != null) && - (d == null || d.compareAndSet(0, 1))) { - Throwable ex; Object t; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - t = r; - } - dst.internalComplete(t, ex); - } - fst.helpPostComplete(); - snd.helpPostComplete(); - return dst; + return orTree(cfs, 0, cfs.length - 1); } /* ------------- Control and status methods -------------- */ @@ -2887,8 +2253,7 @@ public class CompletableFuture implements Future, CompletionStage { */ public boolean cancel(boolean mayInterruptIfRunning) { boolean cancelled = (result == null) && - UNSAFE.compareAndSwapObject - (this, RESULT, null, new AltResult(new CancellationException())); + internalComplete(new AltResult(new CancellationException())); postComplete(); return cancelled || isCancelled(); } @@ -2940,11 +2305,12 @@ public class CompletableFuture implements Future, CompletionStage { * Forcibly causes subsequent invocations of method {@link #get()} * and related methods to throw the given exception, whether or * not already completed. This method is designed for use only in - * recovery actions, and even in such situations may result in - * ongoing dependent completions using established versus + * error recovery actions, and even in such situations may result + * in ongoing dependent completions using established versus * overwritten outcomes. * * @param ex the exception + * @throws NullPointerException if the exception is null */ public void obtrudeException(Throwable ex) { if (ex == null) throw new NullPointerException(); @@ -2962,7 +2328,7 @@ public class CompletableFuture implements Future, CompletionStage { */ public int getNumberOfDependents() { int count = 0; - for (CompletionNode p = completions; p != null; p = p.next) + for (Completion p = stack; p != null; p = p.next) ++count; return count; } @@ -2993,20 +2359,19 @@ public class CompletableFuture implements Future, CompletionStage { // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long RESULT; - private static final long WAITERS; - private static final long COMPLETIONS; + private static final long STACK; + private static final long NEXT; static { try { - UNSAFE = sun.misc.Unsafe.getUnsafe(); + final sun.misc.Unsafe u; + UNSAFE = u = sun.misc.Unsafe.getUnsafe(); Class k = CompletableFuture.class; - RESULT = UNSAFE.objectFieldOffset - (k.getDeclaredField("result")); - WAITERS = UNSAFE.objectFieldOffset - (k.getDeclaredField("waiters")); - COMPLETIONS = UNSAFE.objectFieldOffset - (k.getDeclaredField("completions")); - } catch (Exception e) { - throw new Error(e); + RESULT = u.objectFieldOffset(k.getDeclaredField("result")); + STACK = u.objectFieldOffset(k.getDeclaredField("stack")); + NEXT = u.objectFieldOffset + (Completion.class.getDeclaredField("next")); + } catch (Exception x) { + throw new Error(x); } } } diff --git a/src/share/classes/java/util/concurrent/CompletionStage.java b/src/share/classes/java/util/concurrent/CompletionStage.java index 6de60980cffbd5a1ac7988aaf6151eb58cc38279..304ede5db506627bbf7f3917414a12521b9908c0 100644 --- a/src/share/classes/java/util/concurrent/CompletionStage.java +++ b/src/share/classes/java/util/concurrent/CompletionStage.java @@ -407,7 +407,7 @@ public interface CompletionStage { /** * Returns a new CompletionStage that, when this and the other * given stage complete normally, executes the given action using - * the supplied executor + * the supplied executor. * * See the {@link CompletionStage} documentation for rules * covering exceptional completion. @@ -569,7 +569,7 @@ public interface CompletionStage { /** * Returns a new CompletionStage that, when either this or the * other given stage complete normally, executes the given action - * using supplied executor. + * using the supplied executor. * * See the {@link CompletionStage} documentation for rules * covering exceptional completion. @@ -649,10 +649,15 @@ public interface CompletionStage { (Function fn); /** - * Returns a new CompletionStage with the same result or exception - * as this stage, and when this stage completes, executes the - * given action with the result (or {@code null} if none) and the - * exception (or {@code null} if none) of this stage. + * Returns a new CompletionStage with the same result or exception as + * this stage, that executes the given action when this stage completes. + * + *

    When this stage is complete, the given action is invoked with the + * result (or {@code null} if none) and the exception (or {@code null} + * if none) of this stage as arguments. The returned stage is completed + * when the action returns. If the supplied action itself encounters an + * exception, then the returned stage exceptionally completes with this + * exception unless this stage also completed exceptionally. * * @param action the action to perform * @return the new CompletionStage @@ -661,12 +666,16 @@ public interface CompletionStage { (BiConsumer action); /** - * Returns a new CompletionStage with the same result or exception - * as this stage, and when this stage completes, executes the - * given action executes the given action using this stage's - * default asynchronous execution facility, with the result (or - * {@code null} if none) and the exception (or {@code null} if - * none) of this stage as arguments. + * Returns a new CompletionStage with the same result or exception as + * this stage, that executes the given action using this stage's + * default asynchronous execution facility when this stage completes. + * + *

    When this stage is complete, the given action is invoked with the + * result (or {@code null} if none) and the exception (or {@code null} + * if none) of this stage as arguments. The returned stage is completed + * when the action returns. If the supplied action itself encounters an + * exception, then the returned stage exceptionally completes with this + * exception unless this stage also completed exceptionally. * * @param action the action to perform * @return the new CompletionStage @@ -675,11 +684,16 @@ public interface CompletionStage { (BiConsumer action); /** - * Returns a new CompletionStage with the same result or exception - * as this stage, and when this stage completes, executes using - * the supplied Executor, the given action with the result (or - * {@code null} if none) and the exception (or {@code null} if - * none) of this stage as arguments. + * Returns a new CompletionStage with the same result or exception as + * this stage, that executes the given action using the supplied + * Executor when this stage completes. + * + *

    When this stage is complete, the given action is invoked with the + * result (or {@code null} if none) and the exception (or {@code null} + * if none) of this stage as arguments. The returned stage is completed + * when the action returns. If the supplied action itself encounters an + * exception, then the returned stage exceptionally completes with this + * exception unless this stage also completed exceptionally. * * @param action the action to perform * @param executor the executor to use for asynchronous execution @@ -693,9 +707,11 @@ public interface CompletionStage { * Returns a new CompletionStage that, when this stage completes * either normally or exceptionally, is executed with this stage's * result and exception as arguments to the supplied function. - * The given function is invoked with the result (or {@code null} - * if none) and the exception (or {@code null} if none) of this - * stage when complete as arguments. + * + *

    When this stage is complete, the given function is invoked + * with the result (or {@code null} if none) and the exception (or + * {@code null} if none) of this stage as arguments, and the + * function's result is used to complete the returned stage. * * @param fn the function to use to compute the value of the * returned CompletionStage @@ -710,9 +726,11 @@ public interface CompletionStage { * either normally or exceptionally, is executed using this stage's * default asynchronous execution facility, with this stage's * result and exception as arguments to the supplied function. - * The given function is invoked with the result (or {@code null} - * if none) and the exception (or {@code null} if none) of this - * stage when complete as arguments. + * + *

    When this stage is complete, the given function is invoked + * with the result (or {@code null} if none) and the exception (or + * {@code null} if none) of this stage as arguments, and the + * function's result is used to complete the returned stage. * * @param fn the function to use to compute the value of the * returned CompletionStage @@ -726,10 +744,12 @@ public interface CompletionStage { * Returns a new CompletionStage that, when this stage completes * either normally or exceptionally, is executed using the * supplied executor, with this stage's result and exception as - * arguments to the supplied function. The given function is - * invoked with the result (or {@code null} if none) and the - * exception (or {@code null} if none) of this stage when complete - * as arguments. + * arguments to the supplied function. + * + *

    When this stage is complete, the given function is invoked + * with the result (or {@code null} if none) and the exception (or + * {@code null} if none) of this stage as arguments, and the + * function's result is used to complete the returned stage. * * @param fn the function to use to compute the value of the * returned CompletionStage