diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 92131e354ae41f01a675726a617fb7953b90973f..9881c8cb445a9cea43ff049ad5f8dafe658d2987 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -239,7 +239,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define NEED_REDIRECT_ERROR(_code) \ ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \ (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || (_code) == TSDB_CODE_SYN_NOT_LEADER || \ - (_code) == TSDB_CODE_APP_NOT_READY) + (_code) == TSDB_CODE_APP_NOT_READY || (_code) == TSDB_CODE_RPC_BROKEN_LINK) #define NEED_CLIENT_RM_TBLMETA_REQ(_type) \ ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_VND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \ @@ -249,8 +249,9 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || \ (_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_APP_NOT_READY) -#define NEED_SCHEDULER_RETRY_ERROR(_code) \ - (NEED_SCHEDULER_REDIRECT_ERROR(_code) || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) +#define NEED_SCHEDULER_RETRY_ERROR(_code) \ + (NEED_SCHEDULER_REDIRECT_ERROR(_code) || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \ + (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR || (_code) == TSDB_CODE_RPC_BROKEN_LINK) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 4e581fd28ee55cbe079716ed6f1f754008c28a74..91912bb764c4f966b46d036e5b32203d08aa40bf 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -250,7 +250,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { static bool rpcRfp(int32_t code, tmsg_t msgType) { if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || - code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) { + code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK) { if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) { return false; } diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 4aad544e1910a31267c529620358b3a6ef86fadb..708ea4bd3815cce731e909739085977f5975fcab 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -548,7 +548,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { } static bool udfdRpcRfp(int32_t code, tmsg_t msgType) { if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || - code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) { + code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK) { if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) { return false; } diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 70a6a70c449bc308608a79a42ceb3f15ffce87c8..30772ff1ac5e5f0e48fc70d4c4aa2c7ae548c699 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -205,7 +205,7 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { .msgType = TDMT_SCH_DROP_TASK, .pCont = req, .contLen = sizeof(STaskDropReq), - .code = TSDB_CODE_RPC_NETWORK_UNAVAIL, + .code = TSDB_CODE_RPC_BROKEN_LINK, .info = *pConn, }; @@ -239,7 +239,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo * .msgType = TDMT_SCH_QUERY_HEARTBEAT, .pCont = msg, .contLen = msgSize, - .code = TSDB_CODE_RPC_NETWORK_UNAVAIL, + .code = TSDB_CODE_RPC_BROKEN_LINK, .info = *pConn, }; @@ -484,7 +484,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6 SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info}; - if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) { + if (TSDB_CODE_RPC_BROKEN_LINK == pMsg->code) { QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code)); } @@ -522,7 +522,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ uint64_t sId = req.sId; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info}; - if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) { + if (TSDB_CODE_RPC_BROKEN_LINK == pMsg->code) { QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code)); } diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index fb0c43b0ff40f634170abca8e6ab3e88202dbf5b..8dfc703dd98c7b85d80fcef30751a891c6710eb3 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -306,6 +306,8 @@ extern SSchedulerMgmt schMgmt; #define SCH_IS_WAIT_ALL_JOB(_job) (!SCH_IS_QUERY_JOB(_job)) #define SCH_IS_NEED_DROP_JOB(_job) (SCH_IS_QUERY_JOB(_job)) #define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode) +#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) +#define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (((_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_RPC_BROKEN_LINK) && ((_len) > 0)) #define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum) #define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse]) diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 91f5ff979ccfa0b99f275292c302055f62d3a870..3688cb02400bb2bf591a81db56218516b39af7d0 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -384,7 +384,7 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { goto _return; } - bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || rspCode == TSDB_CODE_RPC_NETWORK_UNAVAIL); + bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode)); SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, pParam->execIdx)); int8_t status = 0; @@ -396,7 +396,7 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { SCH_ERR_JRET(schValidateReceivedMsgType(pJob, pTask, msgType)); - if (NEED_SCHEDULER_REDIRECT_ERROR(rspCode) || ((rspCode == TSDB_CODE_RPC_NETWORK_UNAVAIL) && pMsg->len > 0)) { + if (NEED_SCHEDULER_REDIRECT_ERROR(rspCode) || SCH_SUB_TASK_NETWORK_ERR(rspCode, pMsg->len > 0)) { code = schHandleRedirect(pJob, pTask, (SDataBuf *)pMsg, rspCode); goto _return; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index e21237ec10309f3b926d701fb4b80ad9cc31f58b..5de907f8cb0ac836d03edc1b496527d3a272a779 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -328,7 +328,7 @@ void cliHandleResp(SCliConn* conn) { tDebug("%s conn %p construct ahandle %p by %s, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle, TMSG_INFO(transMsg.msgType)); if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) { - transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + transMsg.code = TSDB_CODE_RPC_BROKEN_LINK; transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); tDebug("%s conn %p construct ahandle %p due brokenlink, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);