Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
DolphinScheduler
提交
aa8b88a8
DolphinScheduler
项目概览
apache
/
DolphinScheduler
上一次同步 1 年多
通知
707
Star
9572
Fork
3514
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
DolphinScheduler
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
aa8b88a8
编写于
7月 11, 2022
作者:
C
caishunfeng
提交者:
GitHub
7月 11, 2022
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[Feature-10871] add workflow executing data query (#10875)
* add workflow executing data query * fix sonar check for interrupted
上级
553159f7
变更
16
隐藏空白更改
内联
并排
Showing
16 changed file
with
719 addition
and
76 deletion
+719
-76
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
...e/dolphinscheduler/api/controller/ExecutorController.java
+29
-10
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
...inscheduler/api/controller/ProcessInstanceController.java
+26
-22
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
...in/java/org/apache/dolphinscheduler/api/enums/Status.java
+1
-0
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
.../apache/dolphinscheduler/api/service/ExecutorService.java
+8
-0
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
...olphinscheduler/api/service/impl/ExecutorServiceImpl.java
+25
-0
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
...cheduler/api/service/impl/ProcessInstanceServiceImpl.java
+34
-32
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/controller/WorkflowExecuteController.java
...r/server/master/controller/WorkflowExecuteController.java
+51
-0
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/WorkflowExecutingDataRequestProcessor.java
...ster/processor/WorkflowExecutingDataRequestProcessor.java
+65
-0
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
...e/dolphinscheduler/server/master/rpc/MasterRPCServer.java
+5
-0
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/ExecutingService.java
...phinscheduler/server/master/service/ExecutingService.java
+75
-0
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
...g/apache/dolphinscheduler/remote/command/CommandType.java
+11
-1
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataRequestCommand.java
...r/remote/command/WorkflowExecutingDataRequestCommand.java
+50
-0
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataResponseCommand.java
.../remote/command/WorkflowExecutingDataResponseCommand.java
+51
-0
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/TaskInstanceExecuteDto.java
...e/dolphinscheduler/remote/dto/TaskInstanceExecuteDto.java
+109
-0
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/WorkflowExecuteDto.java
...pache/dolphinscheduler/remote/dto/WorkflowExecuteDto.java
+154
-0
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
...scheduler/remote/processor/StateEventCallbackService.java
+25
-11
未找到文件。
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
浏览文件 @
aa8b88a8
...
...
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.api.controller;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
enums
.
Status
.
BATCH_EXECUTE_PROCESS_INSTANCE_ERROR
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
enums
.
Status
.
CHECK_PROCESS_DEFINITION_ERROR
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
enums
.
Status
.
EXECUTE_PROCESS_INSTANCE_ERROR
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
enums
.
Status
.
QUERY_EXECUTING_WORKFLOW_ERROR
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
enums
.
Status
.
START_PROCESS_INSTANCE_ERROR
;
import
org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation
;
...
...
@@ -38,11 +39,23 @@ import org.apache.dolphinscheduler.common.enums.TaskDependType;
import
org.apache.dolphinscheduler.common.enums.WarningType
;
import
org.apache.dolphinscheduler.common.utils.JSONUtils
;
import
org.apache.dolphinscheduler.dao.entity.User
;
import
org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto
;
import
org.apache.commons.lang3.StringUtils
;
import
java.text.MessageFormat
;
import
java.util.ArrayList
;
import
java.util.Arrays
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.stream.Collectors
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.http.HttpStatus
;
import
org.springframework.web.bind.annotation.GetMapping
;
import
org.springframework.web.bind.annotation.PathVariable
;
import
org.springframework.web.bind.annotation.PostMapping
;
import
org.springframework.web.bind.annotation.RequestAttribute
;
...
...
@@ -58,16 +71,6 @@ import io.swagger.annotations.ApiOperation;
import
io.swagger.annotations.ApiParam
;
import
springfox.documentation.annotations.ApiIgnore
;
import
org.apache.commons.lang3.StringUtils
;
import
java.text.MessageFormat
;
import
java.util.ArrayList
;
import
java.util.Arrays
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.stream.Collectors
;
/**
* executor controller
*/
...
...
@@ -361,4 +364,20 @@ public class ExecutorController extends BaseController {
Map
<
String
,
Object
>
result
=
execService
.
startCheckByProcessDefinedCode
(
processDefinitionCode
);
return
returnDataList
(
result
);
}
/**
* query execute data of processInstance from master
*/
@ApiOperation
(
value
=
"queryExecutingWorkflow"
,
notes
=
"QUERY_WORKFLOW_EXECUTE_DATA"
)
@ApiImplicitParams
({
@ApiImplicitParam
(
name
=
"processInstanceId"
,
value
=
"PROCESS_INSTANCE_ID"
,
required
=
true
,
dataType
=
"Int"
,
example
=
"100"
)
})
@GetMapping
(
value
=
"/query-executing-workflow"
)
@ResponseStatus
(
HttpStatus
.
OK
)
@ApiException
(
QUERY_EXECUTING_WORKFLOW_ERROR
)
@AccessLogAnnotation
public
Result
queryExecutingWorkflow
(
@RequestParam
(
"id"
)
Integer
processInstanceId
)
{
WorkflowExecuteDto
workflowExecuteDto
=
execService
.
queryExecutingWorkflowByProcessInstanceId
(
processInstanceId
);
return
Result
.
success
(
workflowExecuteDto
);
}
}
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
浏览文件 @
aa8b88a8
...
...
@@ -17,19 +17,17 @@
package
org.apache.dolphinscheduler.api.controller
;
import
java.io.IOException
;
import
java.text.MessageFormat
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
enums
.
Status
.
BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_ERROR
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
enums
.
Status
.
DELETE_PROCESS_INSTANCE_BY_ID_ERROR
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
enums
.
Status
.
ENCAPSULATION_PROCESS_INSTANCE_GANTT_STRUCTURE_ERROR
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
enums
.
Status
.
QUERY_PARENT_PROCESS_INSTANCE_DETAIL_INFO_BY_SUB_PROCESS_INSTANCE_ID_ERROR
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
enums
.
Status
.
QUERY_PROCESS_INSTANCE_ALL_VARIABLES_ERROR
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
enums
.
Status
.
QUERY_PROCESS_INSTANCE_BY_ID_ERROR
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
enums
.
Status
.
QUERY_PROCESS_INSTANCE_LIST_PAGING_ERROR
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
enums
.
Status
.
QUERY_SUB_PROCESS_INSTANCE_DETAIL_INFO_BY_TASK_ID_ERROR
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
enums
.
Status
.
QUERY_TASK_LIST_BY_PROCESS_INSTANCE_ID_ERROR
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
enums
.
Status
.
UPDATE_PROCESS_INSTANCE_ERROR
;
import
io.swagger.annotations.Api
;
import
io.swagger.annotations.ApiImplicitParam
;
import
io.swagger.annotations.ApiImplicitParams
;
import
io.swagger.annotations.ApiOperation
;
import
io.swagger.annotations.ApiParam
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation
;
import
org.apache.dolphinscheduler.api.enums.Status
;
import
org.apache.dolphinscheduler.api.exceptions.ApiException
;
...
...
@@ -40,6 +38,16 @@ import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import
org.apache.dolphinscheduler.dao.entity.ProcessInstance
;
import
org.apache.dolphinscheduler.dao.entity.User
;
import
org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus
;
import
org.apache.commons.lang3.StringUtils
;
import
java.io.IOException
;
import
java.text.MessageFormat
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
...
...
@@ -54,17 +62,13 @@ import org.springframework.web.bind.annotation.RequestMapping;
import
org.springframework.web.bind.annotation.RequestParam
;
import
org.springframework.web.bind.annotation.ResponseStatus
;
import
org.springframework.web.bind.annotation.RestController
;
import
io.swagger.annotations.Api
;
import
io.swagger.annotations.ApiImplicitParam
;
import
io.swagger.annotations.ApiImplicitParams
;
import
io.swagger.annotations.ApiOperation
;
import
io.swagger.annotations.ApiParam
;
import
springfox.documentation.annotations.ApiIgnore
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
enums
.
Status
.
BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_ERROR
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
enums
.
Status
.
DELETE_PROCESS_INSTANCE_BY_ID_ERROR
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
enums
.
Status
.
ENCAPSULATION_PROCESS_INSTANCE_GANTT_STRUCTURE_ERROR
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
enums
.
Status
.
QUERY_PARENT_PROCESS_INSTANCE_DETAIL_INFO_BY_SUB_PROCESS_INSTANCE_ID_ERROR
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
enums
.
Status
.
QUERY_PROCESS_INSTANCE_ALL_VARIABLES_ERROR
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
enums
.
Status
.
QUERY_PROCESS_INSTANCE_BY_ID_ERROR
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
enums
.
Status
.
QUERY_PROCESS_INSTANCE_LIST_PAGING_ERROR
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
enums
.
Status
.
QUERY_SUB_PROCESS_INSTANCE_DETAIL_INFO_BY_TASK_ID_ERROR
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
enums
.
Status
.
QUERY_TASK_LIST_BY_PROCESS_INSTANCE_ID_ERROR
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
enums
.
Status
.
UPDATE_PROCESS_INSTANCE_ERROR
;
/**
* process instance controller
...
...
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
浏览文件 @
aa8b88a8
...
...
@@ -216,6 +216,7 @@ public enum Status {
QUERY_AUTHORIZED_USER
(
10183
,
"query authorized user error"
,
"查询拥有项目权限的用户错误"
),
PROJECT_NOT_EXIST
(
10190
,
"This project was not found. Please refresh page."
,
"该项目不存在,请刷新页面"
),
TASK_INSTANCE_HOST_IS_NULL
(
10191
,
"task instance host is null"
,
"任务实例host为空"
),
QUERY_EXECUTING_WORKFLOW_ERROR
(
10192
,
"query executing workflow error"
,
"查询运行的工作流实例错误"
),
UDF_FUNCTION_NOT_EXIST
(
20001
,
"UDF function not found"
,
"UDF函数不存在"
),
UDF_FUNCTION_EXISTS
(
20002
,
"UDF function already exists"
,
"UDF函数已存在"
),
...
...
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
浏览文件 @
aa8b88a8
...
...
@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.common.enums.TaskDependType;
import
org.apache.dolphinscheduler.common.enums.WarningType
;
import
org.apache.dolphinscheduler.dao.entity.ProcessDefinition
;
import
org.apache.dolphinscheduler.dao.entity.User
;
import
org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto
;
import
java.util.Map
;
...
...
@@ -111,4 +112,11 @@ public interface ExecutorService {
* @return
*/
Map
<
String
,
Object
>
forceStartTaskInstance
(
User
loginUser
,
int
queueId
);
/**
* query executing workflow data in Master memory
* @param processInstanceId
* @return
*/
WorkflowExecuteDto
queryExecutingWorkflowByProcessInstanceId
(
Integer
processInstanceId
);
}
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
浏览文件 @
aa8b88a8
...
...
@@ -69,6 +69,9 @@ import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import
org.apache.dolphinscheduler.plugin.task.api.TaskConstants
;
import
org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus
;
import
org.apache.dolphinscheduler.remote.command.StateEventChangeCommand
;
import
org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataRequestCommand
;
import
org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataResponseCommand
;
import
org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto
;
import
org.apache.dolphinscheduler.remote.processor.StateEventCallbackService
;
import
org.apache.dolphinscheduler.remote.utils.Host
;
import
org.apache.dolphinscheduler.service.cron.CronUtils
;
...
...
@@ -991,4 +994,26 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
}
return
null
;
}
/**
* query executing data of processInstance by master
* @param processInstanceId
* @return
*/
@Override
public
WorkflowExecuteDto
queryExecutingWorkflowByProcessInstanceId
(
Integer
processInstanceId
)
{
ProcessInstance
processInstance
=
processService
.
findProcessInstanceDetailById
(
processInstanceId
);
if
(
processInstance
==
null
)
{
return
null
;
}
Host
host
=
new
Host
(
processInstance
.
getHost
());
WorkflowExecutingDataRequestCommand
requestCommand
=
new
WorkflowExecutingDataRequestCommand
();
requestCommand
.
setProcessInstanceId
(
processInstanceId
);
org
.
apache
.
dolphinscheduler
.
remote
.
command
.
Command
command
=
stateEventCallbackService
.
sendSync
(
host
,
requestCommand
.
convert2Command
());
if
(
command
==
null
)
{
return
null
;
}
WorkflowExecutingDataResponseCommand
responseCommand
=
JSONUtils
.
parseObject
(
command
.
getBody
(),
WorkflowExecutingDataResponseCommand
.
class
);
return
responseCommand
.
getWorkflowExecuteDto
();
}
}
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
浏览文件 @
aa8b88a8
...
...
@@ -17,26 +17,17 @@
package
org.apache.dolphinscheduler.api.service.impl
;
import
java.io.BufferedReader
;
import
java.io.ByteArrayInputStream
;
import
java.io.IOException
;
import
java.io.InputStreamReader
;
import
java.nio.charset.StandardCharsets
;
import
java.util.ArrayList
;
import
java.util.Collections
;
import
java.util.Date
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Objects
;
import
java.util.function.Function
;
import
java.util.stream.Collectors
;
import
com.baomidou.mybatisplus.core.metadata.IPage
;
import
com.baomidou.mybatisplus.extension.plugins.pagination.Page
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
constants
.
ApiFuncIdentificationConstant
.
INSTANCE_DELETE
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
constants
.
ApiFuncIdentificationConstant
.
INSTANCE_UPDATE
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
constants
.
ApiFuncIdentificationConstant
.
WORKFLOW_INSTANCE
;
import
static
org
.
apache
.
dolphinscheduler
.
common
.
Constants
.
DATA_LIST
;
import
static
org
.
apache
.
dolphinscheduler
.
common
.
Constants
.
DEPENDENT_SPLIT
;
import
static
org
.
apache
.
dolphinscheduler
.
common
.
Constants
.
GLOBAL_PARAMS
;
import
static
org
.
apache
.
dolphinscheduler
.
common
.
Constants
.
LOCAL_PARAMS
;
import
static
org
.
apache
.
dolphinscheduler
.
common
.
Constants
.
PROCESS_INSTANCE_STATE
;
import
static
org
.
apache
.
dolphinscheduler
.
common
.
Constants
.
TASK_LIST
;
import
static
org
.
apache
.
dolphinscheduler
.
plugin
.
task
.
api
.
TaskConstants
.
TASK_TYPE_DEPENDENT
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.dolphinscheduler.api.dto.gantt.GanttDto
;
import
org.apache.dolphinscheduler.api.dto.gantt.Task
;
import
org.apache.dolphinscheduler.api.enums.Status
;
...
...
@@ -51,7 +42,6 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import
org.apache.dolphinscheduler.api.utils.Result
;
import
org.apache.dolphinscheduler.common.Constants
;
import
org.apache.dolphinscheduler.common.enums.Flag
;
import
org.apache.dolphinscheduler.service.expand.CuringParamsService
;
import
org.apache.dolphinscheduler.common.graph.DAG
;
import
org.apache.dolphinscheduler.common.model.TaskNode
;
import
org.apache.dolphinscheduler.common.model.TaskNodeRelation
;
...
...
@@ -81,21 +71,34 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import
org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus
;
import
org.apache.dolphinscheduler.plugin.task.api.model.Property
;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode
;
import
org.apache.dolphinscheduler.service.expand.CuringParamsService
;
import
org.apache.dolphinscheduler.service.process.ProcessService
;
import
org.apache.dolphinscheduler.service.task.TaskPluginManager
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
java.io.BufferedReader
;
import
java.io.ByteArrayInputStream
;
import
java.io.IOException
;
import
java.io.InputStreamReader
;
import
java.nio.charset.StandardCharsets
;
import
java.util.ArrayList
;
import
java.util.Collections
;
import
java.util.Date
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Objects
;
import
java.util.function.Function
;
import
java.util.stream.Collectors
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Service
;
import
org.springframework.transaction.annotation.Transactional
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
constants
.
ApiFuncIdentificationConstant
.
INSTANCE_DELETE
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
constants
.
ApiFuncIdentificationConstant
.
INSTANCE_UPDATE
;
import
static
org
.
apache
.
dolphinscheduler
.
api
.
constants
.
ApiFuncIdentificationConstant
.
WORKFLOW_INSTANCE
;
import
static
org
.
apache
.
dolphinscheduler
.
common
.
Constants
.
DATA_LIST
;
import
static
org
.
apache
.
dolphinscheduler
.
common
.
Constants
.
DEPENDENT_SPLIT
;
import
static
org
.
apache
.
dolphinscheduler
.
common
.
Constants
.
GLOBAL_PARAMS
;
import
static
org
.
apache
.
dolphinscheduler
.
common
.
Constants
.
LOCAL_PARAMS
;
import
static
org
.
apache
.
dolphinscheduler
.
common
.
Constants
.
PROCESS_INSTANCE_STATE
;
import
static
org
.
apache
.
dolphinscheduler
.
common
.
Constants
.
TASK_LIST
;
import
static
org
.
apache
.
dolphinscheduler
.
plugin
.
task
.
api
.
TaskConstants
.
TASK_TYPE_DEPENDENT
;
import
com.baomidou.mybatisplus.core.metadata.IPage
;
import
com.baomidou.mybatisplus.extension.plugins.pagination.Page
;
/**
* process instance service impl
...
...
@@ -459,7 +462,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
String
locations
,
int
timeout
,
String
tenantCode
)
{
Project
project
=
projectMapper
.
queryByCode
(
projectCode
);
//check user access for project
Map
<
String
,
Object
>
result
=
projectService
.
checkProjectAndAuth
(
loginUser
,
project
,
projectCode
,
INSTANCE_UPDATE
);
Map
<
String
,
Object
>
result
=
projectService
.
checkProjectAndAuth
(
loginUser
,
project
,
projectCode
,
INSTANCE_UPDATE
);
if
(
result
.
get
(
Constants
.
STATUS
)
!=
Status
.
SUCCESS
)
{
return
result
;
}
...
...
@@ -833,5 +836,4 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
public
List
<
ProcessInstance
>
queryByProcessDefineCode
(
Long
processDefinitionCode
,
int
size
)
{
return
processInstanceMapper
.
queryByProcessDefineCode
(
processDefinitionCode
,
size
);
}
}
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/controller/WorkflowExecuteController.java
0 → 100644
浏览文件 @
aa8b88a8
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.dolphinscheduler.server.master.controller
;
import
org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto
;
import
org.apache.dolphinscheduler.server.master.service.ExecutingService
;
import
java.util.Optional
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.http.HttpStatus
;
import
org.springframework.web.bind.annotation.GetMapping
;
import
org.springframework.web.bind.annotation.RequestMapping
;
import
org.springframework.web.bind.annotation.RequestParam
;
import
org.springframework.web.bind.annotation.ResponseStatus
;
import
org.springframework.web.bind.annotation.RestController
;
@RestController
@RequestMapping
(
"/workflow/execute"
)
public
class
WorkflowExecuteController
{
@Autowired
private
ExecutingService
executingService
;
/**
* query workflow execute data in memory
* @param processInstanceId
* @return
*/
@GetMapping
(
""
)
@ResponseStatus
(
HttpStatus
.
OK
)
public
WorkflowExecuteDto
queryExecuteData
(
@RequestParam
(
"id"
)
int
processInstanceId
)
{
Optional
<
WorkflowExecuteDto
>
workflowExecuteDtoOptional
=
executingService
.
queryWorkflowExecutingData
(
processInstanceId
);
return
workflowExecuteDtoOptional
.
orElse
(
null
);
}
}
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/WorkflowExecutingDataRequestProcessor.java
0 → 100644
浏览文件 @
aa8b88a8
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.dolphinscheduler.server.master.processor
;
import
org.apache.dolphinscheduler.common.utils.JSONUtils
;
import
org.apache.dolphinscheduler.remote.command.Command
;
import
org.apache.dolphinscheduler.remote.command.CommandType
;
import
org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataRequestCommand
;
import
org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataResponseCommand
;
import
org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto
;
import
org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor
;
import
org.apache.dolphinscheduler.server.master.service.ExecutingService
;
import
java.util.Optional
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Component
;
import
com.google.common.base.Preconditions
;
import
io.netty.channel.Channel
;
/**
* workflow executing data process from api/master
*/
@Component
public
class
WorkflowExecutingDataRequestProcessor
implements
NettyRequestProcessor
{
private
final
Logger
logger
=
LoggerFactory
.
getLogger
(
WorkflowExecutingDataRequestProcessor
.
class
);
@Autowired
private
ExecutingService
executingService
;
@Override
public
void
process
(
Channel
channel
,
Command
command
)
{
Preconditions
.
checkArgument
(
CommandType
.
WORKFLOW_EXECUTING_DATA_REQUEST
==
command
.
getType
(),
String
.
format
(
"invalid command type: %s"
,
command
.
getType
()));
WorkflowExecutingDataRequestCommand
requestCommand
=
JSONUtils
.
parseObject
(
command
.
getBody
(),
WorkflowExecutingDataRequestCommand
.
class
);
logger
.
info
(
"received command, processInstanceId:{}"
,
requestCommand
.
getProcessInstanceId
());
Optional
<
WorkflowExecuteDto
>
workflowExecuteDtoOptional
=
executingService
.
queryWorkflowExecutingData
(
requestCommand
.
getProcessInstanceId
());
WorkflowExecutingDataResponseCommand
responseCommand
=
new
WorkflowExecutingDataResponseCommand
();
workflowExecuteDtoOptional
.
ifPresent
(
responseCommand:
:
setWorkflowExecuteDto
);
channel
.
writeAndFlush
(
responseCommand
.
convert2Command
(
command
.
getOpaque
()));
}
}
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
浏览文件 @
aa8b88a8
...
...
@@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponsePr
import
org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor
;
import
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor
;
import
org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor
;
import
org.apache.dolphinscheduler.server.master.processor.WorkflowExecutingDataRequestProcessor
;
import
javax.annotation.PostConstruct
;
...
...
@@ -74,6 +75,9 @@ public class MasterRPCServer implements AutoCloseable {
@Autowired
private
LoggerRequestProcessor
loggerRequestProcessor
;
@Autowired
private
WorkflowExecutingDataRequestProcessor
workflowExecutingDataRequestProcessor
;
@PostConstruct
private
void
init
()
{
// init remoting server
...
...
@@ -88,6 +92,7 @@ public class MasterRPCServer implements AutoCloseable {
this
.
nettyRemotingServer
.
registerProcessor
(
CommandType
.
TASK_WAKEUP_EVENT_REQUEST
,
taskEventProcessor
);
this
.
nettyRemotingServer
.
registerProcessor
(
CommandType
.
CACHE_EXPIRE
,
cacheProcessor
);
this
.
nettyRemotingServer
.
registerProcessor
(
CommandType
.
TASK_RECALL
,
taskRecallProcessor
);
this
.
nettyRemotingServer
.
registerProcessor
(
CommandType
.
WORKFLOW_EXECUTING_DATA_REQUEST
,
workflowExecutingDataRequestProcessor
);
// logger server
this
.
nettyRemotingServer
.
registerProcessor
(
CommandType
.
GET_LOG_BYTES_REQUEST
,
loggerRequestProcessor
);
...
...
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/ExecutingService.java
0 → 100644
浏览文件 @
aa8b88a8
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.dolphinscheduler.server.master.service
;
import
org.apache.dolphinscheduler.dao.entity.TaskInstance
;
import
org.apache.dolphinscheduler.remote.dto.TaskInstanceExecuteDto
;
import
org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto
;
import
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager
;
import
org.apache.dolphinscheduler.server.master.controller.WorkflowExecuteController
;
import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable
;
import
org.apache.commons.beanutils.BeanUtils
;
import
org.apache.commons.collections4.CollectionUtils
;
import
org.apache.commons.compress.utils.Lists
;
import
java.lang.reflect.InvocationTargetException
;
import
java.util.List
;
import
java.util.Optional
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Component
;
/**
* executing service, to query executing data from memory, such workflow instance
*/
@Component
public
class
ExecutingService
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
WorkflowExecuteController
.
class
);
@Autowired
private
ProcessInstanceExecCacheManager
processInstanceExecCacheManager
;
public
Optional
<
WorkflowExecuteDto
>
queryWorkflowExecutingData
(
Integer
processInstanceId
)
{
WorkflowExecuteRunnable
workflowExecuteRunnable
=
processInstanceExecCacheManager
.
getByProcessInstanceId
(
processInstanceId
);
if
(
workflowExecuteRunnable
==
null
)
{
logger
.
info
(
"workflow execute data not found, maybe it has finished, workflow id:{}"
,
processInstanceId
);
return
Optional
.
empty
();
}
try
{
WorkflowExecuteDto
workflowExecuteDto
=
new
WorkflowExecuteDto
();
BeanUtils
.
copyProperties
(
workflowExecuteDto
,
workflowExecuteRunnable
.
getProcessInstance
());
List
<
TaskInstanceExecuteDto
>
taskInstanceList
=
Lists
.
newArrayList
();
if
(
CollectionUtils
.
isNotEmpty
(
workflowExecuteRunnable
.
getAllTaskInstances
()))
{
for
(
TaskInstance
taskInstance
:
workflowExecuteRunnable
.
getAllTaskInstances
())
{
TaskInstanceExecuteDto
taskInstanceExecuteDto
=
new
TaskInstanceExecuteDto
();
BeanUtils
.
copyProperties
(
taskInstanceExecuteDto
,
taskInstance
);
taskInstanceList
.
add
(
taskInstanceExecuteDto
);
}
}
workflowExecuteDto
.
setTaskInstances
(
taskInstanceList
);
return
Optional
.
of
(
workflowExecuteDto
);
}
catch
(
IllegalAccessException
|
InvocationTargetException
e
)
{
logger
.
error
(
"query workflow execute data fail, workflow id:{}"
,
processInstanceId
,
e
);
}
return
Optional
.
empty
();
}
}
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
浏览文件 @
aa8b88a8
...
...
@@ -158,5 +158,15 @@ public enum CommandType {
/**
* task state event request
*/
TASK_WAKEUP_EVENT_REQUEST
;
TASK_WAKEUP_EVENT_REQUEST
,
/**
* workflow executing data request, from api to master
*/
WORKFLOW_EXECUTING_DATA_REQUEST
,
/**
* workflow executing data response, from master to api
*/
WORKFLOW_EXECUTING_DATA_RESPONSE
;
}
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataRequestCommand.java
0 → 100644
浏览文件 @
aa8b88a8
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.dolphinscheduler.remote.command
;
import
org.apache.dolphinscheduler.common.utils.JSONUtils
;
import
java.io.Serializable
;
import
lombok.AllArgsConstructor
;
import
lombok.Data
;
import
lombok.NoArgsConstructor
;
/**
* workflow executing data request, from api to master
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public
class
WorkflowExecutingDataRequestCommand
implements
Serializable
{
private
Integer
processInstanceId
;
/**
* package request command
*
* @return command
*/
public
Command
convert2Command
()
{
Command
command
=
new
Command
();
command
.
setType
(
CommandType
.
WORKFLOW_EXECUTING_DATA_REQUEST
);
byte
[]
body
=
JSONUtils
.
toJsonByteArray
(
this
);
command
.
setBody
(
body
);
return
command
;
}
}
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataResponseCommand.java
0 → 100644
浏览文件 @
aa8b88a8
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.dolphinscheduler.remote.command
;
import
org.apache.dolphinscheduler.common.utils.JSONUtils
;
import
org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto
;
import
java.io.Serializable
;
import
lombok.AllArgsConstructor
;
import
lombok.Data
;
import
lombok.NoArgsConstructor
;
/**
* workflow executing data response, from master to api
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public
class
WorkflowExecutingDataResponseCommand
implements
Serializable
{
private
WorkflowExecuteDto
workflowExecuteDto
;
/**
* package request command
*
* @return command
*/
public
Command
convert2Command
(
long
opaque
)
{
Command
command
=
new
Command
(
opaque
);
command
.
setType
(
CommandType
.
WORKFLOW_EXECUTING_DATA_RESPONSE
);
byte
[]
body
=
JSONUtils
.
toJsonByteArray
(
this
);
command
.
setBody
(
body
);
return
command
;
}
}
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/TaskInstanceExecuteDto.java
0 → 100644
浏览文件 @
aa8b88a8
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.dolphinscheduler.remote.dto
;
import
org.apache.dolphinscheduler.common.enums.Flag
;
import
org.apache.dolphinscheduler.common.enums.Priority
;
import
org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus
;
import
java.util.Date
;
import
java.util.Map
;
import
lombok.Data
;
@Data
public
class
TaskInstanceExecuteDto
{
private
int
id
;
private
String
name
;
private
String
taskType
;
private
int
processInstanceId
;
private
long
taskCode
;
private
int
taskDefinitionVersion
;
private
String
processInstanceName
;
private
int
taskGroupPriority
;
private
ExecutionStatus
state
;
private
Date
firstSubmitTime
;
private
Date
submitTime
;
private
Date
startTime
;
private
Date
endTime
;
private
String
host
;
private
String
executePath
;
private
String
logPath
;
private
int
retryTimes
;
private
Flag
alertFlag
;
private
int
pid
;
private
String
appLink
;
private
Flag
flag
;
private
String
duration
;
private
int
maxRetryTimes
;
private
int
retryInterval
;
private
Priority
taskInstancePriority
;
private
Priority
processInstancePriority
;
private
String
workerGroup
;
private
Long
environmentCode
;
private
String
environmentConfig
;
private
int
executorId
;
private
String
varPool
;
private
String
executorName
;
private
Map
<
String
,
String
>
resources
;
private
int
delayTime
;
private
String
taskParams
;
private
int
dryRun
;
private
int
taskGroupId
;
private
Integer
cpuQuota
;
private
Integer
memoryMax
;
}
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/WorkflowExecuteDto.java
0 → 100644
浏览文件 @
aa8b88a8
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.dolphinscheduler.remote.dto
;
import
org.apache.dolphinscheduler.common.enums.CommandType
;
import
org.apache.dolphinscheduler.common.enums.FailureStrategy
;
import
org.apache.dolphinscheduler.common.enums.Flag
;
import
org.apache.dolphinscheduler.common.enums.Priority
;
import
org.apache.dolphinscheduler.common.enums.TaskDependType
;
import
org.apache.dolphinscheduler.common.enums.WarningType
;
import
org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus
;
import
java.util.Collection
;
import
java.util.Date
;
import
lombok.Getter
;
import
lombok.Setter
;
@Setter
@Getter
public
class
WorkflowExecuteDto
{
private
int
id
;
private
String
name
;
private
Long
processDefinitionCode
;
private
int
processDefinitionVersion
;
private
ExecutionStatus
state
;
/**
* recovery flag for failover
*/
private
Flag
recovery
;
private
Date
startTime
;
private
Date
endTime
;
private
int
runTimes
;
private
String
host
;
private
CommandType
commandType
;
private
String
commandParam
;
/**
* node depend type
*/
private
TaskDependType
taskDependType
;
private
int
maxTryTimes
;
/**
* failure strategy when task failed.
*/
private
FailureStrategy
failureStrategy
;
/**
* warning type
*/
private
WarningType
warningType
;
private
Integer
warningGroupId
;
private
Date
scheduleTime
;
private
Date
commandStartTime
;
/**
* user define parameters string
*/
private
String
globalParams
;
/**
* executor id
*/
private
int
executorId
;
/**
* executor name
*/
private
String
executorName
;
/**
* tenant code
*/
private
String
tenantCode
;
/**
* queue
*/
private
String
queue
;
/**
* process is sub process
*/
private
Flag
isSubProcess
;
/**
* history command
*/
private
String
historyCmd
;
/**
* depend processes schedule time
*/
private
String
dependenceScheduleTimes
;
private
String
duration
;
private
Priority
processInstancePriority
;
private
String
workerGroup
;
private
Long
environmentCode
;
private
int
timeout
;
private
int
tenantId
;
/**
* varPool string
*/
private
String
varPool
;
private
int
nextProcessInstanceId
;
private
int
dryRun
;
private
Date
restartTime
;
private
boolean
isBlocked
;
private
Collection
<
TaskInstanceExecuteDto
>
taskInstances
;
}
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
浏览文件 @
aa8b88a8
...
...
@@ -17,11 +17,13 @@
package
org.apache.dolphinscheduler.remote.processor
;
import
static
org
.
apache
.
dolphinscheduler
.
common
.
Constants
.
HTTP_CONNECTION_REQUEST_TIMEOUT
;
import
static
org
.
apache
.
dolphinscheduler
.
common
.
Constants
.
SLEEP_TIME_MILLIS
;
import
org.apache.dolphinscheduler.remote.NettyRemotingClient
;
import
org.apache.dolphinscheduler.remote.command.Command
;
import
org.apache.dolphinscheduler.remote.config.NettyClientConfig
;
import
org.apache.dolphinscheduler.remote.exceptions.RemotingException
;
import
org.apache.dolphinscheduler.remote.utils.Host
;
import
java.util.Optional
;
...
...
@@ -110,17 +112,6 @@ public class StateEventCallbackService {
REMOTE_CHANNELS
.
remove
(
host
);
}
/**
* Send the command to target address, this method doesn't guarantee the command send success.
*
* @param command command need tp send
*/
public
void
sendResult
(
String
address
,
int
port
,
Command
command
)
{
logger
.
info
(
"send result, host:{}, command:{}"
,
address
,
command
.
toString
());
Host
host
=
new
Host
(
address
,
port
);
sendResult
(
host
,
command
);
}
/**
* Send the command to target host, this method doesn't guarantee the command send success.
*
...
...
@@ -133,4 +124,27 @@ public class StateEventCallbackService {
nettyRemoteChannel
.
writeAndFlush
(
command
);
});
}
/**
* send sync and return response command
* @param host
* @param requestCommand
* @return
* @throws RemotingException
* @throws InterruptedException
*/
public
Command
sendSync
(
Host
host
,
Command
requestCommand
)
{
try
{
return
this
.
nettyRemotingClient
.
sendSync
(
host
,
requestCommand
,
HTTP_CONNECTION_REQUEST_TIMEOUT
);
}
catch
(
InterruptedException
e
)
{
logger
.
error
(
"send sync fail, host:{}, command:{}"
,
host
,
requestCommand
,
e
);
Thread
.
currentThread
().
interrupt
();
}
catch
(
RemotingException
e
)
{
logger
.
error
(
"send sync fail, host:{}, command:{}"
,
host
,
requestCommand
,
e
);
}
finally
{
this
.
nettyRemotingClient
.
closeChannel
(
host
);
}
return
null
;
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录