Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
1a832502
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
1a832502
编写于
7月 01, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix: fix redirect crash issue
上级
5adaf57c
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
16 addition
and
19 deletion
+16
-19
include/libs/qcom/query.h
include/libs/qcom/query.h
+0
-7
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+3
-0
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+4
-3
source/libs/scheduler/src/schJob.c
source/libs/scheduler/src/schJob.c
+1
-1
source/libs/scheduler/src/schRemote.c
source/libs/scheduler/src/schRemote.c
+8
-8
未找到文件。
include/libs/qcom/query.h
浏览文件 @
1a832502
...
...
@@ -249,13 +249,6 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || \
(_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_APP_NOT_READY)
#define NEED_SCHEDULER_RETRY_ERROR(_code) \
(NEED_SCHEDULER_REDIRECT_ERROR(_code) || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \
(_code) == TSDB_CODE_SCH_TIMEOUT_ERROR || (_code) == TSDB_CODE_RPC_BROKEN_LINK)
#define REQUEST_TOTAL_EXEC_TIMES 2
#define qFatal(...) \
...
...
source/client/src/clientImpl.c
浏览文件 @
1a832502
...
...
@@ -786,6 +786,7 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
void
schedulerExecCb
(
SQueryResult
*
pResult
,
void
*
param
,
int32_t
code
)
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
param
;
pRequest
->
code
=
code
;
pRequest
->
body
.
resInfo
.
execRes
=
pResult
->
res
;
if
(
TDMT_VND_SUBMIT
==
pRequest
->
type
||
TDMT_VND_DELETE
==
pRequest
->
type
||
TDMT_VND_CREATE_TABLE
==
pRequest
->
type
)
{
...
...
@@ -797,6 +798,8 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
}
}
taosMemoryFree
(
pResult
);
tscDebug
(
"0x%"
PRIx64
" enter scheduler exec cb, code:%d - %s, reqId:0x%"
PRIx64
,
pRequest
->
self
,
code
,
tstrerror
(
code
),
pRequest
->
requestId
);
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
1a832502
...
...
@@ -276,8 +276,6 @@ extern SSchedulerMgmt schMgmt;
#define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1)
#define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1)
#define SCH_SET_TASK_LASTMSG_TYPE(_task, _type) do { if(_task) { atomic_store_32(&(_task)->lastMsgType, _type); } } while (0)
#define SCH_GET_TASK_LASTMSG_TYPE(_task) ((_task) ? atomic_load_32(&(_task)->lastMsgType) : -1)
#define SCH_IS_DATA_SRC_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
#define SCH_IS_DATA_SRC_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY))
...
...
@@ -309,7 +307,10 @@ extern SSchedulerMgmt schMgmt;
#define SCH_IS_NEED_DROP_JOB(_job) (SCH_IS_QUERY_JOB(_job))
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (((_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_RPC_BROKEN_LINK) && ((_len) > 0))
#define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (SCH_NETWORK_ERR(_code) && ((_len) > 0))
#define SCH_NEED_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH)
#define SCH_NEED_REDIRECT(_msgType, _code, _rspLen) (SCH_NEED_REDIRECT_MSGTYPE(_msgType) && (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_SUB_TASK_NETWORK_ERR(_code, _rspLen)))
#define SCH_NEED_RETRY(_msgType, _code) ((SCH_NETWORK_ERR(_code) && SCH_NEED_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum)
#define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse])
...
...
source/libs/scheduler/src/schJob.c
浏览文件 @
1a832502
...
...
@@ -835,7 +835,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
return
TSDB_CODE_SUCCESS
;
}
if
(
!
NEED_SCHEDULER_RETRY_ERROR
(
errCode
))
{
if
(
!
SCH_NEED_RETRY
(
pTask
->
lastMsgType
,
errCode
))
{
*
needRetry
=
false
;
SCH_TASK_DLOG
(
"task no more retry cause of errCode, errCode:%x - %s"
,
errCode
,
tstrerror
(
errCode
));
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/scheduler/src/schRemote.c
浏览文件 @
1a832502
...
...
@@ -23,7 +23,7 @@
int32_t
schValidateReceivedMsgType
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
msgType
)
{
int32_t
lastMsgType
=
SCH_GET_TASK_LASTMSG_TYPE
(
pTask
)
;
int32_t
lastMsgType
=
pTask
->
lastMsgType
;
int32_t
taskStatus
=
SCH_GET_TASK_STATUS
(
pTask
);
int32_t
reqMsgType
=
msgType
-
1
;
switch
(
msgType
)
{
...
...
@@ -42,7 +42,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
TMSG_INFO
(
msgType
));
}
SCH_SET_TASK_LASTMSG_TYPE
(
pTask
,
-
1
);
//
SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
return
TSDB_CODE_SUCCESS
;
case
TDMT_SCH_FETCH_RSP
:
if
(
lastMsgType
!=
reqMsgType
&&
-
1
!=
lastMsgType
)
{
...
...
@@ -57,7 +57,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
SCH_SET_TASK_LASTMSG_TYPE
(
pTask
,
-
1
);
//
SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
return
TSDB_CODE_SUCCESS
;
case
TDMT_VND_CREATE_TABLE_RSP
:
case
TDMT_VND_DROP_TABLE_RSP
:
...
...
@@ -82,7 +82,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
SCH_SET_TASK_LASTMSG_TYPE
(
pTask
,
-
1
);
//
SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -396,7 +396,8 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
SCH_ERR_JRET
(
schValidateReceivedMsgType
(
pJob
,
pTask
,
msgType
));
if
(
NEED_SCHEDULER_REDIRECT_ERROR
(
rspCode
)
||
SCH_SUB_TASK_NETWORK_ERR
(
rspCode
,
pMsg
->
len
>
0
))
{
int32_t
reqType
=
IsReq
(
pMsg
)
?
pMsg
->
msgType
:
(
pMsg
->
msgType
-
1
);
if
(
SCH_NEED_REDIRECT
(
reqType
,
rspCode
,
pMsg
->
len
))
{
code
=
schHandleRedirect
(
pJob
,
pTask
,
(
SDataBuf
*
)
pMsg
,
rspCode
);
goto
_return
;
}
...
...
@@ -855,6 +856,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery
addr
->
nodeId
,
epSet
->
eps
[
epSet
->
inUse
].
fqdn
,
epSet
->
eps
[
epSet
->
inUse
].
port
,
trans
->
pTrans
,
trans
->
pHandle
);
pTask
->
lastMsgType
=
msgType
;
int64_t
transporterId
=
0
;
code
=
asyncSendMsgToServerExt
(
trans
->
pTrans
,
epSet
,
&
transporterId
,
pMsgSendInfo
,
persistHandle
,
ctx
);
...
...
@@ -1098,8 +1100,6 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
break
;
}
SCH_SET_TASK_LASTMSG_TYPE
(
pTask
,
msgType
);
SSchTrans
trans
=
{.
pTrans
=
pJob
->
conn
.
pTrans
,
.
pHandle
=
SCH_GET_TASK_HANDLE
(
pTask
)};
SCH_ERR_JRET
(
schAsyncSendMsg
(
pJob
,
pTask
,
&
trans
,
addr
,
msgType
,
msg
,
msgSize
,
persistHandle
,
(
rpcCtx
.
args
?
&
rpcCtx
:
NULL
)));
...
...
@@ -1112,7 +1112,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
_return:
SCH_SET_TASK_LASTMSG_TYPE
(
pTask
,
-
1
)
;
pTask
->
lastMsgType
=
-
1
;
schFreeRpcCtx
(
&
rpcCtx
);
taosMemoryFreeClear
(
msg
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录