From 19836961ced1c60f7184384a9bcc4c83e653f8cf Mon Sep 17 00:00:00 2001 From: jtao1735 Date: Wed, 6 May 2020 10:03:41 +0000 Subject: [PATCH] use semphore instead of polling for tqeueu --- src/dnode/src/dnodeRead.c | 8 +++++--- src/dnode/src/dnodeWrite.c | 3 +++ src/rpc/src/rpcMain.c | 2 +- src/util/src/tqueue.c | 14 ++++++++++++-- 4 files changed, 21 insertions(+), 6 deletions(-) diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index e52b59d20a..efa5dc7695 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -74,8 +74,10 @@ void dnodeCleanupRead() { for (int i=0; i < readPool.max; ++i) { SReadWorker *pWorker = readPool.readWorker + i; - if (pWorker->thread) + if (pWorker->thread) { + pthread_cancel(pWorker->thread); pthread_join(pWorker->thread, NULL); + } } taosCloseQset(readQset); @@ -114,12 +116,12 @@ void dnodeRead(SRpcMsg *pMsg) { pRead->pCont = pCont; pRead->contLen = pHead->contLen; - taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead); - // next vnode leftLen -= pHead->contLen; pCont -= pHead->contLen; queuedMsgNum++; + + taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead); } if (queuedMsgNum == 0) { diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 39757c690f..4ca9b1935d 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -71,7 +71,10 @@ void dnodeCleanupWrite() { for (int32_t i = 0; i < wWorkerPool.max; ++i) { SWriteWorker *pWorker = wWorkerPool.writeWorker + i; if (pWorker->thread) { + pthread_cancel(pWorker->thread); pthread_join(pWorker->thread, NULL); + taosFreeQall(pWorker->qall); + taosCloseQset(pWorker->qset); } } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index d463bc3d95..3e4e05cd11 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -1364,7 +1364,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { } } } else { - tTrace("%s %p, auth spi not matched, msg discarded", pRpc->label, pConn); + tTrace("%s %p, auth spi:%d not matched with received:%d", pRpc->label, pConn, pConn->spi, pHead->spi); code = TSDB_CODE_AUTH_FAILURE; } diff --git a/src/util/src/tqueue.c b/src/util/src/tqueue.c index c639452823..a458cefc9c 100644 --- a/src/util/src/tqueue.c +++ b/src/util/src/tqueue.c @@ -41,6 +41,7 @@ typedef struct _taos_qset { pthread_mutex_t mutex; int32_t numOfQueues; int32_t numOfItems; + tsem_t sem; } STaosQset; typedef struct _taos_qall { @@ -59,6 +60,7 @@ taos_queue taosOpenQueue() { } pthread_mutex_init(&queue->mutex, NULL); + return queue; } @@ -79,7 +81,7 @@ void taosCloseQueue(taos_queue param) { } pthread_mutex_unlock(&queue->mutex); - + pthread_mutex_destroy(&queue->mutex); free(queue); } @@ -116,11 +118,12 @@ int taosWriteQitem(taos_queue param, int type, void *item) { queue->numOfItems++; if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); - uTrace("item:%p is put into queue:%p, type:%d items:%d", item, queue, type, queue->numOfItems); pthread_mutex_unlock(&queue->mutex); + if (queue->qset) tsem_post(&queue->qset->sem); + return 0; } @@ -217,12 +220,15 @@ taos_qset taosOpenQset() { } pthread_mutex_init(&qset->mutex, NULL); + tsem_init(&qset->sem, 0, 0); return qset; } void taosCloseQset(taos_qset param) { STaosQset *qset = (STaosQset *)param; + pthread_mutex_destroy(&qset->mutex); + tsem_destroy(&qset->sem); free(qset); } @@ -298,6 +304,8 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand STaosQnode *pNode = NULL; int code = 0; + tsem_wait(&qset->sem); + pthread_mutex_lock(&qset->mutex); for(int i=0; inumOfQueues; ++i) { @@ -339,6 +347,7 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) { STaosQall *qall = (STaosQall *)p2; int code = 0; + tsem_wait(&qset->sem); pthread_mutex_lock(&qset->mutex); for(int i=0; inumOfQueues; ++i) { @@ -364,6 +373,7 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) { queue->tail = NULL; queue->numOfItems = 0; atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems); + for (int j=1; jnumOfItems; ++j) tsem_wait(&qset->sem); } pthread_mutex_unlock(&queue->mutex); -- GitLab