From 2af212f42da5054de5401e6f520f1ff01883dc68 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 22 Mar 2022 16:51:30 +0800 Subject: [PATCH] refact worker --- include/util/tworker.h | 16 ++++++------ source/dnode/mgmt/bnode/inc/bmInt.h | 2 +- source/dnode/mgmt/bnode/src/bmWorker.c | 8 +++--- source/dnode/mgmt/dnode/inc/dmInt.h | 4 +-- source/dnode/mgmt/dnode/src/dmWorker.c | 14 +++++------ source/dnode/mgmt/mnode/inc/mmInt.h | 6 ++--- source/dnode/mgmt/mnode/src/mmWorker.c | 18 +++++++------- source/dnode/mgmt/qnode/inc/qmInt.h | 4 +-- source/dnode/mgmt/qnode/src/qmWorker.c | 32 ++++++++++++------------ source/dnode/mgmt/snode/inc/smInt.h | 4 +-- source/dnode/mgmt/snode/src/smWorker.c | 34 +++++++++++++------------- source/dnode/mgmt/vnode/inc/vmInt.h | 2 +- source/dnode/mgmt/vnode/src/vmWorker.c | 24 ++++++------------ source/util/src/tworker.c | 8 +++--- 14 files changed, 84 insertions(+), 92 deletions(-) diff --git a/include/util/tworker.h b/include/util/tworker.h index f8ff45269b..92d474c885 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -74,31 +74,31 @@ typedef struct { int32_t maxNum; FItem fp; void *param; -} SQWorkerAllCfg; +} SSingleWorkerCfg; typedef struct { const char *name; STaosQueue *queue; SQWorkerPool pool; -} SQWorkerAll; +} SSingleWorker; typedef struct { const char *name; int32_t maxNum; FItems fp; void *param; -} SWWorkerAllCfg; +} SMultiWorkerCfg; typedef struct { const char *name; STaosQueue *queue; SWWorkerPool pool; -} SWWorkerAll; +} SMultiWorker; -int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg); -void tQWorkerAllCleanup(SQWorkerAll *pWorker); -int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg); -void tWWorkerAllCleanup(SWWorkerAll *pWorker); +int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg); +void tSingleWorkerCleanup(SSingleWorker *pWorker); +int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg); +void tMultiWorkerCleanup(SMultiWorker *pWorker); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/bnode/inc/bmInt.h b/source/dnode/mgmt/bnode/inc/bmInt.h index fc6b7a999d..8cfff0f1f3 100644 --- a/source/dnode/mgmt/bnode/inc/bmInt.h +++ b/source/dnode/mgmt/bnode/inc/bmInt.h @@ -28,7 +28,7 @@ typedef struct SBnodeMgmt { SDnode *pDnode; SMgmtWrapper *pWrapper; const char *path; - SWWorkerAll writeWorker; + SMultiWorker writeWorker; } SBnodeMgmt; // bmInt.c diff --git a/source/dnode/mgmt/bnode/src/bmWorker.c b/source/dnode/mgmt/bnode/src/bmWorker.c index 42490d66cf..2099787c0d 100644 --- a/source/dnode/mgmt/bnode/src/bmWorker.c +++ b/source/dnode/mgmt/bnode/src/bmWorker.c @@ -64,15 +64,15 @@ static void bmProcessQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs } int32_t bmProcessWriteMsg(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SWWorkerAll *pWorker = &pMgmt->writeWorker; + SMultiWorker *pWorker = &pMgmt->writeWorker; dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); return taosWriteQitem(pWorker->queue, pMsg); } int32_t bmStartWorker(SBnodeMgmt *pMgmt) { - SWWorkerAllCfg cfg = {.maxNum = 1, .name = "bnode-write", .fp = (FItems)bmProcessQueue, .param = pMgmt}; - if (tWWorkerAllInit(&pMgmt->writeWorker, &cfg) != 0) { + SMultiWorkerCfg cfg = {.maxNum = 1, .name = "bnode-write", .fp = (FItems)bmProcessQueue, .param = pMgmt}; + if (tMultiWorkerInit(&pMgmt->writeWorker, &cfg) != 0) { dError("failed to start bnode write worker since %s", terrstr()); return -1; } @@ -80,4 +80,4 @@ int32_t bmStartWorker(SBnodeMgmt *pMgmt) { return 0; } -void bmStopWorker(SBnodeMgmt *pMgmt) { tWWorkerAllCleanup(&pMgmt->writeWorker); } +void bmStopWorker(SBnodeMgmt *pMgmt) { tMultiWorkerCleanup(&pMgmt->writeWorker); } diff --git a/source/dnode/mgmt/dnode/inc/dmInt.h b/source/dnode/mgmt/dnode/inc/dmInt.h index 0330b7f996..b02b1d2297 100644 --- a/source/dnode/mgmt/dnode/inc/dmInt.h +++ b/source/dnode/mgmt/dnode/inc/dmInt.h @@ -31,8 +31,8 @@ typedef struct SDnodeMgmt { SArray *dnodeEps; TdThread *threadId; SRWLatch latch; - SQWorkerAll mgmtWorker; - SQWorkerAll statusWorker; + SSingleWorker mgmtWorker; + SSingleWorker statusWorker; const char *path; SDnode *pDnode; SMgmtWrapper *pWrapper; diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index 9929ebae4f..b62c18655a 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -100,16 +100,16 @@ static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { } int32_t dmStartWorker(SDnodeMgmt *pMgmt) { - SQWorkerAllCfg mgmtCfg = { + SSingleWorkerCfg mgmtCfg = { .minNum = 1, .maxNum = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt}; - if (tQWorkerAllInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) { + if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) { dError("failed to start dnode mgmt worker since %s", terrstr()); return -1; } - SQWorkerAllCfg statusCfg = { + SSingleWorkerCfg statusCfg = { .minNum = 1, .maxNum = 1, .name = "dnode-status", .fp = (FItem)dmProcessQueue, .param = pMgmt}; - if (tQWorkerAllInit(&pMgmt->statusWorker, &statusCfg) != 0) { + if (tSingleWorkerInit(&pMgmt->statusWorker, &statusCfg) != 0) { dError("failed to start dnode status worker since %s", terrstr()); return -1; } @@ -129,8 +129,8 @@ int32_t dmStartThread(SDnodeMgmt *pMgmt) { } void dmStopWorker(SDnodeMgmt *pMgmt) { - tQWorkerAllCleanup(&pMgmt->mgmtWorker); - tQWorkerAllCleanup(&pMgmt->statusWorker); + tSingleWorkerCleanup(&pMgmt->mgmtWorker); + tSingleWorkerCleanup(&pMgmt->statusWorker); if (pMgmt->threadId != NULL) { taosDestoryThread(pMgmt->threadId); @@ -139,7 +139,7 @@ void dmStopWorker(SDnodeMgmt *pMgmt) { } int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SQWorkerAll *pWorker = &pMgmt->mgmtWorker; + SSingleWorker *pWorker = &pMgmt->mgmtWorker; if (pMsg->rpcMsg.msgType == TDMT_MND_STATUS_RSP) { pWorker = &pMgmt->statusWorker; } diff --git a/source/dnode/mgmt/mnode/inc/mmInt.h b/source/dnode/mgmt/mnode/inc/mmInt.h index 06ed637791..d57088474f 100644 --- a/source/dnode/mgmt/mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mnode/inc/mmInt.h @@ -28,9 +28,9 @@ typedef struct SMnodeMgmt { SDnode *pDnode; SMgmtWrapper *pWrapper; const char *path; - SQWorkerAll readWorker; - SQWorkerAll writeWorker; - SQWorkerAll syncWorker; + SSingleWorker readWorker; + SSingleWorker writeWorker; + SSingleWorker syncWorker; SReplica replicas[TSDB_MAX_REPLICA]; int8_t replica; int8_t selfIndex; diff --git a/source/dnode/mgmt/mnode/src/mmWorker.c b/source/dnode/mgmt/mnode/src/mmWorker.c index 9f64bec7c5..b9a3a4f14c 100644 --- a/source/dnode/mgmt/mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mnode/src/mmWorker.c @@ -44,7 +44,7 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { taosFreeQitem(pMsg); } -static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SQWorkerAll *pWorker, SNodeMsg *pMsg) { +static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SNodeMsg *pMsg) { dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); return taosWriteQitem(pWorker->queue, pMsg); } @@ -61,7 +61,7 @@ int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg); } -static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SQWorkerAll *pWorker, SRpcMsg *pRpc) { +static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) { SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); if (pMsg == NULL) { return -1; @@ -91,19 +91,19 @@ int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { } int32_t mmStartWorker(SMnodeMgmt *pMgmt) { - SQWorkerAllCfg cfg = {.minNum = 0, .maxNum = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt}; + SSingleWorkerCfg cfg = {.minNum = 0, .maxNum = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt}; - if (tQWorkerAllInit(&pMgmt->readWorker, &cfg) != 0) { + if (tSingleWorkerInit(&pMgmt->readWorker, &cfg) != 0) { dError("failed to start mnode-read worker since %s", terrstr()); return -1; } - if (tQWorkerAllInit(&pMgmt->writeWorker, &cfg) != 0) { + if (tSingleWorkerInit(&pMgmt->writeWorker, &cfg) != 0) { dError("failed to start mnode-write worker since %s", terrstr()); return -1; } - if (tQWorkerAllInit(&pMgmt->syncWorker, &cfg) != 0) { + if (tSingleWorkerInit(&pMgmt->syncWorker, &cfg) != 0) { dError("failed to start mnode sync-worker since %s", terrstr()); return -1; } @@ -112,7 +112,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { } void mmStopWorker(SMnodeMgmt *pMgmt) { - tQWorkerAllCleanup(&pMgmt->readWorker); - tQWorkerAllCleanup(&pMgmt->writeWorker); - tQWorkerAllCleanup(&pMgmt->syncWorker); + tSingleWorkerCleanup(&pMgmt->readWorker); + tSingleWorkerCleanup(&pMgmt->writeWorker); + tSingleWorkerCleanup(&pMgmt->syncWorker); } diff --git a/source/dnode/mgmt/qnode/inc/qmInt.h b/source/dnode/mgmt/qnode/inc/qmInt.h index 3f1f62ebd5..52d23a445c 100644 --- a/source/dnode/mgmt/qnode/inc/qmInt.h +++ b/source/dnode/mgmt/qnode/inc/qmInt.h @@ -28,8 +28,8 @@ typedef struct SQnodeMgmt { SDnode *pDnode; SMgmtWrapper *pWrapper; const char *path; - SQWorkerAll queryWorker; - SQWorkerAll fetchWorker; + SSingleWorker queryWorker; + SSingleWorker fetchWorker; } SQnodeMgmt; // qmInt.c diff --git a/source/dnode/mgmt/qnode/src/qmWorker.c b/source/dnode/mgmt/qnode/src/qmWorker.c index 325eec7631..fff469a902 100644 --- a/source/dnode/mgmt/qnode/src/qmWorker.c +++ b/source/dnode/mgmt/qnode/src/qmWorker.c @@ -49,7 +49,7 @@ static void qmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { taosFreeQitem(pMsg); } -static int32_t qmPutMsgToWorker(SQWorkerAll *pWorker, SNodeMsg *pMsg) { +static int32_t qmPutMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) { dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); return taosWriteQitem(pWorker->queue, pMsg); } @@ -58,7 +58,7 @@ int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgTo int32_t qmProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgToWorker(&pMgmt->fetchWorker, pMsg); } -static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SQWorkerAll *pWorker, SRpcMsg *pRpc) { +static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) { SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); if (pMsg == NULL) { return -1; @@ -110,24 +110,24 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1); int32_t maxQueryThreads = minQueryThreads; - SQWorkerAllCfg queryCfg = {.minNum = minQueryThreads, - .maxNum = maxQueryThreads, - .name = "qnode-query", - .fp = (FItem)qmProcessQueryQueue, - .param = pMgmt}; + SSingleWorkerCfg queryCfg = {.minNum = minQueryThreads, + .maxNum = maxQueryThreads, + .name = "qnode-query", + .fp = (FItem)qmProcessQueryQueue, + .param = pMgmt}; - if (tQWorkerAllInit(&pMgmt->queryWorker, &queryCfg) != 0) { + if (tSingleWorkerInit(&pMgmt->queryWorker, &queryCfg) != 0) { dError("failed to start qnode-query worker since %s", terrstr()); return -1; } - SQWorkerAllCfg fetchCfg = {.minNum = minFetchThreads, - .maxNum = maxFetchThreads, - .name = "qnode-fetch", - .fp = (FItem)qmProcessFetchQueue, - .param = pMgmt}; + SSingleWorkerCfg fetchCfg = {.minNum = minFetchThreads, + .maxNum = maxFetchThreads, + .name = "qnode-fetch", + .fp = (FItem)qmProcessFetchQueue, + .param = pMgmt}; - if (tQWorkerAllInit(&pMgmt->fetchWorker, &fetchCfg) != 0) { + if (tSingleWorkerInit(&pMgmt->fetchWorker, &fetchCfg) != 0) { dError("failed to start qnode-fetch worker since %s", terrstr()); return -1; } @@ -136,6 +136,6 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { } void qmStopWorker(SQnodeMgmt *pMgmt) { - tQWorkerAllCleanup(&pMgmt->queryWorker); - tQWorkerAllCleanup(&pMgmt->fetchWorker); + tSingleWorkerCleanup(&pMgmt->queryWorker); + tSingleWorkerCleanup(&pMgmt->fetchWorker); } diff --git a/source/dnode/mgmt/snode/inc/smInt.h b/source/dnode/mgmt/snode/inc/smInt.h index 744089efae..f2b510483c 100644 --- a/source/dnode/mgmt/snode/inc/smInt.h +++ b/source/dnode/mgmt/snode/inc/smInt.h @@ -30,8 +30,8 @@ typedef struct SSnodeMgmt { const char *path; SRWLatch latch; int8_t uniqueWorkerInUse; - SArray *uniqueWorkers; // SArray - SQWorkerAll sharedWorker; + SArray *uniqueWorkers; // SArray + SSingleWorker sharedWorker; } SSnodeMgmt; // smInt.c diff --git a/source/dnode/mgmt/snode/src/smWorker.c b/source/dnode/mgmt/snode/src/smWorker.c index c008d8175f..ceec6b82c3 100644 --- a/source/dnode/mgmt/snode/src/smWorker.c +++ b/source/dnode/mgmt/snode/src/smWorker.c @@ -44,22 +44,22 @@ static void smProcessSharedQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { } int32_t smStartWorker(SSnodeMgmt *pMgmt) { - pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(SWWorkerAll *)); + pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(SMultiWorker *)); if (pMgmt->uniqueWorkers == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } for (int32_t i = 0; i < SND_UNIQUE_THREAD_NUM; i++) { - SWWorkerAll *pUniqueWorker = malloc(sizeof(SWWorkerAll)); + SMultiWorker *pUniqueWorker = malloc(sizeof(SMultiWorker)); if (pUniqueWorker == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - SWWorkerAllCfg cfg = {.maxNum = 1, .name = "snode-unique", .fp = smProcessUniqueQueue, .param = pMgmt}; + SMultiWorkerCfg cfg = {.maxNum = 1, .name = "snode-unique", .fp = smProcessUniqueQueue, .param = pMgmt}; - if (tWWorkerAllInit(pUniqueWorker, &cfg) != 0) { + if (tMultiWorkerInit(pUniqueWorker, &cfg) != 0) { dError("failed to start snode-unique worker since %s", terrstr()); return -1; } @@ -69,13 +69,13 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { } } - SQWorkerAllCfg cfg = {.minNum = SND_SHARED_THREAD_NUM, - .maxNum = SND_SHARED_THREAD_NUM, - .name = "snode-shared", - .fp = (FItem)smProcessSharedQueue, - .param = pMgmt}; + SSingleWorkerCfg cfg = {.minNum = SND_SHARED_THREAD_NUM, + .maxNum = SND_SHARED_THREAD_NUM, + .name = "snode-shared", + .fp = (FItem)smProcessSharedQueue, + .param = pMgmt}; - if (tQWorkerAllInit(&pMgmt->sharedWorker, &cfg)) { + if (tSingleWorkerInit(&pMgmt->sharedWorker, &cfg)) { dError("failed to start snode shared-worker since %s", terrstr()); return -1; } @@ -85,11 +85,11 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { void smStopWorker(SSnodeMgmt *pMgmt) { for (int32_t i = 0; i < taosArrayGetSize(pMgmt->uniqueWorkers); i++) { - SWWorkerAll *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, i); - tWWorkerAllCleanup(pWorker); + SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, i); + tMultiWorkerCleanup(pWorker); } taosArrayDestroy(pMgmt->uniqueWorkers); - tQWorkerAllCleanup(&pMgmt->sharedWorker); + tSingleWorkerCleanup(&pMgmt->sharedWorker); } static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) { @@ -105,7 +105,7 @@ static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) { } int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SWWorkerAll *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0); + SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0); if (pWorker == NULL) { terrno = TSDB_CODE_INVALID_MSG; return -1; @@ -116,8 +116,8 @@ int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { } int32_t smProcessUniqueMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { - int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg); - SWWorkerAll *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index); + int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg); + SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index); if (pWorker == NULL) { terrno = TSDB_CODE_INVALID_MSG; return -1; @@ -128,7 +128,7 @@ int32_t smProcessUniqueMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { } int32_t smProcessSharedMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SQWorkerAll *pWorker = &pMgmt->sharedWorker; + SSingleWorker *pWorker = &pMgmt->sharedWorker; dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); return taosWriteQitem(pWorker->queue, pMsg); diff --git a/source/dnode/mgmt/vnode/inc/vmInt.h b/source/dnode/mgmt/vnode/inc/vmInt.h index ce1a004b65..ccdb1ae257 100644 --- a/source/dnode/mgmt/vnode/inc/vmInt.h +++ b/source/dnode/mgmt/vnode/inc/vmInt.h @@ -36,7 +36,7 @@ typedef struct SVnodesMgmt { const char *path; SDnode *pDnode; SMgmtWrapper *pWrapper; - SQWorkerAll mgmtWorker; + SSingleWorker mgmtWorker; } SVnodesMgmt; typedef struct { diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index 73aff7f66d..6c7d513c58 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -200,24 +200,16 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp return code; } -int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - return vmPutNodeMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); -} +int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); } -int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - return vmPutNodeMsgToQueue(pMgmt, pMsg, WRITE_QUEUE); -} +int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, WRITE_QUEUE); } -int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - return vmPutNodeMsgToQueue(pMgmt, pMsg, QUERY_QUEUE); -} +int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, QUERY_QUEUE); } -int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); -} +int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); } int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - SQWorkerAll *pWorker = &pMgmt->mgmtWorker; + SSingleWorker *pWorker = &pMgmt->mgmtWorker; dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name); return taosWriteQitem(pWorker->queue, pMsg); } @@ -357,9 +349,9 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { pWPool->max = maxSyncThreads; if (tWWorkerInit(pWPool) != 0) return -1; - SQWorkerAllCfg cfg = { + SSingleWorkerCfg cfg = { .minNum = 1, .maxNum = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt}; - if (tQWorkerAllInit(&pMgmt->mgmtWorker, &cfg) != 0) { + if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) { dError("failed to start vnode-mgmt worker since %s", terrstr()); return -1; } @@ -369,7 +361,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { } void vmStopWorker(SVnodesMgmt *pMgmt) { - tQWorkerAllCleanup(&pMgmt->mgmtWorker); + tSingleWorkerCleanup(&pMgmt->mgmtWorker); tQWorkerCleanup(&pMgmt->fetchPool); tQWorkerCleanup(&pMgmt->queryPool); tWWorkerCleanup(&pMgmt->writePool); diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index f62a63132e..044896d7a5 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -284,7 +284,7 @@ void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) { uDebug("worker:%s, queue:%p is freed", pool->name, queue); } -int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg) { +int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg) { SQWorkerPool *pPool = &pWorker->pool; pPool->name = pCfg->name; pPool->min = pCfg->minNum; @@ -302,7 +302,7 @@ int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg) { return 0; } -void tQWorkerAllCleanup(SQWorkerAll *pWorker) { +void tSingleWorkerCleanup(SSingleWorker *pWorker) { if (pWorker->queue == NULL) return; while (!taosQueueEmpty(pWorker->queue)) { @@ -313,7 +313,7 @@ void tQWorkerAllCleanup(SQWorkerAll *pWorker) { tQWorkerFreeQueue(&pWorker->pool, pWorker->queue); } -int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg) { +int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg) { SWWorkerPool *pPool = &pWorker->pool; pPool->name = pCfg->name; pPool->max = pCfg->maxNum; @@ -330,7 +330,7 @@ int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg) { return 0; } -void tWWorkerAllCleanup(SWWorkerAll *pWorker) { +void tMultiWorkerCleanup(SMultiWorker *pWorker) { if (pWorker->queue == NULL) return; while (!taosQueueEmpty(pWorker->queue)) { -- GitLab