From 07978790225f62672004c18e8052bdbee154778e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 25 Jan 2022 10:29:47 +0000 Subject: [PATCH] dead lock after refact worker --- source/dnode/mgmt/impl/src/dndVnodes.c | 42 +++++++++++++------------- source/util/src/tqueue.c | 14 +++++++-- source/util/src/tworker.c | 13 +++++--- 3 files changed, 41 insertions(+), 28 deletions(-) diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 473e6c33ef..dcb73d13c7 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -910,27 +910,27 @@ static int32_t dndInitVnodeWorkers(SDnode *pDnode) { int32_t maxWriteThreads = TMAX(pDnode->env.numOfCores, 1); int32_t maxSyncThreads = TMAX(pDnode->env.numOfCores / 2, 1); - SQWorkerPool *pPool = &pMgmt->queryPool; - pPool->name = "vnode-query"; - pPool->min = minQueryThreads; - pPool->max = maxQueryThreads; - if (tQWorkerInit(pPool) != 0) return -1; - - pPool = &pMgmt->fetchPool; - pPool->name = "vnode-fetch"; - pPool->min = minFetchThreads; - pPool->max = maxFetchThreads; - if (tFWorkerInit(pPool) != 0) return -1; - - SWWorkerPool *pMPool = &pMgmt->writePool; - pMPool->name = "vnode-write"; - pMPool->max = maxWriteThreads; - if (tWWorkerInit(pMPool) != 0) return -1; - - pMPool = &pMgmt->syncPool; - pMPool->name = "vnode-sync"; - pMPool->max = maxSyncThreads; - if (tWWorkerInit(pMPool) != 0) return -1; + SQWorkerPool *pQPool = &pMgmt->queryPool; + pQPool->name = "vnode-query"; + pQPool->min = minQueryThreads; + pQPool->max = maxQueryThreads; + if (tQWorkerInit(pQPool) != 0) return -1; + + SFWorkerPool *pFPool = &pMgmt->fetchPool; + pFPool->name = "vnode-fetch"; + pFPool->min = minFetchThreads; + pFPool->max = maxFetchThreads; + if (tFWorkerInit(pFPool) != 0) return -1; + + SWWorkerPool *pWPool = &pMgmt->writePool; + pWPool->name = "vnode-write"; + pWPool->max = maxWriteThreads; + if (tWWorkerInit(pWPool) != 0) return -1; + + pWPool = &pMgmt->syncPool; + pWPool->name = "vnode-sync"; + pWPool->max = maxSyncThreads; + if (tWWorkerInit(pWPool) != 0) return -1; dDebug("vnode workers is initialized"); return 0; diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index b50ad77b6f..8125f550d0 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -423,7 +423,9 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand queue->tail = NULL; queue->numOfItems = 0; atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems); - for (int32_t j = 1; j < qall->numOfItems; ++j) tsem_wait(&qset->sem); + for (int32_t j = 1; j < qall->numOfItems; ++j) { + tsem_wait(&qset->sem); + } } pthread_mutex_unlock(&queue->mutex); @@ -437,7 +439,7 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand int32_t taosReadQitemFromQsetByThread(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp, int32_t threadId) { STaosQnode *pNode = NULL; - int32_t code = 0; + int32_t code = -1; tsem_wait(&qset->sem); @@ -449,7 +451,10 @@ int32_t taosReadQitemFromQsetByThread(STaosQset *qset, void **ppItem, void **aha if (queue) qset->current = queue->next; if (queue == NULL) break; if (queue->head == NULL) continue; - if (queue->threadId != -1 && queue->threadId != threadId) continue; + if (queue->threadId != -1 && queue->threadId != threadId) { + code = 0; + continue; + } pthread_mutex_lock(&queue->mutex); @@ -485,6 +490,9 @@ void taosResetQsetThread(STaosQset *qset, void *pItem) { pthread_mutex_lock(&qset->mutex); pNode->queue->threadId = -1; + for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) { + tsem_post(&qset->sem); + } pthread_mutex_unlock(&qset->mutex); } diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index bc98a371fc..97440f8dae 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -149,8 +149,8 @@ void tFWorkerCleanup(SFWorkerPool *pool) { tQWorkerCleanup(pool); } static void *tFWorkerThreadFp(SQWorker *worker) { SQWorkerPool *pool = worker->pool; - FItem fp = NULL; + FItem fp = NULL; void * msg = NULL; void * ahandle = NULL; int32_t code = 0; @@ -160,9 +160,14 @@ static void *tFWorkerThreadFp(SQWorker *worker) { uDebug("worker:%s:%d is running", pool->name, worker->id); while (1) { - if (taosReadQitemFromQsetByThread(pool->qset, (void **)&msg, &ahandle, &fp, worker->id) == 0) { + code = taosReadQitemFromQsetByThread(pool->qset, (void **)&msg, &ahandle, &fp, worker->id); + + if (code < 0) { uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); break; + } else if (code == 0) { + // uTrace("worker:%s:%d qset:%p, got no message and continue", pool->name, worker->id, pool->qset); + continue; } if (fp != NULL) { @@ -231,7 +236,7 @@ void tWWorkerCleanup(SWWorkerPool *pool) { uInfo("worker:%s is closed", pool->name); } -static void *tWriteWorkerThreadFp(SWWorker *worker) { +static void *tWWorkerThreadFp(SWWorker *worker) { SWWorkerPool *pool = worker->pool; FItems fp = NULL; @@ -293,7 +298,7 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) { pthread_attr_init(&thAttr); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); - if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tWriteWorkerThreadFp, worker) != 0) { + if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker) != 0) { uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); taosFreeQall(worker->qall); taosCloseQset(worker->qset); -- GitLab