提交 07978790 编写于 作者: S Shengliang Guan

dead lock after refact worker

上级 5690df9a
......@@ -910,27 +910,27 @@ static int32_t dndInitVnodeWorkers(SDnode *pDnode) {
int32_t maxWriteThreads = TMAX(pDnode->env.numOfCores, 1);
int32_t maxSyncThreads = TMAX(pDnode->env.numOfCores / 2, 1);
SQWorkerPool *pPool = &pMgmt->queryPool;
pPool->name = "vnode-query";
pPool->min = minQueryThreads;
pPool->max = maxQueryThreads;
if (tQWorkerInit(pPool) != 0) return -1;
pPool = &pMgmt->fetchPool;
pPool->name = "vnode-fetch";
pPool->min = minFetchThreads;
pPool->max = maxFetchThreads;
if (tFWorkerInit(pPool) != 0) return -1;
SWWorkerPool *pMPool = &pMgmt->writePool;
pMPool->name = "vnode-write";
pMPool->max = maxWriteThreads;
if (tWWorkerInit(pMPool) != 0) return -1;
pMPool = &pMgmt->syncPool;
pMPool->name = "vnode-sync";
pMPool->max = maxSyncThreads;
if (tWWorkerInit(pMPool) != 0) return -1;
SQWorkerPool *pQPool = &pMgmt->queryPool;
pQPool->name = "vnode-query";
pQPool->min = minQueryThreads;
pQPool->max = maxQueryThreads;
if (tQWorkerInit(pQPool) != 0) return -1;
SFWorkerPool *pFPool = &pMgmt->fetchPool;
pFPool->name = "vnode-fetch";
pFPool->min = minFetchThreads;
pFPool->max = maxFetchThreads;
if (tFWorkerInit(pFPool) != 0) return -1;
SWWorkerPool *pWPool = &pMgmt->writePool;
pWPool->name = "vnode-write";
pWPool->max = maxWriteThreads;
if (tWWorkerInit(pWPool) != 0) return -1;
pWPool = &pMgmt->syncPool;
pWPool->name = "vnode-sync";
pWPool->max = maxSyncThreads;
if (tWWorkerInit(pWPool) != 0) return -1;
dDebug("vnode workers is initialized");
return 0;
......
......@@ -423,7 +423,9 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand
queue->tail = NULL;
queue->numOfItems = 0;
atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
for (int32_t j = 1; j < qall->numOfItems; ++j) tsem_wait(&qset->sem);
for (int32_t j = 1; j < qall->numOfItems; ++j) {
tsem_wait(&qset->sem);
}
}
pthread_mutex_unlock(&queue->mutex);
......@@ -437,7 +439,7 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand
int32_t taosReadQitemFromQsetByThread(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp, int32_t threadId) {
STaosQnode *pNode = NULL;
int32_t code = 0;
int32_t code = -1;
tsem_wait(&qset->sem);
......@@ -449,7 +451,10 @@ int32_t taosReadQitemFromQsetByThread(STaosQset *qset, void **ppItem, void **aha
if (queue) qset->current = queue->next;
if (queue == NULL) break;
if (queue->head == NULL) continue;
if (queue->threadId != -1 && queue->threadId != threadId) continue;
if (queue->threadId != -1 && queue->threadId != threadId) {
code = 0;
continue;
}
pthread_mutex_lock(&queue->mutex);
......@@ -485,6 +490,9 @@ void taosResetQsetThread(STaosQset *qset, void *pItem) {
pthread_mutex_lock(&qset->mutex);
pNode->queue->threadId = -1;
for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) {
tsem_post(&qset->sem);
}
pthread_mutex_unlock(&qset->mutex);
}
......
......@@ -149,8 +149,8 @@ void tFWorkerCleanup(SFWorkerPool *pool) { tQWorkerCleanup(pool); }
static void *tFWorkerThreadFp(SQWorker *worker) {
SQWorkerPool *pool = worker->pool;
FItem fp = NULL;
FItem fp = NULL;
void * msg = NULL;
void * ahandle = NULL;
int32_t code = 0;
......@@ -160,9 +160,14 @@ static void *tFWorkerThreadFp(SQWorker *worker) {
uDebug("worker:%s:%d is running", pool->name, worker->id);
while (1) {
if (taosReadQitemFromQsetByThread(pool->qset, (void **)&msg, &ahandle, &fp, worker->id) == 0) {
code = taosReadQitemFromQsetByThread(pool->qset, (void **)&msg, &ahandle, &fp, worker->id);
if (code < 0) {
uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
break;
} else if (code == 0) {
// uTrace("worker:%s:%d qset:%p, got no message and continue", pool->name, worker->id, pool->qset);
continue;
}
if (fp != NULL) {
......@@ -231,7 +236,7 @@ void tWWorkerCleanup(SWWorkerPool *pool) {
uInfo("worker:%s is closed", pool->name);
}
static void *tWriteWorkerThreadFp(SWWorker *worker) {
static void *tWWorkerThreadFp(SWWorker *worker) {
SWWorkerPool *pool = worker->pool;
FItems fp = NULL;
......@@ -293,7 +298,7 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tWriteWorkerThreadFp, worker) != 0) {
if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker) != 0) {
uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno));
taosFreeQall(worker->qall);
taosCloseQset(worker->qset);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册