diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 679e3ba7753f4b8de27a3cf8325af58d9f885b11..7ba6e5044c35b09ddf4cefea2509909c0e340c37 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -41,8 +41,8 @@ typedef enum { typedef int32_t (*PutToQueueFp)(void* pMgmt, SRpcMsg* pMsg); typedef int32_t (*GetQueueSizeFp)(void* pMgmt, int32_t vgId, EQueueType qtype); typedef int32_t (*SendReqFp)(const SEpSet* pEpSet, SRpcMsg* pMsg); -typedef void (*SendRspFp)(const SRpcMsg* pMsg); -typedef void (*SendRedirectRspFp)(const SRpcMsg* pMsg, const SEpSet* pNewEpSet); +typedef void (*SendRspFp)(SRpcMsg* pMsg); +typedef void (*SendRedirectRspFp)(SRpcMsg* pMsg, const SEpSet* pNewEpSet); typedef void (*RegisterBrokenLinkArgFp)(SRpcMsg* pMsg); typedef void (*ReleaseHandleFp)(SRpcHandleInfo* pHandle, int8_t type); typedef void (*ReportStartup)(const char* name, const char* desc); @@ -64,8 +64,8 @@ void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb); int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pMsg); int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg); -void tmsgSendRsp(const SRpcMsg* pMsg); -void tmsgSendRedirectRsp(const SRpcMsg* pMsg, const SEpSet* pNewEpSet); +void tmsgSendRsp(SRpcMsg* pMsg); +void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet); void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg); void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type); void tmsgReportStartup(const char* name, const char* desc); diff --git a/source/common/src/tmsgcb.c b/source/common/src/tmsgcb.c index d28e2c675d9d3d05f3308e003b70f694af94d167..f69fb65f04492452c240a2fa647e822d523f1d8c 100644 --- a/source/common/src/tmsgcb.c +++ b/source/common/src/tmsgcb.c @@ -21,9 +21,9 @@ static SMsgCb tsDefaultMsgCb; void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; } -int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) { +int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pMsg) { PutToQueueFp fp = pMsgCb->queueFps[qtype]; - return (*fp)(pMsgCb->mgmt, pReq); + return (*fp)(pMsgCb->mgmt, pMsg); } int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) { @@ -31,17 +31,17 @@ int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) { return (*fp)(pMsgCb->mgmt, vgId, qtype); } -int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pReq) { +int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) { SendReqFp fp = tsDefaultMsgCb.sendReqFp; - return (*fp)(epSet, pReq); + return (*fp)(epSet, pMsg); } -void tmsgSendRsp(const SRpcMsg* pMsg) { +void tmsgSendRsp(SRpcMsg* pMsg) { SendRspFp fp = tsDefaultMsgCb.sendRspFp; return (*fp)(pMsg); } -void tmsgSendRedirectRsp(const SRpcMsg* pMsg, const SEpSet* pNewEpSet) { +void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet) { SendRedirectRspFp fp = tsDefaultMsgCb.sendRedirectRspFp; (*fp)(pMsg, pNewEpSet); } diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 7daf25bb8ad871b62c4ebe09d788356967d6ae4c..787c6c98c853c8f1116a46df464fe1f365e07ff6 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -75,6 +75,7 @@ int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) { void dmStopStatusThread(SDnodeMgmt *pMgmt) { if (taosCheckPthreadValid(pMgmt->statusThread)) { taosThreadJoin(pMgmt->statusThread, NULL); + taosThreadClear(&pMgmt->statusThread); } } @@ -95,6 +96,7 @@ int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) { void dmStopMonitorThread(SDnodeMgmt *pMgmt) { if (taosCheckPthreadValid(pMgmt->monitorThread)) { taosThreadJoin(pMgmt->monitorThread, NULL); + taosThreadClear(&pMgmt->monitorThread); } } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index ab41ee5df3e98492304e400b7ef3c5c920425bf6..8c3b8576a8d6f4eb3bb8a04766bd91ae1db57feb 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -196,6 +196,7 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) { SVnodeThread *pThread = &threads[t]; if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) { taosThreadJoin(pThread->thread, NULL); + taosThreadClear(&pThread->thread); } taosMemoryFree(pThread->pCfgs); } diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index 2d03267fe882afdad5e4171bd91a266fd3c28780..7484c1e18f0e9e555e6f1c51e7d2d3bdad36ad7e 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -151,12 +151,10 @@ int32_t dmInitProc(struct SMgmtWrapper *pWrapper); void dmCleanupProc(struct SMgmtWrapper *pWrapper); int32_t dmRunProc(SProc *proc); void dmStopProc(SProc *proc); -int64_t dmRemoveProcRpcHandle(SProc *proc, void *handle); +void dmRemoveProcRpcHandle(SProc *proc, void *handle); void dmCloseProcRpcHandles(SProc *proc); -int32_t dmPutToProcCQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, - void *handle, int64_t handleRef, EProcFuncType ftype); -void dmPutToProcPQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, - EProcFuncType ftype); +int32_t dmPutToProcCQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype); +void dmPutToProcPQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype); // dmTransport.c int32_t dmInitServer(SDnode *pDnode); diff --git a/source/dnode/mgmt/node_mgmt/src/dmProc.c b/source/dnode/mgmt/node_mgmt/src/dmProc.c index f9875f718284f54f8a63222cba2de63dd8cc5df3..2e24e3fa1ced47d3d495fbc5a4af4ed69b63d3de 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmProc.c +++ b/source/dnode/mgmt/node_mgmt/src/dmProc.c @@ -16,10 +16,7 @@ #define _DEFAULT_SOURCE #include "dmMgmt.h" -static inline int32_t CEIL8(int32_t v) { - const int32_t c = ceil((float)(v) / 8) * 8; - return c < 8 ? 8 : c; -} +static inline int32_t CEIL8(int32_t v) { return ceil((float)(v) / 8) * 8; } static int32_t dmInitProcMutex(SProcQueue *queue) { TdThreadMutexAttr mattr = {0}; @@ -87,42 +84,17 @@ static SProcQueue *dmInitProcQueue(SProc *proc, char *ptr, int32_t size) { return queue; } -#if 0 -static void dmDestroyProcQueue(SProcQueue *queue) { - if (queue->mutex != NULL) { - taosThreadMutexDestroy(queue->mutex); - queue->mutex = NULL; - } -} - -static void dmDestroyProcSem(SProcQueue *queue) { - if (queue->sem != NULL) { - tsem_destroy(queue->sem); - queue->sem = NULL; - } -} -#endif +static void dmCleanupProcQueue(SProcQueue *queue) {} -static void dmCleanupProcQueue(SProcQueue *queue) { -#if 0 - if (queue != NULL) { - dmDestroyProcQueue(queue); - dmDestroyProcSem(queue); - } -#endif -} - -static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHead, int16_t rawHeadLen, - const char *pBody, int32_t rawBodyLen, int64_t handle, int64_t handleRef, - EProcFuncType ftype) { - if (rawHeadLen == 0 || pHead == NULL) { - terrno = TSDB_CODE_INVALID_PARA; - return -1; - } - - const int32_t headLen = CEIL8(rawHeadLen); +static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg *pMsg, EProcFuncType ftype) { + const void *pHead = pMsg; + const void *pBody = pMsg->pCont; + const int16_t rawHeadLen = sizeof(SRpcMsg); + const int32_t rawBodyLen = pMsg->contLen; + const int16_t headLen = CEIL8(rawHeadLen); const int32_t bodyLen = CEIL8(rawBodyLen); const int32_t fullLen = headLen + bodyLen + 8; + const int64_t handle = (int64_t)pMsg->info.handle; taosThreadMutexLock(&queue->mutex); if (fullLen > queue->avail) { @@ -131,8 +103,8 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe return -1; } - if (handle != 0 && ftype == DND_FUNC_REQ) { - if (taosHashPut(proc->hash, &handle, sizeof(int64_t), &handleRef, sizeof(int64_t)) != 0) { + if (ftype == DND_FUNC_REQ && IsReq(pMsg) && pMsg->code == 0 && handle != 0) { + if (taosHashPut(proc->hash, &handle, sizeof(int64_t), &pMsg->info, sizeof(SRpcConnInfo)) != 0) { taosThreadMutexUnlock(&queue->mutex); return -1; } @@ -151,31 +123,31 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe if (queue->tail < queue->head) { memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen); - memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, rawBodyLen); + if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, rawBodyLen); queue->tail = queue->tail + 8 + headLen + bodyLen; } else { int32_t remain = queue->total - queue->tail; if (remain == 0) { memcpy(queue->pBuffer + 8, pHead, rawHeadLen); - memcpy(queue->pBuffer + 8 + headLen, pBody, rawBodyLen); + if (rawBodyLen > 0) memcpy(queue->pBuffer + 8 + headLen, pBody, rawBodyLen); queue->tail = 8 + headLen + bodyLen; } else if (remain == 8) { memcpy(queue->pBuffer, pHead, rawHeadLen); - memcpy(queue->pBuffer + headLen, pBody, rawBodyLen); + if (rawBodyLen > 0) memcpy(queue->pBuffer + headLen, pBody, rawBodyLen); queue->tail = headLen + bodyLen; } else if (remain < 8 + headLen) { memcpy(queue->pBuffer + queue->tail + 8, pHead, remain - 8); memcpy(queue->pBuffer, pHead + remain - 8, rawHeadLen - (remain - 8)); - memcpy(queue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen); + if (rawBodyLen > 0) memcpy(queue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen); queue->tail = headLen - (remain - 8) + bodyLen; } else if (remain < 8 + headLen + bodyLen) { memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen); - memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, remain - 8 - headLen); - memcpy(queue->pBuffer, pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen)); + if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, remain - 8 - headLen); + if (rawBodyLen > 0) memcpy(queue->pBuffer, pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen)); queue->tail = bodyLen - (remain - 8 - headLen); } else { memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen); - memcpy(queue->pBuffer + queue->tail + headLen + 8, pBody, rawBodyLen); + if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + headLen + 8, pBody, rawBodyLen); queue->tail = queue->tail + headLen + bodyLen + 8; } } @@ -185,13 +157,12 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe taosThreadMutexUnlock(&queue->mutex); tsem_post(&queue->sem); - dTrace("node:%s, push %s msg:%p:%d cont:%p:%d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, headLen, - pBody, bodyLen, pos, queue->items); + dTrace("node:%s, push %s msg:%p type:%d handle:%p len:%d code:0x%x, pos:%d remain:%d", queue->name, dmFuncStr(ftype), + pMsg, pMsg->msgType, pMsg->info.handle, pMsg->contLen, pMsg->code, pos, queue->items); return 0; } -static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHeadLen, void **ppBody, int32_t *pBodyLen, - EProcFuncType *pFuncType) { +static int32_t dmPopFromProcQueue(SProcQueue *queue, SRpcMsg **ppMsg, EProcFuncType *pFuncType) { tsem_wait(&queue->sem); taosThreadMutexLock(&queue->mutex); @@ -217,8 +188,9 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe int32_t bodyLen = CEIL8(rawBodyLen); void *pHead = taosAllocateQitem(headLen, DEF_QITEM); - void *pBody = rpcMallocCont(bodyLen); - if (pHead == NULL || pBody == NULL) { + void *pBody = NULL; + if (bodyLen > 0) pBody = rpcMallocCont(bodyLen); + if (pHead == NULL || (bodyLen > 0 && pBody == NULL)) { taosThreadMutexUnlock(&queue->mutex); tsem_post(&queue->sem); taosFreeQitem(pHead); @@ -230,31 +202,31 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe const int32_t pos = queue->head; if (queue->head < queue->tail) { memcpy(pHead, queue->pBuffer + queue->head + 8, headLen); - memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, bodyLen); + if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, bodyLen); queue->head = queue->head + 8 + headLen + bodyLen; } else { int32_t remain = queue->total - queue->head; if (remain == 0) { memcpy(pHead, queue->pBuffer + 8, headLen); - memcpy(pBody, queue->pBuffer + 8 + headLen, bodyLen); + if (bodyLen > 0) memcpy(pBody, queue->pBuffer + 8 + headLen, bodyLen); queue->head = 8 + headLen + bodyLen; } else if (remain == 8) { memcpy(pHead, queue->pBuffer, headLen); - memcpy(pBody, queue->pBuffer + headLen, bodyLen); + if (bodyLen > 0) memcpy(pBody, queue->pBuffer + headLen, bodyLen); queue->head = headLen + bodyLen; } else if (remain < 8 + headLen) { memcpy(pHead, queue->pBuffer + queue->head + 8, remain - 8); memcpy((char *)pHead + remain - 8, queue->pBuffer, headLen - (remain - 8)); - memcpy(pBody, queue->pBuffer + headLen - (remain - 8), bodyLen); + if (bodyLen > 0) memcpy(pBody, queue->pBuffer + headLen - (remain - 8), bodyLen); queue->head = headLen - (remain - 8) + bodyLen; } else if (remain < 8 + headLen + bodyLen) { memcpy(pHead, queue->pBuffer + queue->head + 8, headLen); - memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, remain - 8 - headLen); - memcpy((char *)pBody + remain - 8 - headLen, queue->pBuffer, bodyLen - (remain - 8 - headLen)); + if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, remain - 8 - headLen); + if (bodyLen > 0) memcpy((char *)pBody + remain - 8 - headLen, queue->pBuffer, bodyLen - (remain - 8 - headLen)); queue->head = bodyLen - (remain - 8 - headLen); } else { memcpy(pHead, queue->pBuffer + queue->head + 8, headLen); - memcpy(pBody, queue->pBuffer + queue->head + headLen + 8, bodyLen); + if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + headLen + 8, bodyLen); queue->head = queue->head + headLen + bodyLen + 8; } } @@ -263,14 +235,12 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe queue->items--; taosThreadMutexUnlock(&queue->mutex); - *ppHead = pHead; - *ppBody = pBody; - *pHeadLen = rawHeadLen; - *pBodyLen = rawBodyLen; + *ppMsg = pHead; + (*ppMsg)->pCont = pBody; *pFuncType = (EProcFuncType)ftype; - dTrace("node:%s, pop %s msg:%p:%d cont:%p:%d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, headLen, pBody, - bodyLen, pos, queue->items); + dTrace("node:%s, pop %s msg:%p type:%d handle:%p len:%d code:0x%x, pos:%d remain:%d", queue->name, dmFuncStr(ftype), + (*ppMsg), (*ppMsg)->msgType, (*ppMsg)->info.handle, (*ppMsg)->contLen, (*ppMsg)->code, pos, queue->items); return 1; } @@ -308,18 +278,14 @@ static void *dmConsumChildQueue(void *param) { SProc *proc = param; SMgmtWrapper *pWrapper = proc->wrapper; SProcQueue *queue = proc->cqueue; - void *pHead = NULL; - void *pBody = NULL; - int16_t headLen = 0; - int32_t bodyLen = 0; int32_t numOfMsgs = 0; int32_t code = 0; EProcFuncType ftype = DND_FUNC_REQ; - SRpcMsg *pReq = NULL; + SRpcMsg *pMsg = NULL; dDebug("node:%s, start to consume from cqueue", proc->name); do { - numOfMsgs = dmPopFromProcQueue(queue, &pHead, &headLen, &pBody, &bodyLen, &ftype); + numOfMsgs = dmPopFromProcQueue(queue, &pMsg, &ftype); if (numOfMsgs == 0) { dDebug("node:%s, get no msg from cqueue and exit thread", proc->name); break; @@ -332,25 +298,24 @@ static void *dmConsumChildQueue(void *param) { } if (ftype != DND_FUNC_REQ) { - dFatal("node:%s, get msg:%p from cqueue, invalid ftype:%d", proc->name, pHead, ftype); - taosFreeQitem(pHead); - rpcFreeCont(pBody); - } else { - pReq = pHead; - pReq->pCont = pBody; - code = dmProcessNodeMsg(pWrapper, pReq); - if (code != 0) { - dError("node:%s, failed to process msg:%p since %s, put into pqueue", proc->name, pReq, terrstr()); - SRpcMsg rspMsg = { - .info = pReq->info, - .pCont = pReq->info.rsp, - .contLen = pReq->info.rspLen, - }; - dmPutToProcPQueue(proc, &rspMsg, sizeof(SRpcMsg), rspMsg.pCont, rspMsg.contLen, DND_FUNC_RSP); - taosFreeQitem(pHead); - rpcFreeCont(pBody); - rpcFreeCont(rspMsg.pCont); - } + dError("node:%s, invalid ftype:%d from cqueue", proc->name, ftype); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + continue; + } + + code = dmProcessNodeMsg(pWrapper, pMsg); + if (code != 0) { + dError("node:%s, failed to process msg:%p since %s, put into pqueue", proc->name, pMsg, terrstr()); + SRpcMsg rsp = { + .code = (terrno != 0 ? terrno : code), + .pCont = pMsg->info.rsp, + .contLen = pMsg->info.rspLen, + .info = pMsg->info, + }; + dmPutToProcPQueue(proc, &rsp, DND_FUNC_RSP); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); } } while (1); @@ -361,18 +326,14 @@ static void *dmConsumParentQueue(void *param) { SProc *proc = param; SMgmtWrapper *pWrapper = proc->wrapper; SProcQueue *queue = proc->pqueue; - void *pHead = NULL; - void *pBody = NULL; - int16_t headLen = 0; - int32_t bodyLen = 0; int32_t numOfMsgs = 0; int32_t code = 0; EProcFuncType ftype = DND_FUNC_REQ; - SRpcMsg *pRsp = NULL; + SRpcMsg *pMsg = NULL; dDebug("node:%s, start to consume from pqueue", proc->name); do { - numOfMsgs = dmPopFromProcQueue(queue, &pHead, &headLen, &pBody, &bodyLen, &ftype); + numOfMsgs = dmPopFromProcQueue(queue, &pMsg, &ftype); if (numOfMsgs == 0) { dDebug("node:%s, get no msg from pqueue and exit thread", proc->name); break; @@ -385,31 +346,19 @@ static void *dmConsumParentQueue(void *param) { } if (ftype == DND_FUNC_RSP) { - pRsp = pHead; - pRsp->pCont = pBody; - dTrace("node:%s, get rsp msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->info.handle); - dmRemoveProcRpcHandle(proc, pRsp->info.handle); - rpcSendResponse(pRsp); + dmRemoveProcRpcHandle(proc, pMsg->info.handle); + rpcSendResponse(pMsg); } else if (ftype == DND_FUNC_REGIST) { - pRsp = pHead; - pRsp->pCont = pBody; - dTrace("node:%s, get regist msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, - pRsp->info.handle); - rpcRegisterBrokenLinkArg(pRsp); + rpcRegisterBrokenLinkArg(pMsg); } else if (ftype == DND_FUNC_RELEASE) { - pRsp = pHead; - pRsp->pCont = NULL; - dTrace("node:%s, get release msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, - pRsp->info.handle); - dmRemoveProcRpcHandle(proc, pRsp->info.handle); - rpcReleaseHandle(pRsp->info.handle, (int8_t)pRsp->code); - rpcFreeCont(pBody); + dmRemoveProcRpcHandle(proc, pMsg->info.handle); + rpcReleaseHandle(pMsg->info.handle, (int8_t)pMsg->code); } else { - dFatal("node:%s, get msg:%p from pqueue, invalid ftype:%d", proc->name, pHead, ftype); - rpcFreeCont(pBody); + dError("node:%s, invalid ftype:%d from pqueue", proc->name, ftype); + rpcFreeCont(pMsg->pCont); } - taosFreeQitem(pHead); + taosFreeQitem(pMsg); } while (1); return NULL; @@ -468,51 +417,55 @@ void dmCleanupProc(struct SMgmtWrapper *pWrapper) { dmCleanupProcQueue(proc->cqueue); dmCleanupProcQueue(proc->pqueue); taosHashCleanup(proc->hash); + proc->hash = NULL; dDebug("node:%s, proc is cleaned up", pWrapper->name); } -int64_t dmRemoveProcRpcHandle(SProc *proc, void *handle) { +void dmRemoveProcRpcHandle(SProc *proc, void *handle) { int64_t h = (int64_t)handle; taosThreadMutexLock(&proc->cqueue->mutex); - - int64_t *pRef = taosHashGet(proc->hash, &h, sizeof(int64_t)); - int64_t ref = 0; - if (pRef != NULL) { - ref = *pRef; - } - taosHashRemove(proc->hash, &h, sizeof(int64_t)); taosThreadMutexUnlock(&proc->cqueue->mutex); - - return ref; } void dmCloseProcRpcHandles(SProc *proc) { taosThreadMutexLock(&proc->cqueue->mutex); - void *h = taosHashIterate(proc->hash, NULL); - while (h != NULL) { - void *handle = *((void **)h); - h = taosHashIterate(proc->hash, h); - - dError("node:%s, the child process dies and send an offline rsp to handle:%p", proc->name, handle); - SRpcMsg rpcMsg = {.info.handle = handle, .code = TSDB_CODE_NODE_OFFLINE}; + SRpcHandleInfo *pInfo = taosHashIterate(proc->hash, NULL); + while (pInfo != NULL) { + dError("node:%s, the child process dies and send an offline rsp to handle:%p", proc->name, pInfo->handle); + SRpcMsg rpcMsg = {.info = *pInfo, .code = TSDB_CODE_NODE_OFFLINE}; rpcSendResponse(&rpcMsg); + pInfo = taosHashIterate(proc->hash, pInfo); } taosHashClear(proc->hash); taosThreadMutexUnlock(&proc->cqueue->mutex); } -void dmPutToProcPQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, - EProcFuncType ftype) { +void dmPutToProcPQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) { int32_t retry = 0; - while (dmPushToProcQueue(proc, proc->pqueue, pHead, headLen, pBody, bodyLen, 0, 0, ftype) != 0) { - dWarn("node:%s, failed to put msg:%p to pqueue since %s, retry:%d", proc->name, pHead, terrstr(), retry); - retry++; - taosMsleep(retry); + while (1) { + if (dmPushToProcQueue(proc, proc->pqueue, pMsg, ftype) == 0) { + break; + } + + if (retry == 10) { + pMsg->code = terrno; + if (pMsg->contLen > 0) { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + pMsg->contLen = 0; + } + dError("node:%s, failed to push %s msg:%p type:%d handle:%p then discard data and return error", proc->name, + dmFuncStr(ftype), pMsg, pMsg->msgType, pMsg->info.handle); + } else { + dError("node:%s, failed to push %s msg:%p type:%d handle:%p len:%d since %s, retry:%d", proc->name, + dmFuncStr(ftype), pMsg, pMsg->msgType, pMsg->info.handle, pMsg->contLen, terrstr(), retry); + retry++; + taosMsleep(retry); + } } } -int32_t dmPutToProcCQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, - void *handle, int64_t ref, EProcFuncType ftype) { - return dmPushToProcQueue(proc, proc->cqueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, ref, ftype); +int32_t dmPutToProcCQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) { + return dmPushToProcQueue(proc, proc->cqueue, pMsg, ftype); } diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 2efa34170454c4514de9cf24af577261b6de6dc5..c9100aab9daf0880376df7fff7c2b93d6bac3e7e 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -128,9 +128,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { } if (InParentProc(pWrapper)) { - code = dmPutToProcCQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pRpc->pCont, pRpc->contLen, - (IsReq(pRpc) && (pRpc->code == 0)) ? pRpc->info.handle : NULL, pRpc->info.refId, - DND_FUNC_REQ); + code = dmPutToProcCQueue(&pWrapper->proc, pMsg, DND_FUNC_REQ); } else { code = dmProcessNodeMsg(pWrapper, pMsg); } @@ -255,23 +253,23 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pReq) { } } -static inline void dmSendRsp(const SRpcMsg *pMsg) { +static inline void dmSendRsp(SRpcMsg *pMsg) { SMgmtWrapper *pWrapper = pMsg->info.wrapper; - if (InChildProc(pWrapper)) { - dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_RSP); + if (pMsg->code == TSDB_CODE_NODE_REDIRECT) { + dmSendRpcRedirectRsp(pMsg); } else { - if (pMsg->code == TSDB_CODE_NODE_REDIRECT) { - dmSendRpcRedirectRsp(pMsg); + if (InChildProc(pWrapper)) { + dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP); } else { rpcSendResponse(pMsg); } } } -static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSet) { - SMgmtWrapper *pWrapper = pRsp->info.wrapper; +static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) { + SMgmtWrapper *pWrapper = pMsg->info.wrapper; if (InChildProc(pWrapper)) { - dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP); + dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP); } else { SRpcMsg rsp = {0}; SMEpSet msg = {.epSet = *pNewEpSet}; @@ -281,7 +279,7 @@ static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSe tSerializeSMEpSet(rsp.pCont, len, &msg); rsp.code = TSDB_CODE_RPC_REDIRECT; - rsp.info = pRsp->info; + rsp.info = pMsg->info; rpcSendResponse(&rsp); } } @@ -289,7 +287,7 @@ static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSe static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { SMgmtWrapper *pWrapper = pMsg->info.wrapper; if (InChildProc(pWrapper)) { - dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_REGIST); + dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_REGIST); } else { rpcRegisterBrokenLinkArg(pMsg); } @@ -299,7 +297,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { SMgmtWrapper *pWrapper = pHandle->wrapper; if (InChildProc(pWrapper)) { SRpcMsg msg = {.code = type, .info = *pHandle}; - dmPutToProcPQueue(&pWrapper->proc, &msg, sizeof(SRpcMsg), NULL, 0, DND_FUNC_RELEASE); + dmPutToProcPQueue(&pWrapper->proc, &msg, DND_FUNC_RELEASE); } else { rpcReleaseHandle(pHandle->handle, type); } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 2756d2203e2a82dc0e741959e55b1b412cd4d501..3dfba4eca73b8c4f8c36c73fae81c8d1a56eeb04 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -122,7 +122,7 @@ static void mndCleanupTimer(SMnode *pMnode) { pMnode->stopped = true; if (taosCheckPthreadValid(pMnode->thread)) { taosThreadJoin(pMnode->thread, NULL); - memset(&pMnode->thread, 0, sizeof(pMnode->thread)); + taosThreadClear(&pMnode->thread); } } diff --git a/source/dnode/mnode/impl/test/trans/trans2.cpp b/source/dnode/mnode/impl/test/trans/trans2.cpp index efaa8bb7d37214c44433dd871044728353e822ec..82acd7fddc2fe44607095898853f9ebc3b658f4f 100644 --- a/source/dnode/mnode/impl/test/trans/trans2.cpp +++ b/source/dnode/mnode/impl/test/trans/trans2.cpp @@ -16,10 +16,9 @@ #include "tcache.h" void reportStartup(const char *name, const char *desc) {} -void sendRsp(const SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); } +void sendRsp(SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); } int32_t sendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) { - // rpcFreeCont(pMsg->pCont); terrno = TSDB_CODE_INVALID_PTR; return -1; } diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 1117528b531c2201898160263235c57adcbb877a..203a8a1e625176f2d037915dea945f26999f218f 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -237,6 +237,7 @@ static int32_t syncIOStopInternal(SSyncIO *io) { int32_t ret = 0; atomic_store_8(&io->isStart, 0); taosThreadJoin(io->consumerTid, NULL); + taosThreadClear(&io->consumerTid); taosTmrCleanUp(io->timerMgr); return ret; } diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index cf40c998deabb1b869b8e64cbd84b7f52dbb01ea..ada1f599f231a9a9e2092fbc68637d13e33aa8ff 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -244,6 +244,7 @@ static void walStopThread() { if (taosCheckPthreadValid(tsWal.thread)) { taosThreadJoin(tsWal.thread, NULL); + taosThreadClear(&tsWal.thread); } wDebug("wal thread is stopped"); diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 46403833f1a059b6c7751048cef2cadcec55ff1d..c1fc2c48c04b1fe42ea886516772ab63eac91556 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -145,6 +145,7 @@ void taosCloseLog() { taosStopLog(); if (tsLogObj.logHandle != NULL && taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) { taosThreadJoin(tsLogObj.logHandle->asyncThread, NULL); + taosThreadClear(&tsLogObj.logHandle->asyncThread); } tsLogInited = 0; diff --git a/source/util/src/tsched.c b/source/util/src/tsched.c index 2deba5077b37e227b507b4d982dd8d7883de1199..ee1f4185613dd85f0e60d86ebd0487b07b3ceee9 100644 --- a/source/util/src/tsched.c +++ b/source/util/src/tsched.c @@ -209,6 +209,7 @@ void taosCleanUpScheduler(void *param) { for (int32_t i = 0; i < pSched->numOfThreads; ++i) { if (taosCheckPthreadValid(pSched->qthread[i])) { taosThreadJoin(pSched->qthread[i], NULL); + taosThreadClear(&pSched->qthread[i]); } } diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 7fc390e70ba64731ae91d85808730684d6e06c8a..dc48fc3f8d2b2e803e8f1593d5471184fa99e059 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -57,6 +57,7 @@ void tQWorkerCleanup(SQWorkerPool *pool) { if (worker == NULL) continue; if (taosCheckPthreadValid(worker->thread)) { taosThreadJoin(worker->thread, NULL); + taosThreadClear(&worker->thread); } } @@ -179,6 +180,7 @@ void tWWorkerCleanup(SWWorkerPool *pool) { SWWorker *worker = pool->workers + i; if (taosCheckPthreadValid(worker->thread)) { taosThreadJoin(worker->thread, NULL); + taosThreadClear(&worker->thread); taosFreeQall(worker->qall); taosCloseQset(worker->qset); } diff --git a/tests/test/c/create_table.c b/tests/test/c/create_table.c index 4fca7d6245b6e5dcf18f4fa2103e6204ec4e6188..c53ae0136cb2da84fd35634d0011a18a36d57c67 100644 --- a/tests/test/c/create_table.c +++ b/tests/test/c/create_table.c @@ -436,6 +436,7 @@ int32_t main(int32_t argc, char *argv[]) { taosMsleep(300); for (int32_t i = 0; i < numOfThreads; i++) { taosThreadJoin(pInfo[i].thread, NULL); + taosThreadClear(&pInfo[i].thread); } int64_t maxDelay = 0; diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 55aa7a8d31e12acc5a0af45768ca663659fd64c3..cf113369bc3eae01e019a4d97bc664f5d5a7665b 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -537,6 +537,7 @@ int main(int32_t argc, char* argv[]) { for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL); + taosThreadClear(&g_stConfInfo.stThreads[i].thread); } // printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt); diff --git a/tests/tsim/src/simSystem.c b/tests/tsim/src/simSystem.c index 69f15a164ef78fb750c5386683f0eb66f24d0b7e..969332ba5f032efcc2098c3bcc215456b8e6a509 100644 --- a/tests/tsim/src/simSystem.c +++ b/tests/tsim/src/simSystem.c @@ -56,6 +56,7 @@ void simFreeScript(SScript *script) { bgScript->killed = true; if (taosCheckPthreadValid(bgScript->bgPid)) { taosThreadJoin(bgScript->bgPid, NULL); + taosThreadClear(&bgScript->bgPid); } simDebug("script:%s, background thread joined", bgScript->fileName); diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 4825aae699240f69cc0b67b18f7e1ee940c18e68..9f9c8821b04760e5c49d0edd87d58899e284a1fb 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -985,6 +985,7 @@ int32_t shellExecute() { while (1) { taosThreadCreate(&shell.pid, NULL, shellThreadLoop, shell.conn); taosThreadJoin(shell.pid, NULL); + taosThreadClear(&shell.pid); } return 0;