[FLINK-7940] Add FutureUtils.orTimeout

This commit adds a convenience function which allows to easily add a timeout to
a CompletableFuture.

This closes #4918.
上级 747cf821
......@@ -19,6 +19,7 @@
package org.apache.flink.runtime.concurrent;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.Preconditions;
import akka.dispatch.OnComplete;
......@@ -31,7 +32,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Supplier;
......@@ -445,4 +448,68 @@ public class FutureUtils {
return result;
}
/**
* Times the given future out after the timeout.
*
* @param future to time out
* @param timeout after which the given future is timed out
* @param timeUnit time unit of the timeout
* @param <T> type of the given future
* @return The timeout enriched future
*/
public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
future.whenComplete((T value, Throwable throwable) -> {
if (!timeoutFuture.isDone()) {
timeoutFuture.cancel(false);
}
});
return future;
}
/**
* Runnable to complete the given future with a {@link TimeoutException}.
*/
private static final class Timeout implements Runnable {
private final CompletableFuture<?> future;
private Timeout(CompletableFuture<?> future) {
this.future = Preconditions.checkNotNull(future);
}
@Override
public void run() {
future.completeExceptionally(new TimeoutException());
}
}
/**
* Delay scheduler used to timeout futures.
*
* <p>This class creates a singleton scheduler used to run the provided actions.
*/
private static final class Delayer {
static final ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor(
1,
new ExecutorThreadFactory("FlinkCompletableFutureDelayScheduler"));
/**
* Delay the given action by the given delay.
*
* @param runnable to execute after the given delay
* @param delay after which to execute the runnable
* @param timeUnit time unit of the delay
* @return Future of the scheduled action
*/
private static ScheduledFuture<?> delay(Runnable runnable, long delay, TimeUnit timeUnit) {
Preconditions.checkNotNull(runnable);
Preconditions.checkNotNull(timeUnit);
return delayer.schedule(runnable, delay, timeUnit);
}
}
}
......@@ -33,6 +33,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
......@@ -223,4 +224,21 @@ public class FutureUtilsTest extends TestLogger {
assertTrue(retryFuture.isCancelled());
verify(scheduledFutureMock).cancel(anyBoolean());
}
/**
* Tests that a future is timed out after the specified timeout.
*/
@Test
public void testOrTimeout() throws Exception {
final CompletableFuture<String> future = new CompletableFuture<>();
final long timeout = 10L;
FutureUtils.orTimeout(future, timeout, TimeUnit.MILLISECONDS);
try {
future.get();
} catch (ExecutionException e) {
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册