diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java index 2258eb1f10f5fba7058ccc499c6f68f8f94dc373..81cbc5dd3d48179ea5080a7aaaec6ef432ec1f4a 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java @@ -30,7 +30,7 @@ public class HeartbeatManagerOptions { /** Time interval for requesting heartbeat from sender side */ public static final ConfigOption HEARTBEAT_INTERVAL = - key("heartbeat.sender.interval") + key("heartbeat.interval") .defaultValue(10000L); /** Timeout for requesting and receiving heartbeat for both sender and receiver sides */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatServices.java new file mode 100644 index 0000000000000000000000000000000000000000..7d55b9c56301b645c6629317987ac73b07e610ff --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatServices.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.heartbeat; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HeartbeatManagerOptions; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; + +/** + * HeartbeatServices gives access to all services needed for heartbeating. This includes the + * creation of heartbeat receivers and heartbeat senders. + */ +public class HeartbeatServices { + + /** Heartbeat interval for the created services */ + protected final long heartbeatInterval; + + /** Heartbeat timeout for the created services */ + protected final long heartbeatTimeout; + + public HeartbeatServices(long heartbeatInterval, long heartbeatTimeout) { + Preconditions.checkArgument(0L < heartbeatInterval, "The heartbeat interval must be larger than 0."); + Preconditions.checkArgument(heartbeatInterval <= heartbeatTimeout, "The heartbeat timeout should be larger or equal than the heartbeat timeout."); + + this.heartbeatInterval = heartbeatInterval; + this.heartbeatTimeout = heartbeatTimeout; + } + + /** + * Creates a heartbeat manager which does not actively send heartbeats. + * + * @param resourceId Resource Id which identifies the owner of the heartbeat manager + * @param heartbeatListener Listener which will be notified upon heartbeat timeouts for registered + * targets + * @param scheduledExecutor Scheduled executor to be used for scheduling heartbeat timeouts + * @param log Logger to be used for the logging + * @param Type of the incoming payload + * @param Type of the outgoing payload + * @return A new HeartbeatManager instance + */ + public HeartbeatManager createHeartbeatManager( + ResourceID resourceId, + HeartbeatListener heartbeatListener, + ScheduledExecutor scheduledExecutor, + Logger log) { + + return new HeartbeatManagerImpl<>( + heartbeatTimeout, + resourceId, + heartbeatListener, + scheduledExecutor, + scheduledExecutor, + log); + } + + /** + * Creates a heartbeat manager which actively sends heartbeats to monitoring targets. + * + * @param resourceId Resource Id which identifies the owner of the heartbeat manager + * @param heartbeatListener Listener which will be notified upon heartbeat timeouts for registered + * targets + * @param scheduledExecutor Scheduled executor to be used for scheduling heartbeat timeouts + * @param log Logger to be used for the logging + * @param Type of the incoming payload + * @param Type of the outgoing payload + * @return A new HeartbeatManager instance which actively sends heartbeats + */ + public HeartbeatManager createHeartbeatManagerSender( + ResourceID resourceId, + HeartbeatListener heartbeatListener, + ScheduledExecutor scheduledExecutor, + Logger log) { + + return new HeartbeatManagerSenderImpl<>( + heartbeatInterval, + heartbeatTimeout, + resourceId, + heartbeatListener, + scheduledExecutor, + scheduledExecutor, + log); + } + + /** + * Creates an HeartbeatServices instance from a {@link Configuration}. + * + * @param configuration Configuration to be used for the HeartbeatServices creation + * @return An HeartbeatServices instance created from the given configuration + */ + public static HeartbeatServices fromConfiguration(Configuration configuration) { + long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL); + + long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT); + + return new HeartbeatServices(heartbeatInterval, heartbeatTimeout); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java index 1238f1a263905d532752d1dbd7ab14c3f19d05fd..a6e056dae4a87238c74177645089a59ed15ef039 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java @@ -40,11 +40,12 @@ public class TestingHeartbeatManagerImpl extends HeartbeatManagerImpl heartbeatListener, Executor executor, ScheduledExecutor scheduledExecutor, Logger log) { - super(heartbeatTimeoutIntervalMs, ownResourceID, executor, scheduledExecutor, log); + super(heartbeatTimeoutIntervalMs, ownResourceID, heartbeatListener, executor, scheduledExecutor, log); this.waitLatch = waitLatch; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java index 700089508237c4a2824e33614a7f4eb1da46dc0a..36f7e96cd47af8892e3a8027ec0e5d843ce52749 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java @@ -38,11 +38,12 @@ public class TestingHeartbeatManagerSenderImpl extends HeartbeatManagerSen long heartbeatPeriod, long heartbeatTimeout, ResourceID ownResourceID, + HeartbeatListener heartbeatListener, Executor executor, ScheduledExecutor scheduledExecutor, Logger log) { - super(heartbeatPeriod, heartbeatTimeout, ownResourceID, executor, scheduledExecutor, log); + super(heartbeatPeriod, heartbeatTimeout, ownResourceID, heartbeatListener, executor, scheduledExecutor, log); this.waitLatch = waitLatch; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index eced869445712e8d0495dff2e00b90ff97200890..33ee29df1228e1aebbc63d2f3a8ab104d351f5ff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -21,11 +21,10 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.HeartbeatManagerOptions; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; -import org.apache.flink.runtime.heartbeat.HeartbeatManagerSenderImpl; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.RunningJobsRegistry; import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus; @@ -88,31 +87,47 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F // ------------------------------------------------------------------------ public JobManagerRunner( + final ResourceID resourceId, final JobGraph jobGraph, final Configuration configuration, final RpcService rpcService, final HighAvailabilityServices haServices, + final HeartbeatServices heartbeatServices, final OnCompletionActions toNotifyOnComplete, - final FatalErrorHandler errorHandler) throws Exception - { - this(jobGraph, configuration, rpcService, haServices, - new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)), - toNotifyOnComplete, errorHandler); + final FatalErrorHandler errorHandler) throws Exception { + this( + resourceId, + jobGraph, + configuration, + rpcService, + haServices, + heartbeatServices, + new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)), + toNotifyOnComplete, + errorHandler); } public JobManagerRunner( + final ResourceID resourceId, final JobGraph jobGraph, final Configuration configuration, final RpcService rpcService, final HighAvailabilityServices haServices, + final HeartbeatServices heartbeatServices, final MetricRegistry metricRegistry, final OnCompletionActions toNotifyOnComplete, - final FatalErrorHandler errorHandler) throws Exception - { - this(jobGraph, configuration, rpcService, haServices, - JobManagerServices.fromConfiguration(configuration, haServices), - metricRegistry, - toNotifyOnComplete, errorHandler); + final FatalErrorHandler errorHandler) throws Exception { + this( + resourceId, + jobGraph, + configuration, + rpcService, + haServices, + heartbeatServices, + JobManagerServices.fromConfiguration(configuration, haServices), + metricRegistry, + toNotifyOnComplete, + errorHandler); } /** @@ -127,15 +142,16 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F * required services could not be started, ot the Job could not be initialized. */ public JobManagerRunner( + final ResourceID resourceId, final JobGraph jobGraph, final Configuration configuration, final RpcService rpcService, final HighAvailabilityServices haServices, + final HeartbeatServices heartbeatServices, final JobManagerServices jobManagerServices, final MetricRegistry metricRegistry, final OnCompletionActions toNotifyOnComplete, - final FatalErrorHandler errorHandler) throws Exception - { + final FatalErrorHandler errorHandler) throws Exception { JobManagerMetricGroup jobManagerMetrics = null; @@ -170,31 +186,22 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F this.runningJobsRegistry = haServices.getRunningJobsRegistry(); this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID()); - // heartbeat manager last - final ResourceID resourceID = ResourceID.generate(); - final HeartbeatManagerSenderImpl jobManagerHeartbeatManager = new HeartbeatManagerSenderImpl<>( - configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL), - configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT), - resourceID, - rpcService.getExecutor(), - rpcService.getScheduledExecutor(), - log); - // now start the JobManager this.jobManager = new JobMaster( - jobGraph, configuration, - rpcService, - haServices, - jobManagerServices.executorService, - jobManagerServices.libraryCacheManager, - jobManagerServices.restartStrategyFactory, - jobManagerServices.rpcAskTimeout, - jobManagerMetrics, - resourceID, - jobManagerHeartbeatManager, - this, - this, - userCodeLoader); + resourceId, + jobGraph, + configuration, + rpcService, + haServices, + heartbeatServices, + jobManagerServices.executorService, + jobManagerServices.libraryCacheManager, + jobManagerServices.restartStrategyFactory, + jobManagerServices.rpcAskTimeout, + jobManagerMetrics, + this, + this, + userCodeLoader); } catch (Throwable t) { // clean up everything diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 16c243c88e9f3ccdd01e3a29ed977c6b76faffeb..243b57f2fe79d7c917325465644cf4cbd45bbc08 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -51,7 +51,8 @@ import org.apache.flink.runtime.executiongraph.JobStatusListener; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; import org.apache.flink.runtime.heartbeat.HeartbeatListener; -import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl; +import org.apache.flink.runtime.heartbeat.HeartbeatManager; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.heartbeat.HeartbeatTarget; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.LeaderIdMismatchException; @@ -105,8 +106,8 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -129,6 +130,8 @@ public class JobMaster extends RpcEndpoint { // ------------------------------------------------------------------------ + private final ResourceID resourceId; + /** Logical representation of the job */ private final JobGraph jobGraph; @@ -150,10 +153,10 @@ public class JobMaster extends RpcEndpoint { private final MetricGroup jobMetricGroup; /** The heartbeat manager with task managers */ - private final HeartbeatManagerImpl heartbeatManager; + private final HeartbeatManager heartbeatManager; /** The execution context which is used to execute futures */ - private final ExecutorService executionContext; + private final Executor executor; private final OnCompletionActions jobCompletionActions; @@ -170,8 +173,6 @@ public class JobMaster extends RpcEndpoint { private volatile UUID leaderSessionID; - private final ResourceID resourceID; - // --------- ResourceManager -------- /** Leader retriever service used to locate ResourceManager's address */ @@ -187,34 +188,38 @@ public class JobMaster extends RpcEndpoint { // ------------------------------------------------------------------------ public JobMaster( + ResourceID resourceId, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityService, - ScheduledExecutorService executorService, + HeartbeatServices heartbeatServices, + ScheduledExecutorService executor, BlobLibraryCacheManager libraryCacheManager, RestartStrategyFactory restartStrategyFactory, Time rpcAskTimeout, @Nullable JobManagerMetricGroup jobManagerMetricGroup, - ResourceID resourceID, - HeartbeatManagerImpl heartbeatManager, OnCompletionActions jobCompletionActions, FatalErrorHandler errorHandler, - ClassLoader userCodeLoader) throws Exception - { + ClassLoader userCodeLoader) throws Exception { super(rpcService); + this.resourceId = checkNotNull(resourceId); this.jobGraph = checkNotNull(jobGraph); this.configuration = checkNotNull(configuration); this.rpcTimeout = rpcAskTimeout; this.highAvailabilityServices = checkNotNull(highAvailabilityService); this.libraryCacheManager = checkNotNull(libraryCacheManager); - this.executionContext = checkNotNull(executorService); + this.executor = checkNotNull(executor); this.jobCompletionActions = checkNotNull(jobCompletionActions); this.errorHandler = checkNotNull(errorHandler); this.userCodeLoader = checkNotNull(userCodeLoader); - this.resourceID = checkNotNull(resourceID); - this.heartbeatManager = checkNotNull(heartbeatManager); + + this.heartbeatManager = heartbeatServices.createHeartbeatManagerSender( + resourceId, + new TaskManagerHeartbeatListener(), + rpcService.getScheduledExecutor(), + log); final String jobName = jobGraph.getName(); final JobID jid = jobGraph.getJobID(); @@ -251,8 +256,8 @@ public class JobMaster extends RpcEndpoint { null, jobGraph, configuration, - executorService, - executorService, + executor, + executor, slotPool.getSlotProvider(), userCodeLoader, checkpointRecoveryFactory, @@ -288,27 +293,6 @@ public class JobMaster extends RpcEndpoint { // make sure we receive RPC and async calls super.start(); - heartbeatManager.start(new HeartbeatListener() { - @Override - public void notifyHeartbeatTimeout(ResourceID resourceID) { - log.info("Notify heartbeat timeout with task manager {}", resourceID); - heartbeatManager.unmonitorTarget(resourceID); - - getSelf().disconnectTaskManager(resourceID); - } - - @Override - public void reportPayload(ResourceID resourceID, Void payload) { - // currently there is no payload from task manager and resource manager, so this method will not be called. - } - - @Override - public Future retrievePayload() { - // currently no need payload. - return null; - } - }); - log.info("JobManager started as leader {} for job {}", leaderSessionID, jobGraph.getJobID()); getSelf().startJobExecution(); } @@ -322,8 +306,9 @@ public class JobMaster extends RpcEndpoint { */ @Override public void shutDown() throws Exception { - // make sure there is a graceful exit heartbeatManager.stop(); + + // make sure there is a graceful exit getSelf().suspendExecution(new Exception("JobManager is shutting down.")); super.shutDown(); } @@ -371,7 +356,7 @@ public class JobMaster extends RpcEndpoint { } // start scheduling job in another thread - executionContext.execute(new Runnable() { + executor.execute(new Runnable() { @Override public void run() { try { @@ -545,9 +530,16 @@ public class JobMaster extends RpcEndpoint { } @RpcMethod - public void disconnectTaskManager(final ResourceID resourceID) { - registeredTaskManagers.remove(resourceID); + public void disconnectTaskManager(final ResourceID resourceID, final Exception cause) { + heartbeatManager.unmonitorTarget(resourceID); slotPoolGateway.releaseTaskManager(resourceID); + + Tuple2 taskManagerConnection = registeredTaskManagers.remove(resourceID); + + if (taskManagerConnection != null) { + taskManagerConnection.f1.disconnectJobManager(jobGraph.getJobID(), cause); + } + } // TODO: This method needs a leader session ID @@ -743,7 +735,7 @@ public class JobMaster extends RpcEndpoint { if (registeredTaskManagers.containsKey(taskManagerId)) { final RegistrationResponse response = new JMTMRegistrationSuccess( - resourceID, libraryCacheManager.getBlobServerPort()); + resourceId, libraryCacheManager.getBlobServerPort()); return FlinkCompletableFuture.completed(response); } else { return getRpcService().execute(new Callable() { @@ -773,7 +765,7 @@ public class JobMaster extends RpcEndpoint { // monitor the task manager as heartbeat target heartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget() { @Override - public void sendHeartbeat(ResourceID resourceID, Void payload) { + public void receiveHeartbeat(ResourceID resourceID, Void payload) { // the task manager will not request heartbeat, so this method will never be called currently } @@ -783,7 +775,7 @@ public class JobMaster extends RpcEndpoint { } }); - return new JMTMRegistrationSuccess(resourceID, libraryCacheManager.getBlobServerPort()); + return new JMTMRegistrationSuccess(resourceId, libraryCacheManager.getBlobServerPort()); } }, getMainThreadExecutor()); } @@ -799,7 +791,7 @@ public class JobMaster extends RpcEndpoint { @RpcMethod public void heartbeatFromTaskManager(final ResourceID resourceID) { - heartbeatManager.sendHeartbeat(resourceID, null); + heartbeatManager.receiveHeartbeat(resourceID, null); } //---------------------------------------------------------------------------------------------- @@ -903,7 +895,7 @@ public class JobMaster extends RpcEndpoint { log.info("Attempting to register at ResourceManager {}", resourceManagerAddress); resourceManagerConnection = new ResourceManagerConnection( log, jobGraph.getJobID(), getAddress(), leaderSessionID, - resourceManagerAddress, resourceManagerLeaderId, executionContext); + resourceManagerAddress, resourceManagerLeaderId, executor); resourceManagerConnection.start(); } } @@ -1046,4 +1038,26 @@ public class JobMaster extends RpcEndpoint { }); } } + + private class TaskManagerHeartbeatListener implements HeartbeatListener { + + @Override + public void notifyHeartbeatTimeout(ResourceID resourceID) { + log.info("Task manager with id {} timed out.", resourceID); + + getSelf().disconnectTaskManager( + resourceID, + new TimeoutException("The heartbeat of TaskManager with id " + resourceID + " timed out.")); + } + + @Override + public void reportPayload(ResourceID resourceID, Void payload) { + // nothing to do since there is no payload + } + + @Override + public Future retrievePayload() { + return FlinkCompletableFuture.completed(null); + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index e7e3111f4859008e48dad428b7fba3b5517c9ab9..13a7372607442c97374ed12f72e1dacba195379e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -124,8 +124,9 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway { * {@link JobMaster}. * * @param resourceID identifying the TaskManager to disconnect + * @param cause for the disconnection of the TaskManager */ - void disconnectTaskManager(ResourceID resourceID); + void disconnectTaskManager(ResourceID resourceID, Exception cause); /** * Disconnects the resource manager from the job manager because of the given cause. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 1933554b935e87bbd017ce1e0fb51a2957fb179b..25c4aba47d0a257085e5c5699fe62e3ccef48ee4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -81,6 +82,9 @@ public class MiniCluster { @GuardedBy("lock") private HighAvailabilityServices haServices; + @GuardedBy("lock") + private HeartbeatServices heartbeatServices; + @GuardedBy("lock") private ResourceManagerRunner[] resourceManagerRunners; @@ -232,6 +236,8 @@ public class MiniCluster { LOG.info("Starting high-availability services"); haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration); + heartbeatServices = HeartbeatServices.fromConfiguration(configuration); + // bring up the ResourceManager(s) LOG.info("Starting {} ResourceManger(s)", numResourceManagers); resourceManagerRunners = startResourceManagers( @@ -245,7 +251,12 @@ public class MiniCluster { // bring up the dispatcher that launches JobManagers when jobs submitted LOG.info("Starting job dispatcher(s) for {} JobManger(s)", numJobManagers); jobDispatcher = new MiniClusterJobDispatcher( - configuration, haServices, metricRegistry, numJobManagers, jobManagerRpcServices); + configuration, + haServices, + heartbeatServices, + metricRegistry, + numJobManagers, + jobManagerRpcServices); } catch (Exception e) { // cleanup everything @@ -533,6 +544,7 @@ public class MiniCluster { new ResourceID(UUID.randomUUID().toString()), taskManagerRpcServices[i], haServices, + heartbeatServices, metricRegistry, localCommunication); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java index dd80ada10b6f68eb17424aef3cbdf7b973adf583..1f8ae80a7bf152b4e2df01af0867c251eb32118b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java @@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; @@ -63,6 +65,9 @@ public class MiniClusterJobDispatcher { /** services for discovery, leader election, and recovery */ private final HighAvailabilityServices haServices; + /** services for heartbeating */ + private final HeartbeatServices heartbeatServices; + /** all the services that the JobManager needs, such as BLOB service, factories, etc */ private final JobManagerServices jobManagerServices; @@ -94,8 +99,9 @@ public class MiniClusterJobDispatcher { Configuration config, RpcService rpcService, HighAvailabilityServices haServices, + HeartbeatServices heartbeatServices, MetricRegistry metricRegistry) throws Exception { - this(config, haServices, metricRegistry, 1, new RpcService[] { rpcService }); + this(config, haServices, heartbeatServices, metricRegistry, 1, new RpcService[] { rpcService }); } /** @@ -113,6 +119,7 @@ public class MiniClusterJobDispatcher { public MiniClusterJobDispatcher( Configuration config, HighAvailabilityServices haServices, + HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, int numJobManagers, RpcService[] rpcServices) throws Exception { @@ -123,6 +130,7 @@ public class MiniClusterJobDispatcher { this.configuration = checkNotNull(config); this.rpcServices = rpcServices; this.haServices = checkNotNull(haServices); + this.heartbeatServices = checkNotNull(heartbeatServices); this.metricRegistry = checkNotNull(metricRegistry); this.numJobManagers = numJobManagers; @@ -232,9 +240,17 @@ public class MiniClusterJobDispatcher { JobManagerRunner[] runners = new JobManagerRunner[numJobManagers]; for (int i = 0; i < numJobManagers; i++) { try { - runners[i] = new JobManagerRunner(job, configuration, - rpcServices[i], haServices, jobManagerServices, metricRegistry, - onCompletion, errorHandler); + runners[i] = new JobManagerRunner( + ResourceID.generate(), + job, + configuration, + rpcServices[i], + haServices, + heartbeatServices, + jobManagerServices, + metricRegistry, + onCompletion, + errorHandler); runners[i].start(); } catch (Throwable t) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index e601b0be626b43420ead4a076cce2564e937244a..00a1bf89c7daedf3d688b748870ba90f6e092bd1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.concurrent.AcceptFunction; import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; @@ -39,7 +40,8 @@ import org.apache.flink.runtime.executiongraph.PartitionInfo; import org.apache.flink.runtime.executiongraph.TaskInformation; import org.apache.flink.runtime.filecache.FileCache; import org.apache.flink.runtime.heartbeat.HeartbeatListener; -import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl; +import org.apache.flink.runtime.heartbeat.HeartbeatManager; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.heartbeat.HeartbeatTarget; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -131,7 +133,7 @@ public class TaskExecutor extends RpcEndpoint { private final MetricRegistry metricRegistry; /** The heartbeat manager for job manager in the task manager */ - private final HeartbeatManagerImpl jobManagerHeartbeatManager; + private final HeartbeatManager jobManagerHeartbeatManager; /** The fatal error handler to use in case of a fatal error */ private final FatalErrorHandler fatalErrorHandler; @@ -168,8 +170,8 @@ public class TaskExecutor extends RpcEndpoint { IOManager ioManager, NetworkEnvironment networkEnvironment, HighAvailabilityServices haServices, + HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, - HeartbeatManagerImpl jobManagerHeartbeatManager, TaskManagerMetricGroup taskManagerMetricGroup, BroadcastVariableManager broadcastVariableManager, FileCache fileCache, @@ -189,7 +191,6 @@ public class TaskExecutor extends RpcEndpoint { this.networkEnvironment = checkNotNull(networkEnvironment); this.haServices = checkNotNull(haServices); this.metricRegistry = checkNotNull(metricRegistry); - this.jobManagerHeartbeatManager = checkNotNull(jobManagerHeartbeatManager); this.taskSlotTable = checkNotNull(taskSlotTable); this.fatalErrorHandler = checkNotNull(fatalErrorHandler); this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup); @@ -199,6 +200,12 @@ public class TaskExecutor extends RpcEndpoint { this.jobLeaderService = checkNotNull(jobLeaderService); this.jobManagerConnections = new HashMap<>(4); + + this.jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManager( + getResourceID(), + new JobManagerHeartbeatListener(), + rpcService.getScheduledExecutor(), + log); } // ------------------------------------------------------------------------ @@ -221,38 +228,6 @@ public class TaskExecutor extends RpcEndpoint { // start the job leader service jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl()); - - // start the heartbeat manager for monitoring job manager - jobManagerHeartbeatManager.start(new HeartbeatListener() { - @Override - public void notifyHeartbeatTimeout(final ResourceID resourceID) { - runAsync(new Runnable() { - @Override - public void run() { - log.info("Notify heartbeat timeout with job manager {}", resourceID); - jobManagerHeartbeatManager.unmonitorTarget(resourceID); - - if (jobManagerConnections.containsKey(resourceID)) { - JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID); - if (jobManagerConnection != null) { - closeJobManagerConnection(jobManagerConnection.getJobID()); - } - } - } - }); - } - - @Override - public void reportPayload(ResourceID resourceID, Void payload) { - // currently there is no payload from job manager, so this method will not be called. - } - - @Override - public Future retrievePayload() { - // currently no need payload. - return null; - } - }); } /** @@ -644,6 +619,11 @@ public class TaskExecutor extends RpcEndpoint { return new TMSlotRequestRegistered(resourceManagerConnection.getRegistrationId(), getResourceID(), allocationId); } + @RpcMethod + public void disconnectJobManager(JobID jobId, Exception cause) { + closeJobManagerConnection(jobId, cause); + } + // ====================================================================== // Internal methods // ====================================================================== @@ -786,7 +766,7 @@ public class TaskExecutor extends RpcEndpoint { if (jobManagerTable.contains(jobId)) { JobManagerConnection oldJobManagerConnection = jobManagerTable.get(jobId); if (!oldJobManagerConnection.getLeaderId().equals(jobManagerLeaderId)) { - closeJobManagerConnection(jobId); + closeJobManagerConnection(jobId, new Exception("Found new job leader for job id " + jobId + '.')); } } @@ -803,20 +783,20 @@ public class TaskExecutor extends RpcEndpoint { // monitor the job manager as heartbeat target jobManagerHeartbeatManager.monitorTarget(jobManagerResourceID, new HeartbeatTarget() { @Override - public void sendHeartbeat(ResourceID resourceID, Void payload) { + public void receiveHeartbeat(ResourceID resourceID, Void payload) { jobMasterGateway.heartbeatFromTaskManager(resourceID); } @Override public void requestHeartbeat(ResourceID resourceID, Void payload) { - // request heartbeat will never be called in task manager side + // request heartbeat will never be called on the task manager side } }); offerSlotsToJobManager(jobId); } - private void closeJobManagerConnection(JobID jobId) { + private void closeJobManagerConnection(JobID jobId, Exception cause) { log.info("Close JobManager connection for job {}.", jobId); // 1. fail tasks running under this JobID @@ -847,8 +827,10 @@ public class TaskExecutor extends RpcEndpoint { if (jobManagerConnection != null) { try { + jobManagerHeartbeatManager.unmonitorTarget(jobManagerConnection.getResourceID()); + jobManagerConnections.remove(jobManagerConnection.getResourceID()); - disassociateFromJobManager(jobManagerConnection); + disassociateFromJobManager(jobManagerConnection, cause); } catch (IOException e) { log.warn("Could not properly disassociate from JobManager {}.", jobManagerConnection.getJobManagerGateway().getAddress(), e); @@ -909,10 +891,10 @@ public class TaskExecutor extends RpcEndpoint { partitionStateChecker); } - private void disassociateFromJobManager(JobManagerConnection jobManagerConnection) throws IOException { + private void disassociateFromJobManager(JobManagerConnection jobManagerConnection, Exception cause) throws IOException { Preconditions.checkNotNull(jobManagerConnection); JobMasterGateway jobManagerGateway = jobManagerConnection.getJobManagerGateway(); - jobManagerGateway.disconnectTaskManager(getResourceID()); + jobManagerGateway.disconnectTaskManager(getResourceID(), cause); jobManagerConnection.getLibraryCacheManager().shutdown(); } @@ -1138,7 +1120,9 @@ public class TaskExecutor extends RpcEndpoint { runAsync(new Runnable() { @Override public void run() { - closeJobManagerConnection(jobId); + closeJobManagerConnection( + jobId, + new Exception("Job leader for job id " + jobId + " lost leadership.")); } }); } @@ -1220,4 +1204,37 @@ public class TaskExecutor extends RpcEndpoint { }); } } + + private class JobManagerHeartbeatListener implements HeartbeatListener { + + @Override + public void notifyHeartbeatTimeout(final ResourceID resourceID) { + runAsync(new Runnable() { + @Override + public void run() { + log.info("The JobManager connection {} has timed out.", resourceID); + + if (jobManagerConnections.containsKey(resourceID)) { + JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID); + if (jobManagerConnection != null) { + closeJobManagerConnection( + jobManagerConnection.getJobID(), + new TimeoutException("The heartbeat of JobManager with id " + + resourceID + " timed out.")); + } + } + } + }); + } + + @Override + public void reportPayload(ResourceID resourceID, Void payload) { + // nothing to do since the payload is of type Void + } + + @Override + public Future retrievePayload() { + return FlinkCompletableFuture.completed(null); + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java index 95db932f959d51ca6caae8f08cc4fbc7598d7fa4..2dcc3a48625878cb4eecdd290a7f5803a2df9f60 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java @@ -134,9 +134,17 @@ public interface TaskExecutorGateway extends RpcGateway { Future cancelTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout); /** - * Request heartbeat from the job manager + * Heartbeat request from the job manager * - * @param resourceID unique id of the job manager + * @param heartbeatOrigin unique id of the job manager */ - void heartbeatFromJobManager(ResourceID resourceID); + void heartbeatFromJobManager(ResourceID heartbeatOrigin); + + /** + * Disconnects the given JobManager from the TaskManager. + * + * @param jobId JobID for which the JobManager was the leader + * @param cause for the disconnection from the JobManager + */ + void disconnectJobManager(JobID jobId, Exception cause); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 402421cea8737e23aa34095e57dd0eccee39a755..c99eb91640dc4a5b95b46ecf53c812ac43c26c83 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -21,11 +21,10 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.HeartbeatManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; @@ -73,18 +72,27 @@ public class TaskManagerRunner implements FatalErrorHandler { ResourceID resourceID, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, MetricRegistry metricRegistry) throws Exception { - this(configuration, resourceID, rpcService, highAvailabilityServices, metricRegistry, false); + this( + configuration, + resourceID, + rpcService, + highAvailabilityServices, + heartbeatServices, + metricRegistry, + false); } public TaskManagerRunner( - Configuration configuration, - ResourceID resourceID, - RpcService rpcService, - HighAvailabilityServices highAvailabilityServices, - MetricRegistry metricRegistry, - boolean localCommunicationOnly) throws Exception { + Configuration configuration, + ResourceID resourceID, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + MetricRegistry metricRegistry, + boolean localCommunicationOnly) throws Exception { this.configuration = Preconditions.checkNotNull(configuration); this.resourceID = Preconditions.checkNotNull(resourceID); @@ -114,13 +122,6 @@ public class TaskManagerRunner implements FatalErrorHandler { // Initialize the TM metrics TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment()); - HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl<>( - configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT), - resourceID, - executor, - rpcService.getScheduledExecutor(), - LOG); - this.taskManager = new TaskExecutor( taskManagerConfiguration, taskManagerServices.getTaskManagerLocation(), @@ -129,8 +130,8 @@ public class TaskManagerRunner implements FatalErrorHandler { taskManagerServices.getIOManager(), taskManagerServices.getNetworkEnvironment(), highAvailabilityServices, + heartbeatServices, metricRegistry, - heartbeatManager, taskManagerMetricGroup, taskManagerServices.getBroadcastVariableManager(), taskManagerServices.getFileCache(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java index 1a9818ee3b5312406657e983fdc4be874c3ef2dc..d2221c50137e7e56416c0c5eba2bb9023db65bdd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java @@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobStore; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.RunningJobsRegistry; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -99,11 +101,15 @@ public class JobManagerRunnerMockTest { when(haServices.createBlobStore()).thenReturn(blobStore); when(haServices.getRunningJobsRegistry()).thenReturn(runningJobsRegistry); + HeartbeatServices heartbeatServices = mock(HeartbeatServices.class); + runner = PowerMockito.spy(new JobManagerRunner( + ResourceID.generate(), new JobGraph("test", new JobVertex("vertex")), mock(Configuration.class), mockRpc, haServices, + heartbeatServices, JobManagerServices.fromConfiguration(new Configuration(), haServices), new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()), jobCompletion, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index cdad87fa556e99f77360a02e945bd95b7418127b..567a8fc6044338ea6753c55e5ca5b75823078183 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -19,17 +19,18 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; +import org.apache.flink.runtime.heartbeat.HeartbeatListener; +import org.apache.flink.runtime.heartbeat.HeartbeatManager; import org.apache.flink.runtime.heartbeat.HeartbeatManagerSenderImpl; -import org.apache.flink.runtime.heartbeat.TestingHeartbeatManagerSenderImpl; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; -import org.apache.flink.runtime.instance.SlotPoolGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; @@ -37,24 +38,22 @@ import org.apache.flink.runtime.rpc.TestingSerialRpcService; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import org.powermock.reflect.Whitebox; +import org.slf4j.Logger; import java.net.InetAddress; import java.net.URL; -import java.util.Map; import java.util.UUID; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeoutException; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.*; @@ -84,32 +83,28 @@ public class JobMasterTest extends TestLogger { final long heartbeatInterval = 1L; final long heartbeatTimeout = 5L; - final CountDownLatch waitLatch = new CountDownLatch(1); - final HeartbeatManagerSenderImpl jmHeartbeatManager = new TestingHeartbeatManagerSenderImpl<>( - waitLatch, - heartbeatInterval, - heartbeatTimeout, - jmResourceId, - rpc.getExecutor(), - rpc.getScheduledExecutor(), - log); + + final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); + final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor); + + final JobGraph jobGraph = new JobGraph(); try { final JobMaster jobMaster = new JobMaster( - new JobGraph(), - new Configuration(), - rpc, - haServices, - Executors.newScheduledThreadPool(1), - mock(BlobLibraryCacheManager.class), - mock(RestartStrategyFactory.class), - Time.of(10, TimeUnit.SECONDS), - null, - jmResourceId, - jmHeartbeatManager, - mock(OnCompletionActions.class), - testingFatalErrorHandler, - new FlinkUserCodeClassLoader(new URL[0])); + jmResourceId, + jobGraph, + new Configuration(), + rpc, + haServices, + heartbeatServices, + Executors.newScheduledThreadPool(1), + mock(BlobLibraryCacheManager.class), + mock(RestartStrategyFactory.class), + Time.of(10, TimeUnit.SECONDS), + null, + mock(OnCompletionActions.class), + testingFatalErrorHandler, + new FlinkUserCodeClassLoader(new URL[0])); // also start the heartbeat manager in job manager jobMaster.start(jmLeaderId); @@ -117,24 +112,29 @@ public class JobMasterTest extends TestLogger { // register task manager will trigger monitoring heartbeat target, schedule heartbeat request in interval time jobMaster.registerTaskManager(taskManagerAddress, taskManagerLocation, jmLeaderId); - verify(taskExecutorGateway, atLeast(1)).heartbeatFromJobManager(eq(jmResourceId)); + ArgumentCaptor heartbeatRunnableCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(scheduledExecutor, times(1)).scheduleAtFixedRate( + heartbeatRunnableCaptor.capture(), + eq(0L), + eq(heartbeatInterval), + eq(TimeUnit.MILLISECONDS)); + + Runnable heartbeatRunnable = heartbeatRunnableCaptor.getValue(); + + ArgumentCaptor timeoutRunnableCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(scheduledExecutor).schedule(timeoutRunnableCaptor.capture(), eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS)); - final ConcurrentHashMap heartbeatTargets = Whitebox.getInternalState(jmHeartbeatManager, "heartbeatTargets"); - final Map> registeredTMsInJM = Whitebox.getInternalState(jobMaster, "registeredTaskManagers"); - final SlotPoolGateway slotPoolGateway = mock(SlotPoolGateway.class); - Whitebox.setInternalState(jobMaster, "slotPoolGateway", slotPoolGateway); + Runnable timeoutRunnable = timeoutRunnableCaptor.getValue(); - // before heartbeat timeout - assertTrue(heartbeatTargets.containsKey(tmResourceId)); - assertTrue(registeredTMsInJM.containsKey(tmResourceId)); + // run the first heartbeat request + heartbeatRunnable.run(); - // continue to unmonitor heartbeat target - waitLatch.countDown(); + verify(taskExecutorGateway, times(1)).heartbeatFromJobManager(eq(jmResourceId)); - // after heartbeat timeout - verify(slotPoolGateway, timeout(heartbeatTimeout * 5)).releaseTaskManager(eq(tmResourceId)); - assertFalse(heartbeatTargets.containsKey(tmResourceId)); - assertFalse(registeredTMsInJM.containsKey(tmResourceId)); + // run the timeout runnable to simulate a heartbeat timeout + timeoutRunnable.run(); + + verify(taskExecutorGateway).disconnectJobManager(eq(jobGraph.getJobID()), any(TimeoutException.class)); // check if a concurrent error occurred testingFatalErrorHandler.rethrowError(); @@ -143,4 +143,32 @@ public class JobMasterTest extends TestLogger { rpc.stopService(); } } + + private static class TestingHeartbeatServices extends HeartbeatServices { + + private final ScheduledExecutor scheduledExecutorToUse; + + public TestingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout, ScheduledExecutor scheduledExecutorToUse) { + super(heartbeatInterval, heartbeatTimeout); + + this.scheduledExecutorToUse = Preconditions.checkNotNull(scheduledExecutorToUse); + } + + @Override + public HeartbeatManager createHeartbeatManagerSender( + ResourceID resourceId, + HeartbeatListener heartbeatListener, + ScheduledExecutor scheduledExecutor, + Logger log) { + + return new HeartbeatManagerSenderImpl<>( + heartbeatInterval, + heartbeatTimeout, + resourceId, + heartbeatListener, + org.apache.flink.runtime.concurrent.Executors.directExecutor(), + scheduledExecutorToUse, + log); + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index 5ffc97eab108c9cf56e2f1a5ef95704df480ac34..16edbf70e7701efbfadea55759e6130903de85c9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -28,7 +28,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.filecache.FileCache; -import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.NetworkEnvironment; @@ -67,6 +67,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.RETURNS_MOCKS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -104,7 +105,7 @@ public class TaskExecutorITCase { rpcService.getScheduledExecutor(), resourceManagerConfiguration.getJobTimeout()); MetricRegistry metricRegistry = mock(MetricRegistry.class); - HeartbeatManagerImpl heartbeatManager = mock(HeartbeatManagerImpl.class); + HeartbeatServices heartbeatServices = mock(HeartbeatServices.class, RETURNS_MOCKS); final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(taskManagerResourceId, InetAddress.getLocalHost(), 1234); @@ -135,8 +136,8 @@ public class TaskExecutorITCase { ioManager, networkEnvironment, testingHAServices, + heartbeatServices, metricRegistry, - heartbeatManager, taskManagerMetricGroup, broadcastVariableManager, fileCache, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index f50024686b1ba862f7092ae786e5118bcfb9f092..0f5bad3cfa7d44e58c552e16480e5d8c160dfa4d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; @@ -39,8 +40,9 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.JobInformation; import org.apache.flink.runtime.executiongraph.TaskInformation; import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.heartbeat.HeartbeatListener; import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl; -import org.apache.flink.runtime.heartbeat.TestingHeartbeatManagerImpl; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.NonHaServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; @@ -82,16 +84,17 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; import org.mockito.Matchers; -import org.powermock.reflect.Whitebox; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; import java.net.InetAddress; import java.net.URL; import java.util.Arrays; import java.util.Collections; -import java.util.Map; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeoutException; import static org.hamcrest.Matchers.contains; import static org.junit.Assert.assertFalse; @@ -124,15 +127,27 @@ public class TaskExecutorTest extends TestLogger { final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); - final CountDownLatch waitLatch = new CountDownLatch(1); final long heartbeatTimeout = 10L; - final HeartbeatManagerImpl tmHeartbeatManager = new TestingHeartbeatManagerImpl<>( - waitLatch, - heartbeatTimeout, - tmResourceId, - rpc.getExecutor(), - rpc.getScheduledExecutor(), - log); + + HeartbeatServices heartbeatServices = mock(HeartbeatServices.class); + when(heartbeatServices.createHeartbeatManager( + eq(taskManagerLocation.getResourceID()), + any(HeartbeatListener.class), + any(ScheduledExecutor.class), + any(Logger.class))).thenAnswer( + new Answer>() { + @Override + public HeartbeatManagerImpl answer(InvocationOnMock invocation) throws Throwable { + return new HeartbeatManagerImpl<>( + heartbeatTimeout, + taskManagerLocation.getResourceID(), + (HeartbeatListener)invocation.getArguments()[1], + (Executor)invocation.getArguments()[2], + (ScheduledExecutor)invocation.getArguments()[2], + (Logger)invocation.getArguments()[3]); + } + } + ); final String jobMasterAddress = "jm"; final UUID jmLeaderId = UUID.randomUUID(); @@ -147,25 +162,26 @@ public class TaskExecutorTest extends TestLogger { any(Time.class) )).thenReturn(FlinkCompletableFuture.completed(new JMTMRegistrationSuccess(jmResourceId, blobPort))); when(jobMasterGateway.getAddress()).thenReturn(jobMasterAddress); + when(jobMasterGateway.getHostname()).thenReturn("localhost"); try { final TaskExecutor taskManager = new TaskExecutor( - tmConfig, - taskManagerLocation, - rpc, - mock(MemoryManager.class), - mock(IOManager.class), - mock(NetworkEnvironment.class), - haServices, - mock(MetricRegistry.class), - tmHeartbeatManager, - mock(TaskManagerMetricGroup.class), - mock(BroadcastVariableManager.class), - mock(FileCache.class), - taskSlotTable, - new JobManagerTable(), - jobLeaderService, - testingFatalErrorHandler); + tmConfig, + taskManagerLocation, + rpc, + mock(MemoryManager.class), + mock(IOManager.class), + mock(NetworkEnvironment.class), + haServices, + heartbeatServices, + mock(MetricRegistry.class), + mock(TaskManagerMetricGroup.class), + mock(BroadcastVariableManager.class), + mock(FileCache.class), + taskSlotTable, + new JobManagerTable(), + jobLeaderService, + testingFatalErrorHandler); taskManager.start(); @@ -182,23 +198,8 @@ public class TaskExecutorTest extends TestLogger { verify(jobMasterGateway).registerTaskManager( eq(taskManager.getAddress()), eq(taskManagerLocation), eq(jmLeaderId), any(Time.class)); - final ConcurrentHashMap heartbeatTargets = Whitebox.getInternalState(tmHeartbeatManager, "heartbeatTargets"); - final JobManagerTable jobManagerTable = Whitebox.getInternalState(taskManager, "jobManagerTable"); - final Map jobManagerConnections = Whitebox.getInternalState(taskManager, "jobManagerConnections"); - - // before heartbeat timeout - assertTrue(heartbeatTargets.containsKey(jmResourceId)); - assertTrue(jobManagerTable.contains(jobId)); - assertTrue(jobManagerConnections.containsKey(jmResourceId)); - - // continue to unmonitor heartbeat target - waitLatch.countDown(); - - // after heartbeat timeout - verify(jobMasterGateway, timeout(heartbeatTimeout)).disconnectTaskManager(eq(tmResourceId)); - assertFalse(heartbeatTargets.containsKey(jmResourceId)); - assertFalse(jobManagerTable.contains(jobId)); - assertFalse(jobManagerConnections.containsKey(jmResourceId)); + // the timeout should trigger disconnecting from the JobManager + verify(jobMasterGateway, timeout(heartbeatTimeout * 5)).disconnectTaskManager(eq(taskManagerLocation.getResourceID()), any(TimeoutException.class)); // check if a concurrent error occurred testingFatalErrorHandler.rethrowError(); @@ -247,8 +248,8 @@ public class TaskExecutorTest extends TestLogger { mock(IOManager.class), mock(NetworkEnvironment.class), haServices, + mock(HeartbeatServices.class, RETURNS_MOCKS), mock(MetricRegistry.class), - mock(HeartbeatManagerImpl.class), mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -324,8 +325,8 @@ public class TaskExecutorTest extends TestLogger { mock(IOManager.class), mock(NetworkEnvironment.class), haServices, + mock(HeartbeatServices.class, RETURNS_MOCKS), mock(MetricRegistry.class), - mock(HeartbeatManagerImpl.class), mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -456,8 +457,8 @@ public class TaskExecutorTest extends TestLogger { mock(IOManager.class), networkEnvironment, haServices, + mock(HeartbeatServices.class, RETURNS_MOCKS), mock(MetricRegistry.class), - mock(HeartbeatManagerImpl.class), taskManagerMetricGroup, mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -563,8 +564,8 @@ public class TaskExecutorTest extends TestLogger { mock(IOManager.class), mock(NetworkEnvironment.class), haServices, + mock(HeartbeatServices.class, RETURNS_MOCKS), mock(MetricRegistry.class), - mock(HeartbeatManagerImpl.class), mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -602,7 +603,7 @@ public class TaskExecutorTest extends TestLogger { } /** - * Tests that accepted slots go into state assigned and the others are returned to the resource + * Tests that accepted slots go into state assigned and the others are returned to the resource * manager. */ @Test @@ -649,7 +650,6 @@ public class TaskExecutorTest extends TestLogger { final AllocationID allocationId2 = new AllocationID(); final SlotOffer offer1 = new SlotOffer(allocationId1, 0, ResourceProfile.UNKNOWN); - final SlotOffer offer2 = new SlotOffer(allocationId2, 0, ResourceProfile.UNKNOWN); final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class); @@ -677,8 +677,8 @@ public class TaskExecutorTest extends TestLogger { mock(IOManager.class), mock(NetworkEnvironment.class), haServices, + mock(HeartbeatServices.class, RETURNS_MOCKS), mock(MetricRegistry.class), - mock(HeartbeatManagerImpl.class), mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -752,8 +752,8 @@ public class TaskExecutorTest extends TestLogger { mock(IOManager.class), mock(NetworkEnvironment.class), haServices, + mock(HeartbeatServices.class, RETURNS_MOCKS), mock(MetricRegistry.class), - mock(HeartbeatManagerImpl.class), mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -895,8 +895,8 @@ public class TaskExecutorTest extends TestLogger { mock(IOManager.class), mock(NetworkEnvironment.class), haServices, + mock(HeartbeatServices.class, RETURNS_MOCKS), mock(MetricRegistry.class), - mock(HeartbeatManagerImpl.class), mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java index ddeb02e4fe768bfdbb6fae1ba291cc5fba0cde41..6fb7c8605acf6740409b6d79d9903ee67b2a5018 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java @@ -26,6 +26,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -89,6 +91,9 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati @GuardedBy("lock") private HighAvailabilityServices haServices; + @GuardedBy("lock") + private HeartbeatServices heartbeatServices; + @GuardedBy("lock") private RpcService commonRpcService; @@ -135,6 +140,8 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati synchronized (lock) { LOG.info("Starting High Availability Services"); haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config); + + heartbeatServices = HeartbeatServices.fromConfiguration(config); metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); commonRpcService = createRpcService(config, appMasterHostname, amPortRange); @@ -210,11 +217,14 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati // now the JobManagerRunner return new JobManagerRunner( - jobGraph, config, - commonRpcService, - haServices, - this, - this); + ResourceID.generate(), + jobGraph, + config, + commonRpcService, + haServices, + heartbeatServices, + this, + this); } protected void shutdown(ApplicationStatus status, String msg) { diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java index 414c3de7977c496980af585c8745e5f500dd3fbb..23697656c6f591e027345d208da48555d7593b51 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.metrics.MetricRegistry; @@ -208,11 +209,19 @@ public class YarnTaskExecutorRunner { LOG.info("YARN assigned resource id {} for the task executor.", resourceID.toString()); haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config); + HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(config); + metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); // ---- (2) init task manager runner ------- taskExecutorRpcService = TaskManagerRunner.createRpcService(config, haServices); - taskManagerRunner = new TaskManagerRunner(config, resourceID, taskExecutorRpcService, haServices, metricRegistry); + taskManagerRunner = new TaskManagerRunner( + config, + resourceID, + taskExecutorRpcService, + haServices, + heartbeatServices, + metricRegistry); // ---- (3) start the task manager runner taskManagerRunner.start();