diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index a0c23689d0b22ee063f659b201f37814d5db9815..37244b6cfd61b9f05fb86964af3f50ab8266da7c 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -39,6 +39,15 @@ #define MPEER_CONTENT_LEN 2000 +typedef struct { + pthread_t thread; + int32_t threadIndex; + int32_t failed; + int32_t opened; + int32_t vnodeNum; + int32_t * vnodeList; +} SOpenVnodeThread; + void * tsDnodeTmr = NULL; static void * tsStatusTimer = NULL; static uint32_t tsRebootTime; @@ -242,14 +251,33 @@ static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) { return TSDB_CODE_SUCCESS; } -static int32_t dnodeOpenVnodes() { +static void *dnodeOpenVnode(void *param) { + SOpenVnodeThread *pThread = param; char vnodeDir[TSDB_FILENAME_LEN * 3]; - int32_t failed = 0; - int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * TSDB_MAX_VNODES); - int32_t numOfVnodes; - int32_t status; - status = dnodeGetVnodeList(vnodeList, &numOfVnodes); + dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum); + + for (int32_t v = 0; v < pThread->vnodeNum; ++v) { + int32_t vgId = pThread->vnodeList[v]; + snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vgId); + if (vnodeOpen(vgId, vnodeDir) < 0) { + dError("vgId:%d, failed to open vnode by thread:%d", vgId, pThread->threadIndex); + pThread->failed++; + } else { + dDebug("vgId:%d, is openned by thread:%d", vgId, pThread->threadIndex); + pThread->opened++; + } + } + + dDebug("thread:%d, total vnodes:%d, openned:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened, + pThread->failed); + return NULL; +} + +static int32_t dnodeOpenVnodes() { + int32_t *vnodeList = calloc(TSDB_MAX_VNODES, sizeof(int32_t)); + int32_t numOfVnodes; + int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes); if (status != TSDB_CODE_SUCCESS) { dInfo("Get dnode list failed"); @@ -257,13 +285,51 @@ static int32_t dnodeOpenVnodes() { return status; } - for (int32_t i = 0; i < numOfVnodes; ++i) { - snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vnodeList[i]); - if (vnodeOpen(vnodeList[i], vnodeDir) < 0) failed++; + int32_t threadNum = tsNumOfCores; + int32_t vnodesPerThread = numOfVnodes / threadNum + 1; + SOpenVnodeThread *threads = calloc(threadNum, sizeof(SOpenVnodeThread)); + for (int32_t t = 0; t < threadNum; ++t) { + threads[t].threadIndex = t; + threads[t].vnodeList = calloc(vnodesPerThread, sizeof(int32_t)); + } + + for (int32_t v = 0; v < numOfVnodes; ++v) { + int32_t t = v % threadNum; + SOpenVnodeThread *pThread = &threads[t]; + pThread->vnodeList[pThread->vnodeNum++] = vnodeList[v]; + } + + dInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes); + + for (int32_t t = 0; t < threadNum; ++t) { + SOpenVnodeThread *pThread = &threads[t]; + if (pThread->vnodeNum == 0) continue; + + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + if (pthread_create(&pThread->thread, &thAttr, dnodeOpenVnode, pThread) != 0) { + dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno)); + } + + pthread_attr_destroy(&thAttr); + } + + int32_t openVnodes = 0; + int32_t failedVnodes = 0; + for (int32_t t = 0; t < threadNum; ++t) { + SOpenVnodeThread *pThread = &threads[t]; + if (pThread->thread) { + pthread_join(pThread->thread, NULL); + } + openVnodes += pThread->opened; + failedVnodes += pThread->failed; + free(pThread->vnodeList); } free(vnodeList); - dInfo("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, numOfVnodes-failed, failed); + dInfo("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, openVnodes, failedVnodes); + return TSDB_CODE_SUCCESS; }