提交 83b99f8a 编写于 作者: T Till Rohrmann

[FLINK-4354] [heartbeat] Add heartbeats between the ResourceManager and TaskExecutor

上级 fd90672f
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.heartbeat;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.slf4j.Logger;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
/**
 * Heartbeat manager implementation which extends {@link HeartbeatManagerImpl} for testing.
 *
 * <p>It overrides {@link #unmonitorTarget(ResourceID)} so that the call first waits on a
 * {@link CountDownLatch} and only then delegates to the parent implementation. This lets a test
 * finish its checks before the target is unmonitored when a heartbeat timeout is notified.
 *
 * @param <I> Type of the incoming heartbeat payload
 * @param <O> Type of the outgoing heartbeat payload
 */
public class TestingHeartbeatManagerImpl<I, O> extends HeartbeatManagerImpl<I, O> {

    /** Latch which has to reach zero before a target is actually unmonitored. */
    private final CountDownLatch waitLatch;

    /**
     * Creates the testing heartbeat manager.
     *
     * @param waitLatch latch awaited at the beginning of every {@link #unmonitorTarget(ResourceID)} call
     * @param heartbeatTimeoutIntervalMs passed through to the parent constructor
     * @param ownResourceID passed through to the parent constructor
     * @param heartbeatListener passed through to the parent constructor
     * @param executor passed through to the parent constructor
     * @param scheduledExecutor passed through to the parent constructor
     * @param log passed through to the parent constructor
     */
    public TestingHeartbeatManagerImpl(
            CountDownLatch waitLatch,
            long heartbeatTimeoutIntervalMs,
            ResourceID ownResourceID,
            HeartbeatListener<I, O> heartbeatListener,
            Executor executor,
            ScheduledExecutor scheduledExecutor,
            Logger log) {

        super(heartbeatTimeoutIntervalMs, ownResourceID, heartbeatListener, executor, scheduledExecutor, log);

        this.waitLatch = waitLatch;
    }

