tqueue.c 12.2 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
17
#include "tqueue.h"
S
Shengliang Guan 已提交
18
#include "taoserror.h"
S
log  
Shengliang Guan 已提交
19
#include "tlog.h"
20

21 22 23
int64_t tsRpcQueueMemoryAllowed = 0;
int64_t tsRpcQueueMemoryUsed = 0;

24
typedef struct STaosQnode STaosQnode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
25

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
26
typedef struct STaosQnode {
27
  STaosQnode *next;
S
Shengliang Guan 已提交
28
  STaosQueue *queue;
29
  int32_t     size;
30 31
  int8_t      itype;
  int8_t      reserved[3];
32
  char        item[];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
33 34
} STaosQnode;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
35
typedef struct STaosQueue {
dengyihao's avatar
dengyihao 已提交
36 37 38 39 40
  STaosQnode *  head;
  STaosQnode *  tail;
  STaosQueue *  next;     // for queue set
  STaosQset *   qset;     // for queue set
  void *        ahandle;  // for queue set
41 42
  FItem         itemFp;
  FItems        itemsFp;
wafwerar's avatar
wafwerar 已提交
43
  TdThreadMutex mutex;
44 45
  int64_t       memOfItems;
  int32_t       numOfItems;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
46 47
} STaosQueue;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
48
typedef struct STaosQset {
dengyihao's avatar
dengyihao 已提交
49 50
  STaosQueue *  head;
  STaosQueue *  current;
wafwerar's avatar
wafwerar 已提交
51
  TdThreadMutex mutex;
52
  tsem_t        sem;
53 54
  int32_t       numOfQueues;
  int32_t       numOfItems;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
55 56
} STaosQset;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
57
typedef struct STaosQall {
S
Shengliang Guan 已提交
58 59 60 61 62
  STaosQnode *current;
  STaosQnode *start;
  int32_t     numOfItems;
} STaosQall;

