Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
53ae7b79
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
53ae7b79
编写于
6月 04, 2022
作者:
D
dapan1121
提交者:
GitHub
6月 04, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #13442 from taosdata/feature/qnode
feat: rescheduler timeout task
上级
380c4609
7f61dbc8
变更
21
隐藏空白更改
内联
并排
Showing
21 changed file
with
396 addition
and
135 deletion
+396
-135
include/libs/qworker/qworker.h
include/libs/qworker/qworker.h
+2
-0
include/util/taoserror.h
include/util/taoserror.h
+3
-1
source/client/src/clientMain.c
source/client/src/clientMain.c
+2
-1
source/dnode/mgmt/mgmt_mnode/inc/mmInt.h
source/dnode/mgmt/mgmt_mnode/inc/mmInt.h
+1
-0
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
+2
-0
source/dnode/mgmt/mgmt_qnode/inc/qmInt.h
source/dnode/mgmt/mgmt_qnode/inc/qmInt.h
+2
-0
source/dnode/mgmt/mgmt_qnode/src/qmWorker.c
source/dnode/mgmt/mgmt_qnode/src/qmWorker.c
+2
-0
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+2
-0
source/dnode/mnode/impl/src/mndQuery.c
source/dnode/mnode/impl/src/mndQuery.c
+8
-0
source/dnode/qnode/src/qnode.c
source/dnode/qnode/src/qnode.c
+8
-0
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+1
-0
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+9
-1
source/libs/qworker/inc/qwInt.h
source/libs/qworker/inc/qwInt.h
+1
-1
source/libs/qworker/inc/qwMsg.h
source/libs/qworker/inc/qwMsg.h
+1
-0
source/libs/qworker/src/qwDbg.c
source/libs/qworker/src/qwDbg.c
+2
-1
source/libs/qworker/src/qwMsg.c
source/libs/qworker/src/qwMsg.c
+40
-5
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+30
-10
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+57
-9
source/libs/scheduler/src/schJob.c
source/libs/scheduler/src/schJob.c
+180
-67
source/libs/scheduler/src/schRemote.c
source/libs/scheduler/src/schRemote.c
+42
-39
source/util/src/terror.c
source/util/src/terror.c
+1
-0
未找到文件。
include/libs/qworker/qworker.h
浏览文件 @
53ae7b79
...
...
@@ -58,6 +58,8 @@ typedef struct {
int32_t
qWorkerInit
(
int8_t
nodeType
,
int32_t
nodeId
,
SQWorkerCfg
*
cfg
,
void
**
qWorkerMgmt
,
const
SMsgCb
*
pMsgCb
);
int32_t
qWorkerPreprocessQueryMsg
(
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
int32_t
qWorkerProcessQueryMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
,
int64_t
ts
);
int32_t
qWorkerProcessCQueryMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
,
int64_t
ts
);
...
...
include/util/taoserror.h
浏览文件 @
53ae7b79
...
...
@@ -563,7 +563,9 @@ int32_t* taosGetErrno();
//scheduler&qworker
#define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501)
#define TSDB_CODE_SCH_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2502)
#define TSDB_CODE_QW_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x2503)
#define TSDB_CODE_SCH_IGNORE_ERROR TAOS_DEF_ERROR_CODE(0, 0x2503)
#define TSDB_CODE_SCH_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x2504)
#define TSDB_CODE_QW_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x2550)
//parser
#define TSDB_CODE_PAR_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x2600)
...
...
source/client/src/clientMain.c
浏览文件 @
53ae7b79
...
...
@@ -66,10 +66,11 @@ void taos_cleanup(void) {
hbMgrCleanUp
();
rpcCleanup
();
catalogDestroy
();
schedulerDestroy
();
rpcCleanup
();
tscInfo
(
"all local resources released"
);
taosCleanupCfg
();
taosCloseLog
();
...
...
source/dnode/mgmt/mgmt_mnode/inc/mmInt.h
浏览文件 @
53ae7b79
...
...
@@ -56,6 +56,7 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t
mmProcessAlterReq
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
);
int32_t
mmProcessGetMonitorInfoReq
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
);
int32_t
mmProcessGetLoadsReq
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
);
int32_t
mndPreprocessQueryMsg
(
SMnode
*
pMnode
,
SRpcMsg
*
pMsg
);
// mmWorker.c
int32_t
mmStartWorker
(
SMnodeMgmt
*
pMgmt
);
...
...
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
浏览文件 @
53ae7b79
...
...
@@ -89,6 +89,8 @@ int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
int32_t
mmPutNodeMsgToQueryQueue
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
)
{
mndPreprocessQueryMsg
(
pMgmt
->
pMnode
,
pMsg
);
return
mmPutNodeMsgToWorker
(
&
pMgmt
->
queryWorker
,
pMsg
);
}
...
...
source/dnode/mgmt/mgmt_qnode/inc/qmInt.h
浏览文件 @
53ae7b79
...
...
@@ -51,6 +51,8 @@ int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t
qmPutNodeMsgToFetchQueue
(
SQnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
);
int32_t
qmPutNodeMsgToMonitorQueue
(
SQnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
);
int32_t
qndPreprocessQueryMsg
(
SQnode
*
pQnode
,
SRpcMsg
*
pMsg
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/mgmt/mgmt_qnode/src/qmWorker.c
浏览文件 @
53ae7b79
...
...
@@ -57,6 +57,8 @@ static int32_t qmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) {
}
int32_t
qmPutNodeMsgToQueryQueue
(
SQnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
)
{
qndPreprocessQueryMsg
(
pMgmt
->
pQnode
,
pMsg
);
return
qmPutNodeMsgToWorker
(
&
pMgmt
->
queryWorker
,
pMsg
);
}
...
...
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
浏览文件 @
53ae7b79
...
...
@@ -250,6 +250,8 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
switch
(
qtype
)
{
case
QUERY_QUEUE
:
vnodePreprocessQueryMsg
(
pVnode
->
pImpl
,
pMsg
);
dTrace
(
"vgId:%d, msg:%p put into vnode-query queue"
,
pVnode
->
vgId
,
pMsg
);
taosWriteQitem
(
pVnode
->
pQueryQ
,
pMsg
);
break
;
...
...
source/dnode/mnode/impl/src/mndQuery.c
浏览文件 @
53ae7b79
...
...
@@ -18,6 +18,14 @@
#include "mndMnode.h"
#include "qworker.h"
int32_t
mndPreprocessQueryMsg
(
SMnode
*
pMnode
,
SRpcMsg
*
pMsg
)
{
if
(
TDMT_VND_QUERY
!=
pMsg
->
msgType
)
{
return
0
;
}
return
qWorkerPreprocessQueryMsg
(
pMnode
->
pQuery
,
pMsg
);
}
int32_t
mndProcessQueryMsg
(
SRpcMsg
*
pMsg
)
{
int32_t
code
=
-
1
;
SMnode
*
pMnode
=
pMsg
->
info
.
node
;
...
...
source/dnode/qnode/src/qnode.c
浏览文件 @
53ae7b79
...
...
@@ -63,6 +63,14 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) {
return
0
;
}
int32_t
qndPreprocessQueryMsg
(
SQnode
*
pQnode
,
SRpcMsg
*
pMsg
)
{
if
(
TDMT_VND_QUERY
!=
pMsg
->
msgType
)
{
return
0
;
}
return
qWorkerPreprocessQueryMsg
(
pQnode
->
pQuery
,
pMsg
);
}
int32_t
qndProcessQueryMsg
(
SQnode
*
pQnode
,
int64_t
ts
,
SRpcMsg
*
pMsg
)
{
int32_t
code
=
-
1
;
SReadHandle
handle
=
{.
pMsgCb
=
&
pQnode
->
msgCb
};
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
53ae7b79
...
...
@@ -56,6 +56,7 @@ int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg);
int32_t
vnodeProcessWriteReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
int64_t
version
,
SRpcMsg
*
pRsp
);
int32_t
vnodeProcessCMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
);
int32_t
vnodeProcessSyncReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
);
int32_t
vnodePreprocessQueryMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
int32_t
vnodeProcessQueryMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
int32_t
vnodeProcessFetchMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SQueueInfo
*
pInfo
);
int32_t
vnodeGetLoad
(
SVnode
*
pVnode
,
SVnodeLoad
*
pLoad
);
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
53ae7b79
...
...
@@ -190,7 +190,15 @@ _err:
return
-
1
;
}
int
vnodeProcessQueryMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
int32_t
vnodePreprocessQueryMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
if
(
TDMT_VND_QUERY
!=
pMsg
->
msgType
)
{
return
0
;
}
return
qWorkerPreprocessQueryMsg
(
pVnode
->
pQuery
,
pMsg
);
}
int32_t
vnodeProcessQueryMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
vTrace
(
"message in vnode query queue is processing"
);
SReadHandle
handle
=
{.
meta
=
pVnode
->
pMeta
,
.
config
=
&
pVnode
->
config
,
.
vnode
=
pVnode
,
.
pMsgCb
=
&
pVnode
->
msgCb
};
switch
(
pMsg
->
msgType
)
{
...
...
source/libs/qworker/inc/qwInt.h
浏览文件 @
53ae7b79
...
...
@@ -33,7 +33,7 @@ extern "C" {
#define QW_DEFAULT_TASK_NUMBER 10000
#define QW_DEFAULT_SCH_TASK_NUMBER 10000
#define QW_DEFAULT_SHORT_RUN_TIMES 2
#define QW_DEFAULT_HEARTBEAT_MSEC
3
000
#define QW_DEFAULT_HEARTBEAT_MSEC
5
000
enum
{
QW_PHASE_PRE_QUERY
=
1
,
...
...
source/libs/qworker/inc/qwMsg.h
浏览文件 @
53ae7b79
...
...
@@ -23,6 +23,7 @@ extern "C" {
#include "qwInt.h"
#include "dataSinkMgt.h"
int32_t
qwPrerocessQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
);
int32_t
qwProcessQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
,
int8_t
taskType
,
int8_t
explain
);
int32_t
qwProcessCQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
);
int32_t
qwProcessReady
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
);
...
...
source/libs/qworker/src/qwDbg.c
浏览文件 @
53ae7b79
...
...
@@ -36,7 +36,8 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus,
break
;
case
JOB_TASK_STATUS_NOT_START
:
if
(
newStatus
!=
JOB_TASK_STATUS_CANCELLED
)
{
if
(
newStatus
!=
JOB_TASK_STATUS_DROPPING
&&
newStatus
!=
JOB_TASK_STATUS_EXECUTING
&&
newStatus
!=
JOB_TASK_STATUS_FAILED
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
...
...
source/libs/qworker/src/qwMsg.c
浏览文件 @
53ae7b79
...
...
@@ -248,8 +248,8 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerPr
ocessQueryMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
,
int64_t
ts
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
int32_t
qWorkerPr
eprocessQueryMsg
(
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
...
...
@@ -257,9 +257,6 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
SSubQueryMsg
*
msg
=
pMsg
->
pCont
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
qwUpdateTimeInQueue
(
mgmt
,
ts
,
QUERY_QUEUE
);
QW_STAT_INC
(
mgmt
->
stat
.
msgStat
.
queryProcessed
,
1
);
if
(
NULL
==
msg
||
pMsg
->
contLen
<=
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid query msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
...
...
@@ -277,6 +274,44 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
uint64_t
tId
=
msg
->
taskId
;
int64_t
rId
=
msg
->
refId
;
SQWMsg
qwMsg
=
{.
msg
=
msg
->
msg
+
msg
->
sqlLen
,
.
msgLen
=
msg
->
phyLen
,
.
connInfo
=
pMsg
->
info
};
QW_SCH_TASK_DLOG
(
"prerocessQuery start, handle:%p"
,
pMsg
->
info
.
handle
);
QW_ERR_RET
(
qwPrerocessQuery
(
QW_FPARAMS
(),
&
qwMsg
));
QW_SCH_TASK_DLOG
(
"prerocessQuery end, handle:%p"
,
pMsg
->
info
.
handle
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessQueryMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
,
int64_t
ts
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
int32_t
code
=
0
;
SSubQueryMsg
*
msg
=
pMsg
->
pCont
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
qwUpdateTimeInQueue
(
mgmt
,
ts
,
QUERY_QUEUE
);
QW_STAT_INC
(
mgmt
->
stat
.
msgStat
.
queryProcessed
,
1
);
if
(
NULL
==
msg
||
pMsg
->
contLen
<=
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid query msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
msg
->
sId
=
msg
->
sId
;
msg
->
queryId
=
msg
->
queryId
;
msg
->
taskId
=
msg
->
taskId
;
msg
->
refId
=
msg
->
refId
;
msg
->
phyLen
=
msg
->
phyLen
;
msg
->
sqlLen
=
msg
->
sqlLen
;
uint64_t
sId
=
msg
->
sId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
int64_t
rId
=
msg
->
refId
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
msg
->
msg
+
msg
->
sqlLen
,
.
msgLen
=
msg
->
phyLen
,
.
connInfo
=
pMsg
->
info
};
char
*
sql
=
strndup
(
msg
->
msg
,
msg
->
sqlLen
);
QW_SCH_TASK_DLOG
(
"processQuery start, node:%p, handle:%p, sql:%s"
,
node
,
pMsg
->
info
.
handle
,
sql
);
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
53ae7b79
...
...
@@ -248,11 +248,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
QW_TASK_DLOG
(
"start to handle event at phase %s"
,
qwPhaseStr
(
phase
));
if
(
QW_PHASE_PRE_QUERY
==
phase
)
{
QW_ERR_JRET
(
qwAddAcquireTaskCtx
(
QW_FPARAMS
(),
&
ctx
));
}
else
{
QW_ERR_JRET
(
qwAcquireTaskCtx
(
QW_FPARAMS
(),
&
ctx
));
}
QW_ERR_JRET
(
qwAcquireTaskCtx
(
QW_FPARAMS
(),
&
ctx
));
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
...
...
@@ -285,7 +281,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
break
;
}
QW_ERR_JRET
(
qw
Add
TaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_EXECUTING
));
QW_ERR_JRET
(
qw
Update
TaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_EXECUTING
));
break
;
}
case
QW_PHASE_PRE_FETCH
:
{
...
...
@@ -437,7 +433,7 @@ _return:
QW_RET
(
code
);
}
int32_t
qwPr
ocessQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
,
int8_t
taskType
,
int8_t
explain
)
{
int32_t
qwPr
erocessQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
)
{
int32_t
code
=
0
;
bool
queryRsped
=
false
;
SSubplan
*
plan
=
NULL
;
...
...
@@ -448,6 +444,32 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
QW_ERR_JRET
(
qwRegisterQueryBrokenLinkArg
(
QW_FPARAMS
(),
&
qwMsg
->
connInfo
));
QW_ERR_JRET
(
qwAddAcquireTaskCtx
(
QW_FPARAMS
(),
&
ctx
));
ctx
->
ctrlConnInfo
=
qwMsg
->
connInfo
;
QW_ERR_JRET
(
qwAddTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_NOT_START
));
_return:
if
(
ctx
)
{
QW_UPDATE_RSP_CODE
(
ctx
,
code
);
qwReleaseTaskCtx
(
mgmt
,
ctx
);
}
QW_RET
(
TSDB_CODE_SUCCESS
);
}
int32_t
qwProcessQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
,
int8_t
taskType
,
int8_t
explain
)
{
int32_t
code
=
0
;
bool
queryRsped
=
false
;
SSubplan
*
plan
=
NULL
;
SQWPhaseInput
input
=
{
0
};
qTaskInfo_t
pTaskInfo
=
NULL
;
DataSinkHandle
sinkHandle
=
NULL
;
SQWTaskCtx
*
ctx
=
NULL
;
QW_ERR_JRET
(
qwHandlePrePhaseEvents
(
QW_FPARAMS
(),
QW_PHASE_PRE_QUERY
,
&
input
,
NULL
));
QW_ERR_JRET
(
qwGetTaskCtx
(
QW_FPARAMS
(),
&
ctx
));
...
...
@@ -455,8 +477,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
atomic_store_8
(
&
ctx
->
taskType
,
taskType
);
atomic_store_8
(
&
ctx
->
explain
,
explain
);
ctx
->
ctrlConnInfo
=
qwMsg
->
connInfo
;
/*QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);*/
code
=
qStringToSubplan
(
qwMsg
->
msg
,
&
plan
);
...
...
@@ -663,7 +683,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
// TODO : TASK ALREADY REMOVED AND A NEW DROP MSG RECEIVED
QW_ERR_JRET
(
qwA
ddA
cquireTaskCtx
(
QW_FPARAMS
(),
&
ctx
));
QW_ERR_JRET
(
qwAcquireTaskCtx
(
QW_FPARAMS
(),
&
ctx
));
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
53ae7b79
...
...
@@ -32,6 +32,10 @@ extern "C" {
#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000
#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200 // unit is TSDB_TABLE_NUM_UNIT
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000
#define SCH_MAX_TASK_TIMEOUT_USEC 60000000
#define SCH_TASK_MAX_EXEC_TIMES 5
#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA
enum
{
...
...
@@ -51,6 +55,7 @@ typedef struct SSchTrans {
typedef
struct
SSchHbTrans
{
SRWLatch
lock
;
int64_t
taskNum
;
SRpcCtx
rpcCtx
;
SSchTrans
trans
;
}
SSchHbTrans
;
...
...
@@ -114,7 +119,8 @@ typedef struct SSchTaskCallbackParam {
uint64_t
queryId
;
int64_t
refId
;
uint64_t
taskId
;
void
*
transport
;
int32_t
execIdx
;
void
*
pTrans
;
}
SSchTaskCallbackParam
;
typedef
struct
SSchHbCallbackParam
{
...
...
@@ -148,25 +154,36 @@ typedef struct SSchLevel {
SArray
*
subTasks
;
// Element is SQueryTask
}
SSchLevel
;
typedef
struct
SSchTaskProfile
{
int64_t
startTs
;
int64_t
execUseTime
[
SCH_TASK_MAX_EXEC_TIMES
];
int64_t
waitTime
;
int64_t
endTs
;
}
SSchTaskProfile
;
typedef
struct
SSchTask
{
uint64_t
taskId
;
// task id
SRWLatch
lock
;
// task lock
int32_t
maxExecTimes
;
// task may exec times
int32_t
execIdx
;
// task current execute try index
SSchLevel
*
level
;
// level
SRWLatch
planLock
;
// task update plan lock
SSubplan
*
plan
;
// subplan
char
*
msg
;
// operator tree
int32_t
msgLen
;
// msg length
int8_t
status
;
// task status
int32_t
lastMsgType
;
// last sent msg type
int
32_t
tryTimes
;
// task already tried times
int
64_t
timeoutUsec
;
// taks timeout useconds before reschedule
SQueryNodeAddr
succeedAddr
;
// task executed success node address
int8_t
candidateIdx
;
// current try condidation index
SArray
*
candidateAddrs
;
// condidate node addresses, element is SQueryNodeAddr
S
Array
*
execNodes
;
// all tried node for current task, element is SSchNodeInfo
S
QueryProfileSummary
summary
;
// task execution summary
S
HashObj
*
execNodes
;
// all tried node for current task, element is SSchNodeInfo
S
SchTaskProfile
profile
;
// task execution profile
int32_t
childReady
;
// child task ready number
SArray
*
children
;
// the datasource tasks,from which to fetch the result, element is SQueryTask*
SArray
*
parents
;
// the data destination tasks, get data from current task, element is SQueryTask*
void
*
handle
;
// task send handle
void
*
handle
;
// task send handle
bool
registerdHb
;
// registered in hb
}
SSchTask
;
typedef
struct
SSchJobAttr
{
...
...
@@ -215,8 +232,39 @@ typedef struct SSchJob {
extern
SSchedulerMgmt
schMgmt
;
#define SCH_LOG_TASK_START_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execIdx % SCH_TASK_MAX_EXEC_TIMES; \
(_task)->profile.execUseTime[idx] = us; \
if (0 == (_task)->execIdx) { \
(_task)->profile.startTs = us; \
} \
} while (0)
#define SCH_LOG_TASK_WAIT_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execIdx % SCH_TASK_MAX_EXEC_TIMES; \
(_task)->profile.waitTime += us - (_task)->profile.execUseTime[idx]; \
} while (0)
#define SCH_LOG_TASK_END_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execIdx % SCH_TASK_MAX_EXEC_TIMES; \
(_task)->profile.execUseTime[idx] = us - (_task)->profile.execUseTime[idx]; \
(_task)->profile.endTs = us; \
} while (0)
#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - (_task)->profile.execUseTime[(_task)->execIdx % SCH_TASK_MAX_EXEC_TIMES]) > (_task)->timeoutUsec)
#define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
#define SCH_LOCK_TASK(_task) SCH_LOCK(SCH_WRITE, &(_task)->lock)
#define SCH_UNLOCK_TASK(_task) SCH_UNLOCK(SCH_WRITE, &(_task)->lock)
#define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1)
#define SCH_SET_TASK_LASTMSG_TYPE(_task, _type) do { if(_task) { atomic_store_32(&(_task)->lastMsgType, _type); } } while (0)
#define SCH_GET_TASK_LASTMSG_TYPE(_task) ((_task) ? atomic_load_32(&(_task)->lastMsgType) : -1)
...
...
@@ -284,7 +332,7 @@ int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask);
int32_t
schLaunchTaskImpl
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
int32_t
schFetchFromRemote
(
SSchJob
*
pJob
);
int32_t
schProcessOnTaskFailure
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
);
int32_t
schBuildAndSendHbMsg
(
SQueryNodeEpId
*
nodeEpId
);
int32_t
schBuildAndSendHbMsg
(
SQueryNodeEpId
*
nodeEpId
,
SArray
*
taskAction
);
int32_t
schCloneSMsgSendInfo
(
void
*
src
,
void
**
dst
);
int32_t
schValidateAndBuildJob
(
SQueryPlan
*
pDag
,
SSchJob
*
pJob
);
void
schFreeJobImpl
(
void
*
job
);
...
...
@@ -301,10 +349,9 @@ int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp);
int32_t
schProcessOnExplainDone
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SRetrieveTableRsp
*
pRsp
);
void
schProcessOnDataFetched
(
SSchJob
*
job
);
int32_t
schGetTaskInJob
(
SSchJob
*
pJob
,
uint64_t
taskId
,
SSchTask
**
pTask
);
int32_t
schUpdateTaskExecNodeHandle
(
SSchTask
*
pTask
,
void
*
handle
,
int32_t
rspCode
);
void
schFreeRpcCtxVal
(
const
void
*
arg
);
int32_t
schMakeBrokenLinkVal
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SRpcBrokenlinkVal
*
brokenVal
,
bool
isHb
);
int32_t
sch
RecordTaskExecNode
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SQueryNodeAddr
*
addr
,
void
*
handle
);
int32_t
sch
AppendTaskExecNode
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SQueryNodeAddr
*
addr
,
int32_t
execIdx
);
int32_t
schExecStaticExplainJob
(
void
*
pTrans
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
int64_t
*
job
,
const
char
*
sql
,
SSchResInfo
*
pRes
,
bool
sync
);
int32_t
schExecJobImpl
(
void
*
pTrans
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
int64_t
*
job
,
const
char
*
sql
,
...
...
@@ -318,7 +365,8 @@ int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *p
int32_t
schAsyncExecJob
(
void
*
pTrans
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
int64_t
*
pJob
,
const
char
*
sql
,
int64_t
startTs
,
SSchResInfo
*
pRes
);
int32_t
schFetchRows
(
SSchJob
*
pJob
);
int32_t
schAsyncFetchRows
(
SSchJob
*
pJob
);
int32_t
schUpdateTaskHandle
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
msgType
,
void
*
handle
,
int32_t
rspCode
);
int32_t
schUpdateTaskHandle
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
bool
dropExecNode
,
void
*
handle
,
int32_t
execIdx
);
int32_t
schProcessOnTaskStatusRsp
(
SQueryNodeEpId
*
pEpId
,
SArray
*
pStatusList
);
#ifdef __cplusplus
...
...
source/libs/scheduler/src/schJob.c
浏览文件 @
53ae7b79
...
...
@@ -28,11 +28,14 @@ FORCE_INLINE int32_t schReleaseJob(int64_t refId) { qDebug("release jobId:0x%"PR
int32_t
schInitTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SSubplan
*
pPlan
,
SSchLevel
*
pLevel
)
{
pTask
->
plan
=
pPlan
;
pTask
->
level
=
pLevel
;
pTask
->
execIdx
=
-
1
;
pTask
->
maxExecTimes
=
SCH_TASK_MAX_EXEC_TIMES
;
pTask
->
timeoutUsec
=
SCH_DEFAULT_TASK_TIMEOUT_USEC
;
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_NOT_START
);
pTask
->
taskId
=
schGenTaskId
();
pTask
->
execNodes
=
taos
ArrayInit
(
SCH_MAX_CANDIDATE_EP_NUM
,
sizeof
(
SSchNodeInfo
)
);
pTask
->
execNodes
=
taos
HashInit
(
SCH_MAX_CANDIDATE_EP_NUM
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
if
(
NULL
==
pTask
->
execNodes
)
{
SCH_TASK_ELOG
(
"taos
Array
Init %d execNodes failed"
,
SCH_MAX_CANDIDATE_EP_NUM
);
SCH_TASK_ELOG
(
"taos
Hash
Init %d execNodes failed"
,
SCH_MAX_CANDIDATE_EP_NUM
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -123,7 +126,34 @@ _return:
SCH_RET
(
code
);
}
void
schFreeTask
(
SSchTask
*
pTask
)
{
void
schDeregisterTaskHb
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
if
(
!
pTask
->
registerdHb
)
{
return
;
}
SQueryNodeAddr
*
addr
=
taosArrayGet
(
pTask
->
candidateAddrs
,
pTask
->
candidateIdx
);
SQueryNodeEpId
epId
=
{
0
};
epId
.
nodeId
=
addr
->
nodeId
;
SEp
*
pEp
=
SCH_GET_CUR_EP
(
addr
);
strcpy
(
epId
.
ep
.
fqdn
,
pEp
->
fqdn
);
epId
.
ep
.
port
=
pEp
->
port
;
SSchHbTrans
*
hb
=
taosHashGet
(
schMgmt
.
hbConnections
,
&
epId
,
sizeof
(
SQueryNodeEpId
));
if
(
NULL
==
hb
)
{
SCH_TASK_ELOG
(
"nodeId %d fqdn %s port %d not in hb connections"
,
epId
.
nodeId
,
epId
.
ep
.
fqdn
,
epId
.
ep
.
port
);
return
;
}
atomic_sub_fetch_64
(
&
hb
->
taskNum
,
1
);
pTask
->
registerdHb
=
false
;
}
void
schFreeTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
schDeregisterTaskHb
(
pJob
,
pTask
);
if
(
pTask
->
candidateAddrs
)
{
taosArrayDestroy
(
pTask
->
candidateAddrs
);
}
...
...
@@ -139,7 +169,7 @@ void schFreeTask(SSchTask *pTask) {
}
if
(
pTask
->
execNodes
)
{
taos
ArrayDestroy
(
pTask
->
execNodes
);
taos
HashCleanup
(
pTask
->
execNodes
);
}
}
...
...
@@ -329,45 +359,52 @@ int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
sch
RecordTaskExecNode
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SQueryNodeAddr
*
addr
,
void
*
handle
)
{
SSchNodeInfo
nodeInfo
=
{.
addr
=
*
addr
,
.
handle
=
handle
};
int32_t
sch
AppendTaskExecNode
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SQueryNodeAddr
*
addr
,
int32_t
execIdx
)
{
SSchNodeInfo
nodeInfo
=
{.
addr
=
*
addr
,
.
handle
=
NULL
};
if
(
NULL
==
taosArrayPush
(
pTask
->
execNodes
,
&
nodeInfo
))
{
SCH_TASK_ELOG
(
"taos
ArrayPush nodeInfo to execNodes list
failed, errno:%d"
,
errno
);
if
(
taosHashPut
(
pTask
->
execNodes
,
&
execIdx
,
sizeof
(
execIdx
),
&
nodeInfo
,
sizeof
(
nodeInfo
)
))
{
SCH_TASK_ELOG
(
"taos
HashPut nodeInfo to execNodes
failed, errno:%d"
,
errno
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SCH_TASK_DLOG
(
"task execNode
recorded, handle:%p"
,
handle
);
SCH_TASK_DLOG
(
"task execNode
added, execIdx:%d"
,
execIdx
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
schDropTaskExecNode
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
void
*
handle
)
{
int32_t
schDropTaskExecNode
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
void
*
handle
,
int32_t
execIdx
)
{
if
(
NULL
==
pTask
->
execNodes
)
{
return
TSDB_CODE_SUCCESS
;
}
int32_t
num
=
taosArrayGetSize
(
pTask
->
execNodes
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SSchNodeInfo
*
pNode
=
taosArrayGet
(
pTask
->
execNodes
,
i
);
if
(
pNode
->
handle
==
handle
)
{
taosArrayRemove
(
pTask
->
execNodes
,
i
);
break
;
}
taosHashRemove
(
pTask
->
execNodes
,
&
execIdx
,
sizeof
(
execIdx
));
if
(
execIdx
!=
pTask
->
execIdx
)
{
// ignore it
SCH_RET
(
TSDB_CODE_SCH_IGNORE_ERROR
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
schUpdateTaskHandle
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
msgType
,
void
*
handle
,
int32_t
rspCode
)
{
SCH_SET_TASK_HANDLE
(
pTask
,
handle
);
int32_t
schUpdateTaskExecNode
(
SSchTask
*
pTask
,
void
*
handle
,
int32_t
execIdx
)
{
if
(
taosHashGetSize
(
pTask
->
execNodes
)
<=
0
)
{
return
TSDB_CODE_SUCCESS
;
}
SSchNodeInfo
*
nodeInfo
=
taosHashGet
(
pTask
->
execNodes
,
&
execIdx
,
sizeof
(
execIdx
));
nodeInfo
->
handle
=
handle
;
schUpdateTaskExecNodeHandle
(
pTask
,
handle
,
rspCode
);
return
TSDB_CODE_SUCCESS
;
}
if
(
msgType
==
TDMT_SCH_LINK_BROKEN
)
{
schDropTaskExecNode
(
pJob
,
pTask
,
handle
);
int32_t
schUpdateTaskHandle
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
bool
dropExecNode
,
void
*
handle
,
int32_t
execIdx
)
{
if
(
dropExecNode
)
{
SCH_RET
(
schDropTaskExecNode
(
pJob
,
pTask
,
handle
,
execIdx
));
}
SCH_SET_TASK_HANDLE
(
pTask
,
handle
);
schUpdateTaskExecNode
(
pTask
,
handle
,
execIdx
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -672,7 +709,6 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
int32_t
schTaskCheckSetRetry
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
,
bool
*
needRetry
)
{
int8_t
status
=
0
;
++
pTask
->
tryTimes
;
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
*
needRetry
=
false
;
...
...
@@ -680,9 +716,19 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
return
TSDB_CODE_SUCCESS
;
}
if
(
pTask
->
tryTimes
>=
REQUEST_MAX_TRY_TIMES
)
{
if
(
TSDB_CODE_SCH_TIMEOUT_ERROR
==
errCode
)
{
pTask
->
maxExecTimes
++
;
if
(
pTask
->
timeoutUsec
<
SCH_MAX_TASK_TIMEOUT_USEC
)
{
pTask
->
timeoutUsec
*=
2
;
if
(
pTask
->
timeoutUsec
>
SCH_MAX_TASK_TIMEOUT_USEC
)
{
pTask
->
timeoutUsec
=
SCH_MAX_TASK_TIMEOUT_USEC
;
}
}
}
if
((
pTask
->
execIdx
+
1
)
>=
pTask
->
maxExecTimes
)
{
*
needRetry
=
false
;
SCH_TASK_DLOG
(
"task no more retry since reach max try times,
tryTimes:%d"
,
pTask
->
tryTimes
);
SCH_TASK_DLOG
(
"task no more retry since reach max try times,
execIdx:%d"
,
pTask
->
execIdx
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -694,16 +740,16 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
// TODO CHECK epList/condidateList
if
(
SCH_IS_DATA_SRC_TASK
(
pTask
))
{
if
(
pTask
->
tryTimes
>=
SCH_TASK_NUM_OF_EPS
(
&
pTask
->
plan
->
execNode
))
{
if
(
(
pTask
->
execIdx
+
1
)
>=
SCH_TASK_NUM_OF_EPS
(
&
pTask
->
plan
->
execNode
))
{
*
needRetry
=
false
;
SCH_TASK_DLOG
(
"task no more retry since all ep tried,
tryTimes:%d, epNum:%d"
,
pTask
->
tryTimes
,
SCH_TASK_DLOG
(
"task no more retry since all ep tried,
execIdx:%d, epNum:%d"
,
pTask
->
execIdx
,
SCH_TASK_NUM_OF_EPS
(
&
pTask
->
plan
->
execNode
));
return
TSDB_CODE_SUCCESS
;
}
}
else
{
int32_t
candidateNum
=
taosArrayGetSize
(
pTask
->
candidateAddrs
);
if
((
pTask
->
candidateIdx
+
1
)
>=
candidateNum
)
{
if
((
pTask
->
candidateIdx
+
1
)
>=
candidateNum
&&
(
TSDB_CODE_SCH_TIMEOUT_ERROR
!=
errCode
)
)
{
*
needRetry
=
false
;
SCH_TASK_DLOG
(
"task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d"
,
pTask
->
candidateIdx
,
candidateNum
);
...
...
@@ -712,7 +758,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
}
*
needRetry
=
true
;
SCH_TASK_DLOG
(
"task need the %dth retry, errCode:%x - %s"
,
pTask
->
tryTimes
,
errCode
,
tstrerror
(
errCode
));
SCH_TASK_DLOG
(
"task need the %dth retry, errCode:%x - %s"
,
pTask
->
execIdx
+
1
,
errCode
,
tstrerror
(
errCode
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -728,10 +774,15 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET
(
schLaunchTasksInFlowCtrlList
(
pJob
,
pTask
));
}
schDeregisterTaskHb
(
pJob
,
pTask
);
if
(
SCH_IS_DATA_SRC_TASK
(
pTask
))
{
SCH_SWITCH_EPSET
(
&
pTask
->
plan
->
execNode
);
}
else
{
++
pTask
->
candidateIdx
;
int32_t
candidateNum
=
taosArrayGetSize
(
pTask
->
candidateAddrs
);
if
(
++
pTask
->
candidateIdx
>=
candidateNum
)
{
pTask
->
candidateIdx
=
0
;
}
}
SCH_ERR_RET
(
schLaunchTask
(
pJob
,
pTask
));
...
...
@@ -906,6 +957,12 @@ void schProcessOnDataFetched(SSchJob *job) {
int32_t
schProcessOnTaskFailure
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
)
{
int8_t
status
=
0
;
if
(
errCode
==
TSDB_CODE_SCH_TIMEOUT_ERROR
)
{
SCH_LOG_TASK_WAIT_TS
(
pTask
);
}
else
{
SCH_LOG_TASK_END_TS
(
pTask
);
}
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
SCH_TASK_DLOG
(
"task failed not processed cause of job status, job status:%s"
,
jobTaskStatusStr
(
status
));
SCH_RET
(
atomic_load_32
(
&
pJob
->
errCode
));
...
...
@@ -989,6 +1046,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
SCH_TASK_DLOG
(
"taskOnSuccess, status:%s"
,
SCH_GET_TASK_STATUS_STR
(
pTask
));
SCH_LOG_TASK_END_TS
(
pTask
);
SCH_ERR_JRET
(
schMoveTaskToSuccList
(
pJob
,
pTask
,
&
moved
));
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_PARTIAL_SUCCEED
);
...
...
@@ -1105,6 +1164,92 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs
return
TSDB_CODE_SUCCESS
;
}
void
schDropTaskOnExecNode
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
if
(
NULL
==
pTask
->
execNodes
)
{
SCH_TASK_DLOG
(
"no exec address, status:%s"
,
SCH_GET_TASK_STATUS_STR
(
pTask
));
return
;
}
int32_t
size
=
(
int32_t
)
taosHashGetSize
(
pTask
->
execNodes
);
if
(
size
<=
0
)
{
SCH_TASK_DLOG
(
"task has no execNodes, no need to drop it, status:%s"
,
SCH_GET_TASK_STATUS_STR
(
pTask
));
return
;
}
SSchNodeInfo
*
nodeInfo
=
taosHashIterate
(
pTask
->
execNodes
,
NULL
);
while
(
nodeInfo
)
{
SCH_SET_TASK_HANDLE
(
pTask
,
nodeInfo
->
handle
);
schBuildAndSendMsg
(
pJob
,
pTask
,
&
nodeInfo
->
addr
,
TDMT_VND_DROP_TASK
);
nodeInfo
=
taosHashIterate
(
pTask
->
execNodes
,
nodeInfo
);
}
SCH_TASK_DLOG
(
"task has %d exec address"
,
size
);
}
int32_t
schRescheduleTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
if
(
SCH_IS_DATA_SRC_QRY_TASK
(
pTask
))
{
return
TSDB_CODE_SUCCESS
;
}
SCH_LOCK_TASK
(
pTask
);
if
(
JOB_TASK_STATUS_EXECUTING
==
pTask
->
status
&&
pJob
->
fetchTask
!=
pTask
)
{
schDropTaskOnExecNode
(
pJob
,
pTask
);
taosHashClear
(
pTask
->
execNodes
);
schProcessOnTaskFailure
(
pJob
,
pTask
,
TSDB_CODE_SCH_TIMEOUT_ERROR
);
}
SCH_UNLOCK_TASK
(
pTask
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
schProcessOnTaskStatusRsp
(
SQueryNodeEpId
*
pEpId
,
SArray
*
pStatusList
)
{
int32_t
taskNum
=
(
int32_t
)
taosArrayGetSize
(
pStatusList
);
SSchTask
*
pTask
=
NULL
;
qDebug
(
"%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d"
,
taskNum
,
pEpId
->
nodeId
,
pEpId
->
ep
.
fqdn
,
pEpId
->
ep
.
port
);
for
(
int32_t
i
=
0
;
i
<
taskNum
;
++
i
)
{
STaskStatus
*
taskStatus
=
taosArrayGet
(
pStatusList
,
i
);
SSchJob
*
pJob
=
schAcquireJob
(
taskStatus
->
refId
);
if
(
NULL
==
pJob
)
{
qWarn
(
"job not found, refId:0x%"
PRIx64
",QID:0x%"
PRIx64
",TID:0x%"
PRIx64
,
taskStatus
->
refId
,
taskStatus
->
queryId
,
taskStatus
->
taskId
);
// TODO DROP TASK FROM SERVER!!!!
continue
;
}
SCH_JOB_DLOG
(
"TID:0x%"
PRIx64
" task status in server: %s"
,
taskStatus
->
taskId
,
jobTaskStatusStr
(
taskStatus
->
status
));
pTask
=
NULL
;
schGetTaskInJob
(
pJob
,
taskStatus
->
taskId
,
&
pTask
);
if
(
NULL
==
pTask
)
{
// TODO DROP TASK FROM SERVER!!!!
schReleaseJob
(
taskStatus
->
refId
);
continue
;
}
if
(
taskStatus
->
status
==
JOB_TASK_STATUS_FAILED
)
{
// RECORD AND HANDLE ERROR!!!!
schReleaseJob
(
taskStatus
->
refId
);
continue
;
}
if
(
taskStatus
->
status
==
JOB_TASK_STATUS_NOT_START
&&
SCH_TASK_TIMEOUT
(
pTask
))
{
schRescheduleTask
(
pJob
,
pTask
);
}
schReleaseJob
(
taskStatus
->
refId
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
schSaveJobQueryRes
(
SSchJob
*
pJob
,
SQueryTableRsp
*
rsp
)
{
if
(
rsp
->
tbFName
[
0
])
{
if
(
NULL
==
pJob
->
execRes
.
res
)
{
...
...
@@ -1156,25 +1301,16 @@ int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
schUpdateTaskExecNodeHandle
(
SSchTask
*
pTask
,
void
*
handle
,
int32_t
rspCode
)
{
if
(
rspCode
||
NULL
==
pTask
->
execNodes
||
taosArrayGetSize
(
pTask
->
execNodes
)
>
1
||
taosArrayGetSize
(
pTask
->
execNodes
)
<=
0
)
{
return
TSDB_CODE_SUCCESS
;
}
SSchNodeInfo
*
nodeInfo
=
taosArrayGet
(
pTask
->
execNodes
,
0
);
nodeInfo
->
handle
=
handle
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
schLaunchTaskImpl
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
int8_t
status
=
0
;
int32_t
code
=
0
;
atomic_add_fetch_32
(
&
pTask
->
level
->
taskLaunchedNum
,
1
);
pTask
->
execIdx
++
;
SCH_LOG_TASK_START_TS
(
pTask
);
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
SCH_TASK_DLOG
(
"no need to launch task cause of job status, job status:%s"
,
jobTaskStatusStr
(
status
));
...
...
@@ -1257,29 +1393,6 @@ int32_t schLaunchJob(SSchJob *pJob) {
return
TSDB_CODE_SUCCESS
;
}
void
schDropTaskOnExecNode
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
if
(
NULL
==
pTask
->
execNodes
)
{
SCH_TASK_DLOG
(
"no exec address, status:%s"
,
SCH_GET_TASK_STATUS_STR
(
pTask
));
return
;
}
int32_t
size
=
(
int32_t
)
taosArrayGetSize
(
pTask
->
execNodes
);
if
(
size
<=
0
)
{
SCH_TASK_DLOG
(
"task has no execNodes, no need to drop it, status:%s"
,
SCH_GET_TASK_STATUS_STR
(
pTask
));
return
;
}
SSchNodeInfo
*
nodeInfo
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
nodeInfo
=
(
SSchNodeInfo
*
)
taosArrayGet
(
pTask
->
execNodes
,
i
);
SCH_SET_TASK_HANDLE
(
pTask
,
nodeInfo
->
handle
);
schBuildAndSendMsg
(
pJob
,
pTask
,
&
nodeInfo
->
addr
,
TDMT_VND_DROP_TASK
);
}
SCH_TASK_DLOG
(
"task has %d exec address"
,
size
);
}
void
schDropTaskInHashList
(
SSchJob
*
pJob
,
SHashObj
*
list
)
{
if
(
!
SCH_IS_NEED_DROP_JOB
(
pJob
))
{
...
...
@@ -1332,7 +1445,7 @@ void schFreeJobImpl(void *job) {
int32_t
numOfTasks
=
taosArrayGetSize
(
pLevel
->
subTasks
);
for
(
int32_t
j
=
0
;
j
<
numOfTasks
;
++
j
)
{
SSchTask
*
pTask
=
taosArrayGet
(
pLevel
->
subTasks
,
j
);
schFreeTask
(
pTask
);
schFreeTask
(
p
Job
,
p
Task
);
}
taosArrayDestroy
(
pLevel
->
subTasks
);
...
...
source/libs/scheduler/src/schRemote.c
浏览文件 @
53ae7b79
...
...
@@ -92,8 +92,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
int8_t
status
=
0
;
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
SCH_TASK_ELOG
(
"rsp not processed cause of job status, job status:%s, rspCode:0x%x"
,
jobTaskStatusStr
(
status
),
rspCode
);
SCH_TASK_ELOG
(
"rsp not processed cause of job status, job status:%s, rspCode:0x%x"
,
jobTaskStatusStr
(
status
),
rspCode
);
taosMemoryFreeClear
(
msg
);
SCH_RET
(
atomic_load_32
(
&
pJob
->
errCode
));
}
...
...
@@ -344,7 +343,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
_return:
taosMemoryFreeClear
(
msg
);
taosMemoryFreeClear
(
msg
);
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
));
}
...
...
@@ -364,13 +363,25 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
SCH_ERR_JRET
(
schGetTaskInJob
(
pJob
,
pParam
->
taskId
,
&
pTask
));
SCH_LOCK_TASK
(
pTask
);
SCH_TASK_DLOG
(
"rsp msg received, type:%s, handle:%p, code:%s"
,
TMSG_INFO
(
msgType
),
pMsg
->
handle
,
tstrerror
(
rspCode
));
SCH_ERR_JRET
(
schUpdateTaskHandle
(
pJob
,
pTask
,
msgType
,
pMsg
->
handle
,
rspCode
));
if
(
pParam
->
execIdx
!=
pTask
->
execIdx
)
{
SCH_TASK_DLOG
(
"execIdx %d mis-match current execIdx %d"
,
pParam
->
execIdx
,
pTask
->
execIdx
);
goto
_return
;
}
bool
dropExecNode
=
(
msgType
==
TDMT_SCH_LINK_BROKEN
||
rspCode
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
);
SCH_ERR_JRET
(
schUpdateTaskHandle
(
pJob
,
pTask
,
dropExecNode
,
pMsg
->
handle
,
pParam
->
execIdx
));
SCH_ERR_JRET
(
schHandleResponseMsg
(
pJob
,
pTask
,
msgType
,
pMsg
->
pData
,
pMsg
->
len
,
rspCode
));
_return:
if
(
pTask
)
{
SCH_UNLOCK_TASK
(
pTask
);
}
if
(
pJob
)
{
schReleaseJob
(
pParam
->
refId
);
...
...
@@ -426,7 +437,7 @@ int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t c
SSchTrans
trans
=
{.
pTrans
=
hbParam
->
pTrans
,
.
pHandle
=
NULL
};
SCH_ERR_RET
(
schUpdateHbConnection
(
&
hbParam
->
nodeEpId
,
&
trans
));
SCH_ERR_RET
(
schBuildAndSendHbMsg
(
&
hbParam
->
nodeEpId
));
SCH_ERR_RET
(
schBuildAndSendHbMsg
(
&
hbParam
->
nodeEpId
,
NULL
));
}
else
{
SCH_ERR_RET
(
schHandleCallback
(
param
,
pMsg
,
TDMT_SCH_LINK_BROKEN
,
code
));
}
...
...
@@ -454,7 +465,8 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, int32_t msgType,
param
->
queryId
=
pJob
->
queryId
;
param
->
refId
=
pJob
->
refId
;
param
->
taskId
=
SCH_TASK_ID
(
pTask
);
param
->
transport
=
pJob
->
pTrans
;
param
->
pTrans
=
pJob
->
pTrans
;
param
->
execIdx
=
pTask
->
execIdx
;
msgSendInfo
->
param
=
param
;
msgSendInfo
->
fp
=
fp
;
...
...
@@ -661,7 +673,7 @@ int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *
}
int32_t
schBuildAndSendHbMsg
(
SQueryNodeEpId
*
nodeEpId
)
{
int32_t
schBuildAndSendHbMsg
(
SQueryNodeEpId
*
nodeEpId
,
SArray
*
taskAction
)
{
SSchedulerHbReq
req
=
{
0
};
int32_t
code
=
0
;
SRpcCtx
rpcCtx
=
{
0
};
...
...
@@ -717,7 +729,7 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
__async_send_cb_fn_t
fp
=
NULL
;
SCH_ERR_JRET
(
schGetCallbackFp
(
msgType
,
&
fp
));
param
->
transport
=
trans
.
pTrans
;
param
->
pTrans
=
trans
.
pTrans
;
pMsgSendInfo
->
param
=
param
;
pMsgSendInfo
->
msgInfo
.
pData
=
msg
;
...
...
@@ -763,15 +775,26 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
strcpy
(
epId
.
ep
.
fqdn
,
pEp
->
fqdn
);
epId
.
ep
.
port
=
pEp
->
port
;
SSchHbTrans
*
hb
=
taosHashGet
(
schMgmt
.
hbConnections
,
&
epId
,
sizeof
(
SQueryNodeEpId
));
if
(
NULL
==
hb
)
{
bool
exist
=
false
;
SCH_ERR_RET
(
schRegisterHbConnection
(
pJob
,
pTask
,
&
epId
,
&
exist
));
if
(
!
exist
)
{
SCH_ERR_RET
(
schBuildAndSendHbMsg
(
&
epId
));
SSchHbTrans
*
hb
=
NULL
;
while
(
true
)
{
hb
=
taosHashGet
(
schMgmt
.
hbConnections
,
&
epId
,
sizeof
(
SQueryNodeEpId
));
if
(
NULL
==
hb
)
{
bool
exist
=
false
;
SCH_ERR_RET
(
schRegisterHbConnection
(
pJob
,
pTask
,
&
epId
,
&
exist
));
if
(
!
exist
)
{
SCH_ERR_RET
(
schBuildAndSendHbMsg
(
&
epId
,
NULL
));
}
continue
;
}
break
;
}
atomic_add_fetch_64
(
&
hb
->
taskNum
,
1
);
pTask
->
registerdHb
=
true
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -810,33 +833,12 @@ int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
}
SSchTrans
trans
=
{
0
};
trans
.
pTrans
=
pParam
->
transport
;
trans
.
pTrans
=
pParam
->
pTrans
;
trans
.
pHandle
=
pMsg
->
handle
;
SCH_ERR_JRET
(
schUpdateHbConnection
(
&
rsp
.
epId
,
&
trans
));
int32_t
taskNum
=
(
int32_t
)
taosArrayGetSize
(
rsp
.
taskStatus
);
qDebug
(
"%d task status in hb rsp, nodeId:%d, fqdn:%s, port:%d"
,
taskNum
,
rsp
.
epId
.
nodeId
,
rsp
.
epId
.
ep
.
fqdn
,
rsp
.
epId
.
ep
.
port
);
for
(
int32_t
i
=
0
;
i
<
taskNum
;
++
i
)
{
STaskStatus
*
taskStatus
=
taosArrayGet
(
rsp
.
taskStatus
,
i
);
SSchJob
*
pJob
=
schAcquireJob
(
taskStatus
->
refId
);
if
(
NULL
==
pJob
)
{
qWarn
(
"job not found, refId:0x%"
PRIx64
",QID:0x%"
PRIx64
",TID:0x%"
PRIx64
,
taskStatus
->
refId
,
taskStatus
->
queryId
,
taskStatus
->
taskId
);
// TODO DROP TASK FROM SERVER!!!!
continue
;
}
// TODO
SCH_JOB_DLOG
(
"TID:0x%"
PRIx64
" task status in server: %s"
,
taskStatus
->
taskId
,
jobTaskStatusStr
(
taskStatus
->
status
));
schReleaseJob
(
taskStatus
->
refId
);
}
SCH_ERR_JRET
(
schProcessOnTaskStatusRsp
(
&
rsp
.
epId
,
rsp
.
taskStatus
));
_return:
...
...
@@ -856,7 +858,8 @@ int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
param
->
queryId
=
pJob
->
queryId
;
param
->
refId
=
pJob
->
refId
;
param
->
taskId
=
SCH_TASK_ID
(
pTask
);
param
->
transport
=
pJob
->
pTrans
;
param
->
pTrans
=
pJob
->
pTrans
;
param
->
taskId
=
pTask
->
taskId
;
*
pParam
=
param
;
...
...
@@ -1158,7 +1161,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
(
rpcCtx
.
args
?
&
rpcCtx
:
NULL
)));
if
(
msgType
==
TDMT_VND_QUERY
)
{
SCH_ERR_RET
(
sch
RecordTaskExecNode
(
pJob
,
pTask
,
addr
,
trans
.
pHandle
));
SCH_ERR_RET
(
sch
AppendTaskExecNode
(
pJob
,
pTask
,
addr
,
pTask
->
execIdx
));
}
return
TSDB_CODE_SUCCESS
;
...
...
source/util/src/terror.c
浏览文件 @
53ae7b79
...
...
@@ -444,6 +444,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_VG_META_MISMATCH, "table meta and vgroup
//scheduler
TAOS_DEFINE_ERROR
(
TSDB_CODE_SCH_STATUS_ERROR
,
"scheduler status error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SCH_INTERNAL_ERROR
,
"scheduler internal error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SCH_TIMEOUT_ERROR
,
"Task timeout"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QW_MSG_ERROR
,
"Invalid msg order"
)
// parser
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录