diff --git a/include/util/tworker.h b/include/util/tworker.h index 27f03bd2b69e6d6add97cdc2e5ab05ad2fdd5eca..6c81565a6a56446c8a33f068973314bccccf4531 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -15,57 +15,74 @@ #ifndef _TD_UTIL_WORKER_H #define _TD_UTIL_WORKER_H - +#include "os.h" #include "tqueue.h" #ifdef __cplusplus extern "C" { #endif -typedef struct SWorkerPool SWorkerPool; -typedef struct SMWorkerPool SMWorkerPool; +typedef struct SQWorkerPool SQWorkerPool; +typedef struct SFWorkerPool SFWorkerPool; +typedef struct SWWorkerPool SWWorkerPool; + +typedef struct SQWorker { + int32_t id; // worker ID + pthread_t thread; // thread + SQWorkerPool *pool; +} SQWorker; -typedef struct SWorker { - int32_t id; // worker ID - pthread_t thread; // thread - SWorkerPool *pool; -} SWorker; +typedef struct SQWorkerPool { + int32_t max; // max number of workers + int32_t min; // min number of workers + int32_t num; // current number of workers + STaosQset * qset; + const char * name; + SQWorker * workers; + pthread_mutex_t mutex; +} SQWorkerPool; + +typedef struct SFWorker { + int32_t id; // worker ID + pthread_t thread; // thread + SFWorkerPool *pool; +} SFWorker; -typedef struct SWorkerPool { +typedef struct SFWorkerPool { int32_t max; // max number of workers int32_t min; // min number of workers int32_t num; // current number of workers - STaosQset *qset; - const char *name; - SWorker *workers; + STaosQset * qset; + const char * name; + SFWorker * workers; pthread_mutex_t mutex; -} SWorkerPool; +} SFWorkerPool; -typedef struct SMWorker { +typedef struct SWWorker { int32_t id; // worker id pthread_t thread; // thread - STaosQall *qall; - STaosQset *qset; // queue set - SMWorkerPool *pool; -} SMWorker; + STaosQall * qall; + STaosQset * qset; // queue set + SWWorkerPool *pool; +} SWWorker; -typedef struct SMWorkerPool { +typedef struct SWWorkerPool { int32_t max; // max number of workers int32_t nextId; // from 0 to max-1, cyclic - const char *name; - SMWorker *workers; + const char * name; + SWWorker * workers; pthread_mutex_t mutex; -} SMWorkerPool; +} SWWorkerPool; -int32_t tWorkerInit(SWorkerPool *pool); -void tWorkerCleanup(SWorkerPool *pool); -STaosQueue *tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp); -void tWorkerFreeQueue(SWorkerPool *pool, STaosQueue *queue); +int32_t tQWorkerInit(SQWorkerPool *pool); +void tQWorkerCleanup(SQWorkerPool *pool); +STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem fp); +void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue); -int32_t tMWorkerInit(SMWorkerPool *pool); -void tMWorkerCleanup(SMWorkerPool *pool); -STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp); -void tMWorkerFreeQueue(SMWorkerPool *pool, STaosQueue *queue); +int32_t tWWorkerInit(SWWorkerPool *pool); +void tWWorkerCleanup(SWWorkerPool *pool); +STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems fp); +void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/impl/inc/dndEnv.h b/source/dnode/mgmt/impl/inc/dndEnv.h index 7ab3f46fdbe36c0dbd7c85b079c45ceb68825ee0..25c04ff6f83eaf29c5cb01a3a131d93388a607ac 100644 --- a/source/dnode/mgmt/impl/inc/dndEnv.h +++ b/source/dnode/mgmt/impl/inc/dndEnv.h @@ -31,8 +31,8 @@ typedef struct { SDnode *pDnode; STaosQueue *queue; union { - SWorkerPool pool; - SMWorkerPool mpool; + SQWorkerPool pool; + SWWorkerPool mpool; }; } SDnodeWorker; @@ -109,10 +109,10 @@ typedef struct { int32_t openVnodes; int32_t totalVnodes; SRWLatch latch; - SWorkerPool queryPool; - SWorkerPool fetchPool; - SMWorkerPool syncPool; - SMWorkerPool writePool; + SQWorkerPool queryPool; + SQWorkerPool fetchPool; + SWWorkerPool syncPool; + SWWorkerPool writePool; } SVnodesMgmt; typedef struct { diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index f487859fd6e0b02e5a21785f8401a0cc88b7b8c9..8d6fe3707b8e4d4892b2ee740bb1942a9e9d4bca 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -910,27 +910,27 @@ static int32_t dndInitVnodeWorkers(SDnode *pDnode) { int32_t maxWriteThreads = TMAX(pDnode->env.numOfCores, 1); int32_t maxSyncThreads = TMAX(pDnode->env.numOfCores / 2, 1); - SWorkerPool *pPool = &pMgmt->queryPool; + SQWorkerPool *pPool = &pMgmt->queryPool; pPool->name = "vnode-query"; pPool->min = minQueryThreads; pPool->max = maxQueryThreads; - if (tWorkerInit(pPool) != 0) return -1; + if (tQWorkerInit(pPool) != 0) return -1; pPool = &pMgmt->fetchPool; pPool->name = "vnode-fetch"; pPool->min = minFetchThreads; pPool->max = maxFetchThreads; - if (tWorkerInit(pPool) != 0) return -1; + if (tQWorkerInit(pPool) != 0) return -1; - SMWorkerPool *pMPool = &pMgmt->writePool; + SWWorkerPool *pMPool = &pMgmt->writePool; pMPool->name = "vnode-write"; pMPool->max = maxWriteThreads; - if (tMWorkerInit(pMPool) != 0) return -1; + if (tWWorkerInit(pMPool) != 0) return -1; pMPool = &pMgmt->syncPool; pMPool->name = "vnode-sync"; pMPool->max = maxSyncThreads; - if (tMWorkerInit(pMPool) != 0) return -1; + if (tWWorkerInit(pMPool) != 0) return -1; dDebug("vnode workers is initialized"); return 0; @@ -938,21 +938,21 @@ static int32_t dndInitVnodeWorkers(SDnode *pDnode) { static void dndCleanupVnodeWorkers(SDnode *pDnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; - tWorkerCleanup(&pMgmt->fetchPool); - tWorkerCleanup(&pMgmt->queryPool); - tMWorkerCleanup(&pMgmt->writePool); - tMWorkerCleanup(&pMgmt->syncPool); + tQWorkerCleanup(&pMgmt->fetchPool); + tQWorkerCleanup(&pMgmt->queryPool); + tWWorkerCleanup(&pMgmt->writePool); + tWWorkerCleanup(&pMgmt->syncPool); dDebug("vnode workers is closed"); } static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; - pVnode->pWriteQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeWriteQueue); - pVnode->pApplyQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeApplyQueue); - pVnode->pSyncQ = tMWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue); - pVnode->pFetchQ = tWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FProcessItem)dndProcessVnodeFetchQueue); - pVnode->pQueryQ = tWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue); + pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeWriteQueue); + pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeApplyQueue); + pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue); + pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FProcessItem)dndProcessVnodeFetchQueue); + pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue); if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL || pVnode->pQueryQ == NULL) { @@ -965,11 +965,11 @@ static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) { static void dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; - tWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); - tWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); - tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ); - tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ); - tMWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); + tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); + tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); + tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ); + tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ); + tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); pVnode->pWriteQ = NULL; pVnode->pApplyQ = NULL; pVnode->pSyncQ = NULL; diff --git a/source/dnode/mgmt/impl/src/dndWorker.c b/source/dnode/mgmt/impl/src/dndWorker.c index e0db262f89b9cc3181174b0c3341757f23b313bb..42ef76bb43fd58e79ceca05aebdc1bf977619749 100644 --- a/source/dnode/mgmt/impl/src/dndWorker.c +++ b/source/dnode/mgmt/impl/src/dndWorker.c @@ -31,28 +31,28 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, c pWorker->pDnode = pDnode; if (pWorker->type == DND_WORKER_SINGLE) { - SWorkerPool *pPool = &pWorker->pool; + SQWorkerPool *pPool = &pWorker->pool; pPool->name = name; pPool->min = minNum; pPool->max = maxNum; - if (tWorkerInit(pPool) != 0) { + if (tQWorkerInit(pPool) != 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - pWorker->queue = tWorkerAllocQueue(pPool, pDnode, (FProcessItem)queueFp); + pWorker->queue = tQWorkerAllocQueue(pPool, pDnode, (FProcessItem)queueFp); if (pWorker->queue == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } } else if (pWorker->type == DND_WORKER_MULTI) { - SMWorkerPool *pPool = &pWorker->mpool; + SWWorkerPool *pPool = &pWorker->mpool; pPool->name = name; pPool->max = maxNum; - if (tMWorkerInit(pPool) != 0) { + if (tWWorkerInit(pPool) != 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - pWorker->queue = tMWorkerAllocQueue(pPool, pDnode, (FProcessItems)queueFp); + pWorker->queue = tWWorkerAllocQueue(pPool, pDnode, (FProcessItems)queueFp); if (pWorker->queue == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -70,11 +70,11 @@ void dndCleanupWorker(SDnodeWorker *pWorker) { } if (pWorker->type == DND_WORKER_SINGLE) { - tWorkerCleanup(&pWorker->pool); - tWorkerFreeQueue(&pWorker->pool, pWorker->queue); + tQWorkerCleanup(&pWorker->pool); + tQWorkerFreeQueue(&pWorker->pool, pWorker->queue); } else if (pWorker->type == DND_WORKER_MULTI) { - tMWorkerCleanup(&pWorker->mpool); - tMWorkerFreeQueue(&pWorker->mpool, pWorker->queue); + tWWorkerCleanup(&pWorker->mpool); + tWWorkerFreeQueue(&pWorker->mpool, pWorker->queue); } else { } } diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index ed74041712ce0022eb66ac5c9331b88fda961222..3fe087a20d3da050d2bff0d97c173799a063a6ee 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -14,38 +14,39 @@ */ #define _DEFAULT_SOURCE -#include "os.h" -#include "ulog.h" -#include "tqueue.h" #include "tworker.h" +#include "ulog.h" -typedef void* (*ThreadFp)(void *param); +typedef void *(*ThreadFp)(void *param); -int32_t tWorkerInit(SWorkerPool *pool) { +int32_t tQWorkerInit(SQWorkerPool *pool) { pool->qset = taosOpenQset(); - pool->workers = calloc(sizeof(SWorker), pool->max); - pthread_mutex_init(&pool->mutex, NULL); - for (int i = 0; i < pool->max; ++i) { - SWorker *worker = pool->workers + i; + pool->workers = calloc(sizeof(SQWorker), pool->max); + if (pthread_mutex_init(&pool->mutex, NULL)) { + return -1; + } + + for (int32_t i = 0; i < pool->max; ++i) { + SQWorker *worker = pool->workers + i; worker->id = i; worker->pool = pool; } - uInfo("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max); + uDebug("qworker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max); return 0; } -void tWorkerCleanup(SWorkerPool *pool) { - for (int i = 0; i < pool->max; ++i) { - SWorker *worker = pool->workers + i; +void tQWorkerCleanup(SQWorkerPool *pool) { + for (int32_t i = 0; i < pool->max; ++i) { + SQWorker *worker = pool->workers + i; if (worker == NULL) continue; if (taosCheckPthreadValid(worker->thread)) { taosQsetThreadResume(pool->qset); } } - for (int i = 0; i < pool->max; ++i) { - SWorker *worker = pool->workers + i; + for (int32_t i = 0; i < pool->max; ++i) { + SQWorker *worker = pool->workers + i; if (worker == NULL) continue; if (taosCheckPthreadValid(worker->thread)) { pthread_join(worker->thread, NULL); @@ -56,28 +57,28 @@ void tWorkerCleanup(SWorkerPool *pool) { taosCloseQset(pool->qset); pthread_mutex_destroy(&pool->mutex); - uInfo("worker:%s is closed", pool->name); + uDebug("qworker:%s is closed", pool->name); } -static void *tWorkerThreadFp(SWorker *worker) { - SWorkerPool *pool = worker->pool; - FProcessItem fp = NULL; +static void *tQWorkerThreadFp(SQWorker *worker) { + SQWorkerPool *pool = worker->pool; + FProcessItem fp = NULL; - void *msg = NULL; - void *ahandle = NULL; + void * msg = NULL; + void * ahandle = NULL; int32_t code = 0; taosBlockSIGPIPE(); setThreadName(pool->name); - uDebug("worker:%s:%d is running", pool->name, worker->id); + uDebug("qworker:%s:%d is running", pool->name, worker->id); while (1) { if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ahandle, &fp) == 0) { - uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); + uDebug("qworker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); break; } - if (fp) { + if (fp != NULL) { (*fp)(ahandle, msg); } } @@ -85,7 +86,7 @@ static void *tWorkerThreadFp(SWorker *worker) { return NULL; } -STaosQueue *tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp) { +STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem fp) { pthread_mutex_lock(&pool->mutex); STaosQueue *queue = taosOpenQueue(); if (queue == NULL) { @@ -99,61 +100,66 @@ STaosQueue *tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp) // spawn a thread to process queue if (pool->num < pool->max) { do { - SWorker *worker = pool->workers + pool->num; + SQWorker *worker = pool->workers + pool->num; pthread_attr_t thAttr; pthread_attr_init(&thAttr); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); - if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tWorkerThreadFp, worker) != 0) { - uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); + if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker) != 0) { + uError("qworker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); + taosCloseQueue(queue); + queue = NULL; + break; } pthread_attr_destroy(&thAttr); pool->num++; - uDebug("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num); + uDebug("qworker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num); } while (pool->num < pool->min); } pthread_mutex_unlock(&pool->mutex); - uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); + uDebug("qworker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); return queue; } -void tWorkerFreeQueue(SWorkerPool *pool, STaosQueue *queue) { +void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) { taosCloseQueue(queue); - uDebug("worker:%s, queue:%p is freed", pool->name, queue); + uDebug("qworker:%s, queue:%p is freed", pool->name, queue); } -int32_t tMWorkerInit(SMWorkerPool *pool) { +int32_t tWWorkerInit(SWWorkerPool *pool) { pool->nextId = 0; - pool->workers = calloc(sizeof(SMWorker), pool->max); + pool->workers = calloc(sizeof(SWWorker), pool->max); if (pool->workers == NULL) return -1; pthread_mutex_init(&pool->mutex, NULL); for (int32_t i = 0; i < pool->max; ++i) { - SMWorker *worker = pool->workers + i; + SWWorker *worker = pool->workers + i; worker->id = i; worker->qall = NULL; worker->qset = NULL; worker->pool = pool; } - uInfo("worker:%s is initialized, max:%d", pool->name, pool->max); + uInfo("wworker:%s is initialized, max:%d", pool->name, pool->max); return 0; } -void tMWorkerCleanup(SMWorkerPool *pool) { +void tWWorkerCleanup(SWWorkerPool *pool) { for (int32_t i = 0; i < pool->max; ++i) { - SMWorker *worker = pool->workers + i; + SWWorker *worker = pool->workers + i; if (taosCheckPthreadValid(worker->thread)) { - if (worker->qset) taosQsetThreadResume(worker->qset); + if (worker->qset) { + taosQsetThreadResume(worker->qset); + } } } for (int32_t i = 0; i < pool->max; ++i) { - SMWorker *worker = pool->workers + i; + SWWorker *worker = pool->workers + i; if (taosCheckPthreadValid(worker->thread)) { pthread_join(worker->thread, NULL); taosFreeQall(worker->qall); @@ -164,30 +170,30 @@ void tMWorkerCleanup(SMWorkerPool *pool) { tfree(pool->workers); pthread_mutex_destroy(&pool->mutex); - uInfo("worker:%s is closed", pool->name); + uInfo("wworker:%s is closed", pool->name); } -static void *tWriteWorkerThreadFp(SMWorker *worker) { - SMWorkerPool *pool = worker->pool; +static void *tWriteWorkerThreadFp(SWWorker *worker) { + SWWorkerPool *pool = worker->pool; FProcessItems fp = NULL; - void *msg = NULL; - void *ahandle = NULL; + void * msg = NULL; + void * ahandle = NULL; int32_t numOfMsgs = 0; int32_t qtype = 0; taosBlockSIGPIPE(); setThreadName(pool->name); - uDebug("worker:%s:%d is running", pool->name, worker->id); + uDebug("wworker:%s:%d is running", pool->name, worker->id); while (1) { numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &ahandle, &fp); if (numOfMsgs == 0) { - uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset); + uDebug("wworker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset); break; } - if (fp) { + if (fp != NULL) { (*fp)(ahandle, worker->qall, numOfMsgs); } } @@ -195,9 +201,9 @@ static void *tWriteWorkerThreadFp(SMWorker *worker) { return NULL; } -STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp) { +STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems fp) { pthread_mutex_lock(&pool->mutex); - SMWorker *worker = pool->workers + pool->nextId; + SWWorker *worker = pool->workers + pool->nextId; STaosQueue *queue = taosOpenQueue(); if (queue == NULL) { @@ -228,13 +234,13 @@ STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tWriteWorkerThreadFp, worker) != 0) { - uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); + uError("wworker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); taosFreeQall(worker->qall); taosCloseQset(worker->qset); taosCloseQueue(queue); queue = NULL; } else { - uDebug("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max); + uDebug("wworker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max); pool->nextId = (pool->nextId + 1) % pool->max; } @@ -245,12 +251,12 @@ STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems } pthread_mutex_unlock(&pool->mutex); - uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); + uDebug("wworker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); return queue; } -void tMWorkerFreeQueue(SMWorkerPool *pool, STaosQueue *queue) { +void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) { taosCloseQueue(queue); - uDebug("worker:%s, queue:%p is freed", pool->name, queue); + uDebug("wworker:%s, queue:%p is freed", pool->name, queue); }