Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
pentaLiker
DolphinScheduler
提交
11443602
DolphinScheduler
项目概览
pentaLiker
/
DolphinScheduler
与 Fork 源项目一致
Fork自
apache / DolphinScheduler
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
DolphinScheduler
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
11443602
编写于
11月 05, 2021
作者:
J
JinYong Li
提交者:
GitHub
11月 05, 2021
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add convert dependent/conditions (#6710)
上级
b8d0f06a
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
99 addition
and
11 deletion
+99
-11
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
...a/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
+99
-11
未找到文件。
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
浏览文件 @
11443602
...
...
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.enums.ConditionType;
import
org.apache.dolphinscheduler.common.enums.DbType
;
import
org.apache.dolphinscheduler.common.enums.Flag
;
import
org.apache.dolphinscheduler.common.enums.Priority
;
import
org.apache.dolphinscheduler.common.enums.TaskType
;
import
org.apache.dolphinscheduler.common.enums.TimeoutFlag
;
import
org.apache.dolphinscheduler.common.process.ResourceInfo
;
import
org.apache.dolphinscheduler.common.task.TaskTimeoutParameter
;
...
...
@@ -54,6 +55,7 @@ import java.util.Date;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Optional
;
import
java.util.stream.Collectors
;
import
javax.sql.DataSource
;
...
...
@@ -61,6 +63,8 @@ import javax.sql.DataSource;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
com.fasterxml.jackson.databind.JsonNode
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
com.fasterxml.jackson.databind.node.ArrayNode
;
import
com.fasterxml.jackson.databind.node.JsonNodeFactory
;
import
com.fasterxml.jackson.databind.node.ObjectNode
;
...
...
@@ -599,7 +603,9 @@ public abstract class UpgradeDao extends AbstractBaseDao {
List
<
ProcessDefinitionLog
>
processDefinitionLogs
=
new
ArrayList
<>();
List
<
ProcessTaskRelationLog
>
processTaskRelationLogs
=
new
ArrayList
<>();
List
<
TaskDefinitionLog
>
taskDefinitionLogs
=
new
ArrayList
<>();
splitProcessDefinitionJson
(
processDefinitions
,
processDefinitionJsonMap
,
processDefinitionLogs
,
processTaskRelationLogs
,
taskDefinitionLogs
);
Map
<
Integer
,
Map
<
Long
,
Map
<
String
,
Long
>>>
processTaskMap
=
new
HashMap
<>();
splitProcessDefinitionJson
(
processDefinitions
,
processDefinitionJsonMap
,
processDefinitionLogs
,
processTaskRelationLogs
,
taskDefinitionLogs
,
processTaskMap
);
convertDependence
(
taskDefinitionLogs
,
projectIdCodeMap
,
processTaskMap
);
// execute json split
jsonSplitDao
.
executeJsonSplitProcessDefinition
(
dataSource
.
getConnection
(),
processDefinitionLogs
);
...
...
@@ -614,7 +620,8 @@ public abstract class UpgradeDao extends AbstractBaseDao {
Map
<
Integer
,
String
>
processDefinitionJsonMap
,
List
<
ProcessDefinitionLog
>
processDefinitionLogs
,
List
<
ProcessTaskRelationLog
>
processTaskRelationLogs
,
List
<
TaskDefinitionLog
>
taskDefinitionLogs
)
throws
Exception
{
List
<
TaskDefinitionLog
>
taskDefinitionLogs
,
Map
<
Integer
,
Map
<
Long
,
Map
<
String
,
Long
>>>
processTaskMap
)
throws
Exception
{
Map
<
Integer
,
ProcessDefinition
>
processDefinitionMap
=
processDefinitions
.
stream
()
.
collect
(
Collectors
.
toMap
(
ProcessDefinition:
:
getId
,
processDefinition
->
processDefinition
));
Date
now
=
new
Date
();
...
...
@@ -634,6 +641,8 @@ public abstract class UpgradeDao extends AbstractBaseDao {
Map
<
String
,
Long
>
taskIdCodeMap
=
new
HashMap
<>();
Map
<
String
,
List
<
String
>>
taskNamePreMap
=
new
HashMap
<>();
Map
<
String
,
Long
>
taskNameCodeMap
=
new
HashMap
<>();
Map
<
Long
,
Map
<
String
,
Long
>>
processCodeTaskNameCodeMap
=
new
HashMap
<>();
List
<
TaskDefinitionLog
>
taskDefinitionLogList
=
new
ArrayList
<>();
ArrayNode
tasks
=
JSONUtils
.
parseArray
(
jsonObject
.
get
(
"tasks"
).
toString
());
for
(
int
i
=
0
;
i
<
tasks
.
size
();
i
++)
{
ObjectNode
task
=
(
ObjectNode
)
tasks
.
path
(
i
);
...
...
@@ -647,7 +656,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
}
param
.
put
(
"conditionResult"
,
task
.
get
(
"conditionResult"
));
param
.
put
(
"dependence"
,
task
.
get
(
"dependence"
));
taskDefinitionLog
.
setTaskParams
(
param
.
toString
());
taskDefinitionLog
.
setTaskParams
(
param
.
asText
());
}
TaskTimeoutParameter
timeout
=
JSONUtils
.
parseObject
(
JSONUtils
.
toJsonString
(
task
.
get
(
"timeout"
)),
TaskTimeoutParameter
.
class
);
if
(
timeout
!=
null
)
{
...
...
@@ -655,15 +664,15 @@ public abstract class UpgradeDao extends AbstractBaseDao {
taskDefinitionLog
.
setTimeoutFlag
(
timeout
.
getEnable
()
?
TimeoutFlag
.
OPEN
:
TimeoutFlag
.
CLOSE
);
taskDefinitionLog
.
setTimeoutNotifyStrategy
(
timeout
.
getStrategy
());
}
taskDefinitionLog
.
setDescription
(
task
.
get
(
"description"
).
toString
());
taskDefinitionLog
.
setFlag
(
Constants
.
FLOWNODE_RUN_FLAG_NORMAL
.
equals
(
task
.
get
(
"runFlag"
).
toString
())
?
Flag
.
YES
:
Flag
.
NO
);
taskDefinitionLog
.
setTaskType
(
task
.
get
(
"type"
).
toString
());
taskDefinitionLog
.
setDescription
(
task
.
get
(
"description"
).
asText
());
taskDefinitionLog
.
setFlag
(
Constants
.
FLOWNODE_RUN_FLAG_NORMAL
.
equals
(
task
.
get
(
"runFlag"
).
asText
())
?
Flag
.
YES
:
Flag
.
NO
);
taskDefinitionLog
.
setTaskType
(
task
.
get
(
"type"
).
asText
());
taskDefinitionLog
.
setFailRetryInterval
(
task
.
get
(
"retryInterval"
).
asInt
());
taskDefinitionLog
.
setFailRetryTimes
(
task
.
get
(
"maxRetryTimes"
).
asInt
());
taskDefinitionLog
.
setTaskPriority
(
JSONUtils
.
parseObject
(
JSONUtils
.
toJsonString
(
task
.
get
(
"taskInstancePriority"
)),
Priority
.
class
));
String
name
=
task
.
get
(
"name"
).
toString
();
String
name
=
task
.
get
(
"name"
).
asText
();
taskDefinitionLog
.
setName
(
name
);
taskDefinitionLog
.
setWorkerGroup
(
task
.
get
(
"workerGroup"
).
toString
());
taskDefinitionLog
.
setWorkerGroup
(
task
.
get
(
"workerGroup"
).
asText
());
long
taskCode
=
SnowFlakeUtils
.
getInstance
().
nextId
();
taskDefinitionLog
.
setCode
(
taskCode
);
taskDefinitionLog
.
setVersion
(
Constants
.
VERSION_FIRST
);
...
...
@@ -675,12 +684,14 @@ public abstract class UpgradeDao extends AbstractBaseDao {
taskDefinitionLog
.
setOperateTime
(
now
);
taskDefinitionLog
.
setCreateTime
(
now
);
taskDefinitionLog
.
setUpdateTime
(
now
);
taskDefinitionLog
s
.
add
(
taskDefinitionLog
);
taskIdCodeMap
.
put
(
task
.
get
(
"id"
).
toString
(),
taskCode
);
List
<
String
>
preTasks
=
JSONUtils
.
toList
(
task
.
get
(
"preTasks"
).
toString
(),
String
.
class
);
taskDefinitionLog
List
.
add
(
taskDefinitionLog
);
taskIdCodeMap
.
put
(
task
.
get
(
"id"
).
asText
(),
taskCode
);
List
<
String
>
preTasks
=
JSONUtils
.
toList
(
task
.
get
(
"preTasks"
).
asText
(),
String
.
class
);
taskNamePreMap
.
put
(
name
,
preTasks
);
taskNameCodeMap
.
put
(
name
,
taskCode
);
}
convertConditions
(
taskDefinitionLogList
,
taskNameCodeMap
);
taskDefinitionLogs
.
addAll
(
taskDefinitionLogList
);
processDefinition
.
setLocations
(
convertLocations
(
processDefinition
.
getLocations
(),
taskIdCodeMap
));
ProcessDefinitionLog
processDefinitionLog
=
new
ProcessDefinitionLog
(
processDefinition
);
processDefinitionLog
.
setOperator
(
1
);
...
...
@@ -688,6 +699,45 @@ public abstract class UpgradeDao extends AbstractBaseDao {
processDefinitionLog
.
setUpdateTime
(
now
);
processDefinitionLogs
.
add
(
processDefinitionLog
);
handleProcessTaskRelation
(
taskNamePreMap
,
taskNameCodeMap
,
processDefinition
,
processTaskRelationLogs
);
processCodeTaskNameCodeMap
.
put
(
processDefinition
.
getCode
(),
taskNameCodeMap
);
processTaskMap
.
put
(
entry
.
getKey
(),
processCodeTaskNameCodeMap
);
}
}
public
void
convertConditions
(
List
<
TaskDefinitionLog
>
taskDefinitionLogList
,
Map
<
String
,
Long
>
taskNameCodeMap
)
throws
Exception
{
for
(
TaskDefinitionLog
taskDefinitionLog
:
taskDefinitionLogList
)
{
if
(
TaskType
.
CONDITIONS
.
getDesc
().
equals
(
taskDefinitionLog
.
getTaskType
()))
{
ObjectMapper
objectMapper
=
new
ObjectMapper
();
ObjectNode
taskParams
=
JSONUtils
.
parseObject
(
taskDefinitionLog
.
getTaskParams
());
// reset conditionResult
ObjectNode
conditionResult
=
(
ObjectNode
)
taskParams
.
get
(
"conditionResult"
);
List
<
String
>
successNode
=
JSONUtils
.
toList
(
conditionResult
.
get
(
"successNode"
).
toString
(),
String
.
class
);
List
<
Long
>
nodeCode
=
new
ArrayList
<>();
successNode
.
forEach
(
node
->
nodeCode
.
add
(
taskNameCodeMap
.
get
(
node
)));
conditionResult
.
set
(
"successNode"
,
objectMapper
.
readTree
(
objectMapper
.
writeValueAsString
(
nodeCode
)));
List
<
String
>
failedNode
=
JSONUtils
.
toList
(
conditionResult
.
get
(
"failedNode"
).
toString
(),
String
.
class
);
nodeCode
.
clear
();
failedNode
.
forEach
(
node
->
nodeCode
.
add
(
taskNameCodeMap
.
get
(
node
)));
conditionResult
.
set
(
"failedNode"
,
objectMapper
.
readTree
(
objectMapper
.
writeValueAsString
(
nodeCode
)));
// reset dependItemList
ObjectNode
dependence
=
(
ObjectNode
)
taskParams
.
get
(
"dependence"
);
ArrayNode
dependTaskList
=
JSONUtils
.
parseArray
(
JSONUtils
.
toJsonString
(
dependence
.
get
(
"dependTaskList"
)));
for
(
int
i
=
0
;
i
<
dependTaskList
.
size
();
i
++)
{
ObjectNode
dependTask
=
(
ObjectNode
)
dependTaskList
.
path
(
i
);
ArrayNode
dependItemList
=
JSONUtils
.
parseArray
(
JSONUtils
.
toJsonString
(
dependTask
.
get
(
"dependItemList"
)));
for
(
int
j
=
0
;
j
<
dependItemList
.
size
();
j
++)
{
ObjectNode
dependItem
=
(
ObjectNode
)
dependItemList
.
path
(
j
);
JsonNode
depTasks
=
dependItem
.
get
(
"depTasks"
);
dependItem
.
put
(
"depTaskCode"
,
taskNameCodeMap
.
get
(
depTasks
.
asText
()));
dependItem
.
remove
(
"depTasks"
);
dependItemList
.
set
(
j
,
dependItem
);
}
dependTask
.
put
(
"dependItemList"
,
dependItemList
);
dependTaskList
.
set
(
i
,
dependTask
);
}
dependence
.
put
(
"dependTaskList"
,
dependTaskList
);
taskDefinitionLog
.
setTaskParams
(
JSONUtils
.
toJsonString
(
taskParams
));
}
}
}
...
...
@@ -709,6 +759,44 @@ public abstract class UpgradeDao extends AbstractBaseDao {
return
jsonNodes
.
toString
();
}
public
void
convertDependence
(
List
<
TaskDefinitionLog
>
taskDefinitionLogs
,
Map
<
Integer
,
Long
>
projectIdCodeMap
,
Map
<
Integer
,
Map
<
Long
,
Map
<
String
,
Long
>>>
processTaskMap
)
{
for
(
TaskDefinitionLog
taskDefinitionLog
:
taskDefinitionLogs
)
{
if
(
TaskType
.
DEPENDENT
.
getDesc
().
equals
(
taskDefinitionLog
.
getTaskType
()))
{
ObjectNode
taskParams
=
JSONUtils
.
parseObject
(
taskDefinitionLog
.
getTaskParams
());
ObjectNode
dependence
=
(
ObjectNode
)
taskParams
.
get
(
"dependence"
);
ArrayNode
dependTaskList
=
JSONUtils
.
parseArray
(
JSONUtils
.
toJsonString
(
dependence
.
get
(
"dependTaskList"
)));
for
(
int
i
=
0
;
i
<
dependTaskList
.
size
();
i
++)
{
ObjectNode
dependTask
=
(
ObjectNode
)
dependTaskList
.
path
(
i
);
ArrayNode
dependItemList
=
JSONUtils
.
parseArray
(
JSONUtils
.
toJsonString
(
dependTask
.
get
(
"dependItemList"
)));
for
(
int
j
=
0
;
j
<
dependItemList
.
size
();
j
++)
{
ObjectNode
dependItem
=
(
ObjectNode
)
dependItemList
.
path
(
j
);
dependItem
.
put
(
"projectCode"
,
projectIdCodeMap
.
get
(
dependItem
.
get
(
"projectId"
).
asInt
()));
int
definitionId
=
dependItem
.
get
(
"definitionId"
).
asInt
();
Map
<
Long
,
Map
<
String
,
Long
>>
processCodeTaskNameCodeMap
=
processTaskMap
.
get
(
definitionId
);
Optional
<
Map
.
Entry
<
Long
,
Map
<
String
,
Long
>>>
mapEntry
=
processCodeTaskNameCodeMap
.
entrySet
().
stream
().
findFirst
();
if
(
mapEntry
.
isPresent
())
{
Map
.
Entry
<
Long
,
Map
<
String
,
Long
>>
processCodeTaskNameCodeEntry
=
mapEntry
.
get
();
dependItem
.
put
(
"definitionCode"
,
processCodeTaskNameCodeEntry
.
getKey
());
String
depTasks
=
dependItem
.
get
(
"depTasks"
).
asText
();
long
taskCode
=
"ALL"
.
equals
(
depTasks
)
?
0L
:
processCodeTaskNameCodeEntry
.
getValue
().
get
(
depTasks
);
dependItem
.
put
(
"depTaskCode"
,
taskCode
);
}
dependItem
.
remove
(
"projectId"
);
dependItem
.
remove
(
"definitionId"
);
dependItem
.
remove
(
"depTasks"
);
dependItemList
.
set
(
j
,
dependItem
);
}
dependTask
.
put
(
"dependItemList"
,
dependItemList
);
dependTaskList
.
set
(
i
,
dependTask
);
}
dependence
.
put
(
"dependTaskList"
,
dependTaskList
);
taskDefinitionLog
.
setTaskParams
(
JSONUtils
.
toJsonString
(
taskParams
));
}
}
}
private
void
handleProcessTaskRelation
(
Map
<
String
,
List
<
String
>>
taskNamePreMap
,
Map
<
String
,
Long
>
taskNameCodeMap
,
ProcessDefinition
processDefinition
,
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录