Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
pentaLiker
DolphinScheduler
提交
8f5f1a61
DolphinScheduler
项目概览
pentaLiker
/
DolphinScheduler
与 Fork 源项目一致
Fork自
apache / DolphinScheduler
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
DolphinScheduler
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
8f5f1a61
编写于
11月 09, 2020
作者:
B
baoliang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
merge from 1.3.3-release
上级
cbf47dad
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
144 addition
and
114 deletion
+144
-114
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
...inscheduler/server/master/processor/TaskAckProcessor.java
+3
-25
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
...eduler/server/master/processor/TaskResponseProcessor.java
+4
-26
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
...uler/server/master/processor/queue/TaskResponseEvent.java
+31
-5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
...er/server/master/processor/queue/TaskResponseService.java
+45
-13
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
...g/apache/dolphinscheduler/server/worker/WorkerServer.java
+11
-0
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java
...scheduler/server/worker/processor/NettyRemoteChannel.java
+6
-0
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
...cheduler/server/worker/processor/TaskCallbackService.java
+20
-10
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
...heduler/server/worker/processor/TaskExecuteProcessor.java
+15
-24
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
...phinscheduler/server/worker/runner/TaskExecuteThread.java
+6
-8
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java
...heduler/server/master/processor/TaskAckProcessorTest.java
+1
-1
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
...erver/master/processor/queue/TaskResponseServiceTest.java
+2
-2
未找到文件。
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
浏览文件 @
8f5f1a61
...
...
@@ -17,14 +17,9 @@
package
org.apache.dolphinscheduler.server.master.processor
;
import
static
org
.
apache
.
dolphinscheduler
.
common
.
Constants
.
SLEEP_TIME_MILLIS
;
import
org.apache.dolphinscheduler.common.enums.ExecutionStatus
;
import
org.apache.dolphinscheduler.common.thread.Stopper
;
import
org.apache.dolphinscheduler.common.thread.ThreadUtils
;
import
org.apache.dolphinscheduler.common.utils.JSONUtils
;
import
org.apache.dolphinscheduler.common.utils.Preconditions
;
import
org.apache.dolphinscheduler.dao.entity.TaskInstance
;
import
org.apache.dolphinscheduler.remote.command.Command
;
import
org.apache.dolphinscheduler.remote.command.CommandType
;
import
org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand
;
...
...
@@ -35,7 +30,6 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan
import
org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent
;
import
org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService
;
import
org.apache.dolphinscheduler.service.bean.SpringApplicationContext
;
import
org.apache.dolphinscheduler.service.process.ProcessService
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
...
...
@@ -59,16 +53,9 @@ public class TaskAckProcessor implements NettyRequestProcessor {
*/
private
final
TaskInstanceCacheManager
taskInstanceCacheManager
;
/**
* processService
*/
private
ProcessService
processService
;
public
TaskAckProcessor
()
{
public
TaskAckProcessor
(){
this
.
taskResponseService
=
SpringApplicationContext
.
getBean
(
TaskResponseService
.
class
);
this
.
taskInstanceCacheManager
=
SpringApplicationContext
.
getBean
(
TaskInstanceCacheManagerImpl
.
class
);
this
.
processService
=
SpringApplicationContext
.
getBean
(
ProcessService
.
class
);
}
/**
...
...
@@ -94,19 +81,10 @@ public class TaskAckProcessor implements NettyRequestProcessor {
workerAddress
,
taskAckCommand
.
getExecutePath
(),
taskAckCommand
.
getLogPath
(),
taskAckCommand
.
getTaskInstanceId
());
taskAckCommand
.
getTaskInstanceId
(),
channel
);
taskResponseService
.
addResponse
(
taskResponseEvent
);
while
(
Stopper
.
isRunning
())
{
TaskInstance
taskInstance
=
processService
.
findTaskInstanceById
(
taskAckCommand
.
getTaskInstanceId
());
if
(
taskInstance
!=
null
&&
ackStatus
.
typeIsRunning
())
{
break
;
}
ThreadUtils
.
sleep
(
SLEEP_TIME_MILLIS
);
}
}
}
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
浏览文件 @
8f5f1a61
...
...
@@ -17,14 +17,9 @@
package
org.apache.dolphinscheduler.server.master.processor
;
import
static
org
.
apache
.
dolphinscheduler
.
common
.
Constants
.
SLEEP_TIME_MILLIS
;
import
org.apache.dolphinscheduler.common.enums.ExecutionStatus
;
import
org.apache.dolphinscheduler.common.thread.Stopper
;
import
org.apache.dolphinscheduler.common.thread.ThreadUtils
;
import
org.apache.dolphinscheduler.common.utils.JSONUtils
;
import
org.apache.dolphinscheduler.common.utils.Preconditions
;
import
org.apache.dolphinscheduler.dao.entity.TaskInstance
;
import
org.apache.dolphinscheduler.remote.command.Command
;
import
org.apache.dolphinscheduler.remote.command.CommandType
;
import
org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand
;
...
...
@@ -34,7 +29,6 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan
import
org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent
;
import
org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService
;
import
org.apache.dolphinscheduler.service.bean.SpringApplicationContext
;
import
org.apache.dolphinscheduler.service.process.ProcessService
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
...
...
@@ -58,15 +52,9 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
*/
private
final
TaskInstanceCacheManager
taskInstanceCacheManager
;
/**
* processService
*/
private
ProcessService
processService
;
public
TaskResponseProcessor
()
{
public
TaskResponseProcessor
(){
this
.
taskResponseService
=
SpringApplicationContext
.
getBean
(
TaskResponseService
.
class
);
this
.
taskInstanceCacheManager
=
SpringApplicationContext
.
getBean
(
TaskInstanceCacheManagerImpl
.
class
);
this
.
processService
=
SpringApplicationContext
.
getBean
(
ProcessService
.
class
);
}
/**
...
...
@@ -85,26 +73,16 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
taskInstanceCacheManager
.
cacheTaskInstance
(
responseCommand
);
ExecutionStatus
responseStatus
=
ExecutionStatus
.
of
(
responseCommand
.
getStatus
());
// TaskResponseEvent
TaskResponseEvent
taskResponseEvent
=
TaskResponseEvent
.
newResult
(
ExecutionStatus
.
of
(
responseCommand
.
getStatus
()),
responseCommand
.
getEndTime
(),
responseCommand
.
getProcessId
(),
responseCommand
.
getAppIds
(),
responseCommand
.
getTaskInstanceId
(),
responseCommand
.
getVarPool
()
);
responseCommand
.
getVarPool
()
,
channel
);
taskResponseService
.
addResponse
(
taskResponseEvent
);
while
(
Stopper
.
isRunning
())
{
TaskInstance
taskInstance
=
processService
.
findTaskInstanceById
(
taskResponseEvent
.
getTaskInstanceId
());
if
(
taskInstance
!=
null
&&
responseStatus
.
typeIsFinished
())
{
break
;
}
ThreadUtils
.
sleep
(
SLEEP_TIME_MILLIS
);
}
}
}
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
浏览文件 @
8f5f1a61
...
...
@@ -18,10 +18,14 @@
package
org.apache.dolphinscheduler.server.master.processor.queue
;
import
com.fasterxml.jackson.annotation.JsonFormat
;
import
org.apache.dolphinscheduler.common.enums.Event
;
import
org.apache.dolphinscheduler.common.enums.ExecutionStatus
;
import
java.util.Date
;
import
io.netty.channel.Channel
;
/**
* task event
*/
...
...
@@ -83,8 +87,19 @@ public class TaskResponseEvent {
* varPool
*/
private
String
varPool
;
/**
* channel
*/
private
Channel
channel
;
public
static
TaskResponseEvent
newAck
(
ExecutionStatus
state
,
Date
startTime
,
String
workerAddress
,
String
executePath
,
String
logPath
,
int
taskInstanceId
)
{
public
static
TaskResponseEvent
newAck
(
ExecutionStatus
state
,
Date
startTime
,
String
workerAddress
,
String
executePath
,
String
logPath
,
int
taskInstanceId
,
Channel
channel
)
{
TaskResponseEvent
event
=
new
TaskResponseEvent
();
event
.
setState
(
state
);
event
.
setStartTime
(
startTime
);
...
...
@@ -93,10 +108,17 @@ public class TaskResponseEvent {
event
.
setLogPath
(
logPath
);
event
.
setTaskInstanceId
(
taskInstanceId
);
event
.
setEvent
(
Event
.
ACK
);
event
.
setChannel
(
channel
);
return
event
;
}
public
static
TaskResponseEvent
newResult
(
ExecutionStatus
state
,
Date
endTime
,
int
processId
,
String
appIds
,
int
taskInstanceId
,
String
varPool
)
{
public
static
TaskResponseEvent
newResult
(
ExecutionStatus
state
,
Date
endTime
,
int
processId
,
String
appIds
,
int
taskInstanceId
,
String
varPool
,
Channel
channel
)
{
TaskResponseEvent
event
=
new
TaskResponseEvent
();
event
.
setState
(
state
);
event
.
setEndTime
(
endTime
);
...
...
@@ -105,6 +127,7 @@ public class TaskResponseEvent {
event
.
setTaskInstanceId
(
taskInstanceId
);
event
.
setEvent
(
Event
.
RESULT
);
event
.
setVarPool
(
varPool
);
event
.
setChannel
(
channel
);
return
event
;
}
...
...
@@ -196,8 +219,11 @@ public class TaskResponseEvent {
this
.
event
=
event
;
}
public
enum
Event
{
ACK
,
RESULT
;
public
Channel
getChannel
()
{
return
channel
;
}
public
void
setChannel
(
Channel
channel
)
{
this
.
channel
=
channel
;
}
}
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
浏览文件 @
8f5f1a61
...
...
@@ -17,7 +17,13 @@
package
org.apache.dolphinscheduler.server.master.processor.queue
;
import
io.netty.channel.Channel
;
import
org.apache.dolphinscheduler.common.enums.Event
;
import
org.apache.dolphinscheduler.common.enums.ExecutionStatus
;
import
org.apache.dolphinscheduler.common.thread.Stopper
;
import
org.apache.dolphinscheduler.dao.entity.TaskInstance
;
import
org.apache.dolphinscheduler.remote.command.DBTaskAckCommand
;
import
org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand
;
import
org.apache.dolphinscheduler.service.process.ProcessService
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
...
...
@@ -123,24 +129,50 @@ public class TaskResponseService {
* @param taskResponseEvent taskResponseEvent
*/
private
void
persist
(
TaskResponseEvent
taskResponseEvent
){
TaskResponseEvent
.
Event
event
=
taskResponseEvent
.
getEvent
();
Event
event
=
taskResponseEvent
.
getEvent
();
Channel
channel
=
taskResponseEvent
.
getChannel
();
switch
(
event
){
case
ACK:
processService
.
changeTaskState
(
taskResponseEvent
.
getState
(),
taskResponseEvent
.
getStartTime
(),
taskResponseEvent
.
getWorkerAddress
(),
taskResponseEvent
.
getExecutePath
(),
taskResponseEvent
.
getLogPath
(),
taskResponseEvent
.
getTaskInstanceId
());
try
{
TaskInstance
taskInstance
=
processService
.
findTaskInstanceById
(
taskResponseEvent
.
getTaskInstanceId
());
if
(
taskInstance
!=
null
){
processService
.
changeTaskState
(
taskResponseEvent
.
getState
(),
taskResponseEvent
.
getStartTime
(),
taskResponseEvent
.
getWorkerAddress
(),
taskResponseEvent
.
getExecutePath
(),
taskResponseEvent
.
getLogPath
(),
taskResponseEvent
.
getTaskInstanceId
());
}
// if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success
DBTaskAckCommand
taskAckCommand
=
new
DBTaskAckCommand
(
ExecutionStatus
.
SUCCESS
.
getCode
(),
taskResponseEvent
.
getTaskInstanceId
());
channel
.
writeAndFlush
(
taskAckCommand
.
convert2Command
());
}
catch
(
Exception
e
){
logger
.
error
(
"worker ack master error"
,
e
);
DBTaskAckCommand
taskAckCommand
=
new
DBTaskAckCommand
(
ExecutionStatus
.
FAILURE
.
getCode
(),-
1
);
channel
.
writeAndFlush
(
taskAckCommand
.
convert2Command
());
}
break
;
case
RESULT:
processService
.
changeTaskState
(
taskResponseEvent
.
getState
(),
taskResponseEvent
.
getEndTime
(),
taskResponseEvent
.
getProcessId
(),
taskResponseEvent
.
getAppIds
(),
taskResponseEvent
.
getTaskInstanceId
(),
taskResponseEvent
.
getVarPool
());
try
{
TaskInstance
taskInstance
=
processService
.
findTaskInstanceById
(
taskResponseEvent
.
getTaskInstanceId
());
if
(
taskInstance
!=
null
){
processService
.
changeTaskState
(
taskResponseEvent
.
getState
(),
taskResponseEvent
.
getEndTime
(),
taskResponseEvent
.
getProcessId
(),
taskResponseEvent
.
getAppIds
(),
taskResponseEvent
.
getTaskInstanceId
(),
taskResponseEvent
.
getVarPool
()
);
}
// if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
DBTaskResponseCommand
taskResponseCommand
=
new
DBTaskResponseCommand
(
ExecutionStatus
.
SUCCESS
.
getCode
(),
taskResponseEvent
.
getTaskInstanceId
());
channel
.
writeAndFlush
(
taskResponseCommand
.
convert2Command
());
}
catch
(
Exception
e
){
logger
.
error
(
"worker response master error"
,
e
);
DBTaskResponseCommand
taskResponseCommand
=
new
DBTaskResponseCommand
(
ExecutionStatus
.
FAILURE
.
getCode
(),-
1
);
channel
.
writeAndFlush
(
taskResponseCommand
.
convert2Command
());
}
break
;
default
:
throw
new
IllegalArgumentException
(
"invalid event type : "
+
event
);
...
...
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
浏览文件 @
8f5f1a61
...
...
@@ -22,9 +22,12 @@ import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import
org.apache.dolphinscheduler.remote.command.CommandType
;
import
org.apache.dolphinscheduler.remote.config.NettyServerConfig
;
import
org.apache.dolphinscheduler.server.worker.config.WorkerConfig
;
import
org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor
;
import
org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor
;
import
org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor
;
import
org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor
;
import
org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry
;
import
org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread
;
import
org.apache.dolphinscheduler.service.bean.SpringApplicationContext
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
...
...
@@ -70,6 +73,9 @@ public class WorkerServer {
@Autowired
private
SpringApplicationContext
springApplicationContext
;
@Autowired
private
RetryReportTaskStatusThread
retryReportTaskStatusThread
;
/**
* worker server startup
*
...
...
@@ -95,11 +101,16 @@ public class WorkerServer {
this
.
nettyRemotingServer
=
new
NettyRemotingServer
(
serverConfig
);
this
.
nettyRemotingServer
.
registerProcessor
(
CommandType
.
TASK_EXECUTE_REQUEST
,
new
TaskExecuteProcessor
());
this
.
nettyRemotingServer
.
registerProcessor
(
CommandType
.
TASK_KILL_REQUEST
,
new
TaskKillProcessor
());
this
.
nettyRemotingServer
.
registerProcessor
(
CommandType
.
DB_TASK_ACK
,
new
DBTaskAckProcessor
());
this
.
nettyRemotingServer
.
registerProcessor
(
CommandType
.
DB_TASK_RESPONSE
,
new
DBTaskResponseProcessor
());
this
.
nettyRemotingServer
.
start
();
// worker registry
this
.
workerRegistry
.
registry
();
// retry report task status
this
.
retryReportTaskStatusThread
.
start
();
/**
* register hooks, which are called before the process exits
*/
...
...
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java
浏览文件 @
8f5f1a61
...
...
@@ -50,6 +50,12 @@ public class NettyRemoteChannel {
this
.
opaque
=
opaque
;
}
public
NettyRemoteChannel
(
Channel
channel
)
{
this
.
channel
=
channel
;
this
.
host
=
ChannelUtils
.
toAddress
(
channel
);
this
.
opaque
=
-
1
;
}
public
Channel
getChannel
()
{
return
channel
;
}
...
...
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
浏览文件 @
8f5f1a61
...
...
@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import
org.apache.dolphinscheduler.common.utils.CollectionUtils
;
import
org.apache.dolphinscheduler.remote.NettyRemotingClient
;
import
org.apache.dolphinscheduler.remote.command.Command
;
import
org.apache.dolphinscheduler.remote.command.CommandType
;
import
org.apache.dolphinscheduler.remote.config.NettyClientConfig
;
import
org.apache.dolphinscheduler.remote.utils.Host
;
import
org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter
;
...
...
@@ -66,6 +67,8 @@ public class TaskCallbackService {
public
TaskCallbackService
(){
final
NettyClientConfig
clientConfig
=
new
NettyClientConfig
();
this
.
nettyRemotingClient
=
new
NettyRemotingClient
(
clientConfig
);
this
.
nettyRemotingClient
.
registerProcessor
(
CommandType
.
DB_TASK_ACK
,
new
DBTaskAckProcessor
());
this
.
nettyRemotingClient
.
registerProcessor
(
CommandType
.
DB_TASK_RESPONSE
,
new
DBTaskResponseProcessor
());
}
/**
...
...
@@ -83,16 +86,17 @@ public class TaskCallbackService {
* @return callback channel
*/
private
NettyRemoteChannel
getRemoteChannel
(
int
taskInstanceId
){
Channel
newChannel
;
NettyRemoteChannel
nettyRemoteChannel
=
REMOTE_CHANNELS
.
get
(
taskInstanceId
);
if
(
nettyRemoteChannel
=
=
null
){
throw
new
IllegalArgumentException
(
"nettyRemoteChannel is empty, should call addRemoteChannel first"
);
}
if
(
nettyRemoteChannel
.
isActive
()){
return
nettyRemoteChannel
;
}
Channel
newChannel
=
nettyRemotingClient
.
getChannel
(
nettyRemoteChannel
.
getHost
()
);
if
(
newChannel
!=
null
){
return
getRemoteChannel
(
newChannel
,
nettyRemoteChannel
.
getOpaque
(),
taskInstanceId
);
if
(
nettyRemoteChannel
!
=
null
){
if
(
nettyRemoteChannel
.
isActive
()){
return
nettyRemoteChannel
;
}
newChannel
=
nettyRemotingClient
.
getChannel
(
nettyRemoteChannel
.
getHost
())
;
if
(
newChannel
!=
null
){
return
getRemoteChannel
(
newChannel
,
nettyRemoteChannel
.
getOpaque
(),
taskInstanceId
);
}
}
logger
.
warn
(
"original master : {} for task : {} is not reachable, random select master"
,
nettyRemoteChannel
.
getHost
(),
...
...
@@ -116,7 +120,7 @@ public class TaskCallbackService {
for
(
String
masterNode
:
masterNodes
)
{
newChannel
=
nettyRemotingClient
.
getChannel
(
Host
.
of
(
masterNode
));
if
(
newChannel
!=
null
)
{
return
getRemoteChannel
(
newChannel
,
nettyRemoteChannel
.
getOpaque
(),
taskInstanceId
);
return
getRemoteChannel
(
newChannel
,
taskInstanceId
);
}
}
masterNodes
=
null
;
...
...
@@ -138,6 +142,12 @@ public class TaskCallbackService {
return
remoteChannel
;
}
private
NettyRemoteChannel
getRemoteChannel
(
Channel
newChannel
,
int
taskInstanceId
){
NettyRemoteChannel
remoteChannel
=
new
NettyRemoteChannel
(
newChannel
);
addRemoteChannel
(
taskInstanceId
,
remoteChannel
);
return
remoteChannel
;
}
/**
* remove callback channels
* @param taskInstanceId taskInstanceId
...
...
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
浏览文件 @
8f5f1a61
...
...
@@ -17,6 +17,7 @@
package
org.apache.dolphinscheduler.server.worker.processor
;
import
org.apache.dolphinscheduler.common.enums.Event
;
import
org.apache.dolphinscheduler.common.enums.ExecutionStatus
;
import
org.apache.dolphinscheduler.common.enums.TaskType
;
import
org.apache.dolphinscheduler.common.thread.ThreadUtils
;
...
...
@@ -34,19 +35,18 @@ import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import
org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor
;
import
org.apache.dolphinscheduler.server.entity.TaskExecutionContext
;
import
org.apache.dolphinscheduler.server.utils.LogUtils
;
import
org.apache.dolphinscheduler.server.worker.cache.ResponceCache
;
import
org.apache.dolphinscheduler.server.worker.config.WorkerConfig
;
import
org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread
;
import
org.apache.dolphinscheduler.service.bean.SpringApplicationContext
;
import
java.util.Date
;
import
java.util.Optional
;
import
java.util.concurrent.ExecutionException
;
import
java.util.concurrent.ExecutorService
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
com.github.rholder.retry.RetryException
;
import
io.netty.channel.Channel
;
...
...
@@ -100,15 +100,16 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
logger
.
error
(
"task execution context is null"
);
return
;
}
taskExecutionContext
.
setHost
(
NetUtils
.
getHost
()
+
":"
+
workerConfig
.
getListenPort
());
// custom logger
Logger
taskLogger
=
LoggerFactory
.
getLogger
(
LoggerUtils
.
buildTaskId
(
LoggerUtils
.
TASK_LOGGER_INFO_PREFIX
,
taskExecutionContext
.
getProcessDefineId
(),
taskExecutionContext
.
getProcessInstanceId
(),
taskExecutionContext
.
getTaskInstanceId
()));
taskExecutionContext
.
setHost
(
NetUtils
.
getHost
()
+
":"
+
workerConfig
.
getListenPort
());
taskExecutionContext
.
setStartTime
(
new
Date
());
taskExecutionContext
.
setLogPath
(
LogUtils
.
getTaskLogPath
(
taskExecutionContext
));
// local execute path
String
execLocalPath
=
getExecLocalPath
(
taskExecutionContext
);
logger
.
info
(
"task instance local execute path : {} "
,
execLocalPath
);
...
...
@@ -126,27 +127,17 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
taskCallbackService
.
addRemoteChannel
(
taskExecutionContext
.
getTaskInstanceId
(),
new
NettyRemoteChannel
(
channel
,
command
.
getOpaque
()));
if
(
DateUtils
.
getRemainTime
(
taskExecutionContext
.
getFirstSubmitTime
(),
taskExecutionContext
.
getDelayTime
()
*
60L
)
>
0
)
{
taskExecutionContext
.
setCurrentExecutionStatus
(
ExecutionStatus
.
DELAY_EXECUTION
);
taskExecutionContext
.
setStartTime
(
null
);
}
else
{
taskExecutionContext
.
setCurrentExecutionStatus
(
ExecutionStatus
.
RUNNING_EXECUTION
);
taskExecutionContext
.
setStartTime
(
new
Date
());
}
this
.
doAck
(
taskExecutionContext
);
// tell master the status of this task (RUNNING_EXECUTION or DELAY_EXECUTION)
final
Command
ackCommand
=
buildAckCommand
(
taskExecutionContext
).
convert2Command
();
// submit task
workerExecService
.
submit
(
new
TaskExecuteThread
(
taskExecutionContext
,
taskCallbackService
,
taskLogger
));
}
try
{
RetryerUtils
.
retryCall
(()
->
{
taskCallbackService
.
sendAck
(
taskExecutionContext
.
getTaskInstanceId
(),
ackCommand
);
return
Boolean
.
TRUE
;
});
// submit task
workerExecService
.
submit
(
new
TaskExecuteThread
(
taskExecutionContext
,
taskCallbackService
,
taskLogger
));
}
catch
(
ExecutionException
|
RetryException
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
}
private
void
doAck
(
TaskExecutionContext
taskExecutionContext
){
// tell master that task is in executing
TaskExecuteAckCommand
ackCommand
=
buildAckCommand
(
taskExecutionContext
);
ResponceCache
.
get
().
cache
(
taskExecutionContext
.
getTaskInstanceId
(),
ackCommand
.
convert2Command
(),
Event
.
ACK
);
taskCallbackService
.
sendAck
(
taskExecutionContext
.
getTaskInstanceId
(),
ackCommand
.
convert2Command
());
}
/**
...
...
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
浏览文件 @
8f5f1a61
...
...
@@ -17,6 +17,7 @@
package
org.apache.dolphinscheduler.server.worker.runner
;
import
org.apache.dolphinscheduler.common.Constants
;
import
org.apache.dolphinscheduler.common.enums.Event
;
import
org.apache.dolphinscheduler.common.enums.ExecutionStatus
;
import
org.apache.dolphinscheduler.common.enums.TaskType
;
import
org.apache.dolphinscheduler.common.model.TaskNode
;
...
...
@@ -32,6 +33,7 @@ import org.apache.dolphinscheduler.remote.command.Command;
import
org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand
;
import
org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand
;
import
org.apache.dolphinscheduler.server.entity.TaskExecutionContext
;
import
org.apache.dolphinscheduler.server.worker.cache.ResponceCache
;
import
org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager
;
import
org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl
;
import
org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService
;
...
...
@@ -165,14 +167,10 @@ public class TaskExecuteThread implements Runnable {
responseCommand
.
setProcessId
(
task
.
getProcessId
());
responseCommand
.
setAppIds
(
task
.
getAppIds
());
}
finally
{
try
{
taskExecutionContext
.
setCurrentExecutionStatus
(
ExecutionStatus
.
of
(
responseCommand
.
getStatus
()));
taskExecutionContextCacheManager
.
removeByTaskInstanceId
(
taskExecutionContext
.
getTaskInstanceId
());
taskCallbackService
.
sendResult
(
taskExecutionContext
.
getTaskInstanceId
(),
responseCommand
.
convert2Command
());
}
catch
(
Exception
e
)
{
ThreadUtils
.
sleep
(
Constants
.
SLEEP_TIME_MILLIS
);
taskCallbackService
.
sendResult
(
taskExecutionContext
.
getTaskInstanceId
(),
responseCommand
.
convert2Command
());
}
taskExecutionContextCacheManager
.
removeByTaskInstanceId
(
taskExecutionContext
.
getTaskInstanceId
());
ResponceCache
.
get
().
cache
(
taskExecutionContext
.
getTaskInstanceId
(),
responseCommand
.
convert2Command
(),
Event
.
RESULT
);
taskCallbackService
.
sendResult
(
taskExecutionContext
.
getTaskInstanceId
(),
responseCommand
.
convert2Command
());
}
}
...
...
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java
浏览文件 @
8f5f1a61
...
...
@@ -91,7 +91,7 @@ public class TaskAckProcessorTest {
PowerMockito
.
when
(
channel
.
remoteAddress
()).
thenReturn
(
socketAddress
);
PowerMockito
.
mockStatic
(
TaskResponseEvent
.
class
);
PowerMockito
.
when
(
TaskResponseEvent
.
newAck
(
Mockito
.
any
(),
Mockito
.
any
(),
Mockito
.
anyString
(),
Mockito
.
anyString
(),
Mockito
.
anyString
(),
Mockito
.
anyInt
()))
PowerMockito
.
when
(
TaskResponseEvent
.
newAck
(
Mockito
.
any
(),
Mockito
.
any
(),
Mockito
.
anyString
(),
Mockito
.
anyString
(),
Mockito
.
anyString
(),
Mockito
.
anyInt
()
,
channel
))
.
thenReturn
(
taskResponseEvent
);
TaskInstance
taskInstance
=
PowerMockito
.
mock
(
TaskInstance
.
class
);
PowerMockito
.
when
(
processService
.
findTaskInstanceById
(
Mockito
.
any
())).
thenReturn
(
taskInstance
);
...
...
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
浏览文件 @
8f5f1a61
...
...
@@ -46,7 +46,7 @@ public class TaskResponseServiceTest {
@Test
public
void
testAdd
(){
TaskResponseEvent
taskResponseEvent
=
TaskResponseEvent
.
newAck
(
ExecutionStatus
.
RUNNING_EXECUTION
,
new
Date
(),
""
,
""
,
""
,
1
);
""
,
""
,
""
,
1
,
null
);
taskResponseService
.
addResponse
(
taskResponseEvent
);
Assert
.
assertTrue
(
taskResponseService
.
getEventQueue
().
size
()
==
1
);
try
{
...
...
@@ -60,7 +60,7 @@ public class TaskResponseServiceTest {
@Test
public
void
testStop
(){
TaskResponseEvent
taskResponseEvent
=
TaskResponseEvent
.
newAck
(
ExecutionStatus
.
RUNNING_EXECUTION
,
new
Date
(),
""
,
""
,
""
,
1
);
""
,
""
,
""
,
1
,
null
);
taskResponseService
.
addResponse
(
taskResponseEvent
);
taskResponseService
.
stop
();
Assert
.
assertTrue
(
taskResponseService
.
getEventQueue
().
size
()
==
0
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录