提交 2eadeb93 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

a new version fro tqueue

上级 cebd07db
...@@ -46,6 +46,7 @@ static STaosError errors[] = { ...@@ -46,6 +46,7 @@ static STaosError errors[] = {
#endif #endif
TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_IN_PROGRESS, 0, 1, "action in progress") TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_IN_PROGRESS, 0, 1, "action in progress")
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_NOT_PROCESSED, 0, 4, "message not processed")
TAOS_DEFINE_ERROR(TSDB_CODE_LAST_SESSION_NOT_FINISHED, 0, 5, "last session not finished") TAOS_DEFINE_ERROR(TSDB_CODE_LAST_SESSION_NOT_FINISHED, 0, 5, "last session not finished")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_SESSION_ID, 0, 6, "invalid session id") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_SESSION_ID, 0, 6, "invalid session id")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TRAN_ID, 0, 7, "invalid transaction id") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TRAN_ID, 0, 7, "invalid transaction id")
......
...@@ -43,7 +43,7 @@ typedef struct { ...@@ -43,7 +43,7 @@ typedef struct {
} SRpcConnInfo; } SRpcConnInfo;
typedef struct { typedef struct {
char msgType; uint8_t msgType;
void *pCont; void *pCont;
int contLen; int contLen;
int32_t code; int32_t code;
......
...@@ -24,8 +24,8 @@ typedef struct _taos_qnode { ...@@ -24,8 +24,8 @@ typedef struct _taos_qnode {
} STaosQnode; } STaosQnode;
typedef struct _taos_q { typedef struct _taos_q {
int itemSize; int32_t itemSize;
int numOfItems; int32_t numOfItems;
struct _taos_qnode *head; struct _taos_qnode *head;
struct _taos_qnode *tail; struct _taos_qnode *tail;
struct _taos_q *next; // for queue set struct _taos_q *next; // for queue set
...@@ -37,15 +37,15 @@ typedef struct _taos_qset { ...@@ -37,15 +37,15 @@ typedef struct _taos_qset {
STaosQueue *head; STaosQueue *head;
STaosQueue *current; STaosQueue *current;
pthread_mutex_t mutex; pthread_mutex_t mutex;
int numOfQueues; int32_t numOfQueues;
int numOfItems; int32_t numOfItems;
} STaosQset; } STaosQset;
typedef struct _taos_qall { typedef struct _taos_qall {
STaosQnode *current; STaosQnode *current;
STaosQnode *start; STaosQnode *start;
int itemSize; int32_t itemSize;
int numOfItems; int32_t numOfItems;
} STaosQall; } STaosQall;
taos_queue taosOpenQueue(int itemSize) { taos_queue taosOpenQueue(int itemSize) {
...@@ -57,7 +57,7 @@ taos_queue taosOpenQueue(int itemSize) { ...@@ -57,7 +57,7 @@ taos_queue taosOpenQueue(int itemSize) {
} }
pthread_mutex_init(&queue->mutex, NULL); pthread_mutex_init(&queue->mutex, NULL);
queue->itemSize = itemSize; queue->itemSize = (int32_t)itemSize;
return queue; return queue;
} }
...@@ -108,6 +108,8 @@ int taosWriteQitem(taos_queue param, void *item) { ...@@ -108,6 +108,8 @@ int taosWriteQitem(taos_queue param, void *item) {
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
return 0;
} }
int taosReadQitem(taos_queue param, void *item) { int taosReadQitem(taos_queue param, void *item) {
...@@ -291,8 +293,9 @@ int taosReadQitemFromQset(taos_qset param, void *item) { ...@@ -291,8 +293,9 @@ int taosReadQitemFromQset(taos_qset param, void *item) {
if (qset->current == NULL) if (qset->current == NULL)
qset->current = qset->head; qset->current = qset->head;
STaosQueue *queue = qset->current; STaosQueue *queue = qset->current;
qset->current = queue->next; if (queue) qset->current = queue->next;
pthread_mutex_unlock(&qset->mutex); pthread_mutex_unlock(&qset->mutex);
if (queue == NULL) break;
pthread_mutex_lock(&queue->mutex); pthread_mutex_lock(&queue->mutex);
...@@ -326,8 +329,9 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res) { ...@@ -326,8 +329,9 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res) {
if (qset->current == NULL) if (qset->current == NULL)
qset->current = qset->head; qset->current = qset->head;
queue = qset->current; queue = qset->current;
qset->current = queue->next; if (queue) qset->current = queue->next;
pthread_mutex_unlock(&qset->mutex); pthread_mutex_unlock(&qset->mutex);
if (queue == NULL) break;
pthread_mutex_lock(&queue->mutex); pthread_mutex_lock(&queue->mutex);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册