tqueue.c 11.1 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "os.h"
S
slguan 已提交
17
#include "tulog.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
18
#include "taoserror.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
19 20
#include "tqueue.h"

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
21
typedef struct STaosQnode {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
22
  int                 type;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
23
  struct STaosQnode  *next;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
24 25 26
  char                item[];
} STaosQnode;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
27
typedef struct STaosQueue {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
28 29
  int32_t             itemSize;
  int32_t             numOfItems;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
30 31 32 33
  struct STaosQnode  *head;
  struct STaosQnode  *tail;
  struct STaosQueue  *next;    // for queue set
  struct STaosQset   *qset;    // for queue set
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
34
  void               *ahandle; // for queue set
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
35 36 37
  pthread_mutex_t     mutex;  
} STaosQueue;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
38
typedef struct STaosQset {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
39 40 41
  STaosQueue        *head;
  STaosQueue        *current;
  pthread_mutex_t    mutex;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
42 43
  int32_t            numOfQueues;
  int32_t            numOfItems;
44
  tsem_t             sem;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
45 46
} STaosQset;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
47
typedef struct STaosQall {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
48 49
  STaosQnode   *current;
  STaosQnode   *start;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
50 51
  int32_t       itemSize;
  int32_t       numOfItems;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
52 53
} STaosQall; 
  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
