From 8685fb79b7df3ee04be464aeadf825bdc43b1b6a Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 25 Jan 2022 08:40:07 +0000 Subject: [PATCH] refact worker --- include/util/tqueue.h | 1 + include/util/tworker.h | 1 - source/util/src/tqueue.c | 59 ++++++++++++++++++++++----------------- source/util/src/tworker.c | 23 +++++++++++++-- 4 files changed, 56 insertions(+), 28 deletions(-) diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 63ba460d39..a7a8b54439 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -15,6 +15,7 @@ #ifndef _TD_UTIL_QUEUE_H #define _TD_UTIL_QUEUE_H +#include "os.h" #ifdef __cplusplus extern "C" { diff --git a/include/util/tworker.h b/include/util/tworker.h index 6c81565a6a..bf002201b3 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -15,7 +15,6 @@ #ifndef _TD_UTIL_WORKER_H #define _TD_UTIL_WORKER_H -#include "os.h" #include "tqueue.h" #ifdef __cplusplus diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 5cb149d53c..66c1332db9 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -13,10 +13,9 @@ * along with this program. If not, see . */ -#include "os.h" - -#include "taoserror.h" +#define _DEFAULT_SOURCE #include "tqueue.h" +#include "taoserror.h" #include "ulog.h" typedef struct STaosQnode STaosQnode; @@ -29,19 +28,19 @@ typedef struct STaosQnode { typedef struct STaosQueue { int32_t itemSize; int32_t numOfItems; - STaosQnode *head; - STaosQnode *tail; - STaosQueue *next; // for queue set - STaosQset *qset; // for queue set - void *ahandle; // for queue set + STaosQnode * head; + STaosQnode * tail; + STaosQueue * next; // for queue set + STaosQset * qset; // for queue set + void * ahandle; // for queue set FProcessItem itemFp; FProcessItems itemsFp; pthread_mutex_t mutex; } STaosQueue; typedef struct STaosQset { - STaosQueue *head; - STaosQueue *current; + STaosQueue * head; + STaosQueue * current; pthread_mutex_t mutex; int32_t numOfQueues; int32_t numOfItems; @@ -56,15 +55,18 @@ typedef struct STaosQall { } STaosQall; STaosQueue *taosOpenQueue() { - STaosQueue *queue = calloc(sizeof(STaosQueue), 1); + STaosQueue *queue = calloc(1, sizeof(STaosQueue)); if (queue == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - pthread_mutex_init(&queue->mutex, NULL); + if (pthread_mutex_init(&queue->mutex, NULL) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } - uTrace("queue:%p is opened", queue); + uDebug("queue:%p is opened", queue); return queue; } @@ -77,7 +79,7 @@ void taosSetQueueFp(STaosQueue *queue, FProcessItem itemFp, FProcessItems itemsF void taosCloseQueue(STaosQueue *queue) { if (queue == NULL) return; STaosQnode *pTemp; - STaosQset *qset; + STaosQset * qset; pthread_mutex_lock(&queue->mutex); STaosQnode *pNode = queue->head; @@ -85,7 +87,9 @@ void taosCloseQueue(STaosQueue *queue) { qset = queue->qset; pthread_mutex_unlock(&queue->mutex); - if (queue->qset) taosRemoveFromQset(qset, queue); + if (queue->qset) { + taosRemoveFromQset(qset, queue); + } while (pNode) { pTemp = pNode; @@ -96,7 +100,7 @@ void taosCloseQueue(STaosQueue *queue) { pthread_mutex_destroy(&queue->mutex); free(queue); - uTrace("queue:%p is closed", queue); + uDebug("queue:%p is closed", queue); } bool taosQueueEmpty(STaosQueue *queue) { @@ -120,9 +124,13 @@ int32_t taosQueueSize(STaosQueue *queue) { } void *taosAllocateQitem(int32_t size) { - STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1); + STaosQnode *pNode = calloc(1, sizeof(STaosQnode) + size); + + if (pNode == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } - if (pNode == NULL) return NULL; uTrace("item:%p, node:%p is allocated", pNode->item, pNode); return (void *)pNode->item; } @@ -130,7 +138,7 @@ void *taosAllocateQitem(int32_t size) { void taosFreeQitem(void *param) { if (param == NULL) return; - char *temp = (char *)param; + char *temp = param; temp -= sizeof(STaosQnode); uTrace("item:%p, node:%p is freed", param, temp); free(temp); @@ -175,7 +183,7 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) { queue->numOfItems--; if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); code = 1; - uDebug("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems); + uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems); } pthread_mutex_unlock(&queue->mutex); @@ -183,7 +191,7 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) { return code; } -STaosQall *taosAllocateQall() { return calloc(sizeof(STaosQall), 1); } +STaosQall *taosAllocateQall() { return calloc(1, sizeof(STaosQall)); } void taosFreeQall(STaosQall *qall) { free(qall); } @@ -238,7 +246,7 @@ int32_t taosGetQitem(STaosQall *qall, void **ppItem) { void taosResetQitems(STaosQall *qall) { qall->current = qall->start; } STaosQset *taosOpenQset() { - STaosQset *qset = (STaosQset *)calloc(sizeof(STaosQset), 1); + STaosQset *qset = calloc(sizeof(STaosQset), 1); if (qset == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -247,7 +255,7 @@ STaosQset *taosOpenQset() { pthread_mutex_init(&qset->mutex, NULL); tsem_init(&qset->sem, 0, 0); - uTrace("qset:%p is opened", qset); + uDebug("qset:%p is opened", qset); return qset; } @@ -268,7 +276,7 @@ void taosCloseQset(STaosQset *qset) { pthread_mutex_destroy(&qset->mutex); tsem_destroy(&qset->sem); free(qset); - uTrace("qset:%p is closed", qset); + uDebug("qset:%p is closed", qset); } // tsem_post 'qset->sem', so that reader threads waiting for it @@ -338,7 +346,7 @@ void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) { pthread_mutex_unlock(&qset->mutex); - uTrace("queue:%p is removed from qset:%p", queue, qset); + uDebug("queue:%p is removed from qset:%p", queue, qset); } int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; } @@ -365,6 +373,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FP *ppItem = pNode->item; if (ahandle) *ahandle = queue->ahandle; if (itemFp) *itemFp = queue->itemFp; + queue->head = pNode->next; if (queue->head == NULL) queue->tail = NULL; queue->numOfItems--; diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 3fe087a20d..fc0b6afb3b 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "tworker.h" +#include "taoserror.h" #include "ulog.h" typedef void *(*ThreadFp)(void *param); @@ -22,7 +23,13 @@ typedef void *(*ThreadFp)(void *param); int32_t tQWorkerInit(SQWorkerPool *pool) { pool->qset = taosOpenQset(); pool->workers = calloc(sizeof(SQWorker), pool->max); + if (pool->workers == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + if (pthread_mutex_init(&pool->mutex, NULL)) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -91,6 +98,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem f STaosQueue *queue = taosOpenQueue(); if (queue == NULL) { pthread_mutex_unlock(&pool->mutex); + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -109,6 +117,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem f if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker) != 0) { uError("qworker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); taosCloseQueue(queue); + terrno = TSDB_CODE_OUT_OF_MEMORY; queue = NULL; break; } @@ -133,9 +142,16 @@ void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) { int32_t tWWorkerInit(SWWorkerPool *pool) { pool->nextId = 0; pool->workers = calloc(sizeof(SWWorker), pool->max); - if (pool->workers == NULL) return -1; + if (pool->workers == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + if (pthread_mutex_init(&pool->mutex, NULL) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } - pthread_mutex_init(&pool->mutex, NULL); for (int32_t i = 0; i < pool->max; ++i) { SWWorker *worker = pool->workers + i; worker->id = i; @@ -208,6 +224,7 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems STaosQueue *queue = taosOpenQueue(); if (queue == NULL) { pthread_mutex_unlock(&pool->mutex); + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -227,6 +244,7 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems taosCloseQset(worker->qset); taosCloseQueue(queue); pthread_mutex_unlock(&pool->mutex); + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } pthread_attr_t thAttr; @@ -238,6 +256,7 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems taosFreeQall(worker->qall); taosCloseQset(worker->qset); taosCloseQueue(queue); + terrno = TSDB_CODE_OUT_OF_MEMORY; queue = NULL; } else { uDebug("wworker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max); -- GitLab