diff --git a/include/util/taoserror.h b/include/util/taoserror.h index a89808852d7a9707925f1d0c7e06bf977d72350c..e28618d940c53ad0f592ca14cd44e42f01ce0db9 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -282,8 +282,8 @@ int32_t* taosGetErrno(); // dnode #define TSDB_CODE_NODE_REDIRECT TAOS_DEF_ERROR_CODE(0, 0x0400) #define TSDB_CODE_NODE_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x0401) -#define TSDB_CODE_NODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0403) -#define TSDB_CODE_NODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0404) +#define TSDB_CODE_NODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0402) +#define TSDB_CODE_NODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0403) // vnode #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) diff --git a/source/dnode/mgmt/node_mgmt/src/dmProc.c b/source/dnode/mgmt/node_mgmt/src/dmProc.c index d044ad3b2e5558c4c0660c12c4826068dceccbf1..72878d0d853e2067bdd73d42bb4b001ccfdb5295 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmProc.c +++ b/source/dnode/mgmt/node_mgmt/src/dmProc.c @@ -142,13 +142,14 @@ static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg queue->tail = headLen + bodyLen; } else if (remain < 8 + headLen) { memcpy(queue->pBuffer + queue->tail + 8, pHead, remain - 8); - memcpy(queue->pBuffer, (char*)pHead + remain - 8, rawHeadLen - (remain - 8)); + memcpy(queue->pBuffer, (char *)pHead + remain - 8, rawHeadLen - (remain - 8)); if (rawBodyLen > 0) memcpy(queue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen); queue->tail = headLen - (remain - 8) + bodyLen; } else if (remain < 8 + headLen + bodyLen) { memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen); if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, remain - 8 - headLen); - if (rawBodyLen > 0) memcpy(queue->pBuffer, (char*)pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen)); + if (rawBodyLen > 0) + memcpy(queue->pBuffer, (char *)pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen)); queue->tail = bodyLen - (remain - 8 - headLen); } else { memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen); @@ -312,12 +313,7 @@ static void *dmConsumChildQueue(void *param) { code = dmProcessNodeMsg(pWrapper, pMsg); if (code != 0) { dError("node:%s, failed to process msg:%p since %s, put into pqueue", proc->name, pMsg, terrstr()); - SRpcMsg rsp = { - .code = (terrno != 0 ? terrno : code), - .pCont = pMsg->info.rsp, - .contLen = pMsg->info.rspLen, - .info = pMsg->info, - }; + SRpcMsg rsp = {.code = (terrno != 0 ? terrno : code), .info = pMsg->info}; dmPutToProcPQueue(proc, &rsp, DND_FUNC_RSP); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); @@ -469,8 +465,18 @@ void dmPutToProcPQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) { taosMsleep(retry); } } + + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + pMsg->contLen = 0; } int32_t dmPutToProcCQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) { - return dmPushToProcQueue(proc, proc->cqueue, pMsg, ftype); + int32_t code = dmPushToProcQueue(proc, proc->cqueue, pMsg, ftype); + if (code == 0) { + dTrace("msg:%p, is freed after push to cqueue", pMsg); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + } + return code; } diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 3dc0572927812f3283b2fcc091f8d5d3a4994e26..8dfcb798ac966e25fc430b3bcddf7e69fd6534a2 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -16,6 +16,10 @@ #define _DEFAULT_SOURCE #include "dmMgmt.h" +static void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet); +static void dmSendRsp(SRpcMsg *pMsg); +static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg); + static inline int32_t dmBuildNodeMsg(SRpcMsg *pMsg, SRpcMsg *pRpc) { SRpcConnInfo connInfo = {0}; if (IsReq(pRpc) && rpcGetConnInfo(pRpc->info.handle, &connInfo) != 0) { @@ -122,27 +126,22 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { } _OVER: - if (code == 0) { - if (pWrapper != NULL && InParentProc(pWrapper)) { - dTrace("msg:%p, is freed after push to cqueue", pMsg); - taosFreeQitem(pMsg); - rpcFreeCont(pRpc->pCont); - } - } else { + if (code != 0) { dError("msg:%p, failed to process since %s", pMsg, terrstr()); if (terrno != 0) code = terrno; if (IsReq(pRpc)) { - if (code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_NODE_OFFLINE) { - if (pRpc->msgType > TDMT_MND_MSG && pRpc->msgType < TDMT_VND_MSG) { - code = TSDB_CODE_NODE_REDIRECT; - } + SRpcMsg rsp = {.code = code, .info = pRpc->info}; + + if ((code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_APP_NOT_READY) && pRpc->msgType > TDMT_MND_MSG && + pRpc->msgType < TDMT_VND_MSG) { + dmBuildMnodeRedirectRsp(pDnode, &rsp); } - SRpcMsg rspMsg = {.code = code, .info = pRpc->info}; + if (pWrapper != NULL) { - tmsgSendRsp(&rspMsg); + dmSendRsp(&rsp); } else { - rpcSendResponse(&rspMsg); + rpcSendResponse(&rsp); } } @@ -198,13 +197,25 @@ static inline void dmSendRsp(SRpcMsg *pMsg) { SMgmtWrapper *pWrapper = pMsg->info.wrapper; if (InChildProc(pWrapper)) { dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP); - rpcFreeCont(pMsg->pCont); - pMsg->pCont = NULL; } else { rpcSendResponse(pMsg); } } +static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) { + SMEpSet msg = {0}; + dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &msg.epSet); + + int32_t contLen = tSerializeSMEpSet(NULL, 0, &msg); + pMsg->pCont = rpcMallocCont(contLen); + if (pMsg->pCont == NULL) { + pMsg->code = TSDB_CODE_OUT_OF_MEMORY; + } else { + tSerializeSMEpSet(pMsg->pCont, contLen, &msg); + pMsg->contLen = contLen; + } +} + static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) { SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; SMEpSet msg = {.epSet = *pNewEpSet}; @@ -226,8 +237,6 @@ static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { SMgmtWrapper *pWrapper = pMsg->info.wrapper; if (InChildProc(pWrapper)) { dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_REGIST); - rpcFreeCont(pMsg->pCont); - pMsg->pCont = NULL; } else { rpcRegisterBrokenLinkArg(pMsg); } @@ -322,34 +331,3 @@ SMsgCb dmGetMsgcb(SDnode *pDnode) { }; return msgCb; } - -static void dmSendMnodeRedirectRsp(SRpcMsg *pMsg) { - SDnode *pDnode = dmInstance(); - SEpSet epSet = {0}; - dmGetMnodeEpSet(&pDnode->data, &epSet); - - dDebug("msg:%p, is redirected, num:%d use:%d", pMsg, epSet.numOfEps, epSet.inUse); - for (int32_t i = 0; i < epSet.numOfEps; ++i) { - dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); - if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) { - epSet.inUse = (i + 1) % epSet.numOfEps; - } - - epSet.eps[i].port = htons(epSet.eps[i].port); - } - - SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; - SMEpSet msg = {.epSet = epSet}; - int32_t contLen = tSerializeSMEpSet(NULL, 0, &msg); - rsp.pCont = rpcMallocCont(contLen); - if (rsp.pCont == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - } else { - tSerializeSMEpSet(rsp.pCont, contLen, &msg); - rsp.contLen = contLen; - } - - dmSendRsp(&rsp); - rpcFreeCont(pMsg->pCont); - pMsg->pCont = NULL; -} diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index e7256a3a87526d8c4b2b1e67189f8aa45935acf5..4946669678cd0fd17a22b935aa9e2613e58d73db 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -173,6 +173,7 @@ int32_t dmReadEps(SDnodeData *pData); int32_t dmWriteEps(SDnodeData *pData); void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps); void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet); +void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet); void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet); #ifdef __cplusplus diff --git a/source/dnode/mgmt/node_util/src/dmEps.c b/source/dnode/mgmt/node_util/src/dmEps.c index 94fa5695578ccfbcdfc6b7a7c1d4ee834c1f99f9..e0af20e41bfef194d90d30316c16042522e7f87d 100644 --- a/source/dnode/mgmt/node_util/src/dmEps.c +++ b/source/dnode/mgmt/node_util/src/dmEps.c @@ -314,6 +314,17 @@ void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) { taosThreadRwlockUnlock(&pData->lock); } +void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet) { + dmGetMnodeEpSet(pData, pEpSet); + dDebug("msg:%p, is redirected, num:%d use:%d", pMsg, pEpSet->numOfEps, pEpSet->inUse); + for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { + dDebug("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); + if (strcmp(pEpSet->eps[i].fqdn, tsLocalFqdn) == 0 && pEpSet->eps[i].port == tsServerPort) { + pEpSet->inUse = (i + 1) % pEpSet->numOfEps; + } + } +} + void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) { taosThreadRwlockWrlock(&pData->lock); pData->mnodeEps = *pEpSet; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 35ecaa748ecdbdce486814f063b7678004f68909..6210fe3fcfd66f1395a3f5c9e20fa1bee9558611 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -994,9 +994,6 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr pAction->msgSent = 0; pAction->msgReceived = 0; pAction->errCode = terrno; - if (terrno == TSDB_CODE_INVALID_PTR || terrno == TSDB_CODE_NODE_OFFLINE) { - rpcFreeCont(rpcMsg.pCont); - } mError("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr()); return -1; }