Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
三久
DolphinScheduler
提交
8a47785f
DolphinScheduler
项目概览
三久
/
DolphinScheduler
与 Fork 源项目一致
Fork自
apache / DolphinScheduler
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
DolphinScheduler
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
8a47785f
编写于
9月 13, 2022
作者:
W
Wenjun Ruan
提交者:
GitHub
9月 13, 2022
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Fix listing processDefinition the schedulerReleaseState will never be null (#11888)
上级
070b8c25
变更
16
展开全部
隐藏空白更改
内联
并排
Showing
16 changed file
with
590 addition
and
306 deletion
+590
-306
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/BaseController.java
...pache/dolphinscheduler/api/controller/BaseController.java
+1
-0
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
...scheduler/api/controller/ProcessDefinitionController.java
+17
-11
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
...olphinscheduler/api/service/ProcessDefinitionService.java
+19
-16
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
...g/apache/dolphinscheduler/api/service/ProjectService.java
+5
-3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
...apache/dolphinscheduler/api/service/SchedulerService.java
+6
-3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
...eduler/api/service/impl/ProcessDefinitionServiceImpl.java
+35
-22
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
...dolphinscheduler/api/service/impl/ProjectServiceImpl.java
+17
-0
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
...lphinscheduler/api/service/impl/SchedulerServiceImpl.java
+36
-21
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
...duler/api/controller/ProcessDefinitionControllerTest.java
+82
-65
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
...inscheduler/api/service/ProcessDefinitionServiceTest.java
+219
-138
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
.../dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
+1
-3
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/model/PageListingResult.java
.../apache/dolphinscheduler/dao/model/PageListingResult.java
+38
-0
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java
...dolphinscheduler/dao/repository/ProcessDefinitionDao.java
+40
-0
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java
...heduler/dao/repository/impl/ProcessDefinitionDaoImpl.java
+51
-0
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
...e/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
+12
-15
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapperTest.java
...phinscheduler/dao/mapper/ProcessDefinitionMapperTest.java
+11
-9
未找到文件。
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/BaseController.java
浏览文件 @
8a47785f
...
...
@@ -46,6 +46,7 @@ public class BaseController {
* @param pageSize page size
* @return check result code
*/
// todo: directly throw exception
public
Result
checkPageParams
(
int
pageNo
,
int
pageSize
)
{
Result
result
=
new
Result
();
Status
resultEnum
=
Status
.
SUCCESS
;
...
...
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
浏览文件 @
8a47785f
...
...
@@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import
org.apache.dolphinscheduler.api.enums.Status
;
import
org.apache.dolphinscheduler.api.exceptions.ApiException
;
import
org.apache.dolphinscheduler.api.service.ProcessDefinitionService
;
import
org.apache.dolphinscheduler.api.utils.PageInfo
;
import
org.apache.dolphinscheduler.api.utils.Result
;
import
org.apache.dolphinscheduler.common.Constants
;
import
org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum
;
...
...
@@ -213,7 +214,8 @@ public class ProcessDefinitionController extends BaseController {
@ApiParam
(
name
=
"projectCode"
,
value
=
"PROJECT_CODE"
,
required
=
true
)
@PathVariable
long
projectCode
,
@RequestParam
(
value
=
"name"
,
required
=
true
)
String
name
,
@RequestParam
(
value
=
"code"
,
required
=
false
,
defaultValue
=
"0"
)
long
processDefinitionCode
)
{
Map
<
String
,
Object
>
result
=
processDefinitionService
.
verifyProcessDefinitionName
(
loginUser
,
projectCode
,
name
,
processDefinitionCode
);
Map
<
String
,
Object
>
result
=
processDefinitionService
.
verifyProcessDefinitionName
(
loginUser
,
projectCode
,
name
,
processDefinitionCode
);
return
returnDataList
(
result
);
}
...
...
@@ -503,21 +505,25 @@ public class ProcessDefinitionController extends BaseController {
@ResponseStatus
(
HttpStatus
.
OK
)
@ApiException
(
QUERY_PROCESS_DEFINITION_LIST_PAGING_ERROR
)
@AccessLogAnnotation
(
ignoreRequestArgs
=
"loginUser"
)
public
Result
queryProcessDefinitionListPaging
(
@ApiIgnore
@RequestAttribute
(
value
=
Constants
.
SESSION_USER
)
User
loginUser
,
@ApiParam
(
name
=
"projectCode"
,
value
=
"PROJECT_CODE"
,
required
=
true
)
@PathVariable
long
projectCode
,
@RequestParam
(
value
=
"searchVal"
,
required
=
false
)
String
searchVal
,
@RequestParam
(
value
=
"otherParamsJson"
,
required
=
false
)
String
otherParamsJson
,
@RequestParam
(
value
=
"userId"
,
required
=
false
,
defaultValue
=
"0"
)
Integer
userId
,
@RequestParam
(
"pageNo"
)
Integer
pageNo
,
@RequestParam
(
"pageSize"
)
Integer
pageSize
)
{
Result
result
=
checkPageParams
(
pageNo
,
pageSize
);
public
Result
<
PageInfo
<
ProcessDefinition
>>
queryProcessDefinitionListPaging
(
@ApiIgnore
@RequestAttribute
(
value
=
Constants
.
SESSION_USER
)
User
loginUser
,
@ApiParam
(
name
=
"projectCode"
,
value
=
"PROJECT_CODE"
,
required
=
true
)
@PathVariable
long
projectCode
,
@RequestParam
(
value
=
"searchVal"
,
required
=
false
)
String
searchVal
,
@RequestParam
(
value
=
"otherParamsJson"
,
required
=
false
)
String
otherParamsJson
,
@RequestParam
(
value
=
"userId"
,
required
=
false
,
defaultValue
=
"0"
)
Integer
userId
,
@RequestParam
(
"pageNo"
)
Integer
pageNo
,
@RequestParam
(
"pageSize"
)
Integer
pageSize
)
{
Result
<
PageInfo
<
ProcessDefinition
>>
result
=
checkPageParams
(
pageNo
,
pageSize
);
if
(!
result
.
checkResult
())
{
return
result
;
}
searchVal
=
ParameterUtils
.
handleEscapes
(
searchVal
);
return
processDefinitionService
.
queryProcessDefinitionListPaging
(
loginUser
,
projectCode
,
searchVal
,
otherParamsJson
,
userId
,
pageNo
,
pageSize
);
PageInfo
<
ProcessDefinition
>
pageInfo
=
processDefinitionService
.
queryProcessDefinitionListPaging
(
loginUser
,
projectCode
,
searchVal
,
otherParamsJson
,
userId
,
pageNo
,
pageSize
);
return
Result
.
success
(
pageInfo
);
}
/**
...
...
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
浏览文件 @
8a47785f
...
...
@@ -17,11 +17,7 @@
package
org.apache.dolphinscheduler.api.service
;
import
java.util.List
;
import
java.util.Map
;
import
javax.servlet.http.HttpServletResponse
;
import
org.apache.dolphinscheduler.api.utils.PageInfo
;
import
org.apache.dolphinscheduler.api.utils.Result
;
import
org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum
;
import
org.apache.dolphinscheduler.common.enums.ReleaseState
;
...
...
@@ -29,6 +25,12 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import
org.apache.dolphinscheduler.dao.entity.Project
;
import
org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog
;
import
org.apache.dolphinscheduler.dao.entity.User
;
import
java.util.List
;
import
java.util.Map
;
import
javax.servlet.http.HttpServletResponse
;
import
org.springframework.web.multipart.MultipartFile
;
/**
...
...
@@ -97,13 +99,13 @@ public interface ProcessDefinitionService {
* @param userId user id
* @return process definition page
*/
Result
queryProcessDefinitionListPaging
(
User
loginUser
,
long
projectCode
,
String
searchVal
,
String
otherParamsJson
,
Integer
userId
,
Integer
pageNo
,
Integer
pageSize
);
PageInfo
<
ProcessDefinition
>
queryProcessDefinitionListPaging
(
User
loginUser
,
long
projectCode
,
String
searchVal
,
String
otherParamsJson
,
Integer
userId
,
Integer
pageNo
,
Integer
pageSize
);
/**
* query detail of process definition
...
...
@@ -271,7 +273,8 @@ public interface ProcessDefinitionService {
* @param processTaskRelationJson process task relation json
* @return check result code
*/
Map
<
String
,
Object
>
checkProcessNodeList
(
String
processTaskRelationJson
,
List
<
TaskDefinitionLog
>
taskDefinitionLogs
);
Map
<
String
,
Object
>
checkProcessNodeList
(
String
processTaskRelationJson
,
List
<
TaskDefinitionLog
>
taskDefinitionLogs
);
/**
* get task node details based on process definition
...
...
@@ -330,7 +333,7 @@ public interface ProcessDefinitionService {
* @param limit limit
* @return tree view json data
*/
Map
<
String
,
Object
>
viewTree
(
User
loginUser
,
long
projectCode
,
long
code
,
Integer
limit
);
Map
<
String
,
Object
>
viewTree
(
User
loginUser
,
long
projectCode
,
long
code
,
Integer
limit
);
/**
* switch the defined process definition version
...
...
@@ -456,7 +459,8 @@ public interface ProcessDefinitionService {
* @param result
* @param otherParamsJson
*/
void
saveOtherRelation
(
User
loginUser
,
ProcessDefinition
processDefinition
,
Map
<
String
,
Object
>
result
,
String
otherParamsJson
);
void
saveOtherRelation
(
User
loginUser
,
ProcessDefinition
processDefinition
,
Map
<
String
,
Object
>
result
,
String
otherParamsJson
);
/**
* get Json String
...
...
@@ -466,4 +470,3 @@ public interface ProcessDefinitionService {
*/
String
doOtherOperateProcess
(
User
loginUser
,
ProcessDefinition
processDefinition
);
}
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
浏览文件 @
8a47785f
...
...
@@ -64,9 +64,11 @@ public interface ProjectService {
* @param perm String
* @return true if the login user have permission to see the project
*/
Map
<
String
,
Object
>
checkProjectAndAuth
(
User
loginUser
,
Project
project
,
long
projectCode
,
String
perm
);
Map
<
String
,
Object
>
checkProjectAndAuth
(
User
loginUser
,
Project
project
,
long
projectCode
,
String
perm
);
boolean
hasProjectAndPerm
(
User
loginUser
,
Project
project
,
Map
<
String
,
Object
>
result
,
String
perm
);
void
checkProjectAndAuthThrowException
(
User
loginUser
,
Project
project
,
String
permission
);
boolean
hasProjectAndPerm
(
User
loginUser
,
Project
project
,
Map
<
String
,
Object
>
result
,
String
perm
);
/**
* has project and permission
...
...
@@ -172,4 +174,4 @@ public interface ProjectService {
*/
void
checkProjectAndAuth
(
Result
result
,
User
loginUser
,
Project
project
,
long
projectCode
,
String
perm
);
}
\ No newline at end of file
}
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
浏览文件 @
8a47785f
...
...
@@ -22,8 +22,10 @@ import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import
org.apache.dolphinscheduler.common.enums.Priority
;
import
org.apache.dolphinscheduler.common.enums.ReleaseState
;
import
org.apache.dolphinscheduler.common.enums.WarningType
;
import
org.apache.dolphinscheduler.dao.entity.Schedule
;
import
org.apache.dolphinscheduler.dao.entity.User
;
import
java.util.List
;
import
java.util.Map
;
/**
...
...
@@ -83,7 +85,6 @@ public interface SchedulerService {
String
workerGroup
,
Long
environmentCode
);
/**
* set schedule online or offline
*
...
...
@@ -110,12 +111,14 @@ public interface SchedulerService {
* @return schedule list page
*/
Result
querySchedule
(
User
loginUser
,
long
projectCode
,
long
processDefineCode
,
String
searchVal
,
Integer
pageNo
,
Integer
pageSize
);
Integer
pageNo
,
Integer
pageSize
);
List
<
Schedule
>
queryScheduleByProcessDefinitionCodes
(
List
<
Long
>
processDefinitionCodes
);
/**
* query schedule list
*
* @param loginUser login user
* @param loginUser
login user
* @param projectCode project code
* @return schedule list
*/
...
...
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
浏览文件 @
8a47785f
...
...
@@ -100,6 +100,8 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import
org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper
;
import
org.apache.dolphinscheduler.dao.mapper.TenantMapper
;
import
org.apache.dolphinscheduler.dao.mapper.UserMapper
;
import
org.apache.dolphinscheduler.dao.model.PageListingResult
;
import
org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao
;
import
org.apache.dolphinscheduler.plugin.task.api.enums.SqlType
;
import
org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy
;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode
;
...
...
@@ -128,6 +130,7 @@ import java.util.Objects;
import
java.util.Optional
;
import
java.util.Set
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.function.Function
;
import
java.util.stream.Collectors
;
import
java.util.zip.ZipEntry
;
import
java.util.zip.ZipInputStream
;
...
...
@@ -135,6 +138,8 @@ import java.util.zip.ZipInputStream;
import
javax.servlet.ServletOutputStream
;
import
javax.servlet.http.HttpServletResponse
;
import
lombok.NonNull
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
...
...
@@ -176,6 +181,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
@Autowired
private
ProcessDefinitionMapper
processDefinitionMapper
;
@Autowired
private
ProcessDefinitionDao
processDefinitionDao
;
@Lazy
@Autowired
private
ProcessInstanceService
processInstanceService
;
...
...
@@ -475,39 +483,44 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @return process definition page
*/
@Override
public
Result
queryProcessDefinitionListPaging
(
User
loginUser
,
long
projectCode
,
String
searchVal
,
String
otherParamsJson
,
Integer
userId
,
Integer
pageNo
,
Integer
pageSize
)
{
Result
result
=
new
Result
();
public
PageInfo
<
ProcessDefinition
>
queryProcessDefinitionListPaging
(
@NonNull
User
loginUser
,
long
projectCode
,
String
searchVal
,
String
otherParamsJson
,
Integer
userId
,
Integer
pageNo
,
Integer
pageSize
)
{
Project
project
=
projectMapper
.
queryByCode
(
projectCode
);
// check user access for project
Map
<
String
,
Object
>
checkResult
=
projectService
.
checkProjectAndAuth
(
loginUser
,
project
,
projectCode
,
WORKFLOW_DEFINITION
);
Status
resultStatus
=
(
Status
)
checkResult
.
get
(
Constants
.
STATUS
);
if
(
resultStatus
!=
Status
.
SUCCESS
)
{
putMsg
(
result
,
resultStatus
);
return
result
;
}
projectService
.
checkProjectAndAuthThrowException
(
loginUser
,
project
,
WORKFLOW_DEFINITION
);
PageListingResult
<
ProcessDefinition
>
processDefinitionsPageListingResult
=
processDefinitionDao
.
listingProcessDefinition
(
pageNo
,
pageSize
,
searchVal
,
userId
,
projectCode
);
List
<
ProcessDefinition
>
processDefinitions
=
processDefinitionsPageListingResult
.
getRecords
();
Page
<
ProcessDefinition
>
page
=
new
Page
<>(
pageNo
,
pageSize
);
IPage
<
ProcessDefinition
>
processDefinitionIPage
=
processDefinitionMapper
.
queryDefineListPaging
(
page
,
searchVal
,
userId
,
project
.
getCode
(),
isAdmin
(
loginUser
));
List
<
Long
>
processDefinitionCodes
=
processDefinitions
.
stream
().
map
(
ProcessDefinition:
:
getCode
).
collect
(
Collectors
.
toList
());
Map
<
Long
,
Schedule
>
scheduleMap
=
schedulerService
.
queryScheduleByProcessDefinitionCodes
(
processDefinitionCodes
)
.
stream
()
.
collect
(
Collectors
.
toMap
(
Schedule:
:
getProcessDefinitionCode
,
Function
.
identity
()));
List
<
ProcessDefinition
>
records
=
processDefinitionIPage
.
getRecords
();
for
(
ProcessDefinition
pd
:
records
)
{
for
(
ProcessDefinition
pd
:
processDefinitions
)
{
// todo: use batch query
ProcessDefinitionLog
processDefinitionLog
=
processDefinitionLogMapper
.
queryByDefinitionCodeAndVersion
(
pd
.
getCode
(),
pd
.
getVersion
());
User
user
=
userMapper
.
selectById
(
processDefinitionLog
.
getOperator
());
pd
.
setModifyBy
(
user
.
getUserName
());
Schedule
schedule
=
scheduleMap
.
get
(
pd
.
getCode
());
pd
.
setScheduleReleaseState
(
schedule
==
null
?
null
:
schedule
.
getReleaseState
());
}
processDefinitionIPage
.
setRecords
(
records
);
PageInfo
<
ProcessDefinition
>
pageInfo
=
new
PageInfo
<>(
pageNo
,
pageSize
);
pageInfo
.
setTotal
((
int
)
processDefinitionIPage
.
getTotal
());
pageInfo
.
setTotalList
(
processDefinitionIPage
.
getRecords
());
result
.
setData
(
pageInfo
);
putMsg
(
result
,
Status
.
SUCCESS
);
pageInfo
.
setTotal
((
int
)
processDefinitionsPageListingResult
.
getTotalCount
());
pageInfo
.
setTotalList
(
processDefinitions
);
return
result
;
return
pageInfo
;
}
/**
...
...
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
浏览文件 @
8a47785f
...
...
@@ -23,6 +23,7 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
import
static
org
.
apache
.
dolphinscheduler
.
api
.
constants
.
ApiFuncIdentificationConstant
.
PROJECT_UPDATE
;
import
org.apache.dolphinscheduler.api.enums.Status
;
import
org.apache.dolphinscheduler.api.exceptions.ServiceException
;
import
org.apache.dolphinscheduler.api.service.ProjectService
;
import
org.apache.dolphinscheduler.api.utils.PageInfo
;
import
org.apache.dolphinscheduler.api.utils.Result
;
...
...
@@ -53,6 +54,10 @@ import java.util.Map;
import
java.util.Objects
;
import
java.util.Set
;
import
javax.annotation.Nullable
;
import
lombok.NonNull
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
...
...
@@ -215,6 +220,18 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
return
result
;
}
public
void
checkProjectAndAuthThrowException
(
@NonNull
User
loginUser
,
@Nullable
Project
project
,
String
permission
)
{
// todo: throw a permission exception
if
(
project
==
null
)
{
throw
new
ServiceException
(
Status
.
PROJECT_NOT_EXIST
);
}
if
(!
canOperatorPermissions
(
loginUser
,
new
Object
[]{
project
.
getId
()},
AuthorizationType
.
PROJECTS
,
permission
))
{
throw
new
ServiceException
(
Status
.
USER_NO_OPERATION_PROJECT_PERM
,
loginUser
.
getUserName
(),
project
.
getCode
());
}
}
@Override
public
boolean
hasProjectAndPerm
(
User
loginUser
,
Project
project
,
Map
<
String
,
Object
>
result
,
String
permission
)
{
boolean
checkResult
=
false
;
...
...
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
浏览文件 @
8a47785f
...
...
@@ -52,11 +52,13 @@ import org.apache.dolphinscheduler.service.cron.CronUtils;
import
org.apache.dolphinscheduler.service.exceptions.CronParseException
;
import
org.apache.dolphinscheduler.service.process.ProcessService
;
import
org.apache.commons.collections4.CollectionUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
java.time.ZoneId
;
import
java.time.ZonedDateTime
;
import
java.util.ArrayList
;
import
java.util.Collections
;
import
java.util.Date
;
import
java.util.HashMap
;
import
java.util.List
;
...
...
@@ -64,6 +66,8 @@ import java.util.Map;
import
java.util.TimeZone
;
import
java.util.stream.Collectors
;
import
lombok.NonNull
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
...
...
@@ -74,7 +78,6 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import
com.baomidou.mybatisplus.extension.plugins.pagination.Page
;
import
com.cronutils.model.Cron
;
/**
* scheduler service impl
*/
...
...
@@ -110,7 +113,6 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
@Autowired
private
ProcessTaskRelationMapper
processTaskRelationMapper
;
/**
* save schedule
*
...
...
@@ -152,7 +154,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
// check work flow define release state
ProcessDefinition
processDefinition
=
processDefinitionMapper
.
queryByCode
(
processDefineCode
);
result
=
executorService
.
checkProcessDefinitionValid
(
projectCode
,
processDefinition
,
processDefineCode
,
processDefinition
.
getVersion
());
processDefinition
.
getVersion
());
if
(
result
.
get
(
Constants
.
STATUS
)
!=
Status
.
SUCCESS
)
{
return
result
;
}
...
...
@@ -264,11 +266,11 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
return
result
;
}
updateSchedule
(
result
,
schedule
,
processDefinition
,
scheduleExpression
,
warningType
,
warningGroupId
,
failureStrategy
,
processInstancePriority
,
workerGroup
,
environmentCode
);
updateSchedule
(
result
,
schedule
,
processDefinition
,
scheduleExpression
,
warningType
,
warningGroupId
,
failureStrategy
,
processInstancePriority
,
workerGroup
,
environmentCode
);
return
result
;
}
/**
* set schedule online or offline
*
...
...
@@ -303,16 +305,18 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
// check schedule release state
if
(
scheduleObj
.
getReleaseState
()
==
scheduleStatus
)
{
logger
.
info
(
"schedule release is already {},needn't to change schedule id: {} from {} to {}"
,
scheduleObj
.
getReleaseState
(),
scheduleObj
.
getId
(),
scheduleObj
.
getReleaseState
(),
scheduleStatus
);
scheduleObj
.
getReleaseState
(),
scheduleObj
.
getId
(),
scheduleObj
.
getReleaseState
(),
scheduleStatus
);
putMsg
(
result
,
Status
.
SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE
,
scheduleStatus
);
return
result
;
}
ProcessDefinition
processDefinition
=
processDefinitionMapper
.
queryByCode
(
scheduleObj
.
getProcessDefinitionCode
());
ProcessDefinition
processDefinition
=
processDefinitionMapper
.
queryByCode
(
scheduleObj
.
getProcessDefinitionCode
());
if
(
processDefinition
==
null
||
projectCode
!=
processDefinition
.
getProjectCode
())
{
putMsg
(
result
,
Status
.
PROCESS_DEFINE_NOT_EXIST
,
String
.
valueOf
(
scheduleObj
.
getProcessDefinitionCode
()));
return
result
;
}
List
<
ProcessTaskRelation
>
processTaskRelations
=
processTaskRelationMapper
.
queryByProcessCode
(
projectCode
,
scheduleObj
.
getProcessDefinitionCode
());
List
<
ProcessTaskRelation
>
processTaskRelations
=
processTaskRelationMapper
.
queryByProcessCode
(
projectCode
,
scheduleObj
.
getProcessDefinitionCode
());
if
(
processTaskRelations
.
isEmpty
())
{
putMsg
(
result
,
Status
.
PROCESS_DAG_IS_EMPTY
);
return
result
;
...
...
@@ -321,7 +325,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
// check process definition release state
if
(
processDefinition
.
getReleaseState
()
!=
ReleaseState
.
ONLINE
)
{
logger
.
info
(
"not release process definition id: {} , name : {}"
,
processDefinition
.
getId
(),
processDefinition
.
getName
());
processDefinition
.
getName
());
putMsg
(
result
,
Status
.
PROCESS_DEFINE_NOT_RELEASE
,
processDefinition
.
getName
());
return
result
;
}
...
...
@@ -330,7 +334,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
processService
.
recurseFindSubProcess
(
processDefinition
.
getCode
(),
subProcessDefineCodes
);
if
(!
subProcessDefineCodes
.
isEmpty
())
{
List
<
ProcessDefinition
>
subProcessDefinitionList
=
processDefinitionMapper
.
queryByCodes
(
subProcessDefineCodes
);
processDefinitionMapper
.
queryByCodes
(
subProcessDefineCodes
);
if
(
subProcessDefinitionList
!=
null
&&
!
subProcessDefinitionList
.
isEmpty
())
{
for
(
ProcessDefinition
subProcessDefinition
:
subProcessDefinitionList
)
{
/**
...
...
@@ -338,9 +342,9 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
*/
if
(
subProcessDefinition
.
getReleaseState
()
!=
ReleaseState
.
ONLINE
)
{
logger
.
info
(
"not release process definition id: {} , name : {}"
,
subProcessDefinition
.
getId
(),
subProcessDefinition
.
getName
());
subProcessDefinition
.
getId
(),
subProcessDefinition
.
getName
());
putMsg
(
result
,
Status
.
PROCESS_DEFINE_NOT_RELEASE
,
String
.
valueOf
(
subProcessDefinition
.
getId
()));
String
.
valueOf
(
subProcessDefinition
.
getId
()));
return
result
;
}
}
...
...
@@ -364,11 +368,13 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
try
{
switch
(
scheduleStatus
)
{
case
ONLINE:
logger
.
info
(
"Call master client set schedule online, project id: {}, flow id: {},host: {}"
,
project
.
getId
(),
processDefinition
.
getId
(),
masterServers
);
logger
.
info
(
"Call master client set schedule online, project id: {}, flow id: {},host: {}"
,
project
.
getId
(),
processDefinition
.
getId
(),
masterServers
);
setSchedule
(
project
.
getId
(),
scheduleObj
);
break
;
case
OFFLINE:
logger
.
info
(
"Call master client set schedule offline, project id: {}, flow id: {},host: {}"
,
project
.
getId
(),
processDefinition
.
getId
(),
masterServers
);
logger
.
info
(
"Call master client set schedule offline, project id: {}, flow id: {},host: {}"
,
project
.
getId
(),
processDefinition
.
getId
(),
masterServers
);
deleteSchedule
(
project
.
getId
(),
id
);
break
;
default
:
...
...
@@ -376,7 +382,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
return
result
;
}
}
catch
(
Exception
e
)
{
result
.
put
(
Constants
.
MSG
,
scheduleStatus
==
ReleaseState
.
ONLINE
?
"set online failure"
:
"set offline failure"
);
result
.
put
(
Constants
.
MSG
,
scheduleStatus
==
ReleaseState
.
ONLINE
?
"set online failure"
:
"set offline failure"
);
throw
new
ServiceException
(
result
.
get
(
Constants
.
MSG
).
toString
(),
e
);
}
...
...
@@ -416,7 +423,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
Page
<
Schedule
>
page
=
new
Page
<>(
pageNo
,
pageSize
);
IPage
<
Schedule
>
scheduleIPage
=
scheduleMapper
.
queryByProcessDefineCodePaging
(
page
,
processDefineCode
,
searchVal
);
scheduleMapper
.
queryByProcessDefineCodePaging
(
page
,
processDefineCode
,
searchVal
);
List
<
ScheduleVo
>
scheduleList
=
new
ArrayList
<>();
for
(
Schedule
schedule
:
scheduleIPage
.
getRecords
())
{
...
...
@@ -431,6 +438,13 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
return
result
;
}
public
List
<
Schedule
>
queryScheduleByProcessDefinitionCodes
(
@NonNull
List
<
Long
>
processDefinitionCodes
)
{
if
(
CollectionUtils
.
isEmpty
(
processDefinitionCodes
))
{
return
Collections
.
emptyList
();
}
return
scheduleMapper
.
querySchedulesByProcessDefinitionCodes
(
processDefinitionCodes
);
}
/**
* query schedule list
*
...
...
@@ -573,9 +587,9 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
return
result
;
}
List
<
ZonedDateTime
>
selfFireDateList
=
CronUtils
.
getSelfFireDateList
(
startTime
,
endTime
,
cron
,
Constants
.
PREVIEW_SCHEDULE_EXECUTE_COUNT
);
CronUtils
.
getSelfFireDateList
(
startTime
,
endTime
,
cron
,
Constants
.
PREVIEW_SCHEDULE_EXECUTE_COUNT
);
List
<
String
>
previewDateList
=
selfFireDateList
.
stream
().
map
(
t
->
DateUtils
.
dateToString
(
t
,
zoneId
)).
collect
(
Collectors
.
toList
());
selfFireDateList
.
stream
().
map
(
t
->
DateUtils
.
dateToString
(
t
,
zoneId
)).
collect
(
Collectors
.
toList
());
result
.
put
(
Constants
.
DATA_LIST
,
previewDateList
);
putMsg
(
result
,
Status
.
SUCCESS
);
return
result
;
...
...
@@ -607,7 +621,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
String
workerGroup
,
long
environmentCode
)
{
Project
project
=
projectMapper
.
queryByCode
(
projectCode
);
//check user access for project
//
check user access for project
Map
<
String
,
Object
>
result
=
projectService
.
checkProjectAndAuth
(
loginUser
,
project
,
projectCode
,
null
);
if
(
result
.
get
(
Constants
.
STATUS
)
!=
Status
.
SUCCESS
)
{
return
result
;
...
...
@@ -625,7 +639,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
return
result
;
}
updateSchedule
(
result
,
schedule
,
processDefinition
,
scheduleExpression
,
warningType
,
warningGroupId
,
failureStrategy
,
processInstancePriority
,
workerGroup
,
environmentCode
);
updateSchedule
(
result
,
schedule
,
processDefinition
,
scheduleExpression
,
warningType
,
warningGroupId
,
failureStrategy
,
processInstancePriority
,
workerGroup
,
environmentCode
);
return
result
;
}
...
...
@@ -634,7 +649,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
FailureStrategy
failureStrategy
,
Priority
processInstancePriority
,
String
workerGroup
,
long
environmentCode
)
{
if
(
checkValid
(
result
,
schedule
.
getReleaseState
()
==
ReleaseState
.
ONLINE
,
Status
.
SCHEDULE_CRON_ONLINE_FORBID_UPDATE
))
{
Status
.
SCHEDULE_CRON_ONLINE_FORBID_UPDATE
))
{
return
;
}
...
...
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
浏览文件 @
8a47785f
...
...
@@ -17,14 +17,6 @@
package
org.apache.dolphinscheduler.api.controller
;
import
java.text.MessageFormat
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
javax.servlet.http.HttpServletResponse
;
import
org.apache.dolphinscheduler.api.enums.Status
;
import
org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl
;
import
org.apache.dolphinscheduler.api.utils.PageInfo
;
...
...
@@ -35,8 +27,16 @@ import org.apache.dolphinscheduler.common.enums.ReleaseState;
import
org.apache.dolphinscheduler.common.enums.UserType
;
import
org.apache.dolphinscheduler.dao.entity.ProcessDefinition
;
import
org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog
;
import
org.apache.dolphinscheduler.dao.entity.Resource
;
import
org.apache.dolphinscheduler.dao.entity.User
;
import
java.text.MessageFormat
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
javax.servlet.http.HttpServletResponse
;
import
org.junit.Assert
;
import
org.junit.Before
;
import
org.junit.Test
;
...
...
@@ -72,16 +72,18 @@ public class ProcessDefinitionControllerTest {
@Test
public
void
testCreateProcessDefinition
()
{
String
relationJson
=
"[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1,"
+
"\"condition_type\":0,\"condition_params\":\"{}\"},{\"name\":\"\",\"pre_task_code\":123456789,\"pre_task_version\":1,"
+
"\"post_task_code\":123451234,\"post_task_version\":1,\"condition_type\":0,\"condition_params\":\"{}\"}]"
;
String
taskDefinitionJson
=
"[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
+
"\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\","
+
"\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":"
+
"\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"],"
+
"\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0,"
+
"\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
+
"\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]"
;
String
relationJson
=
"[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1,"
+
"\"condition_type\":0,\"condition_params\":\"{}\"},{\"name\":\"\",\"pre_task_code\":123456789,\"pre_task_version\":1,"
+
"\"post_task_code\":123451234,\"post_task_version\":1,\"condition_type\":0,\"condition_params\":\"{}\"}]"
;
String
taskDefinitionJson
=
"[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
+
"\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\","
+
"\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":"
+
"\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"],"
+
"\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0,"
+
"\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
+
"\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]"
;
long
projectCode
=
1L
;
String
name
=
"dag_test"
;
String
description
=
"desc test"
;
...
...
@@ -93,11 +95,16 @@ public class ProcessDefinitionControllerTest {
putMsg
(
result
,
Status
.
SUCCESS
);
result
.
put
(
Constants
.
DATA_LIST
,
1
);
Mockito
.
when
(
processDefinitionService
.
createProcessDefinition
(
user
,
projectCode
,
name
,
description
,
globalParams
,
locations
,
timeout
,
tenantCode
,
relationJson
,
taskDefinitionJson
,
""
,
ProcessExecutionTypeEnum
.
PARALLEL
)).
thenReturn
(
result
);
Mockito
.
when
(
processDefinitionService
.
createProcessDefinition
(
user
,
projectCode
,
name
,
description
,
globalParams
,
locations
,
timeout
,
tenantCode
,
relationJson
,
taskDefinitionJson
,
""
,
ProcessExecutionTypeEnum
.
PARALLEL
))
.
thenReturn
(
result
);
Result
response
=
processDefinitionController
.
createProcessDefinition
(
user
,
projectCode
,
name
,
description
,
globalParams
,
locations
,
timeout
,
tenantCode
,
relationJson
,
taskDefinitionJson
,
""
,
ProcessExecutionTypeEnum
.
PARALLEL
);
Result
response
=
processDefinitionController
.
createProcessDefinition
(
user
,
projectCode
,
name
,
description
,
globalParams
,
locations
,
timeout
,
tenantCode
,
relationJson
,
taskDefinitionJson
,
""
,
ProcessExecutionTypeEnum
.
PARALLEL
);
Assert
.
assertEquals
(
Status
.
SUCCESS
.
getCode
(),
response
.
getCode
().
intValue
());
}
...
...
@@ -126,7 +133,8 @@ public class ProcessDefinitionControllerTest {
long
projectCode
=
1L
;
String
name
=
"dag_test"
;
Mockito
.
when
(
processDefinitionService
.
verifyProcessDefinitionName
(
user
,
projectCode
,
name
,
0
)).
thenReturn
(
result
);
Mockito
.
when
(
processDefinitionService
.
verifyProcessDefinitionName
(
user
,
projectCode
,
name
,
0
))
.
thenReturn
(
result
);
Result
response
=
processDefinitionController
.
verifyProcessDefinitionName
(
user
,
projectCode
,
name
,
0
);
Assert
.
assertTrue
(
response
.
isStatus
(
Status
.
PROCESS_DEFINITION_NAME_EXIST
));
...
...
@@ -134,16 +142,18 @@ public class ProcessDefinitionControllerTest {
@Test
public
void
updateProcessDefinition
()
{
String
relationJson
=
"[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1,"
+
"\"condition_type\":0,\"condition_params\":\"{}\"},{\"name\":\"\",\"pre_task_code\":123456789,\"pre_task_version\":1,"
+
"\"post_task_code\":123451234,\"post_task_version\":1,\"condition_type\":0,\"condition_params\":\"{}\"}]"
;
String
taskDefinitionJson
=
"[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
+
"\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\","
+
"\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":"
+
"\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"],"
+
"\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0,"
+
"\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
+
"\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]"
;
String
relationJson
=
"[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1,"
+
"\"condition_type\":0,\"condition_params\":\"{}\"},{\"name\":\"\",\"pre_task_code\":123456789,\"pre_task_version\":1,"
+
"\"post_task_code\":123451234,\"post_task_version\":1,\"condition_type\":0,\"condition_params\":\"{}\"}]"
;
String
taskDefinitionJson
=
"[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
+
"\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\","
+
"\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":"
+
"\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"],"
+
"\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0,"
+
"\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
+
"\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]"
;
String
locations
=
"{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}"
;
long
projectCode
=
1L
;
String
name
=
"dag_test"
;
...
...
@@ -156,11 +166,15 @@ public class ProcessDefinitionControllerTest {
putMsg
(
result
,
Status
.
SUCCESS
);
result
.
put
(
"processDefinitionId"
,
1
);
Mockito
.
when
(
processDefinitionService
.
updateProcessDefinition
(
user
,
projectCode
,
name
,
code
,
description
,
globalParams
,
locations
,
timeout
,
tenantCode
,
relationJson
,
taskDefinitionJson
,
""
,
ProcessExecutionTypeEnum
.
PARALLEL
)).
thenReturn
(
result
);
Mockito
.
when
(
processDefinitionService
.
updateProcessDefinition
(
user
,
projectCode
,
name
,
code
,
description
,
globalParams
,
locations
,
timeout
,
tenantCode
,
relationJson
,
taskDefinitionJson
,
""
,
ProcessExecutionTypeEnum
.
PARALLEL
)).
thenReturn
(
result
);
Result
response
=
processDefinitionController
.
updateProcessDefinition
(
user
,
projectCode
,
name
,
code
,
description
,
globalParams
,
locations
,
timeout
,
tenantCode
,
relationJson
,
taskDefinitionJson
,
""
,
ProcessExecutionTypeEnum
.
PARALLEL
,
ReleaseState
.
OFFLINE
);
Result
response
=
processDefinitionController
.
updateProcessDefinition
(
user
,
projectCode
,
name
,
code
,
description
,
globalParams
,
locations
,
timeout
,
tenantCode
,
relationJson
,
taskDefinitionJson
,
""
,
ProcessExecutionTypeEnum
.
PARALLEL
,
ReleaseState
.
OFFLINE
);
Assert
.
assertEquals
(
Status
.
SUCCESS
.
getCode
(),
response
.
getCode
().
intValue
());
}
...
...
@@ -171,8 +185,10 @@ public class ProcessDefinitionControllerTest {
Map
<
String
,
Object
>
result
=
new
HashMap
<>();
putMsg
(
result
,
Status
.
SUCCESS
);
Mockito
.
when
(
processDefinitionService
.
releaseProcessDefinition
(
user
,
projectCode
,
id
,
ReleaseState
.
OFFLINE
)).
thenReturn
(
result
);
Result
response
=
processDefinitionController
.
releaseProcessDefinition
(
user
,
projectCode
,
id
,
ReleaseState
.
OFFLINE
);
Mockito
.
when
(
processDefinitionService
.
releaseProcessDefinition
(
user
,
projectCode
,
id
,
ReleaseState
.
OFFLINE
))
.
thenReturn
(
result
);
Result
response
=
processDefinitionController
.
releaseProcessDefinition
(
user
,
projectCode
,
id
,
ReleaseState
.
OFFLINE
);
Assert
.
assertTrue
(
response
!=
null
&&
response
.
isSuccess
());
}
...
...
@@ -210,7 +226,8 @@ public class ProcessDefinitionControllerTest {
Map
<
String
,
Object
>
result
=
new
HashMap
<>();
putMsg
(
result
,
Status
.
SUCCESS
);
Mockito
.
when
(
processDefinitionService
.
batchCopyProcessDefinition
(
user
,
projectCode
,
code
,
targetProjectCode
)).
thenReturn
(
result
);
Mockito
.
when
(
processDefinitionService
.
batchCopyProcessDefinition
(
user
,
projectCode
,
code
,
targetProjectCode
))
.
thenReturn
(
result
);
Result
response
=
processDefinitionController
.
copyProcessDefinition
(
user
,
projectCode
,
code
,
targetProjectCode
);
Assert
.
assertTrue
(
response
!=
null
&&
response
.
isSuccess
());
...
...
@@ -225,7 +242,8 @@ public class ProcessDefinitionControllerTest {
Map
<
String
,
Object
>
result
=
new
HashMap
<>();
putMsg
(
result
,
Status
.
SUCCESS
);
Mockito
.
when
(
processDefinitionService
.
batchMoveProcessDefinition
(
user
,
projectCode
,
id
,
targetProjectCode
)).
thenReturn
(
result
);
Mockito
.
when
(
processDefinitionService
.
batchMoveProcessDefinition
(
user
,
projectCode
,
id
,
targetProjectCode
))
.
thenReturn
(
result
);
Result
response
=
processDefinitionController
.
moveProcessDefinition
(
user
,
projectCode
,
id
,
targetProjectCode
);
Assert
.
assertTrue
(
response
!=
null
&&
response
.
isSuccess
());
...
...
@@ -285,7 +303,8 @@ public class ProcessDefinitionControllerTest {
Map
<
String
,
Object
>
result
=
new
HashMap
<>();
putMsg
(
result
,
Status
.
SUCCESS
);
Mockito
.
when
(
processDefinitionService
.
deleteProcessDefinitionByCode
(
user
,
projectCode
,
code
)).
thenReturn
(
result
);
Mockito
.
when
(
processDefinitionService
.
deleteProcessDefinitionByCode
(
user
,
projectCode
,
code
))
.
thenReturn
(
result
);
Result
response
=
processDefinitionController
.
deleteProcessDefinitionByCode
(
user
,
projectCode
,
code
);
Assert
.
assertTrue
(
response
!=
null
&&
response
.
isSuccess
());
...
...
@@ -299,7 +318,8 @@ public class ProcessDefinitionControllerTest {
Map
<
String
,
Object
>
result
=
new
HashMap
<>();
putMsg
(
result
,
Status
.
SUCCESS
);
Mockito
.
when
(
processDefinitionService
.
getTaskNodeListByDefinitionCode
(
user
,
projectCode
,
code
)).
thenReturn
(
result
);
Mockito
.
when
(
processDefinitionService
.
getTaskNodeListByDefinitionCode
(
user
,
projectCode
,
code
))
.
thenReturn
(
result
);
Result
response
=
processDefinitionController
.
getNodeListByDefinitionCode
(
user
,
projectCode
,
code
);
Assert
.
assertTrue
(
response
!=
null
&&
response
.
isSuccess
());
...
...
@@ -313,7 +333,8 @@ public class ProcessDefinitionControllerTest {
Map
<
String
,
Object
>
result
=
new
HashMap
<>();
putMsg
(
result
,
Status
.
SUCCESS
);
Mockito
.
when
(
processDefinitionService
.
getNodeListMapByDefinitionCodes
(
user
,
projectCode
,
codeList
)).
thenReturn
(
result
);
Mockito
.
when
(
processDefinitionService
.
getNodeListMapByDefinitionCodes
(
user
,
projectCode
,
codeList
))
.
thenReturn
(
result
);
Result
response
=
processDefinitionController
.
getNodeListMapByDefinitionCodes
(
user
,
projectCode
,
codeList
);
Assert
.
assertTrue
(
response
!=
null
&&
response
.
isSuccess
());
...
...
@@ -325,7 +346,8 @@ public class ProcessDefinitionControllerTest {
Map
<
String
,
Object
>
result
=
new
HashMap
<>();
putMsg
(
result
,
Status
.
SUCCESS
);
Mockito
.
when
(
processDefinitionService
.
queryAllProcessDefinitionByProjectCode
(
user
,
projectCode
)).
thenReturn
(
result
);
Mockito
.
when
(
processDefinitionService
.
queryAllProcessDefinitionByProjectCode
(
user
,
projectCode
))
.
thenReturn
(
result
);
Result
response
=
processDefinitionController
.
queryAllProcessDefinitionByProjectCode
(
user
,
projectCode
);
Assert
.
assertTrue
(
response
!=
null
&&
response
.
isSuccess
());
...
...
@@ -340,7 +362,7 @@ public class ProcessDefinitionControllerTest {
Map
<
String
,
Object
>
result
=
new
HashMap
<>();
putMsg
(
result
,
Status
.
SUCCESS
);
Mockito
.
when
(
processDefinitionService
.
viewTree
(
user
,
projectCode
,
processId
,
limit
)).
thenReturn
(
result
);
Mockito
.
when
(
processDefinitionService
.
viewTree
(
user
,
projectCode
,
processId
,
limit
)).
thenReturn
(
result
);
Result
response
=
processDefinitionController
.
viewTree
(
user
,
projectCode
,
processId
,
limit
);
Assert
.
assertTrue
(
response
!=
null
&&
response
.
isSuccess
());
...
...
@@ -354,12 +376,12 @@ public class ProcessDefinitionControllerTest {
String
searchVal
=
""
;
int
userId
=
1
;
Result
result
=
new
Result
();
putMsg
(
result
,
Status
.
SUCCESS
);
result
.
setData
(
new
PageInfo
<
Resource
>(
1
,
10
));
PageInfo
<
ProcessDefinition
>
pageInfo
=
new
PageInfo
<>(
1
,
10
);
Mockito
.
when
(
processDefinitionService
.
queryProcessDefinitionListPaging
(
user
,
projectCode
,
searchVal
,
""
,
userId
,
pageNo
,
pageSize
)).
thenReturn
(
result
);
Result
response
=
processDefinitionController
.
queryProcessDefinitionListPaging
(
user
,
projectCode
,
searchVal
,
""
,
userId
,
pageNo
,
pageSize
);
Mockito
.
when
(
processDefinitionService
.
queryProcessDefinitionListPaging
(
user
,
projectCode
,
searchVal
,
""
,
userId
,
pageNo
,
pageSize
)).
thenReturn
(
pageInfo
);
Result
<
PageInfo
<
ProcessDefinition
>>
response
=
processDefinitionController
.
queryProcessDefinitionListPaging
(
user
,
projectCode
,
searchVal
,
""
,
userId
,
pageNo
,
pageSize
);
Assert
.
assertTrue
(
response
!=
null
&&
response
.
isSuccess
());
}
...
...
@@ -369,8 +391,10 @@ public class ProcessDefinitionControllerTest {
String
processDefinitionIds
=
"1,2"
;
long
projectCode
=
1L
;
HttpServletResponse
response
=
new
MockHttpServletResponse
();
Mockito
.
doNothing
().
when
(
this
.
processDefinitionService
).
batchExportProcessDefinitionByCodes
(
user
,
projectCode
,
processDefinitionIds
,
response
);
processDefinitionController
.
batchExportProcessDefinitionByCodes
(
user
,
projectCode
,
processDefinitionIds
,
response
);
Mockito
.
doNothing
().
when
(
this
.
processDefinitionService
).
batchExportProcessDefinitionByCodes
(
user
,
projectCode
,
processDefinitionIds
,
response
);
processDefinitionController
.
batchExportProcessDefinitionByCodes
(
user
,
projectCode
,
processDefinitionIds
,
response
);
}
@Test
...
...
@@ -381,18 +405,10 @@ public class ProcessDefinitionControllerTest {
putMsg
(
resultMap
,
Status
.
SUCCESS
);
resultMap
.
setData
(
new
PageInfo
<
ProcessDefinitionLog
>(
1
,
10
));
Mockito
.
when
(
processDefinitionService
.
queryProcessDefinitionVersions
(
user
,
projectCode
,
1
,
10
,
1
))
user
,
projectCode
,
1
,
10
,
1
))
.
thenReturn
(
resultMap
);
Result
result
=
processDefinitionController
.
queryProcessDefinitionVersions
(
user
,
projectCode
,
1
,
10
,
1
);
user
,
projectCode
,
1
,
10
,
1
);
Assert
.
assertEquals
(
Status
.
SUCCESS
.
getCode
(),
(
int
)
result
.
getCode
());
}
...
...
@@ -402,7 +418,8 @@ public class ProcessDefinitionControllerTest {
long
projectCode
=
1L
;
Map
<
String
,
Object
>
resultMap
=
new
HashMap
<>();
putMsg
(
resultMap
,
Status
.
SUCCESS
);
Mockito
.
when
(
processDefinitionService
.
switchProcessDefinitionVersion
(
user
,
projectCode
,
1
,
10
)).
thenReturn
(
resultMap
);
Mockito
.
when
(
processDefinitionService
.
switchProcessDefinitionVersion
(
user
,
projectCode
,
1
,
10
))
.
thenReturn
(
resultMap
);
Result
result
=
processDefinitionController
.
switchProcessDefinitionVersion
(
user
,
projectCode
,
1
,
10
);
Assert
.
assertEquals
(
Status
.
SUCCESS
.
getCode
(),
(
int
)
result
.
getCode
());
...
...
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
浏览文件 @
8a47785f
此差异已折叠。
点击以展开。
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
浏览文件 @
8a47785f
...
...
@@ -108,14 +108,12 @@ public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
* @param searchVal searchVal
* @param userId userId
* @param projectCode projectCode
* @param isAdmin isAdmin
* @return process definition IPage
*/
IPage
<
ProcessDefinition
>
queryDefineListPaging
(
IPage
<
ProcessDefinition
>
page
,
@Param
(
"searchVal"
)
String
searchVal
,
@Param
(
"userId"
)
int
userId
,
@Param
(
"projectCode"
)
long
projectCode
,
@Param
(
"isAdmin"
)
boolean
isAdmin
);
@Param
(
"projectCode"
)
long
projectCode
);
/**
* query all process definition list
...
...
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/model/PageListingResult.java
0 → 100644
浏览文件 @
8a47785f
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.dolphinscheduler.dao.model
;
import
java.util.List
;
import
lombok.AllArgsConstructor
;
import
lombok.Builder
;
import
lombok.Data
;
@Data
@Builder
@AllArgsConstructor
public
class
PageListingResult
<
T
>
{
private
List
<
T
>
records
;
private
long
totalCount
;
private
int
currentPage
;
private
int
pageSize
;
}
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java
0 → 100644
浏览文件 @
8a47785f
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.dolphinscheduler.dao.repository
;
import
org.apache.dolphinscheduler.dao.entity.ProcessDefinition
;
import
org.apache.dolphinscheduler.dao.model.PageListingResult
;
import
javax.annotation.Nullable
;
public
interface
ProcessDefinitionDao
{
/**
* Listing the process definition belongs to the given userId and projectCode.
* If the searchValue is not null, will used at processDefinitionName or processDefinitionDescription.
*/
// todo: Right now this method will use fuzzy query at searchVal, this will be very slow if there are exist a lot of
// processDefinition belongs to the target user/project.
PageListingResult
<
ProcessDefinition
>
listingProcessDefinition
(
int
pageNumber
,
int
pageSize
,
@Nullable
String
searchVal
,
int
userId
,
long
projectCode
);
}
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java
0 → 100644
浏览文件 @
8a47785f
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.dolphinscheduler.dao.repository.impl
;
import
org.apache.dolphinscheduler.dao.entity.ProcessDefinition
;
import
org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper
;
import
org.apache.dolphinscheduler.dao.model.PageListingResult
;
import
org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Repository
;
import
com.baomidou.mybatisplus.core.metadata.IPage
;
import
com.baomidou.mybatisplus.extension.plugins.pagination.Page
;
@Repository
public
class
ProcessDefinitionDaoImpl
implements
ProcessDefinitionDao
{
@Autowired
private
ProcessDefinitionMapper
processDefinitionMapper
;
@Override
public
PageListingResult
<
ProcessDefinition
>
listingProcessDefinition
(
int
pageNumber
,
int
pageSize
,
String
searchVal
,
int
userId
,
long
projectCode
)
{
Page
<
ProcessDefinition
>
page
=
new
Page
<>(
pageNumber
,
pageSize
);
IPage
<
ProcessDefinition
>
processDefinitions
=
processDefinitionMapper
.
queryDefineListPaging
(
page
,
searchVal
,
userId
,
projectCode
);
return
PageListingResult
.<
ProcessDefinition
>
builder
()
.
totalCount
(
processDefinitions
.
getTotal
())
.
currentPage
(
pageNumber
)
.
pageSize
(
pageSize
)
.
records
(
processDefinitions
.
getRecords
())
.
build
();
}
}
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
浏览文件 @
8a47785f
...
...
@@ -68,23 +68,20 @@
and pd.name = #{processDefinitionName}
</select>
<select
id=
"queryDefineListPaging"
resultType=
"org.apache.dolphinscheduler.dao.entity.ProcessDefinition"
>
SELECT td.id, td.code, td.name, td.version, td.release_state, td.project_code, td.user_id, td.description,
td.global_params, td.flag, td.warning_group_id, td.timeout, td.tenant_id, td.update_time, td.create_time,
sc.schedule_release_state, tu.user_name ,td.execution_type
FROM t_ds_process_definition td
left join (select process_definition_code,release_state as schedule_release_state from t_ds_schedules group by
process_definition_code,release_state) sc on sc.process_definition_code = td.code
left join t_ds_user tu on td.user_id = tu.id
where td.project_code = #{projectCode}
<if
test=
" searchVal != null and searchVal != ''"
>
AND (td.name like concat('%', #{searchVal}, '%')
OR td.description like concat('%', #{searchVal}, '%')
)
</if>
SELECT
<include
refid=
"baseSql"
/>
FROM t_ds_process_definition
where project_code = #{projectCode}
AND project_code = #{projectCode}
<if
test=
" userId != 0"
>
and td.user_id = #{userId}
AND user_id = #{userId}
</if>
<if
test=
" searchVal != null and searchVal != ''"
>
AND (
name like concat('%', #{searchVal}, '%') OR description like concat('%', #{searchVal}, '%')
)
</if>
order by
td.update_time desc,td.id a
sc
order by
update_time de
sc
</select>
<select
id=
"queryAllDefinitionList"
resultType=
"org.apache.dolphinscheduler.dao.entity.ProcessDefinition"
>
...
...
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapperTest.java
浏览文件 @
8a47785f
...
...
@@ -64,7 +64,7 @@ public class ProcessDefinitionMapperTest extends BaseDaoTest {
* @return ProcessDefinition
*/
private
ProcessDefinition
insertOne
(
String
name
)
{
//insertOne
//
insertOne
ProcessDefinition
processDefinition
=
new
ProcessDefinition
();
processDefinition
.
setCode
(
atomicLong
.
getAndIncrement
());
processDefinition
.
setName
(
name
);
...
...
@@ -81,9 +81,9 @@ public class ProcessDefinitionMapperTest extends BaseDaoTest {
*/
@Test
public
void
testUpdate
()
{
//insertOne
//
insertOne
ProcessDefinition
processDefinition
=
insertOne
(
"def 1"
);
//update
//
update
processDefinition
.
setUpdateTime
(
new
Date
());
int
update
=
processDefinitionMapper
.
updateById
(
processDefinition
);
Assert
.
assertEquals
(
1
,
update
);
...
...
@@ -105,7 +105,7 @@ public class ProcessDefinitionMapperTest extends BaseDaoTest {
@Test
public
void
testQuery
()
{
insertOne
(
"def 1"
);
//query
//
query
List
<
ProcessDefinition
>
dataSources
=
processDefinitionMapper
.
selectList
(
null
);
Assert
.
assertNotEquals
(
dataSources
.
size
(),
0
);
}
...
...
@@ -136,7 +136,7 @@ public class ProcessDefinitionMapperTest extends BaseDaoTest {
user
.
setUserType
(
UserType
.
GENERAL_USER
);
user
.
setTenantId
(
tenant
.
getId
());
userMapper
.
insert
(
user
);
//insertOne
//
insertOne
ProcessDefinition
processDefinition
=
new
ProcessDefinition
();
processDefinition
.
setCode
(
1L
);
processDefinition
.
setName
(
"def 1"
);
...
...
@@ -180,7 +180,7 @@ public class ProcessDefinitionMapperTest extends BaseDaoTest {
user
.
setTenantId
(
tenant
.
getId
());
userMapper
.
insert
(
user
);
//insertOne
//
insertOne
ProcessDefinition
processDefinition
=
new
ProcessDefinition
();
processDefinition
.
setCode
(
1L
);
processDefinition
.
setName
(
"def 1"
);
...
...
@@ -243,7 +243,7 @@ public class ProcessDefinitionMapperTest extends BaseDaoTest {
user
.
setTenantId
(
tenant
.
getId
());
userMapper
.
insert
(
user
);
//insertOne
//
insertOne
ProcessDefinition
processDefinition
=
new
ProcessDefinition
();
processDefinition
.
setCode
(
1L
);
processDefinition
.
setName
(
"def 1"
);
...
...
@@ -264,7 +264,8 @@ public class ProcessDefinitionMapperTest extends BaseDaoTest {
public
void
testQueryDefineListPaging
()
{
insertOne
(
"def 1"
);
Page
<
ProcessDefinition
>
page
=
new
Page
(
1
,
3
);
IPage
<
ProcessDefinition
>
processDefinitionIPage
=
processDefinitionMapper
.
queryDefineListPaging
(
page
,
"def"
,
101
,
1010L
,
true
);
IPage
<
ProcessDefinition
>
processDefinitionIPage
=
processDefinitionMapper
.
queryDefineListPaging
(
page
,
"def"
,
101
,
1010L
);
Assert
.
assertNotEquals
(
processDefinitionIPage
.
getTotal
(),
0
);
}
...
...
@@ -318,7 +319,8 @@ public class ProcessDefinitionMapperTest extends BaseDaoTest {
Long
[]
projectCodes
=
new
Long
[
1
];
projectCodes
[
0
]
=
processDefinition
.
getProjectCode
();
List
<
DefinitionGroupByUser
>
processDefinitions
=
processDefinitionMapper
.
countDefinitionByProjectCodes
(
projectCodes
);
List
<
DefinitionGroupByUser
>
processDefinitions
=
processDefinitionMapper
.
countDefinitionByProjectCodes
(
projectCodes
);
Assert
.
assertNotEquals
(
processDefinitions
.
size
(),
0
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录