Unverified commit 21fa38e3, authored by B bao liang, committed by GitHub

refactor zkMasterClient/zkWorkerClient (#664)

* refactor zkMasterClient/zkWorkerClient

* fix bug: close zk links.

* fix bug: close zk links.

* delete unused code

* update zkclient

* refactor zk client.

* update readme

* update readme

* update readme
Parent 047a65b5
@@ -33,27 +33,13 @@ Its main objectives are as follows:

- There are more waiting partners to explore

-### Comparison with similar scheduler systems
+### What's in Easy Scheduler

-  | EasyScheduler | Azkaban | Airflow
--- | -- | -- | --
-**Stability** |  |  |
-Single point of failure | Decentralized multi-master and multi-worker | Yes <br/> Single Web and Scheduler Combination Node | Yes <br/> Single Scheduler
-Additional HA requirements | Not required (HA is supported by itself) | DB | Celery / Dask / Mesos + Load Balancer + DB
-Overload processing | Task queue mechanism: the number of schedulable tasks on a single machine can be flexibly configured; when there are too many tasks they are cached in the task queue, and will not jam the machine. | Jams the server when there are too many tasks | Jams the server when there are too many tasks
-**Easy to use** |  |  |
-DAG monitoring interface | The visual process definition shows key information such as task status, task type, retry times, task running machine and visual variables at a glance. | Only task status can be seen | Can't visually distinguish task types
-Visual process definition | Yes <br/> All process definition operations are visualized: dragging tasks to draw DAGs, configuring data sources and resources. For third-party systems, an API mode of operation is provided. | No <br/> DAG and custom upload via custom DSL | No <br/> The DAG is drawn through Python code, which is inconvenient, especially for business people who can't write code.
-Quick deployment | One-click deployment | Complex clustering deployment | Complex clustering deployment
-**Features** |  |  |
-Suspend and resume | Supports pause and recover operations | No <br/> Can only kill the workflow first and then re-run | No <br/> Can only kill the workflow first and then re-run
-Whether to support multiple tenants | Yes <br/> Users on EasyScheduler can achieve many-to-one or one-to-one mapping relationships through tenants and Hadoop users, which is very important for scheduling big data jobs. | No | No
-Task type | Supports traditional shell tasks, and also supports big data platform task scheduling: MR, Spark, SQL (mysql, postgresql, hive, sparksql), Python, Procedure, Sub_Process | shell, gobblin, hadoopJava, java, hive, pig, spark, hdfsToTeradata, teradataToHdfs | BashOperator, DummyOperator, MySqlOperator, HiveOperator, EmailOperator, HTTPOperator, SqlOperator
-Compatibility | Supports scheduling big data jobs like Spark, Hive and MR, and its multi-tenancy support makes it a good fit for big data business. | Does not support multi-tenancy, so it is not flexible enough for business use on a big data platform. | Does not support multi-tenancy, so it is not flexible enough for business use on a big data platform.
-**Scalability** |  |  |
-Whether to support custom task types | Yes | Yes | Yes
-Is cluster extension supported? | Yes <br/> The scheduler uses distributed scheduling, and the overall scheduling capability increases linearly with the scale of the cluster. Master and Worker support dynamic online and offline. | Yes <br/> but complicated Executor horizontal extension | Yes <br/> but complicated Executor horizontal extension
+Stability | Easy to use | Features | Scalability
+-- | -- | -- | --
+Decentralized multi-master and multi-worker | The visual process definition shows key information such as task status, task type, retry times, task running machine and visual variables at a glance. | Supports pause and recover operations | Supports custom task types
+HA is supported by itself | All process definition operations are visualized: dragging tasks to draw DAGs, configuring data sources and resources. For third-party systems, an API mode of operation is provided. | Users on EasyScheduler can achieve many-to-one or one-to-one mapping relationships through tenants and Hadoop users, which is very important for scheduling big data jobs. | The scheduler uses distributed scheduling, and the overall scheduling capability increases linearly with the scale of the cluster. Master and Worker support dynamic online and offline.
+Overload processing: the task queue mechanism means the number of schedulable tasks on a single machine can be flexibly configured; when there are too many tasks they are cached in the task queue, and will not jam the machine. | One-click deployment | Supports traditional shell tasks, and also supports big data platform task scheduling: MR, Spark, SQL (mysql, postgresql, hive, sparksql), Python, Procedure, Sub_Process | 
......
@@ -29,7 +29,7 @@
* [Development environment setup](后端开发文档.md#项目编译)
* [Custom task plugin documentation](任务插件开发.md#任务插件开发)
-* [API documentation](http://52.82.13.76:8888/easyscheduler/doc.html?language=zh_CN&lang=cn)
+* [API documentation](http://52.82.13.76:8888/escheduler/doc.html?language=zh_CN&lang=cn)
* FAQ
* [FAQ](EasyScheduler-FAQ.md)
* System version upgrade documentation
......
@@ -23,8 +23,6 @@ import cn.escheduler.api.utils.Constants;
import cn.escheduler.api.utils.Result;
import cn.escheduler.dao.model.User;
import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiImplicitParam;
-import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
@@ -19,12 +19,12 @@ package cn.escheduler.api.service;

import cn.escheduler.api.enums.Status;
import cn.escheduler.api.utils.Constants;
import cn.escheduler.api.utils.ZookeeperMonitor;
+import cn.escheduler.common.enums.ZKNodeType;
import cn.escheduler.dao.MonitorDBDao;
-import cn.escheduler.dao.model.MasterServer;
+import cn.escheduler.common.model.MasterServer;
import cn.escheduler.dao.model.MonitorRecord;
import cn.escheduler.dao.model.User;
import cn.escheduler.dao.model.ZookeeperRecord;
-import org.apache.hadoop.mapred.Master;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
@@ -65,7 +65,7 @@ public class MonitorService extends BaseService{

        Map<String, Object> result = new HashMap<>(5);

-       List<MasterServer> masterServers = new ZookeeperMonitor().getMasterServers();
+       List<MasterServer> masterServers = getServerListFromZK(true);
        result.put(Constants.DATA_LIST, masterServers);
        putMsg(result,Status.SUCCESS);
@@ -99,11 +99,29 @@ public class MonitorService extends BaseService{
    public Map<String,Object> queryWorker(User loginUser) {

        Map<String, Object> result = new HashMap<>(5);

-       List<MasterServer> workerServers = new ZookeeperMonitor().getWorkerServers();
+       List<MasterServer> workerServers = getServerListFromZK(false);

        result.put(Constants.DATA_LIST, workerServers);
        putMsg(result,Status.SUCCESS);

        return result;
    }

+   private List<MasterServer> getServerListFromZK(boolean isMaster){
+       List<MasterServer> servers = new ArrayList<>();
+       ZookeeperMonitor zookeeperMonitor = null;
+       try{
+           zookeeperMonitor = new ZookeeperMonitor();
+           ZKNodeType zkNodeType = isMaster ? ZKNodeType.MASTER : ZKNodeType.WORKER;
+           servers = zookeeperMonitor.getServersList(zkNodeType);
+       }catch (Exception e){
+           throw e;
+       }finally {
+           if(zookeeperMonitor != null){
+               zookeeperMonitor.close();
+           }
+       }
+       return servers;
+   }
}
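For context, the new `getServerListFromZK` helper guarantees that the monitor's ZooKeeper connection is closed even when the query fails. A minimal standalone sketch of that acquire-use-close shape, not part of this commit; `ZkMonitor` is a hypothetical stand-in for `ZookeeperMonitor`:

```java
import java.util.Collections;
import java.util.List;

// Sketch of the acquire-use-close pattern MonitorService now follows.
public class CloseOnFinallyExample {

    interface ZkMonitor {
        List<String> servers();
        void close();
    }

    static List<String> fetchServers(ZkMonitor monitor) {
        try {
            return monitor.servers();
        } finally {
            monitor.close(); // always release the ZooKeeper connection
        }
    }

    public static void main(String[] args) {
        ZkMonitor stub = new ZkMonitor() {
            public List<String> servers() { return Collections.singletonList("192.168.1.1"); }
            public void close() { System.out.println("zk connection closed"); }
        };
        System.out.println(fetchServers(stub));
    }
}
```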
@@ -25,6 +25,7 @@ import cn.escheduler.common.enums.FailureStrategy;
import cn.escheduler.common.enums.Priority;
import cn.escheduler.common.enums.ReleaseState;
import cn.escheduler.common.enums.WarningType;
+import cn.escheduler.common.model.MasterServer;
import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.common.utils.JSONUtils;
import cn.escheduler.dao.ProcessDao;
......
@@ -20,7 +20,7 @@ import cn.escheduler.api.enums.Status;
import cn.escheduler.api.utils.Constants;
import cn.escheduler.dao.mapper.MasterServerMapper;
import cn.escheduler.dao.mapper.WorkerServerMapper;
-import cn.escheduler.dao.model.MasterServer;
+import cn.escheduler.common.model.MasterServer;
import cn.escheduler.dao.model.User;
import cn.escheduler.dao.model.WorkerServer;
import org.springframework.beans.factory.annotation.Autowired;
......
package cn.escheduler.api.utils;

+import cn.escheduler.common.enums.ZKNodeType;
import cn.escheduler.common.zk.AbstractZKClient;
-import cn.escheduler.dao.model.MasterServer;
+import cn.escheduler.common.model.MasterServer;
import cn.escheduler.dao.model.ZookeeperRecord;
-import cn.escheduler.server.ResInfo;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -11,7 +11,6 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
-import java.util.Map;

/**
@@ -36,29 +35,12 @@ public class ZookeeperMonitor extends AbstractZKClient{
        return null;
    }

-   /**
-    * get server list.
-    * @param isMaster
-    * @return
-    */
-   public List<MasterServer> getServers(boolean isMaster){
-       List<MasterServer> masterServers = new ArrayList<>();
-       Map<String, String> masterMap = getServerList(isMaster);
-       String parentPath = isMaster ? getMasterZNodeParentPath() : getWorkerZNodeParentPath();
-       for(String path : masterMap.keySet()){
-           MasterServer masterServer = ResInfo.parseHeartbeatForZKInfo(masterMap.get(path));
-           masterServer.setZkDirectory( parentPath + "/"+ path);
-           masterServers.add(masterServer);
-       }
-       return masterServers;
-   }

    /**
     * get master servers
     * @return
     */
    public List<MasterServer> getMasterServers(){
-       return getServers(true);
+       return getServersList(ZKNodeType.MASTER);
    }

    /**
@@ -66,7 +48,7 @@ public class ZookeeperMonitor extends AbstractZKClient{
     * @return
     */
    public List<MasterServer> getWorkerServers(){
-       return getServers(false);
+       return getServersList(ZKNodeType.WORKER);
    }

    private static List<ZookeeperRecord> zookeeperInfoList(String zookeeperServers) {
......
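The `boolean isMaster` flag is replaced by a `ZKNodeType` enum throughout this commit. The enum's definition is not part of the diff; a minimal sketch consistent with the three constants used here (`MASTER`, `WORKER`, `DEAD_SERVER`):

```java
// Sketch of cn.escheduler.common.enums.ZKNodeType as used by this commit;
// the actual definition is not shown in the diff.
public enum ZKNodeType {
    MASTER,      // servers registered under the master parent path
    WORKER,      // servers registered under the worker parent path
    DEAD_SERVER  // entries under the dead-server parent path
}
```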
package cn.escheduler.api.utils;

-import cn.escheduler.dao.model.MasterServer;
+import cn.escheduler.common.model.MasterServer;
import org.junit.Assert;
import org.junit.Test;
......
@@ -14,7 +14,7 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-package cn.escheduler.dao.model;
+package cn.escheduler.common.model;

import java.util.Date;
......
@@ -14,13 +14,10 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-package cn.escheduler.server;
+package cn.escheduler.common.utils;

import cn.escheduler.common.Constants;
-import cn.escheduler.common.utils.DateUtils;
-import cn.escheduler.common.utils.JSONUtils;
-import cn.escheduler.common.utils.OSUtils;
-import cn.escheduler.dao.model.MasterServer;
+import cn.escheduler.common.model.MasterServer;

import java.util.Date;
......
@@ -18,12 +18,16 @@ package cn.escheduler.common.zk;

import cn.escheduler.common.Constants;
import cn.escheduler.common.IStoppable;
+import cn.escheduler.common.enums.ZKNodeType;
+import cn.escheduler.common.model.MasterServer;
import cn.escheduler.common.enums.ServerEnum;
import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.common.utils.OSUtils;
+import cn.escheduler.common.utils.ResInfo;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -32,7 +36,6 @@ import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -208,48 +211,11 @@ public abstract class AbstractZKClient {
        return false;
    }

-   /**
-    * init system znode
-    */
-   protected void initSystemZNode(){
-       try {
-           // read master node parent path from conf
-           masterZNodeParentPath = getMasterZNodeParentPath();
-           // read worker node parent path from conf
-           workerZNodeParentPath = getWorkerZNodeParentPath();
-           // read server node parent path from conf
-           deadServerZNodeParentPath = conf.getString(ZOOKEEPER_ESCHEDULER_DEAD_SERVERS);
-           if(zkClient.checkExists().forPath(deadServerZNodeParentPath) == null){
-               // create persistent dead server parent node
-               zkClient.create().creatingParentContainersIfNeeded()
-                       .withMode(CreateMode.PERSISTENT).forPath(deadServerZNodeParentPath);
-           }
-           if(zkClient.checkExists().forPath(masterZNodeParentPath) == null){
-               // create persistent master parent node
-               zkClient.create().creatingParentContainersIfNeeded()
-                       .withMode(CreateMode.PERSISTENT).forPath(masterZNodeParentPath);
-           }
-           if(zkClient.checkExists().forPath(workerZNodeParentPath) == null){
-               // create persistent worker parent node
-               zkClient.create().creatingParentContainersIfNeeded()
-                       .withMode(CreateMode.PERSISTENT).forPath(workerZNodeParentPath);
-           }
-       } catch (Exception e) {
-           logger.error("init system znode failed : " + e.getMessage(),e);
-       }
-   }

    public void removeDeadServerByHost(String host, String serverType) throws Exception {
        List<String> deadServers = zkClient.getChildren().forPath(deadServerZNodeParentPath);
        for(String serverPath : deadServers){
            if(serverPath.startsWith(serverType+UNDERLINE+host)){
                String server = deadServerZNodeParentPath + SINGLE_SLASH + serverPath;
                zkClient.delete().forPath(server);
                logger.info("{} server {} deleted from zk dead server path success" , serverType , host);
@@ -257,28 +223,68 @@ public abstract class AbstractZKClient {
        }
    }

+   /**
+    * create zookeeper path according to the zk node type.
+    * @param zkNodeType
+    * @return
+    * @throws Exception
+    */
+   private String createZNodePath(ZKNodeType zkNodeType) throws Exception {
+       // specify the format of stored data in ZK nodes
+       String heartbeatZKInfo = ResInfo.getHeartBeatInfo(new Date());
+       // create temporary sequence nodes for the server znode
+       String parentPath = getZNodeParentPath(zkNodeType);
+       String serverPathPrefix = parentPath + "/" + OSUtils.getHost();
+       String registerPath = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(
+               serverPathPrefix + "_", heartbeatZKInfo.getBytes());
+       logger.info("register {} node {} success" , zkNodeType.toString(), registerPath);
+       return registerPath;
+   }

+   /**
+    * register server; if the server already exists, return null.
+    * @param zkNodeType
+    * @return register server path in zookeeper
+    */
+   public String registerServer(ZKNodeType zkNodeType) throws Exception {
+       String registerPath = null;
+       String host = OSUtils.getHost();
+       if(checkZKNodeExists(host, zkNodeType)){
+           logger.error("register failure , {} server already started on host : {}" ,
+                   zkNodeType.toString(), host);
+           return registerPath;
+       }
+       registerPath = createZNodePath(zkNodeType);
+
+       // handle dead server
+       handleDeadServer(registerPath, zkNodeType, Constants.DELETE_ZK_OP);
+
+       return registerPath;
+   }

    /**
     * opType(add): if find dead server , then add to zk deadServerPath
     * opType(delete): delete path from zk
     *
     * @param zNode node path
-    * @param serverType master or worker prefix
+    * @param zkNodeType master or worker
     * @param opType delete or add
     * @throws Exception
     */
-   public void handleDeadServer(String zNode, String serverType, String opType) throws Exception {
+   public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception {
        //ip_sequenceno
        String[] zNodesPath = zNode.split("\\/");
        String ipSeqNo = zNodesPath[zNodesPath.length - 1];

-       String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX : WORKER_PREFIX;
+       String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX;

        //check server restart; if restarted, the dead server path in zk should be deleted
        if(opType.equals(DELETE_ZK_OP)){
            String[] ipAndSeqNo = ipSeqNo.split(UNDERLINE);
            String ip = ipAndSeqNo[0];
-           removeDeadServerByHost(ip, serverType);
+           removeDeadServerByHost(ip, type);
        }else if(opType.equals(ADD_ZK_OP)){
            String deadServerPath = deadServerZNodeParentPath + SINGLE_SLASH + type + UNDERLINE + ipSeqNo;
@@ -287,7 +293,8 @@ public abstract class AbstractZKClient {
            zkClient.create().forPath(deadServerPath,(type + UNDERLINE + ipSeqNo).getBytes());
-           logger.info("{} server dead , and {} added to zk dead server path success" , serverType, zNode);
+           logger.info("{} server dead , and {} added to zk dead server path success" ,
+                   zkNodeType.toString(), zNode);
        }
    }
@@ -343,16 +350,34 @@ public abstract class AbstractZKClient {
        return sb.toString();
    }

+   /**
+    * get server list.
+    * @param zkNodeType
+    * @return
+    */
+   public List<MasterServer> getServersList(ZKNodeType zkNodeType){
+       Map<String, String> masterMap = getServerMaps(zkNodeType);
+       String parentPath = getZNodeParentPath(zkNodeType);
+
+       List<MasterServer> masterServers = new ArrayList<>();
+       for(String path : masterMap.keySet()){
+           MasterServer masterServer = ResInfo.parseHeartbeatForZKInfo(masterMap.get(path));
+           masterServer.setZkDirectory( parentPath + "/"+ path);
+           masterServers.add(masterServer);
+       }
+       return masterServers;
+   }

    /**
     * get master server list map.
     * result : {host : resource info}
     * @return
     */
-   public Map<String, String> getServerList(boolean isMaster ){
+   public Map<String, String> getServerMaps(ZKNodeType zkNodeType){
        Map<String, String> masterMap = new HashMap<>();
        try {
-           String path = isMaster ? getMasterZNodeParentPath() : getWorkerZNodeParentPath();
+           String path = getZNodeParentPath(zkNodeType);
            List<String> serverList = getZkClient().getChildren().forPath(path);
            for(String server : serverList){
                byte[] bytes = getZkClient().getData().forPath(path + "/" + server);
@@ -365,6 +390,29 @@ public abstract class AbstractZKClient {
        return masterMap;
    }

+   /**
+    * check whether the zookeeper node already exists
+    * @param host
+    * @param zkNodeType
+    * @return
+    */
+   public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) {
+       String path = getZNodeParentPath(zkNodeType);
+       if(StringUtils.isEmpty(path)){
+           logger.error("check zk node exists error, host:{}, zk node type:{}",
+                   host, zkNodeType.toString());
+           return false;
+       }
+       Map<String, String> serverMaps = getServerMaps(zkNodeType);
+       for(String hostKey : serverMaps.keySet()){
+           if(hostKey.startsWith(host)){
+               return true;
+           }
+       }
+       return false;
+   }

    /**
     * get zkclient
     * @return
@@ -393,6 +441,34 @@ public abstract class AbstractZKClient {
        return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_MASTERS);
    }

+   /**
+    * get zookeeper node parent path
+    * @param zkNodeType
+    * @return
+    */
+   public String getZNodeParentPath(ZKNodeType zkNodeType) {
+       String path = "";
+       switch (zkNodeType){
+           case MASTER:
+               return getMasterZNodeParentPath();
+           case WORKER:
+               return getWorkerZNodeParentPath();
+           case DEAD_SERVER:
+               return getDeadZNodeParentPath();
+           default:
+               break;
+       }
+       return path;
+   }

+   /**
+    * get dead server node parent path
+    * @return
+    */
+   protected String getDeadZNodeParentPath(){
+       return conf.getString(ZOOKEEPER_ESCHEDULER_DEAD_SERVERS);
+   }

    /**
     * get master start up lock path
     * @return
@@ -417,6 +493,82 @@ public abstract class AbstractZKClient {
        return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_WORKERS);
    }

+   /**
+    * release mutex
+    * @param mutex
+    */
+   public static void releaseMutex(InterProcessMutex mutex) {
+       if (mutex != null){
+           try {
+               mutex.release();
+           } catch (Exception e) {
+               if(e.getMessage().equals("instance must be started before calling this method")){
+                   logger.warn("lock release");
+               }else{
+                   logger.error("lock release failed : " + e.getMessage(),e);
+               }
+           }
+       }
+   }

+   /**
+    * init system znode
+    */
+   protected void initSystemZNode(){
+       try {
+           createNodePath(getMasterZNodeParentPath());
+           createNodePath(getWorkerZNodeParentPath());
+           createNodePath(getDeadZNodeParentPath());
+       } catch (Exception e) {
+           logger.error("init system znode failed : " + e.getMessage(),e);
+       }
+   }

+   /**
+    * create zookeeper node path if it does not exist
+    * @param zNodeParentPath
+    * @throws Exception
+    */
+   private void createNodePath(String zNodeParentPath) throws Exception {
+       if(null == zkClient.checkExists().forPath(zNodeParentPath)){
+           zkClient.create().creatingParentContainersIfNeeded()
+                   .withMode(CreateMode.PERSISTENT).forPath(zNodeParentPath);
+       }
+   }

+   /**
+    * server itself dead: stop all threads
+    * @param serverHost
+    * @param zkNodeType
+    */
+   protected boolean checkServerSelfDead(String serverHost, ZKNodeType zkNodeType) {
+       if (serverHost.equals(OSUtils.getHost())) {
+           logger.error("{} server({}) of myself dead , stopping...",
+                   zkNodeType.toString(), serverHost);
+           stoppable.stop(String.format("%s server %s of myself dead , stopping...",
+                   zkNodeType.toString(), serverHost));
+           return true;
+       }
+       return false;
+   }

+   /**
+    * get host ip, string format: masterParentPath/ip_000001/value
+    * @param path
+    * @return
+    */
+   protected String getHostByEventDataPath(String path) {
+       int startIndex = path.lastIndexOf("/")+1;
+       int endIndex = path.lastIndexOf("_");
+       if(startIndex >= endIndex){
+           logger.error("parse ip error");
+           return "";
+       }
+       return path.substring(startIndex, endIndex);
+   }

    /**
     * acquire zk lock
     * @param zkClient
......
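The registration flow added above (`createNodePath` plus `createZNodePath`) is built on Curator's persistent parent nodes and ephemeral sequential children. A standalone sketch of that mechanism; the connection string, parent path, host, and heartbeat payload are placeholders, not project config:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

// Standalone sketch of the registration mechanism behind createNodePath()
// and createZNodePath(). All literals here are illustrative.
public class EphemeralRegistrationSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework zkClient = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        zkClient.start();

        String parentPath = "/escheduler/masters";
        // persistent parent node, created once (mirrors createNodePath)
        if (zkClient.checkExists().forPath(parentPath) == null) {
            zkClient.create().creatingParentContainersIfNeeded()
                    .withMode(CreateMode.PERSISTENT).forPath(parentPath);
        }

        String heartbeat = "cpu,mem,load"; // stand-in for ResInfo.getHeartBeatInfo(new Date())
        // EPHEMERAL_SEQUENTIAL: the node disappears with the session, and the
        // generated suffix (e.g. _0000000001) keeps concurrent registrations unique.
        String registerPath = zkClient.create()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath(parentPath + "/192.168.1.1" + "_", heartbeat.getBytes());
        System.out.println("registered at " + registerPath);

        zkClient.close();
    }
}
```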
@@ -18,7 +18,7 @@ package cn.escheduler.dao;

import cn.escheduler.dao.mapper.MasterServerMapper;
import cn.escheduler.dao.mapper.WorkerServerMapper;
-import cn.escheduler.dao.model.MasterServer;
+import cn.escheduler.common.model.MasterServer;
import cn.escheduler.dao.model.WorkerServer;
import org.springframework.beans.factory.annotation.Autowired;
......
@@ -16,7 +16,7 @@
 */
package cn.escheduler.dao.mapper;

-import cn.escheduler.dao.model.MasterServer;
+import cn.escheduler.common.model.MasterServer;
import org.apache.ibatis.annotations.*;
import org.apache.ibatis.type.JdbcType;
......
@@ -17,7 +17,7 @@
package cn.escheduler.dao.mapper;

import cn.escheduler.dao.datasource.ConnectionFactory;
-import cn.escheduler.dao.model.MasterServer;
+import cn.escheduler.common.model.MasterServer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
......
@@ -119,8 +119,6 @@ public class MasterServer implements CommandLineRunner, IStoppable {

    public MasterServer(ProcessDao processDao){
        zkMasterClient = ZKMasterClient.getZKMasterClient(processDao);
-       this.serverDao = zkMasterClient.getServerDao();
-       this.alertDao = zkMasterClient.getAlertDao();
    }

    public void run(ProcessDao processDao){
@@ -128,6 +126,11 @@ public class MasterServer implements CommandLineRunner, IStoppable {
        heartBeatInterval = conf.getInt(Constants.MASTER_HEARTBEAT_INTERVAL,
                Constants.defaultMasterHeartbeatInterval);

+       // master exec thread pool num
+       int masterExecThreadNum = conf.getInt(Constants.MASTER_EXEC_THREADS,
+               Constants.defaultMasterExecThreadNum);
+
        heartbeatMasterService = ThreadUtils.newDaemonThreadScheduledExecutor("Master-Main-Thread",Constants.defaulMasterHeartbeatThreadNum);

        // heartbeat thread implement
@@ -140,10 +143,6 @@ public class MasterServer implements CommandLineRunner, IStoppable {
        heartbeatMasterService.
                scheduleAtFixedRate(heartBeatThread, 5, heartBeatInterval, TimeUnit.SECONDS);

-       // master exec thread pool num
-       int masterExecThreadNum = conf.getInt(Constants.MASTER_EXEC_THREADS,
-               Constants.defaultMasterExecThreadNum);
-
        // master scheduler thread
        MasterSchedulerThread masterSchedulerThread = new MasterSchedulerThread(
                zkMasterClient,
@@ -154,6 +153,8 @@ public class MasterServer implements CommandLineRunner, IStoppable {
        masterSchedulerService.execute(masterSchedulerThread);

        // start QuartzExecutors
+       // TODO...
+       // what system should do if exception
        try {
            ProcessScheduleJob.init(processDao);
            QuartzExecutors.getInstance().start();
@@ -173,13 +174,11 @@ public class MasterServer implements CommandLineRunner, IStoppable {
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
-               String host = OSUtils.getHost();
-               // clear master table register info
-               serverDao.deleteMaster(host);
                logger.info("master server stopped");
                if (zkMasterClient.getActiveMasterNum() <= 1) {
                    for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) {
-                       alertDao.sendServerStopedAlert(1, host, "Master-Server");
+                       zkMasterClient.getAlertDao().sendServerStopedAlert(
+                               1, OSUtils.getHost(), "Master-Server");
                    }
                }
            }
......
@@ -20,6 +20,7 @@ import cn.escheduler.common.Constants;
import cn.escheduler.common.thread.Stopper;
import cn.escheduler.common.thread.ThreadUtils;
import cn.escheduler.common.utils.OSUtils;
+import cn.escheduler.common.zk.AbstractZKClient;
import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.server.zk.ZKMasterClient;
@@ -98,18 +99,7 @@ public class MasterSchedulerThread implements Runnable {
            }catch (Exception e){
                logger.error("master scheduler thread exception : " + e.getMessage(),e);
            }finally{
-               if (mutex != null){
-                   try {
-                       mutex.release();
-                   } catch (Exception e) {
-                       if(e.getMessage().equals("instance must be started before calling this method")){
-                           logger.warn("lock release");
-                       }else{
-                           logger.error("lock release failed : " + e.getMessage(),e);
-                       }
-                   }
-               }
+               AbstractZKClient.releaseMutex(mutex);
            }
        }
    }
......
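Both scheduler threads now delegate lock cleanup to `AbstractZKClient.releaseMutex`. A minimal standalone sketch of the same acquire-in-try / release-in-finally guard using Curator's `InterProcessMutex`; the helper names are hypothetical and the curator-recipes dependency is assumed:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

// Sketch of the lock guard pattern used by MasterSchedulerThread and
// FetchTaskThread after this refactor.
public class MutexGuardSketch {

    static void doWithLock(CuratorFramework zkClient, String lockPath) throws Exception {
        InterProcessMutex mutex = new InterProcessMutex(zkClient, lockPath);
        try {
            mutex.acquire();
            // ... critical section: take commands / tasks from the queue ...
        } finally {
            release(mutex); // mirrors AbstractZKClient.releaseMutex(mutex)
        }
    }

    static void release(InterProcessMutex mutex) {
        if (mutex == null) {
            return;
        }
        try {
            mutex.release();
        } catch (Exception e) {
            // a client that was never started throws here; treat it as already released
        }
    }
}
```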
@@ -23,6 +23,7 @@ import cn.escheduler.common.thread.ThreadUtils;
import cn.escheduler.common.utils.CollectionUtils;
import cn.escheduler.common.utils.FileUtils;
import cn.escheduler.common.utils.OSUtils;
+import cn.escheduler.common.zk.AbstractZKClient;
import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.model.*;
import cn.escheduler.server.zk.ZKWorkerClient;
@@ -226,13 +227,7 @@ public class FetchTaskThread implements Runnable{
            }catch (Exception e){
                logger.error("fetch task thread failure" ,e);
            }finally {
-               if (mutex != null){
-                   try {
-                       mutex.release();
-                   } catch (Exception e) {
-                       logger.error("fetch task lock release failure ",e);
-                   }
-               }
+               AbstractZKClient.releaseMutex(mutex);
            }
        }
    }
@@ -247,6 +242,7 @@ public class FetchTaskThread implements Runnable{
                taskInstance.getProcessInstance().getId(),
                taskInstance.getId());
    }
+
    /**
     * check
     * @param poolExecutor
......
@@ -19,9 +19,7 @@ package cn.escheduler.server.zk;
import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.ExecutionStatus;
import cn.escheduler.common.enums.ZKNodeType;
-import cn.escheduler.common.utils.CollectionUtils;
-import cn.escheduler.common.utils.DateUtils;
-import cn.escheduler.common.utils.OSUtils;
+import cn.escheduler.common.model.MasterServer;
import cn.escheduler.common.zk.AbstractZKClient;
import cn.escheduler.dao.AlertDao;
import cn.escheduler.dao.DaoFactory;
@@ -29,8 +27,6 @@ import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.ServerDao;
import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.TaskInstance;
-import cn.escheduler.dao.model.WorkerServer;
-import cn.escheduler.server.ResInfo;
import cn.escheduler.server.utils.ProcessUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
@@ -39,7 +35,6 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.utils.ThreadUtils;
-import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,6 +63,7 @@ public class ZKMasterClient extends AbstractZKClient {
     * master database access
     */
    private ServerDao serverDao = null;
+
    /**
     * alert database access
     */
@@ -77,9 +73,6 @@ public class ZKMasterClient extends AbstractZKClient {
     */
    private ProcessDao processDao;

-   private Date createTime = null;
-
    /**
     * zkMasterClient
     */
@@ -118,7 +111,6 @@ public class ZKMasterClient extends AbstractZKClient {
        try {
            // create distributed lock with the root node path of the lock space as /escheduler/lock/failover/master
            String znodeLock = getMasterStartUpLockPath();
-
            mutex = new InterProcessMutex(zkClient, znodeLock);
            mutex.acquire();
@@ -132,34 +124,24 @@ public class ZKMasterClient extends AbstractZKClient {
            this.listenerWorker();

            // register master
-           this.registMaster();
+           this.registerMaster();

            // check if fault tolerance is required: failure and tolerance
            if (getActiveMasterNum() == 1) {
                failoverWorker(null, true);
-               // processDao.masterStartupFaultTolerant();
                failoverMaster(null);
            }

        }catch (Exception e){
            logger.error("master start up exception : " + e.getMessage(),e);
        }finally {
-           if (mutex != null){
-               try {
-                   mutex.release();
-               } catch (Exception e) {
-                   if(e.getMessage().equals("instance must be started before calling this method")){
-                       logger.warn("lock release");
-                   }else{
-                       logger.error("lock release failed : " + e.getMessage(),e);
-                   }
-               }
-           }
+           releaseMutex(mutex);
        }
    }

    /**
     * init dao
     */
@@ -168,15 +150,6 @@ public class ZKMasterClient extends AbstractZKClient {
        this.alertDao = DaoFactory.getDaoInstance(AlertDao.class);
        this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
    }

-   /**
-    * get master dao
-    * @return
-    */
-   public ServerDao getServerDao(){
-       return serverDao;
-   }

    /**
     * get alert dao
     * @return
@@ -185,91 +158,25 @@ public class ZKMasterClient extends AbstractZKClient {
        return alertDao;
    }

    /**
     * register master znode
     */
-   public void registMaster(){
-       // get current date
-       Date now = new Date();
-       createTime = now ;
+   public void registerMaster(){
        try {
-           String osHost = OSUtils.getHost();
-
-           // zookeeper node exists, cannot start a new one.
-           if(checkZKNodeExists(osHost, ZKNodeType.MASTER)){
-               logger.error("register failure , master already started on host : {}" , osHost);
-               // exit system
-               System.exit(-1);
-           }
-
-           // specify the format of stored data in ZK nodes
-           String heartbeatZKInfo = ResInfo.getHeartBeatInfo(now);
-           // create temporary sequence nodes for master znode
-           masterZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(
-                   masterZNodeParentPath + "/" + OSUtils.getHost() + "_", heartbeatZKInfo.getBytes());
-           logger.info("register master node {} success" , masterZNode);
-
-           // handle dead server
-           handleDeadServer(masterZNode, Constants.MASTER_PREFIX, Constants.DELETE_ZK_OP);
-
-           // delete master server from database
-           serverDao.deleteMaster(OSUtils.getHost());
-
-           // register master znode
-           serverDao.registerMaster(OSUtils.getHost(),
-                   OSUtils.getProcessID(),
-                   masterZNode,
-                   ResInfo.getResInfoJson(),
-                   createTime,
-                   createTime);
+           String serverPath = registerServer(ZKNodeType.MASTER);
+           if(StringUtils.isEmpty(serverPath)){
+               System.exit(-1);
+           }
        } catch (Exception e) {
            logger.error("register master failure : " + e.getMessage(),e);
+           System.exit(-1);
        }
    }

-   /**
-    * check whether the zookeeper node already exists
-    * @param host
-    * @param zkNodeType
-    * @return
-    * @throws Exception
-    */
-   private boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) throws Exception {
-       String path = null;
-       switch (zkNodeType){
-           case MASTER:
-               path = masterZNodeParentPath;
-               break;
-           case WORKER:
-               path = workerZNodeParentPath;
-               break;
-           case DEAD_SERVER:
-               path = deadServerZNodeParentPath;
-               break;
-           default:
-               break;
-       }
-       if(StringUtils.isEmpty(path)){
-           logger.error("check zk node exists error, host:{}, zk node type:{}", host, zkNodeType.toString());
-           return false;
-       }
-       List<String> serverList = null;
-       serverList = zkClient.getChildren().forPath(path);
-       if (CollectionUtils.isNotEmpty(serverList)){
-           for (String masterZNode : serverList){
-               if (masterZNode.startsWith(host)){
-                   return true;
-               }
-           }
-       }
-       return false;
-   }

    /**
     * monitor master
@@ -278,8 +185,6 @@ public class ZKMasterClient extends AbstractZKClient {
        PathChildrenCache masterPc = new PathChildrenCache(zkClient, masterZNodeParentPath, true ,defaultThreadFactory);
        try {
-           Date now = new Date();
-           createTime = now ;
            masterPc.start();
            masterPc.getListenable().addListener(new PathChildrenCacheListener() {
                @Override
@@ -290,60 +195,13 @@ public class ZKMasterClient extends AbstractZKClient {
                        break;
                    case CHILD_REMOVED:
                        String path = event.getData().getPath();
-                       logger.info("master node deleted : {}",event.getData().getPath());
-
-                       InterProcessMutex mutexLock = null;
-                       try {
-                           // handle dead server, add to zk dead server path
-                           handleDeadServer(path, Constants.MASTER_PREFIX, Constants.ADD_ZK_OP);
-
-                           if(masterZNode.equals(path)){
-                               logger.error("master server({}) of myself dead , stopping...", path);
-                               stoppable.stop(String.format("master server(%s) of myself dead , stopping...", path));
-                               break;
-                           }
-
-                           // create a distributed lock; the root node path of the lock space is /escheduler/lock/failover/master
-                           String znodeLock = zkMasterClient.getMasterFailoverLockPath();
-                           mutexLock = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock);
-                           mutexLock.acquire();
-
-                           String masterHost = getHostByEventDataPath(path);
-                           for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) {
-                               alertDao.sendServerStopedAlert(1, masterHost, "Master-Server");
-                           }
-
-                           if(StringUtils.isNotEmpty(masterHost)){
-                               failoverMaster(masterHost);
-                           }
-                       }catch (Exception e){
-                           logger.error("master failover failed : " + e.getMessage(),e);
-                       }finally {
-                           if (mutexLock != null){
-                               try {
-                                   mutexLock.release();
-                               } catch (Exception e) {
-                                   logger.error("lock release failed : " + e.getMessage(),e);
-                               }
-                           }
-                       }
+                       String serverHost = getHostByEventDataPath(path);
+                       if(checkServerSelfDead(serverHost, ZKNodeType.MASTER)){
+                           return;
+                       }
+                       removeZKNodePath(path, ZKNodeType.MASTER, true);
                        break;
                    case CHILD_UPDATED:
-                       if (event.getData().getPath().contains(OSUtils.getHost())){
-                           byte[] bytes = zkClient.getData().forPath(event.getData().getPath());
-                           String resInfoStr = new String(bytes);
-                           String[] splits = resInfoStr.split(Constants.COMMA);
-                           if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH) {
-                               return;
-                           }
-                           // update master information in database according to host
-                           serverDao.updateMaster(OSUtils.getHost(),
-                                   OSUtils.getProcessID(),
-                                   ResInfo.getResInfoJson(Double.parseDouble(splits[2]),
-                                           Double.parseDouble(splits[3])),
-                                   DateUtils.stringToDate(splits[5]));
-                           logger.debug("master zk node updated : {}",event.getData().getPath());
-                       }
                        break;
                    default:
                        break;
@@ -353,10 +211,69 @@ public class ZKMasterClient extends AbstractZKClient {
        }catch (Exception e){
            logger.error("monitor master failed : " + e.getMessage(),e);
        }
    }

+   private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) {
+       logger.info("{} node deleted : {}", zkNodeType.toString(), path);
+       InterProcessMutex mutex = null;
+       try {
+           String failoverPath = getFailoverLockPath(zkNodeType);
+           // create a distributed lock
+           mutex = new InterProcessMutex(getZkClient(), failoverPath);
+           mutex.acquire();
+
+           String serverHost = getHostByEventDataPath(path);
+           // handle dead server
+           handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP);
+           // alert server down.
+           alertServerDown(serverHost, zkNodeType);
+           // failover server
+           if(failover){
+               failoverServerWhenDown(serverHost, zkNodeType);
+           }
+       }catch (Exception e){
+           logger.error("{} server failover failed.", zkNodeType.toString());
+           logger.error("failover exception : " + e.getMessage(),e);
+       }
+       finally {
+           releaseMutex(mutex);
+       }
+   }

+   private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception {
+       if(StringUtils.isEmpty(serverHost)){
+           return ;
+       }
+       switch (zkNodeType){
+           case MASTER:
+               failoverMaster(serverHost);
+               break;
+           case WORKER:
+               failoverWorker(serverHost, true);
+               break;
+           default:
+               break;
+       }
+   }

+   private String getFailoverLockPath(ZKNodeType zkNodeType){
+       switch (zkNodeType){
+           case MASTER:
+               return getMasterFailoverLockPath();
+           case WORKER:
+               return getWorkerFailoverLockPath();
+           default:
+               return "";
+       }
+   }

+   private void alertServerDown(String serverHost, ZKNodeType zkNodeType) {
+       String serverType = zkNodeType.toString();
+       for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER; i++) {
+           alertDao.sendServerStopedAlert(1, serverHost, serverType);
+       }
+   }

    /**
     * monitor worker
@@ -365,8 +282,6 @@ public class ZKMasterClient extends AbstractZKClient {
        PathChildrenCache workerPc = new PathChildrenCache(zkClient,workerZNodeParentPath,true ,defaultThreadFactory);
        try {
-           Date now = new Date();
-           createTime = now ;
            workerPc.start();
            workerPc.getListenable().addListener(new PathChildrenCacheListener() {
                @Override
@@ -377,40 +292,8 @@ public class ZKMasterClient extends AbstractZKClient {
                        break;
                    case CHILD_REMOVED:
                        String path = event.getData().getPath();
                        logger.info("node deleted : {}",event.getData().getPath());
-
-                       InterProcessMutex mutex = null;
-                       try {
-                           // handle dead server
-                           handleDeadServer(path, Constants.WORKER_PREFIX, Constants.ADD_ZK_OP);
-
-                           // create a distributed lock; the root node path of the lock space is /escheduler/lock/failover/worker
-                           String znodeLock = zkMasterClient.getWorkerFailoverLockPath();
-                           mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock);
-                           mutex.acquire();
-
-                           String workerHost = getHostByEventDataPath(path);
-                           for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) {
-                               alertDao.sendServerStopedAlert(1, workerHost, "Worker-Server");
-                           }
-
-                           if(StringUtils.isNotEmpty(workerHost)){
-                               failoverWorker(workerHost, true);
-                           }
-                       }catch (Exception e){
-                           logger.error("worker failover failed : " + e.getMessage(),e);
-                       }
-                       finally {
-                           if (mutex != null){
-                               try {
-                                   mutex.release();
-                               } catch (Exception e) {
-                                   logger.error("lock release failed : " + e.getMessage(),e);
-                               }
-                           }
-                       }
+                       removeZKNodePath(path, ZKNodeType.WORKER, true);
                        break;
                    default:
                        break;
@@ -420,9 +303,9 @@ public class ZKMasterClient extends AbstractZKClient {
        }catch (Exception e){
            logger.error("listener worker failed : " + e.getMessage(),e);
        }
    }

    /**
     * get master znode
     * @return
@@ -431,9 +314,6 @@ public class ZKMasterClient extends AbstractZKClient {
        return masterZNode;
    }

    /**
     * task needs failover if task started before worker started
     *
@@ -460,15 +340,20 @@ public class ZKMasterClient extends AbstractZKClient {
     * @return
     */
    private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
+       if(StringUtils.isEmpty(taskInstance.getHost())){
+           return false;
+       }
        Date workerServerStartDate = null;
-       List<WorkerServer> workerServers = processDao.queryWorkerServerByHost(taskInstance.getHost());
-       if(workerServers.size() > 0){
-           workerServerStartDate = workerServers.get(0).getCreateTime();
+       List<MasterServer> workerServers = getServersList(ZKNodeType.WORKER);
+       for(MasterServer server : workerServers){
+           if(server.getHost().equals(taskInstance.getHost())){
+               workerServerStartDate = server.getCreateTime();
+               break;
+           }
        }

        if(workerServerStartDate != null){
            return taskInstance.getStartTime().after(workerServerStartDate);
        }else{
            return false;
        }
@@ -478,6 +363,7 @@ public class ZKMasterClient extends AbstractZKClient {
     * failover worker tasks
     * 1. kill yarn job if there are yarn jobs in tasks.
     * 2. change task state from running to need failover.
+    * 3. failover all tasks when workerHost is null
     * @param workerHost
     */
    private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
@@ -501,9 +387,6 @@ public class ZKMasterClient extends AbstractZKClient {
            taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
            processDao.saveTaskInstance(taskInstance);
        }

-       //update task instance state value to NEED_FAULT_TOLERANCE
-       // processDao.updateNeedFailoverTaskInstances(workerHost);
        logger.info("end worker[{}] failover ...", workerHost);
    }
@@ -524,24 +407,4 @@ public class ZKMasterClient extends AbstractZKClient {
        logger.info("master failover end");
    }

-   /**
-    * get host ip, string format: masterParentPath/ip_000001/value
-    * @param path
-    * @return
-    */
-   private String getHostByEventDataPath(String path) {
-       int startIndex = path.lastIndexOf("/")+1;
-       int endIndex = path.lastIndexOf("_");
-       if(startIndex >= endIndex){
-           logger.error("parse ip error");
-           return "";
-       }
-       return path.substring(startIndex, endIndex);
-   }
}
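The host-parsing helper moved from ZKMasterClient into AbstractZKClient unchanged. A small standalone illustration of what it extracts from an event path, based on the code in this diff:

```java
// Standalone sketch of getHostByEventDataPath as moved into AbstractZKClient:
// a znode path like ".../masters/192.168.1.1_0000000001" yields "192.168.1.1".
public class HostFromPathSketch {

    static String hostOf(String path) {
        int startIndex = path.lastIndexOf("/") + 1;
        int endIndex = path.lastIndexOf("_");
        if (startIndex >= endIndex) {
            return ""; // path does not follow the ip_sequenceno convention
        }
        return path.substring(startIndex, endIndex);
    }

    public static void main(String[] args) {
        System.out.println(hostOf("/escheduler/masters/192.168.1.1_0000000001")); // 192.168.1.1
    }
}
```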
@@ -17,13 +17,13 @@
package cn.escheduler.server.zk;

import cn.escheduler.common.Constants;
-import cn.escheduler.common.utils.CollectionUtils;
-import cn.escheduler.common.utils.DateUtils;
+import cn.escheduler.common.enums.ZKNodeType;
import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.common.zk.AbstractZKClient;
import cn.escheduler.dao.DaoFactory;
import cn.escheduler.dao.ServerDao;
-import cn.escheduler.server.ResInfo;
+import cn.escheduler.common.utils.ResInfo;
+import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
@@ -34,7 +34,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
-import java.util.List;
import java.util.concurrent.ThreadFactory;
@@ -130,52 +129,14 @@ public class ZKWorkerClient extends AbstractZKClient {
     * register worker
     */
    private void registWorker(){
-       // get current date
-       Date now = new Date();
-       createTime = now ;
        try {
-           // encapsulation worker znode
-           workerZNode = workerZNodeParentPath + "/" + OSUtils.getHost() + "_";
-           List<String> workerZNodeList = zkClient.getChildren().forPath(workerZNodeParentPath);
-
-           if (CollectionUtils.isNotEmpty(workerZNodeList)){
-               boolean flag = false;
-               for (String workerZNode : workerZNodeList){
-                   if (workerZNode.startsWith(OSUtils.getHost())){
-                       flag = true;
-                       break;
-                   }
-               }
-               if (flag){
-                   logger.info("register failure , worker already started on : {}, please wait for a moment and try again" , OSUtils.getHost());
-                   // exit system
-                   System.exit(-1);
-               }
-           }
-
-           // String heartbeatZKInfo = getOsInfo(now);
-           // workerZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(workerZNode,
-           //         heartbeatZKInfo.getBytes());
-           initWorkZNode();
-
-           // handle dead server
-           handleDeadServer(workerZNode, Constants.WORKER_PREFIX, Constants.DELETE_ZK_OP);
-
-           // delete worker server from database
-           serverDao.deleteWorker(OSUtils.getHost());
-
-           // register worker znode
-           serverDao.registerWorker(OSUtils.getHost(),
-                   OSUtils.getProcessID(),
-                   workerZNode,
-                   ResInfo.getResInfoJson(),
-                   createTime,
-                   createTime);
+           String serverPath = registerServer(ZKNodeType.WORKER);
+           if(StringUtils.isEmpty(serverPath)){
+               System.exit(-1);
+           }
        } catch (Exception e) {
            logger.error("register worker failure : " + e.getMessage(),e);
+           System.exit(-1);
        }
    }
@@ -198,35 +159,13 @@ public class ZKWorkerClient extends AbstractZKClient {
                        break;
                    case CHILD_REMOVED:
                        String path = event.getData().getPath();
-                       // handle dead server, add to zk dead server path
-                       handleDeadServer(path, Constants.WORKER_PREFIX, Constants.ADD_ZK_OP);

                        //find myself dead
-                       if(workerZNode.equals(path)){
-                           logger.warn(" worker server({}) of myself dead , stopping...", path);
-                           stoppable.stop(String.format("worker server(%s) of myself dead , stopping",path));
-                       }
-                       logger.info("node deleted : {}", event.getData().getPath());
+                       String serverHost = getHostByEventDataPath(path);
+                       if(checkServerSelfDead(serverHost, ZKNodeType.WORKER)){
+                           return;
+                       }
                        break;
                    case CHILD_UPDATED:
-                       if (event.getData().getPath().contains(OSUtils.getHost())){
-                           byte[] bytes = zkClient.getData().forPath(event.getData().getPath());
-                           String resInfoStr = new String(bytes);
-                           String[] splits = resInfoStr.split(Constants.COMMA);
-                           if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH) {
-                               return;
-                           }
-                           // update worker info in database according to host
-                           serverDao.updateWorker(OSUtils.getHost(),
-                                   OSUtils.getProcessID(),
-                                   ResInfo.getResInfoJson(Double.parseDouble(splits[2])
-                                           ,Double.parseDouble(splits[3])),
-                                   DateUtils.stringToDate(splits[5]));
-                           logger.debug("node updated : {}",event.getData().getPath());
-                       }
                        break;
                    default:
                        break;
......
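After the refactor, both clients react to `CHILD_REMOVED` through the same two helpers, `getHostByEventDataPath` and `checkServerSelfDead`. A condensed sketch of that listener shape using Curator's `PathChildrenCache`; the helper bodies are stubbed and the paths are hypothetical:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;

// Condensed sketch of the CHILD_REMOVED handling shared by ZKMasterClient
// and ZKWorkerClient after this refactor.
public class RemovedNodeListenerSketch {

    static void listen(CuratorFramework zkClient, String parentPath) throws Exception {
        PathChildrenCache cache = new PathChildrenCache(zkClient, parentPath, true);
        cache.start();
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
                if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
                    String path = event.getData().getPath();
                    String serverHost = hostOf(path);
                    if (isMyself(serverHost)) {
                        return; // this process is the dead server: stop, don't fail over
                    }
                    // a master would call removeZKNodePath(...) here to fail the node over
                }
            }
        });
    }

    static String hostOf(String path) {
        int start = path.lastIndexOf('/') + 1;
        int end = path.lastIndexOf('_');
        return end > start ? path.substring(start, end) : "";
    }

    static boolean isMyself(String host) { return false; } // stub for checkServerSelfDead
}
```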