提交 4acb9864 编写于 作者: L lidongdai

refactor worker server to springboot mode

fix exception when failover worker if task host is null
fix process instance is null when task instance id == 0
上级 fb90fcea
......@@ -17,6 +17,7 @@
package cn.escheduler.api;
import cn.escheduler.alert.AlertServer;
import cn.escheduler.dao.AlertDao;
import cn.escheduler.dao.ProcessDao;
import cn.escheduler.server.master.MasterServer;
import cn.escheduler.server.rpc.LoggerServer;
......@@ -39,11 +40,13 @@ public class CombinedApplicationServer extends SpringBootServletInitializer {
ConfigurableApplicationContext context = SpringApplication.run(ApiApplicationServer.class, args);
ProcessDao processDao = context.getBean(ProcessDao.class);
AlertDao alertDao = context.getBean(AlertDao.class);
MasterServer master = new MasterServer(processDao);
master.run(processDao);
WorkerServer workerServer = new WorkerServer();
workerServer.run();
WorkerServer workerServer = new WorkerServer(processDao, alertDao);
workerServer.run(processDao, alertDao);
LoggerServer server = new LoggerServer();
server.start();
......
......@@ -20,7 +20,6 @@ import cn.escheduler.common.Constants;
import cn.escheduler.common.IStoppable;
import cn.escheduler.common.enums.ZKNodeType;
import cn.escheduler.common.model.MasterServer;
import cn.escheduler.common.enums.ServerEnum;
import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.common.utils.ResInfo;
......@@ -308,9 +307,12 @@ public abstract class AbstractZKClient {
childrenList = zkClient.getChildren().forPath(getZNodeParentPath(ZKNodeType.MASTER));
}
} catch (Exception e) {
if(!e.getMessage().contains("java.lang.IllegalStateException: instance must be started")){
logger.warn(e.getMessage(),e);
if(e.getMessage().contains("java.lang.IllegalStateException: instance must be started")){
logger.error("zookeeper service not started",e);
}else{
logger.error(e.getMessage(),e);
}
}finally {
return childrenList.size();
}
......
......@@ -16,10 +16,10 @@
*/
package cn.escheduler.dao;
import cn.escheduler.dao.utils.BeanContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
......@@ -48,11 +48,12 @@ public class DaoFactory {
synchronized (daoMap) {
if (!daoMap.containsKey(className)) {
try {
T t = clazz.getConstructor().newInstance();
// 实例初始化
t.init();
T t = BeanContext.getBean(clazz);
// T t = clazz.getConstructor().newInstance();
// // 实例初始化
// t.init();
daoMap.put(className, t);
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException | NoSuchMethodException | SecurityException e) {
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
......
......@@ -1772,7 +1772,10 @@ public class ProcessDao extends AbstractBaseDao {
*/
public int getTaskWorkerGroupId(TaskInstance taskInstance) {
int taskWorkerGroupId = taskInstance.getWorkerGroupId();
ProcessInstance processInstance = findProcessInstanceByTaskId(taskInstance.getId());
int processInstanceId = taskInstance.getProcessInstanceId();
ProcessInstance processInstance = findProcessInstanceById(processInstanceId);
if(processInstance == null){
logger.error("cannot find the task:{} process instance", taskInstance.getId());
return Constants.DEFAULT_WORKER_ID;
......
......@@ -16,19 +16,21 @@
*/
package cn.escheduler.dao;
import cn.escheduler.common.model.MasterServer;
import cn.escheduler.dao.mapper.MasterServerMapper;
import cn.escheduler.dao.mapper.WorkerServerMapper;
import cn.escheduler.common.model.MasterServer;
import cn.escheduler.dao.model.WorkerServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
import static cn.escheduler.dao.datasource.ConnectionFactory.getMapper;
/**
* master server
* server dao
*/
@Component
public class ServerDao extends AbstractBaseDao {
@Autowired
......
......@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.server;
package cn.escheduler.dao.utils;
import org.springframework.beans.BeansException;
......
......@@ -16,20 +16,16 @@
*/
package cn.escheduler.server.master;
import cn.escheduler.server.quartz.ProcessScheduleJob;
import cn.escheduler.server.quartz.QuartzExecutors;
import cn.escheduler.common.Constants;
import cn.escheduler.common.IStoppable;
import cn.escheduler.common.thread.Stopper;
import cn.escheduler.common.thread.ThreadPoolExecutors;
import cn.escheduler.common.thread.ThreadUtils;
import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.dao.AlertDao;
import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.ServerDao;
import cn.escheduler.server.master.runner.MasterSchedulerThread;
import cn.escheduler.server.quartz.ProcessScheduleJob;
import cn.escheduler.server.quartz.QuartzExecutors;
import cn.escheduler.server.zk.ZKMasterClient;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.StringUtils;
......@@ -37,8 +33,8 @@ import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
import java.util.concurrent.ExecutorService;
......@@ -49,77 +45,71 @@ import java.util.concurrent.TimeUnit;
* master server
*/
@ComponentScan("cn.escheduler")
public class MasterServer implements CommandLineRunner, IStoppable {
public class MasterServer extends AbstractServer {
private static final Logger logger = LoggerFactory.getLogger(MasterServer.class);
/**
* conf
*/
private static Configuration conf;
/**
* object lock
*/
private final Object lock = new Object();
/**
* whether or not to close the state
*/
private boolean terminated = false;
/**
* zk master client
*/
private static ZKMasterClient zkMasterClient=null;
/**
* master dao database access
*/
private ServerDao serverDao = null;
private static ZKMasterClient zkMasterClient = null;
/**
* alert database access
* heartbeat thread pool
*/
private AlertDao alertDao = null;
private ScheduledExecutorService heartbeatMasterService;
/**
* escheduler database interface
*/
@Autowired
private ProcessDao processDao;
/**
* heartbeat thread pool
*/
private ScheduledExecutorService heartbeatMasterService;
protected ProcessDao processDao;
/**
* master exec thread pool
*/
private final ExecutorService masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread");
private ExecutorService masterSchedulerService;
/**
* heartbeat interval, unit second
*/
private int heartBeatInterval;
public MasterServer(){}
static {
public MasterServer(ProcessDao processDao){
try {
conf = new PropertiesConfiguration(Constants.MASTER_PROPERTIES_PATH);
}catch (ConfigurationException e){
logger.error("load configuration failed : " + e.getMessage(),e);
System.exit(1);
}
zkMasterClient = ZKMasterClient.getZKMasterClient(processDao);
this.masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread");
}
public MasterServer(){}
public MasterServer(ProcessDao processDao){
zkMasterClient = ZKMasterClient.getZKMasterClient(processDao);
/**
* master server startup
*
* master server not use web service
*/
public static void main(String[] args) {
SpringApplicationBuilder app = new SpringApplicationBuilder(MasterServer.class);
app.web(WebApplicationType.NONE)
.run(args);
}
@Override
public void run(String... strings) throws Exception {
MasterServer masterServer = new MasterServer(processDao);
masterServer.run(processDao);
logger.info("master server started");
// blocking
masterServer.awaitTermination();
}
public void run(ProcessDao processDao){
// heartbeat interval
......@@ -153,7 +143,6 @@ public class MasterServer implements CommandLineRunner, IStoppable {
masterSchedulerService.execute(masterSchedulerThread);
// start QuartzExecutors
// TODO...
// what system should do if exception
try {
ProcessScheduleJob.init(processDao);
......@@ -186,60 +175,6 @@ public class MasterServer implements CommandLineRunner, IStoppable {
}
public static void main(String[] args) {
SpringApplication app = new SpringApplication(MasterServer.class);
app.run(args);
}
/**
* blocking implement
* @throws InterruptedException
*/
public void awaitTermination() throws InterruptedException {
synchronized (lock) {
while (!terminated) {
lock.wait();
}
}
}
/**
* heartbeat thread implement
* @return
*/
public Runnable heartBeatThread(){
Runnable heartBeatThread = new Runnable() {
@Override
public void run() {
if(Stopper.isRunning()) {
// send heartbeat to zk
if (StringUtils.isBlank(zkMasterClient.getMasterZNode())) {
logger.error("master send heartbeat to zk failed: can't find zookeeper path of master server");
return;
}
zkMasterClient.heartBeatForZk(zkMasterClient.getMasterZNode(), Constants.MASTER_PREFIX);
}
}
};
return heartBeatThread;
}
@Override
public void run(String... strings) throws Exception {
MasterServer masterServer = new MasterServer(processDao);
masterServer.run(processDao);
logger.info("master server started");
// blocking
masterServer.awaitTermination();
}
/**
* gracefully stop
* @param cause why stopping
......@@ -315,5 +250,28 @@ public class MasterServer implements CommandLineRunner, IStoppable {
System.exit(-1);
}
}
/**
* heartbeat thread implement
* @return
*/
private Runnable heartBeatThread(){
Runnable heartBeatThread = new Runnable() {
@Override
public void run() {
if(Stopper.isRunning()) {
// send heartbeat to zk
if (StringUtils.isBlank(zkMasterClient.getMasterZNode())) {
logger.error("master send heartbeat to zk failed: can't find zookeeper path of master server");
return;
}
zkMasterClient.heartBeatForZk(zkMasterClient.getMasterZNode(), Constants.MASTER_PREFIX);
}
}
};
return heartBeatThread;
}
}
......@@ -23,7 +23,7 @@ import cn.escheduler.dao.AlertDao;
import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.server.BeanContext;
import cn.escheduler.dao.utils.BeanContext;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
......
......@@ -17,7 +17,6 @@
package cn.escheduler.server.worker;
import cn.escheduler.common.Constants;
import cn.escheduler.common.IStoppable;
import cn.escheduler.common.enums.ExecutionStatus;
import cn.escheduler.common.enums.TaskType;
import cn.escheduler.common.queue.ITaskQueue;
......@@ -28,20 +27,21 @@ import cn.escheduler.common.thread.ThreadUtils;
import cn.escheduler.common.utils.CollectionUtils;
import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.dao.AlertDao;
import cn.escheduler.dao.DaoFactory;
import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.ServerDao;
import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.server.master.AbstractServer;
import cn.escheduler.server.utils.ProcessUtils;
import cn.escheduler.server.worker.runner.FetchTaskThread;
import cn.escheduler.server.zk.ZKWorkerClient;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
import java.util.Set;
import java.util.concurrent.ExecutorService;
......@@ -51,55 +51,34 @@ import java.util.concurrent.TimeUnit;
/**
* worker server
*/
public class WorkerServer implements IStoppable {
@ComponentScan("cn.escheduler")
public class WorkerServer extends AbstractServer {
private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class);
/**
* conf
*/
private static Configuration conf;
/**
* object lock
*/
private final Object lock = new Object();
/**
* whether or not to close the state
*/
private boolean terminated = false;
/**
* zk worker client
*/
private static ZKWorkerClient zkWorkerClient = null;
/**
* worker dao database access
*/
private ServerDao serverDao = null;
/**
* process database access
*/
private final ProcessDao processDao;
@Autowired
private ProcessDao processDao;
/**
* alert database access
*/
private final AlertDao alertDao;
@Autowired
private AlertDao alertDao;
/**
* heartbeat thread pool
*/
private ScheduledExecutorService heartbeatWorerService;
/**
* heartbeat interval, unit second
*/
private int heartBeatInterval;
/**
* task queue impl
*/
......@@ -115,29 +94,57 @@ public class WorkerServer implements IStoppable {
*/
private ExecutorService fetchTaskExecutorService;
static {
public WorkerServer(){}
public WorkerServer(ProcessDao processDao, AlertDao alertDao){
try {
conf = new PropertiesConfiguration(Constants.WORKER_PROPERTIES_PATH);
}catch (ConfigurationException e){
logger.error("load configuration failed",e);
System.exit(1);
}
}
public WorkerServer(){
zkWorkerClient = ZKWorkerClient.getZKWorkerClient();
this.serverDao = zkWorkerClient.getServerDao();
this.alertDao = DaoFactory.getDaoInstance(AlertDao.class);
this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
taskQueue = TaskQueueFactory.getTaskQueueInstance();
killExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Kill-Thread-Executor");
this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");
this.killExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Kill-Thread-Executor");
this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");
}
public void run(){
/**
* master server startup
*
* master server not use web service
*/
public static void main(String[] args) {
SpringApplicationBuilder app = new SpringApplicationBuilder(WorkerServer.class);
app.web(WebApplicationType.NONE)
.run(args);
}
@Override
public void run(String... args) throws Exception {
// set the name of the current thread
Thread.currentThread().setName("Worker-Main-Thread");
WorkerServer workerServer = new WorkerServer(processDao,alertDao);
workerServer.run(processDao,alertDao);
logger.info("worker server started");
// blocking
workerServer.awaitTermination();
}
public void run(ProcessDao processDao, AlertDao alertDao){
// heartbeat interval
heartBeatInterval = conf.getInt(Constants.WORKER_HEARTBEAT_INTERVAL,
......@@ -187,45 +194,82 @@ public class WorkerServer implements IStoppable {
// submit fetch task thread
fetchTaskExecutorService.execute(fetchTaskThread);
}
@Override
public synchronized void stop(String cause) {
}
try {
//execute only once
if(Stopper.isStoped()){
return;
}
public static void main(String[] args)throws Exception{
logger.info("worker server is stopping ..., cause : {}", cause);
// set the name of the current thread
Thread.currentThread().setName("Worker-Main-Thread");
// set stop signal is true
Stopper.stop();
WorkerServer workerServer = new WorkerServer();
try {
//thread sleep 3 seconds for thread quitely stop
Thread.sleep(3000L);
}catch (Exception e){
logger.warn("thread sleep exception:" + e.getMessage(), e);
}
workerServer.run();
try {
heartbeatWorerService.shutdownNow();
}catch (Exception e){
logger.warn("heartbeat service stopped exception");
}
logger.info("heartbeat service stopped");
logger.info("worker server started");
try {
ThreadPoolExecutors.getInstance().shutdown();
}catch (Exception e){
logger.warn("threadpool service stopped exception:{}",e.getMessage());
}
// blocking
workerServer.awaitTermination();
logger.info("threadpool service stopped");
try {
killExecutorService.shutdownNow();
}catch (Exception e){
logger.warn("worker kill executor service stopped exception:{}",e.getMessage());
}
logger.info("worker kill executor service stopped");
}
try {
fetchTaskExecutorService.shutdownNow();
}catch (Exception e){
logger.warn("worker fetch task service stopped exception:{}",e.getMessage());
}
logger.info("worker fetch task service stopped");
try{
zkWorkerClient.close();
}catch (Exception e){
logger.warn("zookeeper service stopped exception:{}",e.getMessage());
}
logger.info("zookeeper service stopped");
/**
* blocking implement
* @throws InterruptedException
*/
public void awaitTermination() throws InterruptedException {
synchronized (lock) {
while (!terminated) {
lock.wait();
//notify
synchronized (lock) {
terminated = true;
lock.notifyAll();
}
} catch (Exception e) {
logger.error("worker server stop exception : " + e.getMessage(), e);
System.exit(-1);
}
}
/**
* heartbeat thread implement
* @return
*/
public Runnable heartBeatThread(){
private Runnable heartBeatThread(){
Runnable heartBeatThread = new Runnable() {
@Override
public void run() {
......@@ -240,11 +284,12 @@ public class WorkerServer implements IStoppable {
return heartBeatThread;
}
/**
* kill process thread implement
* @return
*/
public Runnable getKillProcessThread(){
private Runnable getKillProcessThread(){
Runnable killProcessThread = new Runnable() {
@Override
public void run() {
......@@ -286,74 +331,5 @@ public class WorkerServer implements IStoppable {
return killProcessThread;
}
@Override
public synchronized void stop(String cause) {
try {
//execute only once
if(Stopper.isStoped()){
return;
}
logger.info("worker server is stopping ..., cause : {}", cause);
// set stop signal is true
Stopper.stop();
try {
//thread sleep 3 seconds for thread quitely stop
Thread.sleep(3000L);
}catch (Exception e){
logger.warn("thread sleep exception:" + e.getMessage(), e);
}
try {
heartbeatWorerService.shutdownNow();
}catch (Exception e){
logger.warn("heartbeat service stopped exception");
}
logger.info("heartbeat service stopped");
try {
ThreadPoolExecutors.getInstance().shutdown();
}catch (Exception e){
logger.warn("threadpool service stopped exception:{}",e.getMessage());
}
logger.info("threadpool service stopped");
try {
killExecutorService.shutdownNow();
}catch (Exception e){
logger.warn("worker kill executor service stopped exception:{}",e.getMessage());
}
logger.info("worker kill executor service stopped");
try {
fetchTaskExecutorService.shutdownNow();
}catch (Exception e){
logger.warn("worker fetch task service stopped exception:{}",e.getMessage());
}
logger.info("worker fetch task service stopped");
try{
zkWorkerClient.close();
}catch (Exception e){
logger.warn("zookeeper service stopped exception:{}",e.getMessage());
}
logger.info("zookeeper service stopped");
//notify
synchronized (lock) {
terminated = true;
lock.notifyAll();
}
} catch (Exception e) {
logger.error("worker server stop exception : " + e.getMessage(), e);
System.exit(-1);
}
}
}
......@@ -327,6 +327,11 @@ public class ZKMasterClient extends AbstractZKClient {
boolean taskNeedFailover = true;
//now no host will execute this task instance,so no need to failover the task
if(taskInstance.getHost() == null){
return false;
}
// if the worker node exists in zookeeper, we must check the task starts after the worker
if(checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER)){
//if task start after worker starts, there is no need to failover the task.
......
......@@ -18,18 +18,13 @@ package cn.escheduler.server.zk;
import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.ZKNodeType;
import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.common.zk.AbstractZKClient;
import cn.escheduler.dao.DaoFactory;
import cn.escheduler.dao.ServerDao;
import cn.escheduler.common.utils.ResInfo;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ThreadUtils;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -54,11 +49,6 @@ public class ZKWorkerClient extends AbstractZKClient {
*/
private String workerZNode = null;
/**
* worker database access
*/
private ServerDao serverDao = null;
/**
* create time
*/
......@@ -77,8 +67,6 @@ public class ZKWorkerClient extends AbstractZKClient {
* init
*/
private void init(){
// init worker dao
serverDao = DaoFactory.getDaoInstance(ServerDao.class);
// init system znode
this.initSystemZNode();
......@@ -103,13 +91,6 @@ public class ZKWorkerClient extends AbstractZKClient {
return zkWorkerClient;
}
/**
* get worker dao
* @return
*/
public ServerDao getServerDao(){
return serverDao;
}
/**
* register worker
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册