提交 de34fddf 编写于 作者: K Kostas Kloudas

[minor] Use FutureUtils.waitForAll() in AppDispatcherBootstrap.getApplicationResult()

上级 5f81a36a
......@@ -32,6 +32,7 @@ import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.dispatcher.AbstractDispatcherBootstrap;
import org.apache.flink.runtime.dispatcher.Dispatcher;
......@@ -58,7 +59,6 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.util.Preconditions.checkNotNull;
......@@ -251,19 +251,11 @@ public class ApplicationDispatcherBootstrap extends AbstractDispatcherBootstrap
final DispatcherGateway dispatcherGateway,
final Collection<JobID> applicationJobIds,
final ScheduledExecutor executor) {
final CompletableFuture<?>[] jobResultFutures = applicationJobIds
final List<CompletableFuture<?>> jobResultFutures = applicationJobIds
.stream()
.map(jobId ->
unwrapJobResultException(getJobResult(dispatcherGateway, jobId, executor)))
.toArray(CompletableFuture<?>[]::new);
final CompletableFuture<Void> allStatusFutures = CompletableFuture.allOf(jobResultFutures);
Stream.of(jobResultFutures)
.forEach(f -> f.exceptionally(e -> {
allStatusFutures.completeExceptionally(e);
return null;
}));
return allStatusFutures;
.map(jobId -> unwrapJobResultException(getJobResult(dispatcherGateway, jobId, executor)))
.collect(Collectors.toList());
return FutureUtils.waitForAll(jobResultFutures);
}
private CompletableFuture<JobResult> getJobResult(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册