未验证 提交 7ddb674c 编写于 作者: G gyao 提交者: Till Rohrmann

[FLINK-8176][flip6] Start SubmittedJobGraphStore in Dispatcher

Implement SubmittedJobGraphListener interface in Dispatcher

Call start() on SubmittedJobGraphStore with Dispatcher as listener. To enable
this, the dispatcher must implement the SubmittedJobGraphListener interface. Add
simple unit tests for the new methods. Refactor DispatcherTest to remove
redundancy.

[FLINK-8176][flip6] Make InMemorySubmittedJobGraphStore thread-safe

[FLINK-8176][flip6] Add method isStarted() to TestingLeaderElectionService

[FLINK-8176][flip6] Return same RunningJobsRegistry instance from TestingHighAvailabilityServices

[FLINK-8176][flip6] Fix race conditions in Dispatcher and DispatcherTest

Check if jobManagerRunner exists before submitting job.
Replace JobManagerRunner mock used in tests with real instance.
Do not run job graph recovery in actor main thread when job graph is recovered
from SubmittedJobGraphListener#onAddedJobGraph(JobID).

[FLINK-8176][flip6] Rename variables in DispatcherTest

[FLINK-8176][flip6] Remove injectMocks in DispatcherTest

[FLINK-8176][flip6] Update Dispatcher's SubmittedJobGraphListener callbacks

Always attempt the job submission when onAddedJobGraph is called, and the job
removal when onRemovedJobGraph is called. The checks in submitJob and removeJob
are sufficient.

