From 943de8bc15dc000b851f7d99b995b63e6ac1b446 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 11 Jul 2022 17:52:30 +0800 Subject: [PATCH] enh: adjust vnode fetch queue number --- source/common/src/tglobal.c | 3 +- source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 2 +- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 42 +++++++++++---------- 3 files changed, 26 insertions(+), 21 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index f19d17d034..7947624451 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -412,7 +412,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 2); if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1; - tsNumOfVnodeFetchThreads = TRANGE(tsNumOfVnodeFetchThreads, 1, 1); + tsNumOfVnodeFetchThreads = tsNumOfCores / 4; + tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4); if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1024, 0) != 0) return -1; tsNumOfVnodeWriteThreads = tsNumOfCores; diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 6f00767eb0..6fc0ab4e5d 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -31,7 +31,7 @@ typedef struct SVnodeMgmt { const char *path; const char *name; SQWorkerPool queryPool; - SQWorkerPool fetchPool; + SWWorkerPool fetchPool; SWWorkerPool syncPool; SWWorkerPool writePool; SWWorkerPool applyPool; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 1d795c74f2..e5b268a6a2 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -81,21 +81,26 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } -static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { - SVnodeObj *pVnode = pInfo->ahandle; - const STraceId *trace = &pMsg->info.traceId; +static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { + SVnodeObj *pVnode = pInfo->ahandle; + SRpcMsg *pMsg = NULL; - dGTrace("vgId:%d, msg:%p get from vnode-fetch queue", pVnode->vgId, pMsg); - int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); - if (code != 0) { - if (terrno != 0) code = terrno; - dGError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr()); - vmSendRsp(pMsg, code); - } + for (int32_t i = 0; i < numOfMsgs; ++i) { + if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; + const STraceId *trace = &pMsg->info.traceId; + dGTrace("vgId:%d, msg:%p get from vnode-fetch queue", pVnode->vgId, pMsg); - dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); + int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); + if (code != 0) { + if (terrno != 0) code = terrno; + dGError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr()); + vmSendRsp(pMsg, code); + } + + dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + } } static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { @@ -242,7 +247,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue); pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyWriteMsg); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); - pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue); + pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue); if (pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pApplyQ == NULL || pVnode->pQueryQ == NULL || pVnode->pFetchQ == NULL) { @@ -259,7 +264,7 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { tWWorkerFreeQueue(&pMgmt->applyPool, pVnode->pApplyQ); tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); - tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); + tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); pVnode->pWriteQ = NULL; pVnode->pSyncQ = NULL; pVnode->pApplyQ = NULL; @@ -275,11 +280,10 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { pQPool->max = tsNumOfVnodeQueryThreads; if (tQWorkerInit(pQPool) != 0) return -1; - SQWorkerPool *pFPool = &pMgmt->fetchPool; + SWWorkerPool *pFPool = &pMgmt->fetchPool; pFPool->name = "vnode-fetch"; - pFPool->min = tsNumOfVnodeFetchThreads; pFPool->max = tsNumOfVnodeFetchThreads; - if (tQWorkerInit(pFPool) != 0) return -1; + if (tWWorkerInit(pFPool) != 0) return -1; SWWorkerPool *pWPool = &pMgmt->writePool; pWPool->name = "vnode-write"; @@ -325,6 +329,6 @@ void vmStopWorker(SVnodeMgmt *pMgmt) { tWWorkerCleanup(&pMgmt->applyPool); tWWorkerCleanup(&pMgmt->syncPool); tQWorkerCleanup(&pMgmt->queryPool); - tQWorkerCleanup(&pMgmt->fetchPool); + tWWorkerCleanup(&pMgmt->fetchPool); dDebug("vnode workers are closed"); } -- GitLab