tqueue.c 10.9 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
17
#include "tqueue.h"
S
Shengliang Guan 已提交
18
#include "taoserror.h"
19 20 21
#include "ulog.h"

typedef struct STaosQnode STaosQnode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
22

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
23
typedef struct STaosQnode {
24 25
  STaosQnode *next;
  char        item[];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
26 27
} STaosQnode;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
28
typedef struct STaosQueue {
29 30
  int32_t         itemSize;
  int32_t         numOfItems;
S
Shengliang Guan 已提交
31 32 33 34 35
  STaosQnode *    head;
  STaosQnode *    tail;
  STaosQueue *    next;     // for queue set
  STaosQset *     qset;     // for queue set
  void *          ahandle;  // for queue set
36 37 38
  FProcessItem    itemFp;
  FProcessItems   itemsFp;
  pthread_mutex_t mutex;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
39 40
} STaosQueue;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
41
typedef struct STaosQset {
S
Shengliang Guan 已提交
42 43
  STaosQueue *    head;
  STaosQueue *    current;
S
Shengliang Guan 已提交
44 45 46 47
  pthread_mutex_t mutex;
  int32_t         numOfQueues;
  int32_t         numOfItems;
  tsem_t          sem;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
48 49
} STaosQset;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
50
typedef struct STaosQall {
S
Shengliang Guan 已提交
51 52 53 54 55 56
  STaosQnode *current;
  STaosQnode *start;
  int32_t     itemSize;
  int32_t     numOfItems;
} STaosQall;

