未验证 提交 2be1d4bf 编写于 作者: W Wenjun Ruan 提交者: GitHub

Fix worker cannot shutdown due to resource close failed or heart beat check failed (#10979)

* Use try-with-resource to close resource, and add heart error threshold to avoid worker cannot close due to heart beat check failed

* Move heartbeat error threshold to applicaiton.yml
上级 dad8981c
......@@ -50,6 +50,7 @@ public final class Constants {
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "/lock/failover/masters";
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS = "/lock/failover/workers";
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "/lock/failover/startup-masters";
public static final String FORMAT_SS = "%s%s";
public static final String FORMAT_S_S = "%s/%s";
public static final String FORMAT_S_S_COLON = "%s:%s";
......
......@@ -21,12 +21,18 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.experimental.UtilityClass;
@UtilityClass
public class ThreadUtils {
private static final Logger logger = LoggerFactory.getLogger(ThreadUtils.class);
/**
* Wrapper over newDaemonFixedThreadExecutor.
*
......@@ -35,10 +41,7 @@ public class ThreadUtils {
* @return ExecutorService
*/
public static ExecutorService newDaemonFixedThreadExecutor(String threadName, int threadsNum) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(threadName)
.build();
ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build();
return Executors.newFixedThreadPool(threadsNum, threadFactory);
}
......@@ -48,8 +51,9 @@ public class ThreadUtils {
public static void sleep(final long millis) {
try {
Thread.sleep(millis);
} catch (final InterruptedException ignore) {
} catch (final InterruptedException interruptedException) {
Thread.currentThread().interrupt();
logger.error("Current thread sleep error", interruptedException);
}
}
}
......@@ -115,32 +115,28 @@ public class MasterServer implements IStoppable {
* @param cause close cause
*/
public void close(String cause) {
try {
// set stop signal is true
// execute only once
if (!Stopper.stop()) {
logger.warn("MasterServer is already stopped, current cause: {}", cause);
return;
}
// set stop signal is true
// execute only once
if (!Stopper.stop()) {
logger.warn("MasterServer is already stopped, current cause: {}", cause);
return;
}
// thread sleep 3 seconds for thread quietly stop
ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
try (SchedulerApi closedSchedulerApi = schedulerApi;
MasterSchedulerBootstrap closedSchedulerBootstrap = masterSchedulerBootstrap;
MasterRPCServer closedRpcServer = masterRPCServer;
MasterRegistryClient closedMasterRegistryClient = masterRegistryClient;
// close spring Context and will invoke method with @PreDestroy annotation to destroy beans.
// like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
SpringApplicationContext closedSpringContext = springApplicationContext) {
logger.info("Master server is stopping, current cause : {}", cause);
// thread sleep 3 seconds for thread quietly stop
ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
// close
this.schedulerApi.close();
this.masterSchedulerBootstrap.close();
this.masterRPCServer.close();
this.masterRegistryClient.closeRegistry();
// close spring Context and will invoke method with @PreDestroy annotation to destroy beans.
// like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
springApplicationContext.close();
logger.info("MasterServer stopped, current cause: {}", cause);
} catch (Exception e) {
logger.error("MasterServer stop failed, current cause: {}", cause, e);
return;
}
logger.info("MasterServer stopped, current cause: {}", cause);
}
@Override
......
......@@ -67,6 +67,10 @@ public class MasterConfig implements Validator {
* Master heart beat task execute interval.
*/
private Duration heartbeatInterval = Duration.ofSeconds(10);
/**
* Master heart beat task error threshold, if the continuous error count exceed this count, the master will close.
*/
private int heartbeatErrorThreshold = 5;
/**
* task submit max retry times.
*/
......@@ -129,6 +133,9 @@ public class MasterConfig implements Validator {
if (masterConfig.getMaxCpuLoadAvg() <= 0) {
masterConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
}
if (masterConfig.getHeartbeatErrorThreshold() <= 0) {
errors.rejectValue("heartbeat-error-threshold", null, "should be a positive value");
}
masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
}
}
......@@ -53,7 +53,7 @@ import com.google.common.collect.Sets;
* <p>When the Master node startup, it will register in registry center. And schedule a {@link HeartBeatTask} to update its metadata in registry.
*/
@Component
public class MasterRegistryClient {
public class MasterRegistryClient implements AutoCloseable {
/**
* logger
......@@ -107,7 +107,8 @@ public class MasterRegistryClient {
registryClient.setStoppable(stoppable);
}
public void closeRegistry() {
@Override
public void close() {
// TODO unsubscribe MasterRegistryDataListener
deregister();
}
......@@ -189,11 +190,12 @@ public class MasterRegistryClient {
String localNodePath = getCurrentNodePath();
Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
masterConfig.getMaxCpuLoadAvg(),
masterConfig.getReservedMemory(),
Sets.newHashSet(localNodePath),
Constants.MASTER_TYPE,
registryClient);
masterConfig.getMaxCpuLoadAvg(),
masterConfig.getReservedMemory(),
Sets.newHashSet(localNodePath),
Constants.MASTER_TYPE,
registryClient,
masterConfig.getHeartbeatErrorThreshold());
// remove before persist
registryClient.remove(localNodePath);
......
......@@ -58,7 +58,7 @@ import org.springframework.stereotype.Service;
* Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.
*/
@Service
public class MasterSchedulerBootstrap extends BaseDaemonThread {
public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerBootstrap.class);
......@@ -116,6 +116,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread {
logger.info("Master schedule bootstrap started...");
}
@Override
public void close() {
logger.info("Master schedule bootstrap stopping...");
logger.info("Master schedule bootstrap stopped...");
......
......@@ -98,6 +98,8 @@ master:
host-selector: lower_weight
# master heartbeat interval
heartbeat-interval: 10s
# Master heart beat task error threshold, if the continuous error count exceed this count, the master will close.
heartbeat-error-threshold: 5
# master commit task retry times
task-commit-retry-times: 5
# master commit task interval
......
......@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -38,16 +39,22 @@ public class HeartBeatTask implements Runnable {
private final String serverType;
private final HeartBeat heartBeat;
private final int heartBeatErrorThreshold;
private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();
public HeartBeatTask(long startupTime,
double maxCpuloadAvg,
double reservedMemory,
Set<String> heartBeatPaths,
String serverType,
RegistryClient registryClient) {
RegistryClient registryClient,
int heartBeatErrorThreshold) {
this.heartBeatPaths = heartBeatPaths;
this.registryClient = registryClient;
this.serverType = serverType;
this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory);
this.heartBeatErrorThreshold = heartBeatErrorThreshold;
}
public HeartBeatTask(long startupTime,
......@@ -58,13 +65,14 @@ public class HeartBeatTask implements Runnable {
String serverType,
RegistryClient registryClient,
int workerThreadCount,
int workerWaitingTaskCount
) {
int workerWaitingTaskCount,
int heartBeatErrorThreshold) {
this.heartBeatPaths = heartBeatPaths;
this.registryClient = registryClient;
this.workerWaitingTaskCount = workerWaitingTaskCount;
this.serverType = serverType;
this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory, hostWeight, workerThreadCount);
this.heartBeatErrorThreshold = heartBeatErrorThreshold;
}
public String getHeartBeatInfo() {
......@@ -88,8 +96,13 @@ public class HeartBeatTask implements Runnable {
for (String heartBeatPath : heartBeatPaths) {
registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat());
}
heartBeatErrorTimes.set(0);
} catch (Throwable ex) {
logger.error("HeartBeat task execute failed", ex);
if (heartBeatErrorTimes.incrementAndGet() >= heartBeatErrorThreshold) {
registryClient.getStoppable()
.stop("HeartBeat task connect to zk failed too much times: " + heartBeatErrorTimes);
}
}
}
}
......@@ -24,7 +24,7 @@ import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.stereotype.Component;
@Component
public class SpringApplicationContext implements ApplicationContextAware {
public class SpringApplicationContext implements ApplicationContextAware, AutoCloseable {
private static ApplicationContext applicationContext;
......@@ -36,6 +36,7 @@ public class SpringApplicationContext implements ApplicationContextAware {
/**
* Close this application context, destroying all beans in its bean factory.
*/
@Override
public void close() {
((AbstractApplicationContext)applicationContext).close();
}
......
......@@ -19,15 +19,41 @@ package org.apache.dolphinscheduler;
import org.apache.curator.test.TestingServer;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationFailedEvent;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import lombok.NonNull;
@SpringBootApplication
public class StandaloneServer {
public class StandaloneServer implements ApplicationListener<ApplicationEvent> {
private static final Logger logger = LoggerFactory.getLogger(StandaloneServer.class);
private static TestingServer zookeeperServer;
public static void main(String[] args) throws Exception {
final TestingServer server = new TestingServer(true);
System.setProperty("registry.zookeeper.connect-string", server.getConnectString());
zookeeperServer = new TestingServer(true);
System.setProperty("registry.zookeeper.connect-string", zookeeperServer.getConnectString());
SpringApplication.run(StandaloneServer.class, args);
}
@Override
public void onApplicationEvent(@NonNull ApplicationEvent event) {
if (event instanceof ApplicationFailedEvent || event instanceof ContextClosedEvent) {
try (TestingServer closedServer = zookeeperServer) {
// close the zookeeper server
logger.info("Receive spring context close event: {}, will closed zookeeper server", event);
} catch (IOException e) {
logger.error("Close zookeeper server error", e);
}
}
}
}
......@@ -133,6 +133,8 @@ master:
host-selector: lower_weight
# master heartbeat interval
heartbeat-interval: 10s
# Master heart beat task error threshold, if the continuous error count exceed this count, the master will close.
heartbeat-error-threshold: 5
# master commit task retry times
task-commit-retry-times: 5
# master commit task interval
......@@ -154,6 +156,8 @@ worker:
exec-threads: 10
# worker heartbeat interval
heartbeat-interval: 10s
# Worker heart beat task error threshold, if the continuous error count exceed this count, the worker will close.
heartbeat-error-threshold: 5
# worker host weight to dispatch tasks, default value 100
host-weight: 100
# tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true.
......
......@@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.server.worker;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.ProcessUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
......@@ -37,7 +37,6 @@ import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.commons.collections4.CollectionUtils;
import java.util.Collection;
import java.util.Set;
import javax.annotation.PostConstruct;
......@@ -121,8 +120,7 @@ public class WorkerServer implements IStoppable {
this.workerRegistryClient.registry();
this.workerRegistryClient.setRegistryStoppable(this);
Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();
this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
this.workerRegistryClient.handleDeadServer();
this.workerManagerThread.start();
......@@ -139,37 +137,24 @@ public class WorkerServer implements IStoppable {
}
public void close(String cause) {
try {
// execute only once
// set stop signal is true
if (!Stopper.stop()) {
logger.warn("WorkerServer is already stopped, current cause: {}", cause);
return;
}
if (!Stopper.stop()) {
logger.warn("WorkerServer is already stopped, current cause: {}", cause);
return;
}
ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
try (WorkerRpcServer closedWorkerRpcServer = workerRpcServer;
WorkerRegistryClient closedRegistryClient = workerRegistryClient;
AlertClientService closedAlertClientService = alertClientService;
SpringApplicationContext closedSpringContext = springApplicationContext;) {
logger.info("Worker server is stopping, current cause : {}", cause);
try {
// thread sleep 3 seconds for thread quitely stop
Thread.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
} catch (Exception e) {
logger.warn("Worker server close wait error", e);
}
// close
this.workerRpcServer.close();
this.workerRegistryClient.unRegistry();
this.alertClientService.close();
// kill running tasks
this.killAllRunningTasks();
// close the application context
this.springApplicationContext.close();
logger.info("Worker server stopped, current cause: {}", cause);
} catch (Exception e) {
logger.error("Worker server stop failed, current cause: {}", cause, e);
return;
}
logger.info("Worker server stopped, current cause: {}", cause);
}
@Override
......
......@@ -40,6 +40,10 @@ public class WorkerConfig implements Validator {
private int listenPort = 1234;
private int execThreads = 10;
private Duration heartbeatInterval = Duration.ofSeconds(10);
/**
* Worker heart beat task error threshold, if the continuous error count exceed this count, the worker will close.
*/
private int heartbeatErrorThreshold = 5;
private int hostWeight = 100;
private boolean tenantAutoCreate = true;
private boolean tenantDistributedUser = false;
......@@ -70,6 +74,9 @@ public class WorkerConfig implements Validator {
if (workerConfig.getMaxCpuLoadAvg() <= 0) {
workerConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
}
if (workerConfig.getHeartbeatErrorThreshold() <= 0) {
errors.rejectValue("heartbeat-error-threshold", null, "should be a positive value");
}
workerConfig.setWorkerAddress(NetUtils.getAddr(workerConfig.getListenPort()));
}
}
......@@ -54,7 +54,7 @@ import com.google.common.collect.Sets;
* worker registry
*/
@Service
public class WorkerRegistryClient {
public class WorkerRegistryClient implements AutoCloseable {
private final Logger logger = LoggerFactory.getLogger(WorkerRegistryClient.class);
......@@ -101,15 +101,15 @@ public class WorkerRegistryClient {
long workerHeartbeatInterval = workerConfig.getHeartbeatInterval().getSeconds();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
workerConfig.getMaxCpuLoadAvg(),
workerConfig.getReservedMemory(),
workerConfig.getHostWeight(),
workerZkPaths,
Constants.WORKER_TYPE,
registryClient,
workerConfig.getExecThreads(),
workerManagerThread.getThreadPoolQueueSize()
);
workerConfig.getMaxCpuLoadAvg(),
workerConfig.getReservedMemory(),
workerConfig.getHostWeight(),
workerZkPaths,
Constants.WORKER_TYPE,
registryClient,
workerConfig.getExecThreads(),
workerManagerThread.getThreadPoolQueueSize(),
workerConfig.getHeartbeatErrorThreshold());
for (String workerZKPath : workerZkPaths) {
// remove before persist
......@@ -147,8 +147,10 @@ public class WorkerRegistryClient {
logger.error("remove worker zk path exception", ex);
}
this.heartBeatExecutor.shutdownNow();
logger.info("heartbeat executor shutdown");
if (heartBeatExecutor != null) {
heartBeatExecutor.shutdownNow();
logger.info("Heartbeat executor shutdown");
}
registryClient.close();
logger.info("registry client closed");
......@@ -175,8 +177,9 @@ public class WorkerRegistryClient {
return workerPaths;
}
public void handleDeadServer(Set<String> nodeSet, NodeType nodeType, String opType) {
registryClient.handleDeadServer(nodeSet, nodeType, opType);
public void handleDeadServer() {
Set<String> workerZkPaths = getWorkerZkPaths();
registryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
}
/**
......@@ -190,4 +193,9 @@ public class WorkerRegistryClient {
registryClient.setStoppable(stoppable);
}
@Override
public void close() throws IOException {
unRegistry();
}
}
......@@ -60,6 +60,8 @@ worker:
exec-threads: 100
# worker heartbeat interval
heartbeat-interval: 10s
# Worker heart beat task error threshold, if the continuous error count exceed this count, the worker will close.
heartbeat-error-threshold: 5
# worker host weight to dispatch tasks, default value 100
host-weight: 100
# tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册