提交 92e2d961 编写于 作者: S Shengliang Guan

refactor: free rpcCont on error occured

上级 2b9e2659
...@@ -282,8 +282,8 @@ int32_t* taosGetErrno(); ...@@ -282,8 +282,8 @@ int32_t* taosGetErrno();
// dnode // dnode
#define TSDB_CODE_NODE_REDIRECT TAOS_DEF_ERROR_CODE(0, 0x0400) #define TSDB_CODE_NODE_REDIRECT TAOS_DEF_ERROR_CODE(0, 0x0400)
#define TSDB_CODE_NODE_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x0401) #define TSDB_CODE_NODE_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x0401)
#define TSDB_CODE_NODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0403) #define TSDB_CODE_NODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0402)
#define TSDB_CODE_NODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0404) #define TSDB_CODE_NODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0403)
// vnode // vnode
#define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500)
......
...@@ -142,13 +142,14 @@ static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg ...@@ -142,13 +142,14 @@ static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg
queue->tail = headLen + bodyLen; queue->tail = headLen + bodyLen;
} else if (remain < 8 + headLen) { } else if (remain < 8 + headLen) {
memcpy(queue->pBuffer + queue->tail + 8, pHead, remain - 8); memcpy(queue->pBuffer + queue->tail + 8, pHead, remain - 8);
memcpy(queue->pBuffer, (char*)pHead + remain - 8, rawHeadLen - (remain - 8)); memcpy(queue->pBuffer, (char *)pHead + remain - 8, rawHeadLen - (remain - 8));
if (rawBodyLen > 0) memcpy(queue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen); if (rawBodyLen > 0) memcpy(queue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen);
queue->tail = headLen - (remain - 8) + bodyLen; queue->tail = headLen - (remain - 8) + bodyLen;
} else if (remain < 8 + headLen + bodyLen) { } else if (remain < 8 + headLen + bodyLen) {
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen); memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, remain - 8 - headLen); if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, remain - 8 - headLen);
if (rawBodyLen > 0) memcpy(queue->pBuffer, (char*)pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen)); if (rawBodyLen > 0)
memcpy(queue->pBuffer, (char *)pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen));
queue->tail = bodyLen - (remain - 8 - headLen); queue->tail = bodyLen - (remain - 8 - headLen);
} else { } else {
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen); memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
...@@ -312,12 +313,7 @@ static void *dmConsumChildQueue(void *param) { ...@@ -312,12 +313,7 @@ static void *dmConsumChildQueue(void *param) {
code = dmProcessNodeMsg(pWrapper, pMsg); code = dmProcessNodeMsg(pWrapper, pMsg);
if (code != 0) { if (code != 0) {
dError("node:%s, failed to process msg:%p since %s, put into pqueue", proc->name, pMsg, terrstr()); dError("node:%s, failed to process msg:%p since %s, put into pqueue", proc->name, pMsg, terrstr());
SRpcMsg rsp = { SRpcMsg rsp = {.code = (terrno != 0 ? terrno : code), .info = pMsg->info};
.code = (terrno != 0 ? terrno : code),
.pCont = pMsg->info.rsp,
.contLen = pMsg->info.rspLen,
.info = pMsg->info,
};
dmPutToProcPQueue(proc, &rsp, DND_FUNC_RSP); dmPutToProcPQueue(proc, &rsp, DND_FUNC_RSP);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
...@@ -469,8 +465,18 @@ void dmPutToProcPQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) { ...@@ -469,8 +465,18 @@ void dmPutToProcPQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) {
taosMsleep(retry); taosMsleep(retry);
} }
} }
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
pMsg->contLen = 0;
} }
int32_t dmPutToProcCQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) { int32_t dmPutToProcCQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) {
return dmPushToProcQueue(proc, proc->cqueue, pMsg, ftype); int32_t code = dmPushToProcQueue(proc, proc->cqueue, pMsg, ftype);
if (code == 0) {
dTrace("msg:%p, is freed after push to cqueue", pMsg);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
return code;
} }
...@@ -16,6 +16,9 @@ ...@@ -16,6 +16,9 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dmMgmt.h" #include "dmMgmt.h"
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet);
static inline void dmSendRsp(SRpcMsg *pMsg);
static inline int32_t dmBuildNodeMsg(SRpcMsg *pMsg, SRpcMsg *pRpc) { static inline int32_t dmBuildNodeMsg(SRpcMsg *pMsg, SRpcMsg *pRpc) {
SRpcConnInfo connInfo = {0}; SRpcConnInfo connInfo = {0};
if (IsReq(pRpc) && rpcGetConnInfo(pRpc->info.handle, &connInfo) != 0) { if (IsReq(pRpc) && rpcGetConnInfo(pRpc->info.handle, &connInfo) != 0) {
...@@ -122,27 +125,28 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { ...@@ -122,27 +125,28 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
} }
_OVER: _OVER:
if (code == 0) { if (code != 0) {
if (pWrapper != NULL && InParentProc(pWrapper)) {
dTrace("msg:%p, is freed after push to cqueue", pMsg);
taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont);
}
} else {
dError("msg:%p, failed to process since %s", pMsg, terrstr()); dError("msg:%p, failed to process since %s", pMsg, terrstr());
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
if (IsReq(pRpc)) { if (IsReq(pRpc)) {
if (code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_NODE_OFFLINE) { if ((code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_APP_NOT_READY) && pRpc->msgType > TDMT_MND_MSG &&
if (pRpc->msgType > TDMT_MND_MSG && pRpc->msgType < TDMT_VND_MSG) { pRpc->msgType < TDMT_VND_MSG) {
code = TSDB_CODE_NODE_REDIRECT; SEpSet epset = {0};
} dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epset);
SRpcMsg rsp = {.code = TSDB_CODE_NODE_REDIRECT, .info = pRpc->info};
if (pWrapper != NULL) {
dmSendRedirectRsp(&rsp, &epset);
} else {
rpcSendRedirectRsp(&rsp, &epset);
} }
SRpcMsg rspMsg = {.code = code, .info = pRpc->info}; } else {
SRpcMsg rsp = {.code = code, .info = pRpc->info};
if (pWrapper != NULL) { if (pWrapper != NULL) {
tmsgSendRsp(&rspMsg); dmSendRsp(&rsp);
} else { } else {
rpcSendResponse(&rspMsg); rpcSendResponse(&rsp);
}
} }
} }
...@@ -198,8 +202,6 @@ static inline void dmSendRsp(SRpcMsg *pMsg) { ...@@ -198,8 +202,6 @@ static inline void dmSendRsp(SRpcMsg *pMsg) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper; SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (InChildProc(pWrapper)) { if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP); dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
} else { } else {
rpcSendResponse(pMsg); rpcSendResponse(pMsg);
} }
...@@ -226,8 +228,6 @@ static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { ...@@ -226,8 +228,6 @@ static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper; SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (InChildProc(pWrapper)) { if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_REGIST); dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_REGIST);
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
} else { } else {
rpcRegisterBrokenLinkArg(pMsg); rpcRegisterBrokenLinkArg(pMsg);
} }
...@@ -322,34 +322,3 @@ SMsgCb dmGetMsgcb(SDnode *pDnode) { ...@@ -322,34 +322,3 @@ SMsgCb dmGetMsgcb(SDnode *pDnode) {
}; };
return msgCb; return msgCb;
} }
static void dmSendMnodeRedirectRsp(SRpcMsg *pMsg) {
SDnode *pDnode = dmInstance();
SEpSet epSet = {0};
dmGetMnodeEpSet(&pDnode->data, &epSet);
dDebug("msg:%p, is redirected, num:%d use:%d", pMsg, epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) {
epSet.inUse = (i + 1) % epSet.numOfEps;
}
epSet.eps[i].port = htons(epSet.eps[i].port);
}
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
SMEpSet msg = {.epSet = epSet};
int32_t contLen = tSerializeSMEpSet(NULL, 0, &msg);
rsp.pCont = rpcMallocCont(contLen);
if (rsp.pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
} else {
tSerializeSMEpSet(rsp.pCont, contLen, &msg);
rsp.contLen = contLen;
}
dmSendRsp(&rsp);
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
...@@ -173,6 +173,7 @@ int32_t dmReadEps(SDnodeData *pData); ...@@ -173,6 +173,7 @@ int32_t dmReadEps(SDnodeData *pData);
int32_t dmWriteEps(SDnodeData *pData); int32_t dmWriteEps(SDnodeData *pData);
void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps); void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps);
void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet); void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet);
void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet);
void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet); void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -314,6 +314,17 @@ void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) { ...@@ -314,6 +314,17 @@ void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
taosThreadRwlockUnlock(&pData->lock); taosThreadRwlockUnlock(&pData->lock);
} }
void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet) {
dmGetMnodeEpSet(pData, pEpSet);
dDebug("msg:%p, is redirected, num:%d use:%d", pMsg, pEpSet->numOfEps, pEpSet->inUse);
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
if (strcmp(pEpSet->eps[i].fqdn, tsLocalFqdn) == 0 && pEpSet->eps[i].port == tsServerPort) {
pEpSet->inUse = (i + 1) % pEpSet->numOfEps;
}
}
}
void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) { void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
taosThreadRwlockWrlock(&pData->lock); taosThreadRwlockWrlock(&pData->lock);
pData->mnodeEps = *pEpSet; pData->mnodeEps = *pEpSet;
......
...@@ -994,9 +994,6 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr ...@@ -994,9 +994,6 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr
pAction->msgSent = 0; pAction->msgSent = 0;
pAction->msgReceived = 0; pAction->msgReceived = 0;
pAction->errCode = terrno; pAction->errCode = terrno;
if (terrno == TSDB_CODE_INVALID_PTR || terrno == TSDB_CODE_NODE_OFFLINE) {
rpcFreeCont(rpcMsg.pCont);
}
mError("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr()); mError("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr());
return -1; return -1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册