Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
wk1234123
DolphinScheduler
提交
bb786b28
DolphinScheduler
项目概览
wk1234123
/
DolphinScheduler
与 Fork 源项目一致
Fork自
apache / DolphinScheduler
通知
5
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
DolphinScheduler
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
bb786b28
编写于
6月 14, 2019
作者:
leon-baoliang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
change forbidden nodes run process.
上级
dff5a26a
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
162 addition
and
51 deletion
+162
-51
escheduler-dao/src/main/java/cn/escheduler/dao/utils/DagHelper.java
...-dao/src/main/java/cn/escheduler/dao/utils/DagHelper.java
+112
-0
escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java
.../cn/escheduler/server/master/runner/MasterExecThread.java
+10
-51
escheduler-server/src/test/java/cn/escheduler/server/master/MasterCommandTest.java
...t/java/cn/escheduler/server/master/MasterCommandTest.java
+40
-0
未找到文件。
escheduler-dao/src/main/java/cn/escheduler/dao/utils/DagHelper.java
浏览文件 @
bb786b28
...
...
@@ -18,16 +18,22 @@ package cn.escheduler.dao.utils;
import
cn.escheduler.common.enums.TaskDependType
;
import
cn.escheduler.common.graph.DAG
;
import
cn.escheduler.common.model.TaskNode
;
import
cn.escheduler.common.model.TaskNodeRelation
;
import
cn.escheduler.common.process.ProcessDag
;
import
cn.escheduler.common.utils.JSONUtils
;
import
cn.escheduler.dao.model.ProcessData
;
import
org.apache.commons.collections4.CollectionUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.util.ArrayList
;
import
java.util.Collection
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.concurrent.ConcurrentHashMap
;
/**
* dag tools
...
...
@@ -192,6 +198,24 @@ public class DagHelper {
return
processDag
;
}
/**
* parse the forbidden task nodes in process definition.
* @param processDefinitionJson
* @return
*/
public
static
Map
<
String
,
TaskNode
>
getForbiddenTaskNodeMaps
(
String
processDefinitionJson
){
Map
<
String
,
TaskNode
>
forbidTaskNodeMap
=
new
ConcurrentHashMap
<>();
ProcessData
processData
=
JSONUtils
.
parseObject
(
processDefinitionJson
,
ProcessData
.
class
);
List
<
TaskNode
>
taskNodeList
=
processData
.
getTasks
();
for
(
TaskNode
node
:
taskNodeList
){
if
(
node
.
isForbidden
()){
forbidTaskNodeMap
.
putIfAbsent
(
node
.
getName
(),
node
);
}
}
return
forbidTaskNodeMap
;
}
/**
* find node by node name
...
...
@@ -209,4 +233,92 @@ public class DagHelper {
}
return
null
;
}
/**
* get start vertex in one dag
* it would find the post node if the start vertex is forbidden running
* @param parentNodeName the previous node
* @param dag
* @return
*/
public
static
Collection
<
String
>
getStartVertex
(
String
parentNodeName
,
DAG
<
String
,
TaskNode
,
TaskNodeRelation
>
dag
){
Collection
<
String
>
startVertexs
=
null
;
if
(
StringUtils
.
isNotEmpty
(
parentNodeName
)){
startVertexs
=
dag
.
getSubsequentNodes
(
parentNodeName
);
}
else
{
startVertexs
=
dag
.
getBeginNode
();
}
Collection
<
String
>
tmpStartVertexs
=
new
ArrayList
<>();
tmpStartVertexs
.
addAll
(
startVertexs
);
for
(
String
start
:
tmpStartVertexs
){
TaskNode
startNode
=
dag
.
getNode
(
start
);
if
(!
startNode
.
isForbidden
()){
continue
;
}
Collection
<
String
>
postNodes
=
getStartVertex
(
start
,
dag
);
for
(
String
post
:
postNodes
){
if
(
checkForbiddenPostCanSubmit
(
post
,
dag
)){
startVertexs
.
add
(
post
);
}
}
startVertexs
.
remove
(
start
);
}
return
startVertexs
;
}
/**
*
* @param postNodeName
* @param dag
* @return
*/
private
static
boolean
checkForbiddenPostCanSubmit
(
String
postNodeName
,
DAG
<
String
,
TaskNode
,
TaskNodeRelation
>
dag
){
TaskNode
postNode
=
dag
.
getNode
(
postNodeName
);
List
<
String
>
dependList
=
postNode
.
getDepList
();
for
(
String
dependNodeName
:
dependList
){
TaskNode
dependNode
=
dag
.
getNode
(
dependNodeName
);
if
(!
dependNode
.
isForbidden
()){
return
false
;
}
}
return
true
;
}
/***
* generate dag graph
* @param processDag
* @return
*/
public
static
DAG
<
String
,
TaskNode
,
TaskNodeRelation
>
buildDagGraph
(
ProcessDag
processDag
)
{
DAG
<
String
,
TaskNode
,
TaskNodeRelation
>
dag
=
new
DAG
<>();
/**
* add vertex
*/
if
(
CollectionUtils
.
isNotEmpty
(
processDag
.
getNodes
())){
for
(
TaskNode
node
:
processDag
.
getNodes
()){
dag
.
addNode
(
node
.
getName
(),
node
);
}
}
/**
* add edge
*/
if
(
CollectionUtils
.
isNotEmpty
(
processDag
.
getEdges
())){
for
(
TaskNodeRelation
edge
:
processDag
.
getEdges
()){
dag
.
addEdge
(
edge
.
getStartNode
(),
edge
.
getEndNode
());
}
}
return
dag
;
}
}
escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java
浏览文件 @
bb786b28
...
...
@@ -79,6 +79,7 @@ public class MasterExecThread implements Runnable {
private
Map
<
String
,
TaskInstance
>
completeTaskList
=
new
ConcurrentHashMap
<>();
private
Map
<
String
,
TaskInstance
>
readyToSubmitTaskList
=
new
ConcurrentHashMap
<>();
private
Map
<
String
,
TaskInstance
>
dependFailedTask
=
new
ConcurrentHashMap
<>();
private
Map
<
String
,
TaskNode
>
forbiddenTaskList
=
new
ConcurrentHashMap
<>();
private
List
<
TaskInstance
>
recoverToleranceFaultTaskList
=
new
ArrayList
<>();
private
AlertManager
alertManager
=
new
AlertManager
();
...
...
@@ -269,6 +270,7 @@ public class MasterExecThread implements Runnable {
private
void
buildFlowDag
()
throws
Exception
{
recoverNodeIdList
=
getStartTaskInstanceList
(
processInstance
.
getCommandParam
());
forbiddenTaskList
=
DagHelper
.
getForbiddenTaskNodeMaps
(
processInstance
.
getProcessInstanceJson
());
// generate process to get DAG info
List
<
String
>
recoveryNameList
=
getRecoveryNodeNameList
();
List
<
String
>
startNodeNameList
=
parseStartNodeName
(
processInstance
.
getCommandParam
());
...
...
@@ -279,7 +281,8 @@ public class MasterExecThread implements Runnable {
return
;
}
// generate process dag
dag
=
buildDagGraph
(
processDag
);
dag
=
DagHelper
.
buildDagGraph
(
processDag
);
}
private
void
initTaskQueue
(){
...
...
@@ -411,24 +414,7 @@ public class MasterExecThread implements Runnable {
return
taskInstance
;
}
private
Collection
<
String
>
getStartVertex
(
String
parentNodeName
,
DAG
<
String
,
TaskNode
,
TaskNodeRelation
>
dag
){
Collection
<
String
>
startVertex
=
null
;
if
(
StringUtils
.
isNotEmpty
(
parentNodeName
)){
startVertex
=
dag
.
getSubsequentNodes
(
parentNodeName
);
}
else
{
startVertex
=
dag
.
getBeginNode
();
}
for
(
String
start
:
startVertex
){
TaskNode
node
=
dag
.
getNode
(
start
);
if
(
node
.
isForbidden
()){
}
}
return
startVertex
;
}
/**
* get post task instance by node
...
...
@@ -440,10 +426,12 @@ public class MasterExecThread implements Runnable {
private
List
<
TaskInstance
>
getPostTaskInstanceByNode
(
DAG
<
String
,
TaskNode
,
TaskNodeRelation
>
dag
,
String
parentNodeName
){
List
<
TaskInstance
>
postTaskList
=
new
ArrayList
<>();
Collection
<
String
>
startVertex
=
getStartVertex
(
parentNodeName
,
dag
);
Collection
<
String
>
startVertex
=
DagHelper
.
getStartVertex
(
parentNodeName
,
dag
);
if
(
startVertex
==
null
){
return
postTaskList
;
}
for
(
String
nodeName
:
startVertex
){
// encapsulation task instance
TaskInstance
taskInstance
=
createTaskInstance
(
processInstance
,
nodeName
,
dag
.
getNode
(
nodeName
),
parentNodeName
);
...
...
@@ -532,8 +520,8 @@ public class MasterExecThread implements Runnable {
List
<
String
>
depsNameList
=
taskNode
.
getDepList
();
for
(
String
depsNode
:
depsNameList
){
// dependencies must be
all complete
if
(!
completeTaskList
.
containsKey
(
depsNode
)){
// dependencies must be
fully completed or run prohibited
if
(!
completeTaskList
.
containsKey
(
depsNode
)
||
!
forbiddenTaskList
.
containsKey
(
depsNode
)
){
return
DependResult
.
WAITING
;
}
ExecutionStatus
taskState
=
completeTaskList
.
get
(
depsNode
).
getState
();
...
...
@@ -919,35 +907,6 @@ public class MasterExecThread implements Runnable {
}
}
/***
* generate dag graph
* @param processDag
* @return
*/
public
DAG
<
String
,
TaskNode
,
TaskNodeRelation
>
buildDagGraph
(
ProcessDag
processDag
)
{
DAG
<
String
,
TaskNode
,
TaskNodeRelation
>
dag
=
new
DAG
<>();
/**
* add vertex
*/
if
(
CollectionUtils
.
isNotEmpty
(
processDag
.
getNodes
())){
for
(
TaskNode
node
:
processDag
.
getNodes
()){
dag
.
addNode
(
node
.
getName
(),
node
);
}
}
/**
* add edge
*/
if
(
CollectionUtils
.
isNotEmpty
(
processDag
.
getEdges
())){
for
(
TaskNodeRelation
edge
:
processDag
.
getEdges
()){
dag
.
addEdge
(
edge
.
getStartNode
(),
edge
.
getEndNode
());
}
}
return
dag
;
}
/**
* whether the retry interval is timed out
* @param taskInstance
...
...
escheduler-server/src/test/java/cn/escheduler/server/master/MasterCommandTest.java
浏览文件 @
bb786b28
...
...
@@ -18,15 +18,27 @@ package cn.escheduler.server.master;
import
cn.escheduler.common.enums.CommandType
;
import
cn.escheduler.common.enums.FailureStrategy
;
import
cn.escheduler.common.enums.TaskDependType
;
import
cn.escheduler.common.enums.WarningType
;
import
cn.escheduler.common.graph.DAG
;
import
cn.escheduler.common.model.TaskNode
;
import
cn.escheduler.common.model.TaskNodeRelation
;
import
cn.escheduler.common.process.ProcessDag
;
import
cn.escheduler.dao.datasource.ConnectionFactory
;
import
cn.escheduler.dao.mapper.CommandMapper
;
import
cn.escheduler.dao.mapper.ProcessDefinitionMapper
;
import
cn.escheduler.dao.model.Command
;
import
cn.escheduler.dao.model.ProcessDefinition
;
import
cn.escheduler.dao.utils.DagHelper
;
import
org.codehaus.jackson.SerializableString
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.util.ArrayList
;
import
java.util.Collection
;
/**
* master test
*/
...
...
@@ -36,9 +48,14 @@ public class MasterCommandTest {
private
CommandMapper
commandMapper
;
private
ProcessDefinitionMapper
processDefinitionMapper
;
@Before
public
void
before
(){
commandMapper
=
ConnectionFactory
.
getSqlSession
().
getMapper
(
CommandMapper
.
class
);
processDefinitionMapper
=
ConnectionFactory
.
getSqlSession
().
getMapper
(
ProcessDefinitionMapper
.
class
);
}
...
...
@@ -104,4 +121,27 @@ public class MasterCommandTest {
}
@Test
public
void
testDagHelper
(){
ProcessDefinition
processDefinition
=
processDefinitionMapper
.
queryByDefineId
(
19
);
try
{
ProcessDag
processDag
=
DagHelper
.
generateFlowDag
(
processDefinition
.
getProcessDefinitionJson
(),
new
ArrayList
<>(),
new
ArrayList
<>(),
TaskDependType
.
TASK_POST
);
DAG
<
String
,
TaskNode
,
TaskNodeRelation
>
dag
=
DagHelper
.
buildDagGraph
(
processDag
);
Collection
<
String
>
start
=
DagHelper
.
getStartVertex
(
"1"
,
dag
);
System
.
out
.
println
(
start
.
toString
());
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录