Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
DolphinScheduler
提交
3a351da7
DolphinScheduler
项目概览
apache
/
DolphinScheduler
上一次同步 1 年多
通知
705
Star
9572
Fork
3514
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
DolphinScheduler
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
3a351da7
编写于
12月 08, 2021
作者:
W
wind
提交者:
GitHub
12月 08, 2021
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix failover logic (#7275)
Co-authored-by:
N
caishunfeng
<
534328519@qq.com
>
上级
f21eb8f9
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
84 addition
and
47 deletion
+84
-47
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
...cheduler/server/master/registry/MasterRegistryClient.java
+84
-47
未找到文件。
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
浏览文件 @
3a351da7
...
...
@@ -48,7 +48,9 @@ import org.apache.commons.lang.StringUtils;
import
java.util.Collections
;
import
java.util.Date
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.ScheduledExecutorService
;
import
java.util.concurrent.TimeUnit
;
...
...
@@ -194,7 +196,7 @@ public class MasterRegistryClient {
failoverMaster
(
serverHost
);
break
;
case
WORKER:
failoverWorker
(
serverHost
,
true
,
true
);
failoverWorker
(
serverHost
);
break
;
default
:
break
;
...
...
@@ -275,79 +277,114 @@ public class MasterRegistryClient {
* 3. do nothing (return immediately) when workerHost is null or empty
*
* @param workerHost worker host
* @param needCheckWorkerAlive need check worker alive
* @param checkOwner need check process instance owner
*/
private
void
failoverWorker
(
String
workerHost
,
boolean
needCheckWorkerAlive
,
boolean
checkOwner
)
{
logger
.
info
(
"start worker[{}] failover ..."
,
workerHost
);
private
void
failoverWorker
(
String
workerHost
)
{
if
(
StringUtils
.
isEmpty
(
workerHost
))
{
return
;
}
long
startTime
=
System
.
currentTimeMillis
();
List
<
TaskInstance
>
needFailoverTaskInstanceList
=
processService
.
queryNeedFailoverTaskInstances
(
workerHost
);
for
(
TaskInstance
taskInstance
:
needFailoverTaskInstanceList
)
{
if
(
needCheckWorkerAlive
)
{
if
(!
checkTaskInstanceNeedFailover
(
taskInstance
))
{
continue
;
}
}
Map
<
Integer
,
ProcessInstance
>
processInstanceCacheMap
=
new
HashMap
<>();
logger
.
info
(
"start worker[{}] failover, task list size:{}"
,
workerHost
,
needFailoverTaskInstanceList
.
size
());
ProcessInstance
processInstance
=
processService
.
findProcessInstanceDetailById
(
taskInstance
.
getProcessInstanceId
());
if
(
workerHost
==
null
||
!
checkOwner
||
processInstance
.
getHost
().
equalsIgnoreCase
(
getLocalAddress
()))
{
// only failover the task owned myself if worker down.
for
(
TaskInstance
taskInstance
:
needFailoverTaskInstanceList
)
{
ProcessInstance
processInstance
=
processInstanceCacheMap
.
get
(
taskInstance
.
getProcessInstanceId
());
if
(
processInstance
==
null
)
{
processInstance
=
processService
.
findProcessInstanceDetailById
(
taskInstance
.
getProcessInstanceId
());
if
(
processInstance
==
null
)
{
logger
.
error
(
"failover
error, the process {} of task {} do not exists.
"
,
logger
.
error
(
"failover
task instance error, processInstance {} of taskInstance {} is null
"
,
taskInstance
.
getProcessInstanceId
(),
taskInstance
.
getId
());
continue
;
}
taskInstance
.
setProcessInstance
(
processInstance
);
TaskExecutionContext
taskExecutionContext
=
TaskExecutionContextBuilder
.
get
()
.
buildTaskInstanceRelatedInfo
(
taskInstance
)
.
buildProcessInstanceRelatedInfo
(
processInstance
)
.
create
();
// only kill yarn job if exists , the local thread has exited
ProcessUtils
.
killYarnJob
(
taskExecutionContext
);
taskInstance
.
setState
(
ExecutionStatus
.
NEED_FAULT_TOLERANCE
);
processService
.
saveTaskInstance
(
taskInstance
);
if
(!
processInstanceExecCacheManager
.
contains
(
processInstance
.
getId
()))
{
continue
;
}
WorkflowExecuteThread
workflowExecuteThreadNotify
=
processInstanceExecCacheManager
.
getByProcessInstanceId
(
processInstance
.
getId
());
StateEvent
stateEvent
=
new
StateEvent
();
stateEvent
.
setTaskInstanceId
(
taskInstance
.
getId
());
stateEvent
.
setType
(
StateEventType
.
TASK_STATE_CHANGE
);
stateEvent
.
setProcessInstanceId
(
processInstance
.
getId
());
stateEvent
.
setExecutionStatus
(
taskInstance
.
getState
());
workflowExecuteThreadNotify
.
addStateEvent
(
stateEvent
);
processInstanceCacheMap
.
put
(
processInstance
.
getId
(),
processInstance
);
}
// only fail over the tasks whose process instance is owned by this master when the worker is down.
if
(
processInstance
.
getHost
().
equalsIgnoreCase
(
getLocalAddress
()))
{
logger
.
info
(
"failover task instance id: {}, process instance id: {}"
,
taskInstance
.
getId
(),
taskInstance
.
getProcessInstanceId
());
failoverTaskInstance
(
processInstance
,
taskInstance
);
}
}
logger
.
info
(
"end worker[{}] failover
..."
,
workerHost
);
logger
.
info
(
"end worker[{}] failover
, useTime:{}ms"
,
workerHost
,
System
.
currentTimeMillis
()
-
startTime
);
}
/**
* failover master tasks
* failover master
* <p>
* failover process instance and associated task instance
*
* @param masterHost master host
*/
private
void
failoverMaster
(
String
masterHost
)
{
logger
.
info
(
"start master failover ..."
);
if
(
StringUtils
.
isEmpty
(
masterHost
))
{
return
;
}
long
startTime
=
System
.
currentTimeMillis
();
List
<
ProcessInstance
>
needFailoverProcessInstanceList
=
processService
.
queryNeedFailoverProcessInstances
(
masterHost
);
logger
.
info
(
"start master[{}] failover, process list size:{}"
,
masterHost
,
needFailoverProcessInstanceList
.
size
());
logger
.
info
(
"failover process list size:{} "
,
needFailoverProcessInstanceList
.
size
());
//updateProcessInstance host is null and insert into command
for
(
ProcessInstance
processInstance
:
needFailoverProcessInstanceList
)
{
logger
.
info
(
"failover process instance id: {} host:{}"
,
processInstance
.
getId
(),
processInstance
.
getHost
());
if
(
Constants
.
NULL
.
equals
(
processInstance
.
getHost
()))
{
continue
;
}
logger
.
info
(
"failover process instance id: {}"
,
processInstance
.
getId
());
List
<
TaskInstance
>
validTaskInstanceList
=
processService
.
findValidTaskListByProcessId
(
processInstance
.
getId
());
for
(
TaskInstance
taskInstance
:
validTaskInstanceList
)
{
if
(
Constants
.
NULL
.
equals
(
taskInstance
.
getHost
()))
{
continue
;
}
logger
.
info
(
"failover task instance id: {}, process instance id: {}"
,
taskInstance
.
getId
(),
taskInstance
.
getProcessInstanceId
());
failoverTaskInstance
(
processInstance
,
taskInstance
);
}
//updateProcessInstance host is null and insert into command
processService
.
processNeedFailoverProcessInstances
(
processInstance
);
}
failoverWorker
(
masterHost
,
true
,
false
);
logger
.
info
(
"master failover end"
);
logger
.
info
(
"master[{}] failover end, useTime:{}ms"
,
masterHost
,
System
.
currentTimeMillis
()
-
startTime
);
}
/**
 * Fails over a single task instance.
 * <p>
 * The task instance is attached to its process instance, any yarn job belonging to it is
 * killed, its state is set to {@code NEED_FAULT_TOLERANCE} and saved, and, when a workflow
 * execute thread is cached for the process instance, that thread receives a
 * {@code TASK_STATE_CHANGE} state event.
 * <p>
 * The method returns without doing anything when either argument is null (an error is
 * logged) or when {@code checkTaskInstanceNeedFailover} reports that no failover is needed.
 *
 * @param processInstance process instance that the task instance belongs to
 * @param taskInstance task instance to fail over
 */
private void failoverTaskInstance(ProcessInstance processInstance, TaskInstance taskInstance) {
    // Guard clauses: both instances are required before anything is touched.
    if (taskInstance == null) {
        logger.error("failover task instance error, taskInstance is null");
        return;
    }
    if (processInstance == null) {
        logger.error("failover task instance error, processInstance {} of taskInstance {} is null",
                taskInstance.getProcessInstanceId(), taskInstance.getId());
        return;
    }
    if (!checkTaskInstanceNeedFailover(taskInstance)) {
        return;
    }

    taskInstance.setProcessInstance(processInstance);

    // only kill yarn job if exists , the local thread has exited
    ProcessUtils.killYarnJob(
            TaskExecutionContextBuilder.get()
                    .buildTaskInstanceRelatedInfo(taskInstance)
                    .buildProcessInstanceRelatedInfo(processInstance)
                    .create());

    // Persist the fault-tolerance state before notifying anyone about it.
    taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
    processService.saveTaskInstance(taskInstance);

    // Notify the workflow thread only if one is cached for this process instance.
    WorkflowExecuteThread owningWorkflowThread =
            processInstanceExecCacheManager.getByProcessInstanceId(processInstance.getId());
    if (owningWorkflowThread != null) {
        StateEvent taskStateChange = new StateEvent();
        taskStateChange.setTaskInstanceId(taskInstance.getId());
        taskStateChange.setType(StateEventType.TASK_STATE_CHANGE);
        taskStateChange.setProcessInstanceId(processInstance.getId());
        taskStateChange.setExecutionStatus(taskInstance.getState());
        owningWorkflowThread.addStateEvent(taskStateChange);
    }
}
/**
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录