tqueue.c 12.6 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
17
#include "tqueue.h"
S
Shengliang Guan 已提交
18
#include "taoserror.h"
S
log  
Shengliang Guan 已提交
19
#include "tlog.h"
20 21

typedef struct STaosQnode STaosQnode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
22

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
23
typedef struct STaosQnode {
24
  STaosQnode *next;
S
Shengliang Guan 已提交
25
  STaosQueue *queue;
26
  char        item[];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
27 28
} STaosQnode;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
29
typedef struct STaosQueue {
30 31
  int32_t         itemSize;
  int32_t         numOfItems;
S
Shengliang Guan 已提交
32
  int32_t         threadId;
S
queue  
Shengliang Guan 已提交
33 34 35 36 37
  STaosQnode     *head;
  STaosQnode     *tail;
  STaosQueue     *next;     // for queue set
  STaosQset      *qset;     // for queue set
  void           *ahandle;  // for queue set
S
Shengliang Guan 已提交
38 39
  FItem           itemFp;
  FItems          itemsFp;
40
  pthread_mutex_t mutex;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
41 42
} STaosQueue;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
43
typedef struct STaosQset {
S
queue  
Shengliang Guan 已提交
44 45
  STaosQueue     *head;
  STaosQueue     *current;
S
Shengliang Guan 已提交
46 47 48 49
  pthread_mutex_t mutex;
  int32_t         numOfQueues;
  int32_t         numOfItems;
  tsem_t          sem;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
50 51
} STaosQset;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
52
typedef struct STaosQall {
S
Shengliang Guan 已提交
53 54 55 56 57 58
  STaosQnode *current;
  STaosQnode *start;
  int32_t     itemSize;
  int32_t     numOfItems;
} STaosQall;

