diff --git a/include/util/tqueue.h b/include/util/tqueue.h index bcb9aea856a104ce71b8355d607f6437d768a3a2..a57bdb5ce8270d65e7280412e90961360036239c 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -22,59 +22,57 @@ extern "C" { /* -This set of API for queue is designed specially for vnode/mnode. The main purpose is to -consume all the items instead of one item from a queue by one single read. Also, it can -combine multiple queues into a queue set, a consumer thread can consume a queue set via +This set of API for queue is designed specially for vnode/mnode. The main purpose is to +consume all the items instead of one item from a queue by one single read. Also, it can +combine multiple queues into a queue set, a consumer thread can consume a queue set via a single API instead of looping every queue by itself. Notes: -1: taosOpenQueue/taosCloseQueue, taosOpenQset/taosCloseQset is NOT multi-thread safe +1: taosOpenQueue/taosCloseQueue, taosOpenQset/taosCloseQset is NOT multi-thread safe 2: after taosCloseQueue/taosCloseQset is called, read/write operation APIs are not safe. 3: read/write operation APIs are multi-thread safe To remove the limitation and make this set of queue APIs multi-thread safe, REF(tref.c) -shall be used to set up the protection. +shall be used to set up the protection. */ -typedef void *taos_queue; -typedef void *taos_qset; -typedef void *taos_qall; +typedef struct STaosQueue STaosQueue; +typedef struct STaosQset STaosQset; +typedef struct STaosQall STaosQall; typedef void (*FProcessItem)(void *ahandle, void *pItem); -typedef void (*FProcessItems)(void *ahandle, taos_qall qall, int numOfItems); - -taos_queue taosOpenQueue(); -void taosCloseQueue(taos_queue); -void taosSetQueueFp(taos_queue, FProcessItem, FProcessItems); -void *taosAllocateQitem(int size); -void taosFreeQitem(void *pItem); -int taosWriteQitem(taos_queue, void *pItem); -int taosReadQitem(taos_queue, void **pItem); -bool taosQueueEmpty(taos_queue); - -taos_qall taosAllocateQall(); -void taosFreeQall(taos_qall); -int taosReadAllQitems(taos_queue, taos_qall); -int taosGetQitem(taos_qall, void **pItem); -void taosResetQitems(taos_qall); - -taos_qset taosOpenQset(); -void taosCloseQset(); -void taosQsetThreadResume(taos_qset param); -int taosAddIntoQset(taos_qset, taos_queue, void *ahandle); -void taosRemoveFromQset(taos_qset, taos_queue); -int taosGetQueueNumber(taos_qset); - -int taosReadQitemFromQset(taos_qset, void **pItem, void **ahandle, FProcessItem *); -int taosReadAllQitemsFromQset(taos_qset, taos_qall, void **ahandle, FProcessItems *); - -int taosGetQueueItemsNumber(taos_queue param); -int taosGetQsetItemsNumber(taos_qset param); +typedef void (*FProcessItems)(void *ahandle, STaosQall *qall, int32_t numOfItems); + +STaosQueue *taosOpenQueue(); +void taosCloseQueue(STaosQueue *queue); +void taosSetQueueFp(STaosQueue *queue, FProcessItem itemFp, FProcessItems itemsFp); +void *taosAllocateQitem(int32_t size); +void taosFreeQitem(void *pItem); +int32_t taosWriteQitem(STaosQueue *queue, void *pItem); +int32_t taosReadQitem(STaosQueue *queue, void **ppItem); +bool taosQueueEmpty(STaosQueue *queue); + +STaosQall *taosAllocateQall(); +void taosFreeQall(STaosQall *qall); +int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall); +int32_t taosGetQitem(STaosQall *qall, void **ppItem); +void taosResetQitems(STaosQall *qall); + +STaosQset *taosOpenQset(); +void taosCloseQset(STaosQset *qset); +void taosQsetThreadResume(STaosQset *qset); +int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle); +void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue); +int32_t taosGetQueueNumber(STaosQset *qset); + +int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FProcessItem *itemFp); +int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FProcessItems *itemsFp); + +int32_t taosGetQueueItemsNumber(STaosQueue *queue); +int32_t taosGetQsetItemsNumber(STaosQset *qset); #ifdef __cplusplus } #endif #endif /*_TD_UTIL_QUEUE_H*/ - - diff --git a/include/util/tworker.h b/include/util/tworker.h index 2e5852cbbac6b305b55bc8ec1cf04b34f254663b..27f03bd2b69e6d6add97cdc2e5ab05ad2fdd5eca 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -35,7 +35,7 @@ typedef struct SWorkerPool { int32_t max; // max number of workers int32_t min; // min number of workers int32_t num; // current number of workers - taos_qset qset; + STaosQset *qset; const char *name; SWorker *workers; pthread_mutex_t mutex; @@ -44,8 +44,8 @@ typedef struct SWorkerPool { typedef struct SMWorker { int32_t id; // worker id pthread_t thread; // thread - taos_qall qall; - taos_qset qset; // queue set + STaosQall *qall; + STaosQset *qset; // queue set SMWorkerPool *pool; } SMWorker; @@ -57,15 +57,15 @@ typedef struct SMWorkerPool { pthread_mutex_t mutex; } SMWorkerPool; -int32_t tWorkerInit(SWorkerPool *pool); -void tWorkerCleanup(SWorkerPool *pool); -taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp); -void tWorkerFreeQueue(SWorkerPool *pool, taos_queue queue); +int32_t tWorkerInit(SWorkerPool *pool); +void tWorkerCleanup(SWorkerPool *pool); +STaosQueue *tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp); +void tWorkerFreeQueue(SWorkerPool *pool, STaosQueue *queue); -int32_t tMWorkerInit(SMWorkerPool *pool); -void tMWorkerCleanup(SMWorkerPool *pool); -taos_queue tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp); -void tMWorkerFreeQueue(SMWorkerPool *pool, taos_queue queue); +int32_t tMWorkerInit(SMWorkerPool *pool); +void tMWorkerCleanup(SMWorkerPool *pool); +STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp); +void tMWorkerFreeQueue(SMWorkerPool *pool, STaosQueue *queue); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/daemon/src/daemon.c b/source/dnode/mgmt/daemon/src/daemon.c index 8161b8d125ebb9f9fec4762f0327e2adc3a5449f..cedab6266e65734fc0f18790788bd0b2bb4b9bf6 100644 --- a/source/dnode/mgmt/daemon/src/daemon.c +++ b/source/dnode/mgmt/daemon/src/daemon.c @@ -140,7 +140,7 @@ void dmnInitOption(SDnodeOpt *pOption) { pOption->sver = 30000000; //3.0.0.0 pOption->numOfCores = tsNumOfCores; pOption->numOfSupportVnodes = tsNumOfSupportVnodes; - pOption->numOfCommitThreads = 1; + pOption->numOfCommitThreads = tsNumOfCommitThreads; pOption->statusInterval = tsStatusInterval; pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore; pOption->ratioOfQueryCores = tsRatioOfQueryCores; diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index 954e21aefa98ba9f8a14aae9ca227e7400fb8ee5..07c8ce5d02db09a19db429801b637e4e7dc73e6d 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -64,7 +64,7 @@ typedef struct { int32_t maxNum; void *queueFp; SDnode *pDnode; - taos_queue queue; + STaosQueue *queue; union { SWorkerPool pool; SMWorkerPool mpool; @@ -92,7 +92,7 @@ typedef struct { SDnodeEps *dnodeEps; pthread_t *threadId; SRWLatch latch; - taos_queue pMgmtQ; + STaosQueue *pMgmtQ; SWorkerPool mgmtPool; } SDnodeMgmt; diff --git a/source/dnode/mgmt/impl/src/dndBnode.c b/source/dnode/mgmt/impl/src/dndBnode.c index 66b619318d0690e3640668a4db6013b1b8696c9d..c12d449517adcf5fbeafe1a5f727177000ab10b3 100644 --- a/source/dnode/mgmt/impl/src/dndBnode.c +++ b/source/dnode/mgmt/impl/src/dndBnode.c @@ -19,7 +19,7 @@ #include "dndTransport.h" #include "dndWorker.h" -static void dndProcessBnodeQueue(SDnode *pDnode, taos_qall qall, int32_t numOfMsgs); +static void dndProcessBnodeQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs); static SBnode *dndAcquireBnode(SDnode *pDnode) { SBnodeMgmt *pMgmt = &pDnode->bmgmt; @@ -286,7 +286,7 @@ static void dndSendBnodeErrorRsp(SRpcMsg *pMsg, int32_t code) { taosFreeQitem(pMsg); } -static void dndSendBnodeErrorRsps(taos_qall qall, int32_t numOfMsgs, int32_t code) { +static void dndSendBnodeErrorRsps(STaosQall *qall, int32_t numOfMsgs, int32_t code) { for (int32_t i = 0; i < numOfMsgs; ++i) { SRpcMsg *pMsg = NULL; taosGetQitem(qall, (void **)&pMsg); @@ -294,7 +294,7 @@ static void dndSendBnodeErrorRsps(taos_qall qall, int32_t numOfMsgs, int32_t cod } } -static void dndProcessBnodeQueue(SDnode *pDnode, taos_qall qall, int32_t numOfMsgs) { +static void dndProcessBnodeQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs) { SBnode *pBnode = dndAcquireBnode(pDnode); if (pBnode == NULL) { dndSendBnodeErrorRsps(qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY); diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 3eab3e5aec734dc1993bcd37b745529204b1e562..8835e0ba65cbb4a2f8a8721391a606e6e47bdac1 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -27,20 +27,20 @@ typedef struct { } SWrapperCfg; typedef struct { - int32_t vgId; - int32_t refCount; - int32_t vgVersion; - int8_t dropped; - int8_t accessState; - uint64_t dbUid; - char *db; - char *path; - SVnode *pImpl; - taos_queue pWriteQ; - taos_queue pSyncQ; - taos_queue pApplyQ; - taos_queue pQueryQ; - taos_queue pFetchQ; + int32_t vgId; + int32_t refCount; + int32_t vgVersion; + int8_t dropped; + int8_t accessState; + uint64_t dbUid; + char *db; + char *path; + SVnode *pImpl; + STaosQueue *pWriteQ; + STaosQueue *pSyncQ; + STaosQueue *pApplyQ; + STaosQueue *pQueryQ; + STaosQueue* pFetchQ; } SVnodeObj; typedef struct { @@ -72,9 +72,9 @@ static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode); static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg); static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg); -static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); -static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); -static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); +static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs); +static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs); +static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs); void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); @@ -768,7 +768,7 @@ static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { vnodeProcessFetchReq(pVnode->pImpl, pMsg, &pRsp); } -static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { +static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *)); for (int32_t i = 0; i < numOfMsgs; ++i) { @@ -804,7 +804,7 @@ static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t taosArrayDestroy(pArray); } -static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { +static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { SRpcMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { @@ -815,7 +815,7 @@ static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t } } -static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { +static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { SRpcMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { @@ -826,7 +826,7 @@ static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t } } -static int32_t dndWriteRpcMsgToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) { +static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg) { int32_t code = 0; if (pQueue == NULL) { diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 04bc0c8dc8a4d84e4b4454db894580b1e2cc9d55..75f5e9cdbcb96198661a2f863a7d42d4f886b965 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -14,26 +14,29 @@ */ #include "os.h" -#include "ulog.h" + #include "taoserror.h" #include "tqueue.h" +#include "ulog.h" + +typedef struct STaosQnode STaosQnode; typedef struct STaosQnode { - struct STaosQnode *next; - char item[]; + STaosQnode *next; + char item[]; } STaosQnode; typedef struct STaosQueue { - int32_t itemSize; - int32_t numOfItems; - struct STaosQnode *head; - struct STaosQnode *tail; - struct STaosQueue *next; // for queue set - struct STaosQset *qset; // for queue set - void *ahandle; // for queue set - FProcessItem itemFp; - FProcessItems itemsFp; - pthread_mutex_t mutex; + int32_t itemSize; + int32_t numOfItems; + STaosQnode *head; + STaosQnode *tail; + STaosQueue *next; // for queue set + STaosQset *qset; // for queue set + void *ahandle; // for queue set + FProcessItem itemFp; + FProcessItems itemsFp; + pthread_mutex_t mutex; } STaosQueue; typedef struct STaosQset { @@ -52,8 +55,8 @@ typedef struct STaosQall { int32_t numOfItems; } STaosQall; -taos_queue taosOpenQueue() { - STaosQueue *queue = (STaosQueue *)calloc(sizeof(STaosQueue), 1); +STaosQueue *taosOpenQueue() { + STaosQueue *queue = calloc(sizeof(STaosQueue), 1); if (queue == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -65,16 +68,14 @@ taos_queue taosOpenQueue() { return queue; } -void taosSetQueueFp(taos_queue param, FProcessItem itemFp, FProcessItems itemsFp) { - if (param == NULL) return; - STaosQueue *queue = (STaosQueue *)param; +void taosSetQueueFp(STaosQueue *queue, FProcessItem itemFp, FProcessItems itemsFp) { + if (queue == NULL) return; queue->itemFp = itemFp; queue->itemsFp = itemsFp; } -void taosCloseQueue(taos_queue param) { - if (param == NULL) return; - STaosQueue *queue = (STaosQueue *)param; +void taosCloseQueue(STaosQueue *queue) { + if (queue == NULL) return; STaosQnode *pTemp; STaosQset *qset; @@ -98,9 +99,8 @@ void taosCloseQueue(taos_queue param) { uTrace("queue:%p is closed", queue); } -bool taosQueueEmpty(taos_queue param) { - if (param == NULL) return true; - STaosQueue *queue = (STaosQueue *)param; +bool taosQueueEmpty(STaosQueue *queue) { + if (queue == NULL) return true; bool empty = false; pthread_mutex_lock(&queue->mutex); @@ -112,7 +112,7 @@ bool taosQueueEmpty(taos_queue param) { return empty; } -void *taosAllocateQitem(int size) { +void *taosAllocateQitem(int32_t size) { STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1); if (pNode == NULL) return NULL; @@ -129,9 +129,8 @@ void taosFreeQitem(void *param) { free(temp); } -int taosWriteQitem(taos_queue param, void *item) { - STaosQueue *queue = (STaosQueue *)param; - STaosQnode *pNode = (STaosQnode *)(((char *)item) - sizeof(STaosQnode)); +int32_t taosWriteQitem(STaosQueue *queue, void *pItem) { + STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode)); pNode->next = NULL; pthread_mutex_lock(&queue->mutex); @@ -146,7 +145,7 @@ int taosWriteQitem(taos_queue param, void *item) { queue->numOfItems++; if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); - uTrace("item:%p is put into queue:%p, items:%d", item, queue, queue->numOfItems); + uTrace("item:%p is put into queue:%p, items:%d", pItem, queue, queue->numOfItems); pthread_mutex_unlock(&queue->mutex); @@ -155,22 +154,21 @@ int taosWriteQitem(taos_queue param, void *item) { return 0; } -int taosReadQitem(taos_queue param, void **pitem) { - STaosQueue *queue = (STaosQueue *)param; +int32_t taosReadQitem(STaosQueue *queue, void **ppItem) { STaosQnode *pNode = NULL; - int code = 0; + int32_t code = 0; pthread_mutex_lock(&queue->mutex); if (queue->head) { pNode = queue->head; - *pitem = pNode->item; + *ppItem = pNode->item; queue->head = pNode->next; if (queue->head == NULL) queue->tail = NULL; queue->numOfItems--; if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); code = 1; - uDebug("item:%p is read out from queue:%p, items:%d", *pitem, queue, queue->numOfItems); + uDebug("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems); } pthread_mutex_unlock(&queue->mutex); @@ -178,18 +176,13 @@ int taosReadQitem(taos_queue param, void **pitem) { return code; } -void *taosAllocateQall() { - void *p = calloc(sizeof(STaosQall), 1); - return p; -} +STaosQall *taosAllocateQall() { return calloc(sizeof(STaosQall), 1); } -void taosFreeQall(void *param) { free(param); } +void taosFreeQall(STaosQall *qall) { free(qall); } -int taosReadAllQitems(taos_queue param, taos_qall p2) { - STaosQueue *queue = (STaosQueue *)param; - STaosQall *qall = (STaosQall *)p2; - int code = 0; - bool empty; +int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) { + int32_t code = 0; + bool empty; pthread_mutex_lock(&queue->mutex); @@ -219,29 +212,25 @@ int taosReadAllQitems(taos_queue param, taos_qall p2) { return code; } -int taosGetQitem(taos_qall param, void **pitem) { - STaosQall *qall = (STaosQall *)param; +int32_t taosGetQitem(STaosQall *qall, void **ppItem) { STaosQnode *pNode; - int num = 0; + int32_t num = 0; pNode = qall->current; if (pNode) qall->current = pNode->next; if (pNode) { - *pitem = pNode->item; + *ppItem = pNode->item; num = 1; - uTrace("item:%p is fetched", *pitem); + uTrace("item:%p is fetched", *ppItem); } return num; } -void taosResetQitems(taos_qall param) { - STaosQall *qall = (STaosQall *)param; - qall->current = qall->start; -} +void taosResetQitems(STaosQall *qall) { qall->current = qall->start; } -taos_qset taosOpenQset() { +STaosQset *taosOpenQset() { STaosQset *qset = (STaosQset *)calloc(sizeof(STaosQset), 1); if (qset == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -255,9 +244,8 @@ taos_qset taosOpenQset() { return qset; } -void taosCloseQset(taos_qset param) { - if (param == NULL) return; - STaosQset *qset = (STaosQset *)param; +void taosCloseQset(STaosQset *qset) { + if (qset == NULL) return; // remove all the queues from qset pthread_mutex_lock(&qset->mutex); @@ -279,16 +267,12 @@ void taosCloseQset(taos_qset param) { // tsem_post 'qset->sem', so that reader threads waiting for it // resumes execution and return, should only be used to signal the // thread to exit. -void taosQsetThreadResume(taos_qset param) { - STaosQset *qset = (STaosQset *)param; +void taosQsetThreadResume(STaosQset *qset) { uDebug("qset:%p, it will exit", qset); tsem_post(&qset->sem); } -int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) { - STaosQueue *queue = (STaosQueue *)p2; - STaosQset *qset = (STaosQset *)p1; - +int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) { if (queue->qset) return -1; pthread_mutex_lock(&qset->mutex); @@ -309,10 +293,7 @@ int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) { return 0; } -void taosRemoveFromQset(taos_qset p1, taos_queue p2) { - STaosQueue *queue = (STaosQueue *)p2; - STaosQset *qset = (STaosQset *)p1; - +void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) { STaosQueue *tqueue = NULL; pthread_mutex_lock(&qset->mutex); @@ -353,18 +334,17 @@ void taosRemoveFromQset(taos_qset p1, taos_queue p2) { uTrace("queue:%p is removed from qset:%p", queue, qset); } -int taosGetQueueNumber(taos_qset param) { return ((STaosQset *)param)->numOfQueues; } +int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; } -int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProcessItem *itemFp) { - STaosQset *qset = (STaosQset *)param; +int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FProcessItem *itemFp) { STaosQnode *pNode = NULL; - int code = 0; + int32_t code = 0; tsem_wait(&qset->sem); pthread_mutex_lock(&qset->mutex); - for (int i = 0; i < qset->numOfQueues; ++i) { + for (int32_t i = 0; i < qset->numOfQueues; ++i) { if (qset->current == NULL) qset->current = qset->head; STaosQueue *queue = qset->current; if (queue) qset->current = queue->next; @@ -375,7 +355,7 @@ int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProces if (queue->head) { pNode = queue->head; - *pitem = pNode->item; + *ppItem = pNode->item; if (ahandle) *ahandle = queue->ahandle; if (itemFp) *itemFp = queue->itemFp; queue->head = pNode->next; @@ -383,7 +363,7 @@ int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProces queue->numOfItems--; atomic_sub_fetch_32(&qset->numOfItems, 1); code = 1; - uTrace("item:%p is read out from queue:%p, items:%d", *pitem, queue, queue->numOfItems); + uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems); } pthread_mutex_unlock(&queue->mutex); @@ -395,18 +375,15 @@ int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProces return code; } -int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **ahandle, FProcessItems *itemsFp) { - STaosQset *qset = (STaosQset *)param; +int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FProcessItems *itemsFp) { STaosQueue *queue; - STaosQall *qall = (STaosQall *)p2; - int code = 0; + int32_t code = 0; tsem_wait(&qset->sem); pthread_mutex_lock(&qset->mutex); - for(int i=0; inumOfQueues; ++i) { - if (qset->current == NULL) - qset->current = qset->head; + for (int32_t i = 0; i < qset->numOfQueues; ++i) { + if (qset->current == NULL) qset->current = qset->head; queue = qset->current; if (queue) qset->current = queue->next; if (queue == NULL) break; @@ -427,34 +404,32 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **ahandle, FPr queue->tail = NULL; queue->numOfItems = 0; atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems); - for (int j=1; jnumOfItems; ++j) tsem_wait(&qset->sem); - } + for (int32_t j = 1; j < qall->numOfItems; ++j) tsem_wait(&qset->sem); + } pthread_mutex_unlock(&queue->mutex); - if (code != 0) break; + if (code != 0) break; } pthread_mutex_unlock(&qset->mutex); return code; } -int taosGetQueueItemsNumber(taos_queue param) { - STaosQueue *queue = (STaosQueue *)param; +int32_t taosGetQueueItemsNumber(STaosQueue *queue) { if (!queue) return 0; - int num; + int32_t num; pthread_mutex_lock(&queue->mutex); num = queue->numOfItems; pthread_mutex_unlock(&queue->mutex); return num; } -int taosGetQsetItemsNumber(taos_qset param) { - STaosQset *qset = (STaosQset *)param; +int32_t taosGetQsetItemsNumber(STaosQset *qset) { if (!qset) return 0; - int num = 0; + int32_t num = 0; pthread_mutex_lock(&qset->mutex); num = qset->numOfItems; pthread_mutex_unlock(&qset->mutex); diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index fb7b71b845e5f5a5584f651e14b20d67fa3d6eb4..ed74041712ce0022eb66ac5c9331b88fda961222 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -85,9 +85,9 @@ static void *tWorkerThreadFp(SWorker *worker) { return NULL; } -taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp) { +STaosQueue *tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp) { pthread_mutex_lock(&pool->mutex); - taos_queue queue = taosOpenQueue(); + STaosQueue *queue = taosOpenQueue(); if (queue == NULL) { pthread_mutex_unlock(&pool->mutex); return NULL; @@ -121,7 +121,7 @@ taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp) return queue; } -void tWorkerFreeQueue(SWorkerPool *pool, void *queue) { +void tWorkerFreeQueue(SWorkerPool *pool, STaosQueue *queue) { taosCloseQueue(queue); uDebug("worker:%s, queue:%p is freed", pool->name, queue); } @@ -195,11 +195,11 @@ static void *tWriteWorkerThreadFp(SMWorker *worker) { return NULL; } -taos_queue tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp) { +STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp) { pthread_mutex_lock(&pool->mutex); SMWorker *worker = pool->workers + pool->nextId; - taos_queue *queue = taosOpenQueue(); + STaosQueue *queue = taosOpenQueue(); if (queue == NULL) { pthread_mutex_unlock(&pool->mutex); return NULL; @@ -250,7 +250,7 @@ taos_queue tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems f return queue; } -void tMWorkerFreeQueue(SMWorkerPool *pool, taos_queue queue) { +void tMWorkerFreeQueue(SMWorkerPool *pool, STaosQueue *queue) { taosCloseQueue(queue); uDebug("worker:%s, queue:%p is freed", pool->name, queue); }