Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
三久
DolphinScheduler
提交
ddf6ad27
DolphinScheduler
项目概览
三久
/
DolphinScheduler
与 Fork 源项目一致
Fork自
apache / DolphinScheduler
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
DolphinScheduler
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
ddf6ad27
编写于
9月 04, 2019
作者:
wu-sheng
提交者:
GitHub
9月 04, 2019
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #766 from dailidong/dev
refactor worker server to springboot mode
上级
a420a012
56fc32e5
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
200 addition
and
269 deletion
+200
-269
escheduler-api/src/main/java/cn/escheduler/api/CombinedApplicationServer.java
...ain/java/cn/escheduler/api/CombinedApplicationServer.java
+5
-2
escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java
...c/main/java/cn/escheduler/common/zk/AbstractZKClient.java
+5
-3
escheduler-dao/src/main/java/cn/escheduler/dao/DaoFactory.java
...duler-dao/src/main/java/cn/escheduler/dao/DaoFactory.java
+6
-5
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
...duler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
+4
-1
escheduler-dao/src/main/java/cn/escheduler/dao/ServerDao.java
...eduler-dao/src/main/java/cn/escheduler/dao/ServerDao.java
+4
-2
escheduler-dao/src/main/java/cn/escheduler/dao/utils/BeanContext.java
...ao/src/main/java/cn/escheduler/dao/utils/BeanContext.java
+1
-1
escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java
...c/main/java/cn/escheduler/server/master/MasterServer.java
+62
-104
escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterBaseTaskExecThread.java
...eduler/server/master/runner/MasterBaseTaskExecThread.java
+1
-1
escheduler-server/src/main/java/cn/escheduler/server/worker/WorkerServer.java
...c/main/java/cn/escheduler/server/worker/WorkerServer.java
+107
-131
escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java
...src/main/java/cn/escheduler/server/zk/ZKMasterClient.java
+5
-0
escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java
...src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java
+0
-19
未找到文件。
escheduler-api/src/main/java/cn/escheduler/api/CombinedApplicationServer.java
浏览文件 @
ddf6ad27
...
...
@@ -17,6 +17,7 @@
package
cn.escheduler.api
;
import
cn.escheduler.alert.AlertServer
;
import
cn.escheduler.dao.AlertDao
;
import
cn.escheduler.dao.ProcessDao
;
import
cn.escheduler.server.master.MasterServer
;
import
cn.escheduler.server.rpc.LoggerServer
;
...
...
@@ -39,11 +40,13 @@ public class CombinedApplicationServer extends SpringBootServletInitializer {
ConfigurableApplicationContext
context
=
SpringApplication
.
run
(
ApiApplicationServer
.
class
,
args
);
ProcessDao
processDao
=
context
.
getBean
(
ProcessDao
.
class
);
AlertDao
alertDao
=
context
.
getBean
(
AlertDao
.
class
);
MasterServer
master
=
new
MasterServer
(
processDao
);
master
.
run
(
processDao
);
WorkerServer
workerServer
=
new
WorkerServer
();
workerServer
.
run
();
WorkerServer
workerServer
=
new
WorkerServer
(
processDao
,
alertDao
);
workerServer
.
run
(
processDao
,
alertDao
);
LoggerServer
server
=
new
LoggerServer
();
server
.
start
();
...
...
escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java
浏览文件 @
ddf6ad27
...
...
@@ -20,7 +20,6 @@ import cn.escheduler.common.Constants;
import
cn.escheduler.common.IStoppable
;
import
cn.escheduler.common.enums.ZKNodeType
;
import
cn.escheduler.common.model.MasterServer
;
import
cn.escheduler.common.enums.ServerEnum
;
import
cn.escheduler.common.utils.DateUtils
;
import
cn.escheduler.common.utils.OSUtils
;
import
cn.escheduler.common.utils.ResInfo
;
...
...
@@ -308,9 +307,12 @@ public abstract class AbstractZKClient {
childrenList
=
zkClient
.
getChildren
().
forPath
(
getZNodeParentPath
(
ZKNodeType
.
MASTER
));
}
}
catch
(
Exception
e
)
{
if
(!
e
.
getMessage
().
contains
(
"java.lang.IllegalStateException: instance must be started"
)){
logger
.
warn
(
e
.
getMessage
(),
e
);
if
(
e
.
getMessage
().
contains
(
"java.lang.IllegalStateException: instance must be started"
)){
logger
.
error
(
"zookeeper service not started"
,
e
);
}
else
{
logger
.
error
(
e
.
getMessage
(),
e
);
}
}
finally
{
return
childrenList
.
size
();
}
...
...
escheduler-dao/src/main/java/cn/escheduler/dao/DaoFactory.java
浏览文件 @
ddf6ad27
...
...
@@ -16,10 +16,10 @@
*/
package
cn.escheduler.dao
;
import
cn.escheduler.dao.utils.BeanContext
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.lang.reflect.InvocationTargetException
;
import
java.util.Map
;
import
java.util.concurrent.ConcurrentHashMap
;
...
...
@@ -48,11 +48,12 @@ public class DaoFactory {
synchronized
(
daoMap
)
{
if
(!
daoMap
.
containsKey
(
className
))
{
try
{
T
t
=
clazz
.
getConstructor
().
newInstance
();
// 实例初始化
t
.
init
();
T
t
=
BeanContext
.
getBean
(
clazz
);
// T t = clazz.getConstructor().newInstance();
// // 实例初始化
// t.init();
daoMap
.
put
(
className
,
t
);
}
catch
(
InstantiationException
|
IllegalAccessException
|
IllegalArgumentException
|
InvocationTargetException
|
NoSuchMethodException
|
Security
Exception
e
)
{
}
catch
(
Exception
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
}
}
...
...
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
浏览文件 @
ddf6ad27
...
...
@@ -1772,7 +1772,10 @@ public class ProcessDao extends AbstractBaseDao {
*/
public
int
getTaskWorkerGroupId
(
TaskInstance
taskInstance
)
{
int
taskWorkerGroupId
=
taskInstance
.
getWorkerGroupId
();
ProcessInstance
processInstance
=
findProcessInstanceByTaskId
(
taskInstance
.
getId
());
int
processInstanceId
=
taskInstance
.
getProcessInstanceId
();
ProcessInstance
processInstance
=
findProcessInstanceById
(
processInstanceId
);
if
(
processInstance
==
null
){
logger
.
error
(
"cannot find the task:{} process instance"
,
taskInstance
.
getId
());
return
Constants
.
DEFAULT_WORKER_ID
;
...
...
escheduler-dao/src/main/java/cn/escheduler/dao/ServerDao.java
浏览文件 @
ddf6ad27
...
...
@@ -16,19 +16,21 @@
*/
package
cn.escheduler.dao
;
import
cn.escheduler.common.model.MasterServer
;
import
cn.escheduler.dao.mapper.MasterServerMapper
;
import
cn.escheduler.dao.mapper.WorkerServerMapper
;
import
cn.escheduler.common.model.MasterServer
;
import
cn.escheduler.dao.model.WorkerServer
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Component
;
import
java.util.Date
;
import
static
cn
.
escheduler
.
dao
.
datasource
.
ConnectionFactory
.
getMapper
;
/**
*
master server
*
server dao
*/
@Component
public
class
ServerDao
extends
AbstractBaseDao
{
@Autowired
...
...
escheduler-
server/src/main/java/cn/escheduler/server
/BeanContext.java
→
escheduler-
dao/src/main/java/cn/escheduler/dao/utils
/BeanContext.java
浏览文件 @
ddf6ad27
...
...
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
cn.escheduler.
server
;
package
cn.escheduler.
dao.utils
;
import
org.springframework.beans.BeansException
;
...
...
escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java
浏览文件 @
ddf6ad27
...
...
@@ -16,20 +16,16 @@
*/
package
cn.escheduler.server.master
;
import
cn.escheduler.server.quartz.ProcessScheduleJob
;
import
cn.escheduler.server.quartz.QuartzExecutors
;
import
cn.escheduler.common.Constants
;
import
cn.escheduler.common.IStoppable
;
import
cn.escheduler.common.thread.Stopper
;
import
cn.escheduler.common.thread.ThreadPoolExecutors
;
import
cn.escheduler.common.thread.ThreadUtils
;
import
cn.escheduler.common.utils.OSUtils
;
import
cn.escheduler.dao.AlertDao
;
import
cn.escheduler.dao.ProcessDao
;
import
cn.escheduler.dao.ServerDao
;
import
cn.escheduler.server.master.runner.MasterSchedulerThread
;
import
cn.escheduler.server.quartz.ProcessScheduleJob
;
import
cn.escheduler.server.quartz.QuartzExecutors
;
import
cn.escheduler.server.zk.ZKMasterClient
;
import
org.apache.commons.configuration.Configuration
;
import
org.apache.commons.configuration.ConfigurationException
;
import
org.apache.commons.configuration.PropertiesConfiguration
;
import
org.apache.commons.lang3.StringUtils
;
...
...
@@ -37,8 +33,8 @@ import org.quartz.SchedulerException;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.boot.
CommandLineRunner
;
import
org.springframework.boot.
SpringApplication
;
import
org.springframework.boot.
WebApplicationType
;
import
org.springframework.boot.
builder.SpringApplicationBuilder
;
import
org.springframework.context.annotation.ComponentScan
;
import
java.util.concurrent.ExecutorService
;
...
...
@@ -49,77 +45,71 @@ import java.util.concurrent.TimeUnit;
* master server
*/
@ComponentScan
(
"cn.escheduler"
)
public
class
MasterServer
implements
CommandLineRunner
,
IStoppable
{
public
class
MasterServer
extends
AbstractServer
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
MasterServer
.
class
);
/**
* conf
*/
private
static
Configuration
conf
;
/**
* object lock
*/
private
final
Object
lock
=
new
Object
();
/**
* whether or not to close the state
*/
private
boolean
terminated
=
false
;
/**
* zk master client
*/
private
static
ZKMasterClient
zkMasterClient
=
null
;
/**
* master dao database access
*/
private
ServerDao
serverDao
=
null
;
private
static
ZKMasterClient
zkMasterClient
=
null
;
/**
*
alert database access
*
heartbeat thread pool
*/
private
AlertDao
alertDao
=
null
;
private
ScheduledExecutorService
heartbeatMasterService
;
/**
* escheduler database interface
*/
@Autowired
private
ProcessDao
processDao
;
/**
* heartbeat thread pool
*/
private
ScheduledExecutorService
heartbeatMasterService
;
protected
ProcessDao
processDao
;
/**
* master exec thread pool
*/
private
final
ExecutorService
masterSchedulerService
=
ThreadUtils
.
newDaemonSingleThreadExecutor
(
"Master-Scheduler-Thread"
)
;
private
ExecutorService
masterSchedulerService
;
/**
* heartbeat interval, unit second
*/
private
int
heartBeatInterval
;
public
MasterServer
(){}
static
{
public
MasterServer
(
ProcessDao
processDao
)
{
try
{
conf
=
new
PropertiesConfiguration
(
Constants
.
MASTER_PROPERTIES_PATH
);
}
catch
(
ConfigurationException
e
){
logger
.
error
(
"load configuration failed : "
+
e
.
getMessage
(),
e
);
System
.
exit
(
1
);
}
zkMasterClient
=
ZKMasterClient
.
getZKMasterClient
(
processDao
);
this
.
masterSchedulerService
=
ThreadUtils
.
newDaemonSingleThreadExecutor
(
"Master-Scheduler-Thread"
);
}
public
MasterServer
(){}
public
MasterServer
(
ProcessDao
processDao
){
zkMasterClient
=
ZKMasterClient
.
getZKMasterClient
(
processDao
);
/**
* master server startup
*
* master server not use web service
*/
public
static
void
main
(
String
[]
args
)
{
SpringApplicationBuilder
app
=
new
SpringApplicationBuilder
(
MasterServer
.
class
);
app
.
web
(
WebApplicationType
.
NONE
)
.
run
(
args
);
}
@Override
public
void
run
(
String
...
strings
)
throws
Exception
{
MasterServer
masterServer
=
new
MasterServer
(
processDao
);
masterServer
.
run
(
processDao
);
logger
.
info
(
"master server started"
);
// blocking
masterServer
.
awaitTermination
();
}
public
void
run
(
ProcessDao
processDao
){
// heartbeat interval
...
...
@@ -153,7 +143,6 @@ public class MasterServer implements CommandLineRunner, IStoppable {
masterSchedulerService
.
execute
(
masterSchedulerThread
);
// start QuartzExecutors
// TODO...
// what system should do if exception
try
{
ProcessScheduleJob
.
init
(
processDao
);
...
...
@@ -186,60 +175,6 @@ public class MasterServer implements CommandLineRunner, IStoppable {
}
public
static
void
main
(
String
[]
args
)
{
SpringApplication
app
=
new
SpringApplication
(
MasterServer
.
class
);
app
.
run
(
args
);
}
/**
* blocking implement
* @throws InterruptedException
*/
public
void
awaitTermination
()
throws
InterruptedException
{
synchronized
(
lock
)
{
while
(!
terminated
)
{
lock
.
wait
();
}
}
}
/**
* heartbeat thread implement
* @return
*/
public
Runnable
heartBeatThread
(){
Runnable
heartBeatThread
=
new
Runnable
()
{
@Override
public
void
run
()
{
if
(
Stopper
.
isRunning
())
{
// send heartbeat to zk
if
(
StringUtils
.
isBlank
(
zkMasterClient
.
getMasterZNode
()))
{
logger
.
error
(
"master send heartbeat to zk failed: can't find zookeeper path of master server"
);
return
;
}
zkMasterClient
.
heartBeatForZk
(
zkMasterClient
.
getMasterZNode
(),
Constants
.
MASTER_PREFIX
);
}
}
};
return
heartBeatThread
;
}
@Override
public
void
run
(
String
...
strings
)
throws
Exception
{
MasterServer
masterServer
=
new
MasterServer
(
processDao
);
masterServer
.
run
(
processDao
);
logger
.
info
(
"master server started"
);
// blocking
masterServer
.
awaitTermination
();
}
/**
* gracefully stop
* @param cause why stopping
...
...
@@ -315,5 +250,28 @@ public class MasterServer implements CommandLineRunner, IStoppable {
System
.
exit
(-
1
);
}
}
/**
* heartbeat thread implement
* @return
*/
private
Runnable
heartBeatThread
(){
Runnable
heartBeatThread
=
new
Runnable
()
{
@Override
public
void
run
()
{
if
(
Stopper
.
isRunning
())
{
// send heartbeat to zk
if
(
StringUtils
.
isBlank
(
zkMasterClient
.
getMasterZNode
()))
{
logger
.
error
(
"master send heartbeat to zk failed: can't find zookeeper path of master server"
);
return
;
}
zkMasterClient
.
heartBeatForZk
(
zkMasterClient
.
getMasterZNode
(),
Constants
.
MASTER_PREFIX
);
}
}
};
return
heartBeatThread
;
}
}
escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterBaseTaskExecThread.java
浏览文件 @
ddf6ad27
...
...
@@ -23,7 +23,7 @@ import cn.escheduler.dao.AlertDao;
import
cn.escheduler.dao.ProcessDao
;
import
cn.escheduler.dao.model.ProcessInstance
;
import
cn.escheduler.dao.model.TaskInstance
;
import
cn.escheduler.
server
.BeanContext
;
import
cn.escheduler.
dao.utils
.BeanContext
;
import
org.apache.commons.configuration.Configuration
;
import
org.apache.commons.configuration.ConfigurationException
;
import
org.apache.commons.configuration.PropertiesConfiguration
;
...
...
escheduler-server/src/main/java/cn/escheduler/server/worker/WorkerServer.java
浏览文件 @
ddf6ad27
...
...
@@ -17,7 +17,6 @@
package
cn.escheduler.server.worker
;
import
cn.escheduler.common.Constants
;
import
cn.escheduler.common.IStoppable
;
import
cn.escheduler.common.enums.ExecutionStatus
;
import
cn.escheduler.common.enums.TaskType
;
import
cn.escheduler.common.queue.ITaskQueue
;
...
...
@@ -28,20 +27,21 @@ import cn.escheduler.common.thread.ThreadUtils;
import
cn.escheduler.common.utils.CollectionUtils
;
import
cn.escheduler.common.utils.OSUtils
;
import
cn.escheduler.dao.AlertDao
;
import
cn.escheduler.dao.DaoFactory
;
import
cn.escheduler.dao.ProcessDao
;
import
cn.escheduler.dao.ServerDao
;
import
cn.escheduler.dao.model.ProcessInstance
;
import
cn.escheduler.dao.model.TaskInstance
;
import
cn.escheduler.server.master.AbstractServer
;
import
cn.escheduler.server.utils.ProcessUtils
;
import
cn.escheduler.server.worker.runner.FetchTaskThread
;
import
cn.escheduler.server.zk.ZKWorkerClient
;
import
org.apache.commons.configuration.Configuration
;
import
org.apache.commons.configuration.ConfigurationException
;
import
org.apache.commons.configuration.PropertiesConfiguration
;
import
org.apache.commons.lang.StringUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.boot.WebApplicationType
;
import
org.springframework.boot.builder.SpringApplicationBuilder
;
import
org.springframework.context.annotation.ComponentScan
;
import
java.util.Set
;
import
java.util.concurrent.ExecutorService
;
...
...
@@ -51,55 +51,34 @@ import java.util.concurrent.TimeUnit;
/**
* worker server
*/
public
class
WorkerServer
implements
IStoppable
{
@ComponentScan
(
"cn.escheduler"
)
public
class
WorkerServer
extends
AbstractServer
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
WorkerServer
.
class
);
/**
* conf
*/
private
static
Configuration
conf
;
/**
* object lock
*/
private
final
Object
lock
=
new
Object
();
/**
* whether or not to close the state
*/
private
boolean
terminated
=
false
;
/**
* zk worker client
*/
private
static
ZKWorkerClient
zkWorkerClient
=
null
;
/**
* worker dao database access
*/
private
ServerDao
serverDao
=
null
;
/**
* process database access
*/
private
final
ProcessDao
processDao
;
@Autowired
private
ProcessDao
processDao
;
/**
* alert database access
*/
private
final
AlertDao
alertDao
;
@Autowired
private
AlertDao
alertDao
;
/**
* heartbeat thread pool
*/
private
ScheduledExecutorService
heartbeatWorerService
;
/**
* heartbeat interval, unit second
*/
private
int
heartBeatInterval
;
/**
* task queue impl
*/
...
...
@@ -115,29 +94,57 @@ public class WorkerServer implements IStoppable {
*/
private
ExecutorService
fetchTaskExecutorService
;
static
{
public
WorkerServer
(){}
public
WorkerServer
(
ProcessDao
processDao
,
AlertDao
alertDao
){
try
{
conf
=
new
PropertiesConfiguration
(
Constants
.
WORKER_PROPERTIES_PATH
);
}
catch
(
ConfigurationException
e
){
logger
.
error
(
"load configuration failed"
,
e
);
System
.
exit
(
1
);
}
}
public
WorkerServer
(){
zkWorkerClient
=
ZKWorkerClient
.
getZKWorkerClient
();
this
.
serverDao
=
zkWorkerClient
.
getServerDao
();
this
.
alertDao
=
DaoFactory
.
getDaoInstance
(
AlertDao
.
class
);
this
.
processDao
=
DaoFactory
.
getDaoInstance
(
ProcessDao
.
class
);
taskQueue
=
TaskQueueFactory
.
getTaskQueueInstance
();
killExecutorService
=
ThreadUtils
.
newDaemonSingleThreadExecutor
(
"Worker-Kill-Thread-Executor"
);
this
.
taskQueue
=
TaskQueueFactory
.
getTaskQueueInstance
(
);
fetchTaskExecutorService
=
ThreadUtils
.
newDaemonSingleThreadExecutor
(
"Worker-Fetch
-Thread-Executor"
);
this
.
killExecutorService
=
ThreadUtils
.
newDaemonSingleThreadExecutor
(
"Worker-Kill
-Thread-Executor"
);
this
.
fetchTaskExecutorService
=
ThreadUtils
.
newDaemonSingleThreadExecutor
(
"Worker-Fetch-Thread-Executor"
);
}
public
void
run
(){
/**
* master server startup
*
* master server not use web service
*/
public
static
void
main
(
String
[]
args
)
{
SpringApplicationBuilder
app
=
new
SpringApplicationBuilder
(
WorkerServer
.
class
);
app
.
web
(
WebApplicationType
.
NONE
)
.
run
(
args
);
}
@Override
public
void
run
(
String
...
args
)
throws
Exception
{
// set the name of the current thread
Thread
.
currentThread
().
setName
(
"Worker-Main-Thread"
);
WorkerServer
workerServer
=
new
WorkerServer
(
processDao
,
alertDao
);
workerServer
.
run
(
processDao
,
alertDao
);
logger
.
info
(
"worker server started"
);
// blocking
workerServer
.
awaitTermination
();
}
public
void
run
(
ProcessDao
processDao
,
AlertDao
alertDao
){
// heartbeat interval
heartBeatInterval
=
conf
.
getInt
(
Constants
.
WORKER_HEARTBEAT_INTERVAL
,
...
...
@@ -187,45 +194,82 @@ public class WorkerServer implements IStoppable {
// submit fetch task thread
fetchTaskExecutorService
.
execute
(
fetchTaskThread
);
}
@Override
public
synchronized
void
stop
(
String
cause
)
{
}
try
{
//execute only once
if
(
Stopper
.
isStoped
()){
return
;
}
public
static
void
main
(
String
[]
args
)
throws
Exception
{
logger
.
info
(
"worker server is stopping ..., cause : {}"
,
cause
);
// set the name of the current thread
Thread
.
currentThread
().
setName
(
"Worker-Main-Thread"
);
// set stop signal is true
Stopper
.
stop
(
);
WorkerServer
workerServer
=
new
WorkerServer
();
try
{
//thread sleep 3 seconds for thread quitely stop
Thread
.
sleep
(
3000L
);
}
catch
(
Exception
e
){
logger
.
warn
(
"thread sleep exception:"
+
e
.
getMessage
(),
e
);
}
workerServer
.
run
();
try
{
heartbeatWorerService
.
shutdownNow
();
}
catch
(
Exception
e
){
logger
.
warn
(
"heartbeat service stopped exception"
);
}
logger
.
info
(
"heartbeat service stopped"
);
logger
.
info
(
"worker server started"
);
try
{
ThreadPoolExecutors
.
getInstance
().
shutdown
();
}
catch
(
Exception
e
){
logger
.
warn
(
"threadpool service stopped exception:{}"
,
e
.
getMessage
());
}
// blocking
workerServer
.
awaitTermination
();
logger
.
info
(
"threadpool service stopped"
);
try
{
killExecutorService
.
shutdownNow
();
}
catch
(
Exception
e
){
logger
.
warn
(
"worker kill executor service stopped exception:{}"
,
e
.
getMessage
());
}
logger
.
info
(
"worker kill executor service stopped"
);
}
try
{
fetchTaskExecutorService
.
shutdownNow
();
}
catch
(
Exception
e
){
logger
.
warn
(
"worker fetch task service stopped exception:{}"
,
e
.
getMessage
());
}
logger
.
info
(
"worker fetch task service stopped"
);
try
{
zkWorkerClient
.
close
();
}
catch
(
Exception
e
){
logger
.
warn
(
"zookeeper service stopped exception:{}"
,
e
.
getMessage
());
}
logger
.
info
(
"zookeeper service stopped"
);
/**
* blocking implement
* @throws InterruptedException
*/
public
void
awaitTermination
()
throws
InterruptedException
{
synchronized
(
lock
)
{
while
(!
terminated
)
{
lock
.
wait
();
//notify
synchronized
(
lock
)
{
terminated
=
true
;
lock
.
notifyAll
();
}
}
catch
(
Exception
e
)
{
logger
.
error
(
"worker server stop exception : "
+
e
.
getMessage
(),
e
);
System
.
exit
(-
1
);
}
}
/**
* heartbeat thread implement
* @return
*/
p
ublic
Runnable
heartBeatThread
(){
p
rivate
Runnable
heartBeatThread
(){
Runnable
heartBeatThread
=
new
Runnable
()
{
@Override
public
void
run
()
{
...
...
@@ -240,11 +284,12 @@ public class WorkerServer implements IStoppable {
return
heartBeatThread
;
}
/**
* kill process thread implement
* @return
*/
p
ublic
Runnable
getKillProcessThread
(){
p
rivate
Runnable
getKillProcessThread
(){
Runnable
killProcessThread
=
new
Runnable
()
{
@Override
public
void
run
()
{
...
...
@@ -286,74 +331,5 @@ public class WorkerServer implements IStoppable {
return
killProcessThread
;
}
@Override
public
synchronized
void
stop
(
String
cause
)
{
try
{
//execute only once
if
(
Stopper
.
isStoped
()){
return
;
}
logger
.
info
(
"worker server is stopping ..., cause : {}"
,
cause
);
// set stop signal is true
Stopper
.
stop
();
try
{
//thread sleep 3 seconds for thread quitely stop
Thread
.
sleep
(
3000L
);
}
catch
(
Exception
e
){
logger
.
warn
(
"thread sleep exception:"
+
e
.
getMessage
(),
e
);
}
try
{
heartbeatWorerService
.
shutdownNow
();
}
catch
(
Exception
e
){
logger
.
warn
(
"heartbeat service stopped exception"
);
}
logger
.
info
(
"heartbeat service stopped"
);
try
{
ThreadPoolExecutors
.
getInstance
().
shutdown
();
}
catch
(
Exception
e
){
logger
.
warn
(
"threadpool service stopped exception:{}"
,
e
.
getMessage
());
}
logger
.
info
(
"threadpool service stopped"
);
try
{
killExecutorService
.
shutdownNow
();
}
catch
(
Exception
e
){
logger
.
warn
(
"worker kill executor service stopped exception:{}"
,
e
.
getMessage
());
}
logger
.
info
(
"worker kill executor service stopped"
);
try
{
fetchTaskExecutorService
.
shutdownNow
();
}
catch
(
Exception
e
){
logger
.
warn
(
"worker fetch task service stopped exception:{}"
,
e
.
getMessage
());
}
logger
.
info
(
"worker fetch task service stopped"
);
try
{
zkWorkerClient
.
close
();
}
catch
(
Exception
e
){
logger
.
warn
(
"zookeeper service stopped exception:{}"
,
e
.
getMessage
());
}
logger
.
info
(
"zookeeper service stopped"
);
//notify
synchronized
(
lock
)
{
terminated
=
true
;
lock
.
notifyAll
();
}
}
catch
(
Exception
e
)
{
logger
.
error
(
"worker server stop exception : "
+
e
.
getMessage
(),
e
);
System
.
exit
(-
1
);
}
}
}
escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java
浏览文件 @
ddf6ad27
...
...
@@ -327,6 +327,11 @@ public class ZKMasterClient extends AbstractZKClient {
boolean
taskNeedFailover
=
true
;
//now no host will execute this task instance,so no need to failover the task
if
(
taskInstance
.
getHost
()
==
null
){
return
false
;
}
// if the worker node exists in zookeeper, we must check the task starts after the worker
if
(
checkZKNodeExists
(
taskInstance
.
getHost
(),
ZKNodeType
.
WORKER
)){
//if task start after worker starts, there is no need to failover the task.
...
...
escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java
浏览文件 @
ddf6ad27
...
...
@@ -18,18 +18,13 @@ package cn.escheduler.server.zk;
import
cn.escheduler.common.Constants
;
import
cn.escheduler.common.enums.ZKNodeType
;
import
cn.escheduler.common.utils.OSUtils
;
import
cn.escheduler.common.zk.AbstractZKClient
;
import
cn.escheduler.dao.DaoFactory
;
import
cn.escheduler.dao.ServerDao
;
import
cn.escheduler.common.utils.ResInfo
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.curator.framework.CuratorFramework
;
import
org.apache.curator.framework.recipes.cache.PathChildrenCache
;
import
org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent
;
import
org.apache.curator.framework.recipes.cache.PathChildrenCacheListener
;
import
org.apache.curator.utils.ThreadUtils
;
import
org.apache.zookeeper.CreateMode
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
...
...
@@ -54,11 +49,6 @@ public class ZKWorkerClient extends AbstractZKClient {
*/
private
String
workerZNode
=
null
;
/**
* worker database access
*/
private
ServerDao
serverDao
=
null
;
/**
* create time
*/
...
...
@@ -77,8 +67,6 @@ public class ZKWorkerClient extends AbstractZKClient {
* init
*/
private
void
init
(){
// init worker dao
serverDao
=
DaoFactory
.
getDaoInstance
(
ServerDao
.
class
);
// init system znode
this
.
initSystemZNode
();
...
...
@@ -103,13 +91,6 @@ public class ZKWorkerClient extends AbstractZKClient {
return
zkWorkerClient
;
}
/**
* get worker dao
* @return
*/
public
ServerDao
getServerDao
(){
return
serverDao
;
}
/**
* register worker
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录