diff --git a/README.md b/README.md
index f16e16f8ae737b4691e9f315c6f7b4965d8f0f1e..04b4629d93a7eeb43dbbfc549e31df301630b637 100644
--- a/README.md
+++ b/README.md
@@ -33,27 +33,13 @@ Its main objectives are as follows:
- There are more waiting partners to explore
-### Comparison with similar scheduler systems
-
-
- | EasyScheduler | Azkaban | Airflow
--- | -- | -- | --
-**Stability** | | |
-Single point of failure | Decentralized multi-master and multi-worker | Yes
Single Web and Scheduler Combination Node | Yes
Single Scheduler
-Additional HA requirements | Not required (HA is supported by itself) | DB | Celery / Dask / Mesos + Load Balancer + DB
-Overload processing | Task queue mechanism, the number of schedulable tasks on a single machine can be flexibly configured, when too many tasks will be cached in the task queue, will not cause machine jam. | Jammed the server when there are too many tasks | Jammed the server when there are too many tasks
-**Easy to use** | | |
-DAG Monitoring Interface | Visualization process defines key information such as task status, task type, retry times, task running machine, visual variables and so on at a glance. | Only task status can be seen | Can't visually distinguish task types
-Visual process definition | Yes
All process definition operations are visualized, dragging tasks to draw DAGs, configuring data sources and resources. At the same time, for third-party systems, the api mode operation is provided. | No
DAG and custom upload via custom DSL | No
DAG is drawn through Python code, which is inconvenient to use, especially for business people who can't write code.
-Quick deployment | One-click deployment | Complex clustering deployment | Complex clustering deployment
-**Features** | | |
-Suspend and resume | Support pause, recover operation | No
Can only kill the workflow first and then re-run | No
Can only kill the workflow first and then re-run
-Whether to support multiple tenants | Users on easyscheduler can achieve many-to-one or one-to-one mapping relationship through tenants and Hadoop users, which is very important for scheduling large data jobs. " Supports traditional shell tasks, while supporting large data platform task scheduling: MR, Spark, SQL (mysql, postgresql, hive, sparksql), Python, Procedure, Sub_Process | No | No
-Task type | Supports traditional shell tasks, and also support big data platform task scheduling: MR, Spark, SQL (mysql, postgresql, hive, sparksql), Python, Procedure, Sub_Process | shell、gobblin、hadoopJava、java、hive、pig、spark、hdfsToTeradata、teradataToHdfs | BashOperator、DummyOperator、MySqlOperator、HiveOperator、EmailOperator、HTTPOperator、SqlOperator
-Compatibility | Support the scheduling of big data jobs like spark, hive, Mr. At the same time, it is more compatible with big data business because it supports multiple tenants. | Because it does not support multi-tenant, it is not flexible enough to use business in big data platform. | Because it does not support multi-tenant, it is not flexible enough to use business in big data platform.
-**Scalability** | | |
-Whether to support custom task types | Yes | Yes | Yes
-Is Cluster Extension Supported? | Yes
The scheduler uses distributed scheduling, and the overall scheduling capability will increase linearly with the scale of the cluster. Master and Worker support dynamic online and offline. | Yes
but complicated Executor horizontal extend | Yes
but complicated Executor horizontal extend
+### What's in Easy Scheduler
+
+ Stability | Easy to use | Features | Scalability |
+ -- | -- | -- | --
+Decentralized multi-master and multi-worker | Visualization process defines key information such as task status, task type, retry times, task running machine, visual variables and so on at a glance. | Support pause, recover operation | support custom task types
+HA is supported by itself | All process definition operations are visualized, dragging tasks to draw DAGs, configuring data sources and resources. At the same time, for third-party systems, the api mode operation is provided. | Users on easyscheduler can achieve many-to-one or one-to-one mapping relationship through tenants and Hadoop users, which is very important for scheduling large data jobs. Supports traditional shell tasks, while supporting large data platform task scheduling: MR, Spark, SQL (mysql, postgresql, hive, sparksql), Python, Procedure, Sub_Process | The scheduler uses distributed scheduling, and the overall scheduling capability will increase linearly with the scale of the cluster. Master and Worker support dynamic online and offline.
+Overload processing: Task queue mechanism, the number of schedulable tasks on a single machine can be flexibly configured, when too many tasks will be cached in the task queue, will not cause machine jam. | One-click deployment | Supports traditional shell tasks, and also support big data platform task scheduling: MR, Spark, SQL (mysql, postgresql, hive, sparksql), Python, Procedure, Sub_Process | |
diff --git a/docs/zh_CN/SUMMARY.md b/docs/zh_CN/SUMMARY.md
index 01f3acc6b4c32ff9b702482843f8986a25c11d97..2b153b60c5910cf57ce8fd9be473e51a1ac84647 100644
--- a/docs/zh_CN/SUMMARY.md
+++ b/docs/zh_CN/SUMMARY.md
@@ -29,7 +29,7 @@
* [开发环境搭建](后端开发文档.md#项目编译)
* [自定义任务插件文档](任务插件开发.md#任务插件开发)
-* [接口文档](http://52.82.13.76:8888/easyscheduler/doc.html?language=zh_CN&lang=cn)
+* [接口文档](http://52.82.13.76:8888/escheduler/doc.html?language=zh_CN&lang=cn)
* FAQ
* [FAQ](EasyScheduler-FAQ.md)
* 系统版本升级文档
diff --git a/escheduler-api/src/main/java/cn/escheduler/api/controller/MonitorController.java b/escheduler-api/src/main/java/cn/escheduler/api/controller/MonitorController.java
index cba39d5403c5243b8ecaa3f412d5eee069434bea..8d48129f5124b3cd0ad854790b20c13aefe4ce09 100644
--- a/escheduler-api/src/main/java/cn/escheduler/api/controller/MonitorController.java
+++ b/escheduler-api/src/main/java/cn/escheduler/api/controller/MonitorController.java
@@ -23,8 +23,6 @@ import cn.escheduler.api.utils.Constants;
import cn.escheduler.api.utils.Result;
import cn.escheduler.dao.model.User;
import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiImplicitParam;
-import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java
index 08e8bf576e9f5cda020d3343c407cd5dd8aee890..cc6a847578ff11914c4f733724a23252fd6d990c 100644
--- a/escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java
+++ b/escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java
@@ -19,12 +19,12 @@ package cn.escheduler.api.service;
import cn.escheduler.api.enums.Status;
import cn.escheduler.api.utils.Constants;
import cn.escheduler.api.utils.ZookeeperMonitor;
+import cn.escheduler.common.enums.ZKNodeType;
import cn.escheduler.dao.MonitorDBDao;
-import cn.escheduler.dao.model.MasterServer;
+import cn.escheduler.common.model.MasterServer;
import cn.escheduler.dao.model.MonitorRecord;
import cn.escheduler.dao.model.User;
import cn.escheduler.dao.model.ZookeeperRecord;
-import org.apache.hadoop.mapred.Master;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
@@ -65,7 +65,7 @@ public class MonitorService extends BaseService{
Map result = new HashMap<>(5);
- List masterServers = new ZookeeperMonitor().getMasterServers();
+ List masterServers = getServerListFromZK(true);
result.put(Constants.DATA_LIST, masterServers);
putMsg(result,Status.SUCCESS);
@@ -99,11 +99,29 @@ public class MonitorService extends BaseService{
public Map queryWorker(User loginUser) {
Map result = new HashMap<>(5);
+ List workerServers = getServerListFromZK(false);
- List workerServers = new ZookeeperMonitor().getWorkerServers();
result.put(Constants.DATA_LIST, workerServers);
putMsg(result,Status.SUCCESS);
return result;
}
+
+    /**
+     * Query master or worker server list from zookeeper;
+     * the monitor client is always closed, even on failure.
+     */
+    private List getServerListFromZK(boolean isMaster){
+        ZookeeperMonitor zookeeperMonitor = null;
+        try{
+            zookeeperMonitor = new ZookeeperMonitor();
+            ZKNodeType zkNodeType = isMaster ? ZKNodeType.MASTER : ZKNodeType.WORKER;
+            return zookeeperMonitor.getServersList(zkNodeType);
+        }finally {
+            if(zookeeperMonitor != null){
+                zookeeperMonitor.close();
+            }
+        }
+    }
+
}
diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java
index d4515c79f326e646d886950754a7df707cffea84..64fc8c76eb5ebe4d86cf5b68dc449b6e2c6bac99 100644
--- a/escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java
+++ b/escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java
@@ -25,6 +25,7 @@ import cn.escheduler.common.enums.FailureStrategy;
import cn.escheduler.common.enums.Priority;
import cn.escheduler.common.enums.ReleaseState;
import cn.escheduler.common.enums.WarningType;
+import cn.escheduler.common.model.MasterServer;
import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.common.utils.JSONUtils;
import cn.escheduler.dao.ProcessDao;
diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/ServerService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/ServerService.java
index 6957075dda3bde91bfc399b3d6335335c992383b..a54b385b0dfd5a4c8bba92558a564492583a181b 100644
--- a/escheduler-api/src/main/java/cn/escheduler/api/service/ServerService.java
+++ b/escheduler-api/src/main/java/cn/escheduler/api/service/ServerService.java
@@ -20,7 +20,7 @@ import cn.escheduler.api.enums.Status;
import cn.escheduler.api.utils.Constants;
import cn.escheduler.dao.mapper.MasterServerMapper;
import cn.escheduler.dao.mapper.WorkerServerMapper;
-import cn.escheduler.dao.model.MasterServer;
+import cn.escheduler.common.model.MasterServer;
import cn.escheduler.dao.model.User;
import cn.escheduler.dao.model.WorkerServer;
import org.springframework.beans.factory.annotation.Autowired;
diff --git a/escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java b/escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java
index 0f44b5f7db0f9bc5b4e0333df4989bcd423634b0..dc6f95d7ec97b6d3c150e91d579e41cf87597700 100644
--- a/escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java
+++ b/escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java
@@ -1,9 +1,9 @@
package cn.escheduler.api.utils;
+import cn.escheduler.common.enums.ZKNodeType;
import cn.escheduler.common.zk.AbstractZKClient;
-import cn.escheduler.dao.model.MasterServer;
+import cn.escheduler.common.model.MasterServer;
import cn.escheduler.dao.model.ZookeeperRecord;
-import cn.escheduler.server.ResInfo;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -11,7 +11,6 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
-import java.util.Map;
/**
@@ -36,29 +35,12 @@ public class ZookeeperMonitor extends AbstractZKClient{
return null;
}
- /**
- * get server list.
- * @param isMaster
- * @return
- */
- public List getServers(boolean isMaster){
- List masterServers = new ArrayList<>();
- Map masterMap = getServerList(isMaster);
- String parentPath = isMaster ? getMasterZNodeParentPath() : getWorkerZNodeParentPath();
- for(String path : masterMap.keySet()){
- MasterServer masterServer = ResInfo.parseHeartbeatForZKInfo(masterMap.get(path));
- masterServer.setZkDirectory( parentPath + "/"+ path);
- masterServers.add(masterServer);
- }
- return masterServers;
- }
-
/**
* get master servers
* @return
*/
public List getMasterServers(){
- return getServers(true);
+ return getServersList(ZKNodeType.MASTER);
}
/**
@@ -66,7 +48,7 @@ public class ZookeeperMonitor extends AbstractZKClient{
* @return
*/
public List getWorkerServers(){
- return getServers(false);
+ return getServersList(ZKNodeType.WORKER);
}
private static List zookeeperInfoList(String zookeeperServers) {
diff --git a/escheduler-api/src/test/java/cn/escheduler/api/utils/ZookeeperMonitorUtilsTest.java b/escheduler-api/src/test/java/cn/escheduler/api/utils/ZookeeperMonitorUtilsTest.java
index 87a26ba449ad0f4a31c3f1037cdcd777ab4bbcfe..4feb06ad5ccba257b5180e3e5490d7a20cba89f0 100644
--- a/escheduler-api/src/test/java/cn/escheduler/api/utils/ZookeeperMonitorUtilsTest.java
+++ b/escheduler-api/src/test/java/cn/escheduler/api/utils/ZookeeperMonitorUtilsTest.java
@@ -1,6 +1,6 @@
package cn.escheduler.api.utils;
-import cn.escheduler.dao.model.MasterServer;
+import cn.escheduler.common.model.MasterServer;
import org.junit.Assert;
import org.junit.Test;
diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/MasterServer.java b/escheduler-common/src/main/java/cn/escheduler/common/model/MasterServer.java
similarity index 98%
rename from escheduler-dao/src/main/java/cn/escheduler/dao/model/MasterServer.java
rename to escheduler-common/src/main/java/cn/escheduler/common/model/MasterServer.java
index 1ccb15b5ccec5123382810631f25af5906bb51c1..bb2f38cb147911c1209b10bb7e3571aafb3e47a5 100644
--- a/escheduler-dao/src/main/java/cn/escheduler/dao/model/MasterServer.java
+++ b/escheduler-common/src/main/java/cn/escheduler/common/model/MasterServer.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package cn.escheduler.dao.model;
+package cn.escheduler.common.model;
import java.util.Date;
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/ResInfo.java b/escheduler-common/src/main/java/cn/escheduler/common/utils/ResInfo.java
similarity index 95%
rename from escheduler-server/src/main/java/cn/escheduler/server/ResInfo.java
rename to escheduler-common/src/main/java/cn/escheduler/common/utils/ResInfo.java
index 844c7be8b0e79798e134496d2b4e269901a26e9b..6a48d6bc89704aaa7e7e6419127d3072f3f538ea 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/ResInfo.java
+++ b/escheduler-common/src/main/java/cn/escheduler/common/utils/ResInfo.java
@@ -14,13 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package cn.escheduler.server;
+package cn.escheduler.common.utils;
import cn.escheduler.common.Constants;
-import cn.escheduler.common.utils.DateUtils;
-import cn.escheduler.common.utils.JSONUtils;
-import cn.escheduler.common.utils.OSUtils;
-import cn.escheduler.dao.model.MasterServer;
+import cn.escheduler.common.model.MasterServer;
import java.util.Date;
diff --git a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java
index 468efb326db1cba64c080edab953c874793990a2..0706efc76741bc6894ba8e065189e55d53358450 100644
--- a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java
+++ b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java
@@ -18,12 +18,16 @@ package cn.escheduler.common.zk;
import cn.escheduler.common.Constants;
import cn.escheduler.common.IStoppable;
+import cn.escheduler.common.enums.ZKNodeType;
+import cn.escheduler.common.model.MasterServer;
import cn.escheduler.common.enums.ServerEnum;
import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.common.utils.OSUtils;
+import cn.escheduler.common.utils.ResInfo;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -32,7 +36,6 @@ import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -208,48 +211,11 @@ public abstract class AbstractZKClient {
return false;
}
- /**
- * init system znode
- */
- protected void initSystemZNode(){
- try {
- // read master node parent path from conf
- masterZNodeParentPath = getMasterZNodeParentPath();
- // read worker node parent path from conf
- workerZNodeParentPath = getWorkerZNodeParentPath();
-
- // read server node parent path from conf
- deadServerZNodeParentPath = conf.getString(ZOOKEEPER_ESCHEDULER_DEAD_SERVERS);
-
- if(zkClient.checkExists().forPath(deadServerZNodeParentPath) == null){
- // create persistent dead server parent node
- zkClient.create().creatingParentContainersIfNeeded()
- .withMode(CreateMode.PERSISTENT).forPath(deadServerZNodeParentPath);
- }
-
- if(zkClient.checkExists().forPath(masterZNodeParentPath) == null){
- // create persistent master parent node
- zkClient.create().creatingParentContainersIfNeeded()
- .withMode(CreateMode.PERSISTENT).forPath(masterZNodeParentPath);
- }
-
- if(zkClient.checkExists().forPath(workerZNodeParentPath) == null){
- // create persistent worker parent node
- zkClient.create().creatingParentContainersIfNeeded()
- .withMode(CreateMode.PERSISTENT).forPath(workerZNodeParentPath);
- }
-
- } catch (Exception e) {
- logger.error("init system znode failed : " + e.getMessage(),e);
- }
- }
-
public void removeDeadServerByHost(String host, String serverType) throws Exception {
List deadServers = zkClient.getChildren().forPath(deadServerZNodeParentPath);
for(String serverPath : deadServers){
if(serverPath.startsWith(serverType+UNDERLINE+host)){
-
String server = deadServerZNodeParentPath + SINGLE_SLASH + serverPath;
zkClient.delete().forPath(server);
logger.info("{} server {} deleted from zk dead server path success" , serverType , host);
@@ -257,28 +223,68 @@ public abstract class AbstractZKClient {
}
}
+
+    /**
+     * Create an ephemeral-sequential znode for this host under the given node type's parent path.
+     * @param zkNodeType MASTER or WORKER
+     * @return the created znode path (heartbeat info stored as node data)
+     * @throws Exception on zookeeper failure
+     */
+    private String createZNodePath(ZKNodeType zkNodeType) throws Exception {
+        // heartbeat payload stored as the znode's data
+        String heartbeatZKInfo = ResInfo.getHeartBeatInfo(new Date());
+        // ephemeral sequential node named host_<seq> under the node type's parent path
+        String parentPath = getZNodeParentPath(zkNodeType);
+        String serverPathPrefix = parentPath + "/" + OSUtils.getHost();
+        String registerPath = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(
+                serverPathPrefix + "_", heartbeatZKInfo.getBytes());
+        logger.info("register {} node {} success" , zkNodeType.toString(), registerPath);
+        return registerPath;
+    }
+
+    /**
+     * Register this host under the given node type.
+     * @param zkNodeType MASTER or WORKER
+     * @return registered server path in zookeeper, or null if already started
+     */
+    public String registerServer(ZKNodeType zkNodeType) throws Exception {
+        String registerPath = null;
+        String host = OSUtils.getHost();
+        if(checkZKNodeExists(host, zkNodeType)){
+            logger.error("register failure , {} server already started on host : {}" ,
+                    zkNodeType.toString(), host);
+            return registerPath;
+        }
+        // BUGFIX: use the requested node type (was hard-coded ZKNodeType.MASTER,
+        // which made worker registration create a master znode)
+        registerPath = createZNodePath(zkNodeType);
+
+        // clear any stale dead-server record for this host
+        handleDeadServer(registerPath, zkNodeType, Constants.DELETE_ZK_OP);
+        return registerPath;
+    }
+
/**
* opType(add): if find dead server , then add to zk deadServerPath
* opType(delete): delete path from zk
*
* @param zNode node path
- * @param serverType master or worker prefix
+ * @param zkNodeType master or worker
* @param opType delete or add
* @throws Exception
*/
- public void handleDeadServer(String zNode, String serverType, String opType) throws Exception {
+ public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception {
//ip_sequenceno
String[] zNodesPath = zNode.split("\\/");
String ipSeqNo = zNodesPath[zNodesPath.length - 1];
- String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX : WORKER_PREFIX;
+ String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX;
//check server restart, if restart , dead server path in zk should be delete
if(opType.equals(DELETE_ZK_OP)){
String[] ipAndSeqNo = ipSeqNo.split(UNDERLINE);
String ip = ipAndSeqNo[0];
- removeDeadServerByHost(ip, serverType);
+ removeDeadServerByHost(ip, type);
}else if(opType.equals(ADD_ZK_OP)){
String deadServerPath = deadServerZNodeParentPath + SINGLE_SLASH + type + UNDERLINE + ipSeqNo;
@@ -287,7 +293,8 @@ public abstract class AbstractZKClient {
zkClient.create().forPath(deadServerPath,(type + UNDERLINE + ipSeqNo).getBytes());
- logger.info("{} server dead , and {} added to zk dead server path success" , serverType, zNode);
+ logger.info("{} server dead , and {} added to zk dead server path success" ,
+ zkNodeType.toString(), zNode);
}
}
@@ -343,16 +350,34 @@ public abstract class AbstractZKClient {
return sb.toString();
}
+    /**
+     * Build the server list of the given node type from zookeeper heartbeat data.
+     * @param zkNodeType MASTER or WORKER
+     * @return servers of that type, each with its zk directory set
+     */
+    public List getServersList(ZKNodeType zkNodeType){
+        Map serverMap = getServerMaps(zkNodeType);
+        String parentPath = getZNodeParentPath(zkNodeType);
+
+        List servers = new ArrayList<>();
+        for(String node : serverMap.keySet()){
+            MasterServer server = ResInfo.parseHeartbeatForZKInfo(serverMap.get(node));
+            server.setZkDirectory( parentPath + "/"+ node);
+            servers.add(server);
+        }
+        return servers;
+    }
+
/**
* get master server list map.
* result : {host : resource info}
* @return
*/
- public Map getServerList(boolean isMaster ){
+ public Map getServerMaps(ZKNodeType zkNodeType){
Map masterMap = new HashMap<>();
try {
- String path = isMaster ? getMasterZNodeParentPath() : getWorkerZNodeParentPath();
+ String path = getZNodeParentPath(zkNodeType);
List serverList = getZkClient().getChildren().forPath(path);
for(String server : serverList){
byte[] bytes = getZkClient().getData().forPath(path + "/" + server);
@@ -365,6 +390,29 @@ public abstract class AbstractZKClient {
return masterMap;
}
+    /**
+     * Check whether a server of the given type is already registered for this host.
+     * @param host host/ip to look for (matched as a key prefix)
+     * @param zkNodeType MASTER, WORKER or DEAD_SERVER
+     * @return true if a matching server node exists; false otherwise
+     *         (also false when the parent path for the type is unknown)
+     */
+    public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) {
+        String path = getZNodeParentPath(zkNodeType);
+        if(StringUtils.isEmpty(path)){
+            logger.error("check zk node exists error, host:{}, zk node type:{}",
+                    host, zkNodeType.toString());
+            return false;
+        }
+        Map serverMaps = getServerMaps(zkNodeType);
+        for(String hostKey : serverMaps.keySet()){
+            if(hostKey.startsWith(host)){
+                return true;
+            }
+        }
+        return false;
+    }
+
/**
* get zkclient
* @return
@@ -393,6 +441,34 @@ public abstract class AbstractZKClient {
return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_MASTERS);
}
+ /**
+ * get zookeeper node parent path
+ * @param zkNodeType
+ * @return
+ */
+ public String getZNodeParentPath(ZKNodeType zkNodeType) {
+ String path = "";
+ switch (zkNodeType){
+ case MASTER:
+ return getMasterZNodeParentPath();
+ case WORKER:
+ return getWorkerZNodeParentPath();
+ case DEAD_SERVER:
+ return getDeadZNodeParentPath();
+ default:
+ break;
+ }
+ return path;
+ }
+
+ /**
+ * get dead server node parent path
+ * @return
+ */
+ protected String getDeadZNodeParentPath(){
+ return conf.getString(ZOOKEEPER_ESCHEDULER_DEAD_SERVERS);
+ }
+
/**
* get master start up lock path
* @return
@@ -417,6 +493,82 @@ public abstract class AbstractZKClient {
return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_WORKERS);
}
+    /**
+     * Release a zookeeper mutex; safe to call with a null mutex.
+     * Release failures are logged, never rethrown.
+     * @param mutex the lock to release, may be null
+     */
+    public static void releaseMutex(InterProcessMutex mutex) {
+        if (mutex != null){
+            try {
+                mutex.release();
+            } catch (Exception e) {
+                // BUGFIX: getMessage() may be null; compare constant-first to avoid NPE
+                if("instance must be started before calling this method".equals(e.getMessage())){
+                    logger.warn("lock release");
+                }else{
+                    logger.error("lock release failed : " + e.getMessage(),e);
+                }
+            }
+        }
+    }
+
+ /**
+ * init system znode
+ */
+ protected void initSystemZNode(){
+ try {
+ createNodePath(getMasterZNodeParentPath());
+ createNodePath(getWorkerZNodeParentPath());
+ createNodePath(getDeadZNodeParentPath());
+
+ } catch (Exception e) {
+ logger.error("init system znode failed : " + e.getMessage(),e);
+ }
+ }
+
+ /**
+ * create zookeeper node path if not exists
+ * @param zNodeParentPath
+ * @throws Exception
+ */
+ private void createNodePath(String zNodeParentPath) throws Exception {
+ if(null == zkClient.checkExists().forPath(zNodeParentPath)){
+ zkClient.create().creatingParentContainersIfNeeded()
+ .withMode(CreateMode.PERSISTENT).forPath(zNodeParentPath);
+ }
+ }
+
+    /**
+     * If the dead server is this host, request shutdown of all threads.
+     * @return true when serverHost is this host
+     */
+    protected boolean checkServerSelfDead(String serverHost, ZKNodeType zkNodeType) {
+        if (serverHost.equals(OSUtils.getHost())) {
+            logger.error("{} server({}) of myself dead , stopping...",
+                    zkNodeType.toString(), serverHost);
+            // BUGFIX: String.format needs %s, not SLF4J {} placeholders
+            stoppable.stop(String.format(" %s server %s of myself dead , stopping...",
+                    zkNodeType.toString(), serverHost));
+            return true;
+        }
+        return false;
+    }
+
+ /**
+ * get host ip, string format: masterParentPath/ip_000001/value
+ * @param path
+ * @return
+ */
+ protected String getHostByEventDataPath(String path) {
+ int startIndex = path.lastIndexOf("/")+1;
+ int endIndex = path.lastIndexOf("_");
+
+ if(startIndex >= endIndex){
+ logger.error("parse ip error");
+ return "";
+ }
+ return path.substring(startIndex, endIndex);
+ }
/**
* acquire zk lock
* @param zkClient
diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ServerDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ServerDao.java
index 82c71e2855bf0e0e179997b19e269aac13e9c1e7..36823d8b2541b29a9c44fad9328be77b49b85752 100644
--- a/escheduler-dao/src/main/java/cn/escheduler/dao/ServerDao.java
+++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ServerDao.java
@@ -18,7 +18,7 @@ package cn.escheduler.dao;
import cn.escheduler.dao.mapper.MasterServerMapper;
import cn.escheduler.dao.mapper.WorkerServerMapper;
-import cn.escheduler.dao.model.MasterServer;
+import cn.escheduler.common.model.MasterServer;
import cn.escheduler.dao.model.WorkerServer;
import org.springframework.beans.factory.annotation.Autowired;
diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/MasterServerMapper.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/MasterServerMapper.java
index 4fec8092de5d0b1c873dadc1bd0bf540a2a294f8..8fbfb5298e32acd4380161c1dbd84f7b0cb4eb05 100644
--- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/MasterServerMapper.java
+++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/MasterServerMapper.java
@@ -16,7 +16,7 @@
*/
package cn.escheduler.dao.mapper;
-import cn.escheduler.dao.model.MasterServer;
+import cn.escheduler.common.model.MasterServer;
import org.apache.ibatis.annotations.*;
import org.apache.ibatis.type.JdbcType;
diff --git a/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/MasterServerMapperTest.java b/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/MasterServerMapperTest.java
index 9f66069eeb0fe3c3a650c48f79d98031bdce22be..f74683d149c83ffa3ad4745ef57f9e7ad101e348 100644
--- a/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/MasterServerMapperTest.java
+++ b/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/MasterServerMapperTest.java
@@ -17,7 +17,7 @@
package cn.escheduler.dao.mapper;
import cn.escheduler.dao.datasource.ConnectionFactory;
-import cn.escheduler.dao.model.MasterServer;
+import cn.escheduler.common.model.MasterServer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java b/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java
index bf0dcbfe75b8c36d9bd8ebce9a2a62decfdfeeaf..562b6509e5877af07aa4625d97da6f82255f5725 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java
@@ -119,8 +119,6 @@ public class MasterServer implements CommandLineRunner, IStoppable {
public MasterServer(ProcessDao processDao){
zkMasterClient = ZKMasterClient.getZKMasterClient(processDao);
- this.serverDao = zkMasterClient.getServerDao();
- this.alertDao = zkMasterClient.getAlertDao();
}
public void run(ProcessDao processDao){
@@ -128,6 +126,11 @@ public class MasterServer implements CommandLineRunner, IStoppable {
heartBeatInterval = conf.getInt(Constants.MASTER_HEARTBEAT_INTERVAL,
Constants.defaultMasterHeartbeatInterval);
+ // master exec thread pool num
+ int masterExecThreadNum = conf.getInt(Constants.MASTER_EXEC_THREADS,
+ Constants.defaultMasterExecThreadNum);
+
+
heartbeatMasterService = ThreadUtils.newDaemonThreadScheduledExecutor("Master-Main-Thread",Constants.defaulMasterHeartbeatThreadNum);
// heartbeat thread implement
@@ -140,10 +143,6 @@ public class MasterServer implements CommandLineRunner, IStoppable {
heartbeatMasterService.
scheduleAtFixedRate(heartBeatThread, 5, heartBeatInterval, TimeUnit.SECONDS);
- // master exec thread pool num
- int masterExecThreadNum = conf.getInt(Constants.MASTER_EXEC_THREADS,
- Constants.defaultMasterExecThreadNum);
-
// master scheduler thread
MasterSchedulerThread masterSchedulerThread = new MasterSchedulerThread(
zkMasterClient,
@@ -154,6 +153,8 @@ public class MasterServer implements CommandLineRunner, IStoppable {
masterSchedulerService.execute(masterSchedulerThread);
// start QuartzExecutors
+ // TODO...
+ // what system should do if exception
try {
ProcessScheduleJob.init(processDao);
QuartzExecutors.getInstance().start();
@@ -173,13 +174,11 @@ public class MasterServer implements CommandLineRunner, IStoppable {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
- String host = OSUtils.getHost();
- // clear master table register info
- serverDao.deleteMaster(host);
logger.info("master server stopped");
if (zkMasterClient.getActiveMasterNum() <= 1) {
for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) {
- alertDao.sendServerStopedAlert(1, host, "Master-Server");
+ zkMasterClient.getAlertDao().sendServerStopedAlert(
+ 1, OSUtils.getHost(), "Master-Server");
}
}
}
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterSchedulerThread.java b/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterSchedulerThread.java
index b845e19ae0030e5e43601662aee410b6101df869..85623375544e218e140c22d84f193e63080f380f 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterSchedulerThread.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterSchedulerThread.java
@@ -20,6 +20,7 @@ import cn.escheduler.common.Constants;
import cn.escheduler.common.thread.Stopper;
import cn.escheduler.common.thread.ThreadUtils;
import cn.escheduler.common.utils.OSUtils;
+import cn.escheduler.common.zk.AbstractZKClient;
import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.server.zk.ZKMasterClient;
@@ -98,18 +99,7 @@ public class MasterSchedulerThread implements Runnable {
}catch (Exception e){
logger.error("master scheduler thread exception : " + e.getMessage(),e);
}finally{
- if (mutex != null){
- try {
- mutex.release();
- } catch (Exception e) {
- if(e.getMessage().equals("instance must be started before calling this method")){
- logger.warn("lock release");
- }else{
- logger.error("lock release failed : " + e.getMessage(),e);
- }
-
- }
- }
+ AbstractZKClient.releaseMutex(mutex);
}
}
}
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
index f899c62d089921577720ded085e2ae4d31b906a3..2208cf52043f39b7c567deec56a7457f2a3a286d 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
@@ -23,6 +23,7 @@ import cn.escheduler.common.thread.ThreadUtils;
import cn.escheduler.common.utils.CollectionUtils;
import cn.escheduler.common.utils.FileUtils;
import cn.escheduler.common.utils.OSUtils;
+import cn.escheduler.common.zk.AbstractZKClient;
import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.model.*;
import cn.escheduler.server.zk.ZKWorkerClient;
@@ -226,13 +227,7 @@ public class FetchTaskThread implements Runnable{
}catch (Exception e){
logger.error("fetch task thread failure" ,e);
}finally {
- if (mutex != null){
- try {
- mutex.release();
- } catch (Exception e) {
- logger.error("fetch task lock release failure ",e);
- }
- }
+ AbstractZKClient.releaseMutex(mutex);
}
}
}
@@ -247,6 +242,7 @@ public class FetchTaskThread implements Runnable{
taskInstance.getProcessInstance().getId(),
taskInstance.getId());
}
+
/**
* check
* @param poolExecutor
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java
index 3596155dd3173b83f5df042f979342b24ad291c7..c972580e314cfeb4c8e121aeb12129431bb47696 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java
@@ -19,9 +19,7 @@ package cn.escheduler.server.zk;
import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.ExecutionStatus;
import cn.escheduler.common.enums.ZKNodeType;
-import cn.escheduler.common.utils.CollectionUtils;
-import cn.escheduler.common.utils.DateUtils;
-import cn.escheduler.common.utils.OSUtils;
+import cn.escheduler.common.model.MasterServer;
import cn.escheduler.common.zk.AbstractZKClient;
import cn.escheduler.dao.AlertDao;
import cn.escheduler.dao.DaoFactory;
@@ -29,8 +27,6 @@ import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.ServerDao;
import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.TaskInstance;
-import cn.escheduler.dao.model.WorkerServer;
-import cn.escheduler.server.ResInfo;
import cn.escheduler.server.utils.ProcessUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
@@ -39,7 +35,6 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.utils.ThreadUtils;
-import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,6 +63,7 @@ public class ZKMasterClient extends AbstractZKClient {
* master database access
*/
private ServerDao serverDao = null;
+
/**
* alert database access
*/
@@ -77,9 +73,6 @@ public class ZKMasterClient extends AbstractZKClient {
*/
private ProcessDao processDao;
-
- private Date createTime = null;
-
/**
* zkMasterClient
*/
@@ -118,7 +111,6 @@ public class ZKMasterClient extends AbstractZKClient {
try {
// create distributed lock with the root node path of the lock space as /escheduler/lock/failover/master
String znodeLock = getMasterStartUpLockPath();
-
mutex = new InterProcessMutex(zkClient, znodeLock);
mutex.acquire();
@@ -132,34 +124,24 @@ public class ZKMasterClient extends AbstractZKClient {
this.listenerWorker();
// register master
- this.registMaster();
+ this.registerMaster();
// check if fault tolerance is required,failure and tolerance
if (getActiveMasterNum() == 1) {
failoverWorker(null, true);
-// processDao.masterStartupFaultTolerant();
failoverMaster(null);
}
}catch (Exception e){
logger.error("master start up exception : " + e.getMessage(),e);
}finally {
- if (mutex != null){
- try {
- mutex.release();
- } catch (Exception e) {
- if(e.getMessage().equals("instance must be started before calling this method")){
- logger.warn("lock release");
- }else{
- logger.error("lock release failed : " + e.getMessage(),e);
- }
-
- }
- }
+ releaseMutex(mutex);
}
}
+
+
/**
* init dao
*/
@@ -168,15 +150,6 @@ public class ZKMasterClient extends AbstractZKClient {
this.alertDao = DaoFactory.getDaoInstance(AlertDao.class);
this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
}
-
- /**
- * get maste dao
- * @return
- */
- public ServerDao getServerDao(){
- return serverDao;
- }
-
/**
* get alert dao
* @return
@@ -185,91 +158,25 @@ public class ZKMasterClient extends AbstractZKClient {
return alertDao;
}
+
+
+
/**
* register master znode
*/
- public void registMaster(){
-
- // get current date
- Date now = new Date();
- createTime = now ;
+ public void registerMaster(){
try {
- String osHost = OSUtils.getHost();
-
- // zookeeper node exists, cannot start a new one.
- if(checkZKNodeExists(osHost, ZKNodeType.MASTER)){
- logger.error("register failure , master already started on host : {}" , osHost);
- // exit system
- System.exit(-1);
+ String serverPath = registerServer(ZKNodeType.MASTER);
+ if(StringUtils.isEmpty(serverPath)){
+ System.exit(-1);
}
-
- // specify the format of stored data in ZK nodes
- String heartbeatZKInfo = ResInfo.getHeartBeatInfo(now);
- // create temporary sequence nodes for master znode
- masterZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(
- masterZNodeParentPath + "/" + OSUtils.getHost() + "_", heartbeatZKInfo.getBytes());
-
- logger.info("register master node {} success" , masterZNode);
-
- // handle dead server
- handleDeadServer(masterZNode, Constants.MASTER_PREFIX, Constants.DELETE_ZK_OP);
-
- // delete master server from database
- serverDao.deleteMaster(OSUtils.getHost());
-
- // register master znode
- serverDao.registerMaster(OSUtils.getHost(),
- OSUtils.getProcessID(),
- masterZNode,
- ResInfo.getResInfoJson(),
- createTime,
- createTime);
-
} catch (Exception e) {
logger.error("register master failure : " + e.getMessage(),e);
+ System.exit(-1);
}
}
- /**
- * check the zookeeper node already exists
- * @param host
- * @param zkNodeType
- * @return
- * @throws Exception
- */
- private boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) throws Exception {
-
- String path = null;
- switch (zkNodeType){
- case MASTER:
- path = masterZNodeParentPath;
- break;
- case WORKER:
- path = workerZNodeParentPath;
- break;
- case DEAD_SERVER:
- path = deadServerZNodeParentPath;
- break;
- default:
- break;
- }
- if(StringUtils.isEmpty(path)){
- logger.error("check zk node exists error, host:{}, zk node type:{}", host, zkNodeType.toString());
- return false;
- }
-
- List serverList = null;
- serverList = zkClient.getChildren().forPath(path);
- if (CollectionUtils.isNotEmpty(serverList)){
- for (String masterZNode : serverList){
- if (masterZNode.startsWith(host)){
- return true;
- }
- }
- }
- return false;
- }
/**
* monitor master
@@ -278,8 +185,6 @@ public class ZKMasterClient extends AbstractZKClient {
PathChildrenCache masterPc = new PathChildrenCache(zkClient, masterZNodeParentPath, true ,defaultThreadFactory);
try {
- Date now = new Date();
- createTime = now ;
masterPc.start();
masterPc.getListenable().addListener(new PathChildrenCacheListener() {
@Override
@@ -290,60 +195,13 @@ public class ZKMasterClient extends AbstractZKClient {
break;
case CHILD_REMOVED:
String path = event.getData().getPath();
- logger.info("master node deleted : {}",event.getData().getPath());
-
- InterProcessMutex mutexLock = null;
- try {
- // handle dead server, add to zk dead server pth
- handleDeadServer(path, Constants.MASTER_PREFIX, Constants.ADD_ZK_OP);
-
- if(masterZNode.equals(path)){
- logger.error("master server({}) of myself dead , stopping...", path);
- stoppable.stop(String.format("master server(%s) of myself dead , stopping...", path));
- break;
- }
-
- // create a distributed lock, and the root node path of the lock space is /escheduler/lock/failover/master
- String znodeLock = zkMasterClient.getMasterFailoverLockPath();
- mutexLock = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock);
- mutexLock.acquire();
-
- String masterHost = getHostByEventDataPath(path);
- for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) {
- alertDao.sendServerStopedAlert(1, masterHost, "Master-Server");
- }
- if(StringUtils.isNotEmpty(masterHost)){
- failoverMaster(masterHost);
- }
- }catch (Exception e){
- logger.error("master failover failed : " + e.getMessage(),e);
- }finally {
- if (mutexLock != null){
- try {
- mutexLock.release();
- } catch (Exception e) {
- logger.error("lock relase failed : " + e.getMessage(),e);
- }
- }
+ String serverHost = getHostByEventDataPath(path);
+ if(checkServerSelfDead(serverHost, ZKNodeType.MASTER)){
+ return;
}
+ removeZKNodePath(path, ZKNodeType.MASTER, true);
break;
case CHILD_UPDATED:
- if (event.getData().getPath().contains(OSUtils.getHost())){
- byte[] bytes = zkClient.getData().forPath(event.getData().getPath());
- String resInfoStr = new String(bytes);
- String[] splits = resInfoStr.split(Constants.COMMA);
- if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH) {
- return;
- }
- // updateProcessInstance Master information in database according to host
- serverDao.updateMaster(OSUtils.getHost(),
- OSUtils.getProcessID(),
- ResInfo.getResInfoJson(Double.parseDouble(splits[2]),
- Double.parseDouble(splits[3])),
- DateUtils.stringToDate(splits[5]));
-
- logger.debug("master zk node updated : {}",event.getData().getPath());
- }
break;
default:
break;
@@ -353,10 +211,69 @@ public class ZKMasterClient extends AbstractZKClient {
}catch (Exception e){
logger.error("monitor master failed : " + e.getMessage(),e);
}
+}
+
+ private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) {
+ logger.info("{} node deleted : {}", zkNodeType.toString(), path);
+ InterProcessMutex mutex = null;
+ try {
+ String failoverPath = getFailoverLockPath(zkNodeType);
+ // create a distributed lock
+ mutex = new InterProcessMutex(getZkClient(), failoverPath);
+ mutex.acquire();
+
+ String serverHost = getHostByEventDataPath(path);
+ // handle dead server
+ handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP);
+ //alert server down.
+ alertServerDown(serverHost, zkNodeType);
+ //failover server
+ if(failover){
+ failoverServerWhenDown(serverHost, zkNodeType);
+ }
+ }catch (Exception e){
+ logger.error("{} server failover failed.", zkNodeType.toString());
+ logger.error("failover exception : " + e.getMessage(),e);
+ }
+ finally {
+ releaseMutex(mutex);
+ }
+ }
+ private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception {
+ if(StringUtils.isEmpty(serverHost)){
+ return ;
+ }
+ switch (zkNodeType){
+ case MASTER:
+ failoverMaster(serverHost);
+ break;
+ case WORKER:
+ failoverWorker(serverHost, true);
+ default:
+ break;
+ }
}
+ private String getFailoverLockPath(ZKNodeType zkNodeType){
+ switch (zkNodeType){
+ case MASTER:
+ return getMasterFailoverLockPath();
+ case WORKER:
+ return getWorkerFailoverLockPath();
+ default:
+ return "";
+ }
+ }
+
+ private void alertServerDown(String serverHost, ZKNodeType zkNodeType) {
+
+ String serverType = zkNodeType.toString();
+ for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER; i++) {
+ alertDao.sendServerStopedAlert(1, serverHost, serverType);
+ }
+ }
/**
* monitor worker
@@ -365,8 +282,6 @@ public class ZKMasterClient extends AbstractZKClient {
PathChildrenCache workerPc = new PathChildrenCache(zkClient,workerZNodeParentPath,true ,defaultThreadFactory);
try {
- Date now = new Date();
- createTime = now ;
workerPc.start();
workerPc.getListenable().addListener(new PathChildrenCacheListener() {
@Override
@@ -377,40 +292,8 @@ public class ZKMasterClient extends AbstractZKClient {
break;
case CHILD_REMOVED:
String path = event.getData().getPath();
-
logger.info("node deleted : {}",event.getData().getPath());
-
- InterProcessMutex mutex = null;
- try {
-
- // handle dead server
- handleDeadServer(path, Constants.WORKER_PREFIX, Constants.ADD_ZK_OP);
-
- // create a distributed lock, and the root node path of the lock space is /escheduler/lock/failover/worker
- String znodeLock = zkMasterClient.getWorkerFailoverLockPath();
- mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock);
- mutex.acquire();
-
- String workerHost = getHostByEventDataPath(path);
- for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) {
- alertDao.sendServerStopedAlert(1, workerHost, "Worker-Server");
- }
-
- if(StringUtils.isNotEmpty(workerHost)){
- failoverWorker(workerHost, true);
- }
- }catch (Exception e){
- logger.error("worker failover failed : " + e.getMessage(),e);
- }
- finally {
- if (mutex != null){
- try {
- mutex.release();
- } catch (Exception e) {
- logger.error("lock relase failed : " + e.getMessage(),e);
- }
- }
- }
+ removeZKNodePath(path, ZKNodeType.WORKER, true);
break;
default:
break;
@@ -420,9 +303,9 @@ public class ZKMasterClient extends AbstractZKClient {
}catch (Exception e){
logger.error("listener worker failed : " + e.getMessage(),e);
}
-
}
+
/**
* get master znode
* @return
@@ -431,9 +314,6 @@ public class ZKMasterClient extends AbstractZKClient {
return masterZNode;
}
-
-
-
/**
* task needs failover if task start before worker starts
*
@@ -460,15 +340,20 @@ public class ZKMasterClient extends AbstractZKClient {
* @return
*/
private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
+ if(StringUtils.isEmpty(taskInstance.getHost())){
+ return false;
+ }
Date workerServerStartDate = null;
- List workerServers = processDao.queryWorkerServerByHost(taskInstance.getHost());
- if(workerServers.size() > 0){
- workerServerStartDate = workerServers.get(0).getCreateTime();
+ List workerServers= getServersList(ZKNodeType.WORKER);
+ for(MasterServer server : workerServers){
+ if(server.getHost().equals(taskInstance.getHost())){
+ workerServerStartDate = server.getCreateTime();
+ break;
+ }
}
if(workerServerStartDate != null){
return taskInstance.getStartTime().after(workerServerStartDate);
-
}else{
return false;
}
@@ -478,6 +363,7 @@ public class ZKMasterClient extends AbstractZKClient {
* failover worker tasks
* 1. kill yarn job if there are yarn jobs in tasks.
* 2. change task state from running to need failover.
+ * 3. failover all tasks when workerHost is null
* @param workerHost
*/
private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
@@ -501,9 +387,6 @@ public class ZKMasterClient extends AbstractZKClient {
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
processDao.saveTaskInstance(taskInstance);
}
-
- //update task Instance state value is NEED_FAULT_TOLERANCE
- // processDao.updateNeedFailoverTaskInstances(workerHost);
logger.info("end worker[{}] failover ...", workerHost);
}
@@ -524,24 +407,4 @@ public class ZKMasterClient extends AbstractZKClient {
logger.info("master failover end");
}
- /**
- * get host ip, string format: masterParentPath/ip_000001/value
- * @param path
- * @return
- */
- private String getHostByEventDataPath(String path) {
- int startIndex = path.lastIndexOf("/")+1;
- int endIndex = path.lastIndexOf("_");
-
- if(startIndex >= endIndex){
- logger.error("parse ip error");
- return "";
- }
- return path.substring(startIndex, endIndex);
- }
-
-
-
-
-
}
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java
index e00d72da244d2d582180a89c2c69be68b7463c6b..0c35728f84dae12d54381f0dd757ea84c120a804 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java
@@ -17,13 +17,13 @@
package cn.escheduler.server.zk;
import cn.escheduler.common.Constants;
-import cn.escheduler.common.utils.CollectionUtils;
-import cn.escheduler.common.utils.DateUtils;
+import cn.escheduler.common.enums.ZKNodeType;
import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.common.zk.AbstractZKClient;
import cn.escheduler.dao.DaoFactory;
import cn.escheduler.dao.ServerDao;
-import cn.escheduler.server.ResInfo;
+import cn.escheduler.common.utils.ResInfo;
+import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
@@ -34,7 +34,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
-import java.util.List;
import java.util.concurrent.ThreadFactory;
@@ -130,52 +129,14 @@ public class ZKWorkerClient extends AbstractZKClient {
* register worker
*/
private void registWorker(){
-
- // get current date
- Date now = new Date();
- createTime = now ;
try {
-
- // encapsulation worker znnode
- workerZNode = workerZNodeParentPath + "/" + OSUtils.getHost() + "_";
- List workerZNodeList = zkClient.getChildren().forPath(workerZNodeParentPath);
-
- if (CollectionUtils.isNotEmpty(workerZNodeList)){
- boolean flag = false;
- for (String workerZNode : workerZNodeList){
- if (workerZNode.startsWith(OSUtils.getHost())){
- flag = true;
- break;
- }
- }
-
- if (flag){
- logger.info("register failure , worker already started on : {}, please wait for a moment and try again" , OSUtils.getHost());
- // exit system
- System.exit(-1);
- }
+ String serverPath = registerServer(ZKNodeType.WORKER);
+ if(StringUtils.isEmpty(serverPath)){
+ System.exit(-1);
}
-
-// String heartbeatZKInfo = getOsInfo(now);
-// workerZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(workerZNode,
-// heartbeatZKInfo.getBytes());
-
- initWorkZNode();
- // handle dead server
- handleDeadServer(workerZNode, Constants.WORKER_PREFIX, Constants.DELETE_ZK_OP);
-
- // delete worker server from database
- serverDao.deleteWorker(OSUtils.getHost());
-
- // register worker znode
- serverDao.registerWorker(OSUtils.getHost(),
- OSUtils.getProcessID(),
- workerZNode,
- ResInfo.getResInfoJson(),
- createTime,
- createTime);
} catch (Exception e) {
logger.error("register worker failure : " + e.getMessage(),e);
+ System.exit(-1);
}
}
@@ -198,35 +159,13 @@ public class ZKWorkerClient extends AbstractZKClient {
break;
case CHILD_REMOVED:
String path = event.getData().getPath();
-
- // handle dead server, add to zk dead server path
- handleDeadServer(path, Constants.WORKER_PREFIX, Constants.ADD_ZK_OP);
-
//find myself dead
- if(workerZNode.equals(path)){
-
- logger.warn(" worker server({}) of myself dead , stopping...", path);
- stoppable.stop(String.format("worker server(%s) of myself dead , stopping",path));
- }
- logger.info("node deleted : {}", event.getData().getPath());
+ String serverHost = getHostByEventDataPath(path);
+ if(checkServerSelfDead(serverHost, ZKNodeType.WORKER)){
+ return;
+ }
break;
case CHILD_UPDATED:
- if (event.getData().getPath().contains(OSUtils.getHost())){
- byte[] bytes = zkClient.getData().forPath(event.getData().getPath());
- String resInfoStr = new String(bytes);
- String[] splits = resInfoStr.split(Constants.COMMA);
- if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH) {
- return;
- }
-
- // updateProcessInstance master info in database according to host
- serverDao.updateWorker(OSUtils.getHost(),
- OSUtils.getProcessID(),
- ResInfo.getResInfoJson(Double.parseDouble(splits[2])
- ,Double.parseDouble(splits[3])),
- DateUtils.stringToDate(splits[5]));
- logger.debug("node updated : {}",event.getData().getPath());
- }
break;
default:
break;