From 35a071b9c3d861b3c98e1ca233be40f001c4c4cd Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 19 Oct 2022 12:13:13 +0800 Subject: [PATCH] enh: close vnode with multiple threads --- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 69 ++++++++++++++++++++++-- 1 file changed, 66 insertions(+), 3 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index bcc2e358d6..f825407b45 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -217,17 +217,80 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) { } } +static void *vmCloseVnodeInThread(void *param) { + SVnodeThread *pThread = param; + SVnodeMgmt *pMgmt = pThread->pMgmt; + + dInfo("thread:%d, start to close %d vnodes", pThread->threadIndex, pThread->vnodeNum); + setThreadName("close-vnodes"); + + for (int32_t v = 0; v < pThread->vnodeNum; ++v) { + SVnodeObj *pVnode = pThread->ppVnodes[v]; + + char stepDesc[TSDB_STEP_DESC_LEN] = {0}; + snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId, + pMgmt->state.openVnodes, pMgmt->state.totalVnodes); + tmsgReportStartup("vnode-close", stepDesc); + + vmCloseVnode(pMgmt, pVnode); + } + + dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum); + return NULL; +} + static void vmCloseVnodes(SVnodeMgmt *pMgmt) { dInfo("start to close all vnodes"); int32_t numOfVnodes = 0; SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes); - for (int32_t i = 0; i < numOfVnodes; ++i) { - if (ppVnodes == NULL || ppVnodes[i] == NULL) continue; - vmCloseVnode(pMgmt, ppVnodes[i]); + int32_t threadNum = tsNumOfCores / 2; + if (threadNum < 1) threadNum = 1; + int32_t vnodesPerThread = numOfVnodes / threadNum + 1; + + SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread)); + for (int32_t t = 0; t < threadNum; ++t) { + threads[t].threadIndex = t; + threads[t].pMgmt = pMgmt; + threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnode *)); + } + + for (int32_t v = 0; v < numOfVnodes; ++v) { + int32_t t = v % threadNum; + SVnodeThread *pThread = &threads[t]; + if (pThread->ppVnodes != NULL && ppVnodes != NULL) { + pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v]; + } } + pMgmt->state.openVnodes = 0; + dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum); + + for (int32_t t = 0; t < threadNum; ++t) { + SVnodeThread *pThread = &threads[t]; + if (pThread->vnodeNum == 0) continue; + + TdThreadAttr thAttr; + taosThreadAttrInit(&thAttr); + taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) { + dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(errno)); + } + + taosThreadAttrDestroy(&thAttr); + } + + for (int32_t t = 0; t < threadNum; ++t) { + SVnodeThread *pThread = &threads[t]; + if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) { + taosThreadJoin(pThread->thread, NULL); + taosThreadClear(&pThread->thread); + } + taosMemoryFree(pThread->ppVnodes); + } + taosMemoryFree(threads); + if (ppVnodes != NULL) { taosMemoryFree(ppVnodes); } -- GitLab