63
STaosQueue *taosOpenQueue() {
wafwerar's avatar
wafwerar 已提交
64
  STaosQueue *queue = taosMemoryCalloc(1, sizeof(STaosQueue));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
65
  if (queue == NULL) {
S
Shengliang Guan 已提交
66
    terrno = TSDB_CODE_OUT_OF_MEMORY;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
67
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
68 69
  }

wafwerar's avatar
wafwerar 已提交
70
  if (taosThreadMutexInit(&queue->mutex, NULL) != 0) {
S
Shengliang Guan 已提交
71 72 73
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
74

S
Shengliang Guan 已提交
75
  uDebug("queue:%p is opened", queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
76 77
  return queue;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
78

S
Shengliang Guan 已提交
79
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp) {
80
  if (queue == NULL) return;
S
Shengliang Guan 已提交
81 82
  queue->itemFp = itemFp;
  queue->itemsFp = itemsFp;
S
Shengliang Guan 已提交
83 84
}

85 86
void taosCloseQueue(STaosQueue *queue) {
  if (queue == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
87
  STaosQnode *pTemp;
dengyihao's avatar
dengyihao 已提交
88
  STaosQset * qset;
89

wafwerar's avatar
wafwerar 已提交
90
  taosThreadMutexLock(&queue->mutex);
S
Shengliang Guan 已提交
91
  STaosQnode *pNode = queue->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
92
  queue->head = NULL;
93
  qset = queue->qset;
wafwerar's avatar
wafwerar 已提交
94
  taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
95

S
Shengliang Guan 已提交
96 97 98
  if (queue->qset) {
    taosRemoveFromQset(qset, queue);
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
99

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
100 101 102
  while (pNode) {
    pTemp = pNode;
    pNode = pNode->next;
wafwerar's avatar
wafwerar 已提交
103
    taosMemoryFree(pTemp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
104 105
  }

wafwerar's avatar
wafwerar 已提交
106
  taosThreadMutexDestroy(&queue->mutex);
wafwerar's avatar
wafwerar 已提交
107
  taosMemoryFree(queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
108

S
Shengliang Guan 已提交
109
  uDebug("queue:%p is closed", queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
110
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
111

112 113
bool taosQueueEmpty(STaosQueue *queue) {
  if (queue == NULL) return true;
S
Shengliang Guan 已提交
114 115

  bool empty = false;
wafwerar's avatar
wafwerar 已提交
116
  taosThreadMutexLock(&queue->mutex);
S
Shengliang Guan 已提交
117 118 119
  if (queue->head == NULL && queue->tail == NULL) {
    empty = true;
  }
wafwerar's avatar
wafwerar 已提交
120
  taosThreadMutexUnlock(&queue->mutex);
S
Shengliang Guan 已提交
121 122 123 124

  return empty;
}

125
int32_t taosQueueItemSize(STaosQueue *queue) {
126 127
  if (queue == NULL) return 0;

wafwerar's avatar
wafwerar 已提交
128
  taosThreadMutexLock(&queue->mutex);
129
  int32_t numOfItems = queue->numOfItems;
wafwerar's avatar
wafwerar 已提交
130
  taosThreadMutexUnlock(&queue->mutex);
131 132 133
  return numOfItems;
}

134
int64_t taosQueueMemorySize(STaosQueue *queue) {
135 136
  if (queue == NULL) return 0;

137
  taosThreadMutexLock(&queue->mutex);
138
  int64_t memOfItems = queue->memOfItems;
139 140 141 142
  taosThreadMutexUnlock(&queue->mutex);
  return memOfItems;
}

143
void *taosAllocateQitem(int32_t size, EQItype itype) {
wafwerar's avatar
wafwerar 已提交
144
  STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
145
  pNode->size = size;
146
  pNode->itype = itype;
S
Shengliang Guan 已提交
147 148 149 150 151

  if (pNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
S
Shengliang Guan 已提交
152

153 154
  if (itype == RPC_QITEM) {
    int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size);
dengyihao's avatar
dengyihao 已提交
155
    if (alloced > tsRpcQueueMemoryAllowed) {
156 157 158 159 160 161 162 163 164
      taosMemoryFree(pNode);
      terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE;
      return NULL;
    }
    uTrace("item:%p, node:%p is allocated, alloc:%" PRId64, pNode->item, pNode, alloced);
  } else {
    uTrace("item:%p, node:%p is allocated", pNode->item, pNode);
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
165 166
  return (void *)pNode->item;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
167

S
Shengliang Guan 已提交
168 169
void taosFreeQitem(void *pItem) {
  if (pItem == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
170

171 172 173 174 175 176 177 178 179
  STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
  if (pNode->itype > 0) {
    int64_t alloced = atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, pNode->size);
    uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced);
  } else {
    uTrace("item:%p, node:%p is freed", pItem, pNode);
  }

  taosMemoryFree(pNode);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
180
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
181

S
Shengliang Guan 已提交
182
void taosWriteQitem(STaosQueue *queue, void *pItem) {
183
  STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode));
184
  pNode->next = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
185

wafwerar's avatar
wafwerar 已提交
186
  taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
187

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
188 189 190 191 192
  if (queue->tail) {
    queue->tail->next = pNode;
    queue->tail = pNode;
  } else {
    queue->head = pNode;
S
Shengliang Guan 已提交
193
    queue->tail = pNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
194 195 196
  }

  queue->numOfItems++;
197
  queue->memOfItems += pNode->size;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
198
  if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
199
  uTrace("item:%p is put into queue:%p, items:%d mem:%" PRId64, pItem, queue, queue->numOfItems, queue->memOfItems);
200

wafwerar's avatar
wafwerar 已提交
201
  taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
202

203
  if (queue->qset) tsem_post(&queue->qset->sem);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
204 205
}

206
int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
207
  STaosQnode *pNode = NULL;
208
  int32_t     code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
209

wafwerar's avatar
wafwerar 已提交
210
  taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
211

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
212
  if (queue->head) {
S
Shengliang Guan 已提交
213
    pNode = queue->head;
214
    *ppItem = pNode->item;
S
Shengliang Guan 已提交
215 216 217
    queue->head = pNode->next;
    if (queue->head == NULL) queue->tail = NULL;
    queue->numOfItems--;
218
    queue->memOfItems -= pNode->size;
S
Shengliang Guan 已提交
219 220
    if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
    code = 1;
221 222
    uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems,
           queue->memOfItems);
S
Shengliang Guan 已提交
223
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
224

wafwerar's avatar
wafwerar 已提交
225
  taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
226

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
227 228
  return code;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
229

230 231 232 233 234 235 236
STaosQall *taosAllocateQall() {
  STaosQall *qall = taosMemoryCalloc(1, sizeof(STaosQall));
  if (qall != NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
  }
  return qall;
}
237

wafwerar's avatar
wafwerar 已提交
238
void taosFreeQall(STaosQall *qall) { taosMemoryFree(qall); }
239

240 241 242
int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
  int32_t code = 0;
  bool    empty;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
243

wafwerar's avatar
wafwerar 已提交
244
  taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
245

J
Jun Li 已提交
246 247
  empty = queue->head == NULL;
  if (!empty) {
248 249 250 251 252 253 254 255 256
    memset(qall, 0, sizeof(STaosQall));
    qall->current = queue->head;
    qall->start = queue->head;
    qall->numOfItems = queue->numOfItems;
    code = qall->numOfItems;

    queue->head = NULL;
    queue->tail = NULL;
    queue->numOfItems = 0;
257
    queue->memOfItems = 0;
258
    if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
J
Jun Li 已提交
259
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
260

wafwerar's avatar
wafwerar 已提交
261
  taosThreadMutexUnlock(&queue->mutex);
J
Jun Li 已提交
262 263 264 265 266 267 268 269

  // if source queue is empty, we set destination qall to empty too.
  if (empty) {
    qall->current = NULL;
    qall->start = NULL;
    qall->numOfItems = 0;
  }
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
270
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
271

272
int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
273
  STaosQnode *pNode;
274
  int32_t     num = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
275

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
276
  pNode = qall->current;
S
Shengliang Guan 已提交
277 278
  if (pNode) qall->current = pNode->next;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
279
  if (pNode) {
280
    *ppItem = pNode->item;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
281
    num = 1;
282
    uTrace("item:%p is fetched", *ppItem);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
283 284
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
285
  return num;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
286 287
}

288
void taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
289

290
STaosQset *taosOpenQset() {
wafwerar's avatar
wafwerar 已提交
291
  STaosQset *qset = taosMemoryCalloc(sizeof(STaosQset), 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
292
  if (qset == NULL) {
S
Shengliang Guan 已提交
293
    terrno = TSDB_CODE_OUT_OF_MEMORY;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
294
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
295 296
  }

wafwerar's avatar
wafwerar 已提交
297
  taosThreadMutexInit(&qset->mutex, NULL);
298
  tsem_init(&qset->sem, 0, 0);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
299

S
Shengliang Guan 已提交
300
  uDebug("qset:%p is opened", qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
301 302
  return qset;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
303

304 305
void taosCloseQset(STaosQset *qset) {
  if (qset == NULL) return;
306 307

  // remove all the queues from qset
wafwerar's avatar
wafwerar 已提交
308
  taosThreadMutexLock(&qset->mutex);
309 310 311 312 313 314 315
  while (qset->head) {
    STaosQueue *queue = qset->head;
    qset->head = qset->head->next;

    queue->qset = NULL;
    queue->next = NULL;
  }
wafwerar's avatar
wafwerar 已提交
316
  taosThreadMutexUnlock(&qset->mutex);
317

wafwerar's avatar
wafwerar 已提交
318
  taosThreadMutexDestroy(&qset->mutex);
319
  tsem_destroy(&qset->sem);
wafwerar's avatar
wafwerar 已提交
320
  taosMemoryFree(qset);
S
Shengliang Guan 已提交
321
  uDebug("qset:%p is closed", qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
322 323
}

324 325 326
// tsem_post 'qset->sem', so that reader threads waiting for it
// resumes execution and return, should only be used to signal the
// thread to exit.
327
void taosQsetThreadResume(STaosQset *qset) {
S
TD-1670  
Shengliang Guan 已提交
328
  uDebug("qset:%p, it will exit", qset);
329 330 331
  tsem_post(&qset->sem);
}

332
int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) {
S
Shengliang Guan 已提交
333
  if (queue->qset) return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
334

wafwerar's avatar
wafwerar 已提交
335
  taosThreadMutexLock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
336 337

  queue->next = qset->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
338
  queue->ahandle = ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
339 340 341
  qset->head = queue;
  qset->numOfQueues++;

wafwerar's avatar
wafwerar 已提交
342
  taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
343 344
  atomic_add_fetch_32(&qset->numOfItems, queue->numOfItems);
  queue->qset = qset;
wafwerar's avatar
wafwerar 已提交
345
  taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
346

wafwerar's avatar
wafwerar 已提交
347
  taosThreadMutexUnlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
348

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
349
  uTrace("queue:%p is added into qset:%p", queue, qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
350 351 352
  return 0;
}

353
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
354
  STaosQueue *tqueue = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
355

wafwerar's avatar
wafwerar 已提交
356
  taosThreadMutexLock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
357 358 359 360

  if (qset->head) {
    if (qset->head == queue) {
      qset->head = qset->head->next;
361
      tqueue = queue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
362 363 364 365
    } else {
      STaosQueue *prev = qset->head;
      tqueue = qset->head->next;
      while (tqueue) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
366
        assert(tqueue->qset);
S
Shengliang Guan 已提交
367
        if (tqueue == queue) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
368
          prev->next = tqueue->next;
369
          break;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
370 371 372 373 374 375
        } else {
          prev = tqueue;
          tqueue = tqueue->next;
        }
      }
    }
376 377 378 379 380

    if (tqueue) {
      if (qset->current == queue) qset->current = tqueue->next;
      qset->numOfQueues--;

wafwerar's avatar
wafwerar 已提交
381
      taosThreadMutexLock(&queue->mutex);
382 383
      atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems);
      queue->qset = NULL;
384
      queue->next = NULL;
wafwerar's avatar
wafwerar 已提交
385
      taosThreadMutexUnlock(&queue->mutex);
386
    }
S
Shengliang Guan 已提交
387 388
  }

wafwerar's avatar
wafwerar 已提交
389
  taosThreadMutexUnlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
390

S
Shengliang Guan 已提交
391
  uDebug("queue:%p is removed from qset:%p", queue, qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
392 393
}

394
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
395

S
Shengliang Guan 已提交
396
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
397
  STaosQnode *pNode = NULL;
398
  int32_t     code = 0;
S
Shengliang Guan 已提交
399

400 401
  tsem_wait(&qset->sem);

wafwerar's avatar
wafwerar 已提交
402
  taosThreadMutexLock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
403

404
  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
S
Shengliang Guan 已提交
405
    if (qset->current == NULL) qset->current = qset->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
406
    STaosQueue *queue = qset->current;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
407 408
    if (queue) qset->current = queue->next;
    if (queue == NULL) break;
J
jtao1735 已提交
409
    if (queue->head == NULL) continue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
410

wafwerar's avatar
wafwerar 已提交
411
    taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
412 413

    if (queue->head) {
S
Shengliang Guan 已提交
414
      pNode = queue->head;
415
      *ppItem = pNode->item;
S
Shengliang Guan 已提交
416
      if (ahandle) *ahandle = queue->ahandle;
S
Shengliang Guan 已提交
417
      if (itemFp) *itemFp = queue->itemFp;
S
Shengliang Guan 已提交
418

S
Shengliang Guan 已提交
419 420 421
      queue->head = pNode->next;
      if (queue->head == NULL) queue->tail = NULL;
      queue->numOfItems--;
422
      queue->memOfItems -= pNode->size;
S
Shengliang Guan 已提交
423 424
      atomic_sub_fetch_32(&qset->numOfItems, 1);
      code = 1;
425 426
      uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems,
             queue->memOfItems);
S
Shengliang Guan 已提交
427
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
428

wafwerar's avatar
wafwerar 已提交
429
    taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
430 431 432
    if (pNode) break;
  }

wafwerar's avatar
wafwerar 已提交
433
  taosThreadMutexUnlock(&qset->mutex);
J
Jeff Tao 已提交
434

S
Shengliang Guan 已提交
435
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
436 437
}

S
Shengliang Guan 已提交
438
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
439
  STaosQueue *queue;
440
  int32_t     code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
441

442
  tsem_wait(&qset->sem);
wafwerar's avatar
wafwerar 已提交
443
  taosThreadMutexLock(&qset->mutex);
J
Jeff Tao 已提交
444

445 446
  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
    if (qset->current == NULL) qset->current = qset->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
447
    queue = qset->current;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
448 449
    if (queue) qset->current = queue->next;
    if (queue == NULL) break;
J
jtao1735 已提交
450
    if (queue->head == NULL) continue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
451

wafwerar's avatar
wafwerar 已提交
452
    taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
453 454

    if (queue->head) {
455 456 457 458
      qall->current = queue->head;
      qall->start = queue->head;
      qall->numOfItems = queue->numOfItems;
      code = qall->numOfItems;
S
Shengliang Guan 已提交
459
      if (ahandle) *ahandle = queue->ahandle;
S
Shengliang Guan 已提交
460
      if (itemsFp) *itemsFp = queue->itemsFp;
S
Shengliang Guan 已提交
461

462 463 464
      queue->head = NULL;
      queue->tail = NULL;
      queue->numOfItems = 0;
465
      queue->memOfItems = 0;
466
      atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
S
Shengliang Guan 已提交
467 468 469
      for (int32_t j = 1; j < qall->numOfItems; ++j) {
        tsem_wait(&qset->sem);
      }
470
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
471

wafwerar's avatar
wafwerar 已提交
472
    taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
473

474
    if (code != 0) break;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
475
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
476

wafwerar's avatar
wafwerar 已提交
477
  taosThreadMutexUnlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
478 479
  return code;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
480

S
Shengliang Guan 已提交
481 482 483 484
void taosResetQsetThread(STaosQset *qset, void *pItem) {
  if (pItem == NULL) return;
  STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));

wafwerar's avatar
wafwerar 已提交
485
  taosThreadMutexLock(&qset->mutex);
S
Shengliang Guan 已提交
486 487 488
  for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) {
    tsem_post(&qset->sem);
  }
wafwerar's avatar
wafwerar 已提交
489
  taosThreadMutexUnlock(&qset->mutex);
S
Shengliang Guan 已提交
490
}