diff --git a/include/util/tprocess.h b/include/util/tprocess.h index bc0c8fe5a43f269c65aa75a4828bc678ef0d26fa..80c855b78c480aa71e0c94154ae014121d8f3aef 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -24,11 +24,10 @@ extern "C" { typedef enum { PROC_QUEUE, PROC_REQ, PROC_RSP, PROC_REGIST, PROC_RELEASE } ProcFuncType; -typedef struct SProcQueue SProcQueue; -typedef struct SProcObj SProcObj; +typedef struct SProcObj SProcObj; typedef void *(*ProcMallocFp)(int32_t contLen); typedef void *(*ProcFreeFp)(void *pCont); -typedef void *(*ProcConsumeFp)(void *pParent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, +typedef void *(*ProcConsumeFp)(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype); typedef struct { @@ -43,7 +42,7 @@ typedef struct { ProcMallocFp parentMallocBodyFp; ProcFreeFp parentFreeBodyFp; SShm shm; - void *pParent; + void *parent; const char *name; bool isChild; } SProcCfg; @@ -51,10 +50,13 @@ typedef struct { SProcObj *taosProcInit(const SProcCfg *pCfg); void taosProcCleanup(SProcObj *pProc); int32_t taosProcRun(SProcObj *pProc); -int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, - ProcFuncType ftype); -void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, - ProcFuncType ftype); + +int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, + void *handle, ProcFuncType ftype); +void taosProcRemoveHandle(SProcObj *pProc, void *handle); +void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)); +void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, + ProcFuncType ftype); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/main/dndExec.c b/source/dnode/mgmt/main/dndExec.c index a98c0954fedc7f7b4f6923140c135fe705dd7218..d76ef37a5d61355bca3d918fcf9014c679fdbeeb 100644 --- a/source/dnode/mgmt/main/dndExec.c +++ b/source/dnode/mgmt/main/dndExec.c @@ -88,6 +88,10 @@ static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) { return 0; } +static void dndProcessProcHandle(void *handle) { + SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_DND_OFFLINE}; + rpcSendResponse(&rpcMsg); +} static int32_t dndRunInSingleProcess(SDnode *pDnode) { dInfo("dnode run in single process"); @@ -220,6 +224,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) { dInfo("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId); + taosProcCloseHandles(pWrapper->pProc, dndProcessProcHandle); dndNewProc(pWrapper, n); } } diff --git a/source/dnode/mgmt/main/dndTransport.c b/source/dnode/mgmt/main/dndTransport.c index babc6d1209e7d1276d779cbab371bc7c240a6b46..22895dffcc4bde85fa58e1f35a9f1753a7207c75 100644 --- a/source/dnode/mgmt/main/dndTransport.c +++ b/source/dnode/mgmt/main/dndTransport.c @@ -12,7 +12,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ - + #define _DEFAULT_SOURCE #include "dndInt.h" @@ -68,7 +68,8 @@ static void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpS code = (*msgFp)(pWrapper, pMsg); } else if (pWrapper->procType == PROC_PARENT) { dTrace("msg:%p, is created and put into child queue, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user); - code = taosProcPutToChildQ(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, PROC_REQ); + code = taosProcPutToChildQ(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, pRpc->handle, + PROC_REQ); } else { dTrace("msg:%p, should not processed in child process, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user); ASSERT(1); @@ -454,6 +455,7 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t rpcRegisterBrokenLinkArg(pMsg); break; case PROC_RELEASE: + taosProcRemoveHandle(pWrapper->pProc, pMsg->handle); rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code); rpcFreeCont(pCont); break; @@ -461,6 +463,7 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t dndSendRpcReq(&pWrapper->pDnode->trans, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg); break; case PROC_RSP: + taosProcRemoveHandle(pWrapper->pProc, pMsg->handle); dndSendRpcRsp(pWrapper, pMsg); break; default: @@ -481,7 +484,7 @@ SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper) { .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, .shm = pWrapper->shm, - .pParent = pWrapper, + .parent = pWrapper, .name = pWrapper->name}; return cfg; } \ No newline at end of file diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 600413068c5560642f4689be671e35bd949c089c..139c35de4567938fc3e528098b47be234fd88032 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -15,7 +15,9 @@ #define _DEFAULT_SOURCE #include "tprocess.h" +#include "taos.h" #include "taoserror.h" +#include "thash.h" #include "tlog.h" #include "tqueue.h" @@ -48,8 +50,9 @@ typedef struct SProcObj { ProcFreeFp parentFreeHeadFp; ProcMallocFp parentMallocBodyFp; ProcFreeFp parentFreeBodyFp; - void *pParent; + void *parent; const char *name; + SHashObj *hash; int32_t pid; bool isChild; bool stopFlag; @@ -151,8 +154,8 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) { #endif } -static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, const char *pBody, - int32_t rawBodyLen, ProcFuncType ftype) { +static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, + const char *pBody, int32_t rawBodyLen, int64_t handle, ProcFuncType ftype) { const int32_t headLen = CEIL8(rawHeadLen); const int32_t bodyLen = CEIL8(rawBodyLen); const int32_t fullLen = headLen + bodyLen + 8; @@ -164,6 +167,14 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t return -1; } + if (handle != 0 && ftype == PROC_REQ) { + if (taosHashPut(pProc->hash, &handle, sizeof(int64_t), &handle, sizeof(int64_t)) != 0) { + taosThreadMutexUnlock(&pQueue->mutex); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + } + const int32_t pos = pQueue->tail; if (pQueue->tail < pQueue->total) { *(int16_t *)(pQueue->pBuffer + pQueue->tail) = headLen; @@ -317,6 +328,7 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { pProc->name = pCfg->name; pProc->pChildQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + cstart, csize); pProc->pParentQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + pstart, psize); + pProc->hash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) { taosProcCleanupQueue(pProc->pChildQueue); taosMemoryFree(pProc); @@ -324,7 +336,7 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { } pProc->name = pCfg->name; - pProc->pParent = pCfg->pParent; + pProc->parent = pCfg->parent; pProc->childMallocHeadFp = pCfg->childMallocHeadFp; pProc->childFreeHeadFp = pCfg->childFreeHeadFp; pProc->childMallocBodyFp = pCfg->childMallocBodyFp; @@ -384,7 +396,7 @@ static void taosProcThreadLoop(SProcObj *pProc) { taosMsleep(1); continue; } else { - (*consumeFp)(pProc->pParent, pHead, headLen, pBody, bodyLen, ftype); + (*consumeFp)(pProc->parent, pHead, headLen, pBody, bodyLen, ftype); } } } @@ -424,19 +436,41 @@ void taosProcCleanup(SProcObj *pProc) { taosProcStop(pProc); taosProcCleanupQueue(pProc->pChildQueue); taosProcCleanupQueue(pProc->pParentQueue); + if (pProc->hash != NULL) { + taosHashCleanup(pProc->hash); + pProc->hash = NULL; + } + uDebug("proc:%s, is cleaned up", pProc->name); taosMemoryFree(pProc); } } int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, - ProcFuncType ftype) { - return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, ftype); + void *handle, ProcFuncType ftype) { + return taosProcQueuePush(pProc, pProc->pChildQueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, ftype); +} + +void taosProcRemoveHandle(SProcObj *pProc, void *handle) { + int64_t h = (int64_t)handle; + taosThreadMutexLock(&pProc->pChildQueue->mutex); + taosHashRemove(pProc->hash, &h, sizeof(int64_t)); + taosThreadMutexUnlock(&pProc->pChildQueue->mutex); +} + +void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) { + taosThreadMutexLock(&pProc->pChildQueue->mutex); + void *h = taosHashIterate(pProc->hash, NULL); + while (h != NULL) { + void *handle = *((void **)h); + (*HandleFp)(handle); + } + taosThreadMutexUnlock(&pProc->pChildQueue->mutex); } void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, ProcFuncType ftype) { - while (taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, ftype) != 0) { + while (taosProcQueuePush(pProc, pProc->pParentQueue, pHead, headLen, pBody, bodyLen, 0, ftype) != 0) { taosMsleep(1); } }