diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 3f11d2a21832ab5b5bab3c0edc8c0fd5ae085a86..5b640dce92052f1aef246f599c3f686a089c2ec7 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -268,7 +268,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_RESTORING || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR) #define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_MNODE_NOT_FOUND) -#define NO_RET_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) +#define NO_RET_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) #define NEED_REDIRECT_ERROR(_code) \ (NO_RET_REDIRECT_ERROR(_code) || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \ diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 2d7b15ebda0dffeed5761853fc93e78e37e9666c..3823252de6ab29c60d6db1bcd30774d9e7ce6b39 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -65,6 +65,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0017) // #define TSDB_CODE_RPC_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0018) // #define TSDB_CODE_RPC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x0019) // +#define TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED TAOS_DEF_ERROR_CODE(0, 0x0020) // "Vgroup could not be connected" +#define TSDB_CODE_RPC_SOMENODE_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0021) // //common & util #define TSDB_CODE_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0100) // diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 08a2477e5fb7cc07bb408822d4acaa431b1e1ee4..f36036fd0ae53875848cd086f3b17e9d1664d5ec 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1430,6 +1430,21 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { memcpy((void*)tEpSet, (void*)pEpSet, sizeof(SEpSet)); } + // pMsg is response msg + if (pMsg->msgType == TDMT_MND_CONNECT + 1) { + // restore origin code + if (pMsg->code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) { + pMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + } else if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) { + pMsg->code = TSDB_CODE_RPC_BROKEN_LINK; + } + } else { + // uniform to one error code: TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED + if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) { + pMsg->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED; + } + } + AsyncArg* arg = taosMemoryCalloc(1, sizeof(AsyncArg)); arg->msg = *pMsg; arg->pEpset = tEpSet; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index f11378f84ce216af5c6e8a5f8e73bc4dc46290c0..756f97d1d026cea84288ce6522d152082aa78246 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -248,6 +248,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { rpcRe static bool rpcRfp(int32_t code, tmsg_t msgType) { if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_MNODE_NOT_FOUND || + code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING) { if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 67d3f7e11983f9550d59c042d1a0b84243884d4b..718fc5c73f27e96fb261effbdc54c73310a3e924 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -957,7 +957,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { for (int32_t i = 0; i < size; ++i) { SRpcHandleInfo *pInfo = taosArrayGet(pTrans->pRpcArray, i); if (pInfo->handle != NULL) { - if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) { code = TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL; } if (code == TSDB_CODE_SYN_TIMEOUT) { diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 40c75ce6baacbaad258ad10874af16f9466ad2d4..6c88e4d5c8615d262db99d3e0ab5e476269820a4 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -606,6 +606,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { } static bool udfdRpcRfp(int32_t code, tmsg_t msgType) { if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_SYN_NOT_LEADER || + code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING) { if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 48df7e36a30737324342bb06bdbbfced8b73a5cd..e8216fcd7c7d6f50981218ae8c27531390903382 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -375,7 +375,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_JOB_NEED_WAIT(_job) (!SCH_IS_QUERY_JOB(_job)) #define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job)) #define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode) -#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) +#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) #define SCH_MERGE_TASK_NETWORK_ERR(_task, _code, _len) \ (SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task)) || (_task)->redirectCtx.inRedirect)) #define SCH_REDIRECT_MSGTYPE(_msgType) \ diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d144a76eb035522272bb13b8db630e0042485519..1dc79e0cfb9d606ba0e7739e8cb6f047449365db 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1665,11 +1665,20 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { if (pCtx->retryCode != TSDB_CODE_SUCCESS) { int32_t code = pResp->code; // return internal code app - if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK) { + if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) { pResp->code = pCtx->retryCode; } } + // check whole vnodes is offline on this vgroup + if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps || pCtx->retryStep > 0) { + if (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + pResp->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED; + } else if (pResp->code == TSDB_CODE_RPC_BROKEN_LINK) { + pResp->code = TSDB_CODE_RPC_SOMENODE_BROKEN_LINK; + } + } + STraceId* trace = &pResp->info.traceId; bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet); if (hasEpSet) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index e59b1daa059813f3434804ca9241a499e1580fd8..2d7121e2ae2b356c99d50b41764f62272ff66af8 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -51,6 +51,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQD TAOS_DEFINE_ERROR(TSDB_CODE_RPC_PORT_EADDRINUSE, "Port already in use") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_BROKEN_LINK, "Conn is broken") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnode(s) out of service") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")