    /**
     * Waits until the latch has been counted down and then unmonitors the given target.
     *
     * <p>If the waiting thread is interrupted, the interruption is logged, the thread's interrupt
     * status is restored and the target is unmonitored right away.
     *
     * @param resourceID id of the target to stop monitoring
     */
    @Override
    public void unmonitorTarget(ResourceID resourceID) {
        try {
            waitLatch.await();
        } catch (InterruptedException ex) {
            // await() cleared the interrupt status when throwing; set it again so that
            // code further up the call stack can still observe the interruption
            Thread.currentThread().interrupt();

            log.error("Unexpected interrupted exception.", ex);
        }

        super.unmonitorTarget(resourceID);
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.heartbeat;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.slf4j.Logger;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
/**
 * Heartbeat manager sender implementation which extends {@link HeartbeatManagerSenderImpl} for
 * testing.
 *
 * <p>It overrides {@link #unmonitorTarget(ResourceID)} so that the call first waits on a
 * {@link CountDownLatch} and only then delegates to the parent implementation.
 *
 * @param <I> Type of the incoming heartbeat payload
 * @param <O> Type of the outgoing heartbeat payload
 */
public class TestingHeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerSenderImpl<I, O> {

    /** Latch which has to reach zero before a target is actually unmonitored. */
    private final CountDownLatch waitLatch;

    /**
     * Creates the testing heartbeat manager sender.
     *
     * @param waitLatch latch awaited at the beginning of every {@link #unmonitorTarget(ResourceID)} call
     * @param heartbeatPeriod passed through to the parent constructor
     * @param heartbeatTimeout passed through to the parent constructor
     * @param ownResourceID passed through to the parent constructor
     * @param heartbeatListener passed through to the parent constructor
     * @param executor passed through to the parent constructor
     * @param scheduledExecutor passed through to the parent constructor
     * @param log passed through to the parent constructor
     */
    public TestingHeartbeatManagerSenderImpl(
            CountDownLatch waitLatch,
            long heartbeatPeriod,
            long heartbeatTimeout,
            ResourceID ownResourceID,
            HeartbeatListener<I, O> heartbeatListener,
            Executor executor,
            ScheduledExecutor scheduledExecutor,
            Logger log) {

        super(heartbeatPeriod, heartbeatTimeout, ownResourceID, heartbeatListener, executor, scheduledExecutor, log);

        this.waitLatch = waitLatch;
    }

    /**
     * Waits until the latch has been counted down and then unmonitors the given target.
     *
     * <p>If the waiting thread is interrupted, the interruption is logged, the thread's interrupt
     * status is restored and the target is unmonitored right away.
     *
     * @param resourceID id of the target to stop monitoring
     */
    @Override
    public void unmonitorTarget(ResourceID resourceID) {
        try {
            waitLatch.await();
        } catch (InterruptedException ex) {
            // await() cleared the interrupt status when throwing; set it again so that
            // code further up the call stack can still observe the interruption
            Thread.currentThread().interrupt();

            log.error("Unexpected interrupted exception.", ex);
        }

        super.unmonitorTarget(resourceID);
    }
}
......@@ -1043,11 +1043,11 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
@Override
public void notifyHeartbeatTimeout(ResourceID resourceID) {
log.info("Task manager with id {} heartbeat timed out.", resourceID);
log.info("Heartbeat of TaskManager with id {} timed out.", resourceID);
getSelf().disconnectTaskManager(
resourceID,
new TimeoutException("Task manager with id " + resourceID + " heartbeat timed out."));
new TimeoutException("Heartbeat of TaskManager with id " + resourceID + " timed out."));
}
@Override
......
......@@ -241,7 +241,12 @@ public class MiniCluster {
// bring up the ResourceManager(s)
LOG.info("Starting {} ResourceManger(s)", numResourceManagers);
resourceManagerRunners = startResourceManagers(
configuration, haServices, heartbeatServices, metricRegistry, numResourceManagers, resourceManagerRpcServices);
configuration,
haServices,
heartbeatServices,
metricRegistry,
numResourceManagers,
resourceManagerRpcServices);
// bring up the TaskManager(s) for the mini cluster
LOG.info("Starting {} TaskManger(s)", numTaskManagers);
......
......@@ -64,6 +64,7 @@ import org.apache.flink.util.ExceptionUtils;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
......@@ -129,8 +130,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
private ConcurrentMap<String, InfoMessageListenerRpcGateway> infoMessageListeners;
public ResourceManager(
ResourceID resourceId,
RpcService rpcService,
ResourceID resourceId,
ResourceManagerConfiguration resourceManagerConfiguration,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
......@@ -359,7 +360,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
final ResourceID taskExecutorResourceId,
final SlotReport slotReport) {
if (leaderSessionId.equals(resourceManagerLeaderId)) {
if (Objects.equals(leaderSessionId, resourceManagerLeaderId)) {
Future<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
return taskExecutorGatewayFuture.handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
......@@ -384,7 +385,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
@Override
public void receiveHeartbeat(ResourceID resourceID, Void payload) {
// the task manager will not request heartbeat, so this method will never be called currently
// the ResourceManager will always send heartbeat requests to the
// TaskManager
}
@Override
......@@ -394,7 +396,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
});
return new TaskExecutorRegistrationSuccess(
registration.getInstanceID(), resourceId,
registration.getInstanceID(),
resourceId,
resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds());
}
}
......@@ -606,6 +609,30 @@ public abstract class ResourceManager<WorkerType extends Serializable>
}
}
/**
 * Closes the connection to a failed TaskManager. This method should be called by the framework
 * once it detects that a currently registered task executor has failed.
 *
 * <p>Heartbeat monitoring of the TaskManager is stopped in every case. If the TaskManager was
 * registered, its registration is removed, the slot manager is notified about the failure and
 * {@code disconnectResourceManager} is invoked on the task executor's gateway with the given
 * cause. If no registration is found, only a debug message is logged.
 *
 * @param resourceID Id of the TaskManager that has failed.
 * @param cause The exception which caused the TaskManager to fail.
 */
