Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
三久
DolphinScheduler
提交
5b338361
DolphinScheduler
项目概览
三久
/
DolphinScheduler
与 Fork 源项目一致
Fork自
apache / DolphinScheduler
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
DolphinScheduler
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
5b338361
编写于
7月 05, 2019
作者:
L
lgcareer
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'remotes/upstream/dev-1.1.0' into dev-1.1.0
上级
edd65e74
527291c1
变更
26
隐藏空白更改
内联
并排
Showing
26 changed file
with
520 addition
and
221 deletion
+520
-221
README.md
README.md
+1
-1
escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java
...java/cn/escheduler/api/controller/ExecutorController.java
+42
-9
escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java
.../main/java/cn/escheduler/api/service/ExecutorService.java
+11
-1
escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java
...ava/cn/escheduler/api/service/ProcessInstanceService.java
+13
-4
escheduler-common/src/main/java/cn/escheduler/common/Constants.java
...-common/src/main/java/cn/escheduler/common/Constants.java
+12
-0
escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java
.../src/main/java/cn/escheduler/common/queue/ITaskQueue.java
+3
-6
escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueFactory.java
...ain/java/cn/escheduler/common/queue/TaskQueueFactory.java
+1
-1
escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java
...main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java
+65
-42
escheduler-common/src/main/java/cn/escheduler/common/utils/IpUtils.java
...mon/src/main/java/cn/escheduler/common/utils/IpUtils.java
+70
-0
escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java
...c/main/java/cn/escheduler/common/zk/AbstractZKClient.java
+5
-1
escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java
...on/src/test/java/cn/escheduler/common/os/OSUtilsTest.java
+6
-0
escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java
...st/java/cn/escheduler/common/queue/TaskQueueImplTest.java
+43
-42
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
...duler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
+71
-4
escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java
...rc/main/java/cn/escheduler/dao/model/ProcessInstance.java
+39
-0
escheduler-server/pom.xml
escheduler-server/pom.xml
+1
-1
escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java
...c/main/java/cn/escheduler/server/master/MasterServer.java
+1
-1
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
...a/cn/escheduler/server/worker/runner/FetchTaskThread.java
+84
-72
escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java
...in/java/cn/escheduler/server/worker/task/sql/SqlTask.java
+1
-1
escheduler-ui/src/js/conf/home/pages/dag/_source/startingParam/index.vue
...rc/js/conf/home/pages/dag/_source/startingParam/index.vue
+0
-2
escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue
...i/src/js/conf/home/pages/monitor/pages/servers/master.vue
+2
-2
escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue
...i/src/js/conf/home/pages/monitor/pages/servers/worker.vue
+2
-2
escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/util.js
...ages/projects/pages/definition/pages/list/_source/util.js
+1
-1
escheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue
...pages/projects/pages/instance/pages/list/_source/list.vue
+42
-24
escheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue
...nf/home/pages/security/pages/users/_source/createUser.vue
+1
-1
escheduler-ui/src/js/module/i18n/locale/zh_CN.js
escheduler-ui/src/js/module/i18n/locale/zh_CN.js
+2
-2
install.sh
install.sh
+1
-1
未找到文件。
README.md
浏览文件 @
5b338361
...
@@ -50,7 +50,7 @@ Easy Scheduler
...
@@ -50,7 +50,7 @@ Easy Scheduler
### 近期研发计划
### 近期研发计划
EasyScheduler的工作计划:
<a
href=
"https://github.com/analysys/EasyScheduler/projects/1"
target=
"_blank"
>
研发计划
</a>
,其中 In Develop卡片下是1.
0.2
版本的功能,TODO卡片是待做事项(包括 feature ideas)
EasyScheduler的工作计划:
<a
href=
"https://github.com/analysys/EasyScheduler/projects/1"
target=
"_blank"
>
研发计划
</a>
,其中 In Develop卡片下是1.
1.0
版本的功能,TODO卡片是待做事项(包括 feature ideas)
### 贡献代码
### 贡献代码
...
...
escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java
浏览文件 @
5b338361
...
@@ -24,7 +24,7 @@ import cn.escheduler.api.utils.Constants;
...
@@ -24,7 +24,7 @@ import cn.escheduler.api.utils.Constants;
import
cn.escheduler.api.utils.Result
;
import
cn.escheduler.api.utils.Result
;
import
cn.escheduler.common.enums.*
;
import
cn.escheduler.common.enums.*
;
import
cn.escheduler.dao.model.User
;
import
cn.escheduler.dao.model.User
;
import
io.swagger.annotations.
Api
;
import
io.swagger.annotations.
*
;
import
org.slf4j.Logger
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Autowired
;
...
@@ -38,9 +38,9 @@ import static cn.escheduler.api.enums.Status.*;
...
@@ -38,9 +38,9 @@ import static cn.escheduler.api.enums.Status.*;
/**
/**
* execute
task
controller
* execute
process
controller
*/
*/
@Api
Ignore
@Api
(
tags
=
"PROCESS_INSTANCE_EXECUTOR_TAG"
,
position
=
1
)
@RestController
@RestController
@RequestMapping
(
"projects/{projectName}/executors"
)
@RequestMapping
(
"projects/{projectName}/executors"
)
public
class
ExecutorController
extends
BaseController
{
public
class
ExecutorController
extends
BaseController
{
...
@@ -53,10 +53,27 @@ public class ExecutorController extends BaseController {
...
@@ -53,10 +53,27 @@ public class ExecutorController extends BaseController {
/**
/**
* execute process instance
* execute process instance
*/
*/
@ApiOperation
(
value
=
"startProcessInstance"
,
notes
=
"RUN_PROCESS_INSTANCE_NOTES"
)
@ApiImplicitParams
({
@ApiImplicitParam
(
name
=
"processDefinitionId"
,
value
=
"PROCESS_DEFINITION_ID"
,
required
=
true
,
dataType
=
"Int"
,
example
=
"100"
),
@ApiImplicitParam
(
name
=
"scheduleTime"
,
value
=
"SCHEDULE_TIME"
,
required
=
true
,
dataType
=
"String"
),
@ApiImplicitParam
(
name
=
"failureStrategy"
,
value
=
"FAILURE_STRATEGY"
,
required
=
true
,
dataType
=
"FailureStrategy"
),
@ApiImplicitParam
(
name
=
"startNodeList"
,
value
=
"START_NODE_LIST"
,
dataType
=
"String"
),
@ApiImplicitParam
(
name
=
"taskDependType"
,
value
=
"TASK_DEPEND_TYPE"
,
dataType
=
"TaskDependType"
),
@ApiImplicitParam
(
name
=
"execType"
,
value
=
"COMMAND_TYPE"
,
dataType
=
"CommandType"
),
@ApiImplicitParam
(
name
=
"warningType"
,
value
=
"WARNING_TYPE"
,
required
=
true
,
dataType
=
"WarningType"
),
@ApiImplicitParam
(
name
=
"warningGroupId"
,
value
=
"WARNING_GROUP_ID"
,
required
=
true
,
dataType
=
"Int"
,
example
=
"100"
),
@ApiImplicitParam
(
name
=
"receivers"
,
value
=
"RECEIVERS"
,
dataType
=
"String"
),
@ApiImplicitParam
(
name
=
"receiversCc"
,
value
=
"RECEIVERS_CC"
,
dataType
=
"String"
),
@ApiImplicitParam
(
name
=
"runMode"
,
value
=
"RUN_MODE"
,
dataType
=
"RunMode"
),
@ApiImplicitParam
(
name
=
"processInstancePriority"
,
value
=
"PROCESS_INSTANCE_PRIORITY"
,
required
=
true
,
dataType
=
"Priority"
),
@ApiImplicitParam
(
name
=
"workerGroupId"
,
value
=
"WORKER_GROUP_ID"
,
dataType
=
"Int"
,
example
=
"100"
),
@ApiImplicitParam
(
name
=
"timeout"
,
value
=
"TIMEOUT"
,
dataType
=
"Int"
,
example
=
"100"
),
})
@PostMapping
(
value
=
"start-process-instance"
)
@PostMapping
(
value
=
"start-process-instance"
)
@ResponseStatus
(
HttpStatus
.
OK
)
@ResponseStatus
(
HttpStatus
.
OK
)
public
Result
startProcessInstance
(
@RequestAttribute
(
value
=
Constants
.
SESSION_USER
)
User
loginUser
,
public
Result
startProcessInstance
(
@
ApiIgnore
@
RequestAttribute
(
value
=
Constants
.
SESSION_USER
)
User
loginUser
,
@PathVariable
String
projectName
,
@
ApiParam
(
name
=
"projectName"
,
value
=
"PROJECT_NAME"
,
required
=
true
)
@
PathVariable
String
projectName
,
@RequestParam
(
value
=
"processDefinitionId"
)
int
processDefinitionId
,
@RequestParam
(
value
=
"processDefinitionId"
)
int
processDefinitionId
,
@RequestParam
(
value
=
"scheduleTime"
,
required
=
false
)
String
scheduleTime
,
@RequestParam
(
value
=
"scheduleTime"
,
required
=
false
)
String
scheduleTime
,
@RequestParam
(
value
=
"failureStrategy"
,
required
=
true
)
FailureStrategy
failureStrategy
,
@RequestParam
(
value
=
"failureStrategy"
,
required
=
true
)
FailureStrategy
failureStrategy
,
...
@@ -102,10 +119,15 @@ public class ExecutorController extends BaseController {
...
@@ -102,10 +119,15 @@ public class ExecutorController extends BaseController {
* @param processInstanceId
* @param processInstanceId
* @return
* @return
*/
*/
@ApiOperation
(
value
=
"execute"
,
notes
=
"EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES"
)
@ApiImplicitParams
({
@ApiImplicitParam
(
name
=
"processInstanceId"
,
value
=
"PROCESS_INSTANCE_ID"
,
required
=
true
,
dataType
=
"Int"
,
example
=
"100"
),
@ApiImplicitParam
(
name
=
"executeType"
,
value
=
"EXECUTE_TYPE"
,
required
=
true
,
dataType
=
"ExecuteType"
)
})
@PostMapping
(
value
=
"/execute"
)
@PostMapping
(
value
=
"/execute"
)
@ResponseStatus
(
HttpStatus
.
OK
)
@ResponseStatus
(
HttpStatus
.
OK
)
public
Result
execute
(
@RequestAttribute
(
value
=
Constants
.
SESSION_USER
)
User
loginUser
,
public
Result
execute
(
@
ApiIgnore
@
RequestAttribute
(
value
=
Constants
.
SESSION_USER
)
User
loginUser
,
@PathVariable
String
projectName
,
@ApiParam
(
name
=
"projectName"
,
value
=
"PROJECT_NAME"
,
required
=
true
)
@PathVariable
String
projectName
,
@RequestParam
(
"processInstanceId"
)
Integer
processInstanceId
,
@RequestParam
(
"processInstanceId"
)
Integer
processInstanceId
,
@RequestParam
(
"executeType"
)
ExecuteType
executeType
@RequestParam
(
"executeType"
)
ExecuteType
executeType
)
{
)
{
...
@@ -127,9 +149,13 @@ public class ExecutorController extends BaseController {
...
@@ -127,9 +149,13 @@ public class ExecutorController extends BaseController {
* @param processDefinitionId
* @param processDefinitionId
* @return
* @return
*/
*/
@ApiOperation
(
value
=
"startCheckProcessDefinition"
,
notes
=
"START_CHECK_PROCESS_DEFINITION_NOTES"
)
@ApiImplicitParams
({
@ApiImplicitParam
(
name
=
"processDefinitionId"
,
value
=
"PROCESS_DEFINITION_ID"
,
required
=
true
,
dataType
=
"Int"
,
example
=
"100"
)
})
@PostMapping
(
value
=
"/start-check"
)
@PostMapping
(
value
=
"/start-check"
)
@ResponseStatus
(
HttpStatus
.
OK
)
@ResponseStatus
(
HttpStatus
.
OK
)
public
Result
startCheckProcessDefinition
(
@RequestAttribute
(
value
=
Constants
.
SESSION_USER
)
User
loginUser
,
public
Result
startCheckProcessDefinition
(
@
ApiIgnore
@
RequestAttribute
(
value
=
Constants
.
SESSION_USER
)
User
loginUser
,
@RequestParam
(
value
=
"processDefinitionId"
)
int
processDefinitionId
)
{
@RequestParam
(
value
=
"processDefinitionId"
)
int
processDefinitionId
)
{
logger
.
info
(
"login user {}, check process definition"
,
loginUser
.
getUserName
(),
processDefinitionId
);
logger
.
info
(
"login user {}, check process definition"
,
loginUser
.
getUserName
(),
processDefinitionId
);
try
{
try
{
...
@@ -149,9 +175,16 @@ public class ExecutorController extends BaseController {
...
@@ -149,9 +175,16 @@ public class ExecutorController extends BaseController {
* @param processDefinitionId
* @param processDefinitionId
* @return
* @return
*/
*/
@ApiIgnore
@ApiOperation
(
value
=
"getReceiverCc"
,
notes
=
"GET_RECEIVER_CC_NOTES"
)
@ApiImplicitParams
({
@ApiImplicitParam
(
name
=
"processDefinitionId"
,
value
=
"PROCESS_DEFINITION_ID"
,
required
=
true
,
dataType
=
"Int"
,
example
=
"100"
),
@ApiImplicitParam
(
name
=
"processInstanceId"
,
value
=
"PROCESS_INSTANCE_ID"
,
required
=
true
,
dataType
=
"Int"
,
example
=
"100"
)
})
@GetMapping
(
value
=
"/get-receiver-cc"
)
@GetMapping
(
value
=
"/get-receiver-cc"
)
@ResponseStatus
(
HttpStatus
.
OK
)
@ResponseStatus
(
HttpStatus
.
OK
)
public
Result
getReceiverCc
(
@RequestAttribute
(
value
=
Constants
.
SESSION_USER
)
User
loginUser
,
public
Result
getReceiverCc
(
@
ApiIgnore
@
RequestAttribute
(
value
=
Constants
.
SESSION_USER
)
User
loginUser
,
@RequestParam
(
value
=
"processDefinitionId"
,
required
=
false
)
Integer
processDefinitionId
,
@RequestParam
(
value
=
"processDefinitionId"
,
required
=
false
)
Integer
processDefinitionId
,
@RequestParam
(
value
=
"processInstanceId"
,
required
=
false
)
Integer
processInstanceId
)
{
@RequestParam
(
value
=
"processInstanceId"
,
required
=
false
)
Integer
processInstanceId
)
{
logger
.
info
(
"login user {}, get process definition receiver and cc"
,
loginUser
.
getUserName
());
logger
.
info
(
"login user {}, get process definition receiver and cc"
,
loginUser
.
getUserName
());
...
...
escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java
浏览文件 @
5b338361
...
@@ -191,6 +191,16 @@ public class ExecutorService extends BaseService{
...
@@ -191,6 +191,16 @@ public class ExecutorService extends BaseService{
return
checkResult
;
return
checkResult
;
}
}
// checkTenantExists();
Tenant
tenant
=
processDao
.
getTenantForProcess
(
processDefinition
.
getTenantId
(),
processDefinition
.
getUserId
());
if
(
tenant
==
null
){
logger
.
error
(
"there is not any vaild tenant for the process definition: id:{},name:{}, "
,
processDefinition
.
getId
(),
processDefinition
.
getName
());
putMsg
(
result
,
Status
.
PROCESS_INSTANCE_NOT_EXIST
,
processInstanceId
);
return
result
;
}
switch
(
executeType
)
{
switch
(
executeType
)
{
case
REPEAT_RUNNING:
case
REPEAT_RUNNING:
result
=
insertCommand
(
loginUser
,
processInstanceId
,
processDefinition
.
getId
(),
CommandType
.
REPEAT_RUNNING
);
result
=
insertCommand
(
loginUser
,
processInstanceId
,
processDefinition
.
getId
(),
CommandType
.
REPEAT_RUNNING
);
...
@@ -260,7 +270,7 @@ public class ExecutorService extends BaseService{
...
@@ -260,7 +270,7 @@ public class ExecutorService extends BaseService{
}
}
break
;
break
;
case
RECOVER_SUSPENDED_PROCESS:
case
RECOVER_SUSPENDED_PROCESS:
if
(
executionStatus
.
typeIsPause
())
{
if
(
executionStatus
.
typeIsPause
()
||
executionStatus
.
typeIsCancel
()
)
{
checkResult
=
true
;
checkResult
=
true
;
}
}
default
:
default
:
...
...
escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java
浏览文件 @
5b338361
...
@@ -38,10 +38,7 @@ import cn.escheduler.common.utils.JSONUtils;
...
@@ -38,10 +38,7 @@ import cn.escheduler.common.utils.JSONUtils;
import
cn.escheduler.common.utils.ParameterUtils
;
import
cn.escheduler.common.utils.ParameterUtils
;
import
cn.escheduler.common.utils.placeholder.BusinessTimeUtils
;
import
cn.escheduler.common.utils.placeholder.BusinessTimeUtils
;
import
cn.escheduler.dao.ProcessDao
;
import
cn.escheduler.dao.ProcessDao
;
import
cn.escheduler.dao.mapper.ProcessDefinitionMapper
;
import
cn.escheduler.dao.mapper.*
;
import
cn.escheduler.dao.mapper.ProcessInstanceMapper
;
import
cn.escheduler.dao.mapper.ProjectMapper
;
import
cn.escheduler.dao.mapper.TaskInstanceMapper
;
import
cn.escheduler.dao.model.*
;
import
cn.escheduler.dao.model.*
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSON
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.commons.lang3.StringUtils
;
...
@@ -97,6 +94,9 @@ public class ProcessInstanceService extends BaseDAGService {
...
@@ -97,6 +94,9 @@ public class ProcessInstanceService extends BaseDAGService {
@Autowired
@Autowired
LoggerService
loggerService
;
LoggerService
loggerService
;
@Autowired
WorkerGroupMapper
workerGroupMapper
;
/**
/**
* query process instance by id
* query process instance by id
*
*
...
@@ -115,6 +115,15 @@ public class ProcessInstanceService extends BaseDAGService {
...
@@ -115,6 +115,15 @@ public class ProcessInstanceService extends BaseDAGService {
return
checkResult
;
return
checkResult
;
}
}
ProcessInstance
processInstance
=
processDao
.
findProcessInstanceDetailById
(
processId
);
ProcessInstance
processInstance
=
processDao
.
findProcessInstanceDetailById
(
processId
);
if
(
processInstance
.
getWorkerGroupId
()
==
-
1
){
processInstance
.
setWorkerGroupName
(
DEFAULT
);
}
else
{
WorkerGroup
workerGroup
=
workerGroupMapper
.
queryById
(
processInstance
.
getWorkerGroupId
());
processInstance
.
setWorkerGroupName
(
workerGroup
.
getName
());
}
ProcessDefinition
processDefinition
=
processDao
.
findProcessDefineById
(
processInstance
.
getProcessDefinitionId
());
processInstance
.
setReceivers
(
processDefinition
.
getReceivers
());
processInstance
.
setReceiversCc
(
processDefinition
.
getReceiversCc
());
result
.
put
(
Constants
.
DATA_LIST
,
processInstance
);
result
.
put
(
Constants
.
DATA_LIST
,
processInstance
);
putMsg
(
result
,
Status
.
SUCCESS
);
putMsg
(
result
,
Status
.
SUCCESS
);
...
...
escheduler-common/src/main/java/cn/escheduler/common/Constants.java
浏览文件 @
5b338361
...
@@ -219,6 +219,11 @@ public final class Constants {
...
@@ -219,6 +219,11 @@ public final class Constants {
*/
*/
public
static
final
String
SEMICOLON
=
";"
;
public
static
final
String
SEMICOLON
=
";"
;
/**
* DOT .
*/
public
static
final
String
DOT
=
"."
;
/**
/**
* ZOOKEEPER_SESSION_TIMEOUT
* ZOOKEEPER_SESSION_TIMEOUT
*/
*/
...
@@ -483,6 +488,8 @@ public final class Constants {
...
@@ -483,6 +488,8 @@ public final class Constants {
public
static
final
String
TASK_RECORD_PWD
=
"task.record.datasource.password"
;
public
static
final
String
TASK_RECORD_PWD
=
"task.record.datasource.password"
;
public
static
final
String
DEFAULT
=
"Default"
;
public
static
String
TASK_RECORD_TABLE_HIVE_LOG
=
"eamp_hive_log_hd"
;
public
static
String
TASK_RECORD_TABLE_HIVE_LOG
=
"eamp_hive_log_hd"
;
public
static
String
TASK_RECORD_TABLE_HISTORY_HIVE_LOG
=
"eamp_hive_hist_log_hd"
;
public
static
String
TASK_RECORD_TABLE_HISTORY_HIVE_LOG
=
"eamp_hive_hist_log_hd"
;
...
@@ -883,6 +890,11 @@ public final class Constants {
...
@@ -883,6 +890,11 @@ public final class Constants {
*/
*/
public
static
final
String
LOGIN_USER_KEY_TAB_USERNAME
=
"login.user.keytab.username"
;
public
static
final
String
LOGIN_USER_KEY_TAB_USERNAME
=
"login.user.keytab.username"
;
/**
* default worker group id
*/
public
static
final
int
DEFAULT_WORKER_ID
=
-
1
;
/**
/**
* loginUserFromKeytab path
* loginUserFromKeytab path
*/
*/
...
...
escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java
浏览文件 @
5b338361
...
@@ -24,20 +24,17 @@ public interface ITaskQueue {
...
@@ -24,20 +24,17 @@ public interface ITaskQueue {
/**
/**
* take out all the elements
* take out all the elements
*
*
* this method has deprecated
* use checkTaskExists instead
*
*
* @param key
* @param key
* @return
* @return
*/
*/
@Deprecated
List
<
String
>
getAllTasks
(
String
key
);
List
<
String
>
getAllTasks
(
String
key
);
/**
/**
* check task exists in the task queue or not
* check task exists in the task queue or not
*
*
* @param key queue name
* @param key queue name
* @param task ${pr
iority}_${processInstanceId
}_${taskId}
* @param task ${pr
ocessInstancePriority}_${processInstanceId}_${taskInstancePriority
}_${taskId}
* @return true if exists in the queue
* @return true if exists in the queue
*/
*/
boolean
checkTaskExists
(
String
key
,
String
task
);
boolean
checkTaskExists
(
String
key
,
String
task
);
...
@@ -54,10 +51,10 @@ public interface ITaskQueue {
...
@@ -54,10 +51,10 @@ public interface ITaskQueue {
* an element pops out of the queue
* an element pops out of the queue
*
*
* @param key queue name
* @param key queue name
* @param
remove whether remove the element
* @param
n how many elements to poll
* @return
* @return
*/
*/
String
poll
(
String
key
,
boolean
remove
);
List
<
String
>
poll
(
String
key
,
int
n
);
/**
/**
* remove a element from queue
* remove a element from queue
...
...
escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueFactory.java
浏览文件 @
5b338361
...
@@ -42,7 +42,7 @@ public class TaskQueueFactory {
...
@@ -42,7 +42,7 @@ public class TaskQueueFactory {
public
static
ITaskQueue
getTaskQueueInstance
()
{
public
static
ITaskQueue
getTaskQueueInstance
()
{
String
queueImplValue
=
CommonUtils
.
getQueueImplValue
();
String
queueImplValue
=
CommonUtils
.
getQueueImplValue
();
if
(
StringUtils
.
isNotBlank
(
queueImplValue
))
{
if
(
StringUtils
.
isNotBlank
(
queueImplValue
))
{
// queueImplValue =
String
Utils.trim(queueImplValue);
// queueImplValue =
Ip
Utils.trim(queueImplValue);
// if (SCHEDULER_QUEUE_REDIS_IMPL.equals(queueImplValue)) {
// if (SCHEDULER_QUEUE_REDIS_IMPL.equals(queueImplValue)) {
// logger.info("task queue impl use reids ");
// logger.info("task queue impl use reids ");
...
...
escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java
浏览文件 @
5b338361
...
@@ -19,6 +19,8 @@ package cn.escheduler.common.queue;
...
@@ -19,6 +19,8 @@ package cn.escheduler.common.queue;
import
cn.escheduler.common.Constants
;
import
cn.escheduler.common.Constants
;
import
cn.escheduler.common.utils.Bytes
;
import
cn.escheduler.common.utils.Bytes
;
import
cn.escheduler.common.utils.IpUtils
;
import
cn.escheduler.common.utils.OSUtils
;
import
cn.escheduler.common.zk.AbstractZKClient
;
import
cn.escheduler.common.zk.AbstractZKClient
;
import
org.apache.curator.framework.CuratorFramework
;
import
org.apache.curator.framework.CuratorFramework
;
import
org.apache.zookeeper.CreateMode
;
import
org.apache.zookeeper.CreateMode
;
...
@@ -26,10 +28,7 @@ import org.apache.zookeeper.data.Stat;
...
@@ -26,10 +28,7 @@ import org.apache.zookeeper.data.Stat;
import
org.slf4j.Logger
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.slf4j.LoggerFactory
;
import
java.util.ArrayList
;
import
java.util.*
;
import
java.util.HashSet
;
import
java.util.List
;
import
java.util.Set
;
/**
/**
* A singleton of a task queue implemented with zookeeper
* A singleton of a task queue implemented with zookeeper
...
@@ -62,7 +61,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
...
@@ -62,7 +61,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* @param key task queue name
* @param key task queue name
* @return
* @return
*/
*/
@Deprecated
@Override
@Override
public
List
<
String
>
getAllTasks
(
String
key
)
{
public
List
<
String
>
getAllTasks
(
String
key
)
{
try
{
try
{
...
@@ -80,7 +78,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
...
@@ -80,7 +78,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* check task exists in the task queue or not
* check task exists in the task queue or not
*
*
* @param key queue name
* @param key queue name
* @param task ${pr
iority}_${processInstanceId
}_${taskId}
* @param task ${pr
ocessInstancePriority}_${processInstanceId}_${taskInstancePriority
}_${taskId}
* @return true if exists in the queue
* @return true if exists in the queue
*/
*/
@Override
@Override
...
@@ -110,7 +108,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
...
@@ -110,7 +108,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* add task to tasks queue
* add task to tasks queue
*
*
* @param key task queue name
* @param key task queue name
* @param value ${pr
iority}_${processInstanceId}_${taskId}
* @param value ${pr
ocessInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
*/
*/
@Override
@Override
public
void
add
(
String
key
,
String
value
)
{
public
void
add
(
String
key
,
String
value
)
{
...
@@ -118,9 +116,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
...
@@ -118,9 +116,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
String
taskIdPath
=
getTasksPath
(
key
)
+
Constants
.
SINGLE_SLASH
+
value
;
String
taskIdPath
=
getTasksPath
(
key
)
+
Constants
.
SINGLE_SLASH
+
value
;
String
result
=
getZkClient
().
create
().
withMode
(
CreateMode
.
PERSISTENT
).
forPath
(
taskIdPath
,
Bytes
.
toBytes
(
value
));
String
result
=
getZkClient
().
create
().
withMode
(
CreateMode
.
PERSISTENT
).
forPath
(
taskIdPath
,
Bytes
.
toBytes
(
value
));
// String path = conf.getString(Constants.ZOOKEEPER_SCHEDULER_ROOT) + Constants.SINGLE_SLASH + Constants.SCHEDULER_TASKS_QUEUE + "_add" + Constants.SINGLE_SLASH + value;
// getZkClient().create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path,
// Bytes.toBytes(value));
logger
.
info
(
"add task : {} to tasks queue , result success"
,
result
);
logger
.
info
(
"add task : {} to tasks queue , result success"
,
result
);
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
logger
.
error
(
"add task to tasks queue exception"
,
e
);
logger
.
error
(
"add task to tasks queue exception"
,
e
);
...
@@ -132,16 +127,16 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
...
@@ -132,16 +127,16 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
/**
/**
* An element pops out of the queue <p>
* An element pops out of the queue <p>
* note:
* note:
* ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
* ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
_host1,host2,...
* The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low.
* The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low.
*
*
* 流程
实例优先级_流程实例id_任务优先级_任务id
high <- low
* 流程
优先级_流程实例id_任务优先级_任务id_任务执行的机器id1,任务执行的机器id2,...
high <- low
* @param key task queue name
* @param key task queue name
* @param
remove whether remove the element
* @param
tasksNum how many elements to poll
* @return the task id to be executed
* @return the task id
s
to be executed
*/
*/
@Override
@Override
public
String
poll
(
String
key
,
boolean
remove
)
{
public
List
<
String
>
poll
(
String
key
,
int
tasksNum
)
{
try
{
try
{
CuratorFramework
zk
=
getZkClient
();
CuratorFramework
zk
=
getZkClient
();
String
tasksQueuePath
=
getTasksPath
(
key
)
+
Constants
.
SINGLE_SLASH
;
String
tasksQueuePath
=
getTasksPath
(
key
)
+
Constants
.
SINGLE_SLASH
;
...
@@ -149,55 +144,83 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
...
@@ -149,55 +144,83 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
if
(
list
!=
null
&&
list
.
size
()
>
0
){
if
(
list
!=
null
&&
list
.
size
()
>
0
){
String
workerIp
=
OSUtils
.
getHost
();
String
workerIpLongStr
=
String
.
valueOf
(
IpUtils
.
ipToLong
(
workerIp
));
int
size
=
list
.
size
();
int
size
=
list
.
size
();
String
formatTargetTask
=
null
;
String
targetTaskKey
=
null
;
Set
<
String
>
taskTreeSet
=
new
TreeSet
<>();
for
(
int
i
=
0
;
i
<
size
;
i
++)
{
for
(
int
i
=
0
;
i
<
size
;
i
++)
{
String
taskDetail
=
list
.
get
(
i
);
String
taskDetail
=
list
.
get
(
i
);
String
[]
taskDetailArrs
=
taskDetail
.
split
(
Constants
.
UNDERLINE
);
String
[]
taskDetailArrs
=
taskDetail
.
split
(
Constants
.
UNDERLINE
);
if
(
taskDetailArrs
.
length
==
4
){
//向前版本兼容
if
(
taskDetailArrs
.
length
>=
4
){
//format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
//format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
String
formatTask
=
String
.
format
(
"%s_%010d_%s_%010d"
,
taskDetailArrs
[
0
],
Long
.
parseLong
(
taskDetailArrs
[
1
]),
taskDetailArrs
[
2
],
Long
.
parseLong
(
taskDetailArrs
[
3
]));
String
formatTask
=
String
.
format
(
"%s_%010d_%s_%010d"
,
taskDetailArrs
[
0
],
Long
.
parseLong
(
taskDetailArrs
[
1
]),
taskDetailArrs
[
2
],
Long
.
parseLong
(
taskDetailArrs
[
3
]));
if
(
i
>
0
){
if
(
taskDetailArrs
.
length
>
4
){
int
result
=
formatTask
.
compareTo
(
formatTargetTask
);
String
taskHosts
=
taskDetailArrs
[
4
];
if
(
result
<
0
){
formatTargetTask
=
formatTask
;
//task can assign to any worker host if equals default ip value of worker server
targetTaskKey
=
taskDetail
;
if
(!
taskHosts
.
equals
(
String
.
valueOf
(
Constants
.
DEFAULT_WORKER_ID
))){
String
[]
taskHostsArr
=
taskHosts
.
split
(
Constants
.
COMMA
);
if
(!
Arrays
.
asList
(
taskHostsArr
).
contains
(
workerIpLongStr
)){
continue
;
}
}
}
}
else
{
formatTargetTask
=
formatTask
;
targetTaskKey
=
taskDetail
;
}
}
}
else
{
logger
.
error
(
"task queue poll error, task detail :{} , please check!"
,
taskDetail
);
taskTreeSet
.
add
(
formatTask
);
}
}
}
if
(
formatTargetTask
!=
null
){
}
String
taskIdPath
=
tasksQueuePath
+
targetTaskKey
;
logger
.
info
(
"consume task {}"
,
taskIdPath
);
List
<
String
>
taskslist
=
getTasksListFromTreeSet
(
tasksNum
,
taskTreeSet
);
String
[]
vals
=
targetTaskKey
.
split
(
Constants
.
UNDERLINE
);
logger
.
info
(
"consume tasks: {},there still have {} tasks need to be executed"
,
Arrays
.
toString
(
taskslist
.
toArray
()),
size
-
taskslist
.
size
()
);
if
(
remove
){
return
taskslist
;
removeNode
(
key
,
targetTaskKey
);
}
else
{
}
Thread
.
sleep
(
Constants
.
SLEEP_TIME_MILLIS
);
logger
.
info
(
"consume task: {},there still have {} tasks need to be executed"
,
vals
[
vals
.
length
-
1
],
size
-
1
);
return
targetTaskKey
;
}
else
{
logger
.
error
(
"should not go here, task queue poll error, please check!"
);
}
}
}
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
logger
.
error
(
"add task to tasks queue exception"
,
e
);
logger
.
error
(
"add task to tasks queue exception"
,
e
);
}
}
return
n
ull
;
return
n
ew
ArrayList
<
String
>()
;
}
}
/**
* get task list from tree set
*
* @param tasksNum
* @param taskTreeSet
*/
public
List
<
String
>
getTasksListFromTreeSet
(
int
tasksNum
,
Set
<
String
>
taskTreeSet
)
{
Iterator
<
String
>
iterator
=
taskTreeSet
.
iterator
();
int
j
=
0
;
List
<
String
>
taskslist
=
new
ArrayList
<>(
tasksNum
);
while
(
iterator
.
hasNext
()){
if
(
j
++
<
tasksNum
){
String
task
=
iterator
.
next
();
String
[]
taskArray
=
task
.
split
(
Constants
.
UNDERLINE
);
int
processInstanceId
=
Integer
.
parseInt
(
taskArray
[
1
]);
int
taskId
=
Integer
.
parseInt
(
taskArray
[
3
]);
String
destTask
=
taskArray
[
0
]+
Constants
.
UNDERLINE
+
processInstanceId
+
Constants
.
UNDERLINE
+
taskArray
[
2
]
+
Constants
.
UNDERLINE
+
taskId
;
taskslist
.
add
(
destTask
);
}
}
return
taskslist
;
}
@Override
@Override
public
void
removeNode
(
String
key
,
String
nodeValue
){
public
void
removeNode
(
String
key
,
String
nodeValue
){
...
...
escheduler-common/src/main/java/cn/escheduler/common/utils/IpUtils.java
0 → 100644
浏览文件 @
5b338361
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
cn.escheduler.common.utils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* http utils
*/
public
class
IpUtils
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
IpUtils
.
class
);
public
static
final
String
DOT
=
"."
;
/**
* ip str to long <p>
*
* @param ipStr ip string
*/
public
static
Long
ipToLong
(
String
ipStr
)
{
String
[]
ipSet
=
ipStr
.
split
(
"\\"
+
DOT
);
return
Long
.
parseLong
(
ipSet
[
0
])
<<
24
|
Long
.
parseLong
(
ipSet
[
1
])
<<
16
|
Long
.
parseLong
(
ipSet
[
2
])
<<
8
|
Long
.
parseLong
(
ipSet
[
3
]);
}
/**
* long to ip
* @param ipLong the long number converted from IP
* @return String
*/
public
static
String
longToIp
(
long
ipLong
)
{
long
[]
ipNumbers
=
new
long
[
4
];
long
tmp
=
0xFF
;
ipNumbers
[
0
]
=
ipLong
>>
24
&
tmp
;
ipNumbers
[
1
]
=
ipLong
>>
16
&
tmp
;
ipNumbers
[
2
]
=
ipLong
>>
8
&
tmp
;
ipNumbers
[
3
]
=
ipLong
&
tmp
;
StringBuilder
sb
=
new
StringBuilder
(
16
);
sb
.
append
(
ipNumbers
[
0
]).
append
(
DOT
)
.
append
(
ipNumbers
[
1
]).
append
(
DOT
)
.
append
(
ipNumbers
[
2
]).
append
(
DOT
)
.
append
(
ipNumbers
[
3
]);
return
sb
.
toString
();
}
public
static
void
main
(
String
[]
args
){
long
ipLong
=
ipToLong
(
"11.3.4.5"
);
logger
.
info
(
longToIp
(
ipLong
));
}
}
escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java
浏览文件 @
5b338361
...
@@ -312,7 +312,11 @@ public abstract class AbstractZKClient {
...
@@ -312,7 +312,11 @@ public abstract class AbstractZKClient {
childrenList
=
zkClient
.
getChildren
().
forPath
(
masterZNodeParentPath
);
childrenList
=
zkClient
.
getChildren
().
forPath
(
masterZNodeParentPath
);
}
}
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
logger
.
warn
(
e
.
getMessage
(),
e
);
// logger.warn(e.getMessage());
if
(!
e
.
getMessage
().
contains
(
"java.lang.IllegalStateException: instance must be started"
)){
logger
.
warn
(
e
.
getMessage
(),
e
);
}
return
childrenList
.
size
();
return
childrenList
.
size
();
}
}
return
childrenList
.
size
();
return
childrenList
.
size
();
...
...
escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java
浏览文件 @
5b338361
...
@@ -37,6 +37,12 @@ public class OSUtilsTest {
...
@@ -37,6 +37,12 @@ public class OSUtilsTest {
// static HardwareAbstractionLayer hal = si.getHardware();
// static HardwareAbstractionLayer hal = si.getHardware();
@Test
public
void
getHost
(){
logger
.
info
(
OSUtils
.
getHost
());
}
@Test
@Test
public
void
memoryUsage
()
{
public
void
memoryUsage
()
{
logger
.
info
(
"memoryUsage : {}"
,
OSUtils
.
memoryUsage
());
// 0.3361799418926239
logger
.
info
(
"memoryUsage : {}"
,
OSUtils
.
memoryUsage
());
// 0.3361799418926239
...
...
escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java
浏览文件 @
5b338361
...
@@ -17,12 +17,15 @@
...
@@ -17,12 +17,15 @@
package
cn.escheduler.common.queue
;
package
cn.escheduler.common.queue
;
import
cn.escheduler.common.Constants
;
import
cn.escheduler.common.Constants
;
import
org.junit.After
;
import
org.junit.Assert
;
import
org.junit.Assert
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.junit.Test
;
import
org.slf4j.Logger
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.slf4j.LoggerFactory
;
import
java.util.Arrays
;
import
java.util.Arrays
;
import
java.util.List
;
import
java.util.Random
;
import
java.util.Random
;
import
static
org
.
junit
.
Assert
.
assertEquals
;
import
static
org
.
junit
.
Assert
.
assertEquals
;
...
@@ -34,59 +37,62 @@ public class TaskQueueImplTest {
...
@@ -34,59 +37,62 @@ public class TaskQueueImplTest {
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
TaskQueueImplTest
.
class
);
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
TaskQueueImplTest
.
class
);
ITaskQueue
tasksQueue
=
null
;
@Test
@Before
public
void
testTaskQueue
(){
public
void
before
(){
tasksQueue
=
TaskQueueFactory
.
getTaskQueueInstance
();
//clear all data
tasksQueue
.
delete
();
ITaskQueue
tasksQueue
=
TaskQueueFactory
.
getTaskQueueInstance
();
}
@After
public
void
after
(){
//clear all data
//clear all data
tasksQueue
.
delete
();
tasksQueue
.
delete
();
}
@Test
public
void
testAdd
(){
//add
//add
tasksQueue
.
add
(
Constants
.
SCHEDULER_TASKS_QUEUE
,
"1"
);
tasksQueue
.
add
(
Constants
.
SCHEDULER_TASKS_QUEUE
,
"1_1_1_1_2130706433,3232236775"
);
tasksQueue
.
add
(
Constants
.
SCHEDULER_TASKS_QUEUE
,
"2"
);
tasksQueue
.
add
(
Constants
.
SCHEDULER_TASKS_QUEUE
,
"0_1_1_1_2130706433,3232236775"
);
tasksQueue
.
add
(
Constants
.
SCHEDULER_TASKS_QUEUE
,
"3"
);
tasksQueue
.
add
(
Constants
.
SCHEDULER_TASKS_QUEUE
,
"1_1_0_1_2130706433,3232236775"
);
tasksQueue
.
add
(
Constants
.
SCHEDULER_TASKS_QUEUE
,
"4"
);
tasksQueue
.
add
(
Constants
.
SCHEDULER_TASKS_QUEUE
,
"1_2_1_1_2130706433,3232236775"
);
List
<
String
>
tasks
=
tasksQueue
.
poll
(
Constants
.
SCHEDULER_TASKS_QUEUE
,
1
);
if
(
tasks
.
size
()
<
0
){
return
;
}
//pop
//pop
String
node1
=
tasksQueue
.
poll
(
Constants
.
SCHEDULER_TASKS_QUEUE
,
false
);
String
node1
=
tasks
.
get
(
0
);
assertEquals
(
node1
,
"1"
);
String
node2
=
tasksQueue
.
poll
(
Constants
.
SCHEDULER_TASKS_QUEUE
,
false
);
assertEquals
(
node2
,
"2"
);
//sadd
String
task1
=
"1.1.1.1-1-mr"
;
String
task2
=
"1.1.1.2-2-mr"
;
String
task3
=
"1.1.1.3-3-mr"
;
String
task4
=
"1.1.1.4-4-mr"
;
String
task5
=
"1.1.1.5-5-mr"
;
tasksQueue
.
sadd
(
Constants
.
SCHEDULER_TASKS_KILL
,
task1
);
tasksQueue
.
sadd
(
Constants
.
SCHEDULER_TASKS_KILL
,
task2
);
tasksQueue
.
sadd
(
Constants
.
SCHEDULER_TASKS_KILL
,
task3
);
tasksQueue
.
sadd
(
Constants
.
SCHEDULER_TASKS_KILL
,
task4
);
tasksQueue
.
sadd
(
Constants
.
SCHEDULER_TASKS_KILL
,
task5
);
tasksQueue
.
sadd
(
Constants
.
SCHEDULER_TASKS_KILL
,
task5
);
//repeat task
Assert
.
assertEquals
(
tasksQueue
.
smembers
(
Constants
.
SCHEDULER_TASKS_KILL
).
size
(),
5
);
logger
.
info
(
Arrays
.
toString
(
tasksQueue
.
smembers
(
Constants
.
SCHEDULER_TASKS_KILL
).
toArray
()));
//srem
tasksQueue
.
srem
(
Constants
.
SCHEDULER_TASKS_KILL
,
task5
);
//smembers
Assert
.
assertEquals
(
tasksQueue
.
smembers
(
Constants
.
SCHEDULER_TASKS_KILL
).
size
(),
4
);
logger
.
info
(
Arrays
.
toString
(
tasksQueue
.
smembers
(
Constants
.
SCHEDULER_TASKS_KILL
).
toArray
()));
assertEquals
(
node1
,
"0_0000000001_1_0000000001"
);
tasks
=
tasksQueue
.
poll
(
Constants
.
SCHEDULER_TASKS_QUEUE
,
1
);
if
(
tasks
.
size
()
<
0
){
return
;
}
String
node2
=
tasks
.
get
(
0
);
assertEquals
(
node2
,
"0_0000000001_1_0000000001"
);
}
}
/**
/**
* test one million data from zookeeper queue
* test one million data from zookeeper queue
*/
*/
@Test
@Test
public
void
extremeTest
(){
public
void
extremeTest
(){
ITaskQueue
tasksQueue
=
TaskQueueFactory
.
getTaskQueueInstance
();
//clear all data
tasksQueue
.
delete
();
int
total
=
30
*
10000
;
int
total
=
30
*
10000
;
for
(
int
i
=
0
;
i
<
total
;
i
++)
for
(
int
i
=
0
;
i
<
total
;
i
++)
...
@@ -99,14 +105,9 @@ public class TaskQueueImplTest {
...
@@ -99,14 +105,9 @@ public class TaskQueueImplTest {
}
}
}
}
String
node1
=
tasksQueue
.
poll
(
Constants
.
SCHEDULER_TASKS_QUEUE
,
false
);
String
node1
=
tasksQueue
.
poll
(
Constants
.
SCHEDULER_TASKS_QUEUE
,
1
).
get
(
0
);
assertEquals
(
node1
,
"0"
);
assertEquals
(
node1
,
"0"
);
//clear all data
tasksQueue
.
delete
();
}
}
}
}
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
浏览文件 @
5b338361
...
@@ -25,6 +25,7 @@ import cn.escheduler.common.queue.ITaskQueue;
...
@@ -25,6 +25,7 @@ import cn.escheduler.common.queue.ITaskQueue;
import
cn.escheduler.common.queue.TaskQueueFactory
;
import
cn.escheduler.common.queue.TaskQueueFactory
;
import
cn.escheduler.common.task.subprocess.SubProcessParameters
;
import
cn.escheduler.common.task.subprocess.SubProcessParameters
;
import
cn.escheduler.common.utils.DateUtils
;
import
cn.escheduler.common.utils.DateUtils
;
import
cn.escheduler.common.utils.IpUtils
;
import
cn.escheduler.common.utils.JSONUtils
;
import
cn.escheduler.common.utils.JSONUtils
;
import
cn.escheduler.common.utils.ParameterUtils
;
import
cn.escheduler.common.utils.ParameterUtils
;
import
cn.escheduler.dao.mapper.*
;
import
cn.escheduler.dao.mapper.*
;
...
@@ -117,7 +118,7 @@ public class ProcessDao extends AbstractBaseDao {
...
@@ -117,7 +118,7 @@ public class ProcessDao extends AbstractBaseDao {
*/
*/
@Override
@Override
protected
void
init
()
{
protected
void
init
()
{
userMapper
=
getMapper
(
UserMapper
.
class
);
userMapper
=
getMapper
(
UserMapper
.
class
);
processDefineMapper
=
getMapper
(
ProcessDefinitionMapper
.
class
);
processDefineMapper
=
getMapper
(
ProcessDefinitionMapper
.
class
);
processInstanceMapper
=
getMapper
(
ProcessInstanceMapper
.
class
);
processInstanceMapper
=
getMapper
(
ProcessInstanceMapper
.
class
);
dataSourceMapper
=
getMapper
(
DataSourceMapper
.
class
);
dataSourceMapper
=
getMapper
(
DataSourceMapper
.
class
);
...
@@ -1015,11 +1016,58 @@ public class ProcessDao extends AbstractBaseDao {
...
@@ -1015,11 +1016,58 @@ public class ProcessDao extends AbstractBaseDao {
*
*
* 流程实例优先级_流程实例id_任务优先级_任务id high <- low
* 流程实例优先级_流程实例id_任务优先级_任务id high <- low
*
*
* @param task
* @param task
Instance
* @return
* @return
*/
*/
private
String
taskZkInfo
(
TaskInstance
task
)
{
private
String
taskZkInfo
(
TaskInstance
taskInstance
)
{
return
String
.
valueOf
(
task
.
getProcessInstancePriority
().
ordinal
())
+
Constants
.
UNDERLINE
+
task
.
getProcessInstanceId
()
+
Constants
.
UNDERLINE
+
task
.
getTaskInstancePriority
().
ordinal
()
+
Constants
.
UNDERLINE
+
task
.
getId
();
int
taskWorkerGroupId
=
getTaskWorkerGroupId
(
taskInstance
);
StringBuilder
sb
=
new
StringBuilder
(
100
);
sb
.
append
(
taskInstance
.
getProcessInstancePriority
().
ordinal
()).
append
(
Constants
.
UNDERLINE
)
.
append
(
taskInstance
.
getProcessInstanceId
()).
append
(
Constants
.
UNDERLINE
)
.
append
(
taskInstance
.
getTaskInstancePriority
().
ordinal
()).
append
(
Constants
.
UNDERLINE
)
.
append
(
taskInstance
.
getId
()).
append
(
Constants
.
UNDERLINE
);
if
(
taskWorkerGroupId
>
0
){
//not to find data from db
WorkerGroup
workerGroup
=
queryWorkerGroupById
(
taskWorkerGroupId
);
if
(
workerGroup
==
null
){
logger
.
info
(
"task {} cannot find the worker group, use all worker instead."
,
taskInstance
.
getId
());
sb
.
append
(
Constants
.
DEFAULT_WORKER_ID
);
return
sb
.
toString
();
}
String
ips
=
workerGroup
.
getIpList
();
if
(
StringUtils
.
isBlank
(
ips
)){
logger
.
error
(
"task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers"
,
taskInstance
.
getId
(),
workerGroup
.
getId
());
sb
.
append
(
Constants
.
DEFAULT_WORKER_ID
);
return
sb
.
toString
();
}
StringBuilder
ipSb
=
new
StringBuilder
(
100
);
String
[]
ipArray
=
ips
.
split
(
COMMA
);
for
(
String
ip
:
ipArray
)
{
long
ipLong
=
IpUtils
.
ipToLong
(
ip
);
ipSb
.
append
(
ipLong
).
append
(
COMMA
);
}
if
(
ipSb
.
length
()
>
0
)
{
ipSb
.
deleteCharAt
(
ipSb
.
length
()
-
1
);
}
sb
.
append
(
ipSb
);
}
else
{
sb
.
append
(
Constants
.
DEFAULT_WORKER_ID
);
}
return
sb
.
toString
();
}
}
/**
/**
...
@@ -1683,5 +1731,24 @@ public class ProcessDao extends AbstractBaseDao {
...
@@ -1683,5 +1731,24 @@ public class ProcessDao extends AbstractBaseDao {
}
}
/**
* get task worker group id
*
* @param taskInstance
* @return
*/
public
int
getTaskWorkerGroupId
(
TaskInstance
taskInstance
)
{
int
taskWorkerGroupId
=
taskInstance
.
getWorkerGroupId
();
ProcessInstance
processInstance
=
findProcessInstanceByTaskId
(
taskInstance
.
getId
());
if
(
processInstance
==
null
){
logger
.
error
(
"cannot find the task:{} process instance"
,
taskInstance
.
getId
());
return
Constants
.
DEFAULT_WORKER_ID
;
}
int
processWorkerGroupId
=
processInstance
.
getWorkerGroupId
();
taskWorkerGroupId
=
(
taskWorkerGroupId
<=
0
?
processWorkerGroupId
:
taskWorkerGroupId
);
return
taskWorkerGroupId
;
}
}
}
escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java
浏览文件 @
5b338361
...
@@ -194,6 +194,21 @@ public class ProcessInstance {
...
@@ -194,6 +194,21 @@ public class ProcessInstance {
*/
*/
private
int
tenantId
;
private
int
tenantId
;
/**
* worker group name. for api.
*/
private
String
workerGroupName
;
/**
* receivers for api
*/
private
String
receivers
;
/**
* receivers cc for api
*/
private
String
receiversCc
;
public
ProcessInstance
(){
public
ProcessInstance
(){
}
}
...
@@ -560,4 +575,28 @@ public class ProcessInstance {
...
@@ -560,4 +575,28 @@ public class ProcessInstance {
public
int
getTenantId
()
{
public
int
getTenantId
()
{
return
this
.
tenantId
;
return
this
.
tenantId
;
}
}
public
String
getWorkerGroupName
()
{
return
workerGroupName
;
}
public
void
setWorkerGroupName
(
String
workerGroupName
)
{
this
.
workerGroupName
=
workerGroupName
;
}
public
String
getReceivers
()
{
return
receivers
;
}
public
void
setReceivers
(
String
receivers
)
{
this
.
receivers
=
receivers
;
}
public
String
getReceiversCc
()
{
return
receiversCc
;
}
public
void
setReceiversCc
(
String
receiversCc
)
{
this
.
receiversCc
=
receiversCc
;
}
}
}
escheduler-server/pom.xml
浏览文件 @
5b338361
...
@@ -89,7 +89,7 @@
...
@@ -89,7 +89,7 @@
<artifactId>
escheduler-alert
</artifactId>
<artifactId>
escheduler-alert
</artifactId>
</dependency>
</dependency>
</dependencies>
</dependencies>
<build>
<build>
...
...
escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java
浏览文件 @
5b338361
...
@@ -216,7 +216,7 @@ public class MasterServer implements CommandLineRunner, IStoppable {
...
@@ -216,7 +216,7 @@ public class MasterServer implements CommandLineRunner, IStoppable {
if
(
Stopper
.
isRunning
())
{
if
(
Stopper
.
isRunning
())
{
// send heartbeat to zk
// send heartbeat to zk
if
(
StringUtils
.
isBlank
(
zkMasterClient
.
getMasterZNode
()))
{
if
(
StringUtils
.
isBlank
(
zkMasterClient
.
getMasterZNode
()))
{
logger
.
error
(
"master send heartbeat to zk failed"
);
logger
.
error
(
"master send heartbeat to zk failed
: can't find zookeeper regist path of master server
"
);
return
;
return
;
}
}
...
...
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
浏览文件 @
5b338361
...
@@ -25,8 +25,9 @@ import cn.escheduler.common.utils.OSUtils;
...
@@ -25,8 +25,9 @@ import cn.escheduler.common.utils.OSUtils;
import
cn.escheduler.dao.ProcessDao
;
import
cn.escheduler.dao.ProcessDao
;
import
cn.escheduler.dao.model.*
;
import
cn.escheduler.dao.model.*
;
import
cn.escheduler.server.zk.ZKWorkerClient
;
import
cn.escheduler.server.zk.ZKWorkerClient
;
import
com.cronutils.utils.StringUtils
;
import
org.apache.commons.configuration.Configuration
;
import
org.apache.commons.configuration.Configuration
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.curator.framework.CuratorFramework
;
import
org.apache.curator.framework.recipes.locks.InterProcessMutex
;
import
org.apache.curator.framework.recipes.locks.InterProcessMutex
;
import
org.slf4j.Logger
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.slf4j.LoggerFactory
;
...
@@ -98,15 +99,7 @@ public class FetchTaskThread implements Runnable{
...
@@ -98,15 +99,7 @@ public class FetchTaskThread implements Runnable{
*/
*/
private
boolean
checkWorkerGroup
(
TaskInstance
taskInstance
,
String
host
){
private
boolean
checkWorkerGroup
(
TaskInstance
taskInstance
,
String
host
){
int
taskWorkerGroupId
=
taskInstance
.
getWorkerGroupId
();
int
taskWorkerGroupId
=
processDao
.
getTaskWorkerGroupId
(
taskInstance
);
ProcessInstance
processInstance
=
processDao
.
findProcessInstanceByTaskId
(
taskInstance
.
getId
());
if
(
processInstance
==
null
){
logger
.
error
(
"cannot find the task:{} process instance"
,
taskInstance
.
getId
());
return
false
;
}
int
processWorkerGroupId
=
processInstance
.
getWorkerGroupId
();
taskWorkerGroupId
=
(
taskWorkerGroupId
<=
0
?
processWorkerGroupId
:
taskWorkerGroupId
);
if
(
taskWorkerGroupId
<=
0
){
if
(
taskWorkerGroupId
<=
0
){
return
true
;
return
true
;
...
@@ -117,99 +110,103 @@ public class FetchTaskThread implements Runnable{
...
@@ -117,99 +110,103 @@ public class FetchTaskThread implements Runnable{
return
true
;
return
true
;
}
}
String
ips
=
workerGroup
.
getIpList
();
String
ips
=
workerGroup
.
getIpList
();
if
(
ips
==
null
){
if
(
StringUtils
.
isBlank
(
ips
)
){
logger
.
error
(
"task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers"
,
logger
.
error
(
"task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers"
,
taskInstance
.
getId
(),
workerGroup
.
getId
());
taskInstance
.
getId
(),
workerGroup
.
getId
());
}
}
String
[]
ipArray
=
ips
.
split
(
","
);
String
[]
ipArray
=
ips
.
split
(
Constants
.
COMMA
);
List
<
String
>
ipList
=
Arrays
.
asList
(
ipArray
);
List
<
String
>
ipList
=
Arrays
.
asList
(
ipArray
);
return
ipList
.
contains
(
host
);
return
ipList
.
contains
(
host
);
}
}
@Override
@Override
public
void
run
()
{
public
void
run
()
{
while
(
Stopper
.
isRunning
()){
while
(
Stopper
.
isRunning
()){
InterProcessMutex
mutex
=
null
;
InterProcessMutex
mutex
=
null
;
try
{
try
{
if
(
OSUtils
.
checkResource
(
this
.
conf
,
false
))
{
// creating distributed locks, lock path /escheduler/lock/worker
ThreadPoolExecutor
poolExecutor
=
(
ThreadPoolExecutor
)
workerExecService
;
String
zNodeLockPath
=
zkWorkerClient
.
getWorkerLockPath
();
mutex
=
new
InterProcessMutex
(
zkWorkerClient
.
getZkClient
(),
zNodeLockPath
);
mutex
.
acquire
();
ThreadPoolExecutor
poolExecutor
=
(
ThreadPoolExecutor
)
workerExecService
;
//check memory and cpu usage and threads
if
(
OSUtils
.
checkResource
(
this
.
conf
,
false
)
&&
checkThreadCount
(
poolExecutor
))
{
for
(
int
i
=
0
;
i
<
taskNum
;
i
++)
{
//whether have tasks, if no tasks , no need lock //get all tasks
List
<
String
>
tasksQueueList
=
taskQueue
.
getAllTasks
(
Constants
.
SCHEDULER_TASKS_QUEUE
);
int
activeCount
=
poolExecutor
.
getActiveCount
();
if
(
tasksQueueList
.
size
()
>
0
){
if
(
activeCount
>=
workerExecNums
)
{
// creating distributed locks, lock path /escheduler/lock/worker
logger
.
info
(
"thread insufficient , activeCount : {} , workerExecNums : {}"
,
activeCount
,
workerExecNums
);
String
zNodeLockPath
=
zkWorkerClient
.
getWorkerLockPath
(
);
continue
;
mutex
=
new
InterProcessMutex
(
zkWorkerClient
.
getZkClient
(),
zNodeLockPath
)
;
}
mutex
.
acquire
();
// task instance id str
// task instance id str
String
taskQueueStr
=
taskQueue
.
poll
(
Constants
.
SCHEDULER_TASKS_QUEUE
,
false
);
List
<
String
>
taskQueueStrArr
=
taskQueue
.
poll
(
Constants
.
SCHEDULER_TASKS_QUEUE
,
taskNum
);
if
(!
StringUtils
.
isEmpty
(
taskQueueStr
))
{
for
(
String
taskQueueStr
:
taskQueueStrArr
){
if
(
StringUtils
.
isNotBlank
(
taskQueueStr
))
{
String
[]
taskStringArray
=
taskQueueStr
.
split
(
Constants
.
UNDERLINE
);
if
(!
checkThreadCount
(
poolExecutor
))
{
String
taskInstIdStr
=
taskStringArray
[
taskStringArray
.
length
-
1
];
break
;
Date
now
=
new
Date
();
}
Integer
taskId
=
Integer
.
parseInt
(
taskInstIdStr
);
// find task instance by task id
String
[]
taskStringArray
=
taskQueueStr
.
split
(
Constants
.
UNDERLINE
);
TaskInstance
taskInstance
=
processDao
.
findTaskInstanceById
(
taskId
);
String
taskInstIdStr
=
taskStringArray
[
taskStringArray
.
length
-
1
];
Date
now
=
new
Date
();
Integer
taskId
=
Integer
.
parseInt
(
taskInstIdStr
);
logger
.
info
(
"worker fetch taskId : {} from queue "
,
taskId
);
// find task instance by task id
TaskInstance
taskInstance
=
processDao
.
findTaskInstanceById
(
taskId
);
int
retryTimes
=
30
;
logger
.
info
(
"worker fetch taskId : {} from queue "
,
taskId
);
// mainly to wait for the master insert task to succeed
while
(
taskInstance
==
null
&&
retryTimes
>
0
)
{
Thread
.
sleep
(
Constants
.
SLEEP_TIME_MILLIS
);
taskInstance
=
processDao
.
findTaskInstanceById
(
taskId
);
retryTimes
--;
}
if
(
taskInstance
==
null
)
{
int
retryTimes
=
30
;
logger
.
error
(
"task instance is null. task id : {} "
,
taskId
);
// mainly to wait for the master insert task to succeed
continue
;
while
(
taskInstance
==
null
&&
retryTimes
>
0
)
{
}
Thread
.
sleep
(
Constants
.
SLEEP_TIME_MILLIS
);
if
(!
checkWorkerGroup
(
taskInstance
,
OSUtils
.
getHost
())){
taskInstance
=
processDao
.
findTaskInstanceById
(
taskId
);
continue
;
retryTimes
--;
}
}
taskQueue
.
removeNode
(
Constants
.
SCHEDULER_TASKS_QUEUE
,
taskQueueStr
);
logger
.
info
(
"remove task:{} from queue"
,
taskQueueStr
);
if
(
taskInstance
==
null
)
{
logger
.
error
(
"task instance is null. task id : {} "
,
taskId
);
continue
;
}
// set execute task worker host
if
(!
checkWorkerGroup
(
taskInstance
,
OSUtils
.
getHost
())){
taskInstance
.
setHost
(
OSUtils
.
getHost
());
continue
;
taskInstance
.
setStartTime
(
now
);
}
taskQueue
.
removeNode
(
Constants
.
SCHEDULER_TASKS_QUEUE
,
taskQueueStr
);
logger
.
info
(
"remove task:{} from queue"
,
taskQueueStr
);
// set execute task worker host
taskInstance
.
setHost
(
OSUtils
.
getHost
());
taskInstance
.
setStartTime
(
now
);
// get process instance
ProcessInstance
processInstance
=
processDao
.
findProcessInstanceDetailById
(
taskInstance
.
getProcessInstanceId
());
// get process instance
ProcessInstance
processInstance
=
processDao
.
findProcessInstanceDetailById
(
taskInstance
.
getProcessInstanceId
());
// get process define
// get process define
ProcessDefinition
processDefine
=
processDao
.
findProcessDefineById
(
taskInstance
.
getProcessDefinitionId
());
ProcessDefinition
processDefine
=
processDao
.
findProcessDefineById
(
taskInstance
.
getProcessDefinitionId
());
taskInstance
.
setProcessInstance
(
processInstance
);
taskInstance
.
setProcessInstance
(
processInstance
);
taskInstance
.
setProcessDefine
(
processDefine
);
taskInstance
.
setProcessDefine
(
processDefine
);
// get local execute path
// get local execute path
String
execLocalPath
=
FileUtils
.
getProcessExecDir
(
processDefine
.
getProjectId
(),
String
execLocalPath
=
FileUtils
.
getProcessExecDir
(
processDefine
.
getProjectId
(),
processDefine
.
getId
(),
processDefine
.
getId
(),
processInstance
.
getId
(),
processInstance
.
getId
(),
taskInstance
.
getId
());
taskInstance
.
getId
());
logger
.
info
(
"task instance local execute path : {} "
,
execLocalPath
);
logger
.
info
(
"task instance local execute path : {} "
,
execLocalPath
);
// set task execute path
// set task execute path
taskInstance
.
setExecutePath
(
execLocalPath
);
taskInstance
.
setExecutePath
(
execLocalPath
);
Tenant
tenant
=
processDao
.
getTenantForProcess
(
processInstance
.
getTenantId
(),
Tenant
tenant
=
processDao
.
getTenantForProcess
(
processInstance
.
getTenantId
(),
processDefine
.
getUserId
());
processDefine
.
getUserId
());
...
@@ -218,21 +215,22 @@ public class FetchTaskThread implements Runnable{
...
@@ -218,21 +215,22 @@ public class FetchTaskThread implements Runnable{
FileUtils
.
createWorkDirAndUserIfAbsent
(
execLocalPath
,
FileUtils
.
createWorkDirAndUserIfAbsent
(
execLocalPath
,
tenant
.
getTenantCode
(),
logger
);
tenant
.
getTenantCode
(),
logger
);
logger
.
info
(
"task : {} ready to submit to task scheduler thread"
,
taskId
);
logger
.
info
(
"task : {} ready to submit to task scheduler thread"
,
taskId
);
// submit task
// submit task
workerExecService
.
submit
(
new
TaskScheduleThread
(
taskInstance
,
processDao
));
workerExecService
.
submit
(
new
TaskScheduleThread
(
taskInstance
,
processDao
));
}
}
}
}
}
}
}
Thread
.
sleep
(
Constants
.
SLEEP_TIME_MILLIS
);
Thread
.
sleep
(
Constants
.
SLEEP_TIME_MILLIS
);
}
catch
(
Exception
e
){
}
catch
(
Exception
e
){
logger
.
error
(
"fetch task thread exception : "
+
e
.
getMessage
(),
e
);
logger
.
error
(
"fetch task thread exception : "
+
e
.
getMessage
(),
e
);
}
}
finally
{
finally
{
if
(
mutex
!=
null
){
if
(
mutex
!=
null
){
try
{
try
{
mutex
.
release
();
mutex
.
release
();
...
@@ -247,4 +245,18 @@ public class FetchTaskThread implements Runnable{
...
@@ -247,4 +245,18 @@ public class FetchTaskThread implements Runnable{
}
}
}
}
}
}
/**
*
* @param poolExecutor
* @return
*/
private
boolean
checkThreadCount
(
ThreadPoolExecutor
poolExecutor
)
{
int
activeCount
=
poolExecutor
.
getActiveCount
();
if
(
activeCount
>=
workerExecNums
)
{
logger
.
info
(
"thread insufficient , activeCount : {} , workerExecNums : {}, will sleep : {} millis for thread resource"
,
activeCount
,
workerExecNums
,
Constants
.
SLEEP_TIME_MILLIS
);
return
false
;
}
return
true
;
}
}
}
\ No newline at end of file
escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java
浏览文件 @
5b338361
...
@@ -387,7 +387,7 @@ public class SqlTask extends AbstractTask {
...
@@ -387,7 +387,7 @@ public class SqlTask extends AbstractTask {
String
showTypeName
=
sqlParameters
.
getShowType
().
replace
(
Constants
.
COMMA
,
""
).
trim
();
String
showTypeName
=
sqlParameters
.
getShowType
().
replace
(
Constants
.
COMMA
,
""
).
trim
();
if
(
EnumUtils
.
isValidEnum
(
ShowType
.
class
,
showTypeName
)){
if
(
EnumUtils
.
isValidEnum
(
ShowType
.
class
,
showTypeName
)){
Map
<
String
,
Object
>
mailResult
=
MailUtils
.
sendMails
(
receviersList
,
receviersCcList
,
title
,
content
,
ShowType
.
valueOf
(
showTypeName
));
Map
<
String
,
Object
>
mailResult
=
MailUtils
.
sendMails
(
receviersList
,
receviersCcList
,
title
,
content
,
ShowType
.
valueOf
(
showTypeName
));
if
(!(
Boolean
)
mailResult
.
get
(
Constants
.
STATUS
)){
if
(!(
Boolean
)
mailResult
.
get
(
cn
.
escheduler
.
common
.
Constants
.
STATUS
)){
throw
new
RuntimeException
(
"send mail failed!"
);
throw
new
RuntimeException
(
"send mail failed!"
);
}
}
}
else
{
}
else
{
...
...
escheduler-ui/src/js/conf/home/pages/dag/_source/startingParam/index.vue
浏览文件 @
5b338361
...
@@ -85,8 +85,6 @@
...
@@ -85,8 +85,6 @@
deep
:
true
,
deep
:
true
,
handler
()
{
handler
()
{
this
.
isActive
=
false
this
.
isActive
=
false
this
.
notifyGroupList
=
null
this
.
workerGroupList
=
null
this
.
$nextTick
(()
=>
(
this
.
isActive
=
true
))
this
.
$nextTick
(()
=>
(
this
.
isActive
=
true
))
}
}
}
}
...
...
escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue
浏览文件 @
5b338361
...
@@ -6,7 +6,7 @@
...
@@ -6,7 +6,7 @@
<div
class=
"row-title"
>
<div
class=
"row-title"
>
<div
class=
"left"
>
<div
class=
"left"
>
<span
class=
"sp"
>
IP:
{{
item
.
host
}}
</span>
<span
class=
"sp"
>
IP:
{{
item
.
host
}}
</span>
<span
class=
"sp"
>
{{
$t
(
'
P
ort
'
)
}}
:
{{
item
.
port
}}
</span>
<span
class=
"sp"
>
{{
$t
(
'
P
rocess Pid
'
)
}}
:
{{
item
.
port
}}
</span>
<span
class=
"sp"
>
{{
$t
(
'
Zk registration directory
'
)
}}
:
{{
item
.
zkDirectory
}}
</span>
<span
class=
"sp"
>
{{
$t
(
'
Zk registration directory
'
)
}}
:
{{
item
.
zkDirectory
}}
</span>
</div>
</div>
<div
class=
"right"
>
<div
class=
"right"
>
...
@@ -93,4 +93,4 @@
...
@@ -93,4 +93,4 @@
</
script
>
</
script
>
<
style
lang=
"scss"
rel=
"stylesheet/scss"
>
<
style
lang=
"scss"
rel=
"stylesheet/scss"
>
@import
"./servers"
;
@import
"./servers"
;
</
style
>
</
style
>
\ No newline at end of file
escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue
浏览文件 @
5b338361
...
@@ -6,7 +6,7 @@
...
@@ -6,7 +6,7 @@
<div
class=
"row-title"
>
<div
class=
"row-title"
>
<div
class=
"left"
>
<div
class=
"left"
>
<span
class=
"sp"
>
IP:
{{
item
.
host
}}
</span>
<span
class=
"sp"
>
IP:
{{
item
.
host
}}
</span>
<span
class=
"sp"
>
{{
$t
(
'
P
ort
'
)
}}
:
{{
item
.
port
}}
</span>
<span
class=
"sp"
>
{{
$t
(
'
P
rocess Pid
'
)
}}
:
{{
item
.
port
}}
</span>
<span
class=
"sp"
>
{{
$t
(
'
Zk registration directory
'
)
}}
:
{{
item
.
zkDirectory
}}
</span>
<span
class=
"sp"
>
{{
$t
(
'
Zk registration directory
'
)
}}
:
{{
item
.
zkDirectory
}}
</span>
</div>
</div>
<div
class=
"right"
>
<div
class=
"right"
>
...
@@ -94,4 +94,4 @@
...
@@ -94,4 +94,4 @@
</
script
>
</
script
>
<
style
lang=
"scss"
rel=
"stylesheet/scss"
>
<
style
lang=
"scss"
rel=
"stylesheet/scss"
>
@import
"./servers"
;
@import
"./servers"
;
</
style
>
</
style
>
\ No newline at end of file
escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/util.js
浏览文件 @
5b338361
...
@@ -37,7 +37,7 @@ let warningTypeList = [
...
@@ -37,7 +37,7 @@ let warningTypeList = [
]
]
const
isEmial
=
(
val
)
=>
{
const
isEmial
=
(
val
)
=>
{
let
regEmail
=
/^
([
a-zA-Z0-9
]
+
[
_|
\
_
|
\.]?)
*
[
a-zA-Z0-9
]
+@
([
a-zA-Z0-9
]
+
[
_|
\_
|
\.]?)
*
[
a-zA-Z0-9
]
+
\.[
a-zA-Z
]{2,3}
$/
// eslint-disable-line
let
regEmail
=
/^
([
a-zA-Z0-9
]
+
[
_|
\
-
|
\.]?)
*
[
a-zA-Z0-9
]
+@
([
a-zA-Z0-9
]
+
[
_|
\-
|
\.]?)
*
[
a-zA-Z0-9
]
+
\.[
a-zA-Z
]{2,3}
$/
// eslint-disable-line
return
regEmail
.
test
(
val
)
return
regEmail
.
test
(
val
)
}
}
...
...
escheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue
浏览文件 @
5b338361
...
@@ -95,17 +95,17 @@
...
@@ -95,17 +95,17 @@
shape=
"circle"
shape=
"circle"
size=
"xsmall"
size=
"xsmall"
data-toggle=
"tooltip"
data-toggle=
"tooltip"
:title=
"$t('Stop')"
:title=
"
item.state === 'STOP' ? $t('Recovery Suspend') :
$t('Stop')"
@
click=
"_stop(item)"
@
click=
"_stop(item
,$index
)"
icon=
"iconfont icon-zanting1
"
:icon=
"item.state === 'STOP' ? 'iconfont icon-ai06' : 'iconfont icon-zanting'
"
:disabled=
"item.state !== 'RUNNING_EXEUTION'"
></x-button>
:disabled=
"item.state !== 'RUNNING_EXEUTION'
&& item.state != 'STOP'
"
></x-button>
<x-button
type=
"warning"
<x-button
type=
"warning"
shape=
"circle"
shape=
"circle"
size=
"xsmall"
size=
"xsmall"
data-toggle=
"tooltip"
data-toggle=
"tooltip"
:title=
"item.state === 'PAUSE' ? $t('Recovery Suspend') : $t('Pause')"
:title=
"item.state === 'PAUSE' ? $t('Recovery Suspend') : $t('Pause')"
@
click=
"_suspend(item,$index)"
@
click=
"_suspend(item,$index)"
:icon=
"item.state === 'PAUSE' ? 'iconfont icon-ai06' : 'iconfont icon-zanting'"
:icon=
"item.state === 'PAUSE' ? 'iconfont icon-ai06' : 'iconfont icon-zanting
1
'"
:disabled=
"item.state !== 'RUNNING_EXEUTION' && item.state !== 'PAUSE'"
></x-button>
:disabled=
"item.state !== 'RUNNING_EXEUTION' && item.state !== 'PAUSE'"
></x-button>
<x-poptip
<x-poptip
:ref=
"'poptip-delete-' + $index"
:ref=
"'poptip-delete-' + $index"
...
@@ -155,7 +155,7 @@
...
@@ -155,7 +155,7 @@
shape=
"circle"
shape=
"circle"
size=
"xsmall"
size=
"xsmall"
disabled=
"true"
>
disabled=
"true"
>
{{item.count}}
s
{{item.count}}
</x-button>
</x-button>
<x-button
<x-button
v-show=
"buttonType !== 'run'"
v-show=
"buttonType !== 'run'"
...
@@ -173,7 +173,7 @@
...
@@ -173,7 +173,7 @@
shape=
"circle"
shape=
"circle"
size=
"xsmall"
size=
"xsmall"
disabled=
"true"
>
disabled=
"true"
>
{{item.count}}
s
{{item.count}}
</x-button>
</x-button>
<x-button
<x-button
v-show=
"buttonType !== 'store'"
v-show=
"buttonType !== 'store'"
...
@@ -185,26 +185,26 @@
...
@@ -185,26 +185,26 @@
</x-button>
</x-button>
<!--Stop-->
<!--Stop-->
<
x-button
<
!--<x-button-->
type=
"error"
<!--type="error"-->
shape=
"circle"
<!--shape="circle"-->
size=
"xsmall"
<!--size="xsmall"-->
icon=
"iconfont icon-zanting1"
<!--icon="iconfont icon-zanting1"-->
disabled=
"true"
>
<!--disabled="true">--
>
<
/x-button
>
<
!--</x-button>--
>
<!--倒计时 => Recovery Suspend/Pause-->
<!--倒计时 => Recovery Suspend/Pause-->
<x-button
<x-button
v-show=
"
item.state === 'PAUSE'
&& buttonType === 'suspend'"
v-show=
"
(item.state === 'PAUSE' || item.state == 'STOP')
&& buttonType === 'suspend'"
type=
"warning"
type=
"warning"
shape=
"circle"
shape=
"circle"
size=
"xsmall"
size=
"xsmall"
disabled=
"true"
>
disabled=
"true"
>
{{item.count}}
s
{{item.count}}
</x-button>
</x-button>
<!--Recovery Suspend-->
<!--Recovery Suspend-->
<x-button
<x-button
v-show=
"
item.state === 'PAUSE'
&& buttonType !== 'suspend'"
v-show=
"
(item.state === 'PAUSE' || item.state == 'STOP')
&& buttonType !== 'suspend'"
type=
"warning"
type=
"warning"
shape=
"circle"
shape=
"circle"
size=
"xsmall"
size=
"xsmall"
...
@@ -217,6 +217,15 @@
...
@@ -217,6 +217,15 @@
type=
"warning"
type=
"warning"
shape=
"circle"
shape=
"circle"
size=
"xsmall"
size=
"xsmall"
icon=
"iconfont icon-zanting1"
disabled=
"true"
>
</x-button>
<!--Stop-->
<x-button
v-show=
"item.state !== 'STOP'"
type=
"warning"
shape=
"circle"
size=
"xsmall"
icon=
"iconfont icon-zanting"
icon=
"iconfont icon-zanting"
disabled=
"true"
>
disabled=
"true"
>
</x-button>
</x-button>
...
@@ -362,11 +371,20 @@
...
@@ -362,11 +371,20 @@
* stop
* stop
* @param STOP
* @param STOP
*/
*/
_stop
(
item
)
{
_stop
(
item
,
index
)
{
this
.
_upExecutorsState
({
if
(
item
.
state
==
'
STOP
'
)
{
processInstanceId
:
item
.
id
,
this
.
_countDownFn
({
executeType
:
'
STOP
'
id
:
item
.
id
,
})
executeType
:
'
RECOVER_SUSPENDED_PROCESS
'
,
index
:
index
,
buttonType
:
'
suspend
'
})
}
else
{
this
.
_upExecutorsState
({
processInstanceId
:
item
.
id
,
executeType
:
'
STOP
'
})
}
},
},
/**
/**
* pause
* pause
...
@@ -383,7 +401,7 @@
...
@@ -383,7 +401,7 @@
}
else
{
}
else
{
this
.
_upExecutorsState
({
this
.
_upExecutorsState
({
processInstanceId
:
item
.
id
,
processInstanceId
:
item
.
id
,
executeType
:
item
.
state
===
'
PAUSE
'
?
'
RECOVER_SUSPENDED_PROCESS
'
:
'
PAUSE
'
executeType
:
'
PAUSE
'
})
})
}
}
},
},
...
@@ -435,7 +453,7 @@
...
@@ -435,7 +453,7 @@
if
(
data
.
length
)
{
if
(
data
.
length
)
{
_
.
map
(
data
,
v
=>
{
_
.
map
(
data
,
v
=>
{
v
.
disabled
=
true
v
.
disabled
=
true
v
.
count
=
10
v
.
count
=
9
})
})
}
}
return
data
return
data
...
...
escheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue
浏览文件 @
5b338361
...
@@ -131,7 +131,7 @@
...
@@ -131,7 +131,7 @@
}
}
},
},
_verification
()
{
_verification
()
{
let
regEmail
=
/^
([
a-zA-Z0-9
]
+
[
_|
\
_
|
\.]?)
*
[
a-zA-Z0-9
]
+@
([
a-zA-Z0-9
]
+
[
_|
\_
|
\.]?)
*
[
a-zA-Z0-9
]
+
\.[
a-zA-Z
]{2,3}
$/
// eslint-disable-line
let
regEmail
=
/^
([
a-zA-Z0-9
]
+
[
_|
\
-
|
\.]?)
*
[
a-zA-Z0-9
]
+@
([
a-zA-Z0-9
]
+
[
_|
\-
|
\.]?)
*
[
a-zA-Z0-9
]
+
\.[
a-zA-Z
]{2,3}
$/
// eslint-disable-line
// Mobile phone number regular
// Mobile phone number regular
let
regPhone
=
/^1
(
3|4|5|6|7|8
)\d{9}
$/
;
// eslint-disable-line
let
regPhone
=
/^1
(
3|4|5|6|7|8
)\d{9}
$/
;
// eslint-disable-line
...
...
escheduler-ui/src/js/module/i18n/locale/zh_CN.js
浏览文件 @
5b338361
...
@@ -237,7 +237,7 @@ export default {
...
@@ -237,7 +237,7 @@ export default {
'
Recovery Failed
'
:
'
恢复失败
'
,
'
Recovery Failed
'
:
'
恢复失败
'
,
'
Stop
'
:
'
停止
'
,
'
Stop
'
:
'
停止
'
,
'
Pause
'
:
'
暂停
'
,
'
Pause
'
:
'
暂停
'
,
'
Recovery Suspend
'
:
'
恢复
暂停
'
,
'
Recovery Suspend
'
:
'
恢复
运行
'
,
'
Gantt
'
:
'
甘特图
'
,
'
Gantt
'
:
'
甘特图
'
,
'
Name
'
:
'
名称
'
,
'
Name
'
:
'
名称
'
,
'
Node Type
'
:
'
节点类型
'
,
'
Node Type
'
:
'
节点类型
'
,
...
@@ -282,7 +282,7 @@ export default {
...
@@ -282,7 +282,7 @@ export default {
'
Start Process
'
:
'
启动工作流
'
,
'
Start Process
'
:
'
启动工作流
'
,
'
Execute from the current node
'
:
'
从当前节点开始执行
'
,
'
Execute from the current node
'
:
'
从当前节点开始执行
'
,
'
Recover tolerance fault process
'
:
'
恢复被容错的工作流
'
,
'
Recover tolerance fault process
'
:
'
恢复被容错的工作流
'
,
'
Resume the suspension process
'
:
'
恢复
暂停
流程
'
,
'
Resume the suspension process
'
:
'
恢复
运行
流程
'
,
'
Execute from the failed nodes
'
:
'
从失败节点开始执行
'
,
'
Execute from the failed nodes
'
:
'
从失败节点开始执行
'
,
'
Complement Data
'
:
'
补数
'
,
'
Complement Data
'
:
'
补数
'
,
'
Scheduling execution
'
:
'
调度执行
'
,
'
Scheduling execution
'
:
'
调度执行
'
,
...
...
install.sh
浏览文件 @
5b338361
...
@@ -330,7 +330,7 @@ sed -i ${txt} "s#master.exec.task.number.*#master.exec.task.number=${masterExecT
...
@@ -330,7 +330,7 @@ sed -i ${txt} "s#master.exec.task.number.*#master.exec.task.number=${masterExecT
sed
-i
${
txt
}
"s#master.heartbeat.interval.*#master.heartbeat.interval=
${
masterHeartbeatInterval
}
#g"
conf/master.properties
sed
-i
${
txt
}
"s#master.heartbeat.interval.*#master.heartbeat.interval=
${
masterHeartbeatInterval
}
#g"
conf/master.properties
sed
-i
${
txt
}
"s#master.task.commit.retryTimes.*#master.task.commit.retryTimes=
${
masterTaskCommitRetryTimes
}
#g"
conf/master.properties
sed
-i
${
txt
}
"s#master.task.commit.retryTimes.*#master.task.commit.retryTimes=
${
masterTaskCommitRetryTimes
}
#g"
conf/master.properties
sed
-i
${
txt
}
"s#master.task.commit.interval.*#master.task.commit.interval=
${
masterTaskCommitInterval
}
#g"
conf/master.properties
sed
-i
${
txt
}
"s#master.task.commit.interval.*#master.task.commit.interval=
${
masterTaskCommitInterval
}
#g"
conf/master.properties
sed
-i
${
txt
}
"s#master.max.cpuload.avg.*#master.max.cpuload.avg=
${
masterMaxCpuLoadAvg
}
#g"
conf/master.properties
#
sed -i ${txt} "s#master.max.cpuload.avg.*#master.max.cpuload.avg=${masterMaxCpuLoadAvg}#g" conf/master.properties
sed
-i
${
txt
}
"s#master.reserved.memory.*#master.reserved.memory=
${
masterReservedMemory
}
#g"
conf/master.properties
sed
-i
${
txt
}
"s#master.reserved.memory.*#master.reserved.memory=
${
masterReservedMemory
}
#g"
conf/master.properties
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录