Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
80798dd9
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
80798dd9
编写于
3月 28, 2023
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix: job retry issue
上级
31152651
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
119 addition
and
65 deletion
+119
-65
include/libs/qcom/query.h
include/libs/qcom/query.h
+1
-0
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+1
-1
source/libs/qcom/src/queryUtil.c
source/libs/qcom/src/queryUtil.c
+2
-0
source/libs/scheduler/inc/schInt.h
source/libs/scheduler/inc/schInt.h
+21
-9
source/libs/scheduler/src/schFlowCtrl.c
source/libs/scheduler/src/schFlowCtrl.c
+0
-1
source/libs/scheduler/src/schJob.c
source/libs/scheduler/src/schJob.c
+39
-3
source/libs/scheduler/src/schRemote.c
source/libs/scheduler/src/schRemote.c
+27
-15
source/libs/scheduler/src/schStatus.c
source/libs/scheduler/src/schStatus.c
+3
-0
source/libs/scheduler/src/schTask.c
source/libs/scheduler/src/schTask.c
+24
-35
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+1
-1
未找到文件。
include/libs/qcom/query.h
浏览文件 @
80798dd9
...
...
@@ -33,6 +33,7 @@ typedef enum {
JOB_TASK_STATUS_INIT
,
JOB_TASK_STATUS_EXEC
,
JOB_TASK_STATUS_PART_SUCC
,
JOB_TASK_STATUS_FETCH
,
JOB_TASK_STATUS_SUCC
,
JOB_TASK_STATUS_FAIL
,
JOB_TASK_STATUS_DROP
,
...
...
source/common/src/tdatablock.c
浏览文件 @
80798dd9
...
...
@@ -1180,7 +1180,7 @@ void blockDataCleanup(SSDataBlock* pDataBlock) {
void
blockDataEmpty
(
SSDataBlock
*
pDataBlock
)
{
SDataBlockInfo
*
pInfo
=
&
pDataBlock
->
info
;
if
(
pInfo
->
capacity
==
0
||
pInfo
->
rows
>
pDataBlock
->
info
.
capacity
)
{
if
(
pInfo
->
capacity
==
0
)
{
return
;
}
...
...
source/libs/qcom/src/queryUtil.c
浏览文件 @
80798dd9
...
...
@@ -194,6 +194,8 @@ char* jobTaskStatusStr(int32_t status) {
return
"EXECUTING"
;
case
JOB_TASK_STATUS_PART_SUCC
:
return
"PARTIAL_SUCCEED"
;
case
JOB_TASK_STATUS_FETCH
:
return
"FETCHING"
;
case
JOB_TASK_STATUS_SUCC
:
return
"SUCCEED"
;
case
JOB_TASK_STATUS_FAIL
:
...
...
source/libs/scheduler/inc/schInt.h
浏览文件 @
80798dd9
...
...
@@ -193,7 +193,7 @@ typedef struct SSchLevel {
int32_t
taskSucceed
;
int32_t
taskNum
;
int32_t
taskLaunchedNum
;
int32_t
taskDoneNum
;
int32_t
task
Exec
DoneNum
;
SArray
*
subTasks
;
// Element is SSchTask
}
SSchLevel
;
...
...
@@ -340,6 +340,9 @@ extern SSchedulerMgmt schMgmt;
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
#define SCH_GET_TASK_STATUS_STR(task) jobTaskStatusStr(SCH_GET_TASK_STATUS(task))
#define SCH_TASK_ALREADY_LAUNCHED(task) (SCH_GET_TASK_STATUS(task) >= JOB_TASK_STATUS_EXEC)
#define SCH_TASK_EXEC_DONE(task) (SCH_GET_TASK_STATUS(task) >= JOB_TASK_STATUS_PART_SUCC)
#define SCH_GET_TASK_HANDLE(_task) ((_task) ? (_task)->handle : NULL)
#define SCH_SET_TASK_HANDLE(_task, _handle) ((_task)->handle = (_handle))
...
...
@@ -361,6 +364,7 @@ extern SSchedulerMgmt schMgmt;
(SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY)
#define SCH_MULTI_LEVEL_LAUNCHED(_job) ((_job)->levelIdx != ((_job)->levelNum - 1))
#define SCH_SET_JOB_TYPE(_job, type) \
do { \
...
...
@@ -377,16 +381,24 @@ extern SSchedulerMgmt schMgmt;
#define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job))
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED)
#define SCH_MERGE_TASK_NETWORK_ERR(_task, _code, _len) \
(SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task)) || (_task)->redirectCtx.inRedirect))
#define SCH_REDIRECT_MSGTYPE(_msgType) \
((_msgType) == TDMT_SCH_LINK_BROKEN || (_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || \
(_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_REDIRECT(_task, _msgType, _code, _rspLen) \
(SCH_REDIRECT_MSGTYPE(_msgType) && \
(NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_MERGE_TASK_NETWORK_ERR((_task), (_code), (_rspLen))))
#define SCH_NEED_RETRY(_msgType, _code) \
((SCH_NETWORK_ERR(_code) && SCH_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
#define SCH_LOW_LEVEL_NETWORK_ERR(_job, _task, _code) \
(SCH_NETWORK_ERR(_code) && ((_task)->level->level == (_job)->levelIdx))
#define SCH_TOP_LEVEL_NETWORK_ERR(_job, _task, _code) \
(SCH_NETWORK_ERR(_code) && ((_task)->level->level > (_job)->levelIdx))
#define SCH_TASK_RETRY_NETWORK_ERR(_task, _code) \
(SCH_NETWORK_ERR(_code) && (_task)->redirectCtx.inRedirect)
#define SCH_JOB_NEED_RETRY(_job, _task, _msgType, _code) \
(SCH_REDIRECT_MSGTYPE(_msgType) && SCH_TOP_LEVEL_NETWORK_ERR(_job, _task, _code))
#define SCH_TASKSET_NEED_RETRY(_job, _task, _msgType, _code) \
(SCH_REDIRECT_MSGTYPE(_msgType) && \
(NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_LOW_LEVEL_NETWORK_ERR((_job), (_task), (_code)) || SCH_TASK_RETRY_NETWORK_ERR((_task), (_code))))
#define SCH_TASK_NEED_RETRY(_msgType, _code) \
((SCH_REDIRECT_MSGTYPE(_msgType) && SCH_NETWORK_ERR(_code)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum)
#define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse])
...
...
@@ -562,7 +574,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq);
int32_t
schExecJob
(
SSchJob
*
pJob
,
SSchedulerReq
*
pReq
);
int32_t
schDumpJobExecRes
(
SSchJob
*
pJob
,
SExecResult
*
pRes
);
int32_t
schUpdateTaskCandidateAddr
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SEpSet
*
pEpSet
);
int32_t
schHandle
Redirect
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SDataBuf
*
pData
,
int32_t
rspCode
);
int32_t
schHandle
TaskSetRetry
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SDataBuf
*
pData
,
int32_t
rspCode
);
void
schProcessOnOpEnd
(
SSchJob
*
pJob
,
SCH_OP_TYPE
type
,
SSchedulerReq
*
pReq
,
int32_t
errCode
);
int32_t
schProcessOnOpBegin
(
SSchJob
*
pJob
,
SCH_OP_TYPE
type
,
SSchedulerReq
*
pReq
);
void
schProcessOnCbEnd
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
);
...
...
source/libs/scheduler/src/schFlowCtrl.c
浏览文件 @
80798dd9
...
...
@@ -282,7 +282,6 @@ int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) {
}
int32_t
code
=
schLaunchTasksInFlowCtrlListImpl
(
pJob
,
ctrl
);
;
SCH_ERR_RET
(
code
);
return
code
;
// to avoid compiler error
...
...
source/libs/scheduler/src/schJob.c
浏览文件 @
80798dd9
...
...
@@ -108,10 +108,18 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
break
;
case
JOB_TASK_STATUS_PART_SUCC
:
if
(
newStatus
!=
JOB_TASK_STATUS_FAIL
&&
newStatus
!=
JOB_TASK_STATUS_SUCC
&&
newStatus
!=
JOB_TASK_STATUS_DROP
&&
newStatus
!=
JOB_TASK_STATUS_EXEC
)
{
newStatus
!=
JOB_TASK_STATUS_DROP
&&
newStatus
!=
JOB_TASK_STATUS_EXEC
&&
newStatus
!=
JOB_TASK_STATUS_FETCH
)
{
SCH_ERR_JRET
(
TSDB_CODE_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_FETCH
:
if
(
newStatus
!=
JOB_TASK_STATUS_FAIL
&&
newStatus
!=
JOB_TASK_STATUS_SUCC
&&
newStatus
!=
JOB_TASK_STATUS_DROP
&&
newStatus
!=
JOB_TASK_STATUS_EXEC
)
{
SCH_ERR_JRET
(
TSDB_CODE_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_SUCC
:
case
JOB_TASK_STATUS_FAIL
:
...
...
@@ -550,9 +558,9 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
}
SSchLevel
*
pLevel
=
pTask
->
level
;
int32_t
doneNum
=
atomic_add_fetch_32
(
&
pLevel
->
taskDoneNum
,
1
);
int32_t
doneNum
=
atomic_add_fetch_32
(
&
pLevel
->
task
Exec
DoneNum
,
1
);
if
(
doneNum
==
pLevel
->
taskNum
)
{
pJob
->
levelIdx
--
;
atomic_sub_fetch_32
(
&
pJob
->
levelIdx
,
1
)
;
pLevel
=
taosArrayGet
(
pJob
->
levels
,
pJob
->
levelIdx
);
for
(
int32_t
i
=
0
;
i
<
pLevel
->
taskNum
;
++
i
)
{
...
...
@@ -562,6 +570,10 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
continue
;
}
if
(
SCH_TASK_ALREADY_LAUNCHED
(
pTask
))
{
continue
;
}
SCH_ERR_RET
(
schLaunchTask
(
pJob
,
pTask
));
}
}
...
...
@@ -811,6 +823,30 @@ void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode) {
}
}
int32_t
schChkResetJobRetry
(
SSchJob
*
pJob
,
int32_t
rspCode
)
{
if
(
pJob
->
status
>=
JOB_TASK_STATUS_PART_SUCC
)
{
SCH_LOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
if
(
pJob
->
fetched
)
{
SCH_UNLOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
SCH_TASK_ELOG
(
"already fetched while got error %s"
,
tstrerror
(
rspCode
));
SCH_ERR_JRET
(
rspCode
);
}
SCH_UNLOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
schUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_EXEC
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
schHandleJobRetry
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SDataBuf
*
pMsg
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
SCH_ERR_JRET
(
schChkResetJobRetry
(
pJob
,
rspCode
));
}
bool
schChkCurrentOp
(
SSchJob
*
pJob
,
int32_t
op
,
int8_t
sync
)
{
bool
r
=
false
;
SCH_LOCK
(
SCH_READ
,
&
pJob
->
opStatus
.
lock
);
...
...
source/libs/scheduler/src/schRemote.c
浏览文件 @
80798dd9
...
...
@@ -36,7 +36,7 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
TMSG_INFO
(
msgType
));
SCH_ERR_RET
(
TSDB_CODE_QW_MSG_ERROR
);
}
if
(
taskStatus
!=
JOB_TASK_STATUS_
PART_SUCC
)
{
if
(
taskStatus
!=
JOB_TASK_STATUS_
FETCH
)
{
SCH_TASK_ELOG
(
"rsp msg conflicted with task status, status:%s, rspType:%s"
,
jobTaskStatusStr
(
taskStatus
),
TMSG_INFO
(
msgType
));
SCH_ERR_RET
(
TSDB_CODE_QW_MSG_ERROR
);
...
...
@@ -137,25 +137,12 @@ int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp) {
return
TSDB_CODE_SUCCESS
;
}
// Note: no more task error processing, handled in function internal
int32_t
schHandleResponseMsg
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
execId
,
SDataBuf
*
pMsg
,
int32_t
rspCode
)
{
int32_t
schProcessResponseMsg
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
execId
,
SDataBuf
*
pMsg
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
char
*
msg
=
pMsg
->
pData
;
int32_t
msgSize
=
pMsg
->
len
;
int32_t
msgType
=
pMsg
->
msgType
;
bool
dropExecNode
=
(
msgType
==
TDMT_SCH_LINK_BROKEN
||
SCH_NETWORK_ERR
(
rspCode
));
if
(
SCH_IS_QUERY_JOB
(
pJob
))
{
SCH_ERR_JRET
(
schUpdateTaskHandle
(
pJob
,
pTask
,
dropExecNode
,
pMsg
->
handle
,
execId
));
}
SCH_ERR_JRET
(
schValidateRspMsgType
(
pJob
,
pTask
,
msgType
));
int32_t
reqType
=
IsReq
(
pMsg
)
?
pMsg
->
msgType
:
(
pMsg
->
msgType
-
1
);
if
(
SCH_TASK_NEED_REDIRECT
(
pTask
,
reqType
,
rspCode
,
pMsg
->
len
))
{
SCH_RET
(
schHandleRedirect
(
pJob
,
pTask
,
(
SDataBuf
*
)
pMsg
,
rspCode
));
}
pTask
->
redirectCtx
.
inRedirect
=
false
;
switch
(
msgType
)
{
...
...
@@ -423,6 +410,31 @@ _return:
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
));
}
// Note: no more task error processing, handled in function internal
int32_t
schHandleResponseMsg
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
execId
,
SDataBuf
*
pMsg
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
int32_t
msgType
=
pMsg
->
msgType
;
bool
dropExecNode
=
(
msgType
==
TDMT_SCH_LINK_BROKEN
||
SCH_NETWORK_ERR
(
rspCode
));
if
(
SCH_IS_QUERY_JOB
(
pJob
))
{
SCH_ERR_JRET
(
schUpdateTaskHandle
(
pJob
,
pTask
,
dropExecNode
,
pMsg
->
handle
,
execId
));
}
SCH_ERR_JRET
(
schValidateRspMsgType
(
pJob
,
pTask
,
msgType
));
int32_t
reqType
=
IsReq
(
pMsg
)
?
pMsg
->
msgType
:
(
pMsg
->
msgType
-
1
);
if
(
SCH_JOB_NEED_RETRY
(
pJob
,
pTask
,
reqType
,
rspCode
))
{
SCH_RET
(
schHandleJobRetry
());
}
else
if
(
SCH_TASKSET_NEED_RETRY
(
pJob
,
pTask
,
reqType
,
rspCode
))
{
SCH_RET
(
schHandleTaskSetRetry
(
pJob
,
pTask
,
(
SDataBuf
*
)
pMsg
,
rspCode
));
}
pTask
->
redirectCtx
.
inRedirect
=
false
;
SCH_RET
(
schProcessResponseMsg
(
pJob
,
pTask
,
execId
,
pMsg
,
rspCode
));
}
int32_t
schHandleCallback
(
void
*
param
,
SDataBuf
*
pMsg
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
SSchTaskCallbackParam
*
pParam
=
(
SSchTaskCallbackParam
*
)
param
;
...
...
source/libs/scheduler/src/schStatus.c
浏览文件 @
80798dd9
...
...
@@ -34,6 +34,9 @@ int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param) {
case
JOB_TASK_STATUS_PART_SUCC
:
SCH_ERR_JRET
(
schProcessOnJobPartialSuccess
(
pJob
));
break
;
case
JOB_TASK_STATUS_FETCH
:
SCH_ERR_JRET
(
schJobFetchRows
(
pJob
));
break
;
case
JOB_TASK_STATUS_SUCC
:
break
;
case
JOB_TASK_STATUS_FAIL
:
...
...
source/libs/scheduler/src/schTask.c
浏览文件 @
80798dd9
...
...
@@ -404,21 +404,7 @@ _return:
return
TSDB_CODE_SUCCESS
;
}
int32_t
schDoTaskRedirect
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SDataBuf
*
pData
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
SCH_TASK_DLOG
(
"task will be redirected now, status:%s, code:%s"
,
SCH_GET_TASK_STATUS_STR
(
pTask
),
tstrerror
(
rspCode
));
if
(
NULL
==
pData
)
{
pTask
->
retryTimes
=
0
;
}
if
(
!
NO_RET_REDIRECT_ERROR
(
rspCode
))
{
SCH_UPDATE_REDICT_CODE
(
pJob
,
rspCode
);
}
SCH_ERR_JRET
(
schChkUpdateRedirectCtx
(
pJob
,
pTask
,
pData
?
pData
->
pEpSet
:
NULL
,
rspCode
));
void
schResetTaskForRetry
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
pTask
->
waitRetry
=
true
;
schDropTaskOnExecNode
(
pJob
,
pTask
);
...
...
@@ -426,10 +412,27 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
schRemoveTaskFromExecList
(
pJob
,
pTask
);
schDeregisterTaskHb
(
pJob
,
pTask
);
atomic_sub_fetch_32
(
&
pTask
->
level
->
taskLaunchedNum
,
1
);
if
(
SCH_TASK_EXEC_DONE
(
pTask
))
{
atomic_sub_fetch_32
(
&
pTask
->
level
->
taskExecDoneNum
,
1
);
}
taosMemoryFreeClear
(
pTask
->
msg
);
pTask
->
msgLen
=
0
;
pTask
->
lastMsgType
=
0
;
memset
(
&
pTask
->
succeedAddr
,
0
,
sizeof
(
pTask
->
succeedAddr
));
}
int32_t
schDoTaskRedirect
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SDataBuf
*
pData
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
SCH_TASK_DLOG
(
"task will be redirected now, status:%s, code:%s"
,
SCH_GET_TASK_STATUS_STR
(
pTask
),
tstrerror
(
rspCode
));
if
(
!
NO_RET_REDIRECT_ERROR
(
rspCode
))
{
SCH_UPDATE_REDICT_CODE
(
pJob
,
rspCode
);
}
SCH_ERR_JRET
(
schChkUpdateRedirectCtx
(
pJob
,
pTask
,
pData
?
pData
->
pEpSet
:
NULL
,
rspCode
));
schResetTaskForRetry
(
pJob
,
pTask
);
if
(
SCH_IS_DATA_BIND_TASK
(
pTask
))
{
if
(
pData
&&
pData
->
pEpSet
)
{
...
...
@@ -445,12 +448,6 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
SCH_TASK_DLOG
(
"switch task target node %d epset to %d/%d"
,
addr
->
nodeId
,
addr
->
epSet
.
inUse
,
addr
->
epSet
.
numOfEps
);
}
if
(
SCH_TASK_NEED_FLOW_CTRL
(
pJob
,
pTask
))
{
if
(
JOB_TASK_STATUS_EXEC
==
SCH_GET_TASK_STATUS
(
pTask
))
{
SCH_ERR_JRET
(
schLaunchTasksInFlowCtrlList
(
pJob
,
pTask
));
}
}
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_INIT
);
SCH_ERR_JRET
(
schDelayLaunchTask
(
pJob
,
pTask
));
...
...
@@ -486,20 +483,10 @@ _return:
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
));
}
int32_t
schHandle
Redirect
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SDataBuf
*
pData
,
int32_t
rspCode
)
{
int32_t
schHandle
TaskSetRetry
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SDataBuf
*
pData
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
if
(
JOB_TASK_STATUS_PART_SUCC
==
pJob
->
status
)
{
SCH_LOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
if
(
pJob
->
fetched
)
{
SCH_UNLOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
SCH_TASK_ELOG
(
"already fetched while got error %s"
,
tstrerror
(
rspCode
));
SCH_ERR_JRET
(
rspCode
);
}
SCH_UNLOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
schUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_EXEC
);
}
SCH_ERR_JRET
(
schChkResetJobRetry
(
pJob
,
rspCode
));
if
(
SYNC_OTHER_LEADER_REDIRECT_ERROR
(
rspCode
))
{
if
(
NULL
==
pData
->
pEpSet
)
{
...
...
@@ -510,6 +497,7 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
}
code
=
schDoTaskRedirect
(
pJob
,
pTask
,
pData
,
rspCode
);
taosMemoryFreeClear
(
pData
->
pData
);
taosMemoryFreeClear
(
pData
->
pEpSet
);
...
...
@@ -645,7 +633,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
return
TSDB_CODE_SUCCESS
;
}
if
(
!
SCH_NEED_RETRY
(
pTask
->
lastMsgType
,
errCode
))
{
if
(
!
SCH_
TASK_
NEED_RETRY
(
pTask
->
lastMsgType
,
errCode
))
{
*
needRetry
=
false
;
SCH_TASK_DLOG
(
"task no more retry cause of errCode, errCode:%x - %s"
,
errCode
,
tstrerror
(
errCode
));
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1067,7 +1055,6 @@ int32_t schLaunchTaskImpl(void *param) {
SCH_ERR_JRET
(
TSDB_CODE_SCH_IGNORE_ERROR
);
}
// NOTE: race condition: the task should be put into the hash table before send msg to server
if
(
SCH_GET_TASK_STATUS
(
pTask
)
!=
JOB_TASK_STATUS_EXEC
)
{
SCH_ERR_JRET
(
schPushTaskToExecList
(
pJob
,
pTask
));
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_EXEC
);
...
...
@@ -1272,6 +1259,8 @@ int32_t schLaunchFetchTask(SSchJob *pJob) {
return
TSDB_CODE_SUCCESS
;
}
SCH_SET_TASK_STATUS
(
pJob
->
fetchTask
,
JOB_TASK_STATUS_FETCH
);
if
(
SCH_IS_LOCAL_EXEC_TASK
(
pJob
,
pJob
->
fetchTask
))
{
SCH_ERR_JRET
(
schExecLocalFetch
(
pJob
,
pJob
->
fetchTask
));
}
else
{
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
80798dd9
...
...
@@ -91,7 +91,7 @@ int32_t schedulerFetchRows(int64_t jobId, SSchedulerReq *pReq) {
SCH_ERR_JRET
(
schHandleOpBeginEvent
(
jobId
,
&
pJob
,
SCH_OP_FETCH
,
pReq
));
SCH_ERR_JRET
(
sch
JobFetchRows
(
pJob
));
SCH_ERR_JRET
(
sch
SwitchJobStatus
(
pJob
,
JOB_TASK_STATUS_FETCH
,
pReq
));
_return:
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录