diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c
index a0c23689d0b22ee063f659b201f37814d5db9815..3706f3b6b39dce9d098ece9ff0058f0b46fc69b0 100644
--- a/src/dnode/src/dnodeMgmt.c
+++ b/src/dnode/src/dnodeMgmt.c
@@ -39,6 +39,15 @@
 
 #define MPEER_CONTENT_LEN 2000
 
+typedef struct {
+  pthread_t thread;
+  int32_t   threadIndex;
+  int32_t   failed;
+  int32_t   opened;
+  int32_t   vnodeNum;
+  int32_t * vnodeList;
+} SOpenVnodeThread;
+
 void *          tsDnodeTmr = NULL;
 static void *   tsStatusTimer = NULL;
 static uint32_t tsRebootTime;
@@ -242,28 +251,85 @@ static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
   return TSDB_CODE_SUCCESS;
 }
 
-static int32_t dnodeOpenVnodes() {
+static void *dnodeOpenVnode(void *param) {
+  SOpenVnodeThread *pThread = param;
   char vnodeDir[TSDB_FILENAME_LEN * 3];
-  int32_t failed = 0;
-  int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * TSDB_MAX_VNODES);
-  int32_t numOfVnodes;
-  int32_t status;
 
-  status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
+  dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
+
+  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
+    int32_t vgId = pThread->vnodeList[v];
+    snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vgId);
+    if (vnodeOpen(vgId, vnodeDir) < 0) {
+      dError("vgId:%d, failed to open vnode by thread:%d", vgId, pThread->threadIndex);
+      pThread->failed++;
+    } else {
+      dDebug("vgId:%d, is openned by thread:%d", vgId, pThread->threadIndex);
+      pThread->opened++;
+    }
+  }
+
+  dDebug("thread:%d, total vnodes:%d, openned:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
+         pThread->failed);
+  return NULL;
+}
+
+static int32_t dnodeOpenVnodes() {
+  int32_t *vnodeList = calloc(TSDB_MAX_VNODES, sizeof(int32_t));
+  int32_t  numOfVnodes;
+  int32_t  status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
 
   if (status != TSDB_CODE_SUCCESS) {
-    dInfo("Get dnode list failed");
+    dInfo("get dnode list failed");
     free(vnodeList);
     return status;
   }
 
-  for (int32_t i = 0; i < numOfVnodes; ++i) {
-    snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vnodeList[i]);
-    if (vnodeOpen(vnodeList[i], vnodeDir) < 0) failed++;
+  int32_t threadNum = tsNumOfCores;
+  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
+  SOpenVnodeThread *threads = calloc(threadNum, sizeof(SOpenVnodeThread));
+  for (int32_t t = 0; t < threadNum; ++t) {
+    threads[t].threadIndex = t;
+    threads[t].vnodeList = calloc(vnodesPerThread, sizeof(int32_t));
+  }
+
+  for (int32_t v = 0; v < numOfVnodes; ++v) {
+    int32_t t = v % threadNum;
+    SOpenVnodeThread *pThread = &threads[t];
+    pThread->vnodeList[pThread->vnodeNum++] = vnodeList[v];
+  }
+
+  dDebug("start %d threads to open %d vnodes", threadNum, numOfVnodes);
+
+  for (int32_t t = 0; t < threadNum; ++t) {
+    SOpenVnodeThread *pThread = &threads[t];
+    if (pThread->vnodeNum == 0) continue;
+
+    pthread_attr_t thAttr;
+    pthread_attr_init(&thAttr);
+    pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
+    if (pthread_create(&pThread->thread, &thAttr, dnodeOpenVnode, pThread) != 0) {
+      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
+    }
+
+    pthread_attr_destroy(&thAttr);
+  }
+
+  int32_t openVnodes = 0;
+  int32_t failedVnodes = 0;
+  for (int32_t t = 0; t < threadNum; ++t) {
+    SOpenVnodeThread *pThread = &threads[t];
+    if (pThread->vnodeNum > 0 && pThread->thread) {
+      pthread_join(pThread->thread, NULL);
+    }
+    openVnodes += pThread->opened;
+    failedVnodes += pThread->failed;
+    free(pThread->vnodeList);
   }
 
   free(vnodeList);
-  dInfo("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, numOfVnodes-failed, failed);
+  dInfo("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, openVnodes, failedVnodes);
+
   return TSDB_CODE_SUCCESS;
 }
 
@@ -273,7 +339,7 @@ void dnodeStartStream() {
   int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
 
   if (status != TSDB_CODE_SUCCESS) {
-    dInfo("Get dnode list failed");
+    dInfo("get dnode list failed");
     return;
   }
 
@@ -292,7 +358,7 @@ static void dnodeCloseVnodes() {
   status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
 
   if (status != TSDB_CODE_SUCCESS) {
-    dInfo("Get dnode list failed");
+    dInfo("get dnode list failed");
     free(vnodeList);
     return;
   }
diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c
index 19be83f36b3864a0a8571a6b16947be0e622394f..6bbb291b6aba169710d2e564489d5bda220d6e02 100644
--- a/src/dnode/src/dnodeVRead.c
+++ b/src/dnode/src/dnodeVRead.c
@@ -36,6 +36,7 @@ typedef struct {
   int32_t min;  // min number of workers
   int32_t num;  // current number of workers
   SReadWorker *readWorker;
+  pthread_mutex_t mutex;
 } SReadWorkerPool;
 
 static void *dnodeProcessReadQueue(void *param);
@@ -51,27 +52,28 @@ int32_t dnodeInitVnodeRead() {
   readPool.min = 2;
   readPool.max = tsNumOfCores * tsNumOfThreadsPerCore;
   if (readPool.max <= readPool.min * 2) readPool.max = 2 * readPool.min;
-  readPool.readWorker = (SReadWorker *) calloc(sizeof(SReadWorker), readPool.max);
+  readPool.readWorker = (SReadWorker *)calloc(sizeof(SReadWorker), readPool.max);
+  pthread_mutex_init(&readPool.mutex, NULL);
   if (readPool.readWorker == NULL) return -1;
 
-  for (int i=0; i < readPool.max; ++i) {
+  for (int i = 0; i < readPool.max; ++i) {
     SReadWorker *pWorker = readPool.readWorker + i;
     pWorker->workerId = i;
   }
 
-  dInfo("dnode read is opened");
+  dInfo("dnode read is opened, min worker:%d max worker:%d", readPool.min, readPool.max);
   return 0;
 }
 
 void dnodeCleanupVnodeRead() {
-  for (int i=0; i < readPool.max; ++i) {
+  for (int i = 0; i < readPool.max; ++i) {
     SReadWorker *pWorker = readPool.readWorker + i;
     if (pWorker->thread) {
       taosQsetThreadResume(readQset);
     }
   }
 
-  for (int i=0; i < readPool.max; ++i) {
+  for (int i = 0; i < readPool.max; ++i) {
     SReadWorker *pWorker = readPool.readWorker + i;
     if (pWorker->thread) {
       pthread_join(pWorker->thread, NULL);
@@ -80,6 +82,7 @@ void dnodeCleanupVnodeRead() {
 
   free(readPool.readWorker);
   taosCloseQset(readQset);
+  pthread_mutex_destroy(&readPool.mutex);
   dInfo("dnode read is closed");
 }
 
@@ -136,8 +139,12 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
 }
 
 void *dnodeAllocateVnodeRqueue(void *pVnode) {
+  pthread_mutex_lock(&readPool.mutex);
   taos_queue queue = taosOpenQueue();
-  if (queue == NULL) return NULL;
+  if (queue == NULL) {
+    pthread_mutex_unlock(&readPool.mutex);
+    return NULL;
+  }
 
   taosAddIntoQset(readQset, queue, pVnode);
 
@@ -160,6 +167,7 @@ void *dnodeAllocateVnodeRqueue(void *pVnode) {
     } while (readPool.num < readPool.min);
   }
 
+  pthread_mutex_unlock(&readPool.mutex);
   dDebug("pVnode:%p, read queue:%p is allocated", pVnode, queue);
 
   return queue;
diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c
index 6de6fceb2fd4a17c14459a03dfd05eb7cbada8ee..b20e6c9749bedbef99777b1ff2d8e6c3372b3706 100644
--- a/src/dnode/src/dnodeVWrite.c
+++ b/src/dnode/src/dnodeVWrite.c
@@ -47,6 +47,7 @@ typedef struct {
   int32_t max;     // max number of workers
   int32_t nextId;  // from 0 to max-1, cyclic
   SWriteWorker *writeWorker;
+  pthread_mutex_t mutex;
 } SWriteWorkerPool;
 
 static void *dnodeProcessWriteQueue(void *param);
@@ -58,25 +59,26 @@ int32_t dnodeInitVnodeWrite() {
   wWorkerPool.max = tsNumOfCores;
   wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max);
   if (wWorkerPool.writeWorker == NULL) return -1;
+  pthread_mutex_init(&wWorkerPool.mutex, NULL);
 
   for (int32_t i = 0; i < wWorkerPool.max; ++i) {
     wWorkerPool.writeWorker[i].workerId = i;
   }
 
-  dInfo("dnode write is opened");
+  dInfo("dnode write is opened, max worker %d", wWorkerPool.max);
   return 0;
 }
 
 void dnodeCleanupVnodeWrite() {
   for (int32_t i = 0; i < wWorkerPool.max; ++i) {
-    SWriteWorker *pWorker = wWorkerPool.writeWorker + i;  
+    SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
     if (pWorker->thread) {
       taosQsetThreadResume(pWorker->qset);
     }
   }
-  
+
   for (int32_t i = 0; i < wWorkerPool.max; ++i) {
-    SWriteWorker *pWorker = wWorkerPool.writeWorker + i;  
+    SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
     if (pWorker->thread) {
       pthread_join(pWorker->thread, NULL);
       taosFreeQall(pWorker->qall);
@@ -84,6 +86,7 @@ void dnodeCleanupVnodeWrite() {
     }
   }
 
+  pthread_mutex_destroy(&wWorkerPool.mutex);
   free(wWorkerPool.writeWorker);
   dInfo("dnode write is closed");
 }
@@ -124,14 +127,19 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
 }
 
 void *dnodeAllocateVnodeWqueue(void *pVnode) {
+  pthread_mutex_lock(&wWorkerPool.mutex);
   SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId;
   void *queue = taosOpenQueue();
-  if (queue == NULL) return NULL;
+  if (queue == NULL) {
+    pthread_mutex_unlock(&wWorkerPool.mutex);
+    return NULL;
+  }
 
   if (pWorker->qset == NULL) {
     pWorker->qset = taosOpenQset();
     if (pWorker->qset == NULL) {
       taosCloseQueue(queue);
+      pthread_mutex_unlock(&wWorkerPool.mutex);
       return NULL;
     }
 
@@ -140,6 +148,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
     if (pWorker->qall == NULL) {
       taosCloseQset(pWorker->qset);
       taosCloseQueue(queue);
+      pthread_mutex_unlock(&wWorkerPool.mutex);
       return NULL;
     }
     pthread_attr_t thAttr;
@@ -163,6 +172,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
     wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
   }
 
+  pthread_mutex_unlock(&wWorkerPool.mutex);
   dDebug("pVnode:%p, write queue:%p is allocated", pVnode, queue);
 
   return queue;
@@ -201,6 +211,8 @@ static void *dnodeProcessWriteQueue(void *param) {
   int type;
   void *pVnode, *item;
 
+  dDebug("write worker:%d is running", pWorker->workerId);
+
   while (1) {
     numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
     if (numOfMsgs == 0) {
diff --git a/tests/script/test.sh b/tests/script/test.sh
index cba0a0fe4bc43143a61b8ef1994022b6cd13cccf..d9a738749f6223bc7f2ec76384bf49fa0c19a98e 100755
--- a/tests/script/test.sh
+++ b/tests/script/test.sh
@@ -112,7 +112,7 @@ echo "numOfLogLines 100000000" >> $TAOS_CFG
 echo "dDebugFlag 135" >> $TAOS_CFG
 echo "mDebugFlag 135" >> $TAOS_CFG
 echo "sdbDebugFlag 135" >> $TAOS_CFG
-echo "rpcDebugFlag 143" >> $TAOS_CFG
+echo "rpcDebugFlag 135" >> $TAOS_CFG
 echo "tmrDebugFlag 131" >> $TAOS_CFG
 echo "cDebugFlag 135" >> $TAOS_CFG
 echo "httpDebugFlag 135" >> $TAOS_CFG