From a340baa8795faea0c1463eb088e2de941cdd4ea7 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 22 Mar 2022 16:10:26 +0800 Subject: [PATCH] refact queue --- include/util/tqueue.h | 11 +++++++++-- include/util/tworker.h | 3 ++- source/dnode/mgmt/bnode/src/bmWorker.c | 3 ++- source/dnode/mgmt/dnode/src/dmWorker.c | 4 +++- source/dnode/mgmt/mnode/src/mmWorker.c | 4 +++- source/dnode/mgmt/qnode/src/qmWorker.c | 8 ++++++-- source/dnode/mgmt/snode/src/smWorker.c | 10 +++++++--- source/dnode/mgmt/vnode/src/vmWorker.c | 26 ++++++++++++++++++-------- source/util/src/tworker.c | 6 +++++- 9 files changed, 55 insertions(+), 20 deletions(-) diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 9faf90113e..e84875bd7d 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -42,8 +42,15 @@ shall be used to set up the protection. typedef struct STaosQueue STaosQueue; typedef struct STaosQset STaosQset; typedef struct STaosQall STaosQall; -typedef void (*FItem)(void *ahandle, void *pItem); -typedef void (*FItems)(void *ahandle, STaosQall *qall, int32_t numOfItems); +typedef struct { + void *ahandle; + int32_t qsize; + int32_t workerId; + int32_t threadNum; +} SQueueInfo; + +typedef void (*FItem)(SQueueInfo *pInfo, void *pItem); +typedef void (*FItems)(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfItems); STaosQueue *taosOpenQueue(); void taosCloseQueue(STaosQueue *queue); diff --git a/include/util/tworker.h b/include/util/tworker.h index 91f4fbf7ff..f8ff45269b 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -50,7 +50,8 @@ typedef struct SWWorker { } SWWorker; typedef struct SWWorkerPool { - int32_t max; // max number of workers + int32_t max; // max number of workers + int32_t num; int32_t nextId; // from 0 to max-1, cyclic const char *name; SWWorker *workers; diff --git a/source/dnode/mgmt/bnode/src/bmWorker.c b/source/dnode/mgmt/bnode/src/bmWorker.c index 79b2a669ae..42490d66cf 100644 --- a/source/dnode/mgmt/bnode/src/bmWorker.c +++ b/source/dnode/mgmt/bnode/src/bmWorker.c @@ -33,7 +33,8 @@ static void bmSendErrorRsps(SMgmtWrapper *pWrapper, STaosQall *qall, int32_t num } } -static void bmProcessQueue(SBnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs) { +static void bmProcessQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { + SBnodeMgmt *pMgmt = pInfo->ahandle; SMgmtWrapper *pWrapper = pMgmt->pWrapper; SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *)); diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index 5bacd5f10b..79ed6d50c8 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -52,7 +52,9 @@ static void *dmThreadRoutine(void *param) { } } -static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { +static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SDnodeMgmt *pMgmt = pInfo->ahandle; + SDnode *pDnode = pMgmt->pDnode; SRpcMsg *pRpc = &pMsg->rpcMsg; int32_t code = -1; diff --git a/source/dnode/mgmt/mnode/src/mmWorker.c b/source/dnode/mgmt/mnode/src/mmWorker.c index 470128940b..9f64bec7c5 100644 --- a/source/dnode/mgmt/mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mnode/src/mmWorker.c @@ -16,7 +16,9 @@ #define _DEFAULT_SOURCE #include "mmInt.h" -static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { +static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SMnodeMgmt *pMgmt = pInfo->ahandle; + dTrace("msg:%p, will be processed in mnode queue", pMsg); SRpcMsg *pRpc = &pMsg->rpcMsg; int32_t code = -1; diff --git a/source/dnode/mgmt/qnode/src/qmWorker.c b/source/dnode/mgmt/qnode/src/qmWorker.c index 19686e2a76..5f71837269 100644 --- a/source/dnode/mgmt/qnode/src/qmWorker.c +++ b/source/dnode/mgmt/qnode/src/qmWorker.c @@ -21,7 +21,9 @@ static void qmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { dndSendRsp(pWrapper, &rsp); } -static void qmProcessQueryQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { +static void qmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SQnodeMgmt *pMgmt = pInfo->ahandle; + dTrace("msg:%p, will be processed in qnode-query queue", pMsg); int32_t code = qndProcessQueryMsg(pMgmt->pQnode, &pMsg->rpcMsg); if (code != 0) { @@ -33,7 +35,9 @@ static void qmProcessQueryQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { taosFreeQitem(pMsg); } -static void qmProcessFetchQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { +static void qmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SQnodeMgmt *pMgmt = pInfo->ahandle; + dTrace("msg:%p, will be processed in qnode-fetch queue", pMsg); int32_t code = qndProcessFetchMsg(pMgmt->pQnode, &pMsg->rpcMsg); if (code != 0) { diff --git a/source/dnode/mgmt/snode/src/smWorker.c b/source/dnode/mgmt/snode/src/smWorker.c index 99c844f8b5..c008d8175f 100644 --- a/source/dnode/mgmt/snode/src/smWorker.c +++ b/source/dnode/mgmt/snode/src/smWorker.c @@ -16,7 +16,9 @@ #define _DEFAULT_SOURCE #include "smInt.h" -static void smProcessUniqueQueue(SSnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs) { +static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { + SSnodeMgmt *pMgmt = pInfo->ahandle; + for (int32_t i = 0; i < numOfMsgs; i++) { SNodeMsg *pMsg = NULL; taosGetQitem(qall, (void **)&pMsg); @@ -30,7 +32,9 @@ static void smProcessUniqueQueue(SSnodeMgmt *pMgmt, STaosQall *qall, int32_t num } } -static void smProcessSharedQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { +static void smProcessSharedQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SSnodeMgmt *pMgmt = pInfo->ahandle; + dTrace("msg:%p, will be processed in snode shared queue", pMsg); sndProcessSMsg(pMgmt->pSnode, &pMsg->rpcMsg); @@ -53,7 +57,7 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { return -1; } - SWWorkerAllCfg cfg = {.maxNum = 1, .name = "snode-unique", .fp = (FItems)smProcessUniqueQueue, .param = pMgmt}; + SWWorkerAllCfg cfg = {.maxNum = 1, .name = "snode-unique", .fp = smProcessUniqueQueue, .param = pMgmt}; if (tWWorkerAllInit(pUniqueWorker, &cfg) != 0) { dError("failed to start snode-unique worker since %s", terrstr()); diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index dd398cb202..73aff7f66d 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -21,7 +21,9 @@ static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { dndSendRsp(pWrapper, &rsp); } -static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { +static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SVnodesMgmt *pMgmt = pInfo->ahandle; + int32_t code = -1; tmsg_t msgType = pMsg->rpcMsg.msgType; dTrace("msg:%p, will be processed in vnode-mgmt queue", pMsg); @@ -57,7 +59,9 @@ static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { taosFreeQitem(pMsg); } -static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { +static void vmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SVnodeObj *pVnode = pInfo->ahandle; + dTrace("msg:%p, will be processed in vnode-query queue", pMsg); int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg); if (code != 0) { @@ -68,7 +72,9 @@ static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { } } -static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { +static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SVnodeObj *pVnode = pInfo->ahandle; + dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg); int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg); if (code != 0) { @@ -79,7 +85,9 @@ static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { } } -static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { +static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { + SVnodeObj *pVnode = pInfo->ahandle; + SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *)); if (pArray == NULL) { dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr()); @@ -126,8 +134,9 @@ static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO taosArrayDestroy(pArray); } -static void vmProcessApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { - SNodeMsg *pMsg = NULL; +static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { + SVnodeObj *pVnode = pInfo->ahandle; + SNodeMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pMsg); @@ -138,8 +147,9 @@ static void vmProcessApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO } } -static void vmProcessSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { - SNodeMsg *pMsg = NULL; +static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { + SVnodeObj *pVnode = pInfo->ahandle; + SNodeMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pMsg); diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index e05c4e0a78..44fc2f9f93 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -86,7 +86,8 @@ static void *tQWorkerThreadFp(SQWorker *worker) { } if (fp != NULL) { - (*fp)(ahandle, msg); + SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num}; + (*fp)(&info, msg); } } @@ -210,6 +211,7 @@ static void *tWWorkerThreadFp(SWWorker *worker) { } if (fp != NULL) { + SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num}; (*fp)(ahandle, worker->qall, numOfMsgs); } } @@ -264,6 +266,8 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) { } taosThreadAttrDestroy(&thAttr); + pool->num++; + if (pool->num > pool->max) pool->num = pool->max; } else { taosAddIntoQset(worker->qset, queue, ahandle); pool->nextId = (pool->nextId + 1) % pool->max; -- GitLab