57
STaosQueue *taosOpenQueue() {
S
Shengliang Guan 已提交
58
  STaosQueue *queue = calloc(1, sizeof(STaosQueue));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
59
  if (queue == NULL) {
S
Shengliang Guan 已提交
60
    terrno = TSDB_CODE_OUT_OF_MEMORY;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
61
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
62 63
  }

S
Shengliang Guan 已提交
64 65 66 67
  if (pthread_mutex_init(&queue->mutex, NULL) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
68

S
Shengliang Guan 已提交
69
  uDebug("queue:%p is opened", queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
70 71
  return queue;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
72

73 74
void taosSetQueueFp(STaosQueue *queue, FProcessItem itemFp, FProcessItems itemsFp) {
  if (queue == NULL) return;
S
Shengliang Guan 已提交
75 76
  queue->itemFp = itemFp;
  queue->itemsFp = itemsFp;
S
Shengliang Guan 已提交
77 78
}

79 80
void taosCloseQueue(STaosQueue *queue) {
  if (queue == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
81
  STaosQnode *pTemp;
S
Shengliang Guan 已提交
82
  STaosQset * qset;
83 84

  pthread_mutex_lock(&queue->mutex);
S
Shengliang Guan 已提交
85
  STaosQnode *pNode = queue->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
86
  queue->head = NULL;
87 88
  qset = queue->qset;
  pthread_mutex_unlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
89

S
Shengliang Guan 已提交
90 91 92
  if (queue->qset) {
    taosRemoveFromQset(qset, queue);
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
93

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
94 95 96
  while (pNode) {
    pTemp = pNode;
    pNode = pNode->next;
S
Shengliang Guan 已提交
97
    free(pTemp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
98 99
  }

100
  pthread_mutex_destroy(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
101
  free(queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
102

S
Shengliang Guan 已提交
103
  uDebug("queue:%p is closed", queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
104
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
105

106 107
bool taosQueueEmpty(STaosQueue *queue) {
  if (queue == NULL) return true;
S
Shengliang Guan 已提交
108 109 110 111 112 113

  bool empty = false;
  pthread_mutex_lock(&queue->mutex);
  if (queue->head == NULL && queue->tail == NULL) {
    empty = true;
  }
114
  pthread_mutex_unlock(&queue->mutex);
S
Shengliang Guan 已提交
115 116 117 118

  return empty;
}

119 120 121 122 123 124 125
int32_t taosQueueSize(STaosQueue *queue) {
  pthread_mutex_lock(&queue->mutex);
  int32_t numOfItems = queue->numOfItems;
  pthread_mutex_unlock(&queue->mutex);
  return numOfItems;
}

126
void *taosAllocateQitem(int32_t size) {
S
Shengliang Guan 已提交
127 128 129 130 131 132
  STaosQnode *pNode = calloc(1, sizeof(STaosQnode) + size);

  if (pNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
S
Shengliang Guan 已提交
133

134
  uTrace("item:%p, node:%p is allocated", pNode->item, pNode);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
135 136
  return (void *)pNode->item;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
137

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
138 139 140
void taosFreeQitem(void *param) {
  if (param == NULL) return;

S
Shengliang Guan 已提交
141
  char *temp = param;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
142
  temp -= sizeof(STaosQnode);
143
  uTrace("item:%p, node:%p is freed", param, temp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
144 145
  free(temp);
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
146

147 148
int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
  STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode));
149
  pNode->next = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
150

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
151
  pthread_mutex_lock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
152

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
153 154 155 156 157
  if (queue->tail) {
    queue->tail->next = pNode;
    queue->tail = pNode;
  } else {
    queue->head = pNode;
S
Shengliang Guan 已提交
158
    queue->tail = pNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
159 160 161 162
  }

  queue->numOfItems++;
  if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
163
  uTrace("item:%p is put into queue:%p, items:%d", pItem, queue, queue->numOfItems);
164

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
165
  pthread_mutex_unlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
166

167 168
  if (queue->qset) tsem_post(&queue->qset->sem);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
169
  return 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
170 171
}

172
int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
173
  STaosQnode *pNode = NULL;
174
  int32_t     code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
175

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
176
  pthread_mutex_lock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
177

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
178
  if (queue->head) {
S
Shengliang Guan 已提交
179
    pNode = queue->head;
180
    *ppItem = pNode->item;
S
Shengliang Guan 已提交
181 182 183 184 185
    queue->head = pNode->next;
    if (queue->head == NULL) queue->tail = NULL;
    queue->numOfItems--;
    if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
    code = 1;
S
Shengliang Guan 已提交
186
    uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
S
Shengliang Guan 已提交
187
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
188

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
189
  pthread_mutex_unlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
190

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
191 192
  return code;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
193

S
Shengliang Guan 已提交
194
STaosQall *taosAllocateQall() { return calloc(1, sizeof(STaosQall)); }
195

196
void taosFreeQall(STaosQall *qall) { free(qall); }
197

198 199 200
int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
  int32_t code = 0;
  bool    empty;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
201 202 203

  pthread_mutex_lock(&queue->mutex);

J
Jun Li 已提交
204 205
  empty = queue->head == NULL;
  if (!empty) {
206 207 208 209 210 211 212 213 214 215 216
    memset(qall, 0, sizeof(STaosQall));
    qall->current = queue->head;
    qall->start = queue->head;
    qall->numOfItems = queue->numOfItems;
    qall->itemSize = queue->itemSize;
    code = qall->numOfItems;

    queue->head = NULL;
    queue->tail = NULL;
    queue->numOfItems = 0;
    if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
J
Jun Li 已提交
217
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
218

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
219
  pthread_mutex_unlock(&queue->mutex);
J
Jun Li 已提交
220 221 222 223 224 225 226 227

  // if source queue is empty, we set destination qall to empty too.
  if (empty) {
    qall->current = NULL;
    qall->start = NULL;
    qall->numOfItems = 0;
  }
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
228
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
229

230
int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
231
  STaosQnode *pNode;
232
  int32_t     num = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
233

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
234
  pNode = qall->current;
S
Shengliang Guan 已提交
235 236
  if (pNode) qall->current = pNode->next;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
237
  if (pNode) {
238
    *ppItem = pNode->item;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
239
    num = 1;
240
    uTrace("item:%p is fetched", *ppItem);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
241 242
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
243
  return num;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
244 245
}

246
void taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
247

248
STaosQset *taosOpenQset() {
S
Shengliang Guan 已提交
249
  STaosQset *qset = calloc(sizeof(STaosQset), 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
250
  if (qset == NULL) {
S
Shengliang Guan 已提交
251
    terrno = TSDB_CODE_OUT_OF_MEMORY;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
252
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
253 254
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
255
  pthread_mutex_init(&qset->mutex, NULL);
256
  tsem_init(&qset->sem, 0, 0);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
257

S
Shengliang Guan 已提交
258
  uDebug("qset:%p is opened", qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
259 260
  return qset;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
261

262 263
void taosCloseQset(STaosQset *qset) {
  if (qset == NULL) return;
264 265 266 267 268 269 270 271 272 273 274 275

  // remove all the queues from qset
  pthread_mutex_lock(&qset->mutex);
  while (qset->head) {
    STaosQueue *queue = qset->head;
    qset->head = qset->head->next;

    queue->qset = NULL;
    queue->next = NULL;
  }
  pthread_mutex_unlock(&qset->mutex);

276 277
  pthread_mutex_destroy(&qset->mutex);
  tsem_destroy(&qset->sem);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
278
  free(qset);
S
Shengliang Guan 已提交
279
  uDebug("qset:%p is closed", qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
280 281
}

282 283 284
// tsem_post 'qset->sem', so that reader threads waiting for it
// resumes execution and return, should only be used to signal the
// thread to exit.
285
void taosQsetThreadResume(STaosQset *qset) {
S
TD-1670  
Shengliang Guan 已提交
286
  uDebug("qset:%p, it will exit", qset);
287 288 289
  tsem_post(&qset->sem);
}

290
int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) {
S
Shengliang Guan 已提交
291
  if (queue->qset) return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
292 293 294 295

  pthread_mutex_lock(&qset->mutex);

  queue->next = qset->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
296
  queue->ahandle = ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
297 298 299 300 301 302 303 304 305
  qset->head = queue;
  qset->numOfQueues++;

  pthread_mutex_lock(&queue->mutex);
  atomic_add_fetch_32(&qset->numOfItems, queue->numOfItems);
  queue->qset = qset;
  pthread_mutex_unlock(&queue->mutex);

  pthread_mutex_unlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
306

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
307
  uTrace("queue:%p is added into qset:%p", queue, qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
308 309 310
  return 0;
}

311
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
312
  STaosQueue *tqueue = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
313 314 315 316 317 318

  pthread_mutex_lock(&qset->mutex);

  if (qset->head) {
    if (qset->head == queue) {
      qset->head = qset->head->next;
319
      tqueue = queue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
320 321 322 323
    } else {
      STaosQueue *prev = qset->head;
      tqueue = qset->head->next;
      while (tqueue) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
324
        assert(tqueue->qset);
S
Shengliang Guan 已提交
325
        if (tqueue == queue) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
326
          prev->next = tqueue->next;
327
          break;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
328 329 330 331 332 333
        } else {
          prev = tqueue;
          tqueue = tqueue->next;
        }
      }
    }
334 335 336 337 338 339 340 341

    if (tqueue) {
      if (qset->current == queue) qset->current = tqueue->next;
      qset->numOfQueues--;

      pthread_mutex_lock(&queue->mutex);
      atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems);
      queue->qset = NULL;
342
      queue->next = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
343
      pthread_mutex_unlock(&queue->mutex);
344
    }
S
Shengliang Guan 已提交
345 346
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
347
  pthread_mutex_unlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
348

S
Shengliang Guan 已提交
349
  uDebug("queue:%p is removed from qset:%p", queue, qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
350 351
}

352
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
353

354
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FProcessItem *itemFp) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
355
  STaosQnode *pNode = NULL;
356
  int32_t     code = 0;
S
Shengliang Guan 已提交
357

358 359
  tsem_wait(&qset->sem);

J
Jeff Tao 已提交
360
  pthread_mutex_lock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
361

362
  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
S
Shengliang Guan 已提交
363
    if (qset->current == NULL) qset->current = qset->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
364
    STaosQueue *queue = qset->current;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
365 366
    if (queue) qset->current = queue->next;
    if (queue == NULL) break;
J
jtao1735 已提交
367
    if (queue->head == NULL) continue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
368 369 370 371

    pthread_mutex_lock(&queue->mutex);

    if (queue->head) {
S
Shengliang Guan 已提交
372
      pNode = queue->head;
373
      *ppItem = pNode->item;
S
Shengliang Guan 已提交
374
      if (ahandle) *ahandle = queue->ahandle;
S
Shengliang Guan 已提交
375
      if (itemFp) *itemFp = queue->itemFp;
S
Shengliang Guan 已提交
376

S
Shengliang Guan 已提交
377 378 379 380 381
      queue->head = pNode->next;
      if (queue->head == NULL) queue->tail = NULL;
      queue->numOfItems--;
      atomic_sub_fetch_32(&qset->numOfItems, 1);
      code = 1;
382
      uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
S
Shengliang Guan 已提交
383
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
384 385 386 387 388

    pthread_mutex_unlock(&queue->mutex);
    if (pNode) break;
  }

J
Jeff Tao 已提交
389 390
  pthread_mutex_unlock(&qset->mutex);

S
Shengliang Guan 已提交
391
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
392 393
}

394
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FProcessItems *itemsFp) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
395
  STaosQueue *queue;
396
  int32_t     code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
397

398
  tsem_wait(&qset->sem);
J
Jeff Tao 已提交
399 400
  pthread_mutex_lock(&qset->mutex);

401 402
  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
    if (qset->current == NULL) qset->current = qset->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
403
    queue = qset->current;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
404 405
    if (queue) qset->current = queue->next;
    if (queue == NULL) break;
J
jtao1735 已提交
406
    if (queue->head == NULL) continue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
407 408 409 410

    pthread_mutex_lock(&queue->mutex);

    if (queue->head) {
411 412 413 414 415
      qall->current = queue->head;
      qall->start = queue->head;
      qall->numOfItems = queue->numOfItems;
      qall->itemSize = queue->itemSize;
      code = qall->numOfItems;
S
Shengliang Guan 已提交
416
      if (ahandle) *ahandle = queue->ahandle;
S
Shengliang Guan 已提交
417
      if (itemsFp) *itemsFp = queue->itemsFp;
S
Shengliang Guan 已提交
418

419 420 421 422
      queue->head = NULL;
      queue->tail = NULL;
      queue->numOfItems = 0;
      atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
423 424
      for (int32_t j = 1; j < qall->numOfItems; ++j) tsem_wait(&qset->sem);
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
425 426 427

    pthread_mutex_unlock(&queue->mutex);

428
    if (code != 0) break;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
429
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
430

J
Jeff Tao 已提交
431
  pthread_mutex_unlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
432 433
  return code;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
434

435
int32_t taosGetQueueItemsNumber(STaosQueue *queue) {
J
Jun Li 已提交
436 437
  if (!queue) return 0;

438
  int32_t num;
J
Jun Li 已提交
439 440 441 442
  pthread_mutex_lock(&queue->mutex);
  num = queue->numOfItems;
  pthread_mutex_unlock(&queue->mutex);
  return num;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
443 444
}

445
int32_t taosGetQsetItemsNumber(STaosQset *qset) {
J
Jun Li 已提交
446 447
  if (!qset) return 0;

448
  int32_t num = 0;
J
Jun Li 已提交
449 450 451 452
  pthread_mutex_lock(&qset->mutex);
  num = qset->numOfItems;
  pthread_mutex_unlock(&qset->mutex);
  return num;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
453
}