diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index f153a9a5739ebbae5c84b4a36065776472aa8083..c598c56d15c6e7dbdf19425a7de7556d765ff56d 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -46,6 +46,7 @@ static STaosError errors[] = { #endif TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_IN_PROGRESS, 0, 1, "action in progress") +TAOS_DEFINE_ERROR(TSDB_CODE_MSG_NOT_PROCESSED, 0, 4, "message not processed") TAOS_DEFINE_ERROR(TSDB_CODE_LAST_SESSION_NOT_FINISHED, 0, 5, "last session not finished") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_SESSION_ID, 0, 6, "invalid session id") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TRAN_ID, 0, 7, "invalid transaction id") diff --git a/src/inc/trpc.h b/src/inc/trpc.h index bde863522a5b7cd4b51b03eee9e8e2f634cdc97b..17deb33f37e31972fd3afffad6fccf5518230c00 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -43,7 +43,7 @@ typedef struct { } SRpcConnInfo; typedef struct { - char msgType; + uint8_t msgType; void *pCont; int contLen; int32_t code; diff --git a/src/rpc/inc/tqueue.h b/src/util/inc/tqueue.h similarity index 100% rename from src/rpc/inc/tqueue.h rename to src/util/inc/tqueue.h diff --git a/src/rpc/src/tqueue.c b/src/util/src/tqueue.c similarity index 95% rename from src/rpc/src/tqueue.c rename to src/util/src/tqueue.c index ab440d4819b359d6a140ae5d754c046df560fa7d..cb218126a877714c71ef2363960695fa4fb7694b 100644 --- a/src/rpc/src/tqueue.c +++ b/src/util/src/tqueue.c @@ -24,8 +24,8 @@ typedef struct _taos_qnode { } STaosQnode; typedef struct _taos_q { - int itemSize; - int numOfItems; + int32_t itemSize; + int32_t numOfItems; struct _taos_qnode *head; struct _taos_qnode *tail; struct _taos_q *next; // for queue set @@ -37,15 +37,15 @@ typedef struct _taos_qset { STaosQueue *head; STaosQueue *current; pthread_mutex_t mutex; - int numOfQueues; - int numOfItems; + int32_t numOfQueues; + int32_t numOfItems; } STaosQset; typedef struct _taos_qall { STaosQnode *current; STaosQnode *start; - int itemSize; - int numOfItems; + int32_t itemSize; + int32_t numOfItems; } STaosQall; taos_queue taosOpenQueue(int itemSize) { @@ -57,7 +57,7 @@ taos_queue taosOpenQueue(int itemSize) { } pthread_mutex_init(&queue->mutex, NULL); - queue->itemSize = itemSize; + queue->itemSize = (int32_t)itemSize; return queue; } @@ -108,6 +108,8 @@ int taosWriteQitem(taos_queue param, void *item) { if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); pthread_mutex_unlock(&queue->mutex); + + return 0; } int taosReadQitem(taos_queue param, void *item) { @@ -291,8 +293,9 @@ int taosReadQitemFromQset(taos_qset param, void *item) { if (qset->current == NULL) qset->current = qset->head; STaosQueue *queue = qset->current; - qset->current = queue->next; + if (queue) qset->current = queue->next; pthread_mutex_unlock(&qset->mutex); + if (queue == NULL) break; pthread_mutex_lock(&queue->mutex); @@ -326,8 +329,9 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res) { if (qset->current == NULL) qset->current = qset->head; queue = qset->current; - qset->current = queue->next; + if (queue) qset->current = queue->next; pthread_mutex_unlock(&qset->mutex); + if (queue == NULL) break; pthread_mutex_lock(&queue->mutex);