This closes #5107.
上级 8941f636
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.dispatcher;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
......@@ -63,6 +64,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
......@@ -73,7 +75,8 @@ import java.util.concurrent.CompletableFuture;
* the jobs and to recover them in case of a master failure. Furthermore, it knows
* about the state of the Flink session cluster.
*/
public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> implements DispatcherGateway, LeaderContender {
public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> implements
DispatcherGateway, LeaderContender, SubmittedJobGraphStore.SubmittedJobGraphListener {
public static final String DISPATCHER_NAME = "dispatcher";
......@@ -173,6 +176,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
public void start() throws Exception {
super.start();
// Start the job graph store with this dispatcher as its SubmittedJobGraphListener,
// so that onAddedJobGraph / onRemovedJobGraph callbacks are delivered to it.
submittedJobGraphStore.start(this);
// Register this dispatcher as the LeaderContender of the leader election service.
leaderElectionService.start(this);
}
......@@ -197,7 +201,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
new JobSubmissionException(jobId, "Could not retrieve the job status.", e));
}
if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING) {
if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING &&
!jobManagerRunners.containsKey(jobId)) {
try {
submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
} catch (Exception e) {
......@@ -248,7 +253,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
@Override
public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
return CompletableFuture.completedFuture(jobManagerRunners.keySet());
return CompletableFuture.completedFuture(
Collections.unmodifiableSet(new HashSet<>(jobManagerRunners.keySet())));
}
@Override
......@@ -399,7 +405,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
/**
* Recovers all jobs persisted via the submitted job graph store.
*/
private void recoverJobs() {
@VisibleForTesting
void recoverJobs() {
log.info("Recovering all persisted jobs.");
getRpcService().execute(
......@@ -507,6 +514,37 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
onFatalError(new DispatcherException("Received an error from the LeaderElectionService.", exception));
}
//------------------------------------------------------
// SubmittedJobGraphListener
//------------------------------------------------------
/**
 * Callback of the {@code SubmittedJobGraphListener} interface: a job graph with the
 * given id has been added to the {@link SubmittedJobGraphStore}.
 *
 * <p>The job graph is read from the store on the {@code RpcService} executor, and the
 * actual submission is then handed over via {@code runAsync}. Failures in either step
 * are logged; nothing is propagated to the caller.
 *
 * @param jobId id of the job whose graph was added to the store
 */
@Override
public void onAddedJobGraph(final JobID jobId) {
	getRpcService().execute(() -> {
		final SubmittedJobGraph submittedJobGraph;
		try {
			submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId);
		} catch (final Exception e) {
			log.error("Could not recover job graph for job {}.", jobId, e);
			return;
		}

		// A store implementation may answer with null instead of throwing when the
		// graph is gone again; bail out here rather than fail inside the runnable below.
		if (submittedJobGraph == null) {
			log.error("Could not recover job graph for job {}: no job graph found.", jobId);
			return;
		}

		runAsync(() -> {
			// The returned future is the only place a failed submission surfaces,
			// so it must not be dropped silently.
			submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT)
				.whenComplete((ack, failure) -> {
					if (failure != null) {
						log.error("Could not submit recovered job {}.", jobId, failure);
					}
				});
		});
	});
}
/**
 * Callback of the {@code SubmittedJobGraphListener} interface: the job graph with the
 * given id has been removed from the {@link SubmittedJobGraphStore}.
 *
 * <p>The removal is scheduled through {@code runAsync}; a failure is logged and not
 * rethrown. NOTE(review): the meaning of the {@code false} flag passed to
 * {@code removeJob} is defined outside this excerpt - confirm against its declaration.
 *
 * @param jobId id of the job whose graph was removed from the store
 */
@Override
public void onRemovedJobGraph(final JobID jobId) {
	final Runnable removal = () -> {
		try {
			removeJob(jobId, false);
		} catch (final Exception removalFailure) {
			log.error("Could not remove job {}.", jobId, removalFailure);
		}
	};

	runAsync(removal);
}
//------------------------------------------------------
// Utility classes
//------------------------------------------------------
......
......@@ -20,57 +20,95 @@ package org.apache.flink.runtime.dispatcher;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerServices;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Test for the {@link Dispatcher} component.
*/
public class DispatcherTest extends TestLogger {
private static RpcService rpcService;
private static final Time TIMEOUT = Time.seconds(10L);
private static final JobID TEST_JOB_ID = new JobID();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public TestName name = new TestName();
private static RpcService rpcService;
private static final Time timeout = Time.seconds(10L);
private JobGraph jobGraph;
private TestingFatalErrorHandler fatalErrorHandler;
private SubmittedJobGraphStore submittedJobGraphStore;
private TestingLeaderElectionService dispatcherLeaderElectionService;
private TestingLeaderElectionService jobMasterLeaderElectionService;
private RunningJobsRegistry runningJobsRegistry;
/** Instance under test. */
private TestingDispatcher dispatcher;
@BeforeClass
public static void setup() {
......@@ -86,60 +124,77 @@ public class DispatcherTest extends TestLogger {
}
}
/**
* Tests that we can submit a job to the Dispatcher which then spawns a
* new JobManagerRunner.
*/
@Test
public void testJobSubmission() throws Exception {
TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
TestingLeaderElectionService dispatcherLeaderElectionService = new TestingLeaderElectionService();
TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
haServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
haServices.setSubmittedJobGraphStore(new StandaloneSubmittedJobGraphStore());
@Before
public void setUp() throws Exception {
final JobVertex testVertex = new JobVertex("testVertex");
testVertex.setInvokableClass(NoOpInvokable.class);
jobGraph = new JobGraph(TEST_JOB_ID, "testJob", testVertex);
jobGraph.setAllowQueuedScheduling(true);
HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L);
JobManagerRunner jobManagerRunner = mock(JobManagerRunner.class);
fatalErrorHandler = new TestingFatalErrorHandler();
final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L);
submittedJobGraphStore = spy(new InMemorySubmittedJobGraphStore());
final JobGraph jobGraph = mock(JobGraph.class);
final JobID jobId = new JobID();
when(jobGraph.getJobID()).thenReturn(jobId);
dispatcherLeaderElectionService = new TestingLeaderElectionService();
jobMasterLeaderElectionService = new TestingLeaderElectionService();
final TestingDispatcher dispatcher = new TestingDispatcher(
final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
haServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
haServices.setSubmittedJobGraphStore(submittedJobGraphStore);
haServices.setJobMasterLeaderElectionService(TEST_JOB_ID, jobMasterLeaderElectionService);
haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
haServices.setResourceManagerLeaderRetriever(new TestingLeaderRetrievalService());
runningJobsRegistry = haServices.getRunningJobsRegistry();
final Configuration blobServerConfig = new Configuration();
blobServerConfig.setString(
BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
dispatcher = new TestingDispatcher(
rpcService,
Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
new Configuration(),
haServices,
mock(ResourceManagerGateway.class),
mock(BlobServer.class),
new BlobServer(blobServerConfig, new VoidBlobStore()),
heartbeatServices,
mock(MetricRegistryImpl.class),
new NoOpMetricRegistry(),
fatalErrorHandler,
jobManagerRunner,
jobId);
TEST_JOB_ID);
try {
dispatcher.start();
dispatcher.start();
}
CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
/**
 * Runs after every test: rethrows any error recorded by the fatal error handler and
 * terminates the dispatcher RPC endpoint.
 *
 * <p>NOTE(review): the {@code BlobServer} created inline in {@code setUp} is not shut
 * down here - confirm that it is closed elsewhere.
 */
@After
public void tearDown() throws Exception {
try {
// fail the test if an error was reported to the fatal error handler
fatalErrorHandler.rethrowError();
} finally {
// terminate the endpoint even when rethrowError() throws
RpcUtils.terminateRpcEndpoint(dispatcher, TIMEOUT);
}
}
// wait for the leader to be elected
leaderFuture.get();
/**
* Tests that we can submit a job to the Dispatcher which then spawns a
* new JobManagerRunner.
*/
@Test
public void testJobSubmission() throws Exception {
CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
// wait for the leader to be elected
leaderFuture.get();
CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout);
DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
acknowledgeFuture.get();
CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT);
verify(jobManagerRunner, Mockito.timeout(timeout.toMilliseconds())).start();
acknowledgeFuture.get();
// check that no error has occurred
fatalErrorHandler.rethrowError();
} finally {
RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
}
assertTrue(
"jobManagerRunner was not started",
dispatcherLeaderElectionService.isStarted());
}
/**
......@@ -147,61 +202,63 @@ public class DispatcherTest extends TestLogger {
*/
@Test
public void testLeaderElection() throws Exception {
TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
UUID expectedLeaderSessionId = UUID.randomUUID();
CompletableFuture<UUID> leaderSessionIdFuture = new CompletableFuture<>();
SubmittedJobGraphStore mockSubmittedJobGraphStore = mock(SubmittedJobGraphStore.class);
TestingLeaderElectionService testingLeaderElectionService = new TestingLeaderElectionService() {
@Override
public void confirmLeaderSessionID(UUID leaderSessionId) {
super.confirmLeaderSessionID(leaderSessionId);
leaderSessionIdFuture.complete(leaderSessionId);
}
};
haServices.setSubmittedJobGraphStore(mockSubmittedJobGraphStore);
haServices.setDispatcherLeaderElectionService(testingLeaderElectionService);
HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
final JobID jobId = new JobID();
assertNull(dispatcherLeaderElectionService.getConfirmationFuture());
final TestingDispatcher dispatcher = new TestingDispatcher(
rpcService,
Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
new Configuration(),
haServices,
mock(ResourceManagerGateway.class),
mock(BlobServer.class),
heartbeatServices,
mock(MetricRegistryImpl.class),
fatalErrorHandler,
mock(JobManagerRunner.class),
jobId);
dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId);
try {
dispatcher.start();
UUID actualLeaderSessionId = dispatcherLeaderElectionService.getConfirmationFuture()
.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
assertEquals(expectedLeaderSessionId, actualLeaderSessionId);
verify(submittedJobGraphStore, Mockito.timeout(TIMEOUT.toMilliseconds()).atLeast(1)).getJobIds();
}
/**
* Test callbacks from
* {@link org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener}.
*/
@Test
public void testSubmittedJobGraphListener() throws Exception {
dispatcher.recoverJobsEnabled.set(false);
assertFalse(leaderSessionIdFuture.isDone());
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
testingLeaderElectionService.isLeader(expectedLeaderSessionId);
final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
UUID actualLeaderSessionId = leaderSessionIdFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
jobMasterLeaderElectionService.isLeader(UUID.randomUUID()).get();
assertEquals(expectedLeaderSessionId, actualLeaderSessionId);
final SubmittedJobGraph submittedJobGraph = submittedJobGraphStore.recoverJobGraph(TEST_JOB_ID);
verify(mockSubmittedJobGraphStore, Mockito.timeout(timeout.toMilliseconds()).atLeast(1)).getJobIds();
} finally {
RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
}
// pretend that other Dispatcher has removed job from submittedJobGraphStore
submittedJobGraphStore.removeJobGraph(TEST_JOB_ID);
dispatcher.onRemovedJobGraph(TEST_JOB_ID);
assertThat(dispatcherGateway.listJobs(TIMEOUT).get(), empty());
// pretend that other Dispatcher has added a job to submittedJobGraphStore
runningJobsRegistry.clearJob(TEST_JOB_ID);
submittedJobGraphStore.putJobGraph(submittedJobGraph);
dispatcher.onAddedJobGraph(TEST_JOB_ID);
dispatcher.submitJobLatch.await();
assertThat(dispatcherGateway.listJobs(TIMEOUT).get(), hasSize(1));
}
private static class TestingDispatcher extends Dispatcher {
private final JobManagerRunner jobManagerRunner;
private final JobID expectedJobId;
protected TestingDispatcher(
private final CountDownLatch submitJobLatch = new CountDownLatch(2);
/**
* Controls whether existing jobs in {@link SubmittedJobGraphStore} should be recovered
* when {@link TestingDispatcher} is granted leadership.
* */
private final AtomicBoolean recoverJobsEnabled = new AtomicBoolean(true);
private TestingDispatcher(
RpcService rpcService,
String endpointId,
Configuration configuration,
......@@ -211,7 +268,6 @@ public class DispatcherTest extends TestLogger {
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler,
JobManagerRunner jobManagerRunner,
JobID expectedJobId) throws Exception {
super(
rpcService,
......@@ -225,7 +281,6 @@ public class DispatcherTest extends TestLogger {
fatalErrorHandler,
null);
this.jobManagerRunner = jobManagerRunner;
this.expectedJobId = expectedJobId;
}
......@@ -243,7 +298,32 @@ public class DispatcherTest extends TestLogger {
FatalErrorHandler fatalErrorHandler) throws Exception {
assertEquals(expectedJobId, jobGraph.getJobID());
return jobManagerRunner;
return new JobManagerRunner(resourceId, jobGraph, configuration, rpcService,
highAvailabilityServices, heartbeatServices, jobManagerServices, metricRegistry,
onCompleteActions, fatalErrorHandler, null);
}
/**
 * Delegates to {@code super.submitJob}, waits for the returned future to complete and
 * then counts down {@code submitJobLatch}, so tests can wait for submissions.
 *
 * <p>If the wait is interrupted, the interrupt flag is restored and the latch is still
 * counted down. Any other failure of the future is rethrown wrapped in a
 * {@link RuntimeException}, in which case the latch is not counted down.
 */
@Override
public CompletableFuture<Acknowledge> submitJob(final JobGraph jobGraph, final Time timeout) {
	final CompletableFuture<Acknowledge> ackFuture = super.submitJob(jobGraph, timeout);

	try {
		ackFuture.get();
	} catch (InterruptedException interrupted) {
		// keep the interrupt visible to the caller
		Thread.currentThread().interrupt();
	} catch (Exception submissionFailure) {
		throw new RuntimeException(submissionFailure);
	}

	submitJobLatch.countDown();
	return ackFuture;
}
/**
 * Recovers jobs through {@code super.recoverJobs()} only while
 * {@code recoverJobsEnabled} is set; otherwise does nothing.
 */
@Override
void recoverJobs() {
	if (!recoverJobsEnabled.get()) {
		return;
	}
	super.recoverJobs();
}
}
}
......@@ -52,6 +52,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
private volatile SubmittedJobGraphStore submittedJobGraphStore;
private final RunningJobsRegistry runningJobsRegistry = new StandaloneRunningJobsRegistry();
// ------------------------------------------------------------------------
// Setters for mock / testing implementations
// ------------------------------------------------------------------------
......@@ -185,7 +187,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
@Override
public RunningJobsRegistry getRunningJobsRegistry() {
return new StandaloneRunningJobsRegistry();
return runningJobsRegistry;
}
@Override
......
......@@ -86,4 +86,13 @@ public class TestingLeaderElectionService implements LeaderElectionService {
public synchronized String getAddress() {
return contender.getAddress();
}
/**
 * Reports whether a contender is currently registered with this service.
 *
 * @return {@code true} if {@link #start(LeaderContender)} was called,
 *         {@code false} otherwise
 */
public synchronized boolean isStarted() {
	return this.contender != null;
}
}
......@@ -26,9 +26,13 @@ import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import static java.util.Objects.requireNonNull;
/**
* In-Memory implementation of {@link SubmittedJobGraphStore} for testing purposes.
*/
......@@ -36,43 +40,45 @@ public class InMemorySubmittedJobGraphStore implements SubmittedJobGraphStore {
private final Map<JobID, SubmittedJobGraph> storedJobs = new HashMap<>();
private volatile boolean started;
private boolean started;
@Override
public void start(@Nullable SubmittedJobGraphListener jobGraphListener) throws Exception {
public synchronized void start(@Nullable SubmittedJobGraphListener jobGraphListener) throws Exception {
started = true;
}
@Override
public void stop() throws Exception {
public synchronized void stop() throws Exception {
started = false;
}
@Override
public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
public synchronized SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
verifyIsStarted();
return storedJobs.getOrDefault(jobId, null);
return requireNonNull(
storedJobs.get(jobId),
"Job graph for job " + jobId + " does not exist");
}
@Override
public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
public synchronized void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
verifyIsStarted();
storedJobs.put(jobGraph.getJobId(), jobGraph);
}
@Override
public void removeJobGraph(JobID jobId) throws Exception {
public synchronized void removeJobGraph(JobID jobId) throws Exception {
verifyIsStarted();
storedJobs.remove(jobId);
}
@Override
public Collection<JobID> getJobIds() throws Exception {
public synchronized Collection<JobID> getJobIds() throws Exception {
verifyIsStarted();
return storedJobs.keySet();
return Collections.unmodifiableSet(new HashSet<>(storedJobs.keySet()));
}
public boolean contains(JobID jobId) {
public synchronized boolean contains(JobID jobId) {
verifyIsStarted();
return storedJobs.containsKey(jobId);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册