protected void closeTaskManagerConnection(final ResourceID resourceID, final Exception cause) {
    // stop the heartbeat monitoring first, irrespective of the registration state
    taskManagerHeartbeatManager.unmonitorTarget(resourceID);

    final WorkerRegistration<WorkerType> removedRegistration = taskExecutors.remove(resourceID);

    if (removedRegistration == null) {
        // nothing registered under this id, hence nothing left to clean up
        log.debug("Could not find a registered task manager with the process id {}.", resourceID);
        return;
    }

    log.info("Task manager {} failed because {}.", resourceID, cause);

    // TODO :: suggest failed task executor to stop itself
    slotManager.notifyTaskManagerFailure(resourceID);

    removedRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
}
/**
* Checks whether the given resource manager leader id is matching the current leader id and
* not null.
......@@ -756,30 +783,6 @@ public abstract class ResourceManager<WorkerType extends Serializable>
onFatalErrorAsync(new ResourceManagerException("Received an error from the LeaderElectionService.", exception));
}
/**
* Closes the connection to a failed TaskManager. This method should be called by the framework
* once it detects that a currently registered task executor has failed.
*
* <p>Heartbeat monitoring of the TaskManager is stopped in every case. If the TaskManager was
* registered, its registration is removed, the slot manager is notified about the failure and
* {@code disconnectResourceManager} is invoked on the task executor's gateway with the given
* cause. If no registration is found, only a debug message is logged.
*
* @param resourceID Id of the TaskManager that has failed.
* @param cause The exception which caused the TaskManager to fail.
*/
public void closeTaskManagerConnection(final ResourceID resourceID, final Exception cause) {
// stop the heartbeat monitoring first, irrespective of the registration state
taskManagerHeartbeatManager.unmonitorTarget(resourceID);
// remove() returns null if no task executor was registered under this id
WorkerRegistration<WorkerType> workerRegistration = taskExecutors.remove(resourceID);
if (workerRegistration != null) {
log.info("Task manager {} failed because {}.", resourceID, cause);
// TODO :: suggest failed task executor to stop itself
slotManager.notifyTaskManagerFailure(resourceID);
workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
} else {
log.debug("Could not find a registered task manager with the process id {}.", resourceID);
}
}
// ------------------------------------------------------------------------
// Framework specific behavior
// ------------------------------------------------------------------------
......@@ -875,11 +878,17 @@ public abstract class ResourceManager<WorkerType extends Serializable>
private class TaskManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
@Override
public void notifyHeartbeatTimeout(ResourceID resourceID) {
public void notifyHeartbeatTimeout(final ResourceID resourceID) {
log.info("The heartbeat of TaskManager with id {} timed out.", resourceID);
closeTaskManagerConnection(resourceID, new TimeoutException(
"Task manager with id " + resourceID + " heartbeat timed out."));
runAsync(new Runnable() {
@Override
public void run() {
closeTaskManagerConnection(
resourceID,
new TimeoutException("Task manager with id " + resourceID + " heartbeat timed out."));
}
});
}
@Override
......
......@@ -134,13 +134,12 @@ public interface ResourceManagerGateway extends RpcGateway {
/**
* Sends the heartbeat to resource manager from task manager
*
* @param resourceID unique id of the task manager
* @param heartbeatOrigin unique id of the task manager
*/
void heartbeatFromTaskManager(final ResourceID resourceID);
void heartbeatFromTaskManager(final ResourceID heartbeatOrigin);
/**
* Disconnects the given {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} from the
* {@link ResourceManager}.
* Disconnects a TaskManager specified by the given resourceID from the {@link ResourceManager}.
*
* @param resourceID identifying the TaskManager to disconnect
* @param cause for the disconnection of the TaskManager
......
......@@ -69,8 +69,8 @@ public class ResourceManagerRunner implements FatalErrorHandler {
rpcService.getScheduledExecutor());
this.resourceManager = new StandaloneResourceManager(
resourceId,
rpcService,
resourceId,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
......
......@@ -38,8 +38,8 @@ import org.apache.flink.runtime.rpc.RpcService;
public class StandaloneResourceManager extends ResourceManager<ResourceID> {
public StandaloneResourceManager(
ResourceID resourceId,
RpcService rpcService,
ResourceID resourceId,
ResourceManagerConfiguration resourceManagerConfiguration,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
......@@ -48,8 +48,8 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler) {
super(
resourceId,
rpcService,
resourceId,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
......
......@@ -696,17 +696,35 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
}
}
/**
 * Registers the ResourceManager with the given resource id as a heartbeat target at the
 * resource manager heartbeat manager.
 *
 * <p>A heartbeat received for this target is passed on by calling
 * {@code heartbeatFromTaskManager} on the gateway of the current ResourceManager connection.
 * Heartbeat requests are a no-op for this target.
 *
 * @param resourceManagerResourceId resource id of the ResourceManager to monitor
 */
private void establishResourceManagerConnection(ResourceID resourceManagerResourceId) {
    final HeartbeatTarget<Void> resourceManagerHeartbeatTarget = new HeartbeatTarget<Void>() {
        @Override
        public void receiveHeartbeat(ResourceID resourceID, Void payload) {
            // the gateway is looked up on every call from the current connection
            resourceManagerConnection.getTargetGateway().heartbeatFromTaskManager(resourceID);
        }

        @Override
        public void requestHeartbeat(ResourceID resourceID, Void payload) {
            // the TaskManager won't send heartbeat requests to the ResourceManager
        }
    };

    // monitor the resource manager as heartbeat target
    resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, resourceManagerHeartbeatTarget);
}
private void closeResourceManagerConnection(Exception cause) {
log.info("Close ResourceManager connection for {}.", cause);
validateRunsInMainThread();
if (isConnectedToResourceManager()) {
log.info("Close ResourceManager connection {}.", resourceManagerConnection.getResourceManagerId(), cause);
resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId());
ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
resourceManagerConnection.close();
resourceManagerConnection = null;
resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
}
}
......@@ -790,7 +808,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
"and returning them to the ResourceManager.", throwable);
// We encountered an exception. Free the slots and return them to the RM.
for (SlotOffer reservedSlot : reservedSlots) {
for (SlotOffer reservedSlot: reservedSlots) {
freeSlot(reservedSlot.getAllocationId(), throwable);
}
}
......@@ -841,6 +859,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
}
private void closeJobManagerConnection(JobID jobId, Exception cause) {
validateRunsInMainThread();
log.info("Close JobManager connection for job {}.", jobId);
// 1. fail tasks running under this JobID
......@@ -1183,21 +1203,14 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
public void onRegistrationSuccess(TaskExecutorRegistrationSuccess success) {
final ResourceID resourceManagerId = success.getResourceManagerId();
// monitor the resource manager as heartbeat target
resourceManagerHeartbeatManager.monitorTarget(resourceManagerId, new HeartbeatTarget<Void>() {
@Override
public void receiveHeartbeat(ResourceID resourceID, Void payload) {
if (isConnectedToResourceManager()) {
ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
resourceManagerGateway.heartbeatFromTaskManager(resourceID);
runAsync(
new Runnable() {
@Override
public void run() {
establishResourceManagerConnection(resourceManagerId);
}
}
@Override
public void requestHeartbeat(ResourceID resourceID, Void payload) {
// request heartbeat will never be called on the task manager side
}
});
);
}
@Override
......@@ -1277,14 +1290,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
runAsync(new Runnable() {
@Override
public void run() {
log.info("Job manager with id {} heartbeat timed out.", resourceID);
log.info("The heartbeat of JobManager with id {} timed out.", resourceID);
if (jobManagerConnections.containsKey(resourceID)) {
JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID);
if (jobManagerConnection != null) {
closeJobManagerConnection(
jobManagerConnection.getJobID(),
new TimeoutException("Job manager with id " + resourceID + " heartbeat timed out."));
new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out."));
}
}
}
......@@ -1305,16 +1319,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
@Override
public void notifyHeartbeatTimeout(final ResourceID resourceID) {
public void notifyHeartbeatTimeout(final ResourceID resourceId) {
runAsync(new Runnable() {
@Override
public void run() {
log.info("Resource manager with id {} heartbeat timed out.", resourceID);
log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
if (isConnectedToResourceManager() && resourceManagerConnection.getResourceManagerId().equals(resourceID)) {
closeResourceManagerConnection(
new TimeoutException("Resource manager with id " + resourceID + " heartbeat timed out."));
}
closeResourceManagerConnection(
new TimeoutException(
"The heartbeat of ResourceManager with id " + resourceId + " timed out."));
}
});
}
......
......@@ -53,6 +53,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingResourceManager;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
......@@ -76,7 +77,7 @@ import static org.mockito.Mockito.verify;
/**
* General tests for the resource manager component.
*/
public class ResourceManagerTest {
public class ResourceManagerTest extends TestLogger {
private static ActorSystem system;
......@@ -393,8 +394,7 @@ public class ResourceManagerTest {
try {
final StandaloneResourceManager resourceManager = new StandaloneResourceManager(
resourceManagerResourceID,
rpcService,
rpcService, resourceManagerResourceID,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
......
......@@ -26,7 +26,8 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.heartbeat.*;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
......@@ -49,8 +50,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@RunWith(PowerMockRunner.class)
@PrepareForTest(BlobLibraryCacheManager.class)
......@@ -139,4 +143,6 @@ public class JobMasterTest extends TestLogger {
rpc.stopService();
}
}
}
......@@ -68,8 +68,7 @@ public class ResourceManagerHATest {
final ResourceManager resourceManager =
new StandaloneResourceManager(
rmResourceId,
rpcService,
rpcService, rmResourceId,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
......
......@@ -35,6 +35,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.TestingSerialRpcService;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
......@@ -45,7 +46,7 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;
public class ResourceManagerJobMasterTest {
public class ResourceManagerJobMasterTest extends TestLogger {
private TestingSerialRpcService rpcService;
......@@ -216,8 +217,7 @@ public class ResourceManagerJobMasterTest {
Time.minutes(5L));
ResourceManager resourceManager = new StandaloneResourceManager(
rmResourceId,
rpcService,
rpcService, rmResourceId,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
......
......@@ -32,6 +32,7 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
......@@ -43,7 +44,7 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
public class ResourceManagerTaskExecutorTest {
public class ResourceManagerTaskExecutorTest extends TestLogger {
private TestingSerialRpcService rpcService;
......@@ -148,7 +149,7 @@ public class ResourceManagerTaskExecutorTest {
private StandaloneResourceManager createAndStartResourceManager(TestingLeaderElectionService rmLeaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception {
TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
HeartbeatServices heartbeatServices = mock(HeartbeatServices.class);
HeartbeatServices heartbeatServices = new HeartbeatServices(5L, 5L);
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
......@@ -163,8 +164,8 @@ public class ResourceManagerTaskExecutorTest {
StandaloneResourceManager resourceManager =
new StandaloneResourceManager(
resourceManagerResourceID,
rpcService,
resourceManagerResourceID,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
......
......@@ -237,8 +237,7 @@ public class SlotProtocolTest extends TestLogger {
ResourceManager<ResourceID> resourceManager =
Mockito.spy(new StandaloneResourceManager(
rmResourceId,
testRpcService,
testRpcService, rmResourceId,
resourceManagerConfiguration,
testingHaServices,
heartbeatServices,
......@@ -325,8 +324,7 @@ public class SlotProtocolTest extends TestLogger {
JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler) {
super(
resourceId,
rpcService,
rpcService, resourceId,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
......
......@@ -120,8 +120,7 @@ public class TaskExecutorITCase {
final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
ResourceManager<ResourceID> resourceManager = new StandaloneResourceManager(
rmResourceId,
rpcService,
rpcService, rmResourceId,
resourceManagerConfiguration,
testingHAServices,
heartbeatServices,
......
......@@ -224,13 +224,19 @@ public class TaskExecutorTest extends TestLogger {
ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
when(rmGateway.registerTaskExecutor(
any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(
new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId, 10L)));
.thenReturn(
FlinkCompletableFuture.<RegistrationResponse>completed(
new TaskExecutorRegistrationSuccess(
new InstanceID(),
rmResourceId,
10L)));
final TestingSerialRpcService rpc = new TestingSerialRpcService();
rpc.registerGateway(rmAddress, rmGateway);
final TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService();
final TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService(
null,
null);
final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
haServices.setResourceManagerLeaderRetriever(testLeaderService);
......@@ -292,11 +298,11 @@ public class TaskExecutorTest extends TestLogger {
testLeaderService.notifyListener(rmAddress, rmLeaderId);
// register resource manager success will trigger monitoring heartbeat target between tm and rm
verify(rmGateway).registerTaskExecutor(
verify(rmGateway, atLeast(1)).registerTaskExecutor(
eq(rmLeaderId), eq(taskManager.getAddress()), eq(tmResourceId), any(SlotReport.class), any(Time.class));
// heartbeat timeout should trigger disconnect TaskManager from ResourceManager
verify(rmGateway, timeout(heartbeatTimeout * 5)).disconnectTaskManager(eq(taskManagerLocation.getResourceID()), any(TimeoutException.class));
verify(rmGateway, timeout(heartbeatTimeout * 50L)).disconnectTaskManager(eq(taskManagerLocation.getResourceID()), any(TimeoutException.class));
// check if a concurrent error occurred
testingFatalErrorHandler.rethrowError();
......
......@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
log4j.rootLogger=INFO, console
log4j.rootLogger=OFF, console
# -----------------------------------------------------------------------------
# Console (use 'console')
......
......@@ -199,10 +199,9 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
commonRpcService.getScheduledExecutor());
return new YarnResourceManager(
ResourceID.generate(),
commonRpcService, ResourceID.generate(),
config,
ENV,
commonRpcService,
resourceManagerConfiguration,
haServices,
heartbeatServices,
......
......@@ -107,10 +107,10 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
final private Map<ResourceProfile, Integer> resourcePriorities = new HashMap<>();
public YarnResourceManager(
RpcService rpcService,
ResourceID resourceId,
Configuration flinkConfig,
Map<String, String> env,
RpcService rpcService,
ResourceManagerConfiguration resourceManagerConfiguration,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
......@@ -119,8 +119,8 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler) {
super(
resourceId,
rpcService,
resourceId,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册