Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e0b56ea5
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e0b56ea5
编写于
7月 16, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix: fix query retry issue
上级
dce0f06e
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
132 addition
and
76 deletion
+132
-76
source/dnode/mgmt/mgmt_qnode/src/qmWorker.c
source/dnode/mgmt/mgmt_qnode/src/qmWorker.c
+1
-0
source/libs/catalog/src/ctgAsync.c
source/libs/catalog/src/ctgAsync.c
+3
-3
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+1
-0
source/libs/qworker/inc/qwInt.h
source/libs/qworker/inc/qwInt.h
+4
-2
source/libs/qworker/inc/qwMsg.h
source/libs/qworker/inc/qwMsg.h
+1
-0
source/libs/qworker/src/qwDbg.c
source/libs/qworker/src/qwDbg.c
+39
-19
source/libs/qworker/src/qwMsg.c
source/libs/qworker/src/qwMsg.c
+0
-1
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+12
-11
source/libs/scheduler/inc/schInt.h
source/libs/scheduler/inc/schInt.h
+9
-8
source/libs/scheduler/src/schJob.c
source/libs/scheduler/src/schJob.c
+32
-14
source/libs/scheduler/src/schRemote.c
source/libs/scheduler/src/schRemote.c
+6
-6
source/libs/scheduler/src/schTask.c
source/libs/scheduler/src/schTask.c
+24
-12
未找到文件。
source/dnode/mgmt/mgmt_qnode/src/qmWorker.c
浏览文件 @
e0b56ea5
...
...
@@ -81,6 +81,7 @@ int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
taosWriteQitem
(
pMgmt
->
queryWorker
.
queue
,
pMsg
);
return
0
;
case
READ_QUEUE
:
case
FETCH_QUEUE
:
dTrace
(
"msg:%p, is created and will put into qnode-fetch queue"
,
pMsg
);
taosWriteQitem
(
pMgmt
->
fetchWorker
.
queue
,
pMsg
);
return
0
;
...
...
source/libs/catalog/src/ctgAsync.c
浏览文件 @
e0b56ea5
...
...
@@ -163,7 +163,7 @@ int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush
(
pJob
->
pTasks
,
&
task
);
qDebug
(
"QID:0x%"
PRIx64
" the %d task type %s initialized"
,
pJob
->
queryId
,
taskIdx
,
ctgTaskTypeStr
(
task
.
type
));
qDebug
(
"QID:0x%"
PRIx64
" the %d
th
task type %s initialized"
,
pJob
->
queryId
,
taskIdx
,
ctgTaskTypeStr
(
task
.
type
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -178,7 +178,7 @@ int32_t ctgInitGetDnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush
(
pJob
->
pTasks
,
&
task
);
qDebug
(
"QID:0x%"
PRIx64
" the %d task type %s initialized"
,
pJob
->
queryId
,
taskIdx
,
ctgTaskTypeStr
(
task
.
type
));
qDebug
(
"QID:0x%"
PRIx64
" the %d
th
task type %s initialized"
,
pJob
->
queryId
,
taskIdx
,
ctgTaskTypeStr
(
task
.
type
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -264,7 +264,7 @@ int32_t ctgInitGetSvrVerTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush
(
pJob
->
pTasks
,
&
task
);
qDebug
(
"QID:0x%"
PRIx64
"
[%dth]
task type %s initialized"
,
pJob
->
queryId
,
taskIdx
,
ctgTaskTypeStr
(
task
.
type
));
qDebug
(
"QID:0x%"
PRIx64
"
the %dth
task type %s initialized"
,
pJob
->
queryId
,
taskIdx
,
ctgTaskTypeStr
(
task
.
type
));
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
e0b56ea5
...
...
@@ -1978,6 +1978,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
qDebug
(
"%s fetch rsp received, index:%d, rows:%d"
,
pSourceDataInfo
->
taskId
,
index
,
pRsp
->
numOfRows
);
}
else
{
pSourceDataInfo
->
code
=
code
;
qDebug
(
"%s fetch rsp received, index:%d, error:%d"
,
pSourceDataInfo
->
taskId
,
index
,
tstrerror
(
code
));
}
pSourceDataInfo
->
status
=
EX_SOURCE_DATA_READY
;
...
...
source/libs/qworker/inc/qwInt.h
浏览文件 @
e0b56ea5
...
...
@@ -77,7 +77,7 @@ typedef struct SQWDebug {
bool
dumpEnable
;
bool
sleepSimulate
;
bool
deadSimulate
;
bool
tmp
;
bool
redirectSimulate
;
}
SQWDebug
;
extern
SQWDebug
gQWDebug
;
...
...
@@ -380,7 +380,9 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt);
int32_t
qwDbgValidateStatus
(
QW_FPARAMS_DEF
,
int8_t
oriStatus
,
int8_t
newStatus
,
bool
*
ignore
);
int32_t
qwDbgBuildAndSendRedirectRsp
(
int32_t
rspType
,
SRpcHandleInfo
*
pConn
,
int32_t
code
,
SEpSet
*
pEpSet
);
int32_t
qwAddTaskCtx
(
QW_FPARAMS_DEF
);
int32_t
qwDbgResponseRedirect
(
SQWMsg
*
qwMsg
,
SQWTaskCtx
*
ctx
,
bool
*
rsped
);
void
qwDbgSimulateRedirect
(
SQWMsg
*
qwMsg
,
SQWTaskCtx
*
ctx
,
bool
*
rsped
);
void
qwDbgSimulateSleep
(
void
);
void
qwDbgSimulateDead
(
QW_FPARAMS_DEF
,
SQWTaskCtx
*
ctx
,
bool
*
rsped
);
#ifdef __cplusplus
...
...
source/libs/qworker/inc/qwMsg.h
浏览文件 @
e0b56ea5
...
...
@@ -46,6 +46,7 @@ int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
int32_t
qwBuildAndSendHbRsp
(
SRpcHandleInfo
*
pConn
,
SSchedulerHbRsp
*
rsp
,
int32_t
code
);
int32_t
qwRegisterQueryBrokenLinkArg
(
QW_FPARAMS_DEF
,
SRpcHandleInfo
*
pConn
);
int32_t
qwRegisterHbBrokenLinkArg
(
SQWorker
*
mgmt
,
uint64_t
sId
,
SRpcHandleInfo
*
pConn
);
int32_t
qwBuildAndSendDropMsg
(
QW_FPARAMS_DEF
,
SRpcHandleInfo
*
pConn
);
#ifdef __cplusplus
}
...
...
source/libs/qworker/src/qwDbg.c
浏览文件 @
e0b56ea5
...
...
@@ -9,7 +9,7 @@
#include "tmsg.h"
#include "tname.h"
SQWDebug
gQWDebug
=
{.
statusEnable
=
true
,
.
dumpEnable
=
true
,
.
tmp
=
false
};
SQWDebug
gQWDebug
=
{.
statusEnable
=
true
,
.
dumpEnable
=
false
,
.
redirectSimulate
=
false
,
.
deadSimulate
=
false
,
.
sleepSimulate
=
false
};
int32_t
qwDbgValidateStatus
(
QW_FPARAMS_DEF
,
int8_t
oriStatus
,
int8_t
newStatus
,
bool
*
ignore
)
{
if
(
!
gQWDebug
.
statusEnable
)
{
...
...
@@ -147,8 +147,17 @@ int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwDbgResponseRedirect
(
SQWMsg
*
qwMsg
,
SQWTaskCtx
*
ctx
,
bool
*
rsped
)
{
if
(
gQWDebug
.
tmp
)
{
void
qwDbgSimulateRedirect
(
SQWMsg
*
qwMsg
,
SQWTaskCtx
*
ctx
,
bool
*
rsped
)
{
static
int32_t
ignoreTime
=
0
;
if
(
*
rsped
)
{
return
;
}
if
(
gQWDebug
.
redirectSimulate
)
{
if
(
++
ignoreTime
<=
10
)
{
return
;
}
if
(
TDMT_SCH_QUERY
==
qwMsg
->
msgType
&&
(
0
==
taosRand
()
%
3
))
{
SEpSet
epSet
=
{
0
};
epSet
.
inUse
=
1
;
...
...
@@ -163,44 +172,55 @@ int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) {
ctx
->
phase
=
QW_PHASE_POST_QUERY
;
qwDbgBuildAndSendRedirectRsp
(
qwMsg
->
msgType
+
1
,
&
qwMsg
->
connInfo
,
TSDB_CODE_RPC_REDIRECT
,
&
epSet
);
*
rsped
=
true
;
return
TSDB_CODE_SUCCESS
;
return
;
}
if
(
TDMT_SCH_MERGE_QUERY
==
qwMsg
->
msgType
&&
(
0
==
taosRand
()
%
3
))
{
QW_SET_PHASE
(
ctx
,
QW_PHASE_POST_QUERY
);
qwDbgBuildAndSendRedirectRsp
(
qwMsg
->
msgType
+
1
,
&
qwMsg
->
connInfo
,
TSDB_CODE_RPC_REDIRECT
,
NULL
);
*
rsped
=
true
;
return
TSDB_CODE_SUCCESS
;
return
;
}
if
((
TDMT_SCH_FETCH
==
qwMsg
->
msgType
)
&&
(
0
==
taosRand
()
%
3
))
{
if
((
TDMT_SCH_FETCH
==
qwMsg
->
msgType
)
&&
(
0
==
taosRand
()
%
9
))
{
qwDbgBuildAndSendRedirectRsp
(
qwMsg
->
msgType
+
1
,
&
qwMsg
->
connInfo
,
TSDB_CODE_RPC_REDIRECT
,
NULL
);
*
rsped
=
true
;
return
TSDB_CODE_SUCCESS
;
return
;
}
}
*
rsped
=
false
;
return
TSDB_CODE_SUCCESS
;
}
void
qwDbgSimulateSleep
()
{
void
qwDbgSimulateSleep
(
void
)
{
if
(
!
gQWDebug
.
sleepSimulate
)
{
return
;
}
taosSsleep
(
taosRand
()
%
10
);
static
int32_t
ignoreTime
=
0
;
if
(
++
ignoreTime
>
10
)
{
taosSsleep
(
taosRand
()
%
20
);
}
}
void
qwDbgSimulateDead
(
QW_FPARAMS_DEF
,
SQWTaskCtx
*
ctx
,
int32_t
msgType
)
{
void
qwDbgSimulateDead
(
QW_FPARAMS_DEF
,
SQWTaskCtx
*
ctx
,
bool
*
rsped
)
{
if
(
!
gQWDebug
.
deadSimulate
)
{
return
;
}
SRpcHandleInfo
*
pConn
=
((
msgType
==
TDMT_SCH_FETCH
||
msgType
==
TDMT_SCH_MERGE_FETCH
)
?
&
ctx
->
dataConnInfo
:
&
ctx
->
ctrlConnInfo
);
qwBuildAndSendErrorRsp
(
msgType
+
1
,
pConn
,
TSDB_CODE_RPC_BROKEN_LINK
);
if
(
*
rsped
)
{
return
;
}
static
int32_t
ignoreTime
=
0
;
qwDropTask
(
QW_FPARAMS
());
if
(
++
ignoreTime
>
10
&&
0
==
taosRand
()
%
9
)
{
SRpcHandleInfo
*
pConn
=
((
ctx
->
msgType
==
TDMT_SCH_FETCH
||
ctx
->
msgType
==
TDMT_SCH_MERGE_FETCH
)
?
&
ctx
->
dataConnInfo
:
&
ctx
->
ctrlConnInfo
);
qwBuildAndSendErrorRsp
(
ctx
->
msgType
+
1
,
pConn
,
TSDB_CODE_RPC_BROKEN_LINK
);
qwBuildAndSendDropMsg
(
QW_FPARAMS
(),
pConn
);
*
rsped
=
true
;
return
;
}
}
...
...
@@ -236,9 +256,9 @@ int32_t qwDbgEnableDebug(char *option) {
return
TSDB_CODE_SUCCESS
;
}
if
(
0
==
strcasecmp
(
option
,
"
tmp
"
))
{
gQWDebug
.
tmp
=
true
;
qError
(
"qw
tmp
debug enabled"
);
if
(
0
==
strcasecmp
(
option
,
"
redirect
"
))
{
gQWDebug
.
redirectSimulate
=
true
;
qError
(
"qw
redirect
debug enabled"
);
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/qworker/src/qwMsg.c
浏览文件 @
e0b56ea5
...
...
@@ -198,7 +198,6 @@ int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
int32_t
code
=
tmsgPutToQueue
(
&
mgmt
->
msgCb
,
FETCH_QUEUE
,
&
pNewMsg
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
QW_SCH_TASK_ELOG
(
"put drop task msg to queue failed, vgId:%d, code:%s"
,
mgmt
->
nodeId
,
tstrerror
(
code
));
rpcFreeCont
(
req
);
QW_ERR_RET
(
code
);
}
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
e0b56ea5
...
...
@@ -83,6 +83,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
// if *taskHandle is NULL, it's killed right now
if
(
taskHandle
)
{
qwDbgSimulateSleep
();
code
=
qExecTask
(
taskHandle
,
&
pRes
,
&
useconds
);
if
(
code
)
{
if
(
code
!=
TSDB_CODE_OPS_NOT_SUPPORT
)
{
...
...
@@ -431,12 +432,12 @@ _return:
bool
rsped
=
false
;
SQWMsg
qwMsg
=
{.
msgType
=
ctx
->
msgType
,
.
connInfo
=
ctx
->
ctrlConnInfo
};
qwDbgResponseRedirect
(
&
qwMsg
,
ctx
,
&
rsped
);
qwDbgSimulateRedirect
(
&
qwMsg
,
ctx
,
&
rsped
);
qwDbgSimulateDead
(
QW_FPARAMS
(),
ctx
,
&
rsped
);
if
(
!
rsped
)
{
qwBuildAndSendQueryRsp
(
input
->
msgType
+
1
,
&
ctx
->
ctrlConnInfo
,
code
,
ctx
);
}
QW_TASK_DLOG
(
"query msg rsped, handle:%p, code:%x - %s"
,
ctx
->
ctrlConnInfo
.
handle
,
code
,
tstrerror
(
code
));
QW_TASK_DLOG
(
"query msg rsped, handle:%p, code:%x - %s"
,
ctx
->
ctrlConnInfo
.
handle
,
code
,
tstrerror
(
code
));
}
}
if
(
ctx
)
{
...
...
@@ -656,13 +657,12 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET
(
qwGetTaskCtx
(
QW_FPARAMS
(),
&
ctx
));
ctx
->
msgType
=
qwMsg
->
msgType
;
ctx
->
dataConnInfo
=
qwMsg
->
connInfo
;
SOutputData
sOutput
=
{
0
};
QW_ERR_JRET
(
qwGetQueryResFromSink
(
QW_FPARAMS
(),
ctx
,
&
dataLen
,
&
rsp
,
&
sOutput
));
if
(
NULL
==
rsp
)
{
ctx
->
dataConnInfo
=
qwMsg
->
connInfo
;
QW_SET_EVENT_RECEIVED
(
ctx
,
QW_EVENT_FETCH
);
}
else
{
bool
qComplete
=
(
DS_BUF_EMPTY
==
sOutput
.
bufStatus
&&
sOutput
.
queryEnd
);
...
...
@@ -708,12 +708,15 @@ _return:
if
(
code
||
rsp
)
{
bool
rsped
=
false
;
qwDbgResponseRedirect
(
qwMsg
,
ctx
,
&
rsped
);
if
(
ctx
)
{
qwDbgSimulateRedirect
(
qwMsg
,
ctx
,
&
rsped
);
qwDbgSimulateDead
(
QW_FPARAMS
(),
ctx
,
&
rsped
);
}
if
(
!
rsped
)
{
qwBuildAndSendFetchRsp
(
qwMsg
->
msgType
+
1
,
&
qwMsg
->
connInfo
,
rsp
,
dataLen
,
code
);
QW_TASK_DLOG
(
"%s send, handle:%p, code:%x - %s, dataLen:%d"
,
TMSG_INFO
(
qwMsg
->
msgType
+
1
),
qwMsg
->
connInfo
.
handle
,
code
,
tstrerror
(
code
),
dataLen
);
}
QW_TASK_DLOG
(
"%s send, handle:%p, code:%x - %s, dataLen:%d"
,
TMSG_INFO
(
qwMsg
->
msgType
+
1
),
qwMsg
->
connInfo
.
handle
,
code
,
tstrerror
(
code
),
dataLen
);
}
QW_RET
(
TSDB_CODE_SUCCESS
);
...
...
@@ -745,8 +748,6 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
}
if
(
!
dropped
)
{
ctx
->
ctrlConnInfo
=
qwMsg
->
connInfo
;
QW_SET_EVENT_RECEIVED
(
ctx
,
QW_EVENT_DROP
);
}
...
...
source/libs/scheduler/inc/schInt.h
浏览文件 @
e0b56ea5
...
...
@@ -55,13 +55,11 @@ typedef enum {
#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000
#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200 // unit is TSDB_TABLE_NUM_UNIT
#define SCHEDULE_DEFAULT_POLICY SCH_LOAD_SEQ
#define SCHEDULE_DEFAULT_MAX_NODE_NUM 20
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000
#define SCH_MAX_TASK_TIMEOUT_USEC 60000000
#define SCH_MAX_CANDIDATE_EP_NUM (TSDB_MAX_REPLICA + 100)
#define SCH_DEFAULT_MAX_RETRY_NUM 6
typedef
struct
SSchDebug
{
bool
lockEnable
;
...
...
@@ -275,7 +273,8 @@ typedef struct SSchJob {
int32_t
errCode
;
SRWLatch
resLock
;
SExecResult
execRes
;
void
*
resData
;
//TODO free it or not
void
*
fetchRes
;
//TODO free it or not
bool
fetched
;
int32_t
resNumOfRows
;
SSchResInfo
userRes
;
const
char
*
sql
;
...
...
@@ -327,7 +326,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define SCH_MERGE_TASK_NETWORK_ERR(_task, _code, _len) (SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task))))
#define SCH_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH)
#define SCH_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_
LINK_BROKEN || (_msgType) == TDMT_SCH_
QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_REDIRECT(_task, _msgType, _code, _rspLen) (SCH_REDIRECT_MSGTYPE(_msgType) && (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_MERGE_TASK_NETWORK_ERR((_task), (_code), (_rspLen))))
#define SCH_NEED_RETRY(_msgType, _code) ((SCH_NETWORK_ERR(_code) && SCH_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
...
...
@@ -369,6 +368,8 @@ extern SSchedulerMgmt schMgmt;
qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__)
#define SCH_TASK_DLOG(param, ...) \
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__)
#define SCH_TASK_TLOG(param, ...) \
qTrace("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__)
#define SCH_TASK_DLOGL(param, ...) \
qDebugL("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__)
#define SCH_TASK_WLOG(param, ...) \
...
...
@@ -442,7 +443,7 @@ void schFreeRpcCtx(SRpcCtx *pCtx);
int32_t
schGetCallbackFp
(
int32_t
msgType
,
__async_send_cb_fn_t
*
fp
);
bool
schJobNeedToStop
(
SSchJob
*
pJob
,
int8_t
*
pStatus
);
int32_t
schProcessOnTaskSuccess
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
int32_t
schSaveJob
Query
Res
(
SSchJob
*
pJob
,
SQueryTableRsp
*
rsp
);
int32_t
schSaveJob
Exec
Res
(
SSchJob
*
pJob
,
SQueryTableRsp
*
rsp
);
int32_t
schProcessOnExplainDone
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SRetrieveTableRsp
*
pRsp
);
void
schProcessOnDataFetched
(
SSchJob
*
job
);
int32_t
schGetTaskInJob
(
SSchJob
*
pJob
,
uint64_t
taskId
,
SSchTask
**
pTask
);
...
...
@@ -493,7 +494,7 @@ int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
void
schDirectPostJobRes
(
SSchedulerReq
*
pReq
,
int32_t
errCode
);
int32_t
schHandleJobFailure
(
SSchJob
*
pJob
,
int32_t
errCode
);
int32_t
schHandleJobDrop
(
SSchJob
*
pJob
,
int32_t
errCode
);
bool
schChkCurrentOp
(
SSchJob
*
pJob
,
int32_t
op
,
bool
sync
);
bool
schChkCurrentOp
(
SSchJob
*
pJob
,
int32_t
op
,
int8_t
sync
);
extern
SSchDebug
gSCHDebug
;
...
...
source/libs/scheduler/src/schJob.c
浏览文件 @
e0b56ea5
...
...
@@ -110,7 +110,7 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
break
;
case
JOB_TASK_STATUS_PART_SUCC
:
if
(
newStatus
!=
JOB_TASK_STATUS_FAIL
&&
newStatus
!=
JOB_TASK_STATUS_SUCC
&&
newStatus
!=
JOB_TASK_STATUS_DROP
)
{
newStatus
!=
JOB_TASK_STATUS_DROP
&&
newStatus
!=
JOB_TASK_STATUS_EXEC
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
...
...
@@ -389,13 +389,18 @@ int32_t schDumpJobExecRes(SSchJob* pJob, SExecResult* pRes) {
int32_t
schDumpJobFetchRes
(
SSchJob
*
pJob
,
void
**
pData
)
{
int32_t
code
=
0
;
if
(
pJob
->
resData
&&
((
SRetrieveTableRsp
*
)
pJob
->
resData
)
->
completed
)
{
SCH_ERR_RET
(
schSwitchJobStatus
(
pJob
,
JOB_TASK_STATUS_SUCC
,
NULL
));
SCH_LOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
pJob
->
fetched
=
true
;
if
(
pJob
->
fetchRes
&&
((
SRetrieveTableRsp
*
)
pJob
->
fetchRes
)
->
completed
)
{
SCH_ERR_JRET
(
schSwitchJobStatus
(
pJob
,
JOB_TASK_STATUS_SUCC
,
NULL
));
}
while
(
true
)
{
*
pData
=
atomic_load_ptr
(
&
pJob
->
resData
);
if
(
*
pData
!=
atomic_val_compare_exchange_ptr
(
&
pJob
->
resData
,
*
pData
,
NULL
))
{
*
pData
=
atomic_load_ptr
(
&
pJob
->
fetchRes
);
if
(
*
pData
!=
atomic_val_compare_exchange_ptr
(
&
pJob
->
fetchRes
,
*
pData
,
NULL
))
{
continue
;
}
...
...
@@ -414,7 +419,11 @@ int32_t schDumpJobFetchRes(SSchJob* pJob, void** pData) {
SCH_JOB_DLOG
(
"fetch done, totalRows:%d"
,
pJob
->
resNumOfRows
);
return
TSDB_CODE_SUCCESS
;
_return:
SCH_UNLOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
return
code
;
}
int32_t
schNotifyUserExecRes
(
SSchJob
*
pJob
)
{
...
...
@@ -512,8 +521,12 @@ int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode) {
}
int32_t
schProcessOnJobPartialSuccess
(
SSchJob
*
pJob
)
{
schPostJobRes
(
pJob
,
SCH_OP_EXEC
);
int32_t
schProcessOnJobPartialSuccess
(
SSchJob
*
pJob
)
{
if
(
schChkCurrentOp
(
pJob
,
SCH_OP_FETCH
,
-
1
))
{
SCH_ERR_RET
(
schLaunchFetchTask
(
pJob
));
}
else
{
schPostJobRes
(
pJob
,
0
);
}
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -526,7 +539,7 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs
SCH_TASK_DLOG
(
"got explain rsp, rows:%d, complete:%d"
,
htonl
(
pRsp
->
numOfRows
),
pRsp
->
completed
);
atomic_store_32
(
&
pJob
->
resNumOfRows
,
htonl
(
pRsp
->
numOfRows
));
atomic_store_ptr
(
&
pJob
->
resData
,
pRsp
);
atomic_store_ptr
(
&
pJob
->
fetchRes
,
pRsp
);
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_SUCC
);
...
...
@@ -561,7 +574,7 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
schSaveJob
Query
Res
(
SSchJob
*
pJob
,
SQueryTableRsp
*
rsp
)
{
int32_t
schSaveJob
Exec
Res
(
SSchJob
*
pJob
,
SQueryTableRsp
*
rsp
)
{
if
(
rsp
->
tbFName
[
0
])
{
SCH_LOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
...
...
@@ -600,7 +613,7 @@ int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) {
int32_t
schLaunchJob
(
SSchJob
*
pJob
)
{
if
(
EXPLAIN_MODE_STATIC
==
pJob
->
attr
.
explainMode
)
{
SCH_ERR_RET
(
qExecStaticExplain
(
pJob
->
pDag
,
(
SRetrieveTableRsp
**
)
&
pJob
->
resData
));
SCH_ERR_RET
(
qExecStaticExplain
(
pJob
->
pDag
,
(
SRetrieveTableRsp
**
)
&
pJob
->
fetchRes
));
SCH_ERR_RET
(
schSwitchJobStatus
(
pJob
,
JOB_TASK_STATUS_PART_SUCC
,
NULL
));
}
else
{
SSchLevel
*
level
=
taosArrayGet
(
pJob
->
levels
,
pJob
->
levelIdx
);
...
...
@@ -661,7 +674,7 @@ void schFreeJobImpl(void *job) {
qDestroyQueryPlan
(
pJob
->
pDag
);
taosMemoryFreeClear
(
pJob
->
userRes
.
execRes
);
taosMemoryFreeClear
(
pJob
->
resData
);
taosMemoryFreeClear
(
pJob
->
fetchRes
);
taosMemoryFree
(
pJob
);
int32_t
jobNum
=
atomic_sub_fetch_32
(
&
schMgmt
.
jobNum
,
1
);
...
...
@@ -795,9 +808,14 @@ void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode) {
}
}
bool
schChkCurrentOp
(
SSchJob
*
pJob
,
int32_t
op
,
bool
sync
)
{
bool
schChkCurrentOp
(
SSchJob
*
pJob
,
int32_t
op
,
int8_t
sync
)
{
bool
r
=
false
;
SCH_LOCK
(
SCH_READ
,
&
pJob
->
opStatus
.
lock
);
bool
r
=
(
pJob
->
opStatus
.
op
==
op
)
&&
(
pJob
->
opStatus
.
syncReq
==
sync
);
if
(
sync
>=
0
)
{
r
=
(
pJob
->
opStatus
.
op
==
op
)
&&
(
pJob
->
opStatus
.
syncReq
==
sync
);
}
else
{
r
=
(
pJob
->
opStatus
.
op
==
op
);
}
SCH_UNLOCK
(
SCH_READ
,
&
pJob
->
opStatus
.
lock
);
return
r
;
...
...
source/libs/scheduler/src/schRemote.c
浏览文件 @
e0b56ea5
...
...
@@ -256,7 +256,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
SCH_ERR_JRET
(
rsp
->
code
);
SCH_ERR_JRET
(
schSaveJob
Query
Res
(
pJob
,
rsp
));
SCH_ERR_JRET
(
schSaveJob
Exec
Res
(
pJob
,
rsp
));
atomic_add_fetch_32
(
&
pJob
->
resNumOfRows
,
rsp
->
affectedRows
);
...
...
@@ -277,8 +277,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
if
(
pJob
->
resData
)
{
SCH_TASK_ELOG
(
"explain result is already generated, res:%p"
,
pJob
->
resData
);
if
(
pJob
->
fetchRes
)
{
SCH_TASK_ELOG
(
"explain result is already generated, res:%p"
,
pJob
->
fetchRes
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
...
...
@@ -325,13 +325,13 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
return
TSDB_CODE_SUCCESS
;
}
if
(
pJob
->
resData
)
{
SCH_TASK_ELOG
(
"got fetch rsp while res already exists, res:%p"
,
pJob
->
resData
);
if
(
pJob
->
fetchRes
)
{
SCH_TASK_ELOG
(
"got fetch rsp while res already exists, res:%p"
,
pJob
->
fetchRes
);
taosMemoryFreeClear
(
rsp
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
atomic_store_ptr
(
&
pJob
->
resData
,
rsp
);
atomic_store_ptr
(
&
pJob
->
fetchRes
,
rsp
);
atomic_add_fetch_32
(
&
pJob
->
resNumOfRows
,
htonl
(
rsp
->
numOfRows
));
if
(
rsp
->
completed
)
{
...
...
source/libs/scheduler/src/schTask.c
浏览文件 @
e0b56ea5
...
...
@@ -47,10 +47,10 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
void
schInitTaskRetryTimes
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SSchLevel
*
pLevel
)
{
if
(
SCH_IS_DATA_BIND_TASK
(
pTask
)
||
(
!
SCH_IS_QUERY_JOB
(
pJob
))
||
(
SCH_ALL
!=
schMgmt
.
cfg
.
schPolicy
))
{
pTask
->
maxRetryTimes
=
SCH_
MAX_CANDIDATE_EP
_NUM
;
pTask
->
maxRetryTimes
=
SCH_
DEFAULT_MAX_RETRY
_NUM
;
}
else
{
int32_t
nodeNum
=
taosArrayGetSize
(
pJob
->
nodeList
);
pTask
->
maxRetryTimes
=
TMAX
(
nodeNum
,
SCH_
MAX_CANDIDATE_EP
_NUM
);
pTask
->
maxRetryTimes
=
TMAX
(
nodeNum
,
SCH_
DEFAULT_MAX_RETRY
_NUM
);
}
pTask
->
maxExecTimes
=
pTask
->
maxRetryTimes
*
(
pLevel
->
level
+
1
);
...
...
@@ -64,11 +64,11 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
pTask
->
execId
=
-
1
;
pTask
->
timeoutUsec
=
SCH_DEFAULT_TASK_TIMEOUT_USEC
;
pTask
->
taskId
=
schGenTaskId
();
pTask
->
execNodes
=
taosHashInit
(
SCH_MAX_CANDIDATE_EP_NUM
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
schInitTaskRetryTimes
(
pJob
,
pTask
,
pLevel
);
pTask
->
execNodes
=
taosHashInit
(
pTask
->
maxExecTimes
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
pTask
->
profile
.
execTime
=
taosArrayInit
(
pTask
->
maxExecTimes
,
sizeof
(
int64_t
));
if
(
NULL
==
pTask
->
execNodes
||
NULL
==
pTask
->
profile
.
execTime
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
...
...
@@ -405,6 +405,18 @@ _return:
int32_t
schHandleRedirect
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SDataBuf
*
pData
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
if
(
JOB_TASK_STATUS_PART_SUCC
==
pJob
->
status
)
{
SCH_LOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
if
(
pJob
->
fetched
)
{
SCH_UNLOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
SCH_TASK_ELOG
(
"already fetched while got error %s"
,
tstrerror
(
rspCode
));
SCH_ERR_RET
(
rspCode
);
}
SCH_UNLOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
schUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_EXEC
);
}
if
(
SCH_IS_DATA_BIND_TASK
(
pTask
))
{
if
(
NULL
==
pData
->
pEpSet
)
{
SCH_TASK_ELOG
(
"no epset updated while got error %s"
,
tstrerror
(
rspCode
));
...
...
@@ -602,7 +614,7 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
if
(
pJob
->
nodeList
)
{
nodeNum
=
taosArrayGetSize
(
pJob
->
nodeList
);
for
(
int32_t
i
=
0
;
i
<
nodeNum
&&
addNum
<
SCH_MAX_CANDIDATE_EP_NUM
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
nodeNum
;
++
i
)
{
SQueryNodeLoad
*
nload
=
taosArrayGet
(
pJob
->
nodeList
,
i
);
SQueryNodeAddr
*
naddr
=
&
nload
->
addr
;
...
...
@@ -611,8 +623,8 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SCH_TASK_
DLOG
(
"set %dth candidate addr, id %d, fqdn:%s, port:%d"
,
i
,
naddr
->
nodeId
,
SCH_GET_CUR_EP
(
naddr
)
->
fqdn
,
SCH_GET_CUR_EP
(
naddr
)
->
port
);
SCH_TASK_
TLOG
(
"set %dth candidate addr, id %d, inUse:%d/%d, fqdn:%s, port:%d"
,
i
,
naddr
->
nodeId
,
naddr
->
epSet
.
inUse
,
naddr
->
epSet
.
numOfEps
,
SCH_GET_CUR_EP
(
naddr
)
->
fqdn
,
SCH_GET_CUR_EP
(
naddr
)
->
port
);
++
addNum
;
}
...
...
@@ -632,9 +644,9 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
}
pTask
->
candidateIdx
=
0
;
pTask
->
candidateAddrs
=
taosArrayInit
(
SCH
_MAX_CANDIDATE_EP
_NUM
,
sizeof
(
SQueryNodeAddr
));
pTask
->
candidateAddrs
=
taosArrayInit
(
SCH
EDULE_DEFAULT_MAX_NODE
_NUM
,
sizeof
(
SQueryNodeAddr
));
if
(
NULL
==
pTask
->
candidateAddrs
)
{
SCH_TASK_ELOG
(
"taosArrayInit %d condidate addrs failed"
,
SCH
_MAX_CANDIDATE_EP
_NUM
);
SCH_TASK_ELOG
(
"taosArrayInit %d condidate addrs failed"
,
SCH
EDULE_DEFAULT_MAX_NODE
_NUM
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -897,9 +909,9 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
int32_t
schLaunchFetchTask
(
SSchJob
*
pJob
)
{
int32_t
code
=
0
;
void
*
resData
=
atomic_load_ptr
(
&
pJob
->
resData
);
if
(
resData
)
{
SCH_JOB_DLOG
(
"res already fetched, res:%p"
,
resData
);
void
*
fetchRes
=
atomic_load_ptr
(
&
pJob
->
fetchRes
);
if
(
fetchRes
)
{
SCH_JOB_DLOG
(
"res already fetched, res:%p"
,
fetchRes
);
return
TSDB_CODE_SUCCESS
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录