提交 4fc74ced 编写于 作者: S Shengliang Guan

refact worker

上级 8685fb79
...@@ -41,13 +41,13 @@ shall be used to set up the protection. ...@@ -41,13 +41,13 @@ shall be used to set up the protection.
typedef struct STaosQueue STaosQueue; typedef struct STaosQueue STaosQueue;
typedef struct STaosQset STaosQset; typedef struct STaosQset STaosQset;
typedef struct STaosQall STaosQall; typedef struct STaosQall STaosQall;
typedef void (*FProcessItem)(void *ahandle, void *pItem); typedef void (*FItem)(void *ahandle, void *pItem);
typedef void (*FProcessItems)(void *ahandle, STaosQall *qall, int32_t numOfItems); typedef void (*FItems)(void *ahandle, STaosQall *qall, int32_t numOfItems);
STaosQueue *taosOpenQueue(); STaosQueue *taosOpenQueue();
void taosCloseQueue(STaosQueue *queue); void taosCloseQueue(STaosQueue *queue);
void taosSetQueueFp(STaosQueue *queue, FProcessItem itemFp, FProcessItems itemsFp); void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp);
void *taosAllocateQitem(int32_t size); void * taosAllocateQitem(int32_t size);
void taosFreeQitem(void *pItem); void taosFreeQitem(void *pItem);
int32_t taosWriteQitem(STaosQueue *queue, void *pItem); int32_t taosWriteQitem(STaosQueue *queue, void *pItem);
int32_t taosReadQitem(STaosQueue *queue, void **ppItem); int32_t taosReadQitem(STaosQueue *queue, void **ppItem);
...@@ -67,8 +67,11 @@ int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle); ...@@ -67,8 +67,11 @@ int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle);
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue); void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue);
int32_t taosGetQueueNumber(STaosQset *qset); int32_t taosGetQueueNumber(STaosQset *qset);
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FProcessItem *itemFp); int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp);
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FProcessItems *itemsFp); int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp);
int32_t taosReadQitemFromQsetByThread(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp, int32_t threadId);
void taosResetQsetThread(STaosQset *qset, void *pItem);
int32_t taosGetQueueItemsNumber(STaosQueue *queue); int32_t taosGetQueueItemsNumber(STaosQueue *queue);
int32_t taosGetQsetItemsNumber(STaosQset *qset); int32_t taosGetQsetItemsNumber(STaosQset *qset);
......
...@@ -22,14 +22,13 @@ extern "C" { ...@@ -22,14 +22,13 @@ extern "C" {
#endif #endif
typedef struct SQWorkerPool SQWorkerPool; typedef struct SQWorkerPool SQWorkerPool;
typedef struct SFWorkerPool SFWorkerPool;
typedef struct SWWorkerPool SWWorkerPool; typedef struct SWWorkerPool SWWorkerPool;
typedef struct SQWorker { typedef struct SQWorker {
int32_t id; // worker ID int32_t id; // worker ID
pthread_t thread; // thread pthread_t thread; // thread
SQWorkerPool *pool; SQWorkerPool *pool;
} SQWorker; } SQWorker, SFWorker;
typedef struct SQWorkerPool { typedef struct SQWorkerPool {
int32_t max; // max number of workers int32_t max; // max number of workers
...@@ -39,23 +38,7 @@ typedef struct SQWorkerPool { ...@@ -39,23 +38,7 @@ typedef struct SQWorkerPool {
const char * name; const char * name;
SQWorker * workers; SQWorker * workers;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SQWorkerPool; } SQWorkerPool, SFWorkerPool;
typedef struct SFWorker {
int32_t id; // worker ID
pthread_t thread; // thread
SFWorkerPool *pool;
} SFWorker;
typedef struct SFWorkerPool {
int32_t max; // max number of workers
int32_t min; // min number of workers
int32_t num; // current number of workers
STaosQset * qset;
const char * name;
SFWorker * workers;
pthread_mutex_t mutex;
} SFWorkerPool;
typedef struct SWWorker { typedef struct SWWorker {
int32_t id; // worker id int32_t id; // worker id
...@@ -75,12 +58,17 @@ typedef struct SWWorkerPool { ...@@ -75,12 +58,17 @@ typedef struct SWWorkerPool {
int32_t tQWorkerInit(SQWorkerPool *pool); int32_t tQWorkerInit(SQWorkerPool *pool);
void tQWorkerCleanup(SQWorkerPool *pool); void tQWorkerCleanup(SQWorkerPool *pool);
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem fp); STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp);
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue); void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue);
int32_t tFWorkerInit(SFWorkerPool *pool);
void tFWorkerCleanup(SFWorkerPool *pool);
STaosQueue *tFWorkerAllocQueue(SFWorkerPool *pool, void *ahandle, FItem fp);
void tFWorkerFreeQueue(SFWorkerPool *pool, STaosQueue *queue);
int32_t tWWorkerInit(SWWorkerPool *pool); int32_t tWWorkerInit(SWWorkerPool *pool);
void tWWorkerCleanup(SWWorkerPool *pool); void tWWorkerCleanup(SWWorkerPool *pool);
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems fp); STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp);
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue); void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -253,7 +253,7 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_ ...@@ -253,7 +253,7 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_
} }
for (int32_t i = 0; i < vnodesNum; ++i) { for (int32_t i = 0; i < vnodesNum; ++i) {
cJSON *vnode = cJSON_GetArrayItem(vnodes, i); cJSON * vnode = cJSON_GetArrayItem(vnodes, i);
SWrapperCfg *pCfg = &pCfgs[i]; SWrapperCfg *pCfg = &pCfgs[i];
cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId"); cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId");
...@@ -382,7 +382,7 @@ static void *dnodeOpenVnodeFunc(void *param) { ...@@ -382,7 +382,7 @@ static void *dnodeOpenVnodeFunc(void *param) {
dndReportStartup(pDnode, "open-vnodes", stepDesc); dndReportStartup(pDnode, "open-vnodes", stepDesc);
SVnodeCfg cfg = {.pDnode = pDnode, .pTfs = pDnode->pTfs, .vgId = pCfg->vgId}; SVnodeCfg cfg = {.pDnode = pDnode, .pTfs = pDnode->pTfs, .vgId = pCfg->vgId};
SVnode *pImpl = vnodeOpen(pCfg->path, &cfg); SVnode * pImpl = vnodeOpen(pCfg->path, &cfg);
if (pImpl == NULL) { if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
pThread->failed++; pThread->failed++;
...@@ -948,11 +948,11 @@ static void dndCleanupVnodeWorkers(SDnode *pDnode) { ...@@ -948,11 +948,11 @@ static void dndCleanupVnodeWorkers(SDnode *pDnode) {
static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) { static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodesMgmt *pMgmt = &pDnode->vmgmt;
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeWriteQueue); pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)dndProcessVnodeWriteQueue);
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeApplyQueue); pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)dndProcessVnodeApplyQueue);
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue); pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)dndProcessVnodeSyncQueue);
pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FProcessItem)dndProcessVnodeFetchQueue); pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)dndProcessVnodeFetchQueue);
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)dndProcessVnodeQueryQueue);
if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL || if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL ||
pVnode->pQueryQ == NULL) { pVnode->pQueryQ == NULL) {
......
...@@ -39,7 +39,7 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, c ...@@ -39,7 +39,7 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, c
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
pWorker->queue = tQWorkerAllocQueue(pPool, pDnode, (FProcessItem)queueFp); pWorker->queue = tQWorkerAllocQueue(pPool, pDnode, (FItem)queueFp);
if (pWorker->queue == NULL) { if (pWorker->queue == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -52,7 +52,7 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, c ...@@ -52,7 +52,7 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, c
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
pWorker->queue = tWWorkerAllocQueue(pPool, pDnode, (FProcessItems)queueFp); pWorker->queue = tWWorkerAllocQueue(pPool, pDnode, (FItems)queueFp);
if (pWorker->queue == NULL) { if (pWorker->queue == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
......
...@@ -22,19 +22,21 @@ typedef struct STaosQnode STaosQnode; ...@@ -22,19 +22,21 @@ typedef struct STaosQnode STaosQnode;
typedef struct STaosQnode { typedef struct STaosQnode {
STaosQnode *next; STaosQnode *next;
STaosQueue *queue;
char item[]; char item[];
} STaosQnode; } STaosQnode;
typedef struct STaosQueue { typedef struct STaosQueue {
int32_t itemSize; int32_t itemSize;
int32_t numOfItems; int32_t numOfItems;
int32_t threadId;
STaosQnode * head; STaosQnode * head;
STaosQnode * tail; STaosQnode * tail;
STaosQueue * next; // for queue set STaosQueue * next; // for queue set
STaosQset * qset; // for queue set STaosQset * qset; // for queue set
void * ahandle; // for queue set void * ahandle; // for queue set
FProcessItem itemFp; FItem itemFp;
FProcessItems itemsFp; FItems itemsFp;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} STaosQueue; } STaosQueue;
...@@ -66,11 +68,12 @@ STaosQueue *taosOpenQueue() { ...@@ -66,11 +68,12 @@ STaosQueue *taosOpenQueue() {
return NULL; return NULL;
} }
queue->threadId = -1;
uDebug("queue:%p is opened", queue); uDebug("queue:%p is opened", queue);
return queue; return queue;
} }
void taosSetQueueFp(STaosQueue *queue, FProcessItem itemFp, FProcessItems itemsFp) { void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp) {
if (queue == NULL) return; if (queue == NULL) return;
queue->itemFp = itemFp; queue->itemFp = itemFp;
queue->itemsFp = itemsFp; queue->itemsFp = itemsFp;
...@@ -135,12 +138,12 @@ void *taosAllocateQitem(int32_t size) { ...@@ -135,12 +138,12 @@ void *taosAllocateQitem(int32_t size) {
return (void *)pNode->item; return (void *)pNode->item;
} }
void taosFreeQitem(void *param) { void taosFreeQitem(void *pItem) {
if (param == NULL) return; if (pItem == NULL) return;
char *temp = param; char *temp = pItem;
temp -= sizeof(STaosQnode); temp -= sizeof(STaosQnode);
uTrace("item:%p, node:%p is freed", param, temp); uTrace("item:%p, node:%p is freed", pItem, temp);
free(temp); free(temp);
} }
...@@ -351,7 +354,7 @@ void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) { ...@@ -351,7 +354,7 @@ void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; } int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FProcessItem *itemFp) { int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp) {
STaosQnode *pNode = NULL; STaosQnode *pNode = NULL;
int32_t code = 0; int32_t code = 0;
...@@ -391,7 +394,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FP ...@@ -391,7 +394,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FP
return code; return code;
} }
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FProcessItems *itemsFp) { int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp) {
STaosQueue *queue; STaosQueue *queue;
int32_t code = 0; int32_t code = 0;
...@@ -432,6 +435,59 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand ...@@ -432,6 +435,59 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand
return code; return code;
} }
int32_t taosReadQitemFromQsetByThread(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp, int32_t threadId) {
STaosQnode *pNode = NULL;
int32_t code = 0;
tsem_wait(&qset->sem);
pthread_mutex_lock(&qset->mutex);
for (int32_t i = 0; i < qset->numOfQueues; ++i) {
if (qset->current == NULL) qset->current = qset->head;
STaosQueue *queue = qset->current;
if (queue) qset->current = queue->next;
if (queue == NULL) break;
if (queue->head == NULL) continue;
if (queue->threadId != -1 && queue->threadId != threadId) continue;
pthread_mutex_lock(&queue->mutex);
if (queue->head) {
pNode = queue->head;
pNode->queue = queue;
queue->threadId = threadId;
*ppItem = pNode->item;
if (ahandle) *ahandle = queue->ahandle;
if (itemFp) *itemFp = queue->itemFp;
queue->head = pNode->next;
if (queue->head == NULL) queue->tail = NULL;
queue->numOfItems--;
atomic_sub_fetch_32(&qset->numOfItems, 1);
code = 1;
uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
}
pthread_mutex_unlock(&queue->mutex);
if (pNode) break;
}
pthread_mutex_unlock(&qset->mutex);
return code;
}
void taosResetQsetThread(STaosQset *qset, void *pItem) {
if (pItem == NULL) return;
STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
pthread_mutex_lock(&qset->mutex);
pNode->queue->threadId = -1;
pthread_mutex_unlock(&qset->mutex);
}
int32_t taosGetQueueItemsNumber(STaosQueue *queue) { int32_t taosGetQueueItemsNumber(STaosQueue *queue) {
if (!queue) return 0; if (!queue) return 0;
......
...@@ -39,7 +39,7 @@ int32_t tQWorkerInit(SQWorkerPool *pool) { ...@@ -39,7 +39,7 @@ int32_t tQWorkerInit(SQWorkerPool *pool) {
worker->pool = pool; worker->pool = pool;
} }
uDebug("qworker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max); uDebug("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
return 0; return 0;
} }
...@@ -64,12 +64,12 @@ void tQWorkerCleanup(SQWorkerPool *pool) { ...@@ -64,12 +64,12 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
taosCloseQset(pool->qset); taosCloseQset(pool->qset);
pthread_mutex_destroy(&pool->mutex); pthread_mutex_destroy(&pool->mutex);
uDebug("qworker:%s is closed", pool->name); uDebug("worker:%s is closed", pool->name);
} }
static void *tQWorkerThreadFp(SQWorker *worker) { static void *tQWorkerThreadFp(SQWorker *worker) {
SQWorkerPool *pool = worker->pool; SQWorkerPool *pool = worker->pool;
FProcessItem fp = NULL; FItem fp = NULL;
void * msg = NULL; void * msg = NULL;
void * ahandle = NULL; void * ahandle = NULL;
...@@ -77,11 +77,11 @@ static void *tQWorkerThreadFp(SQWorker *worker) { ...@@ -77,11 +77,11 @@ static void *tQWorkerThreadFp(SQWorker *worker) {
taosBlockSIGPIPE(); taosBlockSIGPIPE();
setThreadName(pool->name); setThreadName(pool->name);
uDebug("qworker:%s:%d is running", pool->name, worker->id); uDebug("worker:%s:%d is running", pool->name, worker->id);
while (1) { while (1) {
if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ahandle, &fp) == 0) { if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ahandle, &fp) == 0) {
uDebug("qworker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
break; break;
} }
...@@ -93,7 +93,7 @@ static void *tQWorkerThreadFp(SQWorker *worker) { ...@@ -93,7 +93,7 @@ static void *tQWorkerThreadFp(SQWorker *worker) {
return NULL; return NULL;
} }
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem fp) { STaosQueue *tWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp, ThreadFp threadFp) {
pthread_mutex_lock(&pool->mutex); pthread_mutex_lock(&pool->mutex);
STaosQueue *queue = taosOpenQueue(); STaosQueue *queue = taosOpenQueue();
if (queue == NULL) { if (queue == NULL) {
...@@ -114,8 +114,8 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem f ...@@ -114,8 +114,8 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem f
pthread_attr_init(&thAttr); pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker) != 0) { if (pthread_create(&worker->thread, &thAttr, threadFp, worker) != 0) {
uError("qworker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno));
taosCloseQueue(queue); taosCloseQueue(queue);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
queue = NULL; queue = NULL;
...@@ -124,21 +124,63 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem f ...@@ -124,21 +124,63 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem f
pthread_attr_destroy(&thAttr); pthread_attr_destroy(&thAttr);
pool->num++; pool->num++;
uDebug("qworker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num); uDebug("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
} while (pool->num < pool->min); } while (pool->num < pool->min);
} }
pthread_mutex_unlock(&pool->mutex); pthread_mutex_unlock(&pool->mutex);
uDebug("qworker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
return queue; return queue;
} }
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
return tWorkerAllocQueue(pool, ahandle, fp, (ThreadFp)tQWorkerThreadFp);
}
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) { void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
taosCloseQueue(queue); taosCloseQueue(queue);
uDebug("qworker:%s, queue:%p is freed", pool->name, queue); uDebug("worker:%s, queue:%p is freed", pool->name, queue);
} }
int32_t tFWorkerInit(SFWorkerPool *pool) { return tQWorkerInit((SQWorkerPool *)pool); }
void tFWorkerCleanup(SFWorkerPool *pool) { tQWorkerCleanup(pool); }
static void *tFWorkerThreadFp(SQWorker *worker) {
SQWorkerPool *pool = worker->pool;
FItem fp = NULL;
void * msg = NULL;
void * ahandle = NULL;
int32_t code = 0;
taosBlockSIGPIPE();
setThreadName(pool->name);
uDebug("worker:%s:%d is running", pool->name, worker->id);
while (1) {
if (taosReadQitemFromQsetByThread(pool->qset, (void **)&msg, &ahandle, &fp, worker->id) == 0) {
uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
break;
}
if (fp != NULL) {
(*fp)(ahandle, msg);
}
taosResetQsetThread(pool->qset, msg);
}
return NULL;
}
STaosQueue *tFWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
return tWorkerAllocQueue(pool, ahandle, fp, (ThreadFp)tFWorkerThreadFp);
}
void tFWorkerFreeQueue(SFWorkerPool *pool, STaosQueue *queue) { tQWorkerFreeQueue(pool, queue); }
int32_t tWWorkerInit(SWWorkerPool *pool) { int32_t tWWorkerInit(SWWorkerPool *pool) {
pool->nextId = 0; pool->nextId = 0;
pool->workers = calloc(sizeof(SWWorker), pool->max); pool->workers = calloc(sizeof(SWWorker), pool->max);
...@@ -160,7 +202,7 @@ int32_t tWWorkerInit(SWWorkerPool *pool) { ...@@ -160,7 +202,7 @@ int32_t tWWorkerInit(SWWorkerPool *pool) {
worker->pool = pool; worker->pool = pool;
} }
uInfo("wworker:%s is initialized, max:%d", pool->name, pool->max); uInfo("worker:%s is initialized, max:%d", pool->name, pool->max);
return 0; return 0;
} }
...@@ -186,12 +228,12 @@ void tWWorkerCleanup(SWWorkerPool *pool) { ...@@ -186,12 +228,12 @@ void tWWorkerCleanup(SWWorkerPool *pool) {
tfree(pool->workers); tfree(pool->workers);
pthread_mutex_destroy(&pool->mutex); pthread_mutex_destroy(&pool->mutex);
uInfo("wworker:%s is closed", pool->name); uInfo("worker:%s is closed", pool->name);
} }
static void *tWriteWorkerThreadFp(SWWorker *worker) { static void *tWriteWorkerThreadFp(SWWorker *worker) {
SWWorkerPool *pool = worker->pool; SWWorkerPool *pool = worker->pool;
FProcessItems fp = NULL; FItems fp = NULL;
void * msg = NULL; void * msg = NULL;
void * ahandle = NULL; void * ahandle = NULL;
...@@ -200,12 +242,12 @@ static void *tWriteWorkerThreadFp(SWWorker *worker) { ...@@ -200,12 +242,12 @@ static void *tWriteWorkerThreadFp(SWWorker *worker) {
taosBlockSIGPIPE(); taosBlockSIGPIPE();
setThreadName(pool->name); setThreadName(pool->name);
uDebug("wworker:%s:%d is running", pool->name, worker->id); uDebug("worker:%s:%d is running", pool->name, worker->id);
while (1) { while (1) {
numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &ahandle, &fp); numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &ahandle, &fp);
if (numOfMsgs == 0) { if (numOfMsgs == 0) {
uDebug("wworker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset); uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset);
break; break;
} }
...@@ -217,7 +259,7 @@ static void *tWriteWorkerThreadFp(SWWorker *worker) { ...@@ -217,7 +259,7 @@ static void *tWriteWorkerThreadFp(SWWorker *worker) {
return NULL; return NULL;
} }
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems fp) { STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
pthread_mutex_lock(&pool->mutex); pthread_mutex_lock(&pool->mutex);
SWWorker *worker = pool->workers + pool->nextId; SWWorker *worker = pool->workers + pool->nextId;
...@@ -252,14 +294,14 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems ...@@ -252,14 +294,14 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tWriteWorkerThreadFp, worker) != 0) { if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tWriteWorkerThreadFp, worker) != 0) {
uError("wworker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno));
taosFreeQall(worker->qall); taosFreeQall(worker->qall);
taosCloseQset(worker->qset); taosCloseQset(worker->qset);
taosCloseQueue(queue); taosCloseQueue(queue);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
queue = NULL; queue = NULL;
} else { } else {
uDebug("wworker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max); uDebug("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
pool->nextId = (pool->nextId + 1) % pool->max; pool->nextId = (pool->nextId + 1) % pool->max;
} }
...@@ -270,12 +312,12 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems ...@@ -270,12 +312,12 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems
} }
pthread_mutex_unlock(&pool->mutex); pthread_mutex_unlock(&pool->mutex);
uDebug("wworker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
return queue; return queue;
} }
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) { void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
taosCloseQueue(queue); taosCloseQueue(queue);
uDebug("wworker:%s, queue:%p is freed", pool->name, queue); uDebug("worker:%s, queue:%p is freed", pool->name, queue);
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册