From 4091771274f95910111e46c41d9b5579363b62bf Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 22 Mar 2022 14:38:26 +0800 Subject: [PATCH] refact queue --- include/util/tworker.h | 32 ++++++++++++++ source/dnode/mgmt/mnode/inc/mmInt.h | 6 +-- source/dnode/mgmt/mnode/src/mmWorker.c | 28 ++++++------ source/util/src/tqueue.c | 1 - source/util/src/tworker.c | 59 +++++++++++++++++++++++++- 5 files changed, 108 insertions(+), 18 deletions(-) diff --git a/include/util/tworker.h b/include/util/tworker.h index 3da8a1db63..91f4fbf7ff 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -67,6 +67,38 @@ void tWWorkerCleanup(SWWorkerPool *pool); STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp); void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue); +typedef struct { + const char *name; + int32_t minNum; + int32_t maxNum; + FItem fp; + void *param; +} SQWorkerAllCfg; + +typedef struct { + const char *name; + STaosQueue *queue; + SQWorkerPool pool; +} SQWorkerAll; + +typedef struct { + const char *name; + int32_t maxNum; + FItems fp; + void *param; +} SWWorkerAllCfg; + +typedef struct { + const char *name; + STaosQueue *queue; + SWWorkerPool pool; +} SWWorkerAll; + +int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg); +void tQWorkerAllCleanup(SQWorkerAll *pWorker); +int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg); +void tWWorkerAllCleanup(SWWorkerAll *pWorker); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/mnode/inc/mmInt.h b/source/dnode/mgmt/mnode/inc/mmInt.h index 1751131764..06ed637791 100644 --- a/source/dnode/mgmt/mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mnode/inc/mmInt.h @@ -28,9 +28,9 @@ typedef struct SMnodeMgmt { SDnode *pDnode; SMgmtWrapper *pWrapper; const char *path; - SDnodeWorker readWorker; - SDnodeWorker writeWorker; - SDnodeWorker syncWorker; + SQWorkerAll readWorker; + SQWorkerAll writeWorker; + SQWorkerAll syncWorker; SReplica replicas[TSDB_MAX_REPLICA]; int8_t replica; int8_t selfIndex; diff --git a/source/dnode/mgmt/mnode/src/mmWorker.c b/source/dnode/mgmt/mnode/src/mmWorker.c index ab9f46e323..470128940b 100644 --- a/source/dnode/mgmt/mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mnode/src/mmWorker.c @@ -42,9 +42,9 @@ static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { taosFreeQitem(pMsg); } -static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SNodeMsg *pMsg) { +static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SQWorkerAll *pWorker, SNodeMsg *pMsg) { dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); + return taosWriteQitem(pWorker->queue, pMsg); } int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { @@ -59,7 +59,7 @@ int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg); } -static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRpcMsg *pRpc) { +static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SQWorkerAll *pWorker, SRpcMsg *pRpc) { SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); if (pMsg == NULL) { return -1; @@ -68,7 +68,7 @@ static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRp dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType)); pMsg->rpcMsg = *pRpc; - int32_t code = dndWriteMsgToWorker(pWorker, pMsg); + int32_t code = taosWriteQitem(pWorker->queue, pMsg); if (code != 0) { dTrace("msg:%p, is freed", pMsg); taosFreeQitem(pMsg); @@ -89,18 +89,20 @@ int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { } int32_t mmStartWorker(SMnodeMgmt *pMgmt) { - if (dndInitWorker(pMgmt, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmProcessQueue) != 0) { - dError("failed to start mnode read worker since %s", terrstr()); + SQWorkerAllCfg cfg = {.minNum = 0, .maxNum = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt}; + + if (tQWorkerAllInit(&pMgmt->readWorker, &cfg) != 0) { + dError("failed to start mnode-read worker since %s", terrstr()); return -1; } - if (dndInitWorker(pMgmt, &pMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, mmProcessQueue) != 0) { - dError("failed to start mnode write worker since %s", terrstr()); + if (tQWorkerAllInit(&pMgmt->writeWorker, &cfg) != 0) { + dError("failed to start mnode-write worker since %s", terrstr()); return -1; } - if (dndInitWorker(pMgmt, &pMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, mmProcessQueue) != 0) { - dError("failed to start mnode sync worker since %s", terrstr()); + if (tQWorkerAllInit(&pMgmt->syncWorker, &cfg) != 0) { + dError("failed to start mnode sync-worker since %s", terrstr()); return -1; } @@ -108,7 +110,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { } void mmStopWorker(SMnodeMgmt *pMgmt) { - dndCleanupWorker(&pMgmt->readWorker); - dndCleanupWorker(&pMgmt->writeWorker); - dndCleanupWorker(&pMgmt->syncWorker); + tQWorkerAllCleanup(&pMgmt->readWorker); + tQWorkerAllCleanup(&pMgmt->writeWorker); + tQWorkerAllCleanup(&pMgmt->syncWorker); } diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 258ca1402f..70f2871f41 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -441,7 +441,6 @@ void taosResetQsetThread(STaosQset *qset, void *pItem) { STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode)); taosThreadMutexLock(&qset->mutex); - pNode->queue->threadId = -1; for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) { tsem_post(&qset->sem); } diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 7d7f819dec..e05c4e0a78 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -22,7 +22,7 @@ typedef void *(*ThreadFp)(void *param); int32_t tQWorkerInit(SQWorkerPool *pool) { pool->qset = taosOpenQset(); - pool->workers = calloc(sizeof(SQWorker), pool->max); + pool->workers = calloc(pool->max, sizeof(SQWorker)); if (pool->workers == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -279,3 +279,60 @@ void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) { taosCloseQueue(queue); uDebug("worker:%s, queue:%p is freed", pool->name, queue); } + +int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg) { + SQWorkerPool *pPool = &pWorker->pool; + pPool->name = pCfg->name; + pPool->min = pCfg->minNum; + pPool->max = pCfg->maxNum; + if (tQWorkerInit(pPool) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pWorker->queue = tQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp); + if (pWorker->queue == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pWorker->name = pCfg->name; + return 0; +} + +void tQWorkerAllCleanup(SQWorkerAll *pWorker) { + if (pWorker->queue == NULL) return; + + while (!taosQueueEmpty(pWorker->queue)) { + taosMsleep(10); + } + + tQWorkerCleanup(&pWorker->pool); + tQWorkerFreeQueue(&pWorker->pool, pWorker->queue); +} + +int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg) { + SWWorkerPool *pPool = &pWorker->pool; + pPool->name = pCfg->name; + pPool->max = pCfg->maxNum; + if (tWWorkerInit(pPool) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pWorker->queue = tWWorkerAllocQueue(pPool, pCfg->param, pCfg->fp); + if (pWorker->queue == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pWorker->name = pCfg->name; + return 0; +} + +void tWWorkerAllCleanup(SWWorkerAll *pWorker) { + if (pWorker->queue == NULL) return; + + while (!taosQueueEmpty(pWorker->queue)) { + taosMsleep(10); + } + + tWWorkerCleanup(&pWorker->pool); + tWWorkerFreeQueue(&pWorker->pool, pWorker->queue); +} -- GitLab