提交 1a4b2886 编写于 作者: T Tboy 提交者: qiaozhanwei

refactor masterServer and workerServer (#1244)

* move updateTaskState into try/catch block in case of exception

* fix NPE

* using conf.getInt instead of getString

* for AbstractZKClient, remove the log, for it will print the same log message in createZNodePath.
for AlertDao, correct the spelling.

* duplicate

* refactor getTaskWorkerGroupId

* add friendly log

* update hearbeat thread num = 1

* fix the bug when worker execute task using queue. and remove checking Tenant user anymore in TaskScheduleThread

* 1. move verifyTaskInstanceIsNull after taskInstance
2. keep verifyTenantIsNull/verifyTaskInstanceIsNull clean and readable

* fix the message

* delete before check to avoid KeeperException$NoNodeException

* fix the message

* check processInstance state before delete tenant

* check processInstance state before delete worker group

* refactor

* merge api constants into common constatns

* update the resource perm

* update the dataSource perm

* fix CheckUtils.checkUserParams method

* update AlertGroupService, extends from BaseService, remove duplicate methods

* refactor

* modify method name

* add hasProjectAndPerm method

* using checkProject instead of getResultStatus

* delete checkAuth method, using hasProjectAndPerm instead.

* correct spelling

* add transactional for deleteWorkerGroupById

* add Transactional for deleteProcessInstanceById method

* change sqlSessionTemplate singleton

* change sqlSessionTemplate singleton and reformat code

* fix unsuitable error message

* update shutdownhook methods

* fix worker log bug

* fix api server debug mode bug

* upgrade zk version

* delete this line ,for zkClient.close() will do the whole thing

* fix master server shutdown error

* degrade zk version and add FourLetterWordMain class

* fix PathChildrenCache not close

* add Transactional for createSession method

* add more message for java-doc

* delete App, let spring manage connectionFactory

* add license

* add class Application for test support

* refactor masterServer and workerServer

* add args
上级 d962dee6
......@@ -17,16 +17,12 @@
package org.apache.dolphinscheduler.api;
import org.apache.dolphinscheduler.alert.AlertServer;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.server.master.MasterServer;
import org.apache.dolphinscheduler.server.rpc.LoggerServer;
import org.apache.dolphinscheduler.server.worker.WorkerServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
......@@ -38,15 +34,11 @@ public class CombinedApplicationServer extends SpringBootServletInitializer {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ApiApplicationServer.class, args);
ProcessDao processDao = context.getBean(ProcessDao.class);
AlertDao alertDao = context.getBean(AlertDao.class);
ApiApplicationServer.main(args);
MasterServer master = new MasterServer(processDao);
master.run(processDao);
MasterServer.main(args);
WorkerServer workerServer = new WorkerServer(processDao);
workerServer.run(processDao);
WorkerServer.main(args);
LoggerServer server = new LoggerServer();
server.start();
......
......@@ -26,54 +26,19 @@ import org.springframework.context.annotation.ComponentScan;
/**
* master server
*/
@ComponentScan("org.apache.dolphinscheduler")
public abstract class AbstractServer implements CommandLineRunner, IStoppable {
public abstract class AbstractServer implements IStoppable {
/**
* logger of AbstractServer
*/
private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);
/**
* abstract server onfiguration
*/
protected static Configuration conf;
/**
* object lock
*/
protected final Object lock = new Object();
/**
* whether or not to close the state
*/
protected boolean terminated = false;
/**
* heartbeat interval, unit second
*/
protected int heartBeatInterval;
/**
* blocking implement
* @throws InterruptedException reasonInter
*/
public void awaitTermination() throws InterruptedException {
synchronized (lock) {
while (!terminated) {
lock.wait();
}
}
}
/**
* Callback used to run the bean.
* @param args incoming main method arguments
* @throws Exception on error
*/
@Override
public abstract void run(String... args) throws Exception;
/**
* gracefully stop
* @param cause why stopping
......
......@@ -36,6 +36,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.context.annotation.ComponentScan;
import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
......@@ -54,7 +55,7 @@ public class MasterServer extends AbstractServer {
/**
* zk master client
*/
private static ZKMasterClient zkMasterClient = null;
private ZKMasterClient zkMasterClient = null;
/**
* heartbeat thread pool
......@@ -72,25 +73,6 @@ public class MasterServer extends AbstractServer {
*/
private ExecutorService masterSchedulerService;
/**
* default constructor
*/
public MasterServer(){}
/**
* constructor of MasterServers
* @param processDao process dao
*/
public MasterServer(ProcessDao processDao){
try {
conf = new PropertiesConfiguration(Constants.MASTER_PROPERTIES_PATH);
}catch (ConfigurationException e){
logger.error("load configuration failed : " + e.getMessage(),e);
System.exit(1);
}
zkMasterClient = ZKMasterClient.getZKMasterClient(processDao);
this.masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread");
}
/**
* master server startup
......@@ -99,29 +81,26 @@ public class MasterServer extends AbstractServer {
* @param args arguments
*/
public static void main(String[] args) {
SpringApplication app = new SpringApplication(MasterServer.class);
SpringApplication.run(MasterServer.class, args);
app.run(args);
}
@Override
public void run(String... strings) throws Exception {
MasterServer masterServer = new MasterServer(processDao);
masterServer.run(processDao);
logger.info("master server started");
// blocking
masterServer.awaitTermination();
}
/**
* run master server
* @param processDao process dao
*/
public void run(ProcessDao processDao){
@PostConstruct
public void run(){
try {
conf = new PropertiesConfiguration(Constants.MASTER_PROPERTIES_PATH);
}catch (ConfigurationException e){
logger.error("load configuration failed : " + e.getMessage(),e);
System.exit(1);
}
masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread");
zkMasterClient = ZKMasterClient.getZKMasterClient(processDao);
// heartbeat interval
heartBeatInterval = conf.getInt(Constants.MASTER_HEARTBEAT_INTERVAL,
......@@ -251,10 +230,6 @@ public class MasterServer extends AbstractServer {
logger.info("zookeeper service stopped");
synchronized (lock) {
terminated = true;
lock.notifyAll();
}
} catch (Exception e) {
logger.error("master server stop exception : " + e.getMessage(), e);
......
......@@ -23,13 +23,13 @@ import org.springframework.stereotype.Component;
@Component
public class SpringApplication implements ApplicationContextAware {
public class SpringApplicationContext implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringApplication.applicationContext = applicationContext;
SpringApplicationContext.applicationContext = applicationContext;
}
public static <T> T getBean(Class<T> requiredType){
......
......@@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.AbstractServer;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread;
import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
import org.slf4j.Logger;
......@@ -44,7 +45,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.context.annotation.ComponentScan;
import javax.annotation.PostConstruct;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
......@@ -64,7 +67,8 @@ public class WorkerServer extends AbstractServer {
/**
* zk worker client
*/
private static ZKWorkerClient zkWorkerClient = null;
private ZKWorkerClient zkWorkerClient = null;
/**
* process database access
......@@ -81,7 +85,7 @@ public class WorkerServer extends AbstractServer {
/**
* heartbeat thread pool
*/
private ScheduledExecutorService heartbeatWorerService;
private ScheduledExecutorService heartbeatWorkerService;
/**
* task queue impl
......@@ -98,25 +102,17 @@ public class WorkerServer extends AbstractServer {
*/
private ExecutorService fetchTaskExecutorService;
public WorkerServer(){
}
public WorkerServer(ProcessDao processDao){
try {
conf = new PropertiesConfiguration(Constants.WORKER_PROPERTIES_PATH);
}catch (ConfigurationException e){
logger.error("load configuration failed",e);
System.exit(1);
}
zkWorkerClient = ZKWorkerClient.getZKWorkerClient();
this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
this.killExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Kill-Thread-Executor");
/**
* spring application context
* only use it for initialization
*/
@Autowired
private SpringApplicationContext springApplicationContext;
this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");
}
/**
* CountDownLatch latch
*/
private CountDownLatch latch;
/**
* master server startup
......@@ -125,38 +121,36 @@ public class WorkerServer extends AbstractServer {
* @param args arguments
*/
public static void main(String[] args) {
SpringApplication app = new SpringApplication(WorkerServer.class);
app.run(args);
SpringApplication.run(WorkerServer.class,args);
}
@Override
public void run(String... args) throws Exception {
// set the name of the current thread
Thread.currentThread().setName("Worker-Main-Thread");
/**
* worker server run
*/
@PostConstruct
public void run(){
WorkerServer workerServer = new WorkerServer(processDao);
try {
conf = new PropertiesConfiguration(Constants.WORKER_PROPERTIES_PATH);
}catch (ConfigurationException e){
logger.error("load configuration failed",e);
System.exit(1);
}
workerServer.run(processDao);
zkWorkerClient = ZKWorkerClient.getZKWorkerClient();
logger.info("worker server started");
this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
// blocking
workerServer.awaitTermination();
}
this.killExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Kill-Thread-Executor");
/**
* worker server run
* @param processDao process dao
*/
public void run(ProcessDao processDao){
this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");
// heartbeat interval
heartBeatInterval = conf.getInt(Constants.WORKER_HEARTBEAT_INTERVAL,
Constants.defaultWorkerHeartbeatInterval);
heartbeatWorerService = ThreadUtils.newDaemonThreadScheduledExecutor("Worker-Heartbeat-Thread-Executor", Constants.defaulWorkerHeartbeatThreadNum);
heartbeatWorkerService = ThreadUtils.newDaemonThreadScheduledExecutor("Worker-Heartbeat-Thread-Executor", Constants.defaulWorkerHeartbeatThreadNum);
// heartbeat thread implement
Runnable heartBeatThread = heartBeatThread();
......@@ -165,15 +159,25 @@ public class WorkerServer extends AbstractServer {
// regular heartbeat
// delay 5 seconds, send heartbeat every 30 seconds
heartbeatWorerService.
scheduleAtFixedRate(heartBeatThread, 5, heartBeatInterval, TimeUnit.SECONDS);
heartbeatWorkerService.scheduleAtFixedRate(heartBeatThread, 5, heartBeatInterval, TimeUnit.SECONDS);
// kill process thread implement
Runnable killProcessThread = getKillProcessThread(processDao);
Runnable killProcessThread = getKillProcessThread();
// submit kill process thread
killExecutorService.execute(killProcessThread);
// get worker number of concurrent tasks
int taskNum = conf.getInt(Constants.WORKER_FETCH_TASK_NUM,Constants.defaultWorkerFetchTaskNum);
// new fetch task thread
FetchTaskThread fetchTaskThread = new FetchTaskThread(taskNum,zkWorkerClient, processDao,conf, taskQueue);
// submit fetch task thread
fetchTaskExecutorService.execute(fetchTaskThread);
/**
* register hooks, which are called before the process exits
*/
......@@ -190,14 +194,12 @@ public class WorkerServer extends AbstractServer {
}
}));
// get worker number of concurrent tasks
int taskNum = conf.getInt(Constants.WORKER_FETCH_TASK_NUM,Constants.defaultWorkerFetchTaskNum);
// new fetch task thread
FetchTaskThread fetchTaskThread = new FetchTaskThread(taskNum,zkWorkerClient, processDao,conf, taskQueue);
// submit fetch task thread
fetchTaskExecutorService.execute(fetchTaskThread);
//let the main thread await
latch = new CountDownLatch(1);
try {
latch.await();
} catch (InterruptedException ignore) {
}
}
@Override
......@@ -222,7 +224,7 @@ public class WorkerServer extends AbstractServer {
}
try {
heartbeatWorerService.shutdownNow();
heartbeatWorkerService.shutdownNow();
}catch (Exception e){
logger.warn("heartbeat service stopped exception");
}
......@@ -255,13 +257,9 @@ public class WorkerServer extends AbstractServer {
}catch (Exception e){
logger.warn("zookeeper service stopped exception:{}",e.getMessage());
}
latch.countDown();
logger.info("zookeeper service stopped");
//notify
synchronized (lock) {
terminated = true;
lock.notifyAll();
}
} catch (Exception e) {
logger.error("worker server stop exception : " + e.getMessage(), e);
System.exit(-1);
......@@ -295,7 +293,7 @@ public class WorkerServer extends AbstractServer {
*
* @return kill process thread
*/
private Runnable getKillProcessThread(ProcessDao processDao){
private Runnable getKillProcessThread(){
Runnable killProcessThread = new Runnable() {
@Override
public void run() {
......
......@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplication;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.slf4j.Logger;
/**
......@@ -48,7 +48,7 @@ public abstract class AbstractYarnTask extends AbstractTask {
*/
public AbstractYarnTask(TaskProps taskProps, Logger logger) {
super(taskProps, logger);
this.processDao = SpringApplication.getBean(ProcessDao.class);
this.processDao = SpringApplicationContext.getBean(ProcessDao.class);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskProps.getTaskDir(),
taskProps.getTaskAppId(),
......
......@@ -26,7 +26,7 @@ import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.SpringApplication;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -39,7 +39,7 @@ public class DependentExecute {
/**
* process dao
*/
private static final ProcessDao processDao = SpringApplication.getBean(ProcessDao.class);
private final ProcessDao processDao = SpringApplicationContext.getBean(ProcessDao.class);
/**
* depend item list
......
......@@ -27,7 +27,7 @@ import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.SpringApplication;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.slf4j.Logger;
......@@ -88,7 +88,7 @@ public class DependentTask extends AbstractTask {
taskModel.getDependItemList(), taskModel.getRelation()));
}
this.processDao = SpringApplication.getBean(ProcessDao.class);
this.processDao = SpringApplicationContext.getBean(ProcessDao.class);
if(taskProps.getScheduleTime() != null){
this.dependentDate = taskProps.getScheduleTime();
......
......@@ -33,7 +33,7 @@ import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplication;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.http.HttpEntity;
......@@ -92,7 +92,7 @@ public class HttpTask extends AbstractTask {
*/
public HttpTask(TaskProps props, Logger logger) {
super(props, logger);
this.processDao = SpringApplication.getBean(ProcessDao.class);
this.processDao = SpringApplicationContext.getBean(ProcessDao.class);
}
@Override
......
......@@ -32,7 +32,7 @@ import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplication;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.slf4j.Logger;
......@@ -82,7 +82,7 @@ public class ProcedureTask extends AbstractTask {
throw new RuntimeException("procedure task params is not valid");
}
this.processDao = SpringApplication.getBean(ProcessDao.class);
this.processDao = SpringApplicationContext.getBean(ProcessDao.class);
}
@Override
......
......@@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplication;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
......@@ -76,7 +76,7 @@ public class PythonTask extends AbstractTask {
taskProps.getTaskStartTime(),
taskProps.getTaskTimeout(),
logger);
this.processDao = SpringApplication.getBean(ProcessDao.class);
this.processDao = SpringApplicationContext.getBean(ProcessDao.class);
}
@Override
......
......@@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplication;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
......@@ -84,7 +84,7 @@ public class ShellTask extends AbstractTask {
taskProps.getTaskStartTime(),
taskProps.getTaskTimeout(),
logger);
this.processDao = SpringApplication.getBean(ProcessDao.class);
this.processDao = SpringApplicationContext.getBean(ProcessDao.class);
}
@Override
......
......@@ -43,7 +43,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplication;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.utils.UDFUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
......@@ -97,8 +97,8 @@ public class SqlTask extends AbstractTask {
if (!sqlParameters.checkParameters()) {
throw new RuntimeException("sql task params is not valid");
}
this.processDao = SpringApplication.getBean(ProcessDao.class);
this.alertDao = SpringApplication.getBean(AlertDao.class);
this.processDao = SpringApplicationContext.getBean(ProcessDao.class);
this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
}
@Override
......
......@@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplication;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
......@@ -47,7 +47,7 @@ public class ShellCommandExecutorTest {
@Before
public void before(){
processDao = SpringApplication.getBean(ProcessDao.class);
processDao = SpringApplicationContext.getBean(ProcessDao.class);
}
@Test
......
......@@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplication;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
......@@ -48,7 +48,7 @@ public class SqlExecutorTest {
@Before
public void before(){
processDao = SpringApplication.getBean(ProcessDao.class);
processDao = SpringApplicationContext.getBean(ProcessDao.class);
}
@Test
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册