提交 19ffe1d5 编写于 作者: T Technoboy-

update registry and add worker group

上级 cdb3267f
......@@ -22,14 +22,15 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
......@@ -45,8 +46,6 @@ import org.springframework.context.annotation.ComponentScan;
import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* master server
......@@ -65,11 +64,6 @@ public class MasterServer implements IStoppable {
@Autowired
private ZKMasterClient zkMasterClient = null;
/**
* heartbeat thread pool
*/
private ScheduledExecutorService heartbeatMasterService;
/**
* process service
*/
......@@ -87,6 +81,11 @@ public class MasterServer implements IStoppable {
@Autowired
private MasterConfig masterConfig;
/**
* zookeeper registry center
*/
@Autowired
private ZookeeperRegistryCenter zookeeperRegistryCenter;
/**
* spring application context
......@@ -95,8 +94,15 @@ public class MasterServer implements IStoppable {
@Autowired
private SpringApplicationContext springApplicationContext;
/**
* netty remote server
*/
private NettyRemotingServer nettyRemotingServer;
/**
* master registry
*/
private MasterRegistry masterRegistry;
/**
* master server startup
......@@ -115,7 +121,6 @@ public class MasterServer implements IStoppable {
@PostConstruct
public void run(){
//
//init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(45678);
......@@ -124,23 +129,17 @@ public class MasterServer implements IStoppable {
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
this.nettyRemotingServer.start();
//
this.masterRegistry = new MasterRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), masterConfig.getMasterHeartbeatInterval());
this.masterRegistry.registry();
//
zkMasterClient.init();
masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread");
heartbeatMasterService = ThreadUtils.newDaemonThreadScheduledExecutor("Master-Main-Thread",Constants.DEFAULT_MASTER_HEARTBEAT_THREAD_NUM);
// heartbeat thread implement
Runnable heartBeatThread = heartBeatThread();
zkMasterClient.setStoppable(this);
// regular heartbeat
// delay 5 seconds, send heartbeat every 30 seconds
heartbeatMasterService.
scheduleAtFixedRate(heartBeatThread, 5, masterConfig.getMasterHeartbeatInterval(), TimeUnit.SECONDS);
// master scheduler thread
MasterSchedulerThread masterSchedulerThread = new MasterSchedulerThread(
zkMasterClient,
......@@ -206,13 +205,8 @@ public class MasterServer implements IStoppable {
}catch (Exception e){
logger.warn("thread sleep exception ", e);
}
try {
heartbeatMasterService.shutdownNow();
}catch (Exception e){
logger.warn("heartbeat service stopped exception");
}
logger.info("heartbeat service stopped");
this.nettyRemotingServer.close();
this.masterRegistry.unRegistry();
//close quartz
try{
......@@ -247,35 +241,10 @@ public class MasterServer implements IStoppable {
logger.info("zookeeper service stopped");
} catch (Exception e) {
logger.error("master server stop exception ", e);
System.exit(-1);
}
}
/**
 * Builds the task that writes this master's heartbeat to ZooKeeper.
 * The task is only created here; scheduling it is left to the caller.
 *
 * @return a runnable that, while {@code Stopper.isRunning()} is true, sends a
 *         heartbeat for the master znode, or logs an error when that znode path is blank
 */
private Runnable heartBeatThread(){
    logger.info("start master heart beat thread...");
    return new Runnable() {
        @Override
        public void run() {
            // nothing to do once the server is no longer running
            if (!Stopper.isRunning()) {
                return;
            }
            // send heartbeat to zk; a blank znode path means there is nowhere to write it
            if (StringUtils.isBlank(zkMasterClient.getMasterZNode())) {
                logger.error("master send heartbeat to zk failed: can't find zookeeper path of master server");
                return;
            }
            zkMasterClient.heartBeatForZk(zkMasterClient.getMasterZNode(), Constants.MASTER_PREFIX);
        }
    };
}
}
......@@ -19,11 +19,21 @@ package org.apache.dolphinscheduler.server.master.registry;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
/**
* master registry
*/
......@@ -41,14 +51,32 @@ public class MasterRegistry {
*/
private final int port;
/**
* heartbeat interval
*/
private final long heartBeatInterval;
/**
* heartbeat executor
*/
private final ScheduledExecutorService heartBeatExecutor;
/**
* master start time
*/
private final String startTime;
/**
* construct
* @param zookeeperRegistryCenter zookeeperRegistryCenter
* @param port port
*/
public MasterRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port){
public MasterRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){
this.zookeeperRegistryCenter = zookeeperRegistryCenter;
this.port = port;
this.heartBeatInterval = heartBeatInterval;
this.startTime = DateUtils.dateToString(new Date());
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
}
/**
......@@ -56,8 +84,8 @@ public class MasterRegistry {
*/
public void registry() {
String address = Constants.LOCAL_ADDRESS;
String localNodePath = getWorkerPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, "");
String localNodePath = getMasterPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, "");
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
......@@ -65,13 +93,14 @@ public class MasterRegistry {
logger.error("master : {} connection lost from zookeeper", address);
} else if(newState == ConnectionState.RECONNECTED){
logger.info("master : {} reconnected to zookeeper", address);
zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, "");
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, "");
} else if(newState == ConnectionState.SUSPENDED){
logger.warn("master : {} connection SUSPENDED ", address);
}
}
});
logger.info("master node : {} registry to ZK successfully.", address);
this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), heartBeatInterval, heartBeatInterval, TimeUnit.SECONDS);
logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, heartBeatInterval);
}
/**
......@@ -79,18 +108,18 @@ public class MasterRegistry {
*/
public void unRegistry() {
String address = getLocalAddress();
String localNodePath = getWorkerPath();
String localNodePath = getMasterPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath);
logger.info("worker node : {} unRegistry to ZK.", address);
logger.info("master node : {} unRegistry to ZK.", address);
}
/**
* get worker path
* get master path
* @return
*/
private String getWorkerPath() {
private String getMasterPath() {
String address = getLocalAddress();
String localNodePath = this.zookeeperRegistryCenter.getWorkerPath() + "/" + address;
String localNodePath = this.zookeeperRegistryCenter.getMasterPath() + "/" + address;
return localNodePath;
}
......@@ -101,4 +130,26 @@ public class MasterRegistry {
private String getLocalAddress(){
    // Constants.LOCAL_ADDRESS and the configured port, joined by ':'
    StringBuilder localAddress = new StringBuilder();
    localAddress.append(Constants.LOCAL_ADDRESS);
    localAddress.append(":");
    localAddress.append(port);
    return localAddress.toString();
}
/**
 * heart beat task
 * <p>
 * Writes a comma separated record to this master's znode:
 * cpu usage, memory usage, load average, start time, current time.
 */
class HeartBeatTask implements Runnable{

    @Override
    public void run() {
        try {
            StringBuilder heartBeatInfo = new StringBuilder(100)
                    .append(OSUtils.cpuUsage()).append(COMMA)
                    .append(OSUtils.memoryUsage()).append(COMMA)
                    .append(OSUtils.loadAverage()).append(COMMA)
                    .append(startTime).append(COMMA)
                    .append(DateUtils.dateToString(new Date()));
            String nodePath = getMasterPath();
            zookeeperRegistryCenter.getZookeeperCachedOperator().update(nodePath, heartBeatInfo.toString());
        } catch (Throwable ex){
            // log and swallow: a failed write must not propagate out of the scheduled task
            logger.error("error write master heartbeat info", ex);
        }
    }
}
}
......@@ -150,7 +150,6 @@ public class WorkerServer implements IStoppable {
*/
public static void main(String[] args) {
    // name the launching thread before the application context starts
    Thread launcherThread = Thread.currentThread();
    launcherThread.setName(Constants.THREAD_NAME_WORKER_SERVER);
    System.setProperty("spring.profiles.active","worker");
    // start as a non-web Spring application
    SpringApplicationBuilder workerApplication =
            new SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE);
    workerApplication.run(args);
}
......@@ -169,7 +168,7 @@ public class WorkerServer implements IStoppable {
this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_REQUEST, new TaskKillProcessor());
this.nettyRemotingServer.start();
this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval());
this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval(), workerConfig.getWorkerGroup());
this.workerRegistry.registry();
this.zkWorkerClient.init();
......@@ -188,22 +187,12 @@ public class WorkerServer implements IStoppable {
// submit kill process thread
killExecutorService.execute(killProcessThread);
// new fetch task thread
// FetchTaskThread fetchTaskThread = new FetchTaskThread(zkWorkerClient, processService, taskQueue);
//
// // submit fetch task thread
// fetchTaskExecutorService.execute(fetchTaskThread);
/**
* register hooks, which are called before the process exits
*/
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
// worker server exit alert
if (zkWorkerClient.getActiveMasterNum() <= 1) {
alertDao.sendServerStopedAlert(1, OSUtils.getHost(), "Worker-Server");
}
stop("shutdownhook");
}
}));
......
......@@ -34,9 +34,20 @@ public class WorkerConfig {
@Value("${worker.max.cpuload.avg}")
private int workerMaxCpuloadAvg;
@Value("${master.reserved.memory}")
@Value("${worker.reserved.memory}")
private double workerReservedMemory;
/**
 * worker group, read from "worker.group"; falls back to DEFAULT when the property is absent.
 * No space may follow the ':' in the placeholder: Spring uses the text after the
 * separator verbatim as the default value, so a space there would become part of the group name.
 */
@Value("${worker.group:DEFAULT}")
private String workerGroup;
/**
 * @return the worker group configured for this worker
 */
public String getWorkerGroup() {
    return this.workerGroup;
}
/**
 * Overrides the configured worker group.
 *
 * @param workerGroup the worker group to use
 */
public void setWorkerGroup(String workerGroup) {
this.workerGroup = workerGroup;
}
/**
 * @return the configured value of workerExecThreads
 */
public int getWorkerExecThreads() {
    return this.workerExecThreads;
}
......
......@@ -21,6 +21,7 @@ import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
......@@ -33,6 +34,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
import static org.apache.dolphinscheduler.remote.utils.Constants.SLASH;
/**
......@@ -42,6 +44,8 @@ public class WorkerRegistry {
private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class);
private static final String DEFAULT_GROUP = "DEFAULT";
/**
* zookeeper registry center
*/
......@@ -67,15 +71,30 @@ public class WorkerRegistry {
*/
private final String startTime;
/**
* worker group
*/
private final String workerGroup;
/**
 * construct a registry for the default worker group;
 * delegates to the four-argument constructor with DEFAULT_GROUP
 * @param zookeeperRegistryCenter zookeeperRegistryCenter
 * @param port port
 * @param heartBeatInterval heartbeat interval
 */
public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){
this(zookeeperRegistryCenter, port, heartBeatInterval, DEFAULT_GROUP);
}
/**
 * construct a registry for the given worker group
 * @param zookeeperRegistryCenter zookeeperRegistryCenter
 * @param port port
 * @param heartBeatInterval heartbeat interval
 * @param workerGroup worker group; surrounding whitespace is ignored, null is kept as null
 */
public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval, String workerGroup){
    this.zookeeperRegistryCenter = zookeeperRegistryCenter;
    this.port = port;
    this.heartBeatInterval = heartBeatInterval;
    // Trim once here: the worker path is built by comparing this value against
    // DEFAULT_GROUP, so a padded value such as " DEFAULT" (e.g. from a property
    // default) would otherwise be treated as a custom group, and a blank value
    // would produce an empty path segment.
    this.workerGroup = workerGroup == null ? null : workerGroup.trim();
    this.startTime = DateUtils.dateToString(new Date());
    this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
}
......@@ -86,7 +105,7 @@ public class WorkerRegistry {
public void registry() {
String address = Constants.LOCAL_ADDRESS;
String localNodePath = getWorkerPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, "");
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, "");
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
......@@ -94,7 +113,7 @@ public class WorkerRegistry {
logger.error("worker : {} connection lost from zookeeper", address);
} else if(newState == ConnectionState.RECONNECTED){
logger.info("worker : {} reconnected to zookeeper", address);
zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, "");
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, "");
} else if(newState == ConnectionState.SUSPENDED){
logger.warn("worker : {} connection SUSPENDED ", address);
}
......@@ -122,8 +141,14 @@ public class WorkerRegistry {
*/
private String getWorkerPath() {
String address = getLocalAddress();
String localNodePath = this.zookeeperRegistryCenter.getWorkerPath() + "/" + address;
return localNodePath;
StringBuilder builder = new StringBuilder(100);
String workerPath = this.zookeeperRegistryCenter.getWorkerPath();
builder.append(workerPath).append(SLASH);
if(StringUtils.isNotEmpty(workerGroup) && !DEFAULT_GROUP.equalsIgnoreCase(workerGroup)){
builder.append(workerGroup.trim()).append(SLASH);
}
builder.append(address);
return builder.toString();
}
/**
......@@ -149,7 +174,7 @@ public class WorkerRegistry {
builder.append(startTime).append(COMMA);
builder.append(DateUtils.dateToString(new Date()));
String workerPath = getWorkerPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().persist(workerPath, builder.toString());
zookeeperRegistryCenter.getZookeeperCachedOperator().update(workerPath, builder.toString());
} catch (Throwable ex){
logger.error("error write worker heartbeat info", ex);
}
......
......@@ -100,9 +100,6 @@ public class ZKMasterClient extends AbstractZKClient {
// init system znode
this.initSystemZNode();
// register master
this.registerMaster();
// check if fault tolerance is required,failure and tolerance
if (getActiveMasterNum() == 1) {
failoverWorker(null, true);
......@@ -132,25 +129,6 @@ public class ZKMasterClient extends AbstractZKClient {
return alertDao;
}
/**
 * register master znode and remember its path in masterZNode;
 * exits the JVM with status -1 when registration throws or yields an empty path
 */
public void registerMaster(){
    try {
        final String registeredPath = registerServer(ZKNodeType.MASTER);
        // an empty path means the registration did not produce a znode
        if (StringUtils.isEmpty(registeredPath)) {
            System.exit(-1);
        }
        masterZNode = registeredPath;
    } catch (Exception e) {
        logger.error("register master failure ",e);
        System.exit(-1);
    }
}
/**
* handle path events that this class cares about
* @param client zkClient
......
......@@ -55,24 +55,6 @@ public class ZKWorkerClient extends AbstractZKClient {
// init system znode
this.initSystemZNode();
// register worker
this.registWorker();
}
/**
 * register worker znode and remember its path in workerZNode;
 * exits the JVM with status -1 when registration throws or yields an empty path
 */
private void registWorker(){
    try {
        final String registeredPath = registerServer(ZKNodeType.WORKER);
        // an empty path means the registration did not produce a znode
        if (StringUtils.isEmpty(registeredPath)) {
            System.exit(-1);
        }
        workerZNode = registeredPath;
    } catch (Exception e) {
        logger.error("register worker failure",e);
        System.exit(-1);
    }
}
/**
......
......@@ -46,40 +46,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
*/
protected IStoppable stoppable = null;
/**
 * heartbeat for zookeeper
 * <p>
 * Stops this server when the zk client is stopped or the server is judged dead.
 * Otherwise rewrites the znode data: fields 0, 1 and 5 of the stored record are kept,
 * cpu usage, memory usage and load average are refreshed, and the current time is
 * written as the last field. A stored record of unexpected length is left untouched.
 * Any exception also stops this server.
 * @param znode zookeeper node
 * @param serverType server type
 */
public void heartBeatForZk(String znode, String serverType){
    try {
        //check dead or not in zookeeper
        boolean clientStopped = zkClient.getState() == CuratorFrameworkState.STOPPED;
        if (clientStopped || checkIsDeadServer(znode, serverType)) {
            stoppable.stop("i was judged to death, release resources and stop myself");
            return;
        }

        String storedInfo = super.get(znode);
        String[] fields = storedInfo.split(Constants.COMMA);
        if (fields.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH) {
            return;
        }

        StringBuilder heartBeatInfo = new StringBuilder();
        heartBeatInfo.append(fields[0]).append(Constants.COMMA);
        heartBeatInfo.append(fields[1]).append(Constants.COMMA);
        heartBeatInfo.append(OSUtils.cpuUsage()).append(Constants.COMMA);
        heartBeatInfo.append(OSUtils.memoryUsage()).append(Constants.COMMA);
        heartBeatInfo.append(OSUtils.loadAverage()).append(Constants.COMMA);
        heartBeatInfo.append(fields[5]).append(Constants.COMMA);
        heartBeatInfo.append(DateUtils.dateToString(new Date()));

        zkClient.setData().forPath(znode, heartBeatInfo.toString().getBytes());
    } catch (Exception e) {
        logger.error("heartbeat for zk failed", e);
        stoppable.stop("heartbeat for zk exception, release resources and stop myself");
    }
}
/**
* check dead server or not , if dead, stop self
*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册