未验证 提交 13c607ee 编写于 作者: J JinyLeeChina 提交者: GitHub

[Feature][JsonSplit-api]merge code from dev to json_2 (#5827)

* [BUG-#5678][Registry]fix registry init node miss (#5686)

* [Improvement][UI] Update the update time after the user information is successfully modified (#5684)

* improve

edit the userinfo success, but the updatetime is not the latest.

* Improved shell task execution result log information, adding process.waitFor() and process.exitValue() information to the original log (#5691)
Co-authored-by: Nshenglm <shenglm840722@126.com>

* [Feature-#5565][Master Worker-Server] Global Param passed by sense dependencies (#5603)

* add globalParams new plan with varPool

* add unit test

* add python task varPoolParams


Co-authored-by: wangxj <wangxj31>

* Issue robot translation judgment changed to Chinese (#5694)
Co-authored-by: weixin_41213428's avatarchenxingchun <438044805@qq.com>

* the update function should use post instead of get (#5703)

* enhance form verify (#5696)

* checkState only supports %s not {} (#5711)

* [Fix-5701]When deleting a user, the accessToken associated with the user should also be deleted (#5697)

* update

* fix the codestyle error

* fix the compile error

* support rollback

* [Fix-5699][UI] Fix update user error in user information (#5700)

* [Improvement] the automatically generated spi service name in alert-plugin is wrong (#5676)

* bug fix

the auto generated spi service can't be recongized



* include a  new method

* [Improvement-5622][project management] Modify the title (#5723)

* [Fix-5714] When updating the existing alarm instance, the creation time should't be updated (#5715)



* add a new init method.

* [Fix#5758] There are some problems in the api documentation that need to be improved (#5759)

* add the necessary parameters

* openapi improve

* fix code style error

* [FIX-#5721][master-server] Global params parameter missing (#5757)



Co-authored-by: wangxj <wangxj31>

* [Fix-5738][UI] The cancel button in the pop-up dialog of `batch copy` and `batch move`  doesn't work. (#5739)

* Update relatedItems.vue

* Update relatedItems.vue

* [Improvement#5741][Worker] Improve task process status log  (#5776)

* [Improvement-5773][server] need to support two parameters related to task (#5774)

* add some new parameter for task

* restore official properties

* improve imports

* modify a variable's name
Co-authored-by: Njiang hua <jiang.hua@zhaopin.com.cn>

* [FIX-5786][Improvement][Server] When the Worker turns down, the MasterServer cannot handle the Remove event correctly and throws NPE

* [Improvement][Worker] Task log may be lost #5775 (#5783)

* [Imporvement #5725][CheckStyle] upgrade checkstyle file (#5789)

* [Imporvement #5725][CheckStyle] upgrade checkstyle file
  Upgrade checkstyle.xml to support checkstyle version 8.24+

* change ci checkstyle version

* [Fix-5795][Improvement][Server] The starttime field in the HttpTask log is not displayed as expected.  (#5796)

* improve timestamp format

make the startime in the log of httptask to be easier to read.


* fix bad code smell and update the note.

* [Imporvement #5621][job instance] start-time and end-time (#5621) (#5797)

·the list of workflow instances is sorted by start time and end time
·This closes #5621

* fix (#5803)
Co-authored-by: Nshuangbofu <fusb@tuya.com>

* fix: Remove duplicate "registryClient.close" method calls (#5805)
Co-authored-by: Nwen-hemin <wenhemin@apache.com>

* [Improvement][SPI] support load single plugin (#5794)

change load operation of 'registry.plugin.dir'

* [Improvement][Api Module] refactor registry client, remove spring annotation (#5814)

* fix: refactor registry client, remove spring annotation

* fix UT

* fix UT

* fix checkstyle

* fix UT

* fix UT

* fix UT

* fix: Rename RegistryCenterUtils method name
Co-authored-by: Nwen-hemin <wenhemin@apache.com>

* [Fix-5699][UI] Fix update user error in user information introduced by #5700 (#5735)

* [Fix-5726] When we used the UI page, we found some problems such as parameter validation, parameter update shows success but actually work (#5727)

* enhance the validation in UI

* enchance form verifaction

* simplify disable condition

* fix conflicts
Co-authored-by: NKirs <acm_master@163.com>
Co-authored-by: Nkyoty <echohlne@gmail.com>
Co-authored-by: Nji04xiaogang <ji04xiaogang@163.com>
Co-authored-by: Nshenglm <shenglm840722@126.com>
Co-authored-by: Nwangxj3 <857234426@qq.com>
Co-authored-by: Nxingchun-chen <55787491+xingchun-chen@users.noreply.github.com>
Co-authored-by: weixin_41213428's avatarchenxingchun <438044805@qq.com>
Co-authored-by: NShiwen Cheng <chengshiwen0103@gmail.com>
Co-authored-by: NJianchao Wang <akingchao@qq.com>
Co-authored-by: NTanvi Moharir <74228962+tanvimoharir@users.noreply.github.com>
Co-authored-by: 大数据猿人's avatarHua Jiang <jianghuachinacom@163.com>
Co-authored-by: Njiang hua <jiang.hua@zhaopin.com.cn>
Co-authored-by: NWenjun Ruan <861923274@qq.com>
Co-authored-by: NTandoy <56899730+Tandoy@users.noreply.github.com>
Co-authored-by: N傅双波 <786183073@qq.com>
Co-authored-by: Nshuangbofu <fusb@tuya.com>
Co-authored-by: Nwen-hemin <39549317+wen-hemin@users.noreply.github.com>
Co-authored-by: Nwen-hemin <wenhemin@apache.com>
Co-authored-by: NJinyLeeChina <297062848@qq.com>
上级 47de7cae
......@@ -108,7 +108,7 @@ jobs:
CHECKSTYLE_CONFIG: style/checkstyle.xml
REVIEWDOG_VERSION: v0.10.2
run: |
wget -O - -q https://github.com/checkstyle/checkstyle/releases/download/checkstyle-8.22/checkstyle-8.22-all.jar > /opt/checkstyle.jar
wget -O - -q https://github.com/checkstyle/checkstyle/releases/download/checkstyle-8.43/checkstyle-8.43-all.jar > /opt/checkstyle.jar
wget -O - -q https://raw.githubusercontent.com/reviewdog/reviewdog/master/install.sh | sh -s -- -b /opt ${REVIEWDOG_VERSION}
java -jar /opt/checkstyle.jar "${WORKDIR}" -c "${CHECKSTYLE_CONFIG}" -f xml \
| /opt/reviewdog -f=checkstyle \
......
......@@ -116,8 +116,8 @@ public class AccessTokenController extends BaseController {
@ApiOperation(value = "queryAccessTokenList", notes = "QUERY_ACCESS_TOKEN_LIST_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", dataType = "String"),
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", dataType = "Int", example = "20")
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20")
})
@GetMapping(value = "/list-paging")
@ResponseStatus(HttpStatus.OK)
......
......@@ -78,7 +78,7 @@ public class AlertGroupController extends BaseController {
@ApiImplicitParams({
@ApiImplicitParam(name = "groupName", value = "GROUP_NAME", required = true, dataType = "String"),
@ApiImplicitParam(name = "description", value = "DESC", dataType = "String"),
@ApiImplicitParam(name = "alertInstanceIds", value = "alertInstanceIds", dataType = "String")
@ApiImplicitParam(name = "alertInstanceIds", value = "alertInstanceIds", required = true, dataType = "String")
})
@PostMapping(value = "/create")
@ResponseStatus(HttpStatus.CREATED)
......@@ -98,7 +98,7 @@ public class AlertGroupController extends BaseController {
* @param loginUser login user
* @return alert group list
*/
@ApiOperation(value = "list", notes = "QUERY_ALERT_GROUP_LIST_NOTES")
@ApiOperation(value = "listAlertgroupById", notes = "QUERY_ALERT_GROUP_LIST_NOTES")
@GetMapping(value = "/list")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_ALL_ALERTGROUP_ERROR)
......@@ -121,8 +121,8 @@ public class AlertGroupController extends BaseController {
@ApiOperation(value = "queryAlertGroupListPaging", notes = "QUERY_ALERT_GROUP_LIST_PAGING_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", type = "String"),
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", dataType = "Int", example = "20")
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20")
})
@GetMapping(value = "/list-paging")
@ResponseStatus(HttpStatus.OK)
......@@ -156,7 +156,7 @@ public class AlertGroupController extends BaseController {
@ApiImplicitParam(name = "id", value = "ALERT_GROUP_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "groupName", value = "GROUP_NAME", required = true, dataType = "String"),
@ApiImplicitParam(name = "description", value = "DESC", dataType = "String"),
@ApiImplicitParam(name = "alertInstanceIds", value = "alertInstanceIds", dataType = "String")
@ApiImplicitParam(name = "alertInstanceIds", value = "alertInstanceIds", required = true, dataType = "String")
})
@PostMapping(value = "/update")
@ResponseStatus(HttpStatus.OK)
......
......@@ -102,7 +102,7 @@ public class AlertPluginInstanceController extends BaseController {
* @param pluginInstanceParams instance params
* @return result
*/
@ApiOperation(value = "update", notes = "UPDATE_ALERT_PLUGIN_INSTANCE_NOTES")
@ApiOperation(value = "updateAlertPluginInstance", notes = "UPDATE_ALERT_PLUGIN_INSTANCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "alertPluginInstanceId", value = "ALERT_PLUGIN_INSTANCE_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "instanceName", value = "ALERT_PLUGIN_INSTANCE_NAME", required = true, dataType = "String", example = "DING TALK"),
......@@ -127,7 +127,7 @@ public class AlertPluginInstanceController extends BaseController {
* @param id id
* @return result
*/
@ApiOperation(value = "delete", notes = "DELETE_ALERT_PLUGIN_INSTANCE_NOTES")
@ApiOperation(value = "deleteAlertPluginInstance", notes = "DELETE_ALERT_PLUGIN_INSTANCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "id", value = "ALERT_PLUGIN_ID", required = true, dataType = "Int", example = "100")
})
......@@ -149,7 +149,7 @@ public class AlertPluginInstanceController extends BaseController {
* @param id alert plugin instance id
* @return result
*/
@ApiOperation(value = "get", notes = "GET_ALERT_PLUGIN_INSTANCE_NOTES")
@ApiOperation(value = "getAlertPluginInstance", notes = "GET_ALERT_PLUGIN_INSTANCE_NOTES")
@PostMapping(value = "/get")
@ResponseStatus(HttpStatus.OK)
@ApiException(GET_ALERT_PLUGIN_INSTANCE_ERROR)
......@@ -166,7 +166,7 @@ public class AlertPluginInstanceController extends BaseController {
* @param loginUser login user
* @return result
*/
@ApiOperation(value = "/queryAll", notes = "QUERY_ALL_ALERT_PLUGIN_INSTANCE_NOTES")
@ApiOperation(value = "queryAllAlertPluginInstance", notes = "QUERY_ALL_ALERT_PLUGIN_INSTANCE_NOTES")
@PostMapping(value = "/queryAll")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_ALL_ALERT_PLUGIN_INSTANCE_ERROR)
......@@ -185,7 +185,7 @@ public class AlertPluginInstanceController extends BaseController {
*/
@ApiOperation(value = "verifyAlertInstanceName", notes = "VERIFY_ALERT_INSTANCE_NAME_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "groupName", value = "GROUP_NAME", required = true, dataType = "String"),
@ApiImplicitParam(name = "alertInstanceName", value = "ALERT_INSTANCE_NAME", required = true, dataType = "String"),
})
@GetMapping(value = "/verify-alert-instance-name")
@ResponseStatus(HttpStatus.OK)
......@@ -216,8 +216,8 @@ public class AlertPluginInstanceController extends BaseController {
*/
@ApiOperation(value = "queryAlertPluginInstanceListPaging", notes = "QUERY_ALERT_PLUGIN_INSTANCE_LIST_PAGING_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", dataType = "Int", example = "20")
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20")
})
@GetMapping(value = "/list-paging")
@ResponseStatus(HttpStatus.OK)
......
......@@ -86,7 +86,7 @@ public class DataSourceController extends BaseController {
@ApiException(CREATE_DATASOURCE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result createDataSource(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "DATA_SOURCE_PARAM", required = true)
@ApiParam(name = "dataSourceParam", value = "DATA_SOURCE_PARAM", required = true)
@RequestBody BaseDataSourceParamDTO dataSourceParam) {
return dataSourceService.createDataSource(loginUser, dataSourceParam);
}
......@@ -101,7 +101,7 @@ public class DataSourceController extends BaseController {
*/
@ApiOperation(value = "updateDataSource", notes = "UPDATE_DATA_SOURCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "id", value = "DATA_SOURCE_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "dataSourceParam", value = "DATA_SOURCE_PARAM", required = true, dataType = "BaseDataSourceParamDTO"),
})
@PostMapping(value = "/update")
@ResponseStatus(HttpStatus.OK)
......@@ -168,8 +168,8 @@ public class DataSourceController extends BaseController {
@ApiOperation(value = "queryDataSourceListPaging", notes = "QUERY_DATA_SOURCE_LIST_PAGING_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", dataType = "String"),
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", dataType = "Int", example = "20")
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20")
})
@GetMapping(value = "/list-paging")
@ResponseStatus(HttpStatus.OK)
......@@ -197,7 +197,7 @@ public class DataSourceController extends BaseController {
*/
@ApiOperation(value = "connectDataSource", notes = "CONNECT_DATA_SOURCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "name", value = "DATA_SOURCE_NAME", required = true, dataType = "String"),
@ApiImplicitParam(name = "dataSourceParam", value = "DATA_SOURCE_PARAM", required = true, dataType = "BaseDataSourceParamDTO"),
})
@PostMapping(value = "/connect")
@ResponseStatus(HttpStatus.OK)
......@@ -237,7 +237,7 @@ public class DataSourceController extends BaseController {
* @param id datasource id
* @return delete result
*/
@ApiOperation(value = "delete", notes = "DELETE_DATA_SOURCE_NOTES")
@ApiOperation(value = "deleteDataSource", notes = "DELETE_DATA_SOURCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "id", value = "DATA_SOURCE_ID", required = true, dataType = "Int", example = "100")
})
......
......@@ -67,9 +67,9 @@ public class LoggerController extends BaseController {
*/
@ApiOperation(value = "queryLog", notes = "QUERY_TASK_INSTANCE_LOG_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "taskInstanceId", value = "TASK_ID", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "skipLineNum", value = "SKIP_LINE_NUM", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "limit", value = "LIMIT", dataType = "Int", example = "100")
@ApiImplicitParam(name = "taskInstanceId", value = "TASK_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "skipLineNum", value = "SKIP_LINE_NUM", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "limit", value = "LIMIT", required = true, dataType = "Int", example = "100")
})
@GetMapping(value = "/detail")
@ResponseStatus(HttpStatus.OK)
......@@ -92,7 +92,7 @@ public class LoggerController extends BaseController {
*/
@ApiOperation(value = "downloadTaskLog", notes = "DOWNLOAD_TASK_INSTANCE_LOG_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "taskInstanceId", value = "TASK_ID", dataType = "Int", example = "100")
@ApiImplicitParam(name = "taskInstanceId", value = "TASK_ID", required = true, dataType = "Int", example = "100")
})
@GetMapping(value = "/download-log")
@ResponseBody
......
......@@ -106,8 +106,8 @@ public class ProcessInstanceController extends BaseController {
@ApiImplicitParam(name = "host", value = "HOST", type = "String"),
@ApiImplicitParam(name = "startDate", value = "START_DATE", type = "String"),
@ApiImplicitParam(name = "endDate", value = "END_DATE", type = "String"),
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", dataType = "Int", example = "100")
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "100")
})
@GetMapping(value = "list-paging")
@ResponseStatus(HttpStatus.OK)
......@@ -145,7 +145,7 @@ public class ProcessInstanceController extends BaseController {
*/
@ApiOperation(value = "queryTaskListByProcessId", notes = "QUERY_TASK_LIST_BY_PROCESS_INSTANCE_ID_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", dataType = "Int", example = "100")
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100")
})
@GetMapping(value = "/task-list-by-process-id")
@ResponseStatus(HttpStatus.OK)
......@@ -176,9 +176,9 @@ public class ProcessInstanceController extends BaseController {
@ApiOperation(value = "updateProcessInstance", notes = "UPDATE_PROCESS_INSTANCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processInstanceJson", value = "PROCESS_INSTANCE_JSON", type = "String"),
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", type = "String"),
@ApiImplicitParam(name = "syncDefine", value = "SYNC_DEFINE", type = "Boolean"),
@ApiImplicitParam(name = "syncDefine", value = "SYNC_DEFINE", required = true, type = "Boolean"),
@ApiImplicitParam(name = "locations", value = "PROCESS_INSTANCE_LOCATIONS", type = "String"),
@ApiImplicitParam(name = "connects", value = "PROCESS_INSTANCE_CONNECTS", type = "String"),
@ApiImplicitParam(name = "flag", value = "RECOVERY_PROCESS_INSTANCE_FLAG", type = "Flag"),
......@@ -212,7 +212,7 @@ public class ProcessInstanceController extends BaseController {
*/
@ApiOperation(value = "queryProcessInstanceById", notes = "QUERY_PROCESS_INSTANCE_BY_ID_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", dataType = "Int", example = "100")
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100")
})
@GetMapping(value = "/select-by-id")
@ResponseStatus(HttpStatus.OK)
......@@ -238,9 +238,9 @@ public class ProcessInstanceController extends BaseController {
*/
@ApiOperation(value = "queryTopNLongestRunningProcessInstance", notes = "QUERY_TOPN_LONGEST_RUNNING_PROCESS_INSTANCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "size", value = "PROCESS_INSTANCE_SIZE", dataType = "Int", example = "10"),
@ApiImplicitParam(name = "startTime", value = "PROCESS_INSTANCE_START_TIME", dataType = "String"),
@ApiImplicitParam(name = "endTime", value = "PROCESS_INSTANCE_END_TIME", dataType = "String"),
@ApiImplicitParam(name = "size", value = "PROCESS_INSTANCE_SIZE", required = true, dataType = "Int", example = "10"),
@ApiImplicitParam(name = "startTime", value = "PROCESS_INSTANCE_START_TIME", required = true, dataType = "String"),
@ApiImplicitParam(name = "endTime", value = "PROCESS_INSTANCE_END_TIME", required = true, dataType = "String"),
})
@GetMapping(value = "/top-n")
@ResponseStatus(HttpStatus.OK)
......@@ -269,7 +269,7 @@ public class ProcessInstanceController extends BaseController {
*/
@ApiOperation(value = "deleteProcessInstanceById", notes = "DELETE_PROCESS_INSTANCE_BY_ID_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", dataType = "Int", example = "100")
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100")
})
@GetMapping(value = "/delete")
@ResponseStatus(HttpStatus.OK)
......@@ -294,7 +294,7 @@ public class ProcessInstanceController extends BaseController {
*/
@ApiOperation(value = "querySubProcessInstanceByTaskId", notes = "QUERY_SUBPROCESS_INSTANCE_BY_TASK_ID_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "taskId", value = "TASK_ID", dataType = "Int", example = "100")
@ApiImplicitParam(name = "taskId", value = "TASK_ID", required = true, dataType = "Int", example = "100")
})
@GetMapping(value = "/select-sub-process")
@ResponseStatus(HttpStatus.OK)
......@@ -317,7 +317,7 @@ public class ProcessInstanceController extends BaseController {
*/
@ApiOperation(value = "queryParentInstanceBySubId", notes = "QUERY_PARENT_PROCESS_INSTANCE_BY_SUB_PROCESS_INSTANCE_ID_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "subId", value = "SUB_PROCESS_INSTANCE_ID", dataType = "Int", example = "100")
@ApiImplicitParam(name = "subId", value = "SUB_PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100")
})
@GetMapping(value = "/select-parent-process")
@ResponseStatus(HttpStatus.OK)
......@@ -339,7 +339,7 @@ public class ProcessInstanceController extends BaseController {
*/
@ApiOperation(value = "viewVariables", notes = "QUERY_PROCESS_INSTANCE_GLOBAL_VARIABLES_AND_LOCAL_VARIABLES_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", dataType = "Int", example = "100")
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100")
})
@GetMapping(value = "/view-variables")
@ResponseStatus(HttpStatus.OK)
......@@ -361,7 +361,7 @@ public class ProcessInstanceController extends BaseController {
*/
@ApiOperation(value = "vieGanttTree", notes = "VIEW_GANTT_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", dataType = "Int", example = "100")
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100")
})
@GetMapping(value = "/view-gantt")
@ResponseStatus(HttpStatus.OK)
......@@ -383,6 +383,11 @@ public class ProcessInstanceController extends BaseController {
* @param processInstanceIds process instance id
* @return delete result code
*/
@ApiOperation(value = "batchDeleteProcessInstanceByIds", notes = "BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "projectName", value = "PROJECT_NAME", required = true, dataType = "String"),
@ApiImplicitParam(name = "processInstanceIds", value = "PROCESS_INSTANCE_IDS", required = true, dataType = "String"),
})
@GetMapping(value = "/batch-delete")
@ResponseStatus(HttpStatus.OK)
@ApiException(BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_ERROR)
......
......@@ -89,8 +89,8 @@ public class QueueController extends BaseController {
@ApiOperation(value = "queryQueueListPaging", notes = "QUERY_QUEUE_LIST_PAGING_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", dataType = "String"),
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", dataType = "Int", example = "20")
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20")
})
@GetMapping(value = "/list-paging")
@ResponseStatus(HttpStatus.OK)
......
......@@ -226,8 +226,8 @@ public class ResourcesController extends BaseController {
@ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType = "ResourceType"),
@ApiImplicitParam(name = "id", value = "RESOURCE_ID", required = true, dataType = "int", example = "10"),
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", dataType = "String"),
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", dataType = "Int", example = "20")
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20")
})
@GetMapping(value = "/list-paging")
@ResponseStatus(HttpStatus.OK)
......@@ -487,7 +487,7 @@ public class ResourcesController extends BaseController {
@ApiImplicitParams({
@ApiImplicitParam(name = "type", value = "UDF_TYPE", required = true, dataType = "UdfType"),
@ApiImplicitParam(name = "funcName", value = "FUNC_NAME", required = true, dataType = "String"),
@ApiImplicitParam(name = "suffix", value = "CLASS_NAME", required = true, dataType = "String"),
@ApiImplicitParam(name = "className", value = "CLASS_NAME", required = true, dataType = "String"),
@ApiImplicitParam(name = "argTypes", value = "ARG_TYPES", dataType = "String"),
@ApiImplicitParam(name = "database", value = "DATABASE_NAME", dataType = "String"),
@ApiImplicitParam(name = "description", value = "UDF_DESC", dataType = "String"),
......@@ -585,8 +585,8 @@ public class ResourcesController extends BaseController {
@ApiOperation(value = "queryUdfFuncListPaging", notes = "QUERY_UDF_FUNCTION_LIST_PAGING_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", dataType = "String"),
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", dataType = "Int", example = "20")
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20")
})
@GetMapping(value = "/udf-func/list-paging")
@ResponseStatus(HttpStatus.OK)
......
......@@ -89,8 +89,8 @@ public class TaskInstanceController extends BaseController {
@ApiImplicitParam(name = "host", value = "HOST", type = "String"),
@ApiImplicitParam(name = "startDate", value = "START_DATE", type = "String"),
@ApiImplicitParam(name = "endDate", value = "END_DATE", type = "String"),
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", dataType = "Int", example = "20")
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20")
})
@GetMapping("/list-paging")
@ResponseStatus(HttpStatus.OK)
......
......@@ -103,8 +103,8 @@ public class TenantController extends BaseController {
@ApiOperation(value = "queryTenantlistPaging", notes = "QUERY_TENANT_LIST_PAGING_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", dataType = "String"),
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", dataType = "Int", example = "20")
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20")
})
@GetMapping(value = "/list-paging")
@ResponseStatus(HttpStatus.OK)
......@@ -153,7 +153,7 @@ public class TenantController extends BaseController {
*/
@ApiOperation(value = "updateTenant", notes = "UPDATE_TENANT_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "ID", value = "TENANT_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "id", value = "TENANT_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "tenantCode", value = "TENANT_CODE", required = true, dataType = "String"),
@ApiImplicitParam(name = "queueId", value = "QUEUE_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "description", value = "TENANT_DESC", type = "String")
......@@ -182,7 +182,7 @@ public class TenantController extends BaseController {
*/
@ApiOperation(value = "deleteTenantById", notes = "DELETE_TENANT_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "ID", value = "TENANT_ID", required = true, dataType = "Int", example = "100")
@ApiImplicitParam(name = "id", value = "TENANT_ID", required = true, dataType = "Int", example = "100")
})
@PostMapping(value = "/delete")
......
......@@ -75,7 +75,7 @@ public class UiPluginController extends BaseController {
@ApiOperation(value = "queryUiPluginDetailById", notes = "QUERY_UI_PLUGIN_DETAIL_BY_ID")
@ApiImplicitParams({
@ApiImplicitParam(name = "id", value = "id", required = true, dataType = "PluginType"),
@ApiImplicitParam(name = "pluginId", value = "PLUGIN_ID", required = true, dataType = "Int", example = "100"),
})
@PostMapping(value = "/queryUiPluginDetailById")
@ResponseStatus(HttpStatus.CREATED)
......
......@@ -90,11 +90,11 @@ public class UsersController extends BaseController {
*/
@ApiOperation(value = "createUser", notes = "CREATE_USER_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "userName", value = "USER_NAME", type = "String"),
@ApiImplicitParam(name = "userPassword", value = "USER_PASSWORD", type = "String"),
@ApiImplicitParam(name = "tenantId", value = "TENANT_ID", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "userName", value = "USER_NAME", required = true, type = "String"),
@ApiImplicitParam(name = "userPassword", value = "USER_PASSWORD", required = true, type = "String"),
@ApiImplicitParam(name = "tenantId", value = "TENANT_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "queue", value = "QUEUE", dataType = "String"),
@ApiImplicitParam(name = "email", value = "EMAIL", dataType = "String"),
@ApiImplicitParam(name = "email", value = "EMAIL", required = true, dataType = "String"),
@ApiImplicitParam(name = "phone", value = "PHONE", dataType = "String"),
@ApiImplicitParam(name = "state", value = "STATE", dataType = "Int", example = "1")
})
......@@ -125,8 +125,8 @@ public class UsersController extends BaseController {
*/
@ApiOperation(value = "queryUserList", notes = "QUERY_USER_LIST_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", dataType = "Int", example = "10"),
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "10"),
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", type = "String")
})
@GetMapping(value = "/list-paging")
......@@ -162,12 +162,12 @@ public class UsersController extends BaseController {
*/
@ApiOperation(value = "updateUser", notes = "UPDATE_USER_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "id", value = "USER_ID", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "userName", value = "USER_NAME", type = "String"),
@ApiImplicitParam(name = "userPassword", value = "USER_PASSWORD", type = "String"),
@ApiImplicitParam(name = "tenantId", value = "TENANT_ID", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "id", value = "USER_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "userName", value = "USER_NAME", required = true, type = "String"),
@ApiImplicitParam(name = "userPassword", value = "USER_PASSWORD", required = true, type = "String"),
@ApiImplicitParam(name = "tenantId", value = "TENANT_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "queue", value = "QUEUE", dataType = "String"),
@ApiImplicitParam(name = "email", value = "EMAIL", dataType = "String"),
@ApiImplicitParam(name = "email", value = "EMAIL", required = true, dataType = "String"),
@ApiImplicitParam(name = "phone", value = "PHONE", dataType = "String"),
@ApiImplicitParam(name = "state", value = "STATE", dataType = "Int", example = "1")
})
......@@ -197,7 +197,7 @@ public class UsersController extends BaseController {
*/
@ApiOperation(value = "delUserById", notes = "DELETE_USER_BY_ID_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "id", value = "USER_ID", dataType = "Int", example = "100")
@ApiImplicitParam(name = "id", value = "USER_ID", required = true, dataType = "Int", example = "100")
})
@PostMapping(value = "/delete")
@ResponseStatus(HttpStatus.OK)
......@@ -219,8 +219,8 @@ public class UsersController extends BaseController {
*/
@ApiOperation(value = "grantProject", notes = "GRANT_PROJECT_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "userId", value = "USER_ID", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "projectIds", value = "PROJECT_IDS", type = "String")
@ApiImplicitParam(name = "userId", value = "USER_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "projectIds", value = "PROJECT_IDS", required = true, type = "String")
})
@PostMapping(value = "/grant-project")
@ResponseStatus(HttpStatus.OK)
......@@ -243,8 +243,8 @@ public class UsersController extends BaseController {
*/
@ApiOperation(value = "grantResource", notes = "GRANT_RESOURCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "userId", value = "USER_ID", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "resourceIds", value = "RESOURCE_IDS", type = "String")
@ApiImplicitParam(name = "userId", value = "USER_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "resourceIds", value = "RESOURCE_IDS", required = true, type = "String")
})
@PostMapping(value = "/grant-file")
@ResponseStatus(HttpStatus.OK)
......@@ -268,8 +268,8 @@ public class UsersController extends BaseController {
*/
@ApiOperation(value = "grantUDFFunc", notes = "GRANT_UDF_FUNC_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "userId", value = "USER_ID", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "udfIds", value = "UDF_IDS", type = "String")
@ApiImplicitParam(name = "userId", value = "USER_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "udfIds", value = "UDF_IDS", required = true, type = "String")
})
@PostMapping(value = "/grant-udf-func")
@ResponseStatus(HttpStatus.OK)
......@@ -293,8 +293,8 @@ public class UsersController extends BaseController {
*/
@ApiOperation(value = "grantDataSource", notes = "GRANT_DATASOURCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "userId", value = "USER_ID", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "datasourceIds", value = "DATASOURCE_IDS", type = "String")
@ApiImplicitParam(name = "userId", value = "USER_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "datasourceIds", value = "DATASOURCE_IDS", required = true, type = "String")
})
@PostMapping(value = "/grant-datasource")
@ResponseStatus(HttpStatus.OK)
......@@ -366,7 +366,7 @@ public class UsersController extends BaseController {
*/
@ApiOperation(value = "verifyUserName", notes = "VERIFY_USER_NAME_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "userName", value = "USER_NAME", type = "String")
@ApiImplicitParam(name = "userName", value = "USER_NAME", required = true, type = "String")
})
@GetMapping(value = "/verify-user-name")
@ResponseStatus(HttpStatus.OK)
......@@ -388,7 +388,7 @@ public class UsersController extends BaseController {
*/
@ApiOperation(value = "unauthorizedUser", notes = "UNAUTHORIZED_USER_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "alertgroupId", value = "ALERT_GROUP_ID", type = "String")
@ApiImplicitParam(name = "alertgroupId", value = "ALERT_GROUP_ID", required = true, type = "String")
})
@GetMapping(value = "/unauth-user")
@ResponseStatus(HttpStatus.OK)
......@@ -410,7 +410,7 @@ public class UsersController extends BaseController {
*/
@ApiOperation(value = "authorizedUser", notes = "AUTHORIZED_USER_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "alertgroupId", value = "ALERT_GROUP_ID", type = "String")
@ApiImplicitParam(name = "alertgroupId", value = "ALERT_GROUP_ID", required = true, type = "String")
})
@GetMapping(value = "/authed-user")
@ResponseStatus(HttpStatus.OK)
......@@ -437,10 +437,10 @@ public class UsersController extends BaseController {
*/
@ApiOperation(value="registerUser",notes = "REGISTER_USER_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "userName", value = "USER_NAME", type = "String"),
@ApiImplicitParam(name = "userPassword", value = "USER_PASSWORD", type = "String"),
@ApiImplicitParam(name = "repeatPassword", value = "REPEAT_PASSWORD", type = "String"),
@ApiImplicitParam(name = "email", value = "EMAIL", type = "String"),
@ApiImplicitParam(name = "userName", value = "USER_NAME", required = true, type = "String"),
@ApiImplicitParam(name = "userPassword", value = "USER_PASSWORD", required = true, type = "String"),
@ApiImplicitParam(name = "repeatPassword", value = "REPEAT_PASSWORD", required = true, type = "String"),
@ApiImplicitParam(name = "email", value = "EMAIL", required = true, type = "String"),
})
@PostMapping("/register")
@ResponseStatus(HttpStatus.OK)
......@@ -485,7 +485,7 @@ public class UsersController extends BaseController {
*/
@ApiOperation(value = "batchActivateUser",notes = "BATCH_ACTIVATE_USER_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "userNames", value = "USER_NAMES", type = "String"),
@ApiImplicitParam(name = "userNames", value = "USER_NAMES", required = true, type = "String"),
})
@PostMapping("/batch/activate")
@ResponseStatus(HttpStatus.OK)
......
......@@ -45,6 +45,7 @@ import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import springfox.documentation.annotations.ApiIgnore;
......@@ -60,6 +61,7 @@ public class WorkFlowLineageController extends BaseController {
@Autowired
private WorkFlowLineageService workFlowLineageService;
@ApiOperation(value = "queryWorkFlowLineageByName", notes = "QUERY_WORKFLOW_LINEAGE_BY_NAME_NOTES")
@GetMapping(value = "/list-name")
@ResponseStatus(HttpStatus.OK)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
......@@ -76,6 +78,7 @@ public class WorkFlowLineageController extends BaseController {
}
}
@ApiOperation(value = "queryWorkFlowLineageByIds", notes = "QUERY_WORKFLOW_LINEAGE_BY_IDS_NOTES")
@GetMapping(value = "/list-ids")
@ResponseStatus(HttpStatus.OK)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
......
......@@ -99,8 +99,8 @@ public class WorkerGroupController extends BaseController {
*/
@ApiOperation(value = "queryAllWorkerGroupsPaging", notes = "QUERY_WORKER_GROUP_PAGING_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", dataType = "Int", example = "20"),
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20"),
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", dataType = "String")
})
@GetMapping(value = "/list-paging")
......
......@@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -109,11 +110,9 @@ public class AlertPluginInstanceServiceImpl extends BaseServiceImpl implements A
@Override
public Map<String, Object> update(User loginUser, int pluginInstanceId, String instanceName, String pluginInstanceParams) {
AlertPluginInstance alertPluginInstance = new AlertPluginInstance();
String paramsMapJson = parsePluginParamsMap(pluginInstanceParams);
alertPluginInstance.setPluginInstanceParams(paramsMapJson);
alertPluginInstance.setInstanceName(instanceName);
alertPluginInstance.setId(pluginInstanceId);
AlertPluginInstance alertPluginInstance = new AlertPluginInstance(pluginInstanceId, paramsMapJson, instanceName, new Date());
Map<String, Object> result = new HashMap<>();
int i = alertPluginInstanceMapper.updateById(alertPluginInstance);
......
......@@ -119,10 +119,6 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
* updateProcessInstance datasource
*
* @param loginUser login user
* @param name data source name
* @param desc data source description
* @param type data source type
* @param parameter datasource parameters
* @param id data source id
* @return update result code
*/
......@@ -157,14 +153,14 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
}
Result<Object> isConnection = checkConnection(dataSource.getType(), connectionParam);
if (Status.SUCCESS.getCode() != isConnection.getCode()) {
return result;
if (isConnection.isFailed()) {
return isConnection;
}
Date now = new Date();
dataSource.setName(dataSource.getName().trim());
dataSource.setNote(dataSource.getNote());
dataSource.setName(dataSourceParam.getName().trim());
dataSource.setNote(dataSourceParam.getNote());
dataSource.setUserName(loginUser.getUserName());
dataSource.setType(dataSource.getType());
dataSource.setConnectionParams(JSONUtils.toJsonString(connectionParam));
......@@ -312,7 +308,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
* check connection
*
* @param type data source type
* @param parameter data source parameters
* @param connectionParam connectionParam
* @return true if connect successfully, otherwise false
*/
@Override
......
......@@ -17,27 +17,22 @@
package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.api.utils.RegistryMonitor;
import org.apache.dolphinscheduler.api.utils.RegistryCenterUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.model.WorkerServerModel;
import org.apache.dolphinscheduler.dao.MonitorDBDao;
import org.apache.dolphinscheduler.dao.entity.MonitorRecord;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
......@@ -49,12 +44,6 @@ import com.google.common.collect.Sets;
@Service
public class MonitorServiceImpl extends BaseServiceImpl implements MonitorService {
@Autowired
private RegistryMonitor registryMonitor;
@Autowired
private RegistryClient registryClient;
@Autowired
private MonitorDBDao monitorDBDao;
......@@ -105,7 +94,7 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
public Map<String,Object> queryZookeeperState(User loginUser) {
Map<String, Object> result = new HashMap<>();
List<ZookeeperRecord> zookeeperRecordList = registryMonitor.zookeeperInfoList();
List<ZookeeperRecord> zookeeperRecordList = RegistryCenterUtils.zookeeperInfoList();
result.put(Constants.DATA_LIST, zookeeperRecordList);
putMsg(result, Status.SUCCESS);
......@@ -160,10 +149,7 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
@Override
public List<Server> getServerListFromRegistry(boolean isMaster) {
checkNotNull(registryMonitor);
NodeType nodeType = isMaster ? NodeType.MASTER : NodeType.WORKER;
return registryClient.getServerList(nodeType);
return isMaster ? RegistryCenterUtils.getMasterServers() : RegistryCenterUtils.getWorkerServers();
}
}
......@@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.WorkerGroupService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.RegistryMonitor;
import org.apache.dolphinscheduler.api.utils.RegistryCenterUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
......@@ -31,7 +31,6 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.ArrayList;
import java.util.Date;
......@@ -40,8 +39,6 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -61,16 +58,9 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
@Autowired
WorkerGroupMapper workerGroupMapper;
@Autowired
private RegistryMonitor registryMonitor;
@Autowired
ProcessInstanceMapper processInstanceMapper;
@Resource
RegistryClient registryClient;
/**
* create or update a worker group
*
......@@ -147,7 +137,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
}
// check zookeeper
String workerGroupPath = Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SLASH + workerGroup.getName();
return registryClient.isExisted(workerGroupPath);
return RegistryCenterUtils.isNodeExisted(workerGroupPath);
}
/**
......@@ -157,7 +147,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
* @return boolean
*/
private String checkWorkerGroupAddrList(WorkerGroup workerGroup) {
Map<String, String> serverMaps = registryMonitor.getServerMaps(NodeType.WORKER, true);
Map<String, String> serverMaps = RegistryCenterUtils.getServerMaps(NodeType.WORKER, true);
if (Strings.isNullOrEmpty(workerGroup.getAddrList())) {
return null;
}
......@@ -258,7 +248,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
String workerPath = Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
List<String> workerGroupList = null;
try {
workerGroupList = registryClient.getChildrenKeys(workerPath);
workerGroupList = RegistryCenterUtils.getChildrenNodes(workerPath);
} catch (Exception e) {
logger.error("getWorkerGroups exception: {}, workerPath: {}, isPaging: {}", e.getMessage(), workerPath, isPaging);
}
......@@ -276,7 +266,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
String workerGroupPath = workerPath + Constants.SLASH + workerGroup;
List<String> childrenNodes = null;
try {
childrenNodes = registryClient.getChildrenKeys(workerGroupPath);
childrenNodes = RegistryCenterUtils.getChildrenNodes(workerGroupPath);
} catch (Exception e) {
logger.error("getChildrenNodes exception: {}, workerGroupPath: {}", e.getMessage(), workerGroupPath);
}
......@@ -287,7 +277,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
wg.setName(workerGroup);
if (isPaging) {
wg.setAddrList(String.join(Constants.COMMA, childrenNodes));
String registeredValue = registryClient.get(workerGroupPath + Constants.SLASH + childrenNodes.get(0));
String registeredValue = RegistryCenterUtils.getNodeData(workerGroupPath + Constants.SLASH + childrenNodes.get(0));
wg.setCreateTime(DateUtils.stringToDate(registeredValue.split(Constants.COMMA)[6]));
wg.setUpdateTime(DateUtils.stringToDate(registeredValue.split(Constants.COMMA)[7]));
wg.setSystemDefault(true);
......@@ -334,7 +324,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
@Override
public Map<String, Object> getWorkerAddressList() {
Map<String, Object> result = new HashMap<>();
List<String> serverNodeList = registryMonitor.getServerNodeList(NodeType.WORKER, true);
List<String> serverNodeList = RegistryCenterUtils.getServerNodeList(NodeType.WORKER, true);
result.put(Constants.DATA_LIST, serverNodeList);
putMsg(result, Status.SUCCESS);
return result;
......
......@@ -26,31 +26,19 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* monitor zookeeper info todo registry-spi
* fixme Some of the information obtained in the api belongs to the unique information of zk.
* I am not sure whether there is a good abstraction method. This is related to whether the specific plug-in is provided.
*/
@Component
public class RegistryMonitor {
public class RegistryCenterUtils {
@Autowired
RegistryClient registryClient;
@PostConstruct
public void initRegistry() {
registryClient.init();
}
private static RegistryClient registryClient = RegistryClient.getInstance();
/**
* @return zookeeper info list
*/
public List<ZookeeperRecord> zookeeperInfoList() {
public static List<ZookeeperRecord> zookeeperInfoList() {
return null;
}
......@@ -59,7 +47,7 @@ public class RegistryMonitor {
*
* @return master server information
*/
public List<Server> getMasterServers() {
public static List<Server> getMasterServers() {
return registryClient.getServerList(NodeType.MASTER);
}
......@@ -68,7 +56,7 @@ public class RegistryMonitor {
*
* @return worker server informations
*/
public List<Server> getWorkerServers() {
public static List<Server> getWorkerServers() {
return registryClient.getServerList(NodeType.WORKER);
}
......@@ -106,11 +94,23 @@ public class RegistryMonitor {
return list;
}
public Map<String, String> getServerMaps(NodeType nodeType, boolean hostOnly) {
public static Map<String, String> getServerMaps(NodeType nodeType, boolean hostOnly) {
return registryClient.getServerMaps(nodeType, hostOnly);
}
public List<String> getServerNodeList(NodeType nodeType, boolean hostOnly) {
public static List<String> getServerNodeList(NodeType nodeType, boolean hostOnly) {
return registryClient.getServerNodeList(nodeType, hostOnly);
}
public static boolean isNodeExisted(String key) {
return registryClient.isExisted(key);
}
public static List<String> getChildrenNodes(final String key) {
return registryClient.getChildrenKeys(key);
}
public static String getNodeData(String key) {
return registryClient.get(key);
}
}
......@@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
QUERY_SCHEDULE_LIST_NOTES=query schedule list
EXECUTE_PROCESS_TAG=execute process related operation
PROCESS_INSTANCE_EXECUTOR_TAG=process instance executor related operation
......@@ -37,11 +36,28 @@ DELETE_ALERT_GROUP_BY_ID_NOTES=delete alert group by id
VERIFY_ALERT_GROUP_NAME_NOTES=verify alert group name, check alert group exist or not
GRANT_ALERT_GROUP_NOTES=grant alert group
USER_IDS=user id list
EXECUTOR_TAG=executor operation
EXECUTOR_NAME=executor name
WORKER_GROUP=work group
startParams=start parameters
ALERT_GROUP_TAG=alert group related operation
ALERT_PLUGIN_INSTANCE_TAG=alert plugin instance related operation
WORK_FLOW_LINEAGE_TAG=work flow lineage related operation
UI_PLUGINS_TAG=UI plugin related operation
UPDATE_ALERT_PLUGIN_INSTANCE_NOTES=update alert plugin instance operation
CREATE_ALERT_PLUGIN_INSTANCE_NOTES=create alert plugin instance operation
DELETE_ALERT_PLUGIN_INSTANCE_NOTES=delete alert plugin instance operation
QUERY_ALERT_PLUGIN_INSTANCE_LIST_PAGING_NOTES=query alert plugin instance paging
QUERY_TOPN_LONGEST_RUNNING_PROCESS_INSTANCE_NOTES=query topN longest running process instance
ALERT_PLUGIN_INSTANCE_NAME=alert plugin instance name
ALERT_PLUGIN_DEFINE_ID=alert plugin define id
ALERT_PLUGIN_ID=alert plugin id
ALERT_PLUGIN_INSTANCE_ID=alert plugin instance id
ALERT_PLUGIN_INSTANCE_PARAMS=alert plugin instance parameters
ALERT_INSTANCE_NAME=alert instance name
VERIFY_ALERT_INSTANCE_NAME_NOTES=verify alert instance name
DATA_SOURCE_PARAM=datasource parameter
QUERY_ALL_ALERT_PLUGIN_INSTANCE_NOTES=query all alert plugin instances
GET_ALERT_PLUGIN_INSTANCE_NOTES=get alert plugin instance operation
CREATE_ALERT_GROUP_NOTES=create alert group
WORKER_GROUP_TAG=worker group related operation
......@@ -151,6 +167,9 @@ FAILURE_STRATEGY=failure strategy
RECEIVERS=receivers
RECEIVERS_CC=receivers cc
WORKER_GROUP_ID=worker server group id
PROCESS_INSTANCE_START_TIME=process instance start time
PROCESS_INSTANCE_END_TIME=process instance end time
PROCESS_INSTANCE_SIZE=process instance size
PROCESS_INSTANCE_PRIORITY=process instance priority
UPDATE_SCHEDULE_NOTES=update schedule
SCHEDULE_ID=schedule id
......@@ -169,7 +188,7 @@ PROCESS_INSTANCE_LOCATIONS=process instance node locations info (json format)
PROCESS_DEFINITION_CONNECTS=process definition node connects info (json format)
PROCESS_INSTANCE_CONNECTS=process instance node connects info (json format)
PROCESS_DEFINITION_DESC=process definition desc
PROCESS_DEFINITION_TAG=process definition related opertation
PROCESS_DEFINITION_TAG=process definition related operation
SIGNOUT_NOTES=logout
USER_PASSWORD=user password
UPDATE_PROCESS_INSTANCE_NOTES=update process instance
......@@ -179,6 +198,9 @@ LOGIN_NOTES=user login
UPDATE_PROCESS_DEFINITION_NOTES=update process definition
PROCESS_DEFINITION_ID=process definition id
PROCESS_DEFINITION_IDS=process definition ids
PROCESS_DEFINITION_CODE=process definition code
PROCESS_DEFINITION_CODE_LIST=process definition code list
IMPORT_PROCESS_DEFINITION_NOTES=import process definition
RELEASE_PROCESS_DEFINITION_NOTES=release process definition
QUERY_PROCESS_DEFINITION_BY_ID_NOTES=query process definition by id
QUERY_PROCESS_DEFINITION_LIST_NOTES=query process definition list
......@@ -188,22 +210,46 @@ PAGE_NO=page no
PROCESS_INSTANCE_ID=process instance id
PROCESS_INSTANCE_JSON=process instance info(json format)
SCHEDULE_TIME=schedule time
SYNC_DEFINE=update the information of the process instance to the process definition\
SYNC_DEFINE=update the information of the process instance to the process definition
RECOVERY_PROCESS_INSTANCE_FLAG=whether to recovery process instance
PREVIEW_SCHEDULE_NOTES=preview schedule
SEARCH_VAL=search val
USER_ID=user id
FORCE_TASK_SUCCESS=force task success
QUERY_TASK_INSTANCE_LIST_PAGING_NOTES=query task instance list paging
PROCESS_INSTANCE_NAME=process instance name
TASK_INSTANCE_ID=task instance id
VERIFY_TENANT_CODE_NOTES=verify tenant code
QUERY_UI_PLUGIN_DETAIL_BY_ID=query ui plugin detail by id
PLUGIN_ID=plugin id
QUERY_UI_PLUGINS_BY_TYPE=query ui plugins by type
ACTIVATE_USER_NOTES=active user
BATCH_ACTIVATE_USER_NOTES=batch active user
STATE=state
REPEAT_PASSWORD=repeat password
REGISTER_USER_NOTES=register user
USER_NAMES=user names
PAGE_SIZE=page size
LIMIT=limit
CREATE_WORKER_GROUP_NOTES=create worker group
WORKER_ADDR_LIST=worker address list
QUERY_WORKER_ADDRESS_LIST_NOTES=query worker address list
QUERY_WORKFLOW_LINEAGE_BY_IDS_NOTES=query workflow lineage by ids
QUERY_WORKFLOW_LINEAGE_BY_NAME_NOTES=query workflow lineage by name
VIEW_TREE_NOTES=view tree
UDF_ID=udf id
GET_NODE_LIST_BY_DEFINITION_ID_NOTES=get task node list by process definition id
GET_NODE_LIST_BY_DEFINITION_CODE_NOTES=get node list by definition code
QUERY_PROCESS_DEFINITION_BY_NAME_NOTES=query process definition by name
PROCESS_DEFINITION_ID_LIST=process definition id list
QUERY_PROCESS_DEFINITION_All_BY_PROJECT_ID_NOTES=query process definition all by project id
DELETE_PROCESS_DEFINITION_BY_ID_NOTES=delete process definition by process definition id
BATCH_DELETE_PROCESS_DEFINITION_BY_IDS_NOTES=batch delete process definition by process definition ids
BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_NOTES=batch delete process instance by process ids
QUERY_PROCESS_INSTANCE_BY_ID_NOTES=query process instance by process instance id
DELETE_PROCESS_INSTANCE_BY_ID_NOTES=delete process instance by process instance id
TASK_ID=task instance id
PROCESS_INSTANCE_IDS=process_instance ids
SKIP_LINE_NUM=skip line num
QUERY_TASK_INSTANCE_LOG_NOTES=query task instance log
DOWNLOAD_TASK_INSTANCE_LOG_NOTES=download task instance log
......@@ -217,6 +263,7 @@ EMAIL=email
PHONE=phone
QUERY_USER_LIST_NOTES=query user list
UPDATE_USER_NOTES=update user
UPDATE_QUEUE_NOTES=update queue
DELETE_USER_BY_ID_NOTES=delete user by id
GRANT_PROJECT_NOTES=GRANT PROJECT
PROJECT_IDS=project ids(string format, multiple projects separated by ",")
......@@ -228,6 +275,14 @@ VERIFY_USER_NAME_NOTES=verify user name
UNAUTHORIZED_USER_NOTES=cancel authorization
ALERT_GROUP_ID=alert group id
AUTHORIZED_USER_NOTES=authorized user
AUTHORIZE_RESOURCE_TREE_NOTES=authorize resource tree
RESOURCE_CURRENTDIR=dir of the current resource
QUERY_RESOURCE_LIST_PAGING_NOTES=query resource list paging
RESOURCE_PID=parent directory ID of the current resource
RESOURCE_FULL_NAME=resource full name
QUERY_BY_RESOURCE_NAME=query by resource name
QUERY_UDF_FUNC_LIST_NOTES=query udf funciton list
VERIFY_RESOURCE_NAME_NOTES=verify resource name
GRANT_UDF_FUNC_NOTES=grant udf function
UDF_IDS=udf ids(string format, multiple udf functions separated by ",")
GRANT_DATASOURCE_NOTES=grant datasource
......@@ -260,13 +315,13 @@ AUTHORIZED_DATA_SOURCE_NOTES=authorized data source
DELETE_SCHEDULER_BY_ID_NOTES=delete scheduler by id
QUERY_ALERT_GROUP_LIST_PAGING_NOTES=query alert group list paging
EXPORT_PROCESS_DEFINITION_BY_ID_NOTES=export process definition by id
BATCH_EXPORT_PROCESS_DEFINITION_BY_IDS_NOTES= batch export process definition by ids
QUERY_USER_CREATED_PROJECT_NOTES= query user created project
QUERY_AUTHORIZED_AND_USER_CREATED_PROJECT_NOTES= query authorized and user created project
COPY_PROCESS_DEFINITION_NOTES= copy process definition notes
MOVE_PROCESS_DEFINITION_NOTES= move process definition notes
TARGET_PROJECT_ID= target project id
IS_COPY = is copy
BATCH_EXPORT_PROCESS_DEFINITION_BY_IDS_NOTES=batch export process definition by ids
QUERY_USER_CREATED_PROJECT_NOTES=query user created project
QUERY_AUTHORIZED_AND_USER_CREATED_PROJECT_NOTES=query authorized and user created project
COPY_PROCESS_DEFINITION_NOTES=copy process definition notes
MOVE_PROCESS_DEFINITION_NOTES=move process definition notes
TARGET_PROJECT_ID=target project id
IS_COPY=is copy
DELETE_PROCESS_DEFINITION_VERSION_NOTES=delete process definition version
QUERY_PROCESS_DEFINITION_VERSIONS_NOTES=query process definition versions
SWITCH_PROCESS_DEFINITION_VERSION_NOTES=switch process definition version
......
......@@ -14,9 +14,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
QUERY_SCHEDULE_LIST_NOTES=查询定时列表
PROCESS_INSTANCE_EXECUTOR_TAG=流程实例执行相关操作
UI_PLUGINS_TAG=UI插件相关操作
WORK_FLOW_LINEAGE_TAG=工作流血缘相关操作
RUN_PROCESS_INSTANCE_NOTES=运行流程实例
START_NODE_LIST=开始节点列表(节点name)
TASK_DEPEND_TYPE=任务依赖类型
......@@ -25,27 +26,32 @@ RUN_MODE=运行模式
TIMEOUT=超时时间
EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=执行流程实例的各种操作(暂停、停止、重跑、恢复等)
EXECUTE_TYPE=执行类型
EXECUTOR_TAG=流程相关操作
EXECUTOR_NAME=流程名称
START_CHECK_PROCESS_DEFINITION_NOTES=检查流程定义
DESC=备注(描述)
GROUP_NAME=组名称
WORKER_GROUP=worker群组
startParams=启动参数
GROUP_TYPE=组类型
QUERY_ALERT_GROUP_LIST_NOTES=告警组列表\
QUERY_ALERT_GROUP_LIST_NOTES=告警组列表
UPDATE_ALERT_GROUP_NOTES=编辑(更新)告警组
DELETE_ALERT_GROUP_BY_ID_NOTES=删除告警组通过ID
DELETE_ALERT_GROUP_BY_ID_NOTES=通过ID删除告警组
VERIFY_ALERT_GROUP_NAME_NOTES=检查告警组是否存在
GRANT_ALERT_GROUP_NOTES=授权告警组
PROCESS_DEFINITION_IDS=流程定义ID
PROCESS_DEFINITION_CODE=流程定义编码
PROCESS_DEFINITION_CODE_LIST=流程定义编码列表
USER_IDS=用户ID列表
ALERT_GROUP_TAG=告警组相关操作
WORKER_GROUP_TAG=Worker分组管理
SAVE_WORKER_GROUP_NOTES=创建Worker分组\
SAVE_WORKER_GROUP_NOTES=创建Worker分组
ALERT_PLUGIN_INSTANCE_TAG=告警插件实例相关操作
WORKER_GROUP_NAME=Worker分组名称
WORKER_IP_LIST=Worker ip列表,注意:多个IP地址以逗号分割\
WORKER_IP_LIST=Worker ip列表,注意:多个IP地址以逗号分割
QUERY_WORKER_GROUP_PAGING_NOTES=Worker分组管理
QUERY_WORKER_GROUP_LIST_NOTES=查询worker group分组
DELETE_WORKER_GROUP_BY_ID_NOTES=删除worker group通过ID
DELETE_WORKER_GROUP_BY_ID_NOTES=通过ID删除worker group
DATA_ANALYSIS_TAG=任务状态分析相关操作
COUNT_TASK_STATE_NOTES=任务状态统计
COUNT_PROCESS_INSTANCE_NOTES=统计流程实例状态
......@@ -72,6 +78,7 @@ DATA_SOURCE_HOST=IP主机名
DATA_SOURCE_PORT=数据源端口
DATABASE_NAME=数据库名
QUEUE_TAG=队列相关操作
QUERY_TOPN_LONGEST_RUNNING_PROCESS_INSTANCE_NOTES=查询topN最长运行流程实例
QUERY_QUEUE_LIST_NOTES=查询队列列表
QUERY_QUEUE_LIST_PAGING_NOTES=分页查询队列列表
CREATE_QUEUE_NOTES=创建队列
......@@ -84,14 +91,18 @@ UPDATE_TENANT_NOTES=更新租户
DELETE_TENANT_NOTES=删除租户
RESOURCES_TAG=资源中心相关操作
CREATE_RESOURCE_NOTES=创建资源
RESOURCE_FULL_NAME=资源全名
RESOURCE_TYPE=资源文件类型
RESOURCE_NAME=资源文件名称
RESOURCE_DESC=资源文件描述
RESOURCE_FILE=资源文件
RESOURCE_ID=资源ID
QUERY_RESOURCE_LIST_NOTES=查询资源列表
DELETE_RESOURCE_BY_ID_NOTES=删除资源通过ID
VIEW_RESOURCE_BY_ID_NOTES=浏览资源通通过ID
QUERY_BY_RESOURCE_NAME=通过资源名称查询
QUERY_UDF_FUNC_LIST_NOTES=查询UDF函数列表
VERIFY_RESOURCE_NAME_NOTES=验证资源名称
DELETE_RESOURCE_BY_ID_NOTES=通过ID删除资源
VIEW_RESOURCE_BY_ID_NOTES=通过ID浏览资源
ONLINE_CREATE_RESOURCE_NOTES=在线创建资源
SUFFIX=资源文件后缀
CONTENT=资源文件内容
......@@ -131,7 +142,7 @@ PROJECT_ID=项目ID
QUERY_PROJECT_BY_ID_NOTES=通过项目ID查询项目信息
QUERY_PROJECT_LIST_PAGING_NOTES=分页查询项目列表
QUERY_ALL_PROJECT_LIST_NOTES=查询所有项目
DELETE_PROJECT_BY_ID_NOTES=删除项目通过ID
DELETE_PROJECT_BY_ID_NOTES=通过ID删除项目
QUERY_UNAUTHORIZED_PROJECT_NOTES=查询未授权的项目
QUERY_AUTHORIZED_PROJECT_NOTES=查询授权项目
TASK_RECORD_TAG=任务记录相关操作
......@@ -156,6 +167,9 @@ LOGIN_TAG=用户登录相关操作
USER_NAME=用户名
PROJECT_NAME=项目名称
CREATE_PROCESS_DEFINITION_NOTES=创建流程定义
PROCESS_INSTANCE_START_TIME=流程实例启动时间
PROCESS_INSTANCE_END_TIME=流程实例结束时间
PROCESS_INSTANCE_SIZE=流程实例个数
PROCESS_DEFINITION_NAME=流程定义名称
PROCESS_DEFINITION_JSON=流程定义详细信息(json格式)
PROCESS_DEFINITION_LOCATIONS=流程定义节点坐标位置信息(json格式)
......@@ -173,7 +187,7 @@ LOGIN_NOTES=用户登录
UPDATE_PROCESS_DEFINITION_NOTES=更新流程定义
PROCESS_DEFINITION_ID=流程定义ID
RELEASE_PROCESS_DEFINITION_NOTES=发布流程定义
QUERY_PROCESS_DEFINITION_BY_ID_NOTES=查询流程定义通过流程定义ID
QUERY_PROCESS_DEFINITION_BY_ID_NOTES=通过流程定义ID查询流程定义
QUERY_PROCESS_DEFINITION_LIST_NOTES=查询流程定义列表
QUERY_PROCESS_DEFINITION_LIST_PAGING_NOTES=分页查询流程定义列表
QUERY_ALL_DEFINITION_LIST_NOTES=查询所有流程定义
......@@ -181,21 +195,43 @@ PAGE_NO=页码号
PROCESS_INSTANCE_ID=流程实例ID
PROCESS_INSTANCE_IDS=流程实例ID集合
PROCESS_INSTANCE_JSON=流程实例信息(json格式)
PREVIEW_SCHEDULE_NOTES=定时调度预览
SCHEDULE_TIME=定时时间
SYNC_DEFINE=更新流程实例的信息是否同步到流程定义
RECOVERY_PROCESS_INSTANCE_FLAG=是否恢复流程实例
SEARCH_VAL=搜索值
FORCE_TASK_SUCCESS=强制TASK成功
QUERY_TASK_INSTANCE_LIST_PAGING_NOTES=分页查询任务实例列表
PROCESS_INSTANCE_NAME=流程实例名称
TASK_INSTANCE_ID=任务实例ID
VERIFY_TENANT_CODE_NOTES=验证租户
QUERY_UI_PLUGIN_DETAIL_BY_ID=通过ID查询UI插件详情
QUERY_UI_PLUGINS_BY_TYPE=通过类型查询UI插件
ACTIVATE_USER_NOTES=激活用户
BATCH_ACTIVATE_USER_NOTES=批量激活用户
REPEAT_PASSWORD=重复密码
REGISTER_USER_NOTES=用户注册
STATE=状态
USER_NAMES=多个用户名
PLUGIN_ID=插件ID
USER_ID=用户ID
PAGE_SIZE=页大小
LIMIT=显示多少条
UDF_ID=udf ID
AUTHORIZE_RESOURCE_TREE_NOTES=授权资源树
RESOURCE_CURRENTDIR=当前资源目录
RESOURCE_PID=资源父目录ID
QUERY_RESOURCE_LIST_PAGING_NOTES=分页查询资源列表
VIEW_TREE_NOTES=树状图
GET_NODE_LIST_BY_DEFINITION_ID_NOTES=获得任务节点列表通过流程定义ID
IMPORT_PROCESS_DEFINITION_NOTES=导入流程定义
GET_NODE_LIST_BY_DEFINITION_ID_NOTES=通过流程定义ID获得任务节点列表
PROCESS_DEFINITION_ID_LIST=流程定义id列表
QUERY_PROCESS_DEFINITION_All_BY_PROJECT_ID_NOTES=查询流程定义通过项目ID
BATCH_DELETE_PROCESS_DEFINITION_BY_IDS_NOTES=批量删除流程定义通过流程定义ID集合
DELETE_PROCESS_DEFINITION_BY_ID_NOTES=删除流程定义通过流程定义ID
QUERY_PROCESS_INSTANCE_BY_ID_NOTES=查询流程实例通过流程实例ID
DELETE_PROCESS_INSTANCE_BY_ID_NOTES=删除流程实例通过流程实例ID
QUERY_PROCESS_DEFINITION_All_BY_PROJECT_ID_NOTES=通过项目ID查询流程定义
BATCH_DELETE_PROCESS_DEFINITION_BY_IDS_NOTES=通过流程定义ID集合批量删除流程定义
BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_NOTES=通过流程实例ID集合批量删除流程实例
DELETE_PROCESS_DEFINITION_BY_ID_NOTES=通过流程定义ID删除流程定义
QUERY_PROCESS_INSTANCE_BY_ID_NOTES=通过流程实例ID查询流程实例
DELETE_PROCESS_INSTANCE_BY_ID_NOTES=通过流程实例ID删除流程实例
TASK_ID=任务实例ID
SKIP_LINE_NUM=忽略行数
QUERY_TASK_INSTANCE_LOG_NOTES=查询任务实例日志
......@@ -204,18 +240,26 @@ USERS_TAG=用户相关操作
SCHEDULER_TAG=定时相关操作
CREATE_SCHEDULE_NOTES=创建定时
CREATE_USER_NOTES=创建用户
CREATE_WORKER_GROUP_NOTES=创建Worker分组
WORKER_ADDR_LIST=worker地址列表
QUERY_WORKER_ADDRESS_LIST_NOTES=查询worker地址列表
QUERY_WORKFLOW_LINEAGE_BY_IDS_NOTES=通过IDs查询工作流血缘列表
QUERY_WORKFLOW_LINEAGE_BY_NAME_NOTES=通过名称查询工作流血缘列表
TENANT_ID=租户ID
QUEUE=使用的队列
EMAIL=邮箱
PHONE=手机号
QUERY_USER_LIST_NOTES=查询用户列表
UPDATE_USER_NOTES=更新用户
UPDATE_QUEUE_NOTES=更新队列
DELETE_USER_BY_ID_NOTES=删除用户通过ID
GRANT_PROJECT_NOTES=授权项目
PROJECT_IDS=项目IDS(字符串格式,多个项目以","分割)
GRANT_RESOURCE_NOTES=授权资源文件
RESOURCE_IDS=资源ID列表(字符串格式,多个资源ID以","分割)
GET_USER_INFO_NOTES=获取用户信息
GET_NODE_LIST_BY_DEFINITION_CODE_NOTES=通过流程定义编码查询节点列表
QUERY_PROCESS_DEFINITION_BY_NAME_NOTES=通过名称查询流程定义
LIST_USER_NOTES=用户列表
VERIFY_USER_NAME_NOTES=验证用户名
UNAUTHORIZED_USER_NOTES=取消授权
......@@ -225,11 +269,11 @@ GRANT_UDF_FUNC_NOTES=授权udf函数
UDF_IDS=udf函数id列表(字符串格式,多个udf函数ID以","分割)
GRANT_DATASOURCE_NOTES=授权数据源
DATASOURCE_IDS=数据源ID列表(字符串格式,多个数据源ID以","分割)
QUERY_SUBPROCESS_INSTANCE_BY_TASK_ID_NOTES=查询子流程实例通过任务实例ID
QUERY_PARENT_PROCESS_INSTANCE_BY_SUB_PROCESS_INSTANCE_ID_NOTES=查询父流程实例信息通过子流程实例ID
QUERY_SUBPROCESS_INSTANCE_BY_TASK_ID_NOTES=通过任务实例ID查询子流程实例
QUERY_PARENT_PROCESS_INSTANCE_BY_SUB_PROCESS_INSTANCE_ID_NOTES=通过子流程实例ID查询父流程实例信息
QUERY_PROCESS_INSTANCE_GLOBAL_VARIABLES_AND_LOCAL_VARIABLES_NOTES=查询流程实例全局变量和局部变量
VIEW_GANTT_NOTES=浏览Gantt图
SUB_PROCESS_INSTANCE_ID=子流程是咧ID
SUB_PROCESS_INSTANCE_ID=子流程实例ID
TASK_NAME=任务实例名
TASK_INSTANCE_TAG=任务实例相关操作
LOGGER_TAG=日志相关操作
......@@ -239,10 +283,25 @@ HOST=运行任务的主机IP地址
START_DATE=开始时间
END_DATE=结束时间
QUERY_TASK_LIST_BY_PROCESS_INSTANCE_ID_NOTES=通过流程实例ID查询任务列表
DELETE_ALERT_PLUGIN_INSTANCE_NOTES=删除告警插件实例
CREATE_ALERT_PLUGIN_INSTANCE_NOTES=创建告警插件实例
GET_ALERT_PLUGIN_INSTANCE_NOTES=查询告警插件实例
QUERY_ALERT_PLUGIN_INSTANCE_LIST_PAGING_NOTES=分页查询告警实例列表
QUERY_ALL_ALERT_PLUGIN_INSTANCE_NOTES=查询所有告警实例列表
UPDATE_ALERT_PLUGIN_INSTANCE_NOTES=更新告警插件实例
ALERT_PLUGIN_INSTANCE_NAME=告警插件实例名称
ALERT_PLUGIN_DEFINE_ID=告警插件定义ID
ALERT_PLUGIN_ID=告警插件ID
ALERT_PLUGIN_INSTANCE_ID=告警插件实例ID
ALERT_PLUGIN_INSTANCE_PARAMS=告警插件实例参数
ALERT_INSTANCE_NAME=告警插件名称
VERIFY_ALERT_INSTANCE_NAME_NOTES=验证告警插件名称
UPDATE_DATA_SOURCE_NOTES=更新数据源
DATA_SOURCE_PARAM=数据源参数
DATA_SOURCE_ID=数据源ID
CREATE_ALERT_GROUP_NOTES=创建告警组
QUERY_DATA_SOURCE_NOTES=查询数据源通过ID
QUERY_DATA_SOURCE_LIST_BY_TYPE_NOTES=查询数据源列表通过数据源类型
QUERY_DATA_SOURCE_LIST_BY_TYPE_NOTES=通过数据源类型查询数据源列表
QUERY_DATA_SOURCE_LIST_PAGING_NOTES=分页查询数据源列表
CONNECT_DATA_SOURCE_NOTES=连接数据源
CONNECT_DATA_SOURCE_TEST_NOTES=连接数据源测试
......@@ -254,12 +313,12 @@ DELETE_SCHEDULER_BY_ID_NOTES=根据定时id删除定时数据
QUERY_ALERT_GROUP_LIST_PAGING_NOTES=分页查询告警组列表
EXPORT_PROCESS_DEFINITION_BY_ID_NOTES=通过工作流ID导出工作流定义
BATCH_EXPORT_PROCESS_DEFINITION_BY_IDS_NOTES=批量导出工作流定义
QUERY_USER_CREATED_PROJECT_NOTES= 查询用户创建的项目
QUERY_AUTHORIZED_AND_USER_CREATED_PROJECT_NOTES= 查询授权和用户创建的项目
COPY_PROCESS_DEFINITION_NOTES= 复制工作流定义
MOVE_PROCESS_DEFINITION_NOTES= 移动工作流定义
TARGET_PROJECT_ID= 目标项目ID
IS_COPY = 是否复制
QUERY_USER_CREATED_PROJECT_NOTES=查询用户创建的项目
QUERY_AUTHORIZED_AND_USER_CREATED_PROJECT_NOTES=查询授权和用户创建的项目
COPY_PROCESS_DEFINITION_NOTES=复制工作流定义
MOVE_PROCESS_DEFINITION_NOTES=移动工作流定义
TARGET_PROJECT_ID=目标项目ID
IS_COPY=是否复制
DELETE_PROCESS_DEFINITION_VERSION_NOTES=删除流程历史版本
QUERY_PROCESS_DEFINITION_VERSIONS_NOTES=查询流程历史版本信息
SWITCH_PROCESS_DEFINITION_VERSION_NOTES=切换流程版本
......
......@@ -17,27 +17,31 @@
package org.apache.dolphinscheduler.api.controller;
import static org.mockito.Mockito.doNothing;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.SessionService;
import org.apache.dolphinscheduler.api.utils.RegistryCenterUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
......@@ -46,8 +50,11 @@ import org.springframework.web.context.WebApplicationContext;
/**
* abstract controller test
*/
@RunWith(SpringRunner.class)
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(SpringRunner.class)
@SpringBootTest(classes = ApiApplicationServer.class)
@PrepareForTest({ RegistryCenterUtils.class, RegistryClient.class })
@PowerMockIgnore({"javax.management.*"})
public class AbstractControllerTest {
public static final String SESSION_ID = "sessionId";
......@@ -64,12 +71,11 @@ public class AbstractControllerTest {
protected String sessionId;
@MockBean
RegistryClient registryClient;
@Before
public void setUp() {
doNothing().when(registryClient).init();
PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class));
PowerMockito.mockStatic(RegistryCenterUtils.class);
mockMvc = MockMvcBuilders.webAppContextSetup(webApplicationContext).build();
createSession();
......
......@@ -56,7 +56,6 @@ public class MonitorControllerTest extends AbstractControllerTest {
logger.info(mvcResult.getResponse().getContentAsString());
}
@Test
public void testListWorker() throws Exception {
......@@ -74,7 +73,6 @@ public class MonitorControllerTest extends AbstractControllerTest {
logger.info(mvcResult.getResponse().getContentAsString());
}
@Test
public void testQueryDatabaseState() throws Exception {
MvcResult mvcResult = mockMvc.perform(get("/monitor/database")
......@@ -91,7 +89,6 @@ public class MonitorControllerTest extends AbstractControllerTest {
logger.info(mvcResult.getResponse().getContentAsString());
}
@Test
public void testQueryZookeeperState() throws Exception {
MvcResult mvcResult = mockMvc.perform(get("/monitor/zookeeper/list")
......
......@@ -22,14 +22,25 @@ import static org.springframework.test.web.servlet.request.MockMvcRequestBuilder
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.RegistryCenterUtils;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.util.LinkedMultiValueMap;
......@@ -38,15 +49,26 @@ import org.springframework.util.MultiValueMap;
/**
* worker group controller test
*/
public class WorkerGroupControllerTest extends AbstractControllerTest{
public class WorkerGroupControllerTest extends AbstractControllerTest {
private static Logger logger = LoggerFactory.getLogger(WorkerGroupControllerTest.class);
@MockBean
private WorkerGroupMapper workerGroupMapper;
@MockBean
private ProcessInstanceMapper processInstanceMapper;
@Test
public void testSaveWorkerGroup() throws Exception {
Map<String, String> serverMaps = new HashMap<>();
serverMaps.put("192.168.0.1", "192.168.0.1");
serverMaps.put("192.168.0.2", "192.168.0.2");
PowerMockito.when(RegistryCenterUtils.getServerMaps(NodeType.WORKER, true)).thenReturn(serverMaps);
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("name","cxc_work_group");
paramsMap.add("ipList","192.16.12,192.168,10,12");
paramsMap.add("addrList","192.168.0.1,192.168.0.2");
MvcResult mvcResult = mockMvc.perform(post("/worker-group/save")
.header("sessionId", sessionId)
.params(paramsMap))
......@@ -54,7 +76,7 @@ public class WorkerGroupControllerTest extends AbstractControllerTest{
.andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
Assert.assertTrue(result != null && result.isSuccess());
logger.info(mvcResult.getResponse().getContentAsString());
}
......@@ -71,7 +93,7 @@ public class WorkerGroupControllerTest extends AbstractControllerTest{
.andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
Assert.assertTrue(result != null && result.isSuccess());
logger.info(mvcResult.getResponse().getContentAsString());
}
......@@ -85,22 +107,30 @@ public class WorkerGroupControllerTest extends AbstractControllerTest{
.andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
Assert.assertTrue(result != null && result.isSuccess());
logger.info(mvcResult.getResponse().getContentAsString());
}
@Test
public void testDeleteById() throws Exception {
WorkerGroup workerGroup = new WorkerGroup();
workerGroup.setId(12);
workerGroup.setName("测试");
Mockito.when(workerGroupMapper.selectById(12)).thenReturn(workerGroup);
Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus("测试", Constants.NOT_TERMINATED_STATES)).thenReturn(null);
Mockito.when(workerGroupMapper.deleteById(12)).thenReturn(1);
Mockito.when(processInstanceMapper.updateProcessInstanceByWorkerGroupName("测试", "")).thenReturn(1);
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("id","12");
MvcResult mvcResult = mockMvc.perform(get("/worker-group/delete-by-id")
MvcResult mvcResult = mockMvc.perform(post("/worker-group/delete-by-id")
.header("sessionId", sessionId)
.params(paramsMap))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
Assert.assertTrue(result != null && result.isSuccess());
logger.info(mvcResult.getResponse().getContentAsString());
}
}
......@@ -26,23 +26,30 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
/**
* worker group service test
*/
@RunWith(MockitoJUnitRunner.class)
@RunWith(PowerMockRunner.class)
@PrepareForTest({ RegistryClient.class })
@PowerMockIgnore({"javax.management.*"})
public class WorkerGroupServiceTest {
......@@ -69,13 +76,13 @@ public class WorkerGroupServiceTest {
List<String> workerGroupStrList = new ArrayList<>();
workerGroupStrList.add("default");
workerGroupStrList.add("test");
Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath)).thenReturn(workerGroupStrList);
Mockito.when(zookeeperCachedOperator.getChildrenNodes(workerPath)).thenReturn(workerGroupStrList);
List<String> defaultAddressList = new ArrayList<>();
defaultAddressList.add("192.168.220.188:1234");
defaultAddressList.add("192.168.220.189:1234");
Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath + "/default")).thenReturn(defaultAddressList);
Mockito.when(zookeeperCachedOperator.getChildrenNodes(workerPath + "/default")).thenReturn(defaultAddressList);
Mockito.when(zookeeperCachedOperator.get(workerPath + "/default" + "/" + defaultAddressList.get(0))).thenReturn("0.01,0.17,0.03,25.83,8.0,1.0,2020-07-21 11:17:59,2020-07-21 14:39:20,0,13238");
}
......@@ -119,6 +126,12 @@ public class WorkerGroupServiceTest {
PageInfo<WorkerGroup> pageInfo = (PageInfo) result.get(Constants.DATA_LIST);
Assert.assertEquals(pageInfo.getLists().size(), 1);
}*/
@Before
public void before() {
PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class));
}
@Test
public void testQueryAllGroup() {
Map<String, Object> result = workerGroupService.queryAllGroup();
......
......@@ -14,35 +14,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.utils;
import org.apache.dolphinscheduler.common.model.Server;
import java.util.List;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import java.util.List;
/**
* zookeeper monitor utils test
*/
@Ignore
public class RegistryMonitorUtilsTest {
public class RegistryCenterUtilsTest {
@Test
public void testGetMasterList(){
RegistryMonitor registryMonitor = new RegistryMonitor();
List<Server> masterServerList = registryMonitor.getMasterServers();
List<Server> workerServerList = registryMonitor.getWorkerServers();
List<Server> masterServerList = RegistryCenterUtils.getMasterServers();
List<Server> workerServerList = RegistryCenterUtils.getWorkerServers();
Assert.assertTrue(masterServerList.size() >= 0);
Assert.assertTrue(workerServerList.size() >= 0);
}
}
\ No newline at end of file
}
......@@ -660,6 +660,16 @@ public final class Constants {
*/
public static final String PARAMETER_BUSINESS_DATE = "system.biz.date";
/**
* the absolute path of current executing task
*/
public static final String PARAMETER_TASK_EXECUTE_PATH = "system.task.execute.path";
/**
* the instance id of current task
*/
public static final String PARAMETER_TASK_INSTANCE_ID = "system.task.instance.id";
/**
* ACCEPTED
*/
......@@ -872,8 +882,8 @@ public final class Constants {
ExecutionStatus.READY_PAUSE.ordinal(),
ExecutionStatus.READY_STOP.ordinal(),
ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
ExecutionStatus.WAITTING_THREAD.ordinal(),
ExecutionStatus.WAITTING_DEPEND.ordinal()
ExecutionStatus.WAITING_THREAD.ordinal(),
ExecutionStatus.WAITING_DEPEND.ordinal()
};
/**
......
......@@ -50,7 +50,7 @@ public enum CommandType {
REPEAT_RUNNING(7, "repeat running a process"),
PAUSE(8, "pause a process"),
STOP(9, "stop a process"),
RECOVER_WAITTING_THREAD(10, "recover waiting thread");
RECOVER_WAITING_THREAD(10, "recover waiting thread");
CommandType(int code, String descp){
this.code = code;
......
......@@ -53,8 +53,8 @@ public enum ExecutionStatus {
SUCCESS(7, "success"),
NEED_FAULT_TOLERANCE(8, "need fault tolerance"),
KILL(9, "kill"),
WAITTING_THREAD(10, "waiting thread"),
WAITTING_DEPEND(11, "waiting depend node complete"),
WAITING_THREAD(10, "waiting thread"),
WAITING_DEPEND(11, "waiting depend node complete"),
DELAY_EXECUTION(12, "delay execution"),
FORCED_SUCCESS(13, "forced success");
......@@ -109,7 +109,7 @@ public enum ExecutionStatus {
* @return status
*/
public boolean typeIsWaitingThread() {
return this == WAITTING_THREAD;
return this == WAITING_THREAD;
}
/**
......@@ -136,7 +136,7 @@ public enum ExecutionStatus {
* @return status
*/
public boolean typeIsRunning() {
return this == RUNNING_EXECUTION || this == WAITTING_DEPEND || this == DELAY_EXECUTION;
return this == RUNNING_EXECUTION || this == WAITING_DEPEND || this == DELAY_EXECUTION;
}
/**
......
......@@ -26,6 +26,7 @@ import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Calendar;
import java.util.Date;
import java.util.Objects;
import java.util.TimeZone;
import org.slf4j.Logger;
......@@ -36,12 +37,44 @@ import org.slf4j.LoggerFactory;
*/
public class DateUtils {
static final long C0 = 1L;
static final long C1 = C0 * 1000L;
static final long C2 = C1 * 1000L;
static final long C3 = C2 * 1000L;
static final long C4 = C3 * 60L;
static final long C5 = C4 * 60L;
static final long C6 = C5 * 24L;
/**
* a default datetime formatter for the timestamp
*/
private static final DateTimeFormatter DEFAULT_DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final Logger logger = LoggerFactory.getLogger(DateUtils.class);
private DateUtils() {
throw new UnsupportedOperationException("Construct DateUtils");
}
/**
* @param timeMillis timeMillis like System.currentTimeMillis()
* @return string formatted as yyyy-MM-dd HH:mm:ss
*/
public static String formatTimeStamp(long timeMillis) {
return formatTimeStamp(timeMillis, DEFAULT_DATETIME_FORMATTER);
}
/**
* @param timeMillis timeMillis like System.currentTimeMillis()
* @param dateTimeFormatter expect formatter, like yyyy-MM-dd HH:mm:ss
* @return formatted string
*/
public static String formatTimeStamp(long timeMillis, DateTimeFormatter dateTimeFormatter) {
Objects.requireNonNull(dateTimeFormatter);
return dateTimeFormatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(timeMillis),
ZoneId.systemDefault()));
}
/**
* date to local datetime
*
......@@ -253,7 +286,6 @@ public class DateUtils {
}
/**
*
* format time to duration
*
* @param d1 d1
......@@ -522,14 +554,6 @@ public class DateUtils {
return TimeZone.getTimeZone(timezoneId);
}
static final long C0 = 1L;
static final long C1 = C0 * 1000L;
static final long C2 = C1 * 1000L;
static final long C3 = C2 * 1000L;
static final long C4 = C3 * 60L;
static final long C5 = C4 * 60L;
static final long C6 = C5 * 24L;
/**
* Time unit representing one thousandth of a second
*/
......@@ -543,23 +567,23 @@ public class DateUtils {
return d / (C4 / C2);
}
public static long toHours(long d) {
public static long toHours(long d) {
return d / (C5 / C2);
}
public static long toDays(long d) {
public static long toDays(long d) {
return d / (C6 / C2);
}
public static long toDurationSeconds(long d) {
public static long toDurationSeconds(long d) {
return (d % (C4 / C2)) / (C3 / C2);
}
public static long toDurationMinutes(long d) {
public static long toDurationMinutes(long d) {
return (d % (C5 / C2)) / (C4 / C2);
}
public static long toDurationHours(long d) {
public static long toDurationHours(long d) {
return (d % (C6 / C2)) / (C5 / C2);
}
......
......@@ -31,12 +31,12 @@ import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
......
......@@ -144,4 +144,5 @@ public class LoggerUtils {
, String info) {
optionalLogger.ifPresent((Logger logger) -> logger.info(info));
}
}
\ No newline at end of file
......@@ -26,7 +26,7 @@ public class ExecutionStatusTest extends TestCase {
public void testTypeIsRunning() {
assertTrue(ExecutionStatus.RUNNING_EXECUTION.typeIsRunning());
assertTrue(ExecutionStatus.WAITTING_DEPEND.typeIsRunning());
assertTrue(ExecutionStatus.WAITING_DEPEND.typeIsRunning());
assertTrue(ExecutionStatus.DELAY_EXECUTION.typeIsRunning());
}
}
\ No newline at end of file
......@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common.utils;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.TimeZone;
......@@ -41,6 +42,20 @@ public class DateUtilsTest {
Assert.assertEquals("01 09:23:08", readableDate);
}
@Test
public void testConvertTimeStampsToString() {
TimeZone defaultTimeZone = TimeZone.getDefault();
final TimeZone timeZone = TimeZone.getTimeZone("Asia/Shanghai");
TimeZone.setDefault(timeZone);
long timeMillis = 1625989249021L;
Assert.assertEquals("2021-07-11 15:40:49", DateUtils.formatTimeStamp(timeMillis));
DateTimeFormatter testFormatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");
Assert.assertEquals("2021/07/11 15:40:49", DateUtils.formatTimeStamp(timeMillis, testFormatter));
TimeZone.setDefault(defaultTimeZone);
}
@Test
public void testWeek() {
Date curr = DateUtils.stringToDate("2019-02-01 00:00:00");
......
......@@ -80,6 +80,13 @@ public class AlertPluginInstance {
this.instanceName = instanceName;
}
public AlertPluginInstance(int id, String pluginInstanceParams, String instanceName, Date updateDate) {
this.id = id;
this.pluginInstanceParams = pluginInstanceParams;
this.updateTime = updateDate;
this.instanceName = instanceName;
}
public int getId() {
return id;
}
......
......@@ -627,4 +627,8 @@ public class TaskInstance implements Serializable {
public void setTaskParams(String taskParams) {
this.taskParams = taskParams;
}
public boolean isFirstRun() {
return endTime == null;
}
}
......@@ -49,4 +49,4 @@
where instance_name = #{instanceName} limit 1
</select>
</mapper>
\ No newline at end of file
</mapper>
......@@ -116,7 +116,7 @@
<if test="executorId != 0">
and instance.executor_id = #{executorId}
</if>
order by instance.start_time desc
order by instance.start_time desc,instance.end_time desc
</select>
<update id="setFailoverByHostAndStateArray">
update t_ds_process_instance
......
......@@ -61,7 +61,7 @@
<include refid="baseSqlV2">
<property name="alias" value="u"/>
</include>
,
,t.tenant_code,
case when u.queue <![CDATA[ <> ]]> '' then u.queue else q.queue_name end as queue_name
from t_ds_user u
left join t_ds_tenant t on u.tenant_id=t.id
......
......@@ -76,7 +76,7 @@ public class MasterRegistryClient {
*/
@Autowired
private ProcessService processService;
@Autowired
private RegistryClient registryClient;
/**
......@@ -327,8 +327,8 @@ public class MasterRegistryClient {
@PostConstruct
public void init() {
this.startTime = DateUtils.dateToString(new Date());
this.registryClient = RegistryClient.getInstance();
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
registryClient.init();
}
/**
......
......@@ -22,11 +22,10 @@ import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHED
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.spi.register.DataChangeEvent;
import org.apache.dolphinscheduler.spi.register.SubscribeListener;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -34,8 +33,12 @@ public class MasterRegistryDataListener implements SubscribeListener {
private static final Logger logger = LoggerFactory.getLogger(MasterRegistryDataListener.class);
@Resource
MasterRegistryClient masterRegistryClient;
private MasterRegistryClient masterRegistryClient;
public MasterRegistryDataListener() {
masterRegistryClient = SpringApplicationContext.getBean(MasterRegistryClient.class);
}
@Override
public void notify(String path, DataChangeEvent event) {
......
......@@ -100,8 +100,7 @@ public class ServerNodeManager implements InitializingBean {
/**
* zk client
*/
@Autowired
private RegistryClient registryClient;
private RegistryClient registryClient = RegistryClient.getInstance();
/**
* worker group mapper
......@@ -378,7 +377,6 @@ public class ServerNodeManager implements InitializingBean {
@PreDestroy
public void destroy() {
executorService.shutdownNow();
registryClient.close();
}
}
......@@ -520,9 +520,6 @@ public class MasterExecThread implements Runnable {
taskInstance.setDelayTime(taskNode.getDelayTime());
}
//get pre task ,get all the task varPool to this task
Set<String> preTask = dag.getPreviousNodes(taskInstance.getName());
getPreVarPool(taskInstance, preTask);
return taskInstance;
}
......@@ -691,7 +688,7 @@ public class MasterExecThread implements Runnable {
private ExecutionStatus runningState(ExecutionStatus state) {
if (state == ExecutionStatus.READY_STOP
|| state == ExecutionStatus.READY_PAUSE
|| state == ExecutionStatus.WAITTING_THREAD
|| state == ExecutionStatus.WAITING_THREAD
|| state == ExecutionStatus.DELAY_EXECUTION) {
// if the running task is not completed, the state remains unchanged
return state;
......@@ -739,7 +736,7 @@ public class MasterExecThread implements Runnable {
* @return Boolean whether has waiting thread task
*/
private boolean hasWaitingThreadTask() {
List<TaskInstance> waitingList = getCompleteTaskByState(ExecutionStatus.WAITTING_THREAD);
List<TaskInstance> waitingList = getCompleteTaskByState(ExecutionStatus.WAITING_THREAD);
return CollectionUtils.isNotEmpty(waitingList);
}
......@@ -786,7 +783,7 @@ public class MasterExecThread implements Runnable {
// waiting thread
if (hasWaitingThreadTask()) {
return ExecutionStatus.WAITTING_THREAD;
return ExecutionStatus.WAITING_THREAD;
}
// pause
......@@ -1154,6 +1151,12 @@ public class MasterExecThread implements Runnable {
continue;
}
}
//init varPool only this task is the first time running
if (task.isFirstRun()) {
//get pre task ,get all the task varPool to this task
Set<String> preTask = dag.getPreviousNodes(task.getName());
getPreVarPool(task, preTask);
}
DependResult dependResult = getDependResultForTask(task);
if (DependResult.SUCCESS == dependResult) {
if (retryTaskIntervalOverTime(task)) {
......
......@@ -66,7 +66,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
super(taskInstance);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
this.nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class);
this.registryClient = SpringApplicationContext.getBean(RegistryClient.class);
this.registryClient = RegistryClient.getInstance();
}
/**
......
......@@ -23,9 +23,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* zk monitor server impl
*/
......@@ -35,9 +35,7 @@ public class RegistryMonitorImpl extends AbstractMonitor {
/**
* zookeeper operator
*/
@Autowired
private RegistryClient registryClient;
private RegistryClient registryClient = RegistryClient.getInstance();
/**
* get active nodes map by path
......
......@@ -19,15 +19,13 @@ package org.apache.dolphinscheduler.server.registry;
import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
import java.util.Date;
import java.util.Set;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.Date;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -46,9 +44,6 @@ public class HeartBeatTask implements Runnable {
private String serverType;
private RegistryClient registryClient;
// server stop or not
protected IStoppable stoppable = null;
public HeartBeatTask(String startTime,
double maxCpuloadAvg,
double reservedMemory,
......@@ -124,12 +119,4 @@ public class HeartBeatTask implements Runnable {
}
}
/**
* for stop server
*
* @param serverStoppable server stoppable interface
*/
public void setStoppable(IStoppable serverStoppable) {
this.stoppable = serverStoppable;
}
}
......@@ -231,7 +231,7 @@ public class DependentExecute {
if (state.typeIsRunning()
|| state == ExecutionStatus.SUBMITTED_SUCCESS
|| state == ExecutionStatus.WAITTING_THREAD) {
|| state == ExecutionStatus.WAITING_THREAD) {
return DependResult.WAITING;
} else {
return DependResult.FAILED;
......
......@@ -17,13 +17,19 @@
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.logging.log4j.util.Strings;
import java.util.Date;
import java.util.HashMap;
......@@ -95,6 +101,67 @@ public class ParamUtils {
return globalParams;
}
/**
* parameter conversion
* @param taskExecutionContext the context of this task instance
* @param parameters the parameters
* @return global params
*/
public static Map<String,Property> convert(TaskExecutionContext taskExecutionContext, AbstractParameters parameters) {
Preconditions.checkNotNull(taskExecutionContext);
Preconditions.checkNotNull(parameters);
Map<String,Property> globalParams = getUserDefParamsMap(taskExecutionContext.getDefinedParams());
Map<String,String> globalParamsMap = taskExecutionContext.getDefinedParams();
CommandType commandType = CommandType.of(taskExecutionContext.getCmdTypeIfComplement());
Date scheduleTime = taskExecutionContext.getScheduleTime();
Map<String,Property> localParams = parameters.getLocalParametersMap();
if (globalParams == null && localParams == null) {
return null;
}
// if it is a complement,
// you need to pass in the task instance id to locate the time
// of the process instance complement
Map<String,String> params = BusinessTimeUtils
.getBusinessTime(commandType,
scheduleTime);
if (globalParamsMap != null) {
params.putAll(globalParamsMap);
}
if (Strings.isNotBlank(taskExecutionContext.getExecutePath())) {
params.put(Constants.PARAMETER_TASK_EXECUTE_PATH,taskExecutionContext.getExecutePath());
}
params.put(Constants.PARAMETER_TASK_INSTANCE_ID,Integer.toString(taskExecutionContext.getTaskInstanceId()));
if (globalParams != null && localParams != null) {
globalParams.putAll(localParams);
} else if (globalParams == null && localParams != null) {
globalParams = localParams;
}
Iterator<Map.Entry<String, Property>> iter = globalParams.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, Property> en = iter.next();
Property property = en.getValue();
if (StringUtils.isNotEmpty(property.getValue())
&& property.getValue().startsWith("$")) {
/**
* local parameter refers to global parameter with the same name
* note: the global parameters of the process instance here are solidified parameters,
* and there are no variables in them.
*/
String val = property.getValue();
val = ParameterUtils.convertParameterPlaceholders(val, params);
property.setValue(val);
}
}
return globalParams;
}
/**
* format convert
* @param paramsMap params map
......
......@@ -18,10 +18,8 @@
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
......@@ -34,21 +32,17 @@ public class RemoveZKNode implements CommandLineRunner {
private static final Logger logger = LoggerFactory.getLogger(RemoveZKNode.class);
/**
* zookeeper operator
*/
@Autowired
private RegistryClient registryClient;
private RegistryClient registryClient = RegistryClient.getInstance();
public static void main(String[] args) {
new SpringApplicationBuilder(RemoveZKNode.class).web(WebApplicationType.NONE).run(args);
}
@Override
public void run(String... args) throws Exception {
if (args.length != ARGS_LENGTH) {
logger.error("Usage: <node>");
return;
......@@ -56,6 +50,5 @@ public class RemoveZKNode implements CommandLineRunner {
registryClient.remove(args[0]);
registryClient.close();
}
}
......@@ -19,6 +19,9 @@ package org.apache.dolphinscheduler.server.worker.processor;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
......@@ -28,13 +31,8 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import io.netty.channel.Channel;
......@@ -60,16 +58,15 @@ public class TaskCallbackService {
/**
* zookeeper registry center
*/
@Autowired
private RegistryClient registryClient;
/**
* netty remoting client
*/
private final NettyRemotingClient nettyRemotingClient;
public TaskCallbackService() {
this.registryClient = RegistryClient.getInstance();
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
this.nettyRemotingClient.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
......@@ -134,7 +131,7 @@ public class TaskCallbackService {
ThreadUtils.sleep(pause(ntries++));
}
throw new IllegalStateException(String.format("all available master nodes : %s are not reachable for task: {}", masterNodes, taskInstanceId));
throw new IllegalStateException(String.format("all available master nodes : %s are not reachable for task: %s", masterNodes, taskInstanceId));
}
public int pause(int ntries) {
......
......@@ -67,8 +67,7 @@ public class WorkerRegistryClient {
*/
private ScheduledExecutorService heartBeatExecutor;
@Autowired
RegistryClient registryClient;
private RegistryClient registryClient;
/**
* worker start time
......@@ -81,8 +80,8 @@ public class WorkerRegistryClient {
public void initWorkRegistry() {
this.workerGroups = workerConfig.getWorkerGroups();
this.startTime = DateUtils.dateToString(new Date());
this.registryClient = RegistryClient.getInstance();
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
registryClient.init();
}
/**
......@@ -162,8 +161,4 @@ public class WorkerRegistryClient {
registryClient.setStoppable(stoppable);
}
public void closeRegistry() {
unRegistry();
}
}
......@@ -48,6 +48,7 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.regex.Matcher;
......@@ -55,7 +56,6 @@ import java.util.regex.Pattern;
import org.slf4j.Logger;
/**
* abstract command executor
*/
......@@ -74,7 +74,7 @@ public abstract class AbstractCommandExecutor {
/**
* log handler
*/
protected Consumer<List<String>> logHandler;
protected Consumer<LinkedBlockingQueue<String>> logHandler;
/**
* logger
......@@ -82,9 +82,9 @@ public abstract class AbstractCommandExecutor {
protected Logger logger;
/**
* log list
* log collection
*/
protected final List<String> logBuffer;
protected final LinkedBlockingQueue<String> logBuffer;
protected boolean logOutputIsScuccess = false;
......@@ -98,20 +98,16 @@ public abstract class AbstractCommandExecutor {
*/
private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
public AbstractCommandExecutor(Consumer<List<String>> logHandler,
public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
TaskExecutionContext taskExecutionContext,
Logger logger) {
this.logHandler = logHandler;
this.taskExecutionContext = taskExecutionContext;
this.logger = logger;
this.logBuffer = Collections.synchronizedList(new ArrayList<>());
this.logBuffer = new LinkedBlockingQueue<>();
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
}
protected AbstractCommandExecutor(List<String> logBuffer) {
this.logBuffer = logBuffer;
}
/**
* build process
*
......@@ -202,9 +198,6 @@ public abstract class AbstractCommandExecutor {
// waiting for the run to finish
boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
logger.info("process has exited, execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
taskExecutionContext.getExecutePath(), processId, result.getExitStatusCode(), status, process.exitValue());
// if SHELL task exit
if (status) {
// set appIds
......@@ -224,6 +217,9 @@ public abstract class AbstractCommandExecutor {
ProcessUtils.kill(taskExecutionContext);
result.setExitStatusCode(EXIT_CODE_FAILURE);
}
logger.info("process has exited, execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
taskExecutionContext.getExecutePath(), processId, result.getExitStatusCode(), status, process.exitValue());
return result;
}
......@@ -232,7 +228,6 @@ public abstract class AbstractCommandExecutor {
return varPool.toString();
}
/**
* cancel application
*
......@@ -329,15 +324,14 @@ public abstract class AbstractCommandExecutor {
*/
private void clear() {
List<String> markerList = new ArrayList<>();
markerList.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString());
LinkedBlockingQueue<String> markerLog = new LinkedBlockingQueue<>();
markerLog.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString());
if (!logBuffer.isEmpty()) {
// log handle
logHandler.accept(logBuffer);
logBuffer.clear();
}
logHandler.accept(markerList);
logHandler.accept(markerLog);
}
/**
......@@ -349,9 +343,7 @@ public abstract class AbstractCommandExecutor {
String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskExecutionContext.getTaskAppId());
ExecutorService getOutputLogService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName + "-" + "getOutputLogService");
getOutputLogService.submit(() -> {
BufferedReader inReader = null;
try {
inReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
String line;
logBuffer.add("welcome to use bigdata scheduling system...");
while ((line = inReader.readLine()) != null) {
......@@ -366,7 +358,6 @@ public abstract class AbstractCommandExecutor {
logger.error(e.getMessage(), e);
} finally {
logOutputIsScuccess = true;
close(inReader);
}
});
getOutputLogService.shutdown();
......@@ -460,31 +451,20 @@ public abstract class AbstractCommandExecutor {
* @return line list
*/
private List<String> convertFile2List(String filename) {
List lineList = new ArrayList<String>(100);
List<String> lineList = new ArrayList<>(100);
File file = new File(filename);
if (!file.exists()) {
return lineList;
}
BufferedReader br = null;
try {
br = new BufferedReader(new InputStreamReader(new FileInputStream(filename), StandardCharsets.UTF_8));
try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(filename), StandardCharsets.UTF_8))) {
String line = null;
while ((line = br.readLine()) != null) {
lineList.add(line);
}
} catch (Exception e) {
logger.error(String.format("read file: %s failed : ", filename), e);
} finally {
if (br != null) {
try {
br.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
return lineList;
}
......@@ -556,27 +536,10 @@ public abstract class AbstractCommandExecutor {
lastFlushTime = now;
/** log handle */
logHandler.accept(logBuffer);
logBuffer.clear();
}
return lastFlushTime;
}
/**
* close buffer reader
*
* @param inReader in reader
*/
private void close(BufferedReader inReader) {
if (inReader != null) {
try {
inReader.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
protected List<String> commandOptions() {
return Collections.emptyList();
}
......
......@@ -25,8 +25,8 @@ import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
......@@ -116,15 +116,15 @@ public abstract class AbstractTask {
*
* @param logs log list
*/
public void logHandle(List<String> logs) {
public void logHandle(LinkedBlockingQueue<String> logs) {
// note that the "new line" is added here to facilitate log parsing
if (logs.contains(FINALIZE_SESSION_MARKER.toString())) {
logger.info(FINALIZE_SESSION_MARKER, FINALIZE_SESSION_MARKER.toString());
} else {
// note: if the logs is a SynchronizedList and will be modified concurrently,
// we should must use foreach to iterate the element, otherwise will throw a ConcurrentModifiedException(#issue 5528)
StringJoiner joiner = new StringJoiner("\n\t");
logs.forEach(joiner::add);
while (!logs.isEmpty()) {
joiner.add(logs.poll());
}
logger.info(" -> {}", joiner);
}
}
......
......@@ -32,6 +32,7 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.regex.Pattern;
......@@ -60,7 +61,7 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
* @param taskExecutionContext taskExecutionContext
* @param logger logger
*/
public PythonCommandExecutor(Consumer<List<String>> logHandler,
public PythonCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
TaskExecutionContext taskExecutionContext,
Logger logger) {
super(logHandler,taskExecutionContext,logger);
......
......@@ -14,12 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task;
import org.apache.commons.io.FileUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.slf4j.Logger;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
......@@ -27,8 +28,11 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import org.slf4j.Logger;
/**
* shell command executor
*/
......@@ -50,16 +54,12 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
* @param taskExecutionContext taskExecutionContext
* @param logger logger
*/
public ShellCommandExecutor(Consumer<List<String>> logHandler,
public ShellCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
TaskExecutionContext taskExecutionContext,
Logger logger) {
super(logHandler,taskExecutionContext,logger);
}
public ShellCommandExecutor(List<String> logBuffer) {
super(logBuffer);
}
@Override
protected String buildCommandFilePath() {
// command file
......@@ -78,7 +78,6 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
return OSUtils.isWindows() ? CMD : SH;
}
/**
* create command file if not exists
* @param execCommand exec command
......@@ -117,6 +116,4 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
FileUtils.writeStringToFile(new File(commandFile), sb.toString(), StandardCharsets.UTF_8);
}
}
}
......@@ -14,11 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.http;
package org.apache.dolphinscheduler.server.worker.task.http;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.io.Charsets;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.HttpMethod;
......@@ -27,10 +25,16 @@ import org.apache.dolphinscheduler.common.process.HttpProperty;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.http.HttpParameters;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.commons.io.Charsets;
import org.apache.http.HttpEntity;
import org.apache.http.ParseException;
import org.apache.http.client.config.RequestConfig;
......@@ -42,7 +46,6 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
......@@ -50,28 +53,27 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* http task
*/
public class HttpTask extends AbstractTask {
/**
* http parameters
*/
private HttpParameters httpParameters;
/**
* application json
*/
protected static final String APPLICATION_JSON = "application/json";
/**
* output
*/
protected String output;
/**
* http parameters
*/
private HttpParameters httpParameters;
/**
* taskExecutionContext
*/
......@@ -79,8 +81,9 @@ public class HttpTask extends AbstractTask {
/**
* constructor
* @param taskExecutionContext taskExecutionContext
* @param logger logger
*
* @param taskExecutionContext taskExecutionContext
* @param logger logger
*/
public HttpTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
......@@ -103,27 +106,30 @@ public class HttpTask extends AbstractTask {
Thread.currentThread().setName(threadLoggerInfoName);
long startTime = System.currentTimeMillis();
String formatTimeStamp = DateUtils.formatTimeStamp(startTime);
String statusCode = null;
String body = null;
try(CloseableHttpClient client = createHttpClient();
CloseableHttpResponse response = sendRequest(client)) {
try (CloseableHttpClient client = createHttpClient();
CloseableHttpResponse response = sendRequest(client)) {
statusCode = String.valueOf(getStatusCode(response));
body = getResponseBody(response);
exitStatusCode = validResponse(body, statusCode);
long costTime = System.currentTimeMillis() - startTime;
logger.info("startTime: {}, httpUrl: {}, httpMethod: {}, costTime : {}Millisecond, statusCode : {}, body : {}, log : {}",
DateUtils.format2Readable(startTime), httpParameters.getUrl(),httpParameters.getHttpMethod(), costTime, statusCode, body, output);
}catch (Exception e){
logger.info("startTime: {}, httpUrl: {}, httpMethod: {}, costTime : {} milliseconds, statusCode : {}, body : {}, log : {}",
formatTimeStamp, httpParameters.getUrl(),
httpParameters.getHttpMethod(), costTime, statusCode, body, output);
} catch (Exception e) {
appendMessage(e.toString());
exitStatusCode = -1;
logger.error("httpUrl[" + httpParameters.getUrl() + "] connection failed:"+output, e);
logger.error("httpUrl[" + httpParameters.getUrl() + "] connection failed:" + output, e);
throw e;
}
}
/**
* send request
*
* @param client client
* @return CloseableHttpResponse
* @throws IOException io exception
......@@ -139,23 +145,24 @@ public class HttpTask extends AbstractTask {
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
List<HttpProperty> httpPropertyList = new ArrayList<>();
if(CollectionUtils.isNotEmpty(httpParameters.getHttpParams() )){
for (HttpProperty httpProperty: httpParameters.getHttpParams()) {
if (CollectionUtils.isNotEmpty(httpParameters.getHttpParams())) {
for (HttpProperty httpProperty : httpParameters.getHttpParams()) {
String jsonObject = JSONUtils.toJsonString(httpProperty);
String params = ParameterUtils.convertParameterPlaceholders(jsonObject,ParamUtils.convert(paramsMap));
logger.info("http request params:{}",params);
httpPropertyList.add(JSONUtils.parseObject(params,HttpProperty.class));
String params = ParameterUtils.convertParameterPlaceholders(jsonObject, ParamUtils.convert(paramsMap));
logger.info("http request params:{}", params);
httpPropertyList.add(JSONUtils.parseObject(params, HttpProperty.class));
}
}
addRequestParams(builder,httpPropertyList);
String requestUrl = ParameterUtils.convertParameterPlaceholders(httpParameters.getUrl(),ParamUtils.convert(paramsMap));
addRequestParams(builder, httpPropertyList);
String requestUrl = ParameterUtils.convertParameterPlaceholders(httpParameters.getUrl(), ParamUtils.convert(paramsMap));
HttpUriRequest request = builder.setUri(requestUrl).build();
setHeaders(request,httpPropertyList);
setHeaders(request, httpPropertyList);
return client.execute(request);
}
/**
* get response body
*
* @param httpResponse http response
* @return response body
* @throws ParseException parse exception
......@@ -174,6 +181,7 @@ public class HttpTask extends AbstractTask {
/**
* get status code
*
* @param httpResponse http response
* @return status code
*/
......@@ -183,11 +191,12 @@ public class HttpTask extends AbstractTask {
/**
* valid response
* @param body body
* @param statusCode status code
*
* @param body body
* @param statusCode status code
* @return exit status code
*/
protected int validResponse(String body, String statusCode){
protected int validResponse(String body, String statusCode) {
int exitStatusCode = 0;
switch (httpParameters.getHttpCheckCondition()) {
case BODY_CONTAINS:
......@@ -226,6 +235,7 @@ public class HttpTask extends AbstractTask {
/**
* append message
*
* @param message message
*/
protected void appendMessage(String message) {
......@@ -239,17 +249,18 @@ public class HttpTask extends AbstractTask {
/**
* add request params
* @param builder buidler
* @param httpPropertyList http property list
*
* @param builder buidler
* @param httpPropertyList http property list
*/
protected void addRequestParams(RequestBuilder builder,List<HttpProperty> httpPropertyList) {
if(CollectionUtils.isNotEmpty(httpPropertyList)){
protected void addRequestParams(RequestBuilder builder, List<HttpProperty> httpPropertyList) {
if (CollectionUtils.isNotEmpty(httpPropertyList)) {
ObjectNode jsonParam = JSONUtils.createObjectNode();
for (HttpProperty property: httpPropertyList){
if(property.getHttpParametersType() != null){
if (property.getHttpParametersType().equals(HttpParametersType.PARAMETER)){
for (HttpProperty property : httpPropertyList) {
if (property.getHttpParametersType() != null) {
if (property.getHttpParametersType().equals(HttpParametersType.PARAMETER)) {
builder.addParameter(property.getProp(), property.getValue());
}else if(property.getHttpParametersType().equals(HttpParametersType.BODY)){
} else if (property.getHttpParametersType().equals(HttpParametersType.BODY)) {
jsonParam.put(property.getProp(), property.getValue());
}
}
......@@ -263,12 +274,13 @@ public class HttpTask extends AbstractTask {
/**
* set headers
* @param request request
* @param httpPropertyList http property list
*
* @param request request
* @param httpPropertyList http property list
*/
protected void setHeaders(HttpUriRequest request,List<HttpProperty> httpPropertyList) {
if(CollectionUtils.isNotEmpty(httpPropertyList)){
for (HttpProperty property: httpPropertyList) {
protected void setHeaders(HttpUriRequest request, List<HttpProperty> httpPropertyList) {
if (CollectionUtils.isNotEmpty(httpPropertyList)) {
for (HttpProperty property : httpPropertyList) {
if (HttpParametersType.HEADERS.equals(property.getHttpParametersType())) {
request.addHeader(property.getProp(), property.getValue());
}
......@@ -278,6 +290,7 @@ public class HttpTask extends AbstractTask {
/**
* create http client
*
* @return CloseableHttpClient
*/
protected CloseableHttpClient createHttpClient() {
......@@ -289,6 +302,7 @@ public class HttpTask extends AbstractTask {
/**
* request config
*
* @return RequestConfig
*/
private RequestConfig requestConfig() {
......@@ -297,6 +311,7 @@ public class HttpTask extends AbstractTask {
/**
* create request builder
*
* @return RequestBuilder
*/
protected RequestBuilder createRequestBuilder() {
......
......@@ -35,8 +35,6 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.slf4j.Logger;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
......@@ -51,6 +49,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
/**
* shell task
*/
......@@ -101,7 +101,8 @@ public class ShellTask extends AbstractTask {
public void handle() throws Exception {
try {
// construct process
CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(buildCommand());
String command = buildCommand();
CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(command);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
......@@ -165,12 +166,8 @@ public class ShellTask extends AbstractTask {
private String parseScript(String script) {
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
shellParameters.getLocalParametersMap(),
shellParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,shellParameters);
// replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
if (taskExecutionContext.getScheduleTime() != null) {
if (paramsMap == null) {
......
......@@ -20,6 +20,10 @@ package org.apache.dolphinscheduler.server.master.registry;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doNothing;
import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
......@@ -28,23 +32,24 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.springframework.test.util.ReflectionTestUtils;
/**
* MasterRegistryClientTest
*/
@RunWith(MockitoJUnitRunner.class)
@RunWith(PowerMockRunner.class)
@PrepareForTest({ RegistryClient.class })
@PowerMockIgnore({"javax.management.*"})
public class MasterRegistryClientTest {
@InjectMocks
......@@ -53,21 +58,25 @@ public class MasterRegistryClientTest {
@Mock
private MasterConfig masterConfig;
@Mock
private RegistryClient registryClient;
@Mock
private ScheduledExecutorService heartBeatExecutor;
@Mock
private ProcessService processService;
@Before
public void before() throws Exception {
PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class));
registryClient = PowerMockito.mock(RegistryClient.class);
given(registryClient.getLock(Mockito.anyString())).willReturn(true);
given(registryClient.getMasterFailoverLockPath()).willReturn("/path");
given(registryClient.releaseLock(Mockito.anyString())).willReturn(true);
given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn("127.0.0.1:8080");
doNothing().when(registryClient).handleDeadServer(Mockito.anyString(), Mockito.any(NodeType.class), Mockito.anyString());
ReflectionTestUtils.setField(masterRegistryClient, "registryClient", registryClient);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
processInstance.setHost("127.0.0.1:8080");
......@@ -96,7 +105,6 @@ public class MasterRegistryClientTest {
@Test
public void removeNodePathTest() {
masterRegistryClient.removeNodePath("/path", NodeType.MASTER, false);
masterRegistryClient.removeNodePath("/path", NodeType.MASTER, true);
//Cannot mock static methods
......
......@@ -21,23 +21,24 @@ import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
/**
* server node manager test
*/
@RunWith(MockitoJUnitRunner.class)
@RunWith(PowerMockRunner.class)
@PrepareForTest({ RegistryClient.class })
@PowerMockIgnore({"javax.management.*"})
public class ServerNodeManagerTest {
@InjectMocks
ServerNodeManager serverNodeManager;
@Mock
private RegistryClient registryClient;
private ServerNodeManager serverNodeManager;
@Mock
private WorkerGroupMapper workerGroupMapper;
......@@ -45,6 +46,12 @@ public class ServerNodeManagerTest {
@Mock
private AlertDao alertDao;
@Before
public void before() {
PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class));
serverNodeManager = PowerMockito.mock(ServerNodeManager.class);
}
@Test
public void test(){
//serverNodeManager.getWorkerGroupNodes()
......
......@@ -20,22 +20,30 @@ package org.apache.dolphinscheduler.server.utils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.SerializationFeature;
/**
* Test ParamUtils
*/
......@@ -82,7 +90,6 @@ public class ParamUtilsTest {
varProperty.setType(DataType.VARCHAR);
varProperty.setValue("${global_param}");
varPoolParams.put("varPool", varProperty);
}
/**
......@@ -90,7 +97,6 @@ public class ParamUtilsTest {
*/
@Test
public void testConvert() {
//The expected value
String expected = "{\"varPool\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+ "\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
......@@ -126,6 +132,83 @@ public class ParamUtilsTest {
assertNull(paramsMap2);
}
/**
* Test some new params related to task
*/
@Test
public void testConvertForParamsRelatedTask() throws Exception {
// start to form some test data for new paramters
Map<String,Property> globalParams = new HashMap<>();
Map<String,String> globalParamsMap = new HashMap<>();
Property taskInstanceIdProperty = new Property();
String propName = "task_execution_id";
String paramValue = String.format("${%s}", Constants.PARAMETER_TASK_INSTANCE_ID);
taskInstanceIdProperty.setProp(propName);
taskInstanceIdProperty.setDirect(Direct.IN);
taskInstanceIdProperty.setType(DataType.VARCHAR);
taskInstanceIdProperty.setValue(paramValue);
globalParams.put(propName,taskInstanceIdProperty);
globalParamsMap.put(propName,paramValue);
Property taskExecutionPathProperty = new Property();
propName = "task_execution_path";
paramValue = String.format("${%s}", Constants.PARAMETER_TASK_EXECUTE_PATH);
taskExecutionPathProperty.setProp(propName);
taskExecutionPathProperty.setDirect(Direct.IN);
taskExecutionPathProperty.setType(DataType.VARCHAR);
taskExecutionPathProperty.setValue(paramValue);
globalParams.put(propName,taskExecutionPathProperty);
globalParamsMap.put(propName,paramValue);
Calendar calendar = Calendar.getInstance();
calendar.set(2019,11,30);
Date date = calendar.getTime();
List<Property> globalParamList = globalParams.values().stream().collect(Collectors.toList());
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskName("params test");
taskExecutionContext.setTaskType(TaskType.SHELL.getDesc());
taskExecutionContext.setHost("127.0.0.1:1234");
taskExecutionContext.setExecutePath("/tmp/test");
taskExecutionContext.setLogPath("/log");
taskExecutionContext.setProcessInstanceId(1);
taskExecutionContext.setExecutorId(1);
taskExecutionContext.setCmdTypeIfComplement(0);
taskExecutionContext.setScheduleTime(date);
taskExecutionContext.setGlobalParams(JSONUtils.toJsonString(globalParamList));
taskExecutionContext.setDefinedParams(globalParamsMap);
taskExecutionContext.setTaskParams(
"{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss]\\necho \\\" ${task_execution_id} \\\"\\necho \\\" ${task_execution_path}\\\"\\n\","
+ "\"localParams\":"
+ "[{\"prop\":\"task_execution_id\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"${system.task.instance.id}\"},"
+ "{\"prop\":\"task_execution_path\",\"direct\":\"IN\",\"type\":\"VARCHAR"
+ "\",\"value\":\"${system.task.execute.path}\"}],\"resourceList\":[]}");
ShellParameters shellParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ShellParameters.class);
//The expected value
String expected = "{\"task_execution_id\":{\"prop\":\"task_execution_id\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"1\"},"
+ "\"task_execution_path\":{\"prop\":\"task_execution_path\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"/tmp/test\"}}";
//The expected value when globalParams is null but localParams is not null
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, shellParameters);
String result = JSONUtils.toJsonString(paramsMap);
Map<String,String> resultMap = JSONUtils.parseObject(result,Map.class);
Map<String,String> expectedMap = JSONUtils.parseObject(expected,Map.class);
result = JSONUtils.toJsonString(resultMap,SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
expected = JSONUtils.toJsonString(expectedMap,SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
assertEquals(expected, result);
}
/**
* Test the overload method of convert
*/
......
......@@ -14,17 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.shell;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
package org.apache.dolphinscheduler.server.worker.shell;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
......@@ -32,6 +27,13 @@ import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.util.Date;
import java.util.List;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
......@@ -45,9 +47,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import java.util.Date;
import java.util.List;
/**
* python shell command executor test
*/
......@@ -84,21 +83,21 @@ public class ShellCommandExecutorTest {
taskProps.setTaskTimeout(360000);
taskProps.setTaskInstanceId(7657);
TaskInstance taskInstance = processService.findTaskInstanceById(7657);
// TaskNode taskNode = JSON.parseObject(taskJson, TaskNode.class);
// taskProps.setTaskParams(taskNode.getParams());
// custom logger
// Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
// taskInstance.getProcessDefinitionId(),
// taskInstance.getProcessInstanceId(),
// taskInstance.getId()));
// AbstractTask task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger);
// TaskInstance taskInstance = processService.findTaskInstanceById(7657);
//
// TaskNode taskNode = JSON.parseObject(taskJson, TaskNode.class);
// taskProps.setTaskParams(taskNode.getParams());
//
//
// // custom logger
// Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
// taskInstance.getProcessDefine().getCode(),
// taskInstance.getProcessDefine().getVersion(),
// taskInstance.getProcessInstanceId(),
// taskInstance.getId()));
//
//
// AbstractTask task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger);
AbstractTask task = null;
......
......@@ -29,6 +29,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import org.junit.Assert;
import org.junit.Before;
......@@ -205,15 +206,15 @@ public class SqoopTaskTest {
@Test
public void testLogHandler() throws InterruptedException {
List<String> list = Collections.synchronizedList(new ArrayList<>());
LinkedBlockingQueue<String> loggerBuffer = new LinkedBlockingQueue<>();
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 10; i++) {
list.add("test add log");
loggerBuffer.add("test add log");
}
});
Thread thread2 = new Thread(() -> {
for (int i = 0; i < 10; i++) {
sqoopTask.logHandle(list);
sqoopTask.logHandle(loggerBuffer);
}
});
thread1.start();
......
......@@ -258,8 +258,8 @@ public class ProcessService {
* @return process instance
*/
private ProcessInstance setWaitingThreadProcess(Command command, ProcessInstance processInstance) {
processInstance.setState(ExecutionStatus.WAITTING_THREAD);
if (command.getCommandType() != CommandType.RECOVER_WAITTING_THREAD) {
processInstance.setState(ExecutionStatus.WAITING_THREAD);
if (command.getCommandType() != CommandType.RECOVER_WAITING_THREAD) {
processInstance.addHistoryCmd(command.getCommandType());
}
saveProcessInstance(processInstance);
......@@ -522,7 +522,7 @@ public class ProcessService {
// process instance quit by "waiting thread" state
if (originCommand == null) {
Command command = new Command(
CommandType.RECOVER_WAITTING_THREAD,
CommandType.RECOVER_WAITING_THREAD,
processInstance.getTaskDependType(),
processInstance.getFailureStrategy(),
processInstance.getExecutorId(),
......@@ -539,14 +539,14 @@ public class ProcessService {
}
// update the command time if current command if recover from waiting
if (originCommand.getCommandType() == CommandType.RECOVER_WAITTING_THREAD) {
if (originCommand.getCommandType() == CommandType.RECOVER_WAITING_THREAD) {
originCommand.setUpdateTime(new Date());
saveCommand(originCommand);
} else {
// delete old command and create new waiting thread command
commandMapper.deleteById(originCommand.getId());
originCommand.setId(0);
originCommand.setCommandType(CommandType.RECOVER_WAITTING_THREAD);
originCommand.setCommandType(CommandType.RECOVER_WAITING_THREAD);
originCommand.setUpdateTime(new Date());
originCommand.setCommandParam(JSONUtils.toJsonString(cmdParam));
originCommand.setProcessInstancePriority(processInstance.getProcessInstancePriority());
......@@ -809,7 +809,7 @@ public class ProcessService {
break;
case START_CURRENT_TASK_PROCESS:
break;
case RECOVER_WAITTING_THREAD:
case RECOVER_WAITING_THREAD:
break;
case RECOVER_SUSPENDED_PROCESS:
// find pause tasks and init task's state
......
......@@ -44,18 +44,22 @@ import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
/**
* abstract registry client
* registry client singleton
*/
@Service
public class RegistryClient extends RegistryCenter {
private static final Logger logger = LoggerFactory.getLogger(RegistryClient.class);
private void loadRegistry() {
init();
private static RegistryClient registryClient = new RegistryClient();
private RegistryClient() {
super.init();
}
public static RegistryClient getInstance() {
return registryClient;
}
/**
......
......@@ -206,7 +206,7 @@ public class ProcessServiceTest {
processService.createRecoveryWaitingThreadCommand(null, subProcessInstance);
Command recoverCommand = new Command();
recoverCommand.setCommandType(CommandType.RECOVER_WAITTING_THREAD);
recoverCommand.setCommandType(CommandType.RECOVER_WAITING_THREAD);
processService.createRecoveryWaitingThreadCommand(recoverCommand, subProcessInstance);
Command repeatRunningCommand = new Command();
......
......@@ -21,43 +21,37 @@ import static org.apache.dolphinscheduler.common.Constants.ADD_OP;
import static org.apache.dolphinscheduler.common.Constants.DELETE_OP;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doNothing;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.spi.register.Registry;
import java.util.Arrays;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import com.google.common.collect.Sets;
@RunWith(MockitoJUnitRunner.Silent.class)
@RunWith(PowerMockRunner.class)
@PrepareForTest({ RegistryClient.class })
public class RegistryClientTest {
@InjectMocks
private RegistryClient registryClient;
@Mock
private Registry registry;
@Before
public void before() {
// registry=mock(Registry.class);
}
@Test
public void te() throws Exception {
doNothing().when(registry).persist(Mockito.anyString(), Mockito.anyString());
doNothing().when(registry).update(Mockito.anyString(), Mockito.anyString());
given(registry.releaseLock(Mockito.anyString())).willReturn(true);
given(registry.getChildren("/dead-servers")).willReturn(Arrays.asList("worker_127.0.0.1:8089"));
public void test() throws Exception {
Registry registry = PowerMockito.mock(Registry.class);
PowerMockito.doNothing().when(registry).persist(Mockito.anyString(), Mockito.anyString());
PowerMockito.doNothing().when(registry).update(Mockito.anyString(), Mockito.anyString());
PowerMockito.when(registry.releaseLock(Mockito.anyString())).thenReturn(true);
PowerMockito.when(registry.getChildren("/dead-servers")).thenReturn(Arrays.asList("worker_127.0.0.1:8089"));
PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class));
registryClient = PowerMockito.mock(RegistryClient.class);
registryClient.persist("/key", "");
registryClient.update("/key", "");
registryClient.releaseLock("/key");
......
......@@ -30,8 +30,10 @@ import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -80,7 +82,7 @@ public class DolphinPluginLoader {
public void loadPlugins()
throws Exception {
for (File file : listPluginDirs(installedPluginsDir)) {
for (File file : listPluginInstanceDirs(installedPluginsDir)) {
if (file.isDirectory()) {
loadPlugin(file.getAbsolutePath());
}
......@@ -145,7 +147,7 @@ public class DolphinPluginLoader {
throws Exception {
logger.info("Classpath for {}:", dir.getName());
List<URL> urls = new ArrayList<>();
for (File file : listPluginDirs(dir)) {
for (File file : listPluginInstanceJars(dir)) {
logger.info(" {}", file);
urls.add(file.toURI().toURL());
}
......@@ -172,12 +174,28 @@ public class DolphinPluginLoader {
return new DolphinPluginClassLoader(urls, parent, DOLPHIN_SPI_PACKAGES);
}
private static List<File> listPluginDirs(File installedPluginsDir) {
private static List<File> listPluginInstanceDirs(File installedPluginsDir) {
if (installedPluginsDir != null && installedPluginsDir.isDirectory()) {
File[] files = installedPluginsDir.listFiles();
if (files != null) {
Arrays.sort(files);
return ImmutableList.copyOf(files);
Optional<File> isNotDir = Arrays.stream(files).filter(file -> !file.isDirectory()).findAny();
if (isNotDir.isPresent()) {
return ImmutableList.of(installedPluginsDir);
} else {
Arrays.sort(files);
return ImmutableList.copyOf(files);
}
}
}
return ImmutableList.of();
}
private static List<File> listPluginInstanceJars(File installedPluginsDir) {
if (installedPluginsDir != null && installedPluginsDir.isDirectory()) {
File[] files = installedPluginsDir.listFiles();
if (files != null) {
return ImmutableList.copyOf(Arrays.stream(files).filter(file -> file.isFile() && file.getName().endsWith(".jar"))
.collect(Collectors.toList()));
}
}
return ImmutableList.of();
......
......@@ -132,7 +132,7 @@ const runningType = [
},
{
desc: `${i18n.$t('Recovery waiting thread')}`,
code: 'RECOVER_WAITTING_THREAD'
code: 'RECOVER_WAITING_THREAD'
}
]
......@@ -216,14 +216,14 @@ const tasksState = {
icoUnicode: 'el-icon-remove-outline',
isSpin: false
},
WAITTING_THREAD: {
WAITING_THREAD: {
id: 10,
desc: `${i18n.$t('Waiting for thread')}`,
color: '#912eed',
icoUnicode: 'ri-time-line',
isSpin: false
},
WAITTING_DEPEND: {
WAITING_DEPEND: {
id: 11,
desc: `${i18n.$t('Waiting for dependence')}`,
color: '#5101be',
......
......@@ -72,7 +72,7 @@
]).then((data) => {
let item = data[0]
let flag = false
if (item.state !== 'WAITTING_THREAD' && item.state !== 'SUCCESS' && item.state !== 'PAUSE' && item.state !== 'FAILURE' && item.state !== 'STOP') {
if (item.state !== 'WAITING_THREAD' && item.state !== 'SUCCESS' && item.state !== 'PAUSE' && item.state !== 'FAILURE' && item.state !== 'STOP') {
flag = true
} else {
flag = false
......@@ -93,7 +93,7 @@
this.getInstancedetail(this.$route.params.id).then(res => {
let item = res
let flag = false
if (item.state !== 'WAITTING_THREAD' && item.state !== 'SUCCESS' && item.state !== 'PAUSE' && item.state !== 'FAILURE' && item.state !== 'STOP') {
if (item.state !== 'WAITING_THREAD' && item.state !== 'SUCCESS' && item.state !== 'PAUSE' && item.state !== 'FAILURE' && item.state !== 'STOP') {
flag = true
} else {
flag = false
......
......@@ -21,7 +21,7 @@
<m-list-box-f>
<template slot="name"><strong>*</strong>{{$t('Datasource')}}</template>
<template slot="content" size="small">
<el-select style="width: 100%;" v-model="type">
<el-select style="width: 100%;" v-model="type" :disabled="this.item.id">
<el-option v-for="item in datasourceTypeList" :key="item.value" :value="item.value" :label="item.label">
</el-option>
</el-select>
......
......@@ -55,10 +55,10 @@ const stateType = [
code: 'KILL',
label: `${i18n.$t('Kill')}`
}, {
code: 'WAITTING_THREAD',
code: 'WAITING_THREAD',
label: `${i18n.$t('Waiting for thread')}`
}, {
code: 'WAITTING_DEPEND',
code: 'WAITING_DEPEND',
label: `${i18n.$t('Waiting for dependency to complete')}`
}, {
code: 'DELAY_EXECUTION',
......
......@@ -19,6 +19,7 @@
ref="popup"
:ok-text="$t('Confirm')"
:nameText="$t('Related items')"
@close="_close"
@ok="_ok">
<template slot="content">
<div class="create-tenement-model">
......@@ -58,6 +59,9 @@
tmp: Boolean
},
methods: {
_close () {
this.$emit('closeRelatedItems')
},
_ok () {
if (this._verification()) {
if (this.tmp) {
......
......@@ -109,7 +109,7 @@
this.$message.warning(`${i18n.$t('Please enter group name')}`)
return false
}
if (this.alertInstanceIds) {
if (this.alertInstanceIds.length === 0) {
this.$message.warning(`${i18n.$t('Select Alarm plugin instance')}`)
return false
}
......
......@@ -108,6 +108,10 @@
this.$message.warning(`${i18n.$t('Please generate token')}`)
return false
}
if (!this.expireTime) {
this.$message.warning(`${i18n.$t('Please Select token')}`)
return false
}
return true
},
_submit () {
......
......@@ -576,6 +576,7 @@ export default {
'There is no data for this period of time': 'There is no data for this period of time',
'Worker addresses cannot be empty': 'Worker addresses cannot be empty',
'Please generate token': 'Please generate token',
'Please Select token': 'Please select the expiration time of token',
'Spark Version': 'Spark Version',
TargetDataBase: 'target database',
TargetTable: 'target table',
......
......@@ -576,6 +576,7 @@ export default {
'There is no data for this period of time': '该时间段无数据',
'Worker addresses cannot be empty': 'Worker地址不能为空',
'Please generate token': '请生成Token',
'Please Select token': '请选择Token失效时间',
'Spark Version': 'Spark版本',
TargetDataBase: '目标库',
TargetTable: '目标表',
......
......@@ -34,6 +34,11 @@
<property name="optional" value="true"/>
</module>
<module name="LineLength">
<property name="max" value="200"/>
<property name="ignorePattern" value="^ *\* *[^ ]+$"/>
</module>
<module name="TreeWalker">
<module name="OuterTypeFilename">
<property name="severity" value="error"/>
......@@ -73,11 +78,6 @@
<property name="allowNonPrintableEscapes" value="true"/>
</module>
<module name="LineLength">
<property name="max" value="200"/>
<property name="ignorePattern" value="^ *\* *[^ ]+$"/>
</module>
<module name="EmptyBlock">
<property name="option" value="TEXT"/>
<property name="tokens" value="LITERAL_TRY, LITERAL_FINALLY, LITERAL_IF, LITERAL_ELSE, LITERAL_SWITCH"/>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册