diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java index 53d5b7551f361a36f54a5e4404a8a6599689b1b6..9ff1f54f19e7bd2175880bf343747553829616f0 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java @@ -32,6 +32,7 @@ import org.apache.flink.configuration.PipelineOptionsInternal; import org.apache.flink.core.execution.PipelineExecutorServiceLoader; import org.apache.flink.runtime.client.JobCancellationException; import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.dispatcher.AbstractDispatcherBootstrap; import org.apache.flink.runtime.dispatcher.Dispatcher; @@ -58,7 +59,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -251,19 +251,11 @@ public class ApplicationDispatcherBootstrap extends AbstractDispatcherBootstrap final DispatcherGateway dispatcherGateway, final Collection applicationJobIds, final ScheduledExecutor executor) { - final CompletableFuture[] jobResultFutures = applicationJobIds + final List> jobResultFutures = applicationJobIds .stream() - .map(jobId -> - unwrapJobResultException(getJobResult(dispatcherGateway, jobId, executor))) - .toArray(CompletableFuture[]::new); - - final CompletableFuture allStatusFutures = CompletableFuture.allOf(jobResultFutures); - Stream.of(jobResultFutures) - .forEach(f -> f.exceptionally(e -> { - allStatusFutures.completeExceptionally(e); - return null; - })); - return allStatusFutures; + .map(jobId -> unwrapJobResultException(getJobResult(dispatcherGateway, jobId, executor))) + .collect(Collectors.toList()); + return FutureUtils.waitForAll(jobResultFutures); } private CompletableFuture getJobResult(