From 2eadeb93ad23c245d4488187ba862a6b472c0471 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Sun, 8 Mar 2020 10:28:30 +0800 Subject: [PATCH] a new version fro tqueue --- src/inc/taoserror.h | 1 + src/inc/trpc.h | 2 +- src/{rpc => util}/inc/tqueue.h | 0 src/{rpc => util}/src/tqueue.c | 22 +++++++++++++--------- 4 files changed, 15 insertions(+), 10 deletions(-) rename src/{rpc => util}/inc/tqueue.h (100%) rename src/{rpc => util}/src/tqueue.c (95%) diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index f153a9a573..c598c56d15 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -46,6 +46,7 @@ static STaosError errors[] = { #endif TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_IN_PROGRESS, 0, 1, "action in progress") +TAOS_DEFINE_ERROR(TSDB_CODE_MSG_NOT_PROCESSED, 0, 4, "message not processed") TAOS_DEFINE_ERROR(TSDB_CODE_LAST_SESSION_NOT_FINISHED, 0, 5, "last session not finished") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_SESSION_ID, 0, 6, "invalid session id") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TRAN_ID, 0, 7, "invalid transaction id") diff --git a/src/inc/trpc.h b/src/inc/trpc.h index bde863522a..17deb33f37 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -43,7 +43,7 @@ typedef struct { } SRpcConnInfo; typedef struct { - char msgType; + uint8_t msgType; void *pCont; int contLen; int32_t code; diff --git a/src/rpc/inc/tqueue.h b/src/util/inc/tqueue.h similarity index 100% rename from src/rpc/inc/tqueue.h rename to src/util/inc/tqueue.h diff --git a/src/rpc/src/tqueue.c b/src/util/src/tqueue.c similarity index 95% rename from src/rpc/src/tqueue.c rename to src/util/src/tqueue.c index ab440d4819..cb218126a8 100644 --- a/src/rpc/src/tqueue.c +++ b/src/util/src/tqueue.c @@ -24,8 +24,8 @@ typedef struct _taos_qnode { } STaosQnode; typedef struct _taos_q { - int itemSize; - int numOfItems; + int32_t itemSize; + int32_t numOfItems; struct _taos_qnode *head; struct _taos_qnode *tail; struct _taos_q *next; // for queue set @@ -37,15 +37,15 @@ typedef struct _taos_qset { STaosQueue *head; STaosQueue *current; pthread_mutex_t mutex; - int numOfQueues; - int numOfItems; + int32_t numOfQueues; + int32_t numOfItems; } STaosQset; typedef struct _taos_qall { STaosQnode *current; STaosQnode *start; - int itemSize; - int numOfItems; + int32_t itemSize; + int32_t numOfItems; } STaosQall; taos_queue taosOpenQueue(int itemSize) { @@ -57,7 +57,7 @@ taos_queue taosOpenQueue(int itemSize) { } pthread_mutex_init(&queue->mutex, NULL); - queue->itemSize = itemSize; + queue->itemSize = (int32_t)itemSize; return queue; } @@ -108,6 +108,8 @@ int taosWriteQitem(taos_queue param, void *item) { if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); pthread_mutex_unlock(&queue->mutex); + + return 0; } int taosReadQitem(taos_queue param, void *item) { @@ -291,8 +293,9 @@ int taosReadQitemFromQset(taos_qset param, void *item) { if (qset->current == NULL) qset->current = qset->head; STaosQueue *queue = qset->current; - qset->current = queue->next; + if (queue) qset->current = queue->next; pthread_mutex_unlock(&qset->mutex); + if (queue == NULL) break; pthread_mutex_lock(&queue->mutex); @@ -326,8 +329,9 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res) { if (qset->current == NULL) qset->current = qset->head; queue = qset->current; - qset->current = queue->next; + if (queue) qset->current = queue->next; pthread_mutex_unlock(&qset->mutex); + if (queue == NULL) break; pthread_mutex_lock(&queue->mutex); -- GitLab