Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
DolphinScheduler
提交
eabb3021
DolphinScheduler
项目概览
apache
/
DolphinScheduler
上一次同步 1 年多
通知
704
Star
9572
Fork
3514
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
DolphinScheduler
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
eabb3021
编写于
11月 14, 2021
作者:
O
OS
提交者:
GitHub
11月 14, 2021
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
cherry-pick 6813 remove processDefinition resourceIds (#6841)
上级
ae087699
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
50 addition
and
91 deletion
+50
-91
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
...eduler/api/service/impl/ProcessDefinitionServiceImpl.java
+50
-65
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
...scheduler/api/service/impl/TaskDefinitionServiceImpl.java
+0
-6
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
...inscheduler/api/service/ProcessDefinitionServiceTest.java
+0
-2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
...apache/dolphinscheduler/dao/entity/ProcessDefinition.java
+0
-15
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java
...che/dolphinscheduler/dao/entity/ProcessDefinitionLog.java
+0
-1
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapperTest.java
...phinscheduler/dao/mapper/ProcessDefinitionMapperTest.java
+0
-2
未找到文件。
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
浏览文件 @
eabb3021
...
...
@@ -40,7 +40,6 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import
org.apache.dolphinscheduler.common.model.TaskNode
;
import
org.apache.dolphinscheduler.common.model.TaskNodeRelation
;
import
org.apache.dolphinscheduler.common.thread.Stopper
;
import
org.apache.dolphinscheduler.common.utils.CollectionUtils
;
import
org.apache.dolphinscheduler.common.utils.DateUtils
;
import
org.apache.dolphinscheduler.common.utils.JSONUtils
;
import
org.apache.dolphinscheduler.common.utils.SnowFlakeUtils
;
...
...
@@ -72,7 +71,7 @@ import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import
org.apache.dolphinscheduler.service.permission.PermissionCheck
;
import
org.apache.dolphinscheduler.service.process.ProcessService
;
import
org.apache.commons.
lang.String
Utils
;
import
org.apache.commons.
collections4.Collection
Utils
;
import
java.io.BufferedOutputStream
;
import
java.io.IOException
;
...
...
@@ -165,15 +164,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* create process definition
*
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param description description
* @param globalParams global params
* @param locations locations for nodes
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @param loginUser
login user
* @param projectCode
project code
* @param name
process definition name
* @param description
description
* @param globalParams
global params
* @param locations
locations for nodes
* @param timeout
timeout
* @param tenantCode
tenantCode
* @param taskRelationJson
relation json for nodes
* @param taskDefinitionJson taskDefinitionJson
* @return create result code
*/
...
...
@@ -294,8 +293,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return
result
;
}
List
<
ProcessTaskRelation
>
processTaskRelations
=
taskRelationList
.
stream
()
.
map
(
processTaskRelationLog
->
JSONUtils
.
parseObject
(
JSONUtils
.
toJsonString
(
processTaskRelationLog
),
ProcessTaskRelation
.
class
))
.
collect
(
Collectors
.
toList
());
.
map
(
processTaskRelationLog
->
JSONUtils
.
parseObject
(
JSONUtils
.
toJsonString
(
processTaskRelationLog
),
ProcessTaskRelation
.
class
))
.
collect
(
Collectors
.
toList
());
List
<
TaskNode
>
taskNodeList
=
processService
.
transformTask
(
processTaskRelations
,
taskDefinitionLogs
);
if
(
taskNodeList
.
size
()
!=
taskRelationList
.
size
())
{
Set
<
Long
>
postTaskCodes
=
taskRelationList
.
stream
().
map
(
ProcessTaskRelationLog:
:
getPostTaskCode
).
collect
(
Collectors
.
toSet
());
...
...
@@ -303,7 +302,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
Collection
<
Long
>
codes
=
CollectionUtils
.
subtract
(
postTaskCodes
,
taskNodeCodes
);
if
(
CollectionUtils
.
isNotEmpty
(
codes
))
{
logger
.
error
(
"the task code is not exit"
);
putMsg
(
result
,
Status
.
TASK_DEFINE_NOT_EXIST
,
StringUtils
.
join
(
codes
,
Constants
.
COMMA
));
putMsg
(
result
,
Status
.
TASK_DEFINE_NOT_EXIST
,
org
.
apache
.
commons
.
lang
.
StringUtils
.
join
(
codes
,
Constants
.
COMMA
));
return
result
;
}
}
...
...
@@ -332,7 +331,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query process definition list
*
* @param loginUser login user
* @param loginUser
login user
* @param projectCode project code
* @return definition list
*/
...
...
@@ -384,12 +383,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query process definition list paging
*
* @param loginUser login user
* @param loginUser
login user
* @param projectCode project code
* @param searchVal search value
* @param userId user id
* @param pageNo page number
* @param pageSize page size
* @param searchVal
search value
* @param userId
user id
* @param pageNo
page number
* @param pageSize
page size
* @return process definition page
*/
@Override
...
...
@@ -406,7 +405,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
Page
<
ProcessDefinition
>
page
=
new
Page
<>(
pageNo
,
pageSize
);
IPage
<
ProcessDefinition
>
processDefinitionIPage
=
processDefinitionMapper
.
queryDefineListPaging
(
page
,
searchVal
,
userId
,
project
.
getCode
(),
isAdmin
(
loginUser
));
page
,
searchVal
,
userId
,
project
.
getCode
(),
isAdmin
(
loginUser
));
List
<
ProcessDefinition
>
records
=
processDefinitionIPage
.
getRecords
();
for
(
ProcessDefinition
pd
:
records
)
{
...
...
@@ -427,9 +426,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query detail of process definition
*
* @param loginUser login user
* @param loginUser
login user
* @param projectCode project code
* @param code process definition code
* @param code
process definition code
* @return process definition detail
*/
@Override
...
...
@@ -479,16 +478,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* update process definition
*
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param code process definition code
* @param description description
* @param globalParams global params
* @param locations locations for nodes
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @param loginUser
login user
* @param projectCode
project code
* @param name
process definition name
* @param code
process definition code
* @param description
description
* @param globalParams
global params
* @param locations
locations for nodes
* @param timeout
timeout
* @param tenantCode
tenantCode
* @param taskRelationJson
relation json for nodes
* @param taskDefinitionJson taskDefinitionJson
* @return update result code
*/
...
...
@@ -583,7 +582,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
throw
new
ServiceException
(
Status
.
UPDATE_PROCESS_DEFINITION_ERROR
);
}
int
insertResult
=
processService
.
saveTaskRelation
(
loginUser
,
processDefinition
.
getProjectCode
(),
processDefinition
.
getCode
(),
insertVersion
,
taskRelationList
,
taskDefinitionLogs
);
processDefinition
.
getCode
(),
insertVersion
,
taskRelationList
,
taskDefinitionLogs
);
if
(
insertResult
==
Constants
.
EXIT_CODE_SUCCESS
)
{
putMsg
(
result
,
Status
.
SUCCESS
);
result
.
put
(
Constants
.
DATA_LIST
,
processDefinition
);
...
...
@@ -597,9 +596,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* verify process definition name unique
*
* @param loginUser login user
* @param loginUser
login user
* @param projectCode project code
* @param name name
* @param name
name
* @return true if process definition name not exists, otherwise false
*/
@Override
...
...
@@ -719,20 +718,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
switch
(
releaseState
)
{
case
ONLINE:
// To check resources whether they are already cancel authorized or deleted
String
resourceIds
=
processDefinition
.
getResourceIds
();
if
(
StringUtils
.
isNotBlank
(
resourceIds
))
{
Integer
[]
resourceIdArray
=
Arrays
.
stream
(
resourceIds
.
split
(
Constants
.
COMMA
)).
map
(
Integer:
:
parseInt
).
toArray
(
Integer
[]::
new
);
PermissionCheck
<
Integer
>
permissionCheck
=
new
PermissionCheck
<>(
AuthorizationType
.
RESOURCE_FILE_ID
,
processService
,
resourceIdArray
,
loginUser
.
getId
(),
logger
);
try
{
permissionCheck
.
checkPermission
();
}
catch
(
Exception
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
putMsg
(
result
,
Status
.
RESOURCE_NOT_EXIST_OR_NO_PERMISSION
,
RELEASESTATE
);
return
result
;
}
}
processDefinition
.
setReleaseState
(
releaseState
);
processDefinitionMapper
.
updateById
(
processDefinition
);
break
;
...
...
@@ -769,7 +754,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
*/
@Override
public
void
batchExportProcessDefinitionByCodes
(
User
loginUser
,
long
projectCode
,
String
codes
,
HttpServletResponse
response
)
{
if
(
StringUtils
.
isEmpty
(
codes
))
{
if
(
org
.
apache
.
commons
.
lang
.
StringUtils
.
isEmpty
(
codes
))
{
return
;
}
Project
project
=
projectMapper
.
queryByCode
(
projectCode
);
...
...
@@ -1339,7 +1324,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return
result
;
}
if
(
StringUtils
.
isEmpty
(
processDefinitionCodes
))
{
if
(
org
.
apache
.
commons
.
lang
.
StringUtils
.
isEmpty
(
processDefinitionCodes
))
{
putMsg
(
result
,
Status
.
PROCESS_DEFINITION_CODES_IS_EMPTY
,
processDefinitionCodes
);
return
result
;
}
...
...
@@ -1369,7 +1354,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
diffCode
.
forEach
(
code
->
failedProcessList
.
add
(
code
+
"[null]"
));
for
(
ProcessDefinition
processDefinition
:
processDefinitionList
)
{
List
<
ProcessTaskRelation
>
processTaskRelations
=
processTaskRelationMapper
.
queryByProcessCode
(
processDefinition
.
getProjectCode
(),
processDefinition
.
getCode
());
processTaskRelationMapper
.
queryByProcessCode
(
processDefinition
.
getProjectCode
(),
processDefinition
.
getCode
());
List
<
ProcessTaskRelationLog
>
taskRelationList
=
processTaskRelations
.
stream
().
map
(
ProcessTaskRelationLog:
:
new
).
collect
(
Collectors
.
toList
());
processDefinition
.
setProjectCode
(
targetProjectCode
);
if
(
isCopy
)
{
...
...
@@ -1407,8 +1392,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
*
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param version the version user want to switch
* @param code
process definition code
* @param version
the version user want to switch
* @return switch process definition version result code
*/
@Override
...
...
@@ -1444,11 +1429,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* check batch operate result
*
* @param srcProjectCode srcProjectCode
* @param srcProjectCode
srcProjectCode
* @param targetProjectCode targetProjectCode
* @param result result
* @param result
result
* @param failedProcessList failedProcessList
* @param isCopy isCopy
* @param isCopy
isCopy
*/
private
void
checkBatchOperateResult
(
long
srcProjectCode
,
long
targetProjectCode
,
Map
<
String
,
Object
>
result
,
List
<
String
>
failedProcessList
,
boolean
isCopy
)
{
...
...
@@ -1466,11 +1451,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query the pagination versions info by one certain process definition code
*
* @param loginUser login user info to check auth
* @param loginUser
login user info to check auth
* @param projectCode project code
* @param pageNo page number
* @param pageSize page size
* @param code process definition code
* @param pageNo
page number
* @param pageSize
page size
* @param code
process definition code
* @return the pagination process definition versions info of the certain process definition
*/
@Override
...
...
@@ -1500,10 +1485,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* delete one certain process definition by version number and process definition code
*
* @param loginUser login user info to check auth
* @param loginUser
login user info to check auth
* @param projectCode project code
* @param code process definition code
* @param version version number
* @param code
process definition code
* @param version
version number
* @return delete result code
*/
@Override
...
...
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
浏览文件 @
eabb3021
...
...
@@ -119,12 +119,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg
(
result
,
Status
.
PROCESS_NODE_S_PARAMETER_INVALID
,
taskDefinitionLog
.
getName
());
return
result
;
}
TaskDefinition
taskDefinition
=
taskDefinitionMapper
.
queryByName
(
projectCode
,
taskDefinitionLog
.
getName
());
if
(
taskDefinition
!=
null
)
{
logger
.
error
(
"task definition name {} already exists"
,
taskDefinitionLog
.
getName
());
putMsg
(
result
,
Status
.
TASK_DEFINITION_NAME_EXISTED
,
taskDefinitionLog
.
getName
());
return
result
;
}
}
int
saveTaskResult
=
processService
.
saveTaskDefine
(
loginUser
,
projectCode
,
taskDefinitionLogs
);
if
(
saveTaskResult
==
Constants
.
DEFINITION_FAILURE
)
{
...
...
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
浏览文件 @
eabb3021
...
...
@@ -435,8 +435,6 @@ public class ProcessDefinitionServiceTest {
Assert
.
assertEquals
(
Status
.
SUCCESS
,
onlineRes
.
get
(
Constants
.
STATUS
));
// project check auth success, processs definition online
ProcessDefinition
processDefinition1
=
getProcessDefinition
();
processDefinition1
.
setResourceIds
(
"1,2"
);
Map
<
String
,
Object
>
onlineWithResourceRes
=
processDefinitionService
.
releaseProcessDefinition
(
loginUser
,
projectCode
,
46
,
ReleaseState
.
ONLINE
);
Assert
.
assertEquals
(
Status
.
SUCCESS
,
onlineWithResourceRes
.
get
(
Constants
.
STATUS
));
...
...
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
浏览文件 @
eabb3021
...
...
@@ -162,12 +162,6 @@ public class ProcessDefinition {
@TableField
(
exist
=
false
)
private
String
modifyBy
;
/**
* resource ids
*/
@TableField
(
exist
=
false
)
private
String
resourceIds
;
/**
* warningGroupId
*/
...
...
@@ -340,14 +334,6 @@ public class ProcessDefinition {
this
.
scheduleReleaseState
=
scheduleReleaseState
;
}
public
String
getResourceIds
()
{
return
resourceIds
;
}
public
void
setResourceIds
(
String
resourceIds
)
{
this
.
resourceIds
=
resourceIds
;
}
public
int
getTimeout
()
{
return
timeout
;
}
...
...
@@ -458,7 +444,6 @@ public class ProcessDefinition {
+
", tenantId="
+
tenantId
+
", tenantCode='"
+
tenantCode
+
'\''
+
", modifyBy='"
+
modifyBy
+
'\''
+
", resourceIds='"
+
resourceIds
+
'\''
+
", warningGroupId="
+
warningGroupId
+
'}'
;
}
...
...
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java
浏览文件 @
eabb3021
...
...
@@ -65,7 +65,6 @@ public class ProcessDefinitionLog extends ProcessDefinition {
this
.
setTimeout
(
processDefinition
.
getTimeout
());
this
.
setTenantId
(
processDefinition
.
getTenantId
());
this
.
setModifyBy
(
processDefinition
.
getModifyBy
());
this
.
setResourceIds
(
processDefinition
.
getResourceIds
());
this
.
setWarningGroupId
(
processDefinition
.
getWarningGroupId
());
}
...
...
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapperTest.java
浏览文件 @
eabb3021
...
...
@@ -353,7 +353,6 @@ public class ProcessDefinitionMapperTest {
@Test
public
void
listResourcesTest
()
{
ProcessDefinition
processDefinition
=
insertOne
();
processDefinition
.
setResourceIds
(
"3,5"
);
processDefinition
.
setReleaseState
(
ReleaseState
.
ONLINE
);
List
<
Map
<
String
,
Object
>>
maps
=
processDefinitionMapper
.
listResources
();
Assert
.
assertNotNull
(
maps
);
...
...
@@ -362,7 +361,6 @@ public class ProcessDefinitionMapperTest {
@Test
public
void
listResourcesByUserTest
()
{
ProcessDefinition
processDefinition
=
insertOne
();
processDefinition
.
setResourceIds
(
"3,5"
);
processDefinition
.
setReleaseState
(
ReleaseState
.
ONLINE
);
List
<
Map
<
String
,
Object
>>
maps
=
processDefinitionMapper
.
listResourcesByUser
(
processDefinition
.
getUserId
());
Assert
.
assertNotNull
(
maps
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录