[FLINK-7664] Replace FlinkFutureException by java.util.concurrent.CompletionException

FlinkFutureException was introduced to fail a CompletableFuture from within a callback. However,
the JDK already provides a class for this purpose which also allows failures to be
handled properly across different stages: java.util.concurrent.CompletionException.
Therefore, we replace FlinkFutureException with CompletionException and remove the former.

This closes #4701.
Parent e9decac6
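The mechanical pattern applied throughout the diff below, shown as a minimal self-contained sketch (the executor, the deleteJar helper, and the jar id are illustrative stand-ins, not code from the patch; FlinkException is stubbed so the snippet compiles without Flink on the classpath):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

public class ReplacementPattern {

    // Stand-in for org.apache.flink.util.FlinkException, which is a
    // checked exception in Flink.
    static class FlinkException extends Exception {
        FlinkException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    public static void main(String[] args) {
        Executor executor = Runnable::run; // direct executor, just for the demo

        CompletableFuture<String> result = CompletableFuture
            .supplyAsync(() -> "jar-42", executor)
            .thenApplyAsync(jarId -> {
                try {
                    return deleteJar(jarId); // may throw a checked exception
                } catch (Exception e) {
                    // Before: throw new FlinkFutureException("Failed to delete jar " + jarId + '.', e);
                    // After: the standard JDK wrapper carries a FlinkException cause.
                    throw new CompletionException(
                        new FlinkException("Failed to delete jar " + jarId + '.', e));
                }
            }, executor);

        result.whenComplete((value, failure) ->
            System.out.println(failure == null ? value : failure.toString()));
    }

    private static String deleteJar(String jarId) throws Exception {
        return "deleted " + jarId;
    }
}
```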
......@@ -18,10 +18,10 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
......@@ -30,6 +30,7 @@ import java.io.FilenameFilter;
import java.io.StringWriter;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
......@@ -80,7 +81,7 @@ public class JarDeleteHandler extends AbstractJsonRequestHandler {
return writer.toString();
}
catch (Exception e) {
throw new FlinkFutureException("Failed to delete jar id " + pathParams.get("jarid") + '.', e);
throw new CompletionException(new FlinkException("Failed to delete jar id " + pathParams.get("jarid") + '.', e));
}
},
executor);
......
......@@ -19,11 +19,11 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler;
import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
......@@ -33,6 +33,7 @@ import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.jar.JarFile;
import java.util.jar.Manifest;
......@@ -140,7 +141,7 @@ public class JarListHandler extends AbstractJsonRequestHandler {
return writer.toString();
}
catch (Exception e) {
throw new FlinkFutureException("Failed to fetch jar list.", e);
throw new CompletionException(new FlinkException("Failed to fetch jar list.", e));
}
},
executor);
......
......@@ -18,7 +18,6 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
......@@ -30,6 +29,7 @@ import java.io.File;
import java.io.StringWriter;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
......@@ -65,7 +65,7 @@ public class JarPlanHandler extends JarActionHandler {
return writer.toString();
}
catch (Exception e) {
throw new FlinkFutureException(e);
throw new CompletionException(e);
}
},
executor);
......
......@@ -24,10 +24,10 @@ import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import com.fasterxml.jackson.core.JsonGenerator;
......@@ -36,6 +36,7 @@ import java.io.File;
import java.io.StringWriter;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
......@@ -86,7 +87,7 @@ public class JarRunHandler extends JarActionHandler {
gen.close();
return writer.toString();
} catch (Exception e) {
throw new FlinkFutureException("Could not run the jar.", e);
throw new CompletionException(new FlinkException("Could not run the jar.", e));
}
},
executor);
......
......@@ -20,7 +20,6 @@ package org.apache.flink.runtime.akka;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.instance.ActorGateway;
......@@ -36,12 +35,14 @@ import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import scala.Option;
import scala.reflect.ClassTag$;
......@@ -101,15 +102,15 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
if (Objects.equals(success.jobId(), jobGraph.getJobID())) {
return Acknowledge.get();
} else {
throw new FlinkFutureException("JobManager responded for wrong Job. This Job: " +
jobGraph.getJobID() + ", response: " + success.jobId());
throw new CompletionException(new FlinkException("JobManager responded for wrong Job. This Job: " +
jobGraph.getJobID() + ", response: " + success.jobId()));
}
} else if (response instanceof JobManagerMessages.JobResultFailure) {
JobManagerMessages.JobResultFailure failure = ((JobManagerMessages.JobResultFailure) response);
throw new FlinkFutureException("Job submission failed.", failure.cause());
throw new CompletionException(new FlinkException("Job submission failed.", failure.cause()));
} else {
throw new FlinkFutureException("Unknown response to SubmitJob message: " + response + '.');
throw new CompletionException(new FlinkException("Unknown response to SubmitJob message: " + response + '.'));
}
}
);
......@@ -127,7 +128,7 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
if (response instanceof JobManagerMessages.CancellationSuccess) {
return ((JobManagerMessages.CancellationSuccess) response).savepointPath();
} else {
throw new FlinkFutureException("Cancel with savepoint failed.", ((JobManagerMessages.CancellationFailure) response).cause());
throw new CompletionException(new FlinkException("Cancel with savepoint failed.", ((JobManagerMessages.CancellationFailure) response).cause()));
}
});
}
......@@ -144,7 +145,7 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
if (response instanceof JobManagerMessages.CancellationSuccess) {
return Acknowledge.get();
} else {
throw new FlinkFutureException("Cancel job failed " + jobId + '.', ((JobManagerMessages.CancellationFailure) response).cause());
throw new CompletionException(new FlinkException("Cancel job failed " + jobId + '.', ((JobManagerMessages.CancellationFailure) response).cause()));
}
});
}
......@@ -161,7 +162,7 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
if (response instanceof JobManagerMessages.StoppingSuccess) {
return Acknowledge.get();
} else {
throw new FlinkFutureException("Stop job failed " + jobId + '.', ((JobManagerMessages.StoppingFailure) response).cause());
throw new CompletionException(new FlinkException("Stop job failed " + jobId + '.', ((JobManagerMessages.StoppingFailure) response).cause()));
}
});
}
......@@ -211,7 +212,7 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
} else if (response instanceof JobManagerMessages.JobNotFound) {
return Optional.empty();
} else {
throw new FlinkFutureException("Unknown response: " + response + '.');
throw new CompletionException(new FlinkException("Unknown response: " + response + '.'));
}
});
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.concurrent;
import org.apache.flink.util.FlinkRuntimeException;
import java.util.concurrent.CompletionStage;
/**
* Base class for exceptions which are thrown in {@link CompletionStage}.
*
* <p>The exception has to extend {@link FlinkRuntimeException} because only
* unchecked exceptions can be thrown in a future's stage. Additionally we let
* it extend the Flink runtime exception because it designates the exception to
* come from a Flink stage.
*/
public class FlinkFutureException extends FlinkRuntimeException {
private static final long serialVersionUID = -8878194471694178210L;
public FlinkFutureException(String message) {
super(message);
}
public FlinkFutureException(Throwable cause) {
super(cause);
}
public FlinkFutureException(String message, Throwable cause) {
super(message, cause);
}
}
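For context on why CompletionException can take over this role (the Javadoc above notes that only unchecked exceptions may be thrown in a future's stage): the CompletableFuture machinery recognizes a thrown CompletionException, does not wrap it a second time, and unwraps its cause in get(). A minimal sketch demonstrating this, not part of the patch:

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class CompletionExceptionDemo {

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                return loadResource();
            } catch (IOException e) {
                // A supplier cannot throw checked exceptions, so the checked
                // cause is wrapped. CompletableFuture keeps a thrown
                // CompletionException as-is instead of wrapping it again.
                throw new CompletionException(e);
            }
        });

        try {
            future.join();
        } catch (CompletionException e) {
            // join() rethrows the CompletionException unchanged.
            System.out.println("join(): " + e.getCause());
        }

        try {
            future.get();
        } catch (ExecutionException e) {
            // get() unwraps the CompletionException and rethrows its cause
            // wrapped in an ExecutionException.
            System.out.println("get(): " + e.getCause());
        }
    }

    private static String loadResource() throws IOException {
        throw new IOException("resource missing");
    }
}
```

Both catch blocks print the original IOException, so callers see the actual failure cause regardless of which retrieval method they use.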
......@@ -24,7 +24,6 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
......@@ -40,6 +39,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
......@@ -223,7 +224,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
}
if (exception != null) {
throw new FlinkFutureException("Could not properly shut down the JobManagerRunner.", exception);
throw new CompletionException(new FlinkException("Could not properly shut down the JobManagerRunner.", exception));
}
});
}
......
......@@ -20,17 +20,19 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
/**
* Simple {@link StandaloneResourceManager} runner. It instantiates the resource manager's services
......@@ -107,7 +109,7 @@ public class ResourceManagerRunner implements FatalErrorHandler {
try {
resourceManagerRuntimeServices.shutDown();
} catch (Exception e) {
throw new FlinkFutureException("Could not properly shut down the resource manager runtime services.", e);
throw new CompletionException(new FlinkException("Could not properly shut down the resource manager runtime services.", e));
}
});
}
......
......@@ -19,7 +19,6 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
......@@ -30,6 +29,7 @@ import org.apache.flink.util.Preconditions;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
......@@ -70,7 +70,7 @@ public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonR
if (optGraph.isPresent()) {
return handleRequest(optGraph.get(), pathParams);
} else {
throw new FlinkFutureException(new NotFoundException("Could not find job with jobId " + jid + '.'));
throw new CompletionException(new NotFoundException("Could not find job with jobId " + jid + '.'));
}
}, executor);
}
......
......@@ -19,7 +19,6 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
......@@ -39,6 +38,7 @@ import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import static org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders.CLUSTER_OVERVIEW_REST_PATH;
......@@ -96,7 +96,7 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler implement
gen.close();
return writer.toString();
} catch (IOException exception) {
throw new FlinkFutureException("Could not write cluster overview.", exception);
throw new CompletionException(new FlinkException("Could not write cluster overview.", exception));
}
},
executor);
......
......@@ -20,15 +20,16 @@ package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
import java.io.StringWriter;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
......@@ -104,7 +105,7 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
}
}
catch (Exception e) {
throw new FlinkFutureException("Failed to fetch list of all running jobs.", e);
throw new CompletionException(new FlinkException("Failed to fetch list of all running jobs.", e));
}
},
executor);
......
......@@ -19,7 +19,6 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
......@@ -33,6 +32,7 @@ import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
......@@ -42,6 +42,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import static org.apache.flink.util.Preconditions.checkNotNull;
......@@ -129,7 +130,7 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler imple
gen.close();
return writer.toString();
} catch (IOException e) {
throw new FlinkFutureException("Could not write current jobs overview json.", e);
throw new CompletionException(new FlinkException("Could not write current jobs overview json.", e));
}
},
executor);
......
......@@ -19,10 +19,10 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
......@@ -32,6 +32,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
......@@ -57,7 +58,7 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler
try {
return createJobAccumulatorsJson(graph);
} catch (IOException e) {
throw new FlinkFutureException("Could not create job accumulators json.", e);
throw new CompletionException(new FlinkException("Could not create job accumulators json.", e));
}
},
executor);
......
......@@ -20,13 +20,14 @@ package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
......@@ -64,7 +65,7 @@ public class JobCancellationHandler extends AbstractJsonRequestHandler {
}
}
catch (Exception e) {
throw new FlinkFutureException("Failed to cancel the job with id: " + pathParams.get("jobid"), e);
throw new CompletionException(new FlinkException("Failed to cancel the job with id: " + pathParams.get("jobid"), e));
}
},
executor);
......
......@@ -24,12 +24,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
import org.apache.flink.runtime.rest.NotFoundException;
import org.apache.flink.util.FlinkException;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
......@@ -50,6 +50,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import static org.apache.flink.util.Preconditions.checkNotNull;
......@@ -154,12 +155,12 @@ public class JobCancellationWithSavepointHandlers {
return graphFuture.thenApplyAsync(
(Optional<AccessExecutionGraph> optGraph) -> {
final AccessExecutionGraph graph = optGraph.orElseThrow(
() -> new FlinkFutureException(
() -> new CompletionException(
new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.')));
CheckpointCoordinator coord = graph.getCheckpointCoordinator();
if (coord == null) {
throw new FlinkFutureException(new Exception("Cannot find CheckpointCoordinator for job."));
throw new CompletionException(new FlinkException("Cannot find CheckpointCoordinator for job."));
}
String targetDirectory = pathParams.get("targetDirectory");
......@@ -177,7 +178,7 @@ public class JobCancellationWithSavepointHandlers {
try {
return handleNewRequest(jobManagerGateway, jobId, targetDirectory, coord.getCheckpointTimeout());
} catch (IOException e) {
throw new FlinkFutureException("Could not cancel job with savepoint.", e);
throw new CompletionException(new FlinkException("Could not cancel job with savepoint.", e));
}
}, executor);
} else {
......@@ -342,7 +343,7 @@ public class JobCancellationWithSavepointHandlers {
}
}
} catch (Exception e) {
throw new FlinkFutureException("Could not handle in progress request.", e);
throw new CompletionException(new FlinkException("Could not handle in progress request.", e));
}
});
}
......
......@@ -19,10 +19,10 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
......@@ -32,6 +32,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
......@@ -57,7 +58,7 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
try {
return createJobConfigJson(graph);
} catch (IOException e) {
throw new FlinkFutureException("Could not write job config json.", e);
throw new CompletionException(new FlinkException("Could not write job config json.", e));
}
},
executor);
......
......@@ -18,7 +18,6 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
......@@ -29,6 +28,7 @@ import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
......@@ -40,6 +40,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
......@@ -76,7 +77,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
try {
return createJobDetailsJson(graph, fetcher);
} catch (IOException e) {
throw new FlinkFutureException("Could not create job details json.", e);
throw new CompletionException(new FlinkException("Could not create job details json.", e));
}
},
executor);
......
......@@ -18,7 +18,6 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
......@@ -27,6 +26,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
......@@ -36,6 +36,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
......@@ -63,7 +64,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
try {
return createJobExceptionsJson(graph);
} catch (IOException e) {
throw new FlinkFutureException("Could not create job exceptions json.", e);
throw new CompletionException(new FlinkException("Could not create job exceptions json.", e));
}
},
executor
......
......@@ -19,8 +19,8 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
......@@ -28,6 +28,7 @@ import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
......@@ -79,7 +80,7 @@ public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
gen.close();
return writer.toString();
} catch (IOException e) {
throw new FlinkFutureException("Could not write configuration.", e);
throw new CompletionException(new FlinkException("Could not write configuration.", e));
}
},
executor);
......
......@@ -20,13 +20,14 @@ package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
......@@ -64,7 +65,7 @@ public class JobStoppingHandler extends AbstractJsonRequestHandler {
}
}
catch (Exception e) {
throw new FlinkFutureException("Failed to stop the job with id: " + pathParams.get("jobid") + '.', e);
throw new CompletionException(new FlinkException("Failed to stop the job with id: " + pathParams.get("jobid") + '.', e));
}
},
executor);
......
......@@ -19,11 +19,11 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
......@@ -34,6 +34,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
......@@ -59,7 +60,7 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle
try {
return createVertexAccumulatorsJson(jobVertex);
} catch (IOException e) {
throw new FlinkFutureException("Could not create job vertex accumulators json.", e);
throw new CompletionException(new FlinkException("Could not create job vertex accumulators json.", e));
}
},
executor);
......
......@@ -18,7 +18,6 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
......@@ -28,6 +27,7 @@ import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
......@@ -40,6 +40,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
......@@ -69,7 +70,7 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
try {
return createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher);
} catch (IOException e) {
throw new FlinkFutureException("Could not write the vertex details json.", e);
throw new CompletionException(new FlinkException("Could not write the vertex details json.", e));
}
},
executor);
......
......@@ -18,7 +18,6 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
......@@ -29,6 +28,7 @@ import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
......@@ -42,6 +42,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
......@@ -71,7 +72,7 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
try {
return createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher);
} catch (IOException e) {
throw new FlinkFutureException("Could not create TaskManager json.", e);
throw new CompletionException(new FlinkException("Could not create TaskManager json.", e));
}
},
executor);
......
......@@ -19,13 +19,13 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
......@@ -36,6 +36,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
......@@ -62,7 +63,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
try {
return createAttemptAccumulatorsJson(execAttempt);
} catch (IOException e) {
throw new FlinkFutureException("Could not create accumulator json.", e);
throw new CompletionException(new FlinkException("Could not create accumulator json.", e));
}
},
executor);
......
......@@ -18,7 +18,6 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
......@@ -29,6 +28,7 @@ import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
......@@ -41,6 +41,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import static org.apache.flink.runtime.rest.handler.legacy.SubtaskCurrentAttemptDetailsHandler.SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH;
......@@ -71,7 +72,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
try {
return createAttemptDetailsJson(execAttempt, params.get("jobid"), params.get("vertexid"), fetcher);
} catch (IOException e) {
throw new FlinkFutureException("Could not create attempt details json.", e);
throw new CompletionException(new FlinkException("Could not create attempt details json.", e));
}
},
executor);
......
......@@ -19,13 +19,13 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
......@@ -36,6 +36,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
......@@ -61,7 +62,7 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
try {
return createSubtasksAccumulatorsJson(jobVertex);
} catch (IOException e) {
throw new FlinkFutureException("Could not create subtasks accumulator json.", e);
throw new CompletionException(new FlinkException("Could not create subtasks accumulator json.", e));
}
},
executor);
......
......@@ -18,7 +18,6 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
......@@ -26,6 +25,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
......@@ -36,6 +36,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
......@@ -62,7 +63,7 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
try {
return createSubtaskTimesJson(jobVertex);
} catch (IOException e) {
throw new FlinkFutureException("Could not write subtask time json.", e);
throw new CompletionException(new FlinkException("Could not write subtask time json.", e));
}
},
executor);
......
......@@ -32,13 +32,13 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.rest.handler.RedirectHandler;
import org.apache.flink.runtime.rest.handler.WebHandler;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
......@@ -74,6 +74,7 @@ import java.util.HashMap;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
......@@ -160,7 +161,7 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im
try {
return new BlobCache(new InetSocketAddress(jobManagerGateway.getHostname(), port), config, blobView);
} catch (IOException e) {
throw new FlinkFutureException("Could not create BlobCache.", e);
throw new CompletionException(new FlinkException("Could not create BlobCache.", e));
}
},
executor);
......@@ -178,7 +179,7 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im
CompletableFuture<BlobKey> blobKeyFuture = taskManagerFuture.thenCompose(
(Optional<Instance> optTMInstance) -> {
Instance taskManagerInstance = optTMInstance.orElseThrow(
() -> new FlinkFutureException("Could not find instance with " + instanceID + '.'));
() -> new CompletionException(new FlinkException("Could not find instance with " + instanceID + '.')));
switch (fileMode) {
case LOG:
return taskManagerInstance.getTaskManagerGateway().requestTaskManagerLog(timeout);
......@@ -200,7 +201,7 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im
try {
blobCache.deleteGlobal(lastSubmittedFile.get(taskManagerID));
} catch (IOException e) {
throw new FlinkFutureException("Could not delete file for " + taskManagerID + '.', e);
throw new CompletionException(new FlinkException("Could not delete file for " + taskManagerID + '.', e));
}
lastSubmittedFile.put(taskManagerID, blobKey);
}
......@@ -210,7 +211,7 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im
try {
return blobCache.getFile(blobKey).getAbsolutePath();
} catch (IOException e) {
throw new FlinkFutureException("Could not retrieve blob for " + blobKey + '.', e);
throw new CompletionException(new FlinkException("Could not retrieve blob for " + blobKey + '.', e));
}
},
executor);
......
......@@ -19,13 +19,13 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.StringUtils;
import com.fasterxml.jackson.core.JsonGenerator;
......@@ -37,6 +37,7 @@ import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import static java.util.Objects.requireNonNull;
......@@ -83,7 +84,7 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler {
optTaskManager.map(Collections::singleton).orElse(Collections.emptySet()),
pathParams);
} catch (IOException e) {
throw new FlinkFutureException("Could not write TaskManagers JSON.", e);
throw new CompletionException(new FlinkException("Could not write TaskManagers JSON.", e));
}
},
executor);
......@@ -95,7 +96,7 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler {
try {
return writeTaskManagersJson(taskManagers, pathParams);
} catch (IOException e) {
throw new FlinkFutureException("Could not write TaskManagers JSON.", e);
throw new CompletionException(new FlinkException("Could not write TaskManagers JSON.", e));
}
},
executor);
......
......@@ -18,7 +18,6 @@
package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
......@@ -27,6 +26,7 @@ import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
......@@ -36,6 +36,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
......@@ -61,7 +62,7 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
try {
return createCheckpointConfigJson(graph);
} catch (IOException e) {
throw new FlinkFutureException("Could not create checkpoint config json.", e);
throw new CompletionException(new FlinkException("Could not create checkpoint config json.", e));
}
},
executor);
......
......@@ -24,13 +24,13 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
import org.apache.flink.runtime.checkpoint.TaskStateStats;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
......@@ -42,6 +42,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
......@@ -92,7 +93,7 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
try {
return createCheckpointDetailsJson(checkpoint);
} catch (IOException e) {
throw new FlinkFutureException("Could not create checkpoint details json.", e);
throw new CompletionException(new FlinkException("Could not create checkpoint details json.", e));
}
},
executor);
......
......@@ -27,13 +27,13 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
......@@ -45,6 +45,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
......@@ -70,7 +71,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
try {
return createCheckpointStatsJson(graph);
} catch (IOException e) {
throw new FlinkFutureException("Could not create checkpoint stats json.", e);
throw new CompletionException(new FlinkException("Could not create checkpoint stats json.", e));
}
},
executor);
......
......@@ -18,10 +18,10 @@
package org.apache.flink.runtime.rest.handler.legacy.metrics;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import com.fasterxml.jackson.core.JsonGenerator;
......@@ -30,6 +30,7 @@ import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
......@@ -62,7 +63,7 @@ public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler
? getMetricsValues(pathParams, requestedMetricsList)
: getAvailableMetricsList(pathParams);
} catch (IOException e) {
throw new FlinkFutureException("Could not retrieve metrics.", e);
throw new CompletionException(new FlinkException("Could not retrieve metrics.", e));
}
},
executor);
......
......@@ -21,10 +21,11 @@ package org.apache.flink.runtime.blob;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.TestLogger;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
......@@ -37,6 +38,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
......@@ -397,7 +399,7 @@ public class BlobServerDeleteTest extends TestLogger {
try (BlobClient blobClient = blobServer.createClient()) {
deleteHelper(blobClient, jobId, blobKey);
} catch (IOException e) {
throw new FlinkFutureException("Could not delete the given blob key " + blobKey + '.', e);
throw new CompletionException(new FlinkException("Could not delete the given blob key " + blobKey + '.', e));
}
return null;
......
......@@ -18,14 +18,15 @@
package org.apache.flink.runtime.blob;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
......@@ -36,7 +37,6 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.security.MessageDigest;
......@@ -45,12 +45,17 @@ import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
import static org.junit.Assert.*;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
......@@ -303,7 +308,7 @@ public class BlobServerGetTest extends TestLogger {
return new ByteArrayInputStream(buffer);
} catch (IOException e) {
throw new FlinkFutureException("Could not read blob for key " + blobKey + '.', e);
throw new CompletionException(new FlinkException("Could not read blob for key " + blobKey + '.', e));
}
},
executor);
......
......@@ -22,11 +22,12 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
......@@ -41,6 +42,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
......@@ -475,7 +477,7 @@ public class BlobServerPutTest extends TestLogger {
.put(jobId, new BlockingInputStream(countDownLatch, data));
}
} catch (IOException e) {
throw new FlinkFutureException("Could not upload blob.", e);
throw new CompletionException(new FlinkException("Could not upload blob.", e));
}
},
executor);
......
......@@ -29,6 +29,7 @@ import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
......@@ -65,7 +66,7 @@ public class FutureUtilsTest extends TestLogger {
if (atomicInteger.incrementAndGet() == retries) {
return true;
} else {
throw new FlinkFutureException("Test exception");
throw new CompletionException(new FlinkException("Test exception"));
}
},
TestingUtils.defaultExecutor()),
......@@ -119,7 +120,7 @@ public class FutureUtilsTest extends TestLogger {
}
}
throw new FlinkFutureException("Test exception");
throw new CompletionException(new FlinkException("Test exception"));
},
TestingUtils.defaultExecutor()),
retries,
......
......@@ -26,7 +26,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
......@@ -40,11 +39,13 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
......@@ -977,7 +978,7 @@ public class SlotManagerTest extends TestLogger {
try {
return slotManager.registerSlotRequest(slotRequest);
} catch (SlotManagerException e) {
throw new FlinkFutureException(e);
throw new CompletionException(e);
}
},
mainThreadExecutor)
......
......@@ -20,11 +20,11 @@ package org.apache.flink.runtime.rpc;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
......@@ -33,6 +33,7 @@ import org.junit.Test;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
......@@ -326,7 +327,7 @@ public class FencedRpcEndpointTest extends TestLogger {
try {
computationLatch.await();
} catch (InterruptedException e) {
throw new FlinkFutureException("Waiting on latch failed.", e);
throw new CompletionException(new FlinkException("Waiting on latch failed.", e));
}
return value;
......
......@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.operators.async.queue;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.streaming.api.operators.async.OperatorActions;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
......@@ -34,6 +33,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
......@@ -98,7 +98,7 @@ public class OrderedStreamElementQueueTest extends TestLogger {
try {
result.add(queue.poll());
} catch (InterruptedException e) {
throw new FlinkFutureException(e);
throw new CompletionException(e);
}
}
......
......@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.operators.async.queue;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.streaming.api.operators.async.OperatorActions;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
......@@ -37,6 +36,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
......@@ -180,7 +180,7 @@ public class StreamElementQueueTest extends TestLogger {
try {
queue.put(streamRecordQueueEntry2);
} catch (InterruptedException e) {
throw new FlinkFutureException(e);
throw new CompletionException(e);
}
},
executor);
......@@ -220,7 +220,7 @@ public class StreamElementQueueTest extends TestLogger {
try {
return queue.peekBlockingly();
} catch (InterruptedException e) {
throw new FlinkFutureException(e);
throw new CompletionException(e);
}
},
executor);
......@@ -244,7 +244,7 @@ public class StreamElementQueueTest extends TestLogger {
try {
return queue.poll();
} catch (InterruptedException e) {
throw new FlinkFutureException(e);
throw new CompletionException(e);
}
},
executor);
......
......@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.operators.async.queue;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.streaming.api.operators.async.OperatorActions;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
......@@ -35,6 +34,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
......@@ -104,7 +104,7 @@ public class UnorderedStreamElementQueueTest extends TestLogger {
try {
return queue.poll();
} catch (InterruptedException e) {
throw new FlinkFutureException(e);
throw new CompletionException(e);
}
},
executor);
......@@ -125,7 +125,7 @@ public class UnorderedStreamElementQueueTest extends TestLogger {
try {
return queue.poll();
} catch (InterruptedException e) {
throw new FlinkFutureException(e);
throw new CompletionException(e);
}
},
executor);
......@@ -171,7 +171,7 @@ public class UnorderedStreamElementQueueTest extends TestLogger {
try {
return queue.poll();
} catch (InterruptedException e) {
throw new FlinkFutureException(e);
throw new CompletionException(e);
}
},
executor);
......