提交 c9b99daa 编写于 作者: J Juergen Hoeller

SettableListenableFuture centralizes state in ListenableFutureTask subclass

Issue: SPR-15216
上级 ebaf6e1c
/* /*
* Copyright 2002-2016 the original author or authors. * Copyright 2002-2017 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
...@@ -63,8 +63,9 @@ public class ListenableFutureTask<T> extends FutureTask<T> implements Listenable ...@@ -63,8 +63,9 @@ public class ListenableFutureTask<T> extends FutureTask<T> implements Listenable
this.callbacks.addFailureCallback(failureCallback); this.callbacks.addFailureCallback(failureCallback);
} }
@Override @Override
protected final void done() { protected void done() {
Throwable cause; Throwable cause;
try { try {
T result = get(); T result = get();
......
/* /*
* Copyright 2002-2016 the original author or authors. * Copyright 2002-2017 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
...@@ -20,10 +20,8 @@ import java.util.concurrent.Callable; ...@@ -20,10 +20,8 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
/** /**
* A {@link org.springframework.util.concurrent.ListenableFuture ListenableFuture} * A {@link org.springframework.util.concurrent.ListenableFuture ListenableFuture}
...@@ -39,15 +37,15 @@ import org.springframework.util.ReflectionUtils; ...@@ -39,15 +37,15 @@ import org.springframework.util.ReflectionUtils;
*/ */
public class SettableListenableFuture<T> implements ListenableFuture<T> { public class SettableListenableFuture<T> implements ListenableFuture<T> {
private final SettableTask<T> settableTask; private static final Callable<Object> DUMMY_CALLABLE = new Callable<Object>() {
@Override
private final ListenableFutureTask<T> listenableFuture; public Object call() throws Exception {
throw new IllegalStateException("Should never be called");
}
};
public SettableListenableFuture() { private final SettableTask<T> settableTask = new SettableTask<>();
this.settableTask = new SettableTask<>();
this.listenableFuture = new ListenableFutureTask<>(this.settableTask);
}
/** /**
...@@ -58,11 +56,7 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> { ...@@ -58,11 +56,7 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
* @return {@code true} if the value was successfully set, else {@code false} * @return {@code true} if the value was successfully set, else {@code false}
*/ */
public boolean set(T value) { public boolean set(T value) {
boolean success = this.settableTask.setValue(value); return this.settableTask.setResultValue(value);
if (success) {
this.listenableFuture.run();
}
return success;
} }
/** /**
...@@ -74,27 +68,22 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> { ...@@ -74,27 +68,22 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
*/ */
public boolean setException(Throwable exception) { public boolean setException(Throwable exception) {
Assert.notNull(exception, "Exception must not be null"); Assert.notNull(exception, "Exception must not be null");
boolean success = this.settableTask.setException(exception); return this.settableTask.setExceptionResult(exception);
if (success) {
this.listenableFuture.run();
}
return success;
} }
@Override @Override
public void addCallback(ListenableFutureCallback<? super T> callback) { public void addCallback(ListenableFutureCallback<? super T> callback) {
this.listenableFuture.addCallback(callback); this.settableTask.addCallback(callback);
} }
@Override @Override
public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) { public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) {
this.listenableFuture.addCallback(successCallback, failureCallback); this.settableTask.addCallback(successCallback, failureCallback);
} }
@Override @Override
public boolean cancel(boolean mayInterruptIfRunning) { public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = this.settableTask.setCancelled(); boolean cancelled = this.settableTask.cancel(mayInterruptIfRunning);
this.listenableFuture.cancel(mayInterruptIfRunning);
if (cancelled && mayInterruptIfRunning) { if (cancelled && mayInterruptIfRunning) {
interruptTask(); interruptTask();
} }
...@@ -113,78 +102,76 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> { ...@@ -113,78 +102,76 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
/** /**
* Retrieve the value. * Retrieve the value.
* <p>Will return the value if it has been set via {@link #set(Object)}, * <p>This method returns the value if it has been set via {@link #set(Object)},
* throw an {@link java.util.concurrent.ExecutionException} if it has been * throws an {@link java.util.concurrent.ExecutionException} if an exception has
* set via {@link #setException(Throwable)} or throw a * been set via {@link #setException(Throwable)}, or throws a
* {@link java.util.concurrent.CancellationException} if it has been cancelled. * {@link java.util.concurrent.CancellationException} if the future has been cancelled.
* @return The value associated with this future. * @return the value associated with this future
*/ */
@Override @Override
public T get() throws InterruptedException, ExecutionException { public T get() throws InterruptedException, ExecutionException {
return this.listenableFuture.get(); return this.settableTask.get();
} }
/** /**
* Retrieve the value. * Retrieve the value.
* <p>Will return the value if it has been set via {@link #set(Object)}, * <p>This method returns the value if it has been set via {@link #set(Object)},
* throw an {@link java.util.concurrent.ExecutionException} if it has been * throws an {@link java.util.concurrent.ExecutionException} if an exception has
* set via {@link #setException(Throwable)} or throw a * been set via {@link #setException(Throwable)}, or throws a
* {@link java.util.concurrent.CancellationException} if it has been cancelled. * {@link java.util.concurrent.CancellationException} if the future has been cancelled.
* @param timeout the maximum time to wait. * @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument. * @param unit the unit of the timeout argument
* @return The value associated with this future. * @return the value associated with this future
*/ */
@Override @Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return this.listenableFuture.get(timeout, unit); return this.settableTask.get(timeout, unit);
} }
/** /**
* Subclasses can override this method to implement interruption of the future's * Subclasses can override this method to implement interruption of the future's
* computation. The method is invoked automatically by a successful call to * computation. The method is invoked automatically by a successful call to
* {@link #cancel(boolean) cancel(true)}. * {@link #cancel(boolean) cancel(true)}.
* <p>The default implementation does nothing. * <p>The default implementation is empty.
*/ */
protected void interruptTask() { protected void interruptTask() {
} }
private static class SettableTask<T> implements Callable<T> { private static class SettableTask<T> extends ListenableFutureTask<T> {
private static final Object NO_VALUE = new Object();
private static final Object CANCELLED = new Object();
private final AtomicReference<Object> value = new AtomicReference<>(NO_VALUE); private volatile Thread completingThread;
public boolean setValue(T value) { @SuppressWarnings("unchecked")
return this.value.compareAndSet(NO_VALUE, value); public SettableTask() {
} super((Callable<T>) DUMMY_CALLABLE);
public boolean setException(Throwable exception) {
return this.value.compareAndSet(NO_VALUE, exception);
}
public boolean setCancelled() {
return this.value.compareAndSet(NO_VALUE, CANCELLED);
} }
public boolean isCancelled() { public boolean setResultValue(T value) {
return (this.value.get() == CANCELLED); set(value);
return checkCompletingThread();
} }
public boolean isDone() { public boolean setExceptionResult(Throwable exception) {
return (this.value.get() != NO_VALUE); setException(exception);
return checkCompletingThread();
} }
@SuppressWarnings("unchecked")
@Override @Override
public T call() throws Exception { protected void done() {
Object val = this.value.get(); if (!isCancelled()) {
if (val instanceof Throwable) { // Implicitly invoked by set/setException: store current thread for
ReflectionUtils.rethrowException((Throwable) val); // determining whether the given result has actually triggered completion
// (since FutureTask.set/setException unfortunately don't expose that)
this.completingThread = Thread.currentThread();
} }
return (T) val; super.done();
}
private boolean checkCompletingThread() {
boolean check = (this.completingThread == Thread.currentThread());
this.completingThread = null; // only first check actually counts
return check;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册