diff --git a/include/util/tqueue.h b/include/util/tqueue.h index a7a8b544398a8ca7c2d969c1851a3fc2a7474afb..cfa5a65c2a14543e6690fcf0b886d77696857cb0 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -41,13 +41,13 @@ shall be used to set up the protection. typedef struct STaosQueue STaosQueue; typedef struct STaosQset STaosQset; typedef struct STaosQall STaosQall; -typedef void (*FProcessItem)(void *ahandle, void *pItem); -typedef void (*FProcessItems)(void *ahandle, STaosQall *qall, int32_t numOfItems); +typedef void (*FItem)(void *ahandle, void *pItem); +typedef void (*FItems)(void *ahandle, STaosQall *qall, int32_t numOfItems); STaosQueue *taosOpenQueue(); void taosCloseQueue(STaosQueue *queue); -void taosSetQueueFp(STaosQueue *queue, FProcessItem itemFp, FProcessItems itemsFp); -void *taosAllocateQitem(int32_t size); +void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp); +void * taosAllocateQitem(int32_t size); void taosFreeQitem(void *pItem); int32_t taosWriteQitem(STaosQueue *queue, void *pItem); int32_t taosReadQitem(STaosQueue *queue, void **ppItem); @@ -67,8 +67,11 @@ int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle); void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue); int32_t taosGetQueueNumber(STaosQset *qset); -int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FProcessItem *itemFp); -int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FProcessItems *itemsFp); +int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp); +int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp); + +int32_t taosReadQitemFromQsetByThread(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp, int32_t threadId); +void taosResetQsetThread(STaosQset *qset, void *pItem); int32_t taosGetQueueItemsNumber(STaosQueue *queue); int32_t taosGetQsetItemsNumber(STaosQset *qset); diff --git a/include/util/tworker.h b/include/util/tworker.h index bf002201b3e03970e0b2975359566cd733899fce..771c7c943359b08e664f3d7de45fe6a4ecf6ada3 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -22,14 +22,13 @@ extern "C" { #endif typedef struct SQWorkerPool SQWorkerPool; -typedef struct SFWorkerPool SFWorkerPool; typedef struct SWWorkerPool SWWorkerPool; typedef struct SQWorker { int32_t id; // worker ID pthread_t thread; // thread SQWorkerPool *pool; -} SQWorker; +} SQWorker, SFWorker; typedef struct SQWorkerPool { int32_t max; // max number of workers @@ -39,23 +38,7 @@ typedef struct SQWorkerPool { const char * name; SQWorker * workers; pthread_mutex_t mutex; -} SQWorkerPool; - -typedef struct SFWorker { - int32_t id; // worker ID - pthread_t thread; // thread - SFWorkerPool *pool; -} SFWorker; - -typedef struct SFWorkerPool { - int32_t max; // max number of workers - int32_t min; // min number of workers - int32_t num; // current number of workers - STaosQset * qset; - const char * name; - SFWorker * workers; - pthread_mutex_t mutex; -} SFWorkerPool; +} SQWorkerPool, SFWorkerPool; typedef struct SWWorker { int32_t id; // worker id @@ -75,12 +58,17 @@ typedef struct SWWorkerPool { int32_t tQWorkerInit(SQWorkerPool *pool); void tQWorkerCleanup(SQWorkerPool *pool); -STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem fp); +STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp); void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue); +int32_t tFWorkerInit(SFWorkerPool *pool); +void tFWorkerCleanup(SFWorkerPool *pool); +STaosQueue *tFWorkerAllocQueue(SFWorkerPool *pool, void *ahandle, FItem fp); +void tFWorkerFreeQueue(SFWorkerPool *pool, STaosQueue *queue); + int32_t tWWorkerInit(SWWorkerPool *pool); void tWWorkerCleanup(SWWorkerPool *pool); -STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems fp); +STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp); void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue); #ifdef __cplusplus diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 8d6fe3707b8e4d4892b2ee740bb1942a9e9d4bca..9b9269eb9cd8c15e1fa1e74879228549272509fe 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -253,7 +253,7 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_ } for (int32_t i = 0; i < vnodesNum; ++i) { - cJSON *vnode = cJSON_GetArrayItem(vnodes, i); + cJSON * vnode = cJSON_GetArrayItem(vnodes, i); SWrapperCfg *pCfg = &pCfgs[i]; cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId"); @@ -382,7 +382,7 @@ static void *dnodeOpenVnodeFunc(void *param) { dndReportStartup(pDnode, "open-vnodes", stepDesc); SVnodeCfg cfg = {.pDnode = pDnode, .pTfs = pDnode->pTfs, .vgId = pCfg->vgId}; - SVnode *pImpl = vnodeOpen(pCfg->path, &cfg); + SVnode * pImpl = vnodeOpen(pCfg->path, &cfg); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->failed++; @@ -948,11 +948,11 @@ static void dndCleanupVnodeWorkers(SDnode *pDnode) { static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; - pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeWriteQueue); - pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeApplyQueue); - pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue); - pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FProcessItem)dndProcessVnodeFetchQueue); - pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue); + pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)dndProcessVnodeWriteQueue); + pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)dndProcessVnodeApplyQueue); + pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)dndProcessVnodeSyncQueue); + pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)dndProcessVnodeFetchQueue); + pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)dndProcessVnodeQueryQueue); if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL || pVnode->pQueryQ == NULL) { diff --git a/source/dnode/mgmt/impl/src/dndWorker.c b/source/dnode/mgmt/impl/src/dndWorker.c index 42ef76bb43fd58e79ceca05aebdc1bf977619749..5ccf6640c04789911932a1aabd2717f7efd3762a 100644 --- a/source/dnode/mgmt/impl/src/dndWorker.c +++ b/source/dnode/mgmt/impl/src/dndWorker.c @@ -39,7 +39,7 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, c terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - pWorker->queue = tQWorkerAllocQueue(pPool, pDnode, (FProcessItem)queueFp); + pWorker->queue = tQWorkerAllocQueue(pPool, pDnode, (FItem)queueFp); if (pWorker->queue == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -52,7 +52,7 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, c terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - pWorker->queue = tWWorkerAllocQueue(pPool, pDnode, (FProcessItems)queueFp); + pWorker->queue = tWWorkerAllocQueue(pPool, pDnode, (FItems)queueFp); if (pWorker->queue == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 66c1332db90be2f851e64b019cfd9282b392f1b7..b50ad77b6ff5823f4bc9ce28e5cd78947e5e5d2c 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -22,19 +22,21 @@ typedef struct STaosQnode STaosQnode; typedef struct STaosQnode { STaosQnode *next; + STaosQueue *queue; char item[]; } STaosQnode; typedef struct STaosQueue { int32_t itemSize; int32_t numOfItems; + int32_t threadId; STaosQnode * head; STaosQnode * tail; STaosQueue * next; // for queue set STaosQset * qset; // for queue set void * ahandle; // for queue set - FProcessItem itemFp; - FProcessItems itemsFp; + FItem itemFp; + FItems itemsFp; pthread_mutex_t mutex; } STaosQueue; @@ -66,11 +68,12 @@ STaosQueue *taosOpenQueue() { return NULL; } + queue->threadId = -1; uDebug("queue:%p is opened", queue); return queue; } -void taosSetQueueFp(STaosQueue *queue, FProcessItem itemFp, FProcessItems itemsFp) { +void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp) { if (queue == NULL) return; queue->itemFp = itemFp; queue->itemsFp = itemsFp; @@ -135,12 +138,12 @@ void *taosAllocateQitem(int32_t size) { return (void *)pNode->item; } -void taosFreeQitem(void *param) { - if (param == NULL) return; +void taosFreeQitem(void *pItem) { + if (pItem == NULL) return; - char *temp = param; + char *temp = pItem; temp -= sizeof(STaosQnode); - uTrace("item:%p, node:%p is freed", param, temp); + uTrace("item:%p, node:%p is freed", pItem, temp); free(temp); } @@ -351,7 +354,7 @@ void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) { int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; } -int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FProcessItem *itemFp) { +int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp) { STaosQnode *pNode = NULL; int32_t code = 0; @@ -391,7 +394,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FP return code; } -int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FProcessItems *itemsFp) { +int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp) { STaosQueue *queue; int32_t code = 0; @@ -432,6 +435,59 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand return code; } +int32_t taosReadQitemFromQsetByThread(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp, int32_t threadId) { + STaosQnode *pNode = NULL; + int32_t code = 0; + + tsem_wait(&qset->sem); + + pthread_mutex_lock(&qset->mutex); + + for (int32_t i = 0; i < qset->numOfQueues; ++i) { + if (qset->current == NULL) qset->current = qset->head; + STaosQueue *queue = qset->current; + if (queue) qset->current = queue->next; + if (queue == NULL) break; + if (queue->head == NULL) continue; + if (queue->threadId != -1 && queue->threadId != threadId) continue; + + pthread_mutex_lock(&queue->mutex); + + if (queue->head) { + pNode = queue->head; + pNode->queue = queue; + queue->threadId = threadId; + *ppItem = pNode->item; + + if (ahandle) *ahandle = queue->ahandle; + if (itemFp) *itemFp = queue->itemFp; + + queue->head = pNode->next; + if (queue->head == NULL) queue->tail = NULL; + queue->numOfItems--; + atomic_sub_fetch_32(&qset->numOfItems, 1); + code = 1; + uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems); + } + + pthread_mutex_unlock(&queue->mutex); + if (pNode) break; + } + + pthread_mutex_unlock(&qset->mutex); + + return code; +} + +void taosResetQsetThread(STaosQset *qset, void *pItem) { + if (pItem == NULL) return; + STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode)); + + pthread_mutex_lock(&qset->mutex); + pNode->queue->threadId = -1; + pthread_mutex_unlock(&qset->mutex); +} + int32_t taosGetQueueItemsNumber(STaosQueue *queue) { if (!queue) return 0; diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index fc0b6afb3b0732103941a3a72ec2b61f5ac8ac81..bc98a371fc4a47047ddddbb4325eef3fb506cb6d 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -39,7 +39,7 @@ int32_t tQWorkerInit(SQWorkerPool *pool) { worker->pool = pool; } - uDebug("qworker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max); + uDebug("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max); return 0; } @@ -64,12 +64,12 @@ void tQWorkerCleanup(SQWorkerPool *pool) { taosCloseQset(pool->qset); pthread_mutex_destroy(&pool->mutex); - uDebug("qworker:%s is closed", pool->name); + uDebug("worker:%s is closed", pool->name); } static void *tQWorkerThreadFp(SQWorker *worker) { SQWorkerPool *pool = worker->pool; - FProcessItem fp = NULL; + FItem fp = NULL; void * msg = NULL; void * ahandle = NULL; @@ -77,11 +77,11 @@ static void *tQWorkerThreadFp(SQWorker *worker) { taosBlockSIGPIPE(); setThreadName(pool->name); - uDebug("qworker:%s:%d is running", pool->name, worker->id); + uDebug("worker:%s:%d is running", pool->name, worker->id); while (1) { if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ahandle, &fp) == 0) { - uDebug("qworker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); + uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); break; } @@ -93,7 +93,7 @@ static void *tQWorkerThreadFp(SQWorker *worker) { return NULL; } -STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem fp) { +STaosQueue *tWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp, ThreadFp threadFp) { pthread_mutex_lock(&pool->mutex); STaosQueue *queue = taosOpenQueue(); if (queue == NULL) { @@ -114,8 +114,8 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem f pthread_attr_init(&thAttr); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); - if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker) != 0) { - uError("qworker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); + if (pthread_create(&worker->thread, &thAttr, threadFp, worker) != 0) { + uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); taosCloseQueue(queue); terrno = TSDB_CODE_OUT_OF_MEMORY; queue = NULL; @@ -124,21 +124,63 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem f pthread_attr_destroy(&thAttr); pool->num++; - uDebug("qworker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num); + uDebug("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num); } while (pool->num < pool->min); } pthread_mutex_unlock(&pool->mutex); - uDebug("qworker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); + uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); return queue; } +STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) { + return tWorkerAllocQueue(pool, ahandle, fp, (ThreadFp)tQWorkerThreadFp); +} + void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) { taosCloseQueue(queue); - uDebug("qworker:%s, queue:%p is freed", pool->name, queue); + uDebug("worker:%s, queue:%p is freed", pool->name, queue); } +int32_t tFWorkerInit(SFWorkerPool *pool) { return tQWorkerInit((SQWorkerPool *)pool); } + +void tFWorkerCleanup(SFWorkerPool *pool) { tQWorkerCleanup(pool); } + +static void *tFWorkerThreadFp(SQWorker *worker) { + SQWorkerPool *pool = worker->pool; + FItem fp = NULL; + + void * msg = NULL; + void * ahandle = NULL; + int32_t code = 0; + + taosBlockSIGPIPE(); + setThreadName(pool->name); + uDebug("worker:%s:%d is running", pool->name, worker->id); + + while (1) { + if (taosReadQitemFromQsetByThread(pool->qset, (void **)&msg, &ahandle, &fp, worker->id) == 0) { + uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); + break; + } + + if (fp != NULL) { + (*fp)(ahandle, msg); + } + + taosResetQsetThread(pool->qset, msg); + } + + return NULL; +} + +STaosQueue *tFWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) { + return tWorkerAllocQueue(pool, ahandle, fp, (ThreadFp)tFWorkerThreadFp); +} + +void tFWorkerFreeQueue(SFWorkerPool *pool, STaosQueue *queue) { tQWorkerFreeQueue(pool, queue); } + int32_t tWWorkerInit(SWWorkerPool *pool) { pool->nextId = 0; pool->workers = calloc(sizeof(SWWorker), pool->max); @@ -160,7 +202,7 @@ int32_t tWWorkerInit(SWWorkerPool *pool) { worker->pool = pool; } - uInfo("wworker:%s is initialized, max:%d", pool->name, pool->max); + uInfo("worker:%s is initialized, max:%d", pool->name, pool->max); return 0; } @@ -186,12 +228,12 @@ void tWWorkerCleanup(SWWorkerPool *pool) { tfree(pool->workers); pthread_mutex_destroy(&pool->mutex); - uInfo("wworker:%s is closed", pool->name); + uInfo("worker:%s is closed", pool->name); } static void *tWriteWorkerThreadFp(SWWorker *worker) { SWWorkerPool *pool = worker->pool; - FProcessItems fp = NULL; + FItems fp = NULL; void * msg = NULL; void * ahandle = NULL; @@ -200,12 +242,12 @@ static void *tWriteWorkerThreadFp(SWWorker *worker) { taosBlockSIGPIPE(); setThreadName(pool->name); - uDebug("wworker:%s:%d is running", pool->name, worker->id); + uDebug("worker:%s:%d is running", pool->name, worker->id); while (1) { numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &ahandle, &fp); if (numOfMsgs == 0) { - uDebug("wworker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset); + uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset); break; } @@ -217,7 +259,7 @@ static void *tWriteWorkerThreadFp(SWWorker *worker) { return NULL; } -STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems fp) { +STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) { pthread_mutex_lock(&pool->mutex); SWWorker *worker = pool->workers + pool->nextId; @@ -252,14 +294,14 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tWriteWorkerThreadFp, worker) != 0) { - uError("wworker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); + uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); taosFreeQall(worker->qall); taosCloseQset(worker->qset); taosCloseQueue(queue); terrno = TSDB_CODE_OUT_OF_MEMORY; queue = NULL; } else { - uDebug("wworker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max); + uDebug("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max); pool->nextId = (pool->nextId + 1) % pool->max; } @@ -270,12 +312,12 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems } pthread_mutex_unlock(&pool->mutex); - uDebug("wworker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); + uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); return queue; } void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) { taosCloseQueue(queue); - uDebug("wworker:%s, queue:%p is freed", pool->name, queue); + uDebug("worker:%s, queue:%p is freed", pool->name, queue); }