[FLINK-11414] Introduce JobMasterService interface

For a better separation of concerns in the JobManagerRunner, this commit introduces
a JobMasterService which only exposes the JobMaster's lifecycle methods to the
JobManagerRunner. This allows for an easier substitution when testing the JobManagerRunner.

This closes #7563.
上级 3138734d
......@@ -77,7 +77,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
private final JobManagerSharedServices jobManagerSharedServices;
private final JobMaster jobMaster;
private final JobMasterService jobMasterService;
private final FatalErrorHandler fatalErrorHandler;
......@@ -152,7 +152,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
rpcService);
// now start the JobManager
this.jobMaster = new JobMaster(
this.jobMasterService = new JobMaster(
rpcService,
jobMasterConfiguration,
resourceId,
......@@ -212,8 +212,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
setNewLeaderGatewayFuture();
leaderGatewayFuture.completeExceptionally(new FlinkException("JobMaster has been shut down."));
jobMaster.shutDown();
final CompletableFuture<Void> jobManagerTerminationFuture = jobMaster.getTerminationFuture();
final CompletableFuture<Void> jobManagerTerminationFuture = jobMasterService.closeAsync();
jobManagerTerminationFuture.whenComplete(
(Void ignored, Throwable throwable) -> {
......@@ -328,7 +327,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
runningJobsRegistry.setJobRunning(jobGraph.getJobID());
final CompletableFuture<Acknowledge> startFuture = jobMaster.start(new JobMasterId(leaderSessionId), rpcTimeout);
final CompletableFuture<Acknowledge> startFuture = jobMasterService.start(new JobMasterId(leaderSessionId), rpcTimeout);
final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
startFuture.whenCompleteAsync(
......@@ -345,7 +344,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
private void confirmLeaderSessionIdIfStillLeader(UUID leaderSessionId, CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture) {
if (leaderElectionService.hasLeadership(leaderSessionId)) {
currentLeaderGatewayFuture.complete(jobMaster.getSelfGateway(JobMasterGateway.class));
currentLeaderGatewayFuture.complete(jobMasterService.getGateway());
leaderElectionService.confirmLeaderSessionID(leaderSessionId);
} else {
log.debug("Ignoring confirmation of leader session id because {} is no longer the leader.", getAddress());
......@@ -365,7 +364,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
setNewLeaderGatewayFuture();
CompletableFuture<Acknowledge> suspendFuture = jobMaster.suspend(new FlinkException("JobManager is no longer the leader."), rpcTimeout);
CompletableFuture<Acknowledge> suspendFuture = jobMasterService.suspend(new FlinkException("JobManager is no longer the leader."), rpcTimeout);
suspendFuture.whenCompleteAsync(
(Acknowledge ack, Throwable throwable) -> {
......@@ -396,7 +395,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
@Override
public String getAddress() {
return jobMaster.getAddress();
return jobMasterService.getAddress();
}
@Override
......
......@@ -145,7 +145,7 @@ import static org.apache.flink.util.Preconditions.checkState;
* given task</li>
* </ul>
*/
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway {
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
/** Default names for Flink's distributed components. */
public static final String JOB_MANAGER_NAME = "jobmanager";
......@@ -1499,6 +1499,21 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
}
}
//----------------------------------------------------------------------------------------------
// Service methods
//----------------------------------------------------------------------------------------------
@Override
public JobMasterGateway getGateway() {
return getSelfGateway(JobMasterGateway.class);
}
@Override
public CompletableFuture<Void> closeAsync() {
shutDown();
return getTerminationFuture();
}
//----------------------------------------------------------------------------------------------
// Utility classes
//----------------------------------------------------------------------------------------------
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.util.AutoCloseableAsync;
import java.util.concurrent.CompletableFuture;
/**
* Interface which specifies the JobMaster service.
*/
public interface JobMasterService extends AutoCloseableAsync {
/**
* Start the JobMaster service with the given {@link JobMasterId}.
*
* @param jobMasterId to start the service with
* @param rpcTimeout timeout of this operation
* @return Future which is completed once the JobMaster service has been started
* @throws Exception if the JobMaster service could not be started
*/
CompletableFuture<Acknowledge> start(JobMasterId jobMasterId, Time rpcTimeout) throws Exception;
/**
* Suspend the JobMaster service. This means that the service will stop to react
* to messages.
*
* @param cause for the suspension
* @param rpcTimeout timeout of this operation
* @return Future which is completed once the JobMaster service has been suspended
*/
CompletableFuture<Acknowledge> suspend(Exception cause, Time rpcTimeout);
/**
* Get the {@link JobMasterGateway} belonging to this service.
*
* @return JobMasterGateway belonging to this service
*/
JobMasterGateway getGateway();
/**
* Get the address of the JobMaster service under which it is reachable.
*
* @return Address of the JobMaster service
*/
String getAddress();
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册