From 34e374633aeb855ae0ba53150bb0e47a0630561b Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 9 Dec 2020 15:36:45 +0800 Subject: [PATCH] TD-2393 --- src/dnode/src/dnodeVMgmt.c | 66 +++++++++++++------------------------- 1 file changed, 23 insertions(+), 43 deletions(-) diff --git a/src/dnode/src/dnodeVMgmt.c b/src/dnode/src/dnodeVMgmt.c index 87302026ec..4350614545 100644 --- a/src/dnode/src/dnodeVMgmt.c +++ b/src/dnode/src/dnodeVMgmt.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "os.h" #include "tqueue.h" +#include "tworker.h" #include "dnodeVMgmt.h" typedef struct { @@ -23,9 +24,8 @@ typedef struct { char pCont[]; } SMgmtMsg; -static taos_qset tsMgmtQset = NULL; -static taos_queue tsMgmtQueue = NULL; -static pthread_t tsQthread; +static SWorkerPool tsVMgmtWP; +static taos_queue tsVMgmtQueue = NULL; static void * dnodeProcessMgmtQueue(void *param); static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); @@ -47,45 +47,23 @@ int32_t dnodeInitVMgmt() { int32_t code = vnodeInitMgmt(); if (code != TSDB_CODE_SUCCESS) return -1; - tsMgmtQset = taosOpenQset(); - if (tsMgmtQset == NULL) { - dError("failed to create the vmgmt queue set"); - return -1; - } - - tsMgmtQueue = taosOpenQueue(); - if (tsMgmtQueue == NULL) { - dError("failed to create the vmgmt queue"); - return -1; - } - - taosAddIntoQset(tsMgmtQset, tsMgmtQueue, NULL); + tsVMgmtWP.name = "vmgmt"; + tsVMgmtWP.workerFp = dnodeProcessMgmtQueue; + tsVMgmtWP.min = 1; + tsVMgmtWP.max = 1; + if (tWorkerInit(&tsVMgmtWP) != 0) return -1; - pthread_attr_t thAttr; - pthread_attr_init(&thAttr); - pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); - - code = pthread_create(&tsQthread, &thAttr, dnodeProcessMgmtQueue, NULL); - pthread_attr_destroy(&thAttr); - if (code != 0) { - dError("failed to create thread to process vmgmt queue, reason:%s", strerror(errno)); - return -1; - } + tsVMgmtQueue = tWorkerAllocQueue(&tsVMgmtWP, NULL); dInfo("dnode vmgmt is initialized"); return TSDB_CODE_SUCCESS; } void dnodeCleanupVMgmt() { - if (tsMgmtQset) taosQsetThreadResume(tsMgmtQset); - if (tsQthread) pthread_join(tsQthread, NULL); - - if (tsMgmtQueue) taosCloseQueue(tsMgmtQueue); - if (tsMgmtQset) taosCloseQset(tsMgmtQset); - - tsMgmtQset = NULL; - tsMgmtQueue = NULL; + tWorkerFreeQueue(&tsVMgmtWP, tsVMgmtQueue); + tWorkerCleanup(&tsVMgmtWP); + tsVMgmtQueue = NULL; vnodeCleanupMgmt(); } @@ -97,7 +75,7 @@ static int32_t dnodeWriteToMgmtQueue(SRpcMsg *pMsg) { pMgmt->rpcMsg = *pMsg; pMgmt->rpcMsg.pCont = pMgmt->pCont; memcpy(pMgmt->pCont, pMsg->pCont, pMsg->contLen); - taosWriteQitem(tsMgmtQueue, TAOS_QTYPE_RPC, pMgmt); + taosWriteQitem(tsVMgmtQueue, TAOS_QTYPE_RPC, pMgmt); return TSDB_CODE_SUCCESS; } @@ -112,16 +90,18 @@ void dnodeDispatchToVMgmtQueue(SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); } -static void *dnodeProcessMgmtQueue(void *param) { - SMgmtMsg *pMgmt; - SRpcMsg * pMsg; - SRpcMsg rsp = {0}; - int32_t qtype; - void * handle; +static void *dnodeProcessMgmtQueue(void *wparam) { + SWorker * pWorker = wparam; + SWorkerPool *pPool = pWorker->pPool; + SMgmtMsg * pMgmt; + SRpcMsg * pMsg; + SRpcMsg rsp = {0}; + int32_t qtype; + void * handle; while (1) { - if (taosReadQitemFromQset(tsMgmtQset, &qtype, (void **)&pMgmt, &handle) == 0) { - dDebug("qset:%p, dnode mgmt got no message from qset, exit", tsMgmtQset); + if (taosReadQitemFromQset(pPool->qset, &qtype, (void **)&pMgmt, &handle) == 0) { + dDebug("qdnode mgmt got no message from qset:%p, , exit", pPool->qset); break; } -- GitLab