From 4acb9864a56cf4cce8b53eeccfec397fba63a0ec Mon Sep 17 00:00:00 2001 From: lidongdai Date: Wed, 4 Sep 2019 20:28:04 +0800 Subject: [PATCH] refactor worker server to springboot mode fix exception when failover worker if task host is null fix process instance is null when task instance id == 0 --- .../api/CombinedApplicationServer.java | 7 +- .../common/zk/AbstractZKClient.java | 8 +- .../java/cn/escheduler/dao/DaoFactory.java | 11 +- .../java/cn/escheduler/dao/ProcessDao.java | 5 +- .../java/cn/escheduler/dao/ServerDao.java | 6 +- .../cn/escheduler/dao/utils}/BeanContext.java | 2 +- .../server/master/MasterServer.java | 166 +++++------- .../runner/MasterBaseTaskExecThread.java | 2 +- .../server/worker/WorkerServer.java | 238 ++++++++---------- .../escheduler/server/zk/ZKMasterClient.java | 5 + .../escheduler/server/zk/ZKWorkerClient.java | 19 -- 11 files changed, 200 insertions(+), 269 deletions(-) rename {escheduler-server/src/main/java/cn/escheduler/server => escheduler-dao/src/main/java/cn/escheduler/dao/utils}/BeanContext.java (98%) diff --git a/escheduler-api/src/main/java/cn/escheduler/api/CombinedApplicationServer.java b/escheduler-api/src/main/java/cn/escheduler/api/CombinedApplicationServer.java index 5f5927ea6..fc421944e 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/CombinedApplicationServer.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/CombinedApplicationServer.java @@ -17,6 +17,7 @@ package cn.escheduler.api; import cn.escheduler.alert.AlertServer; +import cn.escheduler.dao.AlertDao; import cn.escheduler.dao.ProcessDao; import cn.escheduler.server.master.MasterServer; import cn.escheduler.server.rpc.LoggerServer; @@ -39,11 +40,13 @@ public class CombinedApplicationServer extends SpringBootServletInitializer { ConfigurableApplicationContext context = SpringApplication.run(ApiApplicationServer.class, args); ProcessDao processDao = context.getBean(ProcessDao.class); + AlertDao alertDao = context.getBean(AlertDao.class); + MasterServer master = new MasterServer(processDao); master.run(processDao); - WorkerServer workerServer = new WorkerServer(); - workerServer.run(); + WorkerServer workerServer = new WorkerServer(processDao, alertDao); + workerServer.run(processDao, alertDao); LoggerServer server = new LoggerServer(); server.start(); diff --git a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java index aa88d8e2c..7a1d63cdf 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java @@ -20,7 +20,6 @@ import cn.escheduler.common.Constants; import cn.escheduler.common.IStoppable; import cn.escheduler.common.enums.ZKNodeType; import cn.escheduler.common.model.MasterServer; -import cn.escheduler.common.enums.ServerEnum; import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.utils.ResInfo; @@ -308,9 +307,12 @@ public abstract class AbstractZKClient { childrenList = zkClient.getChildren().forPath(getZNodeParentPath(ZKNodeType.MASTER)); } } catch (Exception e) { - if(!e.getMessage().contains("java.lang.IllegalStateException: instance must be started")){ - logger.warn(e.getMessage(),e); + if(e.getMessage().contains("java.lang.IllegalStateException: instance must be started")){ + logger.error("zookeeper service not started",e); + }else{ + logger.error(e.getMessage(),e); } + }finally { return childrenList.size(); } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/DaoFactory.java b/escheduler-dao/src/main/java/cn/escheduler/dao/DaoFactory.java index 0a523e3b8..97e65115d 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/DaoFactory.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/DaoFactory.java @@ -16,10 +16,10 @@ */ package cn.escheduler.dao; +import cn.escheduler.dao.utils.BeanContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.InvocationTargetException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -48,11 +48,12 @@ public class DaoFactory { synchronized (daoMap) { if (!daoMap.containsKey(className)) { try { - T t = clazz.getConstructor().newInstance(); - // 实例初始化 - t.init(); + T t = BeanContext.getBean(clazz); +// T t = clazz.getConstructor().newInstance(); +// // 实例初始化 +// t.init(); daoMap.put(className, t); - } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException | NoSuchMethodException | SecurityException e) { + } catch (Exception e) { logger.error(e.getMessage(), e); } } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java index b07cd92d8..ee0dd4857 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java @@ -1772,7 +1772,10 @@ public class ProcessDao extends AbstractBaseDao { */ public int getTaskWorkerGroupId(TaskInstance taskInstance) { int taskWorkerGroupId = taskInstance.getWorkerGroupId(); - ProcessInstance processInstance = findProcessInstanceByTaskId(taskInstance.getId()); + int processInstanceId = taskInstance.getProcessInstanceId(); + + ProcessInstance processInstance = findProcessInstanceById(processInstanceId); + if(processInstance == null){ logger.error("cannot find the task:{} process instance", taskInstance.getId()); return Constants.DEFAULT_WORKER_ID; diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ServerDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ServerDao.java index 36823d8b2..651291607 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ServerDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ServerDao.java @@ -16,19 +16,21 @@ */ package cn.escheduler.dao; +import cn.escheduler.common.model.MasterServer; import cn.escheduler.dao.mapper.MasterServerMapper; import cn.escheduler.dao.mapper.WorkerServerMapper; -import cn.escheduler.common.model.MasterServer; import cn.escheduler.dao.model.WorkerServer; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; import java.util.Date; import static cn.escheduler.dao.datasource.ConnectionFactory.getMapper; /** - * master server + * server dao */ +@Component public class ServerDao extends AbstractBaseDao { @Autowired diff --git a/escheduler-server/src/main/java/cn/escheduler/server/BeanContext.java b/escheduler-dao/src/main/java/cn/escheduler/dao/utils/BeanContext.java similarity index 98% rename from escheduler-server/src/main/java/cn/escheduler/server/BeanContext.java rename to escheduler-dao/src/main/java/cn/escheduler/dao/utils/BeanContext.java index 7b653963f..b768c050f 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/BeanContext.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/utils/BeanContext.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package cn.escheduler.server; +package cn.escheduler.dao.utils; import org.springframework.beans.BeansException; diff --git a/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java b/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java index 457302359..231273e2a 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java @@ -16,20 +16,16 @@ */ package cn.escheduler.server.master; -import cn.escheduler.server.quartz.ProcessScheduleJob; -import cn.escheduler.server.quartz.QuartzExecutors; import cn.escheduler.common.Constants; -import cn.escheduler.common.IStoppable; import cn.escheduler.common.thread.Stopper; import cn.escheduler.common.thread.ThreadPoolExecutors; import cn.escheduler.common.thread.ThreadUtils; import cn.escheduler.common.utils.OSUtils; -import cn.escheduler.dao.AlertDao; import cn.escheduler.dao.ProcessDao; -import cn.escheduler.dao.ServerDao; import cn.escheduler.server.master.runner.MasterSchedulerThread; +import cn.escheduler.server.quartz.ProcessScheduleJob; +import cn.escheduler.server.quartz.QuartzExecutors; import cn.escheduler.server.zk.ZKMasterClient; -import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.lang3.StringUtils; @@ -37,8 +33,8 @@ import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.CommandLineRunner; -import org.springframework.boot.SpringApplication; +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.annotation.ComponentScan; import java.util.concurrent.ExecutorService; @@ -49,77 +45,71 @@ import java.util.concurrent.TimeUnit; * master server */ @ComponentScan("cn.escheduler") -public class MasterServer implements CommandLineRunner, IStoppable { +public class MasterServer extends AbstractServer { private static final Logger logger = LoggerFactory.getLogger(MasterServer.class); - /** - * conf - */ - private static Configuration conf; - - /** - * object lock - */ - private final Object lock = new Object(); - - /** - * whether or not to close the state - */ - private boolean terminated = false; - /** * zk master client */ - private static ZKMasterClient zkMasterClient=null; - - - /** - * master dao database access - */ - private ServerDao serverDao = null; + private static ZKMasterClient zkMasterClient = null; /** - * alert database access + * heartbeat thread pool */ - private AlertDao alertDao = null; + private ScheduledExecutorService heartbeatMasterService; /** * escheduler database interface */ @Autowired - private ProcessDao processDao; - - /** - * heartbeat thread pool - */ - private ScheduledExecutorService heartbeatMasterService; - + protected ProcessDao processDao; /** * master exec thread pool */ - private final ExecutorService masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread"); + private ExecutorService masterSchedulerService; - /** - * heartbeat interval, unit second - */ - private int heartBeatInterval; + public MasterServer(){} - static { + public MasterServer(ProcessDao processDao){ try { conf = new PropertiesConfiguration(Constants.MASTER_PROPERTIES_PATH); }catch (ConfigurationException e){ logger.error("load configuration failed : " + e.getMessage(),e); System.exit(1); } + zkMasterClient = ZKMasterClient.getZKMasterClient(processDao); + this.masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread"); } - public MasterServer(){} - public MasterServer(ProcessDao processDao){ - zkMasterClient = ZKMasterClient.getZKMasterClient(processDao); + /** + * master server startup + * + * master server not use web service + */ + public static void main(String[] args) { + SpringApplicationBuilder app = new SpringApplicationBuilder(MasterServer.class); + + app.web(WebApplicationType.NONE) + .run(args); + } + + + @Override + public void run(String... strings) throws Exception { + + MasterServer masterServer = new MasterServer(processDao); + + masterServer.run(processDao); + + logger.info("master server started"); + // blocking + masterServer.awaitTermination(); } + + public void run(ProcessDao processDao){ // heartbeat interval @@ -153,7 +143,6 @@ public class MasterServer implements CommandLineRunner, IStoppable { masterSchedulerService.execute(masterSchedulerThread); // start QuartzExecutors - // TODO... // what system should do if exception try { ProcessScheduleJob.init(processDao); @@ -186,60 +175,6 @@ public class MasterServer implements CommandLineRunner, IStoppable { } - public static void main(String[] args) { - SpringApplication app = new SpringApplication(MasterServer.class); - app.run(args); - } - - - /** - * blocking implement - * @throws InterruptedException - */ - public void awaitTermination() throws InterruptedException { - synchronized (lock) { - while (!terminated) { - lock.wait(); - } - } - } - - /** - * heartbeat thread implement - * @return - */ - public Runnable heartBeatThread(){ - Runnable heartBeatThread = new Runnable() { - @Override - public void run() { - if(Stopper.isRunning()) { - // send heartbeat to zk - if (StringUtils.isBlank(zkMasterClient.getMasterZNode())) { - logger.error("master send heartbeat to zk failed: can't find zookeeper path of master server"); - return; - } - - zkMasterClient.heartBeatForZk(zkMasterClient.getMasterZNode(), Constants.MASTER_PREFIX); - } - } - }; - return heartBeatThread; - } - - @Override - public void run(String... strings) throws Exception { - - MasterServer masterServer = new MasterServer(processDao); - - masterServer.run(processDao); - - logger.info("master server started"); - // blocking - masterServer.awaitTermination(); - - - } - /** * gracefully stop * @param cause why stopping @@ -315,5 +250,28 @@ public class MasterServer implements CommandLineRunner, IStoppable { System.exit(-1); } } + + + /** + * heartbeat thread implement + * @return + */ + private Runnable heartBeatThread(){ + Runnable heartBeatThread = new Runnable() { + @Override + public void run() { + if(Stopper.isRunning()) { + // send heartbeat to zk + if (StringUtils.isBlank(zkMasterClient.getMasterZNode())) { + logger.error("master send heartbeat to zk failed: can't find zookeeper path of master server"); + return; + } + + zkMasterClient.heartBeatForZk(zkMasterClient.getMasterZNode(), Constants.MASTER_PREFIX); + } + } + }; + return heartBeatThread; + } } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterBaseTaskExecThread.java b/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterBaseTaskExecThread.java index ed427992b..dc9ac2216 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -23,7 +23,7 @@ import cn.escheduler.dao.AlertDao; import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; -import cn.escheduler.server.BeanContext; +import cn.escheduler.dao.utils.BeanContext; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/WorkerServer.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/WorkerServer.java index fec8c0f9d..af9a8ee99 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/WorkerServer.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/WorkerServer.java @@ -17,7 +17,6 @@ package cn.escheduler.server.worker; import cn.escheduler.common.Constants; -import cn.escheduler.common.IStoppable; import cn.escheduler.common.enums.ExecutionStatus; import cn.escheduler.common.enums.TaskType; import cn.escheduler.common.queue.ITaskQueue; @@ -28,20 +27,21 @@ import cn.escheduler.common.thread.ThreadUtils; import cn.escheduler.common.utils.CollectionUtils; import cn.escheduler.common.utils.OSUtils; import cn.escheduler.dao.AlertDao; -import cn.escheduler.dao.DaoFactory; import cn.escheduler.dao.ProcessDao; -import cn.escheduler.dao.ServerDao; -import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; +import cn.escheduler.server.master.AbstractServer; import cn.escheduler.server.utils.ProcessUtils; import cn.escheduler.server.worker.runner.FetchTaskThread; import cn.escheduler.server.zk.ZKWorkerClient; -import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.context.annotation.ComponentScan; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -51,55 +51,34 @@ import java.util.concurrent.TimeUnit; /** * worker server */ -public class WorkerServer implements IStoppable { +@ComponentScan("cn.escheduler") +public class WorkerServer extends AbstractServer { private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class); - /** - * conf - */ - private static Configuration conf; - - /** - * object lock - */ - private final Object lock = new Object(); - - /** - * whether or not to close the state - */ - private boolean terminated = false; /** * zk worker client */ private static ZKWorkerClient zkWorkerClient = null; - /** - * worker dao database access - */ - private ServerDao serverDao = null; - /** * process database access */ - private final ProcessDao processDao; + @Autowired + private ProcessDao processDao; /** * alert database access */ - private final AlertDao alertDao; + @Autowired + private AlertDao alertDao; /** * heartbeat thread pool */ private ScheduledExecutorService heartbeatWorerService; - /** - * heartbeat interval, unit second - */ - private int heartBeatInterval; - /** * task queue impl */ @@ -115,29 +94,57 @@ public class WorkerServer implements IStoppable { */ private ExecutorService fetchTaskExecutorService; - static { + public WorkerServer(){} + + public WorkerServer(ProcessDao processDao, AlertDao alertDao){ try { conf = new PropertiesConfiguration(Constants.WORKER_PROPERTIES_PATH); }catch (ConfigurationException e){ logger.error("load configuration failed",e); System.exit(1); } - } - public WorkerServer(){ zkWorkerClient = ZKWorkerClient.getZKWorkerClient(); - this.serverDao = zkWorkerClient.getServerDao(); - this.alertDao = DaoFactory.getDaoInstance(AlertDao.class); - this.processDao = DaoFactory.getDaoInstance(ProcessDao.class); - taskQueue = TaskQueueFactory.getTaskQueueInstance(); - killExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Kill-Thread-Executor"); + this.taskQueue = TaskQueueFactory.getTaskQueueInstance(); - fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor"); + this.killExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Kill-Thread-Executor"); + this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor"); } - public void run(){ + + /** + * master server startup + * + * master server not use web service + */ + public static void main(String[] args) { + + SpringApplicationBuilder app = new SpringApplicationBuilder(WorkerServer.class); + + app.web(WebApplicationType.NONE) + .run(args); + } + + + @Override + public void run(String... args) throws Exception { + // set the name of the current thread + Thread.currentThread().setName("Worker-Main-Thread"); + + WorkerServer workerServer = new WorkerServer(processDao,alertDao); + + workerServer.run(processDao,alertDao); + + logger.info("worker server started"); + + // blocking + workerServer.awaitTermination(); + } + + + public void run(ProcessDao processDao, AlertDao alertDao){ // heartbeat interval heartBeatInterval = conf.getInt(Constants.WORKER_HEARTBEAT_INTERVAL, @@ -187,45 +194,82 @@ public class WorkerServer implements IStoppable { // submit fetch task thread fetchTaskExecutorService.execute(fetchTaskThread); + } + @Override + public synchronized void stop(String cause) { - } + try { + //execute only once + if(Stopper.isStoped()){ + return; + } - public static void main(String[] args)throws Exception{ + logger.info("worker server is stopping ..., cause : {}", cause); - // set the name of the current thread - Thread.currentThread().setName("Worker-Main-Thread"); + // set stop signal is true + Stopper.stop(); - WorkerServer workerServer = new WorkerServer(); + try { + //thread sleep 3 seconds for thread quitely stop + Thread.sleep(3000L); + }catch (Exception e){ + logger.warn("thread sleep exception:" + e.getMessage(), e); + } - workerServer.run(); + try { + heartbeatWorerService.shutdownNow(); + }catch (Exception e){ + logger.warn("heartbeat service stopped exception"); + } + logger.info("heartbeat service stopped"); - logger.info("worker server started"); + try { + ThreadPoolExecutors.getInstance().shutdown(); + }catch (Exception e){ + logger.warn("threadpool service stopped exception:{}",e.getMessage()); + } - // blocking - workerServer.awaitTermination(); + logger.info("threadpool service stopped"); + try { + killExecutorService.shutdownNow(); + }catch (Exception e){ + logger.warn("worker kill executor service stopped exception:{}",e.getMessage()); + } + logger.info("worker kill executor service stopped"); - } + try { + fetchTaskExecutorService.shutdownNow(); + }catch (Exception e){ + logger.warn("worker fetch task service stopped exception:{}",e.getMessage()); + } + logger.info("worker fetch task service stopped"); + try{ + zkWorkerClient.close(); + }catch (Exception e){ + logger.warn("zookeeper service stopped exception:{}",e.getMessage()); + } + logger.info("zookeeper service stopped"); - /** - * blocking implement - * @throws InterruptedException - */ - public void awaitTermination() throws InterruptedException { - synchronized (lock) { - while (!terminated) { - lock.wait(); + //notify + synchronized (lock) { + terminated = true; + lock.notifyAll(); } + } catch (Exception e) { + logger.error("worker server stop exception : " + e.getMessage(), e); + System.exit(-1); } } + /** * heartbeat thread implement * @return */ - public Runnable heartBeatThread(){ + private Runnable heartBeatThread(){ Runnable heartBeatThread = new Runnable() { @Override public void run() { @@ -240,11 +284,12 @@ public class WorkerServer implements IStoppable { return heartBeatThread; } + /** * kill process thread implement * @return */ - public Runnable getKillProcessThread(){ + private Runnable getKillProcessThread(){ Runnable killProcessThread = new Runnable() { @Override public void run() { @@ -286,74 +331,5 @@ public class WorkerServer implements IStoppable { return killProcessThread; } - - - @Override - public synchronized void stop(String cause) { - - try { - //execute only once - if(Stopper.isStoped()){ - return; - } - - logger.info("worker server is stopping ..., cause : {}", cause); - - // set stop signal is true - Stopper.stop(); - - try { - //thread sleep 3 seconds for thread quitely stop - Thread.sleep(3000L); - }catch (Exception e){ - logger.warn("thread sleep exception:" + e.getMessage(), e); - } - - try { - heartbeatWorerService.shutdownNow(); - }catch (Exception e){ - logger.warn("heartbeat service stopped exception"); - } - logger.info("heartbeat service stopped"); - - try { - ThreadPoolExecutors.getInstance().shutdown(); - }catch (Exception e){ - logger.warn("threadpool service stopped exception:{}",e.getMessage()); - } - - logger.info("threadpool service stopped"); - - try { - killExecutorService.shutdownNow(); - }catch (Exception e){ - logger.warn("worker kill executor service stopped exception:{}",e.getMessage()); - } - logger.info("worker kill executor service stopped"); - - try { - fetchTaskExecutorService.shutdownNow(); - }catch (Exception e){ - logger.warn("worker fetch task service stopped exception:{}",e.getMessage()); - } - logger.info("worker fetch task service stopped"); - - try{ - zkWorkerClient.close(); - }catch (Exception e){ - logger.warn("zookeeper service stopped exception:{}",e.getMessage()); - } - logger.info("zookeeper service stopped"); - - //notify - synchronized (lock) { - terminated = true; - lock.notifyAll(); - } - } catch (Exception e) { - logger.error("worker server stop exception : " + e.getMessage(), e); - System.exit(-1); - } - } } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java index f4cec7303..d06918b0c 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java @@ -327,6 +327,11 @@ public class ZKMasterClient extends AbstractZKClient { boolean taskNeedFailover = true; + //now no host will execute this task instance,so no need to failover the task + if(taskInstance.getHost() == null){ + return false; + } + // if the worker node exists in zookeeper, we must check the task starts after the worker if(checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER)){ //if task start after worker starts, there is no need to failover the task. diff --git a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java index f97d95965..aeea65f2b 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java @@ -18,18 +18,13 @@ package cn.escheduler.server.zk; import cn.escheduler.common.Constants; import cn.escheduler.common.enums.ZKNodeType; -import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.zk.AbstractZKClient; -import cn.escheduler.dao.DaoFactory; -import cn.escheduler.dao.ServerDao; -import cn.escheduler.common.utils.ResInfo; import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.utils.ThreadUtils; -import org.apache.zookeeper.CreateMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,11 +49,6 @@ public class ZKWorkerClient extends AbstractZKClient { */ private String workerZNode = null; - /** - * worker database access - */ - private ServerDao serverDao = null; - /** * create time */ @@ -77,8 +67,6 @@ public class ZKWorkerClient extends AbstractZKClient { * init */ private void init(){ - // init worker dao - serverDao = DaoFactory.getDaoInstance(ServerDao.class); // init system znode this.initSystemZNode(); @@ -103,13 +91,6 @@ public class ZKWorkerClient extends AbstractZKClient { return zkWorkerClient; } - /** - * get worker dao - * @return - */ - public ServerDao getServerDao(){ - return serverDao; - } /** * register worker -- GitLab