[FLINK-9706] Properly wait for termination of JobManagerRunner before restarting jobs

In order to avoid race conditions between resource clean up, we now wait for the proper
termination of a previously running JobMaster responsible for the same job (e.g. originating
from a job recovery or a re-submission).

This closes #6279.
上级 dc7d81c9
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.util.function;
import org.apache.flink.util.ExceptionUtils;
import java.util.function.Consumer;
/**
* A checked extension of the {@link Consumer} interface.
*
* @param <T> type of the first argument
* @param <E> type of the thrown exception
*/
public interface ConsumerWithException<T, E extends Throwable> extends Consumer<T> {
void acceptWithException(T value) throws E;
@Override
default void accept(T value) {
try {
acceptWithException(value);
} catch (Throwable t) {
ExceptionUtils.rethrow(t);
}
}
}
......@@ -64,13 +64,13 @@ import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ConsumerWithException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
......@@ -126,7 +126,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
@Nullable
protected final String restAddress;
private CompletableFuture<Void> orphanedJobManagerRunnersTerminationFuture = CompletableFuture.completedFuture(null);
private final Map<JobID, CompletableFuture<Void>> jobManagerTerminationFutures;
public Dispatcher(
RpcService rpcService,
......@@ -173,6 +173,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
this.archivedExecutionGraphStore = Preconditions.checkNotNull(archivedExecutionGraphStore);
this.jobManagerRunnerFactory = Preconditions.checkNotNull(jobManagerRunnerFactory);
this.jobManagerTerminationFutures = new HashMap<>(2);
}
//------------------------------------------------------
......@@ -183,11 +185,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
public CompletableFuture<Void> postStop() {
log.info("Stopping dispatcher {}.", getAddress());
final CompletableFuture<Void> jobManagerRunnersTerminationFuture = terminateJobManagerRunners();
final CompletableFuture<Void> allJobManagerRunnersTerminationFuture = FutureUtils.completeAll(Arrays.asList(
jobManagerRunnersTerminationFuture,
orphanedJobManagerRunnersTerminationFuture));
final CompletableFuture<Void> allJobManagerRunnersTerminationFuture = terminateJobManagerRunnersAndGetTerminationFuture();
return FutureUtils.runAfterwards(
allJobManagerRunnersTerminationFuture,
......@@ -238,20 +236,26 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
final JobID jobId = jobGraph.getJobID();
log.info("Submitting job {} ({}).", jobId, jobGraph.getName());
final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus;
try {
final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
} catch (IOException e) {
return FutureUtils.completedExceptionally(new FlinkException(String.format("Failed to retrieve job scheduling status for job %s.", jobId), e));
}
if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunners.containsKey(jobId)) {
return FutureUtils.completedExceptionally(
new JobSubmissionException(jobId, String.format("Job has already been submitted and is in state %s.", jobSchedulingStatus)));
} else {
persistAndRunJob(jobGraph);
if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunners.containsKey(jobId)) {
return FutureUtils.completedExceptionally(
new JobSubmissionException(jobId, String.format("Job has already been submitted and is in state %s.", jobSchedulingStatus)));
} else {
final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobId, jobGraph, this::persistAndRunJob)
.thenApply(ignored -> Acknowledge.get());
return CompletableFuture.completedFuture(Acknowledge.get());
}
} catch (Exception e) {
return FutureUtils.completedExceptionally(new FlinkException(String.format("Failed to submit job %s.", jobId), e));
return persistAndRunFuture.exceptionally(
(Throwable throwable) -> {
throw new CompletionException(
new JobSubmissionException(jobId, "Failed to submit job.", throwable));
});
}
}
......@@ -536,7 +540,25 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
private void removeJobAndRegisterTerminationFuture(JobID jobId, boolean cleanupHA) {
final CompletableFuture<Void> cleanupFuture = removeJob(jobId, cleanupHA);
registerOrphanedJobManagerTerminationFuture(cleanupFuture);
registerJobManagerRunnerTerminationFuture(jobId, cleanupFuture);
}
private void registerJobManagerRunnerTerminationFuture(JobID jobId, CompletableFuture<Void> jobManagerRunnerTerminationFuture) {
Preconditions.checkState(!jobManagerTerminationFutures.containsKey(jobId));
jobManagerTerminationFutures.put(jobId, jobManagerRunnerTerminationFuture);
// clean up the pending termination future
jobManagerRunnerTerminationFuture.thenRunAsync(
() -> {
final CompletableFuture<Void> terminationFuture = jobManagerTerminationFutures.remove(jobId);
//noinspection ObjectEquality
if (terminationFuture != null && terminationFuture != jobManagerRunnerTerminationFuture) {
jobManagerTerminationFutures.put(jobId, terminationFuture);
}
},
getUnfencedMainThreadExecutor());
}
private CompletableFuture<Void> removeJob(JobID jobId, boolean cleanupHA) {
......@@ -573,19 +595,21 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
/**
* Terminate all currently running {@link JobManagerRunner}.
*
* @return Future which is completed once all {@link JobManagerRunner} have terminated
*/
private CompletableFuture<Void> terminateJobManagerRunners() {
private void terminateJobManagerRunners() {
log.info("Stopping all currently running jobs of dispatcher {}.", getAddress());
final HashSet<JobID> jobsToRemove = new HashSet<>(jobManagerRunners.keySet());
final List<CompletableFuture<Void>> terminationFutures = jobsToRemove.stream()
.map(jobId -> removeJob(jobId, false))
.collect(Collectors.toList());
for (JobID jobId : jobsToRemove) {
removeJobAndRegisterTerminationFuture(jobId, false);
}
}
return FutureUtils.completeAll(terminationFutures);
private CompletableFuture<Void> terminateJobManagerRunnersAndGetTerminationFuture() {
terminateJobManagerRunners();
final Collection<CompletableFuture<Void>> values = jobManagerTerminationFutures.values();
return FutureUtils.completeAll(values);
}
/**
......@@ -677,12 +701,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
onFatalError(new FlinkException(String.format("JobMaster for job %s failed.", jobId), cause));
}
private void registerOrphanedJobManagerTerminationFuture(CompletableFuture<Void> jobManagerRunnerTerminationFuture) {
orphanedJobManagerRunnersTerminationFuture = FutureUtils.completeAll(Arrays.asList(
orphanedJobManagerRunnersTerminationFuture,
jobManagerRunnerTerminationFuture));
}
private CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture(JobID jobId) {
final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);
......@@ -741,7 +759,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
final CompletableFuture<Collection<JobGraph>> recoveredJobsFuture = recoverJobs();
final CompletableFuture<Boolean> fencingTokenFuture = recoveredJobsFuture.thenApplyAsync(
final CompletableFuture<Boolean> fencingTokenFuture = recoveredJobsFuture.thenComposeAsync(
(Collection<JobGraph> recoveredJobs) -> tryAcceptLeadershipAndRunJobs(newLeaderSessionID, recoveredJobs),
getUnfencedMainThreadExecutor());
......@@ -761,31 +779,44 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
});
}
private boolean tryAcceptLeadershipAndRunJobs(UUID newLeaderSessionID, Collection<JobGraph> recoveredJobs) {
private CompletableFuture<Boolean> tryAcceptLeadershipAndRunJobs(UUID newLeaderSessionID, Collection<JobGraph> recoveredJobs) {
final DispatcherId dispatcherId = DispatcherId.fromUuid(newLeaderSessionID);
if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
log.debug("Dispatcher {} accepted leadership with fencing token {}. Start recovered jobs.", getAddress(), dispatcherId);
setNewFencingToken(dispatcherId);
Collection<CompletableFuture<Void>> runFutures = new ArrayList<>(recoveredJobs.size());
for (JobGraph recoveredJob : recoveredJobs) {
try {
runJob(recoveredJob);
} catch (Exception e) {
throw new CompletionException(
new FlinkException(
String.format("Failed to recover job %s.", recoveredJob.getJobID()),
e));
}
final CompletableFuture<Void> runFuture = waitForTerminatingJobManager(recoveredJob.getJobID(), recoveredJob, this::runJob);
runFutures.add(runFuture);
}
return true;
return FutureUtils.waitForAll(runFutures).thenApply(ignored -> true);
} else {
log.debug("Dispatcher {} lost leadership before accepting it. Stop recovering jobs for fencing token {}.", getAddress(), dispatcherId);
return false;
return CompletableFuture.completedFuture(false);
}
}
private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, ConsumerWithException<JobGraph, ?> action) {
final CompletableFuture<Void> jobManagerTerminationFuture = jobManagerTerminationFutures
.getOrDefault(jobId, CompletableFuture.completedFuture(null))
.exceptionally((Throwable throwable) -> {
throw new CompletionException(
new DispatcherException(
String.format("Termination of previous JobManager for job %s failed. Cannot submit job under the same job id.", jobId),
throwable)); });
return jobManagerTerminationFuture.thenRunAsync(
() -> {
jobManagerTerminationFutures.remove(jobId);
action.accept(jobGraph);
},
getMainThreadExecutor());
}
private void setNewFencingToken(@Nullable DispatcherId dispatcherId) {
// clear the state if we've been the leader before
if (getFencingToken() != null) {
......@@ -796,8 +827,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
}
private void clearDispatcherState() {
final CompletableFuture<Void> jobManagerRunnersTerminationFuture = terminateJobManagerRunners();
registerOrphanedJobManagerTerminationFuture(jobManagerRunnersTerminationFuture);
terminateJobManagerRunners();
}
/**
......
......@@ -127,6 +127,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
private CompletableFuture<JobID> deleteAllFuture;
private CompletableFuture<ArchivedExecutionGraph> resultFuture;
private CompletableFuture<JobID> cleanupJobFuture;
private CompletableFuture<Void> terminationFuture;
@BeforeClass
public static void setupClass() {
......@@ -162,6 +163,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
.createTestingBlobStore();
cleanupJobFuture = new CompletableFuture<>();
terminationFuture = new CompletableFuture<>();
blobServer = new TestingBlobServer(configuration, testingBlobStore, cleanupJobFuture);
......@@ -185,7 +187,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
null,
new MemoryArchivedExecutionGraphStore(),
new TestingJobManagerRunnerFactory(resultFuture, CompletableFuture.completedFuture(null)),
new TestingJobManagerRunnerFactory(resultFuture, terminationFuture),
fatalErrorHandler);
dispatcher.start();
......@@ -225,6 +227,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
// complete the job
resultFuture.complete(new ArchivedExecutionGraphBuilder().setJobID(jobId).setState(JobStatus.FINISHED).build());
terminationFuture.complete(null);
assertThat(cleanupJobFuture.get(), equalTo(jobId));
......@@ -245,6 +248,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
// job not finished
resultFuture.completeExceptionally(new JobNotFinishedException(jobId));
terminationFuture.complete(null);
assertThat(cleanupJobFuture.get(), equalTo(jobId));
......@@ -266,6 +270,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
submitJob();
dispatcher.shutDown();
terminationFuture.complete(null);
dispatcher.getTerminationFuture().get();
assertThat(cleanupJobFuture.get(), equalTo(jobId));
......@@ -295,6 +300,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
assertThat(runningJobsRegistry.contains(jobId), is(true));
resultFuture.complete(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
terminationFuture.complete(null);
// wait for the clearing
clearedJobLatch.await();
......@@ -302,6 +308,57 @@ public class DispatcherResourceCleanupTest extends TestLogger {
assertThat(runningJobsRegistry.contains(jobId), is(false));
}
/**
* Tests that the previous JobManager needs to be completely terminated
* before a new job with the same {@link JobID} is started.
*/
@Test
public void testJobSubmissionUnderSameJobId() throws Exception {
submitJob();
runningJobsRegistry.setJobRunning(jobId);
resultFuture.completeExceptionally(new JobNotFinishedException(jobId));
final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(jobGraph, timeout);
try {
submissionFuture.get(10L, TimeUnit.MILLISECONDS);
fail("The job submission future should not complete until the previous JobManager " +
"termination future has been completed.");
} catch (TimeoutException ignored) {
// expected
} finally {
terminationFuture.complete(null);
}
assertThat(submissionFuture.get(), equalTo(Acknowledge.get()));
}
/**
* Tests that recovered jobs will only be started after the complete termination of any
* other previously running JobMasters for the same job.
*/
@Test
public void testJobRecoveryWithPendingTermination() throws Exception {
submitJob();
runningJobsRegistry.setJobRunning(jobId);
dispatcherLeaderElectionService.notLeader();
final UUID leaderSessionId = UUID.randomUUID();
final CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(leaderSessionId);
try {
leaderFuture.get(10L, TimeUnit.MILLISECONDS);
fail("We should not become leader before all previously running JobMasters have terminated.");
} catch (TimeoutException ignored) {
// expected
} finally {
terminationFuture.complete(null);
}
assertThat(leaderFuture.get(), equalTo(leaderSessionId));
}
private static final class SingleRunningJobsRegistry implements RunningJobsRegistry {
@Nonnull
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册