Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
pentaLiker
DolphinScheduler
提交
1fac6237
DolphinScheduler
项目概览
pentaLiker
/
DolphinScheduler
与 Fork 源项目一致
Fork自
apache / DolphinScheduler
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
DolphinScheduler
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
1fac6237
编写于
11月 12, 2020
作者:
B
baoliang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor code style
上级
0ba8a6bc
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
73 addition
and
58 deletion
+73
-58
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
...java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
+73
-58
未找到文件。
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
浏览文件 @
1fac6237
...
...
@@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.common.utils.*;
import
org.apache.dolphinscheduler.common.utils.StringUtils
;
import
org.apache.dolphinscheduler.dao.entity.ProcessData
;
import
org.apache.dolphinscheduler.dao.entity.TaskInstance
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
...
...
@@ -46,6 +47,7 @@ public class DagHelper {
/**
* generate flow node relation list by task node list;
* Edges that are not in the task Node List will not be added to the result
*
* @param taskNodeList taskNodeList
* @return task node relation list
*/
...
...
@@ -67,10 +69,11 @@ public class DagHelper {
/**
* generate task nodes needed by dag
* @param taskNodeList taskNodeList
* @param startNodeNameList startNodeNameList
*
* @param taskNodeList taskNodeList
* @param startNodeNameList startNodeNameList
* @param recoveryNodeNameList recoveryNodeNameList
* @param taskDependType taskDependType
* @param taskDependType
taskDependType
* @return task node list
*/
public
static
List
<
TaskNode
>
generateFlowNodeListByStartNode
(
List
<
TaskNode
>
taskNodeList
,
List
<
String
>
startNodeNameList
,
...
...
@@ -78,8 +81,8 @@ public class DagHelper {
List
<
TaskNode
>
destFlowNodeList
=
new
ArrayList
<>();
List
<
String
>
startNodeList
=
startNodeNameList
;
if
(
taskDependType
!=
TaskDependType
.
TASK_POST
&&
CollectionUtils
.
isEmpty
(
startNodeList
)){
if
(
taskDependType
!=
TaskDependType
.
TASK_POST
&&
CollectionUtils
.
isEmpty
(
startNodeList
))
{
logger
.
error
(
"start node list is empty! cannot continue run the process "
);
return
destFlowNodeList
;
}
...
...
@@ -127,7 +130,8 @@ public class DagHelper {
/**
* find all the nodes that depended on the start node
* @param startNode startNode
*
* @param startNode startNode
* @param taskNodeList taskNodeList
* @return task node list
*/
...
...
@@ -151,9 +155,10 @@ public class DagHelper {
/**
* find all nodes that start nodes depend on.
* @param startNode startNode
*
* @param startNode startNode
* @param recoveryNodeNameList recoveryNodeNameList
* @param taskNodeList taskNodeList
* @param taskNodeList
taskNodeList
* @return task node list
*/
private
static
List
<
TaskNode
>
getFlowNodeListPre
(
TaskNode
startNode
,
List
<
String
>
recoveryNodeNameList
,
List
<
TaskNode
>
taskNodeList
,
List
<
String
>
visitedNodeNameList
)
{
...
...
@@ -185,10 +190,11 @@ public class DagHelper {
/**
* generate dag by start nodes and recovery nodes
*
* @param processDefinitionJson processDefinitionJson
* @param startNodeNameList startNodeNameList
* @param recoveryNodeNameList recoveryNodeNameList
* @param depNodeType depNodeType
* @param startNodeNameList
startNodeNameList
* @param recoveryNodeNameList
recoveryNodeNameList
* @param depNodeType
depNodeType
* @return process dag
* @throws Exception if error throws Exception
*/
...
...
@@ -215,10 +221,11 @@ public class DagHelper {
/**
* parse the forbidden task nodes in process definition.
*
* @param processDefinitionJson processDefinitionJson
* @return task node map
*/
public
static
Map
<
String
,
TaskNode
>
getForbiddenTaskNodeMaps
(
String
processDefinitionJson
){
public
static
Map
<
String
,
TaskNode
>
getForbiddenTaskNodeMaps
(
String
processDefinitionJson
)
{
Map
<
String
,
TaskNode
>
forbidTaskNodeMap
=
new
ConcurrentHashMap
<>();
ProcessData
processData
=
JSONUtils
.
parseObject
(
processDefinitionJson
,
ProcessData
.
class
);
...
...
@@ -226,8 +233,8 @@ public class DagHelper {
if
(
null
!=
processData
)
{
taskNodeList
=
processData
.
getTasks
();
}
for
(
TaskNode
node
:
taskNodeList
)
{
if
(
node
.
isForbidden
())
{
for
(
TaskNode
node
:
taskNodeList
)
{
if
(
node
.
isForbidden
())
{
forbidTaskNodeMap
.
putIfAbsent
(
node
.
getName
(),
node
);
}
}
...
...
@@ -237,8 +244,9 @@ public class DagHelper {
/**
* find node by node name
*
* @param nodeDetails nodeDetails
* @param nodeName nodeName
* @param nodeName
nodeName
* @return task node
*/
public
static
TaskNode
findNodeByName
(
List
<
TaskNode
>
nodeDetails
,
String
nodeName
)
{
...
...
@@ -252,8 +260,9 @@ public class DagHelper {
/**
* the task can be submit when all the depends nodes are forbidden or complete
* @param taskNode taskNode
* @param dag dag
*
* @param taskNode taskNode
* @param dag dag
* @param completeTaskList completeTaskList
* @return can submit
*/
...
...
@@ -282,13 +291,14 @@ public class DagHelper {
* parse the successor nodes of previous node.
* this function parse the condition node to find the right branch.
* also check all the depends nodes forbidden or complete
*
* @param preNodeName
* @return successor nodes
*/
public
static
Set
<
String
>
parsePostNodes
(
String
preNodeName
,
Map
<
String
,
TaskNode
>
skipTaskNodeList
,
DAG
<
String
,
TaskNode
,
TaskNodeRelation
>
dag
,
Map
<
String
,
TaskInstance
>
completeTaskList
)
{
Map
<
String
,
TaskNode
>
skipTaskNodeList
,
DAG
<
String
,
TaskNode
,
TaskNodeRelation
>
dag
,
Map
<
String
,
TaskInstance
>
completeTaskList
)
{
Set
<
String
>
postNodeList
=
new
HashSet
<>();
Collection
<
String
>
startVertexes
=
new
ArrayList
<>();
if
(
preNodeName
==
null
)
{
...
...
@@ -302,7 +312,7 @@ public class DagHelper {
for
(
String
subsequent
:
startVertexes
)
{
TaskNode
taskNode
=
dag
.
getNode
(
subsequent
);
if
(
isTaskNodeNeedSkip
(
taskNode
,
skipTaskNodeList
))
{
setTaskNodeSkip
(
subsequent
,
dag
,
completeTaskList
,
skipTaskNodeList
);
setTaskNodeSkip
(
subsequent
,
dag
,
completeTaskList
,
skipTaskNodeList
);
continue
;
}
if
(!
DagHelper
.
allDependsForbiddenOrEnd
(
taskNode
,
dag
,
skipTaskNodeList
,
completeTaskList
))
{
...
...
@@ -319,17 +329,18 @@ public class DagHelper {
/**
* if all of the task dependence are skipped, skip it too.
*
* @param taskNode
* @return
*/
private
static
boolean
isTaskNodeNeedSkip
(
TaskNode
taskNode
,
Map
<
String
,
TaskNode
>
skipTaskNodeList
)
{
if
(
CollectionUtils
.
isEmpty
(
taskNode
.
getDepList
()))
{
Map
<
String
,
TaskNode
>
skipTaskNodeList
)
{
if
(
CollectionUtils
.
isEmpty
(
taskNode
.
getDepList
()))
{
return
false
;
}
for
(
String
depNode
:
taskNode
.
getDepList
())
{
if
(!
skipTaskNodeList
.
containsKey
(
depNode
))
{
for
(
String
depNode
:
taskNode
.
getDepList
())
{
if
(!
skipTaskNodeList
.
containsKey
(
depNode
))
{
return
false
;
}
}
...
...
@@ -338,37 +349,38 @@ public class DagHelper {
/**
* parse condition task find the branch process
* set skip flag for another one.
* parse condition task find the branch process
* set skip flag for another one.
*
* @param nodeName
* @return
*/
public
static
List
<
String
>
parseConditionTask
(
String
nodeName
,
Map
<
String
,
TaskNode
>
skipTaskNodeList
,
DAG
<
String
,
TaskNode
,
TaskNodeRelation
>
dag
,
Map
<
String
,
TaskInstance
>
completeTaskList
)
{
Map
<
String
,
TaskNode
>
skipTaskNodeList
,
DAG
<
String
,
TaskNode
,
TaskNodeRelation
>
dag
,
Map
<
String
,
TaskInstance
>
completeTaskList
)
{
List
<
String
>
conditionTaskList
=
new
ArrayList
<>();
TaskNode
taskNode
=
dag
.
getNode
(
nodeName
);
if
(!
taskNode
.
isConditionsTask
()){
if
(!
taskNode
.
isConditionsTask
())
{
return
conditionTaskList
;
}
if
(!
completeTaskList
.
containsKey
(
nodeName
)){
if
(!
completeTaskList
.
containsKey
(
nodeName
))
{
return
conditionTaskList
;
}
TaskInstance
taskInstance
=
completeTaskList
.
get
(
nodeName
);
ConditionsParameters
conditionsParameters
=
JSONUtils
.
parseObject
(
taskNode
.
getConditionResult
(),
ConditionsParameters
.
class
);
List
<
String
>
skipNodeList
=
new
ArrayList
<>();
if
(
taskInstance
.
getState
().
typeIsSuccess
())
{
if
(
taskInstance
.
getState
().
typeIsSuccess
())
{
conditionTaskList
=
conditionsParameters
.
getSuccessNode
();
skipNodeList
=
conditionsParameters
.
getFailedNode
();
}
else
if
(
taskInstance
.
getState
().
typeIsFailure
())
{
}
else
if
(
taskInstance
.
getState
().
typeIsFailure
())
{
conditionTaskList
=
conditionsParameters
.
getFailedNode
();
skipNodeList
=
conditionsParameters
.
getSuccessNode
();
}
else
{
}
else
{
conditionTaskList
.
add
(
nodeName
);
}
for
(
String
failedNode
:
skipNodeList
)
{
for
(
String
failedNode
:
skipNodeList
)
{
setTaskNodeSkip
(
failedNode
,
dag
,
completeTaskList
,
skipTaskNodeList
);
}
return
conditionTaskList
;
...
...
@@ -376,27 +388,27 @@ public class DagHelper {
/**
* set task node and the post nodes skip flag
*
* @param skipNodeName
* @param dag
* @param completeTaskList
* @param skipTaskNodeList
*/
private
static
void
setTaskNodeSkip
(
String
skipNodeName
,
DAG
<
String
,
TaskNode
,
TaskNodeRelation
>
dag
,
Map
<
String
,
TaskInstance
>
completeTaskList
,
Map
<
String
,
TaskNode
>
skipTaskNodeList
)
{
DAG
<
String
,
TaskNode
,
TaskNodeRelation
>
dag
,
Map
<
String
,
TaskInstance
>
completeTaskList
,
Map
<
String
,
TaskNode
>
skipTaskNodeList
)
{
skipTaskNodeList
.
putIfAbsent
(
skipNodeName
,
dag
.
getNode
(
skipNodeName
));
Collection
<
String
>
postNodeList
=
dag
.
getSubsequentNodes
(
skipNodeName
);
for
(
String
post
:
postNodeList
)
{
for
(
String
post
:
postNodeList
)
{
TaskNode
postNode
=
dag
.
getNode
(
post
);
if
(
isTaskNodeNeedSkip
(
postNode
,
skipTaskNodeList
))
{
if
(
isTaskNodeNeedSkip
(
postNode
,
skipTaskNodeList
))
{
setTaskNodeSkip
(
post
,
dag
,
completeTaskList
,
skipTaskNodeList
);
}
}
}
/***
* build dag graph
* @param processDag processDag
...
...
@@ -404,19 +416,19 @@ public class DagHelper {
*/
public
static
DAG
<
String
,
TaskNode
,
TaskNodeRelation
>
buildDagGraph
(
ProcessDag
processDag
)
{
DAG
<
String
,
TaskNode
,
TaskNodeRelation
>
dag
=
new
DAG
<>();
DAG
<
String
,
TaskNode
,
TaskNodeRelation
>
dag
=
new
DAG
<>();
//add vertex
if
(
CollectionUtils
.
isNotEmpty
(
processDag
.
getNodes
())){
for
(
TaskNode
node
:
processDag
.
getNodes
()){
dag
.
addNode
(
node
.
getName
(),
node
);
if
(
CollectionUtils
.
isNotEmpty
(
processDag
.
getNodes
()))
{
for
(
TaskNode
node
:
processDag
.
getNodes
())
{
dag
.
addNode
(
node
.
getName
(),
node
);
}
}
//add edge
if
(
CollectionUtils
.
isNotEmpty
(
processDag
.
getEdges
())){
for
(
TaskNodeRelation
edge
:
processDag
.
getEdges
()){
dag
.
addEdge
(
edge
.
getStartNode
(),
edge
.
getEndNode
());
if
(
CollectionUtils
.
isNotEmpty
(
processDag
.
getEdges
()))
{
for
(
TaskNodeRelation
edge
:
processDag
.
getEdges
())
{
dag
.
addEdge
(
edge
.
getStartNode
(),
edge
.
getEndNode
());
}
}
return
dag
;
...
...
@@ -424,6 +436,7 @@ public class DagHelper {
/**
* get process dag
*
* @param taskNodeList task node list
* @return Process dag
*/
...
...
@@ -451,21 +464,22 @@ public class DagHelper {
/**
* is there have conditions after the parent node
*
* @param parentNodeName
* @return
*/
public
static
boolean
haveConditionsAfterNode
(
String
parentNodeName
,
DAG
<
String
,
TaskNode
,
TaskNodeRelation
>
dag
)
{
)
{
boolean
result
=
false
;
Set
<
String
>
subsequentNodes
=
dag
.
getSubsequentNodes
(
parentNodeName
);
if
(
CollectionUtils
.
isEmpty
(
subsequentNodes
))
{
if
(
CollectionUtils
.
isEmpty
(
subsequentNodes
))
{
return
result
;
}
for
(
String
nodeName
:
subsequentNodes
)
{
for
(
String
nodeName
:
subsequentNodes
)
{
TaskNode
taskNode
=
dag
.
getNode
(
nodeName
);
List
<
String
>
preTasksList
=
JSONUtils
.
toList
(
taskNode
.
getPreTasks
(),
String
.
class
);
if
(
preTasksList
.
contains
(
parentNodeName
)
&&
taskNode
.
isConditionsTask
())
{
if
(
preTasksList
.
contains
(
parentNodeName
)
&&
taskNode
.
isConditionsTask
())
{
return
true
;
}
}
...
...
@@ -474,19 +488,20 @@ public class DagHelper {
/**
* is there have conditions after the parent node
*
* @param parentNodeName
* @return
*/
public
static
boolean
haveConditionsAfterNode
(
String
parentNodeName
,
List
<
TaskNode
>
taskNodes
){
)
{
boolean
result
=
false
;
if
(
CollectionUtils
.
isEmpty
(
taskNodes
))
{
if
(
CollectionUtils
.
isEmpty
(
taskNodes
))
{
return
result
;
}
for
(
TaskNode
taskNode
:
taskNodes
)
{
for
(
TaskNode
taskNode
:
taskNodes
)
{
List
<
String
>
preTasksList
=
JSONUtils
.
toList
(
taskNode
.
getPreTasks
(),
String
.
class
);
if
(
preTasksList
.
contains
(
parentNodeName
)
&&
taskNode
.
isConditionsTask
())
{
if
(
preTasksList
.
contains
(
parentNodeName
)
&&
taskNode
.
isConditionsTask
())
{
return
true
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录