From e01d598da973108f747a77386676faa4b9799bb1 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 13 Jun 2023 17:16:06 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/inc/vnd.h | 3 +- source/dnode/vnode/src/tsdb/dev/tsdbFS.c | 3 +- source/dnode/vnode/src/vnd/vnodeModule.c | 109 ++++++++++++----------- 3 files changed, 61 insertions(+), 54 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index a67f246e73..3cbc2d39f4 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -49,7 +49,8 @@ int32_t vnodeEncodeConfig(const void* pObj, SJson* pJson); int32_t vnodeDecodeConfig(const SJson* pJson, void* pObj); // vnodeModule.c -int32_t vnodeScheduleTask(int32_t (*execute)(void*), void* arg); +int vnodeScheduleTask(int (*execute)(void*), void* arg); +int vnodeScheduleTaskEx(int tpid, int (*execute)(void*), void* arg); // vnodeBufPool.c typedef struct SVBufPoolNode SVBufPoolNode; diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbFS.c b/source/dnode/vnode/src/tsdb/dev/tsdbFS.c index a39cb585e9..72f10743e6 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbFS.c @@ -16,6 +16,7 @@ #include "inc/tsdbFS.h" extern int vnodeScheduleTask(int (*execute)(void *), void *arg); +extern int vnodeScheduleTaskEx(int tpid, int (*execute)(void *), void *arg); #define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT #define TSDB_FS_EDIT_MAX (TSDB_FEDIT_MERGE + 1) @@ -612,7 +613,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) { if (fobj->f->stt->nseg < fs->tsdb->pVnode->config.sttTrigger) continue; - code = vnodeScheduleTask(tsdbMerge, fs->tsdb); + code = vnodeScheduleTaskEx(1, tsdbMerge, fs->tsdb); TSDB_CHECK_CODE(code, lino, _exit); fs->mergeTaskOn = true; diff --git a/source/dnode/vnode/src/vnd/vnodeModule.c b/source/dnode/vnode/src/vnd/vnodeModule.c index 782ffd788d..09d2e445c7 100644 --- a/source/dnode/vnode/src/vnd/vnodeModule.c +++ b/source/dnode/vnode/src/vnd/vnodeModule.c @@ -23,26 +23,24 @@ struct SVnodeTask { void* arg; }; -struct SVnodeGlobal { - int8_t init; - int8_t stop; +typedef struct { int nthreads; TdThread* threads; TdThreadMutex mutex; TdThreadCond hasTask; SVnodeTask queue; +} SVnodeThreadPool; + +struct SVnodeGlobal { + int8_t init; + int8_t stop; + SVnodeThreadPool tp[2]; }; struct SVnodeGlobal vnodeGlobal; static void* loop(void* arg); -static tsem_t canCommit = {0}; - -static void vnodeInitCommit() { tsem_init(&canCommit, 0, 4); }; -void vnode_wait_commit() { tsem_wait(&canCommit); } -void vnode_done_commit() { tsem_wait(&canCommit); } - int vnodeInit(int nthreads) { int8_t init; int ret; @@ -51,28 +49,30 @@ int vnodeInit(int nthreads) { if (init) { return 0; } + vnodeGlobal.stop = 0; - taosThreadMutexInit(&vnodeGlobal.mutex, NULL); - taosThreadCondInit(&vnodeGlobal.hasTask, NULL); + for (int32_t i = 0; i < ARRAY_SIZE(vnodeGlobal.tp); i++) { + taosThreadMutexInit(&vnodeGlobal.tp[i].mutex, NULL); + taosThreadCondInit(&vnodeGlobal.tp[i].hasTask, NULL); - taosThreadMutexLock(&vnodeGlobal.mutex); + taosThreadMutexLock(&vnodeGlobal.tp[i].mutex); - vnodeGlobal.stop = 0; - vnodeGlobal.queue.next = &vnodeGlobal.queue; - vnodeGlobal.queue.prev = &vnodeGlobal.queue; + vnodeGlobal.tp[i].queue.next = &vnodeGlobal.tp[i].queue; + vnodeGlobal.tp[i].queue.prev = &vnodeGlobal.tp[i].queue; - taosThreadMutexUnlock(&(vnodeGlobal.mutex)); + taosThreadMutexUnlock(&(vnodeGlobal.tp[i].mutex)); - vnodeGlobal.nthreads = nthreads; - vnodeGlobal.threads = taosMemoryCalloc(nthreads, sizeof(TdThread)); - if (vnodeGlobal.threads == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - vError("failed to init vnode module since:%s", tstrerror(terrno)); - return -1; - } + vnodeGlobal.tp[i].nthreads = nthreads; + vnodeGlobal.tp[i].threads = taosMemoryCalloc(nthreads, sizeof(TdThread)); + if (vnodeGlobal.tp[i].threads == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + vError("failed to init vnode module since:%s", tstrerror(terrno)); + return -1; + } - for (int i = 0; i < nthreads; i++) { - taosThreadCreate(&(vnodeGlobal.threads[i]), NULL, loop, NULL); + for (int j = 0; j < nthreads; j++) { + taosThreadCreate(&(vnodeGlobal.tp[i].threads[j]), NULL, loop, &vnodeGlobal.tp[i]); + } } if (walInit() < 0) { @@ -92,27 +92,29 @@ void vnodeCleanup() { if (init == 0) return; // set stop - taosThreadMutexLock(&(vnodeGlobal.mutex)); vnodeGlobal.stop = 1; - taosThreadCondBroadcast(&(vnodeGlobal.hasTask)); - taosThreadMutexUnlock(&(vnodeGlobal.mutex)); + for (int32_t i = 0; i < ARRAY_SIZE(vnodeGlobal.tp); i++) { + taosThreadMutexLock(&(vnodeGlobal.tp[i].mutex)); + taosThreadCondBroadcast(&(vnodeGlobal.tp[i].hasTask)); + taosThreadMutexUnlock(&(vnodeGlobal.tp[i].mutex)); + + // wait for threads + for (int j = 0; j < vnodeGlobal.tp[i].nthreads; j++) { + taosThreadJoin(vnodeGlobal.tp[i].threads[j], NULL); + } - // wait for threads - for (int i = 0; i < vnodeGlobal.nthreads; i++) { - taosThreadJoin(vnodeGlobal.threads[i], NULL); + // clear source + taosMemoryFreeClear(vnodeGlobal.tp[i].threads); + taosThreadCondDestroy(&(vnodeGlobal.tp[i].hasTask)); + taosThreadMutexDestroy(&(vnodeGlobal.tp[i].mutex)); } - // clear source - taosMemoryFreeClear(vnodeGlobal.threads); - taosThreadCondDestroy(&(vnodeGlobal.hasTask)); - taosThreadMutexDestroy(&(vnodeGlobal.mutex)); - walCleanUp(); tqCleanUp(); smaCleanUp(); } -int vnodeScheduleTask(int (*execute)(void*), void* arg) { +int vnodeScheduleTaskEx(int tpid, int (*execute)(void*), void* arg) { SVnodeTask* pTask; ASSERT(!vnodeGlobal.stop); @@ -126,35 +128,38 @@ int vnodeScheduleTask(int (*execute)(void*), void* arg) { pTask->execute = execute; pTask->arg = arg; - taosThreadMutexLock(&(vnodeGlobal.mutex)); - pTask->next = &vnodeGlobal.queue; - pTask->prev = vnodeGlobal.queue.prev; - vnodeGlobal.queue.prev->next = pTask; - vnodeGlobal.queue.prev = pTask; - taosThreadCondSignal(&(vnodeGlobal.hasTask)); - taosThreadMutexUnlock(&(vnodeGlobal.mutex)); + taosThreadMutexLock(&(vnodeGlobal.tp[tpid].mutex)); + pTask->next = &vnodeGlobal.tp[tpid].queue; + pTask->prev = vnodeGlobal.tp[tpid].queue.prev; + vnodeGlobal.tp[tpid].queue.prev->next = pTask; + vnodeGlobal.tp[tpid].queue.prev = pTask; + taosThreadCondSignal(&(vnodeGlobal.tp[tpid].hasTask)); + taosThreadMutexUnlock(&(vnodeGlobal.tp[tpid].mutex)); return 0; } +int vnodeScheduleTask(int (*execute)(void*), void* arg) { return vnodeScheduleTaskEx(0, execute, arg); } + /* ------------------------ STATIC METHODS ------------------------ */ static void* loop(void* arg) { - SVnodeTask* pTask; - int ret; + SVnodeThreadPool* tp = (SVnodeThreadPool*)arg; + SVnodeTask* pTask; + int ret; setThreadName("vnode-commit"); for (;;) { - taosThreadMutexLock(&(vnodeGlobal.mutex)); + taosThreadMutexLock(&(tp->mutex)); for (;;) { - pTask = vnodeGlobal.queue.next; - if (pTask == &vnodeGlobal.queue) { + pTask = tp->queue.next; + if (pTask == &tp->queue) { // no task if (vnodeGlobal.stop) { - taosThreadMutexUnlock(&(vnodeGlobal.mutex)); + taosThreadMutexUnlock(&(tp->mutex)); return NULL; } else { - taosThreadCondWait(&(vnodeGlobal.hasTask), &(vnodeGlobal.mutex)); + taosThreadCondWait(&(tp->hasTask), &(tp->mutex)); } } else { // has task @@ -164,7 +169,7 @@ static void* loop(void* arg) { } } - taosThreadMutexUnlock(&(vnodeGlobal.mutex)); + taosThreadMutexUnlock(&(tp->mutex)); pTask->execute(pTask->arg); taosMemoryFree(pTask); -- GitLab