提交 8685fb79 编写于 作者: S Shengliang Guan

refact worker

上级 5fca9015
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#ifndef _TD_UTIL_QUEUE_H #ifndef _TD_UTIL_QUEUE_H
#define _TD_UTIL_QUEUE_H #define _TD_UTIL_QUEUE_H
#include "os.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
......
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
#ifndef _TD_UTIL_WORKER_H #ifndef _TD_UTIL_WORKER_H
#define _TD_UTIL_WORKER_H #define _TD_UTIL_WORKER_H
#include "os.h"
#include "tqueue.h" #include "tqueue.h"
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -13,10 +13,9 @@ ...@@ -13,10 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h" #define _DEFAULT_SOURCE
#include "taoserror.h"
#include "tqueue.h" #include "tqueue.h"
#include "taoserror.h"
#include "ulog.h" #include "ulog.h"
typedef struct STaosQnode STaosQnode; typedef struct STaosQnode STaosQnode;
...@@ -29,19 +28,19 @@ typedef struct STaosQnode { ...@@ -29,19 +28,19 @@ typedef struct STaosQnode {
typedef struct STaosQueue { typedef struct STaosQueue {
int32_t itemSize; int32_t itemSize;
int32_t numOfItems; int32_t numOfItems;
STaosQnode *head; STaosQnode * head;
STaosQnode *tail; STaosQnode * tail;
STaosQueue *next; // for queue set STaosQueue * next; // for queue set
STaosQset *qset; // for queue set STaosQset * qset; // for queue set
void *ahandle; // for queue set void * ahandle; // for queue set
FProcessItem itemFp; FProcessItem itemFp;
FProcessItems itemsFp; FProcessItems itemsFp;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} STaosQueue; } STaosQueue;
typedef struct STaosQset { typedef struct STaosQset {
STaosQueue *head; STaosQueue * head;
STaosQueue *current; STaosQueue * current;
pthread_mutex_t mutex; pthread_mutex_t mutex;
int32_t numOfQueues; int32_t numOfQueues;
int32_t numOfItems; int32_t numOfItems;
...@@ -56,15 +55,18 @@ typedef struct STaosQall { ...@@ -56,15 +55,18 @@ typedef struct STaosQall {
} STaosQall; } STaosQall;
STaosQueue *taosOpenQueue() { STaosQueue *taosOpenQueue() {
STaosQueue *queue = calloc(sizeof(STaosQueue), 1); STaosQueue *queue = calloc(1, sizeof(STaosQueue));
if (queue == NULL) { if (queue == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pthread_mutex_init(&queue->mutex, NULL); if (pthread_mutex_init(&queue->mutex, NULL) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
uTrace("queue:%p is opened", queue); uDebug("queue:%p is opened", queue);
return queue; return queue;
} }
...@@ -77,7 +79,7 @@ void taosSetQueueFp(STaosQueue *queue, FProcessItem itemFp, FProcessItems itemsF ...@@ -77,7 +79,7 @@ void taosSetQueueFp(STaosQueue *queue, FProcessItem itemFp, FProcessItems itemsF
void taosCloseQueue(STaosQueue *queue) { void taosCloseQueue(STaosQueue *queue) {
if (queue == NULL) return; if (queue == NULL) return;
STaosQnode *pTemp; STaosQnode *pTemp;
STaosQset *qset; STaosQset * qset;
pthread_mutex_lock(&queue->mutex); pthread_mutex_lock(&queue->mutex);
STaosQnode *pNode = queue->head; STaosQnode *pNode = queue->head;
...@@ -85,7 +87,9 @@ void taosCloseQueue(STaosQueue *queue) { ...@@ -85,7 +87,9 @@ void taosCloseQueue(STaosQueue *queue) {
qset = queue->qset; qset = queue->qset;
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
if (queue->qset) taosRemoveFromQset(qset, queue); if (queue->qset) {
taosRemoveFromQset(qset, queue);
}
while (pNode) { while (pNode) {
pTemp = pNode; pTemp = pNode;
...@@ -96,7 +100,7 @@ void taosCloseQueue(STaosQueue *queue) { ...@@ -96,7 +100,7 @@ void taosCloseQueue(STaosQueue *queue) {
pthread_mutex_destroy(&queue->mutex); pthread_mutex_destroy(&queue->mutex);
free(queue); free(queue);
uTrace("queue:%p is closed", queue); uDebug("queue:%p is closed", queue);
} }
bool taosQueueEmpty(STaosQueue *queue) { bool taosQueueEmpty(STaosQueue *queue) {
...@@ -120,9 +124,13 @@ int32_t taosQueueSize(STaosQueue *queue) { ...@@ -120,9 +124,13 @@ int32_t taosQueueSize(STaosQueue *queue) {
} }
void *taosAllocateQitem(int32_t size) { void *taosAllocateQitem(int32_t size) {
STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1); STaosQnode *pNode = calloc(1, sizeof(STaosQnode) + size);
if (pNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
if (pNode == NULL) return NULL;
uTrace("item:%p, node:%p is allocated", pNode->item, pNode); uTrace("item:%p, node:%p is allocated", pNode->item, pNode);
return (void *)pNode->item; return (void *)pNode->item;
} }
...@@ -130,7 +138,7 @@ void *taosAllocateQitem(int32_t size) { ...@@ -130,7 +138,7 @@ void *taosAllocateQitem(int32_t size) {
void taosFreeQitem(void *param) { void taosFreeQitem(void *param) {
if (param == NULL) return; if (param == NULL) return;
char *temp = (char *)param; char *temp = param;
temp -= sizeof(STaosQnode); temp -= sizeof(STaosQnode);
uTrace("item:%p, node:%p is freed", param, temp); uTrace("item:%p, node:%p is freed", param, temp);
free(temp); free(temp);
...@@ -175,7 +183,7 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) { ...@@ -175,7 +183,7 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
queue->numOfItems--; queue->numOfItems--;
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
code = 1; code = 1;
uDebug("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems); uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
} }
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
...@@ -183,7 +191,7 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) { ...@@ -183,7 +191,7 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
return code; return code;
} }
STaosQall *taosAllocateQall() { return calloc(sizeof(STaosQall), 1); } STaosQall *taosAllocateQall() { return calloc(1, sizeof(STaosQall)); }
void taosFreeQall(STaosQall *qall) { free(qall); } void taosFreeQall(STaosQall *qall) { free(qall); }
...@@ -238,7 +246,7 @@ int32_t taosGetQitem(STaosQall *qall, void **ppItem) { ...@@ -238,7 +246,7 @@ int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
void taosResetQitems(STaosQall *qall) { qall->current = qall->start; } void taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
STaosQset *taosOpenQset() { STaosQset *taosOpenQset() {
STaosQset *qset = (STaosQset *)calloc(sizeof(STaosQset), 1); STaosQset *qset = calloc(sizeof(STaosQset), 1);
if (qset == NULL) { if (qset == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
...@@ -247,7 +255,7 @@ STaosQset *taosOpenQset() { ...@@ -247,7 +255,7 @@ STaosQset *taosOpenQset() {
pthread_mutex_init(&qset->mutex, NULL); pthread_mutex_init(&qset->mutex, NULL);
tsem_init(&qset->sem, 0, 0); tsem_init(&qset->sem, 0, 0);
uTrace("qset:%p is opened", qset); uDebug("qset:%p is opened", qset);
return qset; return qset;
} }
...@@ -268,7 +276,7 @@ void taosCloseQset(STaosQset *qset) { ...@@ -268,7 +276,7 @@ void taosCloseQset(STaosQset *qset) {
pthread_mutex_destroy(&qset->mutex); pthread_mutex_destroy(&qset->mutex);
tsem_destroy(&qset->sem); tsem_destroy(&qset->sem);
free(qset); free(qset);
uTrace("qset:%p is closed", qset); uDebug("qset:%p is closed", qset);
} }
// tsem_post 'qset->sem', so that reader threads waiting for it // tsem_post 'qset->sem', so that reader threads waiting for it
...@@ -338,7 +346,7 @@ void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) { ...@@ -338,7 +346,7 @@ void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
pthread_mutex_unlock(&qset->mutex); pthread_mutex_unlock(&qset->mutex);
uTrace("queue:%p is removed from qset:%p", queue, qset); uDebug("queue:%p is removed from qset:%p", queue, qset);
} }
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; } int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
...@@ -365,6 +373,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FP ...@@ -365,6 +373,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FP
*ppItem = pNode->item; *ppItem = pNode->item;
if (ahandle) *ahandle = queue->ahandle; if (ahandle) *ahandle = queue->ahandle;
if (itemFp) *itemFp = queue->itemFp; if (itemFp) *itemFp = queue->itemFp;
queue->head = pNode->next; queue->head = pNode->next;
if (queue->head == NULL) queue->tail = NULL; if (queue->head == NULL) queue->tail = NULL;
queue->numOfItems--; queue->numOfItems--;
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tworker.h" #include "tworker.h"
#include "taoserror.h"
#include "ulog.h" #include "ulog.h"
typedef void *(*ThreadFp)(void *param); typedef void *(*ThreadFp)(void *param);
...@@ -22,7 +23,13 @@ typedef void *(*ThreadFp)(void *param); ...@@ -22,7 +23,13 @@ typedef void *(*ThreadFp)(void *param);
int32_t tQWorkerInit(SQWorkerPool *pool) { int32_t tQWorkerInit(SQWorkerPool *pool) {
pool->qset = taosOpenQset(); pool->qset = taosOpenQset();
pool->workers = calloc(sizeof(SQWorker), pool->max); pool->workers = calloc(sizeof(SQWorker), pool->max);
if (pool->workers == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (pthread_mutex_init(&pool->mutex, NULL)) { if (pthread_mutex_init(&pool->mutex, NULL)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
...@@ -91,6 +98,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem f ...@@ -91,6 +98,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem f
STaosQueue *queue = taosOpenQueue(); STaosQueue *queue = taosOpenQueue();
if (queue == NULL) { if (queue == NULL) {
pthread_mutex_unlock(&pool->mutex); pthread_mutex_unlock(&pool->mutex);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
...@@ -109,6 +117,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem f ...@@ -109,6 +117,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem f
if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker) != 0) { if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker) != 0) {
uError("qworker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); uError("qworker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno));
taosCloseQueue(queue); taosCloseQueue(queue);
terrno = TSDB_CODE_OUT_OF_MEMORY;
queue = NULL; queue = NULL;
break; break;
} }
...@@ -133,9 +142,16 @@ void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) { ...@@ -133,9 +142,16 @@ void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
int32_t tWWorkerInit(SWWorkerPool *pool) { int32_t tWWorkerInit(SWWorkerPool *pool) {
pool->nextId = 0; pool->nextId = 0;
pool->workers = calloc(sizeof(SWWorker), pool->max); pool->workers = calloc(sizeof(SWWorker), pool->max);
if (pool->workers == NULL) return -1; if (pool->workers == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (pthread_mutex_init(&pool->mutex, NULL) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pthread_mutex_init(&pool->mutex, NULL);
for (int32_t i = 0; i < pool->max; ++i) { for (int32_t i = 0; i < pool->max; ++i) {
SWWorker *worker = pool->workers + i; SWWorker *worker = pool->workers + i;
worker->id = i; worker->id = i;
...@@ -208,6 +224,7 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems ...@@ -208,6 +224,7 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems
STaosQueue *queue = taosOpenQueue(); STaosQueue *queue = taosOpenQueue();
if (queue == NULL) { if (queue == NULL) {
pthread_mutex_unlock(&pool->mutex); pthread_mutex_unlock(&pool->mutex);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
...@@ -227,6 +244,7 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems ...@@ -227,6 +244,7 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems
taosCloseQset(worker->qset); taosCloseQset(worker->qset);
taosCloseQueue(queue); taosCloseQueue(queue);
pthread_mutex_unlock(&pool->mutex); pthread_mutex_unlock(&pool->mutex);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pthread_attr_t thAttr; pthread_attr_t thAttr;
...@@ -238,6 +256,7 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems ...@@ -238,6 +256,7 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems
taosFreeQall(worker->qall); taosFreeQall(worker->qall);
taosCloseQset(worker->qset); taosCloseQset(worker->qset);
taosCloseQueue(queue); taosCloseQueue(queue);
terrno = TSDB_CODE_OUT_OF_MEMORY;
queue = NULL; queue = NULL;
} else { } else {
uDebug("wworker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max); uDebug("wworker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册