From 1a8325021598527c35e50138e7db54d5115dc5a6 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 1 Jul 2022 15:28:45 +0800 Subject: [PATCH] fix: fix redirect crash issue --- include/libs/qcom/query.h | 7 ------- source/client/src/clientImpl.c | 3 +++ source/libs/scheduler/inc/schedulerInt.h | 7 ++++--- source/libs/scheduler/src/schJob.c | 2 +- source/libs/scheduler/src/schRemote.c | 16 ++++++++-------- 5 files changed, 16 insertions(+), 19 deletions(-) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 9881c8cb44..7f7fe76139 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -249,13 +249,6 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || \ (_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_APP_NOT_READY) -#define NEED_SCHEDULER_RETRY_ERROR(_code) \ - (NEED_SCHEDULER_REDIRECT_ERROR(_code) || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \ - (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR || (_code) == TSDB_CODE_RPC_BROKEN_LINK) - - - - #define REQUEST_TOTAL_EXEC_TIMES 2 #define qFatal(...) \ diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index b6009ebbda..7d42bbdad9 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -786,6 +786,7 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) { SRequestObj* pRequest = (SRequestObj*)param; pRequest->code = code; + pRequest->body.resInfo.execRes = pResult->res; if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) { @@ -797,6 +798,8 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) { } } + taosMemoryFree(pResult); + tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->requestId); diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 94a1108710..aaa8274ce8 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -276,8 +276,6 @@ extern SSchedulerMgmt schMgmt; #define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1) #define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1) -#define SCH_SET_TASK_LASTMSG_TYPE(_task, _type) do { if(_task) { atomic_store_32(&(_task)->lastMsgType, _type); } } while (0) -#define SCH_GET_TASK_LASTMSG_TYPE(_task) ((_task) ? atomic_load_32(&(_task)->lastMsgType) : -1) #define SCH_IS_DATA_SRC_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) #define SCH_IS_DATA_SRC_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY)) @@ -309,7 +307,10 @@ extern SSchedulerMgmt schMgmt; #define SCH_IS_NEED_DROP_JOB(_job) (SCH_IS_QUERY_JOB(_job)) #define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode) #define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) -#define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (((_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_RPC_BROKEN_LINK) && ((_len) > 0)) +#define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (SCH_NETWORK_ERR(_code) && ((_len) > 0)) +#define SCH_NEED_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH) +#define SCH_NEED_REDIRECT(_msgType, _code, _rspLen) (SCH_NEED_REDIRECT_MSGTYPE(_msgType) && (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_SUB_TASK_NETWORK_ERR(_code, _rspLen))) +#define SCH_NEED_RETRY(_msgType, _code) ((SCH_NETWORK_ERR(_code) && SCH_NEED_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) #define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum) #define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse]) diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 425ea242fd..6e4838e50c 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -835,7 +835,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo return TSDB_CODE_SUCCESS; } - if (!NEED_SCHEDULER_RETRY_ERROR(errCode)) { + if (!SCH_NEED_RETRY(pTask->lastMsgType, errCode)) { *needRetry = false; SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode)); return TSDB_CODE_SUCCESS; diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 69e41d3111..f8a627600e 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -23,7 +23,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { - int32_t lastMsgType = SCH_GET_TASK_LASTMSG_TYPE(pTask); + int32_t lastMsgType = pTask->lastMsgType; int32_t taskStatus = SCH_GET_TASK_STATUS(pTask); int32_t reqMsgType = msgType - 1; switch (msgType) { @@ -42,7 +42,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy TMSG_INFO(msgType)); } - SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); + //SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); return TSDB_CODE_SUCCESS; case TDMT_SCH_FETCH_RSP: if (lastMsgType != reqMsgType && -1 != lastMsgType) { @@ -57,7 +57,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } - SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); + //SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); return TSDB_CODE_SUCCESS; case TDMT_VND_CREATE_TABLE_RSP: case TDMT_VND_DROP_TABLE_RSP: @@ -82,7 +82,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } - SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); + //SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); return TSDB_CODE_SUCCESS; } @@ -396,7 +396,8 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { SCH_ERR_JRET(schValidateReceivedMsgType(pJob, pTask, msgType)); - if (NEED_SCHEDULER_REDIRECT_ERROR(rspCode) || SCH_SUB_TASK_NETWORK_ERR(rspCode, pMsg->len > 0)) { + int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1); + if (SCH_NEED_REDIRECT(reqType, rspCode, pMsg->len)) { code = schHandleRedirect(pJob, pTask, (SDataBuf *)pMsg, rspCode); goto _return; } @@ -855,6 +856,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery addr->nodeId, epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, trans->pTrans, trans->pHandle); + pTask->lastMsgType = msgType; int64_t transporterId = 0; code = asyncSendMsgToServerExt(trans->pTrans, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx); @@ -1098,8 +1100,6 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, break; } - SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType); - SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)}; SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL))); @@ -1112,7 +1112,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, _return: - SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); + pTask->lastMsgType = -1; schFreeRpcCtx(&rpcCtx); taosMemoryFreeClear(msg); -- GitLab