59
STaosQueue *taosOpenQueue() {
S
Shengliang Guan 已提交
60
  STaosQueue *queue = calloc(1, sizeof(STaosQueue));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
61
  if (queue == NULL) {
S
Shengliang Guan 已提交
62
    terrno = TSDB_CODE_OUT_OF_MEMORY;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
63
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
64 65
  }

S
Shengliang Guan 已提交
66 67 68 69
  if (pthread_mutex_init(&queue->mutex, NULL) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
70

S
Shengliang Guan 已提交
71
  queue->threadId = -1;
S
Shengliang Guan 已提交
72
  uDebug("queue:%p is opened", queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
73 74
  return queue;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
75

S
Shengliang Guan 已提交
76
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp) {
77
  if (queue == NULL) return;
S
Shengliang Guan 已提交
78 79
  queue->itemFp = itemFp;
  queue->itemsFp = itemsFp;
S
Shengliang Guan 已提交
80 81
}

82 83
void taosCloseQueue(STaosQueue *queue) {
  if (queue == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
84
  STaosQnode *pTemp;
S
queue  
Shengliang Guan 已提交
85
  STaosQset  *qset;
86 87

  pthread_mutex_lock(&queue->mutex);
S
Shengliang Guan 已提交
88
  STaosQnode *pNode = queue->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
89
  queue->head = NULL;
90 91
  qset = queue->qset;
  pthread_mutex_unlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
92

S
Shengliang Guan 已提交
93 94 95
  if (queue->qset) {
    taosRemoveFromQset(qset, queue);
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
96

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
97 98 99
  while (pNode) {
    pTemp = pNode;
    pNode = pNode->next;
S
Shengliang Guan 已提交
100
    free(pTemp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
101 102
  }

103
  pthread_mutex_destroy(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
104
  free(queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
105

S
Shengliang Guan 已提交
106
  uDebug("queue:%p is closed", queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
107
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
108

109 110
bool taosQueueEmpty(STaosQueue *queue) {
  if (queue == NULL) return true;
S
Shengliang Guan 已提交
111 112 113 114 115 116

  bool empty = false;
  pthread_mutex_lock(&queue->mutex);
  if (queue->head == NULL && queue->tail == NULL) {
    empty = true;
  }
117
  pthread_mutex_unlock(&queue->mutex);
S
Shengliang Guan 已提交
118 119 120 121

  return empty;
}

122 123 124 125 126 127 128
int32_t taosQueueSize(STaosQueue *queue) {
  pthread_mutex_lock(&queue->mutex);
  int32_t numOfItems = queue->numOfItems;
  pthread_mutex_unlock(&queue->mutex);
  return numOfItems;
}

129
void *taosAllocateQitem(int32_t size) {
S
Shengliang Guan 已提交
130 131 132 133 134 135
  STaosQnode *pNode = calloc(1, sizeof(STaosQnode) + size);

  if (pNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
S
Shengliang Guan 已提交
136

137
  uTrace("item:%p, node:%p is allocated", pNode->item, pNode);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
138 139
  return (void *)pNode->item;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
140

S
Shengliang Guan 已提交
141 142
void taosFreeQitem(void *pItem) {
  if (pItem == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
143

S
Shengliang Guan 已提交
144
  char *temp = pItem;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
145
  temp -= sizeof(STaosQnode);
S
Shengliang Guan 已提交
146
  uTrace("item:%p, node:%p is freed", pItem, temp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
147 148
  free(temp);
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
149

150 151
int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
  STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode));
152
  pNode->next = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
153

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
154
  pthread_mutex_lock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
155

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
156 157 158 159 160
  if (queue->tail) {
    queue->tail->next = pNode;
    queue->tail = pNode;
  } else {
    queue->head = pNode;
S
Shengliang Guan 已提交
161
    queue->tail = pNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
162 163 164 165
  }

  queue->numOfItems++;
  if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
166
  uTrace("item:%p is put into queue:%p, items:%d", pItem, queue, queue->numOfItems);
167

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
168
  pthread_mutex_unlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
169

170 171
  if (queue->qset) tsem_post(&queue->qset->sem);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
172
  return 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
173 174
}

175
int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
176
  STaosQnode *pNode = NULL;
177
  int32_t     code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
178

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
179
  pthread_mutex_lock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
180

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
181
  if (queue->head) {
S
Shengliang Guan 已提交
182
    pNode = queue->head;
183
    *ppItem = pNode->item;
S
Shengliang Guan 已提交
184 185 186 187 188
    queue->head = pNode->next;
    if (queue->head == NULL) queue->tail = NULL;
    queue->numOfItems--;
    if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
    code = 1;
S
Shengliang Guan 已提交
189
    uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
S
Shengliang Guan 已提交
190
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
191

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
192
  pthread_mutex_unlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
193

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
194 195
  return code;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
196

S
Shengliang Guan 已提交
197
STaosQall *taosAllocateQall() { return calloc(1, sizeof(STaosQall)); }
198

199
void taosFreeQall(STaosQall *qall) { free(qall); }
200

201 202 203
int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
  int32_t code = 0;
  bool    empty;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
204 205 206

  pthread_mutex_lock(&queue->mutex);

J
Jun Li 已提交
207 208
  empty = queue->head == NULL;
  if (!empty) {
209 210 211 212 213 214 215 216 217 218 219
    memset(qall, 0, sizeof(STaosQall));
    qall->current = queue->head;
    qall->start = queue->head;
    qall->numOfItems = queue->numOfItems;
    qall->itemSize = queue->itemSize;
    code = qall->numOfItems;

    queue->head = NULL;
    queue->tail = NULL;
    queue->numOfItems = 0;
    if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
J
Jun Li 已提交
220
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
221

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
222
  pthread_mutex_unlock(&queue->mutex);
J
Jun Li 已提交
223 224 225 226 227 228 229 230

  // if source queue is empty, we set destination qall to empty too.
  if (empty) {
    qall->current = NULL;
    qall->start = NULL;
    qall->numOfItems = 0;
  }
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
231
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
232

233
int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
234
  STaosQnode *pNode;
235
  int32_t     num = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
236

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
237
  pNode = qall->current;
S
Shengliang Guan 已提交
238 239
  if (pNode) qall->current = pNode->next;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
240
  if (pNode) {
241
    *ppItem = pNode->item;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
242
    num = 1;
243
    uTrace("item:%p is fetched", *ppItem);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
244 245
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
246
  return num;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
247 248
}

249
void taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
250

251
STaosQset *taosOpenQset() {
S
Shengliang Guan 已提交
252
  STaosQset *qset = calloc(sizeof(STaosQset), 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
253
  if (qset == NULL) {
S
Shengliang Guan 已提交
254
    terrno = TSDB_CODE_OUT_OF_MEMORY;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
255
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
256 257
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
258
  pthread_mutex_init(&qset->mutex, NULL);
259
  tsem_init(&qset->sem, 0, 0);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
260

S
Shengliang Guan 已提交
261
  uDebug("qset:%p is opened", qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
262 263
  return qset;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
264

265 266
void taosCloseQset(STaosQset *qset) {
  if (qset == NULL) return;
267 268 269 270 271 272 273 274 275 276 277 278

  // remove all the queues from qset
  pthread_mutex_lock(&qset->mutex);
  while (qset->head) {
    STaosQueue *queue = qset->head;
    qset->head = qset->head->next;

    queue->qset = NULL;
    queue->next = NULL;
  }
  pthread_mutex_unlock(&qset->mutex);

279 280
  pthread_mutex_destroy(&qset->mutex);
  tsem_destroy(&qset->sem);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
281
  free(qset);
S
Shengliang Guan 已提交
282
  uDebug("qset:%p is closed", qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
283 284
}

285 286 287
// tsem_post 'qset->sem', so that reader threads waiting for it
// resumes execution and return, should only be used to signal the
// thread to exit.
288
void taosQsetThreadResume(STaosQset *qset) {
S
TD-1670  
Shengliang Guan 已提交
289
  uDebug("qset:%p, it will exit", qset);
290 291 292
  tsem_post(&qset->sem);
}

293
int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) {
S
Shengliang Guan 已提交
294
  if (queue->qset) return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
295 296 297 298

  pthread_mutex_lock(&qset->mutex);

  queue->next = qset->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
299
  queue->ahandle = ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
300 301 302 303 304 305 306 307 308
  qset->head = queue;
  qset->numOfQueues++;

  pthread_mutex_lock(&queue->mutex);
  atomic_add_fetch_32(&qset->numOfItems, queue->numOfItems);
  queue->qset = qset;
  pthread_mutex_unlock(&queue->mutex);

  pthread_mutex_unlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
309

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
310
  uTrace("queue:%p is added into qset:%p", queue, qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
311 312 313
  return 0;
}

314
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
315
  STaosQueue *tqueue = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
316 317 318 319 320 321

  pthread_mutex_lock(&qset->mutex);

  if (qset->head) {
    if (qset->head == queue) {
      qset->head = qset->head->next;
322
      tqueue = queue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
323 324 325 326
    } else {
      STaosQueue *prev = qset->head;
      tqueue = qset->head->next;
      while (tqueue) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
327
        assert(tqueue->qset);
S
Shengliang Guan 已提交
328
        if (tqueue == queue) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
329
          prev->next = tqueue->next;
330
          break;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
331 332 333 334 335 336
        } else {
          prev = tqueue;
          tqueue = tqueue->next;
        }
      }
    }
337 338 339 340 341 342 343 344

    if (tqueue) {
      if (qset->current == queue) qset->current = tqueue->next;
      qset->numOfQueues--;

      pthread_mutex_lock(&queue->mutex);
      atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems);
      queue->qset = NULL;
345
      queue->next = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
346
      pthread_mutex_unlock(&queue->mutex);
347
    }
S
Shengliang Guan 已提交
348 349
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
350
  pthread_mutex_unlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
351

S
Shengliang Guan 已提交
352
  uDebug("queue:%p is removed from qset:%p", queue, qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
353 354
}

355
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
356

S
Shengliang Guan 已提交
357
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
358
  STaosQnode *pNode = NULL;
359
  int32_t     code = 0;
S
Shengliang Guan 已提交
360

361 362
  tsem_wait(&qset->sem);

J
Jeff Tao 已提交
363
  pthread_mutex_lock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
364

365
  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
S
Shengliang Guan 已提交
366
    if (qset->current == NULL) qset->current = qset->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
367
    STaosQueue *queue = qset->current;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
368 369
    if (queue) qset->current = queue->next;
    if (queue == NULL) break;
J
jtao1735 已提交
370
    if (queue->head == NULL) continue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
371 372 373 374

    pthread_mutex_lock(&queue->mutex);

    if (queue->head) {
S
Shengliang Guan 已提交
375
      pNode = queue->head;
376
      *ppItem = pNode->item;
S
Shengliang Guan 已提交
377
      if (ahandle) *ahandle = queue->ahandle;
S
Shengliang Guan 已提交
378
      if (itemFp) *itemFp = queue->itemFp;
S
Shengliang Guan 已提交
379

S
Shengliang Guan 已提交
380 381 382 383 384
      queue->head = pNode->next;
      if (queue->head == NULL) queue->tail = NULL;
      queue->numOfItems--;
      atomic_sub_fetch_32(&qset->numOfItems, 1);
      code = 1;
385
      uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
S
Shengliang Guan 已提交
386
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
387 388 389 390 391

    pthread_mutex_unlock(&queue->mutex);
    if (pNode) break;
  }

J
Jeff Tao 已提交
392 393
  pthread_mutex_unlock(&qset->mutex);

S
Shengliang Guan 已提交
394
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
395 396
}

S
Shengliang Guan 已提交
397
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
398
  STaosQueue *queue;
399
  int32_t     code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
400

401
  tsem_wait(&qset->sem);
J
Jeff Tao 已提交
402 403
  pthread_mutex_lock(&qset->mutex);

404 405
  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
    if (qset->current == NULL) qset->current = qset->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
406
    queue = qset->current;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
407 408
    if (queue) qset->current = queue->next;
    if (queue == NULL) break;
J
jtao1735 已提交
409
    if (queue->head == NULL) continue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
410 411 412 413

    pthread_mutex_lock(&queue->mutex);

    if (queue->head) {
414 415 416 417 418
      qall->current = queue->head;
      qall->start = queue->head;
      qall->numOfItems = queue->numOfItems;
      qall->itemSize = queue->itemSize;
      code = qall->numOfItems;
S
Shengliang Guan 已提交
419
      if (ahandle) *ahandle = queue->ahandle;
S
Shengliang Guan 已提交
420
      if (itemsFp) *itemsFp = queue->itemsFp;
S
Shengliang Guan 已提交
421

422 423 424 425
      queue->head = NULL;
      queue->tail = NULL;
      queue->numOfItems = 0;
      atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
S
Shengliang Guan 已提交
426 427 428
      for (int32_t j = 1; j < qall->numOfItems; ++j) {
        tsem_wait(&qset->sem);
      }
429
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
430 431 432

    pthread_mutex_unlock(&queue->mutex);

433
    if (code != 0) break;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
434
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
435

J
Jeff Tao 已提交
436
  pthread_mutex_unlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
437 438
  return code;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
439

S
Shengliang Guan 已提交
440 441
int32_t taosReadQitemFromQsetByThread(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp, int32_t threadId) {
  STaosQnode *pNode = NULL;
S
Shengliang Guan 已提交
442
  int32_t     code = -1;
S
Shengliang Guan 已提交
443 444 445 446 447 448 449 450 451 452 453

  tsem_wait(&qset->sem);

  pthread_mutex_lock(&qset->mutex);

  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
    if (qset->current == NULL) qset->current = qset->head;
    STaosQueue *queue = qset->current;
    if (queue) qset->current = queue->next;
    if (queue == NULL) break;
    if (queue->head == NULL) continue;
S
Shengliang Guan 已提交
454 455 456 457
    if (queue->threadId != -1 && queue->threadId != threadId) {
      code = 0;
      continue;
    }
S
Shengliang Guan 已提交
458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492

    pthread_mutex_lock(&queue->mutex);

    if (queue->head) {
      pNode = queue->head;
      pNode->queue = queue;
      queue->threadId = threadId;
      *ppItem = pNode->item;

      if (ahandle) *ahandle = queue->ahandle;
      if (itemFp) *itemFp = queue->itemFp;

      queue->head = pNode->next;
      if (queue->head == NULL) queue->tail = NULL;
      queue->numOfItems--;
      atomic_sub_fetch_32(&qset->numOfItems, 1);
      code = 1;
      uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
    }

    pthread_mutex_unlock(&queue->mutex);
    if (pNode) break;
  }

  pthread_mutex_unlock(&qset->mutex);

  return code;
}

void taosResetQsetThread(STaosQset *qset, void *pItem) {
  if (pItem == NULL) return;
  STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));

  pthread_mutex_lock(&qset->mutex);
  pNode->queue->threadId = -1;
S
Shengliang Guan 已提交
493 494 495
  for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) {
    tsem_post(&qset->sem);
  }
S
Shengliang Guan 已提交
496 497 498
  pthread_mutex_unlock(&qset->mutex);
}

499
int32_t taosGetQueueItemsNumber(STaosQueue *queue) {
J
Jun Li 已提交
500 501
  if (!queue) return 0;

502
  int32_t num;
J
Jun Li 已提交
503 504 505 506
  pthread_mutex_lock(&queue->mutex);
  num = queue->numOfItems;
  pthread_mutex_unlock(&queue->mutex);
  return num;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
507 508
}

509
int32_t taosGetQsetItemsNumber(STaosQset *qset) {
J
Jun Li 已提交
510 511
  if (!qset) return 0;

512
  int32_t num = 0;
J
Jun Li 已提交
513 514 515 516
  pthread_mutex_lock(&qset->mutex);
  num = qset->numOfItems;
  pthread_mutex_unlock(&qset->mutex);
  return num;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
517
}