提交 97ccc147 编写于 作者: T Till Rohrmann

[FLINK-4364] Introduce HeartbeatServices for the HeartbeatManager instantiation

The HeartbeatServices are used to create all services relevant for heartbeating. This
includes at the moment the creation of HeartbeatManager implementations which actively
send heartbeats and those which only respond to heartbeat requests.

Add comments
上级 0b3d5c27
......@@ -30,7 +30,7 @@ public class HeartbeatManagerOptions {
/** Time interval for requesting heartbeat from sender side */
public static final ConfigOption<Long> HEARTBEAT_INTERVAL =
key("heartbeat.sender.interval")
key("heartbeat.interval")
.defaultValue(10000L);
/** Timeout for requesting and receiving heartbeat for both sender and receiver sides */
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.heartbeat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
/**
* HeartbeatServices gives access to all services needed for heartbeating. This includes the
* creation of heartbeat receivers and heartbeat senders.
*/
public class HeartbeatServices {
/** Heartbeat interval for the created services */
protected final long heartbeatInterval;
/** Heartbeat timeout for the created services */
protected final long heartbeatTimeout;
public HeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {
Preconditions.checkArgument(0L < heartbeatInterval, "The heartbeat interval must be larger than 0.");
Preconditions.checkArgument(heartbeatInterval <= heartbeatTimeout, "The heartbeat timeout should be larger or equal than the heartbeat timeout.");
this.heartbeatInterval = heartbeatInterval;
this.heartbeatTimeout = heartbeatTimeout;
}
/**
* Creates a heartbeat manager which does not actively send heartbeats.
*
* @param resourceId Resource Id which identifies the owner of the heartbeat manager
* @param heartbeatListener Listener which will be notified upon heartbeat timeouts for registered
* targets
* @param scheduledExecutor Scheduled executor to be used for scheduling heartbeat timeouts
* @param log Logger to be used for the logging
* @param <I> Type of the incoming payload
* @param <O> Type of the outgoing payload
* @return A new HeartbeatManager instance
*/
public <I, O> HeartbeatManager<I, O> createHeartbeatManager(
ResourceID resourceId,
HeartbeatListener<I, O> heartbeatListener,
ScheduledExecutor scheduledExecutor,
Logger log) {
return new HeartbeatManagerImpl<>(
heartbeatTimeout,
resourceId,
heartbeatListener,
scheduledExecutor,
scheduledExecutor,
log);
}
/**
* Creates a heartbeat manager which actively sends heartbeats to monitoring targets.
*
* @param resourceId Resource Id which identifies the owner of the heartbeat manager
* @param heartbeatListener Listener which will be notified upon heartbeat timeouts for registered
* targets
* @param scheduledExecutor Scheduled executor to be used for scheduling heartbeat timeouts
* @param log Logger to be used for the logging
* @param <I> Type of the incoming payload
* @param <O> Type of the outgoing payload
* @return A new HeartbeatManager instance which actively sends heartbeats
*/
public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
ResourceID resourceId,
HeartbeatListener<I, O> heartbeatListener,
ScheduledExecutor scheduledExecutor,
Logger log) {
return new HeartbeatManagerSenderImpl<>(
heartbeatInterval,
heartbeatTimeout,
resourceId,
heartbeatListener,
scheduledExecutor,
scheduledExecutor,
log);
}
/**
* Creates an HeartbeatServices instance from a {@link Configuration}.
*
* @param configuration Configuration to be used for the HeartbeatServices creation
* @return An HeartbeatServices instance created from the given configuration
*/
public static HeartbeatServices fromConfiguration(Configuration configuration) {
long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);
long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);
return new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
}
}
......@@ -40,11 +40,12 @@ public class TestingHeartbeatManagerImpl<I, O> extends HeartbeatManagerImpl<I, O
CountDownLatch waitLatch,
long heartbeatTimeoutIntervalMs,
ResourceID ownResourceID,
HeartbeatListener<I, O> heartbeatListener,
Executor executor,
ScheduledExecutor scheduledExecutor,
Logger log) {
super(heartbeatTimeoutIntervalMs, ownResourceID, executor, scheduledExecutor, log);
super(heartbeatTimeoutIntervalMs, ownResourceID, heartbeatListener, executor, scheduledExecutor, log);
this.waitLatch = waitLatch;
}
......
......@@ -38,11 +38,12 @@ public class TestingHeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerSen
long heartbeatPeriod,
long heartbeatTimeout,
ResourceID ownResourceID,
HeartbeatListener<I, O> heartbeatListener,
Executor executor,
ScheduledExecutor scheduledExecutor,
Logger log) {
super(heartbeatPeriod, heartbeatTimeout, ownResourceID, executor, scheduledExecutor, log);
super(heartbeatPeriod, heartbeatTimeout, ownResourceID, heartbeatListener, executor, scheduledExecutor, log);
this.waitLatch = waitLatch;
}
......
......@@ -21,11 +21,10 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.heartbeat.HeartbeatManagerSenderImpl;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
......@@ -88,31 +87,47 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
// ------------------------------------------------------------------------
public JobManagerRunner(
final ResourceID resourceId,
final JobGraph jobGraph,
final Configuration configuration,
final RpcService rpcService,
final HighAvailabilityServices haServices,
final HeartbeatServices heartbeatServices,
final OnCompletionActions toNotifyOnComplete,
final FatalErrorHandler errorHandler) throws Exception
{
this(jobGraph, configuration, rpcService, haServices,
new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)),
toNotifyOnComplete, errorHandler);
final FatalErrorHandler errorHandler) throws Exception {
this(
resourceId,
jobGraph,
configuration,
rpcService,
haServices,
heartbeatServices,
new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)),
toNotifyOnComplete,
errorHandler);
}
public JobManagerRunner(
final ResourceID resourceId,
final JobGraph jobGraph,
final Configuration configuration,
final RpcService rpcService,
final HighAvailabilityServices haServices,
final HeartbeatServices heartbeatServices,
final MetricRegistry metricRegistry,
final OnCompletionActions toNotifyOnComplete,
final FatalErrorHandler errorHandler) throws Exception
{
this(jobGraph, configuration, rpcService, haServices,
JobManagerServices.fromConfiguration(configuration, haServices),
metricRegistry,
toNotifyOnComplete, errorHandler);
final FatalErrorHandler errorHandler) throws Exception {
this(
resourceId,
jobGraph,
configuration,
rpcService,
haServices,
heartbeatServices,
JobManagerServices.fromConfiguration(configuration, haServices),
metricRegistry,
toNotifyOnComplete,
errorHandler);
}
/**
......@@ -127,15 +142,16 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
* required services could not be started, ot the Job could not be initialized.
*/
public JobManagerRunner(
final ResourceID resourceId,
final JobGraph jobGraph,
final Configuration configuration,
final RpcService rpcService,
final HighAvailabilityServices haServices,
final HeartbeatServices heartbeatServices,
final JobManagerServices jobManagerServices,
final MetricRegistry metricRegistry,
final OnCompletionActions toNotifyOnComplete,
final FatalErrorHandler errorHandler) throws Exception
{
final FatalErrorHandler errorHandler) throws Exception {
JobManagerMetricGroup jobManagerMetrics = null;
......@@ -170,31 +186,22 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
this.runningJobsRegistry = haServices.getRunningJobsRegistry();
this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
// heartbeat manager last
final ResourceID resourceID = ResourceID.generate();
final HeartbeatManagerSenderImpl<Void, Void> jobManagerHeartbeatManager = new HeartbeatManagerSenderImpl<>(
configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL),
configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT),
resourceID,
rpcService.getExecutor(),
rpcService.getScheduledExecutor(),
log);
// now start the JobManager
this.jobManager = new JobMaster(
jobGraph, configuration,
rpcService,
haServices,
jobManagerServices.executorService,
jobManagerServices.libraryCacheManager,
jobManagerServices.restartStrategyFactory,
jobManagerServices.rpcAskTimeout,
jobManagerMetrics,
resourceID,
jobManagerHeartbeatManager,
this,
this,
userCodeLoader);
resourceId,
jobGraph,
configuration,
rpcService,
haServices,
heartbeatServices,
jobManagerServices.executorService,
jobManagerServices.libraryCacheManager,
jobManagerServices.restartStrategyFactory,
jobManagerServices.rpcAskTimeout,
jobManagerMetrics,
this,
this,
userCodeLoader);
}
catch (Throwable t) {
// clean up everything
......
......@@ -51,7 +51,8 @@ import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
......@@ -105,8 +106,8 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static org.apache.flink.util.Preconditions.checkNotNull;
......@@ -129,6 +130,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
// ------------------------------------------------------------------------
private final ResourceID resourceId;
/** Logical representation of the job */
private final JobGraph jobGraph;
......@@ -150,10 +153,10 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
private final MetricGroup jobMetricGroup;
/** The heartbeat manager with task managers */
private final HeartbeatManagerImpl<Void, Void> heartbeatManager;
private final HeartbeatManager<Void, Void> heartbeatManager;
/** The execution context which is used to execute futures */
private final ExecutorService executionContext;
private final Executor executor;
private final OnCompletionActions jobCompletionActions;
......@@ -170,8 +173,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
private volatile UUID leaderSessionID;
private final ResourceID resourceID;
// --------- ResourceManager --------
/** Leader retriever service used to locate ResourceManager's address */
......@@ -187,34 +188,38 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
// ------------------------------------------------------------------------
public JobMaster(
ResourceID resourceId,
JobGraph jobGraph,
Configuration configuration,
RpcService rpcService,
HighAvailabilityServices highAvailabilityService,
ScheduledExecutorService executorService,
HeartbeatServices heartbeatServices,
ScheduledExecutorService executor,
BlobLibraryCacheManager libraryCacheManager,
RestartStrategyFactory restartStrategyFactory,
Time rpcAskTimeout,
@Nullable JobManagerMetricGroup jobManagerMetricGroup,
ResourceID resourceID,
HeartbeatManagerImpl<Void, Void> heartbeatManager,
OnCompletionActions jobCompletionActions,
FatalErrorHandler errorHandler,
ClassLoader userCodeLoader) throws Exception
{
ClassLoader userCodeLoader) throws Exception {
super(rpcService);
this.resourceId = checkNotNull(resourceId);
this.jobGraph = checkNotNull(jobGraph);
this.configuration = checkNotNull(configuration);
this.rpcTimeout = rpcAskTimeout;
this.highAvailabilityServices = checkNotNull(highAvailabilityService);
this.libraryCacheManager = checkNotNull(libraryCacheManager);
this.executionContext = checkNotNull(executorService);
this.executor = checkNotNull(executor);
this.jobCompletionActions = checkNotNull(jobCompletionActions);
this.errorHandler = checkNotNull(errorHandler);
this.userCodeLoader = checkNotNull(userCodeLoader);
this.resourceID = checkNotNull(resourceID);
this.heartbeatManager = checkNotNull(heartbeatManager);
this.heartbeatManager = heartbeatServices.createHeartbeatManagerSender(
resourceId,
new TaskManagerHeartbeatListener(),
rpcService.getScheduledExecutor(),
log);
final String jobName = jobGraph.getName();
final JobID jid = jobGraph.getJobID();
......@@ -251,8 +256,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
null,
jobGraph,
configuration,
executorService,
executorService,
executor,
executor,
slotPool.getSlotProvider(),
userCodeLoader,
checkpointRecoveryFactory,
......@@ -288,27 +293,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
// make sure we receive RPC and async calls
super.start();
heartbeatManager.start(new HeartbeatListener<Void, Void>() {
@Override
public void notifyHeartbeatTimeout(ResourceID resourceID) {
log.info("Notify heartbeat timeout with task manager {}", resourceID);
heartbeatManager.unmonitorTarget(resourceID);
getSelf().disconnectTaskManager(resourceID);
}
@Override
public void reportPayload(ResourceID resourceID, Void payload) {
// currently there is no payload from task manager and resource manager, so this method will not be called.
}
@Override
public Future<Void> retrievePayload() {
// currently no need payload.
return null;
}
});
log.info("JobManager started as leader {} for job {}", leaderSessionID, jobGraph.getJobID());
getSelf().startJobExecution();
}
......@@ -322,8 +306,9 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
*/
@Override
public void shutDown() throws Exception {
// make sure there is a graceful exit
heartbeatManager.stop();
// make sure there is a graceful exit
getSelf().suspendExecution(new Exception("JobManager is shutting down."));
super.shutDown();
}
......@@ -371,7 +356,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
}
// start scheduling job in another thread
executionContext.execute(new Runnable() {
executor.execute(new Runnable() {
@Override
public void run() {
try {
......@@ -545,9 +530,16 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
}
@RpcMethod
public void disconnectTaskManager(final ResourceID resourceID) {
registeredTaskManagers.remove(resourceID);
public void disconnectTaskManager(final ResourceID resourceID, final Exception cause) {
heartbeatManager.unmonitorTarget(resourceID);
slotPoolGateway.releaseTaskManager(resourceID);
Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerConnection = registeredTaskManagers.remove(resourceID);
if (taskManagerConnection != null) {
taskManagerConnection.f1.disconnectJobManager(jobGraph.getJobID(), cause);
}
}
// TODO: This method needs a leader session ID
......@@ -743,7 +735,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
if (registeredTaskManagers.containsKey(taskManagerId)) {
final RegistrationResponse response = new JMTMRegistrationSuccess(
resourceID, libraryCacheManager.getBlobServerPort());
resourceId, libraryCacheManager.getBlobServerPort());
return FlinkCompletableFuture.completed(response);
} else {
return getRpcService().execute(new Callable<TaskExecutorGateway>() {
......@@ -773,7 +765,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
// monitor the task manager as heartbeat target
heartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>() {
@Override
public void sendHeartbeat(ResourceID resourceID, Void payload) {
public void receiveHeartbeat(ResourceID resourceID, Void payload) {
// the task manager will not request heartbeat, so this method will never be called currently
}
......@@ -783,7 +775,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
}
});
return new JMTMRegistrationSuccess(resourceID, libraryCacheManager.getBlobServerPort());
return new JMTMRegistrationSuccess(resourceId, libraryCacheManager.getBlobServerPort());
}
}, getMainThreadExecutor());
}
......@@ -799,7 +791,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
@RpcMethod
public void heartbeatFromTaskManager(final ResourceID resourceID) {
heartbeatManager.sendHeartbeat(resourceID, null);
heartbeatManager.receiveHeartbeat(resourceID, null);
}
//----------------------------------------------------------------------------------------------
......@@ -903,7 +895,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
resourceManagerConnection = new ResourceManagerConnection(
log, jobGraph.getJobID(), getAddress(), leaderSessionID,
resourceManagerAddress, resourceManagerLeaderId, executionContext);
resourceManagerAddress, resourceManagerLeaderId, executor);
resourceManagerConnection.start();
}
}
......@@ -1046,4 +1038,26 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
});
}
}
private class TaskManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
@Override
public void notifyHeartbeatTimeout(ResourceID resourceID) {
log.info("Task manager with id {} timed out.", resourceID);
getSelf().disconnectTaskManager(
resourceID,
new TimeoutException("The heartbeat of TaskManager with id " + resourceID + " timed out."));
}
@Override
public void reportPayload(ResourceID resourceID, Void payload) {
// nothing to do since there is no payload
}
@Override
public Future<Void> retrievePayload() {
return FlinkCompletableFuture.completed(null);
}
}
}
......@@ -124,8 +124,9 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
* {@link JobMaster}.
*
* @param resourceID identifying the TaskManager to disconnect
* @param cause for the disconnection of the TaskManager
*/
void disconnectTaskManager(ResourceID resourceID);
void disconnectTaskManager(ResourceID resourceID, Exception cause);
/**
* Disconnects the resource manager from the job manager because of the given cause.
......
......@@ -28,6 +28,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
......@@ -81,6 +82,9 @@ public class MiniCluster {
@GuardedBy("lock")
private HighAvailabilityServices haServices;
@GuardedBy("lock")
private HeartbeatServices heartbeatServices;
@GuardedBy("lock")
private ResourceManagerRunner[] resourceManagerRunners;
......@@ -232,6 +236,8 @@ public class MiniCluster {
LOG.info("Starting high-availability services");
haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration);
heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
// bring up the ResourceManager(s)
LOG.info("Starting {} ResourceManger(s)", numResourceManagers);
resourceManagerRunners = startResourceManagers(
......@@ -245,7 +251,12 @@ public class MiniCluster {
// bring up the dispatcher that launches JobManagers when jobs submitted
LOG.info("Starting job dispatcher(s) for {} JobManger(s)", numJobManagers);
jobDispatcher = new MiniClusterJobDispatcher(
configuration, haServices, metricRegistry, numJobManagers, jobManagerRpcServices);
configuration,
haServices,
heartbeatServices,
metricRegistry,
numJobManagers,
jobManagerRpcServices);
}
catch (Exception e) {
// cleanup everything
......@@ -533,6 +544,7 @@ public class MiniCluster {
new ResourceID(UUID.randomUUID().toString()),
taskManagerRpcServices[i],
haServices,
heartbeatServices,
metricRegistry,
localCommunication);
......
......@@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
......@@ -63,6 +65,9 @@ public class MiniClusterJobDispatcher {
/** services for discovery, leader election, and recovery */
private final HighAvailabilityServices haServices;
/** services for heartbeating */
private final HeartbeatServices heartbeatServices;
/** all the services that the JobManager needs, such as BLOB service, factories, etc */
private final JobManagerServices jobManagerServices;
......@@ -94,8 +99,9 @@ public class MiniClusterJobDispatcher {
Configuration config,
RpcService rpcService,
HighAvailabilityServices haServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry) throws Exception {
this(config, haServices, metricRegistry, 1, new RpcService[] { rpcService });
this(config, haServices, heartbeatServices, metricRegistry, 1, new RpcService[] { rpcService });
}
/**
......@@ -113,6 +119,7 @@ public class MiniClusterJobDispatcher {
public MiniClusterJobDispatcher(
Configuration config,
HighAvailabilityServices haServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
int numJobManagers,
RpcService[] rpcServices) throws Exception {
......@@ -123,6 +130,7 @@ public class MiniClusterJobDispatcher {
this.configuration = checkNotNull(config);
this.rpcServices = rpcServices;
this.haServices = checkNotNull(haServices);
this.heartbeatServices = checkNotNull(heartbeatServices);
this.metricRegistry = checkNotNull(metricRegistry);
this.numJobManagers = numJobManagers;
......@@ -232,9 +240,17 @@ public class MiniClusterJobDispatcher {
JobManagerRunner[] runners = new JobManagerRunner[numJobManagers];
for (int i = 0; i < numJobManagers; i++) {
try {
runners[i] = new JobManagerRunner(job, configuration,
rpcServices[i], haServices, jobManagerServices, metricRegistry,
onCompletion, errorHandler);
runners[i] = new JobManagerRunner(
ResourceID.generate(),
job,
configuration,
rpcServices[i],
haServices,
heartbeatServices,
jobManagerServices,
metricRegistry,
onCompletion,
errorHandler);
runners[i].start();
}
catch (Throwable t) {
......
......@@ -30,6 +30,7 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
......@@ -39,7 +40,8 @@ import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
......@@ -131,7 +133,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
private final MetricRegistry metricRegistry;
/** The heartbeat manager for job manager in the task manager */
private final HeartbeatManagerImpl<Void, Void> jobManagerHeartbeatManager;
private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
/** The fatal error handler to use in case of a fatal error */
private final FatalErrorHandler fatalErrorHandler;
......@@ -168,8 +170,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
IOManager ioManager,
NetworkEnvironment networkEnvironment,
HighAvailabilityServices haServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
HeartbeatManagerImpl<Void, Void> jobManagerHeartbeatManager,
TaskManagerMetricGroup taskManagerMetricGroup,
BroadcastVariableManager broadcastVariableManager,
FileCache fileCache,
......@@ -189,7 +191,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
this.networkEnvironment = checkNotNull(networkEnvironment);
this.haServices = checkNotNull(haServices);
this.metricRegistry = checkNotNull(metricRegistry);
this.jobManagerHeartbeatManager = checkNotNull(jobManagerHeartbeatManager);
this.taskSlotTable = checkNotNull(taskSlotTable);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup);
......@@ -199,6 +200,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
this.jobLeaderService = checkNotNull(jobLeaderService);
this.jobManagerConnections = new HashMap<>(4);
this.jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
getResourceID(),
new JobManagerHeartbeatListener(),
rpcService.getScheduledExecutor(),
log);
}
// ------------------------------------------------------------------------
......@@ -221,38 +228,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
// start the job leader service
jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
// start the heartbeat manager for monitoring job manager
jobManagerHeartbeatManager.start(new HeartbeatListener<Void, Void>() {
@Override
public void notifyHeartbeatTimeout(final ResourceID resourceID) {
runAsync(new Runnable() {
@Override
public void run() {
log.info("Notify heartbeat timeout with job manager {}", resourceID);
jobManagerHeartbeatManager.unmonitorTarget(resourceID);
if (jobManagerConnections.containsKey(resourceID)) {
JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID);
if (jobManagerConnection != null) {
closeJobManagerConnection(jobManagerConnection.getJobID());
}
}
}
});
}
@Override
public void reportPayload(ResourceID resourceID, Void payload) {
// currently there is no payload from job manager, so this method will not be called.
}
@Override
public Future<Void> retrievePayload() {
// currently no need payload.
return null;
}
});
}
/**
......@@ -644,6 +619,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
return new TMSlotRequestRegistered(resourceManagerConnection.getRegistrationId(), getResourceID(), allocationId);
}
@RpcMethod
public void disconnectJobManager(JobID jobId, Exception cause) {
closeJobManagerConnection(jobId, cause);
}
// ======================================================================
// Internal methods
// ======================================================================
......@@ -786,7 +766,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
if (jobManagerTable.contains(jobId)) {
JobManagerConnection oldJobManagerConnection = jobManagerTable.get(jobId);
if (!oldJobManagerConnection.getLeaderId().equals(jobManagerLeaderId)) {
closeJobManagerConnection(jobId);
closeJobManagerConnection(jobId, new Exception("Found new job leader for job id " + jobId + '.'));
}
}
......@@ -803,20 +783,20 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
// monitor the job manager as heartbeat target
jobManagerHeartbeatManager.monitorTarget(jobManagerResourceID, new HeartbeatTarget<Void>() {
@Override
public void sendHeartbeat(ResourceID resourceID, Void payload) {
public void receiveHeartbeat(ResourceID resourceID, Void payload) {
jobMasterGateway.heartbeatFromTaskManager(resourceID);
}
@Override
public void requestHeartbeat(ResourceID resourceID, Void payload) {
// request heartbeat will never be called in task manager side
// request heartbeat will never be called on the task manager side
}
});
offerSlotsToJobManager(jobId);
}
private void closeJobManagerConnection(JobID jobId) {
private void closeJobManagerConnection(JobID jobId, Exception cause) {
log.info("Close JobManager connection for job {}.", jobId);
// 1. fail tasks running under this JobID
......@@ -847,8 +827,10 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
if (jobManagerConnection != null) {
try {
jobManagerHeartbeatManager.unmonitorTarget(jobManagerConnection.getResourceID());
jobManagerConnections.remove(jobManagerConnection.getResourceID());
disassociateFromJobManager(jobManagerConnection);
disassociateFromJobManager(jobManagerConnection, cause);
} catch (IOException e) {
log.warn("Could not properly disassociate from JobManager {}.",
jobManagerConnection.getJobManagerGateway().getAddress(), e);
......@@ -909,10 +891,10 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
partitionStateChecker);
}
private void disassociateFromJobManager(JobManagerConnection jobManagerConnection) throws IOException {
private void disassociateFromJobManager(JobManagerConnection jobManagerConnection, Exception cause) throws IOException {
Preconditions.checkNotNull(jobManagerConnection);
JobMasterGateway jobManagerGateway = jobManagerConnection.getJobManagerGateway();
jobManagerGateway.disconnectTaskManager(getResourceID());
jobManagerGateway.disconnectTaskManager(getResourceID(), cause);
jobManagerConnection.getLibraryCacheManager().shutdown();
}
......@@ -1138,7 +1120,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
runAsync(new Runnable() {
@Override
public void run() {
closeJobManagerConnection(jobId);
closeJobManagerConnection(
jobId,
new Exception("Job leader for job id " + jobId + " lost leadership."));
}
});
}
......@@ -1220,4 +1204,37 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
});
}
}
private class JobManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
@Override
public void notifyHeartbeatTimeout(final ResourceID resourceID) {
runAsync(new Runnable() {
@Override
public void run() {
log.info("The JobManager connection {} has timed out.", resourceID);
if (jobManagerConnections.containsKey(resourceID)) {
JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID);
if (jobManagerConnection != null) {
closeJobManagerConnection(
jobManagerConnection.getJobID(),
new TimeoutException("The heartbeat of JobManager with id " +
resourceID + " timed out."));
}
}
}
});
}
@Override
public void reportPayload(ResourceID resourceID, Void payload) {
// nothing to do since the payload is of type Void
}
@Override
public Future<Void> retrievePayload() {
return FlinkCompletableFuture.completed(null);
}
}
}
......@@ -134,9 +134,17 @@ public interface TaskExecutorGateway extends RpcGateway {
Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
/**
* Request heartbeat from the job manager
* Heartbeat request from the job manager
*
* @param resourceID unique id of the job manager
* @param heartbeatOrigin unique id of the job manager
*/
void heartbeatFromJobManager(ResourceID resourceID);
void heartbeatFromJobManager(ResourceID heartbeatOrigin);
/**
* Disconnects the given JobManager from the TaskManager.
*
* @param jobId JobID for which the JobManager was the leader
* @param cause for the disconnection from the JobManager
*/
void disconnectJobManager(JobID jobId, Exception cause);
}
......@@ -21,11 +21,10 @@ package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
......@@ -73,18 +72,27 @@ public class TaskManagerRunner implements FatalErrorHandler {
ResourceID resourceID,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry) throws Exception {
this(configuration, resourceID, rpcService, highAvailabilityServices, metricRegistry, false);
this(
configuration,
resourceID,
rpcService,
highAvailabilityServices,
heartbeatServices,
metricRegistry,
false);
}
public TaskManagerRunner(
Configuration configuration,
ResourceID resourceID,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
MetricRegistry metricRegistry,
boolean localCommunicationOnly) throws Exception {
Configuration configuration,
ResourceID resourceID,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
boolean localCommunicationOnly) throws Exception {
this.configuration = Preconditions.checkNotNull(configuration);
this.resourceID = Preconditions.checkNotNull(resourceID);
......@@ -114,13 +122,6 @@ public class TaskManagerRunner implements FatalErrorHandler {
// Initialize the TM metrics
TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
HeartbeatManagerImpl<Void, Void> heartbeatManager = new HeartbeatManagerImpl<>(
configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT),
resourceID,
executor,
rpcService.getScheduledExecutor(),
LOG);
this.taskManager = new TaskExecutor(
taskManagerConfiguration,
taskManagerServices.getTaskManagerLocation(),
......@@ -129,8 +130,8 @@ public class TaskManagerRunner implements FatalErrorHandler {
taskManagerServices.getIOManager(),
taskManagerServices.getNetworkEnvironment(),
highAvailabilityServices,
heartbeatServices,
metricRegistry,
heartbeatManager,
taskManagerMetricGroup,
taskManagerServices.getBroadcastVariableManager(),
taskManagerServices.getFileCache(),
......
......@@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.jobgraph.JobGraph;
......@@ -99,11 +101,15 @@ public class JobManagerRunnerMockTest {
when(haServices.createBlobStore()).thenReturn(blobStore);
when(haServices.getRunningJobsRegistry()).thenReturn(runningJobsRegistry);
HeartbeatServices heartbeatServices = mock(HeartbeatServices.class);
runner = PowerMockito.spy(new JobManagerRunner(
ResourceID.generate(),
new JobGraph("test", new JobVertex("vertex")),
mock(Configuration.class),
mockRpc,
haServices,
heartbeatServices,
JobManagerServices.fromConfiguration(new Configuration(), haServices),
new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()),
jobCompletion,
......
......@@ -19,17 +19,18 @@
package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatManagerSenderImpl;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatManagerSenderImpl;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.SlotPoolGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
......@@ -37,24 +38,22 @@ import org.apache.flink.runtime.rpc.TestingSerialRpcService;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import java.net.InetAddress;
import java.net.URL;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.*;
......@@ -84,32 +83,28 @@ public class JobMasterTest extends TestLogger {
final long heartbeatInterval = 1L;
final long heartbeatTimeout = 5L;
final CountDownLatch waitLatch = new CountDownLatch(1);
final HeartbeatManagerSenderImpl<Void, Void> jmHeartbeatManager = new TestingHeartbeatManagerSenderImpl<>(
waitLatch,
heartbeatInterval,
heartbeatTimeout,
jmResourceId,
rpc.getExecutor(),
rpc.getScheduledExecutor(),
log);
final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor);
final JobGraph jobGraph = new JobGraph();
try {
final JobMaster jobMaster = new JobMaster(
new JobGraph(),
new Configuration(),
rpc,
haServices,
Executors.newScheduledThreadPool(1),
mock(BlobLibraryCacheManager.class),
mock(RestartStrategyFactory.class),
Time.of(10, TimeUnit.SECONDS),
null,
jmResourceId,
jmHeartbeatManager,
mock(OnCompletionActions.class),
testingFatalErrorHandler,
new FlinkUserCodeClassLoader(new URL[0]));
jmResourceId,
jobGraph,
new Configuration(),
rpc,
haServices,
heartbeatServices,
Executors.newScheduledThreadPool(1),
mock(BlobLibraryCacheManager.class),
mock(RestartStrategyFactory.class),
Time.of(10, TimeUnit.SECONDS),
null,
mock(OnCompletionActions.class),
testingFatalErrorHandler,
new FlinkUserCodeClassLoader(new URL[0]));
// also start the heartbeat manager in job manager
jobMaster.start(jmLeaderId);
......@@ -117,24 +112,29 @@ public class JobMasterTest extends TestLogger {
// register task manager will trigger monitoring heartbeat target, schedule heartbeat request in interval time
jobMaster.registerTaskManager(taskManagerAddress, taskManagerLocation, jmLeaderId);
verify(taskExecutorGateway, atLeast(1)).heartbeatFromJobManager(eq(jmResourceId));
ArgumentCaptor<Runnable> heartbeatRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
verify(scheduledExecutor, times(1)).scheduleAtFixedRate(
heartbeatRunnableCaptor.capture(),
eq(0L),
eq(heartbeatInterval),
eq(TimeUnit.MILLISECONDS));
Runnable heartbeatRunnable = heartbeatRunnableCaptor.getValue();
ArgumentCaptor<Runnable> timeoutRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
verify(scheduledExecutor).schedule(timeoutRunnableCaptor.capture(), eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS));
final ConcurrentHashMap<ResourceID, Object> heartbeatTargets = Whitebox.getInternalState(jmHeartbeatManager, "heartbeatTargets");
final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTMsInJM = Whitebox.getInternalState(jobMaster, "registeredTaskManagers");
final SlotPoolGateway slotPoolGateway = mock(SlotPoolGateway.class);
Whitebox.setInternalState(jobMaster, "slotPoolGateway", slotPoolGateway);
Runnable timeoutRunnable = timeoutRunnableCaptor.getValue();
// before heartbeat timeout
assertTrue(heartbeatTargets.containsKey(tmResourceId));
assertTrue(registeredTMsInJM.containsKey(tmResourceId));
// run the first heartbeat request
heartbeatRunnable.run();
// continue to unmonitor heartbeat target
waitLatch.countDown();
verify(taskExecutorGateway, times(1)).heartbeatFromJobManager(eq(jmResourceId));
// after heartbeat timeout
verify(slotPoolGateway, timeout(heartbeatTimeout * 5)).releaseTaskManager(eq(tmResourceId));
assertFalse(heartbeatTargets.containsKey(tmResourceId));
assertFalse(registeredTMsInJM.containsKey(tmResourceId));
// run the timeout runnable to simulate a heartbeat timeout
timeoutRunnable.run();
verify(taskExecutorGateway).disconnectJobManager(eq(jobGraph.getJobID()), any(TimeoutException.class));
// check if a concurrent error occurred
testingFatalErrorHandler.rethrowError();
......@@ -143,4 +143,32 @@ public class JobMasterTest extends TestLogger {
rpc.stopService();
}
}
private static class TestingHeartbeatServices extends HeartbeatServices {
private final ScheduledExecutor scheduledExecutorToUse;
public TestingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout, ScheduledExecutor scheduledExecutorToUse) {
super(heartbeatInterval, heartbeatTimeout);
this.scheduledExecutorToUse = Preconditions.checkNotNull(scheduledExecutorToUse);
}
@Override
public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
ResourceID resourceId,
HeartbeatListener<I, O> heartbeatListener,
ScheduledExecutor scheduledExecutor,
Logger log) {
return new HeartbeatManagerSenderImpl<>(
heartbeatInterval,
heartbeatTimeout,
resourceId,
heartbeatListener,
org.apache.flink.runtime.concurrent.Executors.directExecutor(),
scheduledExecutorToUse,
log);
}
}
}
......@@ -28,7 +28,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
......@@ -67,6 +67,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.RETURNS_MOCKS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
......@@ -104,7 +105,7 @@ public class TaskExecutorITCase {
rpcService.getScheduledExecutor(),
resourceManagerConfiguration.getJobTimeout());
MetricRegistry metricRegistry = mock(MetricRegistry.class);
HeartbeatManagerImpl heartbeatManager = mock(HeartbeatManagerImpl.class);
HeartbeatServices heartbeatServices = mock(HeartbeatServices.class, RETURNS_MOCKS);
final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(taskManagerResourceId, InetAddress.getLocalHost(), 1234);
......@@ -135,8 +136,8 @@ public class TaskExecutorITCase {
ioManager,
networkEnvironment,
testingHAServices,
heartbeatServices,
metricRegistry,
heartbeatManager,
taskManagerMetricGroup,
broadcastVariableManager,
fileCache,
......
......@@ -30,6 +30,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
......@@ -39,8 +40,9 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatManagerImpl;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.NonHaServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
......@@ -82,16 +84,17 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Matchers;
import org.powermock.reflect.Whitebox;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import java.net.InetAddress;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertFalse;
......@@ -124,15 +127,27 @@ public class TaskExecutorTest extends TestLogger {
final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
final CountDownLatch waitLatch = new CountDownLatch(1);
final long heartbeatTimeout = 10L;
final HeartbeatManagerImpl<Void, Void> tmHeartbeatManager = new TestingHeartbeatManagerImpl<>(
waitLatch,
heartbeatTimeout,
tmResourceId,
rpc.getExecutor(),
rpc.getScheduledExecutor(),
log);
HeartbeatServices heartbeatServices = mock(HeartbeatServices.class);
when(heartbeatServices.createHeartbeatManager(
eq(taskManagerLocation.getResourceID()),
any(HeartbeatListener.class),
any(ScheduledExecutor.class),
any(Logger.class))).thenAnswer(
new Answer<HeartbeatManagerImpl<Void, Void>>() {
@Override
public HeartbeatManagerImpl<Void, Void> answer(InvocationOnMock invocation) throws Throwable {
return new HeartbeatManagerImpl<>(
heartbeatTimeout,
taskManagerLocation.getResourceID(),
(HeartbeatListener<Void, Void>)invocation.getArguments()[1],
(Executor)invocation.getArguments()[2],
(ScheduledExecutor)invocation.getArguments()[2],
(Logger)invocation.getArguments()[3]);
}
}
);
final String jobMasterAddress = "jm";
final UUID jmLeaderId = UUID.randomUUID();
......@@ -147,25 +162,26 @@ public class TaskExecutorTest extends TestLogger {
any(Time.class)
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
when(jobMasterGateway.getAddress()).thenReturn(jobMasterAddress);
when(jobMasterGateway.getHostname()).thenReturn("localhost");
try {
final TaskExecutor taskManager = new TaskExecutor(
tmConfig,
taskManagerLocation,
rpc,
mock(MemoryManager.class),
mock(IOManager.class),
mock(NetworkEnvironment.class),
haServices,
mock(MetricRegistry.class),
tmHeartbeatManager,
mock(TaskManagerMetricGroup.class),
mock(BroadcastVariableManager.class),
mock(FileCache.class),
taskSlotTable,
new JobManagerTable(),
jobLeaderService,
testingFatalErrorHandler);
tmConfig,
taskManagerLocation,
rpc,
mock(MemoryManager.class),
mock(IOManager.class),
mock(NetworkEnvironment.class),
haServices,
heartbeatServices,
mock(MetricRegistry.class),
mock(TaskManagerMetricGroup.class),
mock(BroadcastVariableManager.class),
mock(FileCache.class),
taskSlotTable,
new JobManagerTable(),
jobLeaderService,
testingFatalErrorHandler);
taskManager.start();
......@@ -182,23 +198,8 @@ public class TaskExecutorTest extends TestLogger {
verify(jobMasterGateway).registerTaskManager(
eq(taskManager.getAddress()), eq(taskManagerLocation), eq(jmLeaderId), any(Time.class));
final ConcurrentHashMap<ResourceID, Object> heartbeatTargets = Whitebox.getInternalState(tmHeartbeatManager, "heartbeatTargets");
final JobManagerTable jobManagerTable = Whitebox.getInternalState(taskManager, "jobManagerTable");
final Map<ResourceID, JobManagerConnection> jobManagerConnections = Whitebox.getInternalState(taskManager, "jobManagerConnections");
// before heartbeat timeout
assertTrue(heartbeatTargets.containsKey(jmResourceId));
assertTrue(jobManagerTable.contains(jobId));
assertTrue(jobManagerConnections.containsKey(jmResourceId));
// continue to unmonitor heartbeat target
waitLatch.countDown();
// after heartbeat timeout
verify(jobMasterGateway, timeout(heartbeatTimeout)).disconnectTaskManager(eq(tmResourceId));
assertFalse(heartbeatTargets.containsKey(jmResourceId));
assertFalse(jobManagerTable.contains(jobId));
assertFalse(jobManagerConnections.containsKey(jmResourceId));
// the timeout should trigger disconnecting from the JobManager
verify(jobMasterGateway, timeout(heartbeatTimeout * 5)).disconnectTaskManager(eq(taskManagerLocation.getResourceID()), any(TimeoutException.class));
// check if a concurrent error occurred
testingFatalErrorHandler.rethrowError();
......@@ -247,8 +248,8 @@ public class TaskExecutorTest extends TestLogger {
mock(IOManager.class),
mock(NetworkEnvironment.class),
haServices,
mock(HeartbeatServices.class, RETURNS_MOCKS),
mock(MetricRegistry.class),
mock(HeartbeatManagerImpl.class),
mock(TaskManagerMetricGroup.class),
mock(BroadcastVariableManager.class),
mock(FileCache.class),
......@@ -324,8 +325,8 @@ public class TaskExecutorTest extends TestLogger {
mock(IOManager.class),
mock(NetworkEnvironment.class),
haServices,
mock(HeartbeatServices.class, RETURNS_MOCKS),
mock(MetricRegistry.class),
mock(HeartbeatManagerImpl.class),
mock(TaskManagerMetricGroup.class),
mock(BroadcastVariableManager.class),
mock(FileCache.class),
......@@ -456,8 +457,8 @@ public class TaskExecutorTest extends TestLogger {
mock(IOManager.class),
networkEnvironment,
haServices,
mock(HeartbeatServices.class, RETURNS_MOCKS),
mock(MetricRegistry.class),
mock(HeartbeatManagerImpl.class),
taskManagerMetricGroup,
mock(BroadcastVariableManager.class),
mock(FileCache.class),
......@@ -563,8 +564,8 @@ public class TaskExecutorTest extends TestLogger {
mock(IOManager.class),
mock(NetworkEnvironment.class),
haServices,
mock(HeartbeatServices.class, RETURNS_MOCKS),
mock(MetricRegistry.class),
mock(HeartbeatManagerImpl.class),
mock(TaskManagerMetricGroup.class),
mock(BroadcastVariableManager.class),
mock(FileCache.class),
......@@ -602,7 +603,7 @@ public class TaskExecutorTest extends TestLogger {
}
/**
* Tests that accepted slots go into state assigned and the others are returned to the resource
* Tests that accepted slots go into state assigned and the others are returned to the resource
* manager.
*/
@Test
......@@ -649,7 +650,6 @@ public class TaskExecutorTest extends TestLogger {
final AllocationID allocationId2 = new AllocationID();
final SlotOffer offer1 = new SlotOffer(allocationId1, 0, ResourceProfile.UNKNOWN);
final SlotOffer offer2 = new SlotOffer(allocationId2, 0, ResourceProfile.UNKNOWN);
final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
......@@ -677,8 +677,8 @@ public class TaskExecutorTest extends TestLogger {
mock(IOManager.class),
mock(NetworkEnvironment.class),
haServices,
mock(HeartbeatServices.class, RETURNS_MOCKS),
mock(MetricRegistry.class),
mock(HeartbeatManagerImpl.class),
mock(TaskManagerMetricGroup.class),
mock(BroadcastVariableManager.class),
mock(FileCache.class),
......@@ -752,8 +752,8 @@ public class TaskExecutorTest extends TestLogger {
mock(IOManager.class),
mock(NetworkEnvironment.class),
haServices,
mock(HeartbeatServices.class, RETURNS_MOCKS),
mock(MetricRegistry.class),
mock(HeartbeatManagerImpl.class),
mock(TaskManagerMetricGroup.class),
mock(BroadcastVariableManager.class),
mock(FileCache.class),
......@@ -895,8 +895,8 @@ public class TaskExecutorTest extends TestLogger {
mock(IOManager.class),
mock(NetworkEnvironment.class),
haServices,
mock(HeartbeatServices.class, RETURNS_MOCKS),
mock(MetricRegistry.class),
mock(HeartbeatManagerImpl.class),
mock(TaskManagerMetricGroup.class),
mock(BroadcastVariableManager.class),
mock(FileCache.class),
......
......@@ -26,6 +26,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
......@@ -89,6 +91,9 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
@GuardedBy("lock")
private HighAvailabilityServices haServices;
@GuardedBy("lock")
private HeartbeatServices heartbeatServices;
@GuardedBy("lock")
private RpcService commonRpcService;
......@@ -135,6 +140,8 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
synchronized (lock) {
LOG.info("Starting High Availability Services");
haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config);
heartbeatServices = HeartbeatServices.fromConfiguration(config);
metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
commonRpcService = createRpcService(config, appMasterHostname, amPortRange);
......@@ -210,11 +217,14 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
// now the JobManagerRunner
return new JobManagerRunner(
jobGraph, config,
commonRpcService,
haServices,
this,
this);
ResourceID.generate(),
jobGraph,
config,
commonRpcService,
haServices,
heartbeatServices,
this,
this);
}
protected void shutdown(ApplicationStatus status, String msg) {
......
......@@ -24,6 +24,7 @@ import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.metrics.MetricRegistry;
......@@ -208,11 +209,19 @@ public class YarnTaskExecutorRunner {
LOG.info("YARN assigned resource id {} for the task executor.", resourceID.toString());
haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config);
HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(config);
metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
// ---- (2) init task manager runner -------
taskExecutorRpcService = TaskManagerRunner.createRpcService(config, haServices);
taskManagerRunner = new TaskManagerRunner(config, resourceID, taskExecutorRpcService, haServices, metricRegistry);
taskManagerRunner = new TaskManagerRunner(
config,
resourceID,
taskExecutorRpcService,
haServices,
heartbeatServices,
metricRegistry);
// ---- (3) start the task manager runner
taskManagerRunner.start();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册