54
taos_queue taosOpenQueue() {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
55 56 57
  
  STaosQueue *queue = (STaosQueue *) calloc(sizeof(STaosQueue), 1);
  if (queue == NULL) {
58
    terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
59
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
60 61
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
62
  pthread_mutex_init(&queue->mutex, NULL);
63

S
TD-1843  
Shengliang Guan 已提交
64
  uTrace("queue:%p is opened", queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
65 66
  return queue;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
67

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
68
void taosCloseQueue(taos_queue param) {
69
  if (param == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
70 71
  STaosQueue *queue = (STaosQueue *)param;
  STaosQnode *pTemp;
72 73 74
  STaosQset  *qset;

  pthread_mutex_lock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
75 76
  STaosQnode *pNode = queue->head;  
  queue->head = NULL;
77 78
  qset = queue->qset;
  pthread_mutex_unlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
79

80
  if (queue->qset) taosRemoveFromQset(qset, queue); 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
81

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
82 83 84 85
  while (pNode) {
    pTemp = pNode;
    pNode = pNode->next;
    free (pTemp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
86 87
  }

88
  pthread_mutex_destroy(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
89
  free(queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
90 91

  uTrace("queue:%p is closed", queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
92
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
93

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
94 95
void *taosAllocateQitem(int size) {
  STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1);
96
  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
97
  if (pNode == NULL) return NULL;
98
  uTrace("item:%p, node:%p is allocated", pNode->item, pNode);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
99 100
  return (void *)pNode->item;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
101

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
102 103 104 105 106
void taosFreeQitem(void *param) {
  if (param == NULL) return;

  char *temp = (char *)param;
  temp -= sizeof(STaosQnode);
107
  uTrace("item:%p, node:%p is freed", param, temp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
108 109
  free(temp);
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
110

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
111 112
int taosWriteQitem(taos_queue param, int type, void *item) {
  STaosQueue *queue = (STaosQueue *)param;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
113
  STaosQnode *pNode = (STaosQnode *)(((char *)item) - sizeof(STaosQnode));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
114
  pNode->type = type;
115
  pNode->next = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
116

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
117
  pthread_mutex_lock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
118

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
119 120 121 122 123 124 125 126 127 128
  if (queue->tail) {
    queue->tail->next = pNode;
    queue->tail = pNode;
  } else {
    queue->head = pNode;
    queue->tail = pNode; 
  }

  queue->numOfItems++;
  if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
129
  uTrace("item:%p is put into queue:%p, type:%d items:%d", item, queue, type, queue->numOfItems);
130

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
131
  pthread_mutex_unlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
132

133 134
  if (queue->qset) tsem_post(&queue->qset->sem);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
135
  return 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
136 137
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
138
int taosReadQitem(taos_queue param, int *type, void **pitem) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
139 140 141
  STaosQueue *queue = (STaosQueue *)param;
  STaosQnode *pNode = NULL;
  int         code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
142

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
143
  pthread_mutex_lock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
144

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
145 146
  if (queue->head) {
      pNode = queue->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
147 148
      *pitem = pNode->item;
      *type = pNode->type;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
149 150 151 152 153 154
      queue->head = pNode->next;
      if (queue->head == NULL) 
        queue->tail = NULL;
      queue->numOfItems--;
      if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
      code = 1;
155
      uDebug("item:%p is read out from queue:%p, type:%d items:%d", *pitem, queue, *type, queue->numOfItems);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
156
  } 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
157

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
158
  pthread_mutex_unlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
159

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
160 161
  return code;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
162

163
void *taosAllocateQall() {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
164
  void *p = calloc(sizeof(STaosQall), 1);
165 166 167 168 169 170 171 172
  return p;
}

void taosFreeQall(void *param) {
  free(param);
}

int taosReadAllQitems(taos_queue param, taos_qall p2) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
173
  STaosQueue *queue = (STaosQueue *)param;
174
  STaosQall  *qall = (STaosQall *)p2;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
175
  int         code = 0;
J
Jun Li 已提交
176
  bool        empty;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
177 178 179

  pthread_mutex_lock(&queue->mutex);

J
Jun Li 已提交
180 181
  empty = queue->head == NULL;
  if (!empty) {
182 183 184 185 186 187 188 189 190 191 192
    memset(qall, 0, sizeof(STaosQall));
    qall->current = queue->head;
    qall->start = queue->head;
    qall->numOfItems = queue->numOfItems;
    qall->itemSize = queue->itemSize;
    code = qall->numOfItems;

    queue->head = NULL;
    queue->tail = NULL;
    queue->numOfItems = 0;
    if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
J
Jun Li 已提交
193
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
194

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
195
  pthread_mutex_unlock(&queue->mutex);
J
Jun Li 已提交
196 197 198 199 200 201 202 203

  // if source queue is empty, we set destination qall to empty too.
  if (empty) {
    qall->current = NULL;
    qall->start = NULL;
    qall->numOfItems = 0;
  }
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
204
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
205

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
206
int taosGetQitem(taos_qall param, int *type, void **pitem) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
207 208 209
  STaosQall  *qall = (STaosQall *)param;
  STaosQnode *pNode;
  int         num = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
210

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
211 212 213 214 215
  pNode = qall->current;
  if (pNode)
    qall->current = pNode->next;
 
  if (pNode) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
216
    *pitem = pNode->item;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
217
    *type = pNode->type;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
218
    num = 1;
219
    uTrace("item:%p is fetched, type:%d", *pitem, *type);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
220 221
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
222
  return num;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
223 224
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
225 226 227 228 229 230 231 232 233
void taosResetQitems(taos_qall param) {
  STaosQall  *qall = (STaosQall *)param;
  qall->current = qall->start;
}

taos_qset taosOpenQset() {

  STaosQset *qset = (STaosQset *) calloc(sizeof(STaosQset), 1);
  if (qset == NULL) {
234
    terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
235
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
236 237
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
238
  pthread_mutex_init(&qset->mutex, NULL);
239
  tsem_init(&qset->sem, 0, 0);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
240

S
TD-1843  
Shengliang Guan 已提交
241
  uTrace("qset:%p is opened", qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
242 243
  return qset;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
244

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
245
void taosCloseQset(taos_qset param) {
246
  if (param == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
247
  STaosQset *qset = (STaosQset *)param;
248 249 250 251 252 253 254 255 256 257 258 259

  // remove all the queues from qset
  pthread_mutex_lock(&qset->mutex);
  while (qset->head) {
    STaosQueue *queue = qset->head;
    qset->head = qset->head->next;

    queue->qset = NULL;
    queue->next = NULL;
  }
  pthread_mutex_unlock(&qset->mutex);

260 261
  pthread_mutex_destroy(&qset->mutex);
  tsem_destroy(&qset->sem);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
262
  free(qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
263
  uTrace("qset:%p is closed", qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
264 265
}

266 267 268 269 270
// tsem_post 'qset->sem', so that reader threads waiting for it
// resumes execution and return, should only be used to signal the
// thread to exit.
void taosQsetThreadResume(taos_qset param) {
  STaosQset *qset = (STaosQset *)param;
S
TD-1670  
Shengliang Guan 已提交
271
  uDebug("qset:%p, it will exit", qset);
272 273 274
  tsem_post(&qset->sem);
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
275
int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
276 277 278 279 280 281 282 283
  STaosQueue *queue = (STaosQueue *)p2;
  STaosQset  *qset = (STaosQset *)p1;

  if (queue->qset) return -1; 

  pthread_mutex_lock(&qset->mutex);

  queue->next = qset->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
284
  queue->ahandle = ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
285 286 287 288 289 290 291 292 293
  qset->head = queue;
  qset->numOfQueues++;

  pthread_mutex_lock(&queue->mutex);
  atomic_add_fetch_32(&qset->numOfItems, queue->numOfItems);
  queue->qset = qset;
  pthread_mutex_unlock(&queue->mutex);

  pthread_mutex_unlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
294

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
295
  uTrace("queue:%p is added into qset:%p", queue, qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
296 297 298
  return 0;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
299 300 301 302
void taosRemoveFromQset(taos_qset p1, taos_queue p2) {
  STaosQueue *queue = (STaosQueue *)p2;
  STaosQset  *qset = (STaosQset *)p1;
 
303
  STaosQueue *tqueue = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
304 305 306 307 308 309

  pthread_mutex_lock(&qset->mutex);

  if (qset->head) {
    if (qset->head == queue) {
      qset->head = qset->head->next;
310
      tqueue = queue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
311 312 313 314
    } else {
      STaosQueue *prev = qset->head;
      tqueue = qset->head->next;
      while (tqueue) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
315
        assert(tqueue->qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
316 317
        if (tqueue== queue) {
          prev->next = tqueue->next;
318
          break;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
319 320 321 322 323 324
        } else {
          prev = tqueue;
          tqueue = tqueue->next;
        }
      }
    }
325 326 327 328 329 330 331 332

    if (tqueue) {
      if (qset->current == queue) qset->current = tqueue->next;
      qset->numOfQueues--;

      pthread_mutex_lock(&queue->mutex);
      atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems);
      queue->qset = NULL;
333
      queue->next = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
334
      pthread_mutex_unlock(&queue->mutex);
335
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
336 337 338
  } 
  
  pthread_mutex_unlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
339 340

  uTrace("queue:%p is removed from qset:%p", queue, qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
341 342 343 344 345 346
}

int taosGetQueueNumber(taos_qset param) {
  return ((STaosQset *)param)->numOfQueues;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
347
int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phandle) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
348 349 350
  STaosQset  *qset = (STaosQset *)param;
  STaosQnode *pNode = NULL;
  int         code = 0;
J
Jeff Tao 已提交
351
   
352 353
  tsem_wait(&qset->sem);

J
Jeff Tao 已提交
354
  pthread_mutex_lock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
355 356 357 358 359

  for(int i=0; i<qset->numOfQueues; ++i) {
    if (qset->current == NULL) 
      qset->current = qset->head;   
    STaosQueue *queue = qset->current;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
360 361
    if (queue) qset->current = queue->next;
    if (queue == NULL) break;
J
jtao1735 已提交
362
    if (queue->head == NULL) continue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
363 364 365 366 367

    pthread_mutex_lock(&queue->mutex);

    if (queue->head) {
        pNode = queue->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
368
        *pitem = pNode->item;
369 370
        if (type) *type = pNode->type;
        if (phandle) *phandle = queue->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
371 372 373 374 375 376
        queue->head = pNode->next;
        if (queue->head == NULL) 
          queue->tail = NULL;
        queue->numOfItems--;
        atomic_sub_fetch_32(&qset->numOfItems, 1);
        code = 1;
S
TD-2370  
Shengliang Guan 已提交
377
        uTrace("item:%p is read out from queue:%p, type:%d items:%d", *pitem, queue, pNode->type, queue->numOfItems);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
378 379 380 381 382 383
    } 

    pthread_mutex_unlock(&queue->mutex);
    if (pNode) break;
  }

J
Jeff Tao 已提交
384 385
  pthread_mutex_unlock(&qset->mutex);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
386 387 388
  return code; 
}

389
int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
390 391
  STaosQset  *qset = (STaosQset *)param;
  STaosQueue *queue;
392
  STaosQall  *qall = (STaosQall *)p2;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
393 394
  int         code = 0;

395
  tsem_wait(&qset->sem);
J
Jeff Tao 已提交
396 397
  pthread_mutex_lock(&qset->mutex);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
398 399 400 401
  for(int i=0; i<qset->numOfQueues; ++i) {
    if (qset->current == NULL) 
      qset->current = qset->head;   
    queue = qset->current;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
402 403
    if (queue) qset->current = queue->next;
    if (queue == NULL) break;
J
jtao1735 已提交
404
    if (queue->head == NULL) continue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
405 406 407 408

    pthread_mutex_lock(&queue->mutex);

    if (queue->head) {
409 410 411 412 413 414
      qall->current = queue->head;
      qall->start = queue->head;
      qall->numOfItems = queue->numOfItems;
      qall->itemSize = queue->itemSize;
      code = qall->numOfItems;
      *phandle = queue->ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
415
          
416 417 418 419
      queue->head = NULL;
      queue->tail = NULL;
      queue->numOfItems = 0;
      atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
420
      for (int j=1; j<qall->numOfItems; ++j) tsem_wait(&qset->sem);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
421 422 423 424 425 426
    } 

    pthread_mutex_unlock(&queue->mutex);

    if (code != 0) break;  
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
427

J
Jeff Tao 已提交
428
  pthread_mutex_unlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
429 430
  return code;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
431

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
432 433
int taosGetQueueItemsNumber(taos_queue param) {
  STaosQueue *queue = (STaosQueue *)param;
J
Jun Li 已提交
434 435 436 437 438 439 440
  if (!queue) return 0;

  int num;
  pthread_mutex_lock(&queue->mutex);
  num = queue->numOfItems;
  pthread_mutex_unlock(&queue->mutex);
  return num;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
441 442
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
443 444
int taosGetQsetItemsNumber(taos_qset param) {
  STaosQset *qset = (STaosQset *)param;
J
Jun Li 已提交
445 446 447 448 449 450 451
  if (!qset) return 0;

  int num = 0;
  pthread_mutex_lock(&qset->mutex);
  num = qset->numOfItems;
  pthread_mutex_unlock(&qset->mutex);
  return num;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
452
}