tqueue.c 13.4 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
17
#include "tqueue.h"
S
Shengliang Guan 已提交
18
#include "taoserror.h"
S
log  
Shengliang Guan 已提交
19
#include "tlog.h"
20

21 22 23
// Ceiling for tsRpcQueueMemoryUsed: taosAllocateQitem rejects an RPC_QITEM
// allocation whose accounted total would exceed this value.
int64_t tsRpcQueueMemoryAllowed = 0;
// Running total of (size + dataSize) over live RPC_QITEM items; adjusted with
// atomic add/sub in taosAllocateQitem and taosFreeQitem.
int64_t tsRpcQueueMemoryUsed = 0;

dengyihao's avatar
dengyihao 已提交
24 25 26
// Sets the memory limit checked by taosWriteQitem; that check is skipped
// while the limit is not greater than zero.
void taosSetQueueMemoryCapacity(STaosQueue *queue, int64_t cap) {
  queue->memLimit = cap;
}
// Sets the item-count limit checked by taosWriteQitem; that check is skipped
// while the limit is not greater than zero.
void taosSetQueueCapacity(STaosQueue *queue, int64_t size) {
  queue->itemLimit = size;
}

27
STaosQueue *taosOpenQueue() {
wafwerar's avatar
wafwerar 已提交
28
  STaosQueue *queue = taosMemoryCalloc(1, sizeof(STaosQueue));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
29
  if (queue == NULL) {
S
Shengliang Guan 已提交
30
    terrno = TSDB_CODE_OUT_OF_MEMORY;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
31
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
32 33
  }

wafwerar's avatar
wafwerar 已提交
34
  if (taosThreadMutexInit(&queue->mutex, NULL) != 0) {
S
Shengliang Guan 已提交
35 36 37
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
38

S
Shengliang Guan 已提交
39
  uDebug("queue:%p is opened", queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
40 41
  return queue;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
42

S
Shengliang Guan 已提交
43
// Stores the two callbacks on the queue. taosReadQitemFromQset reports itemFp
// and taosReadAllQitemsFromQset reports itemsFp through SQueueInfo.fp.
// A NULL queue is ignored.
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp) {
  if (queue != NULL) {
    queue->itemFp = itemFp;
    queue->itemsFp = itemsFp;
  }
}

49 50
// Tears down a queue: detaches it from its qset (if it has one), frees every
// node still linked in it, destroys the mutex and frees the queue itself.
// A NULL queue is ignored.
void taosCloseQueue(STaosQueue *queue) {
  if (queue == NULL) return;
  STaosQnode *pTemp;
  STaosQset  *qset;

  // Detach the pending node list and snapshot the qset under the queue mutex.
  taosThreadMutexLock(&queue->mutex);
  STaosQnode *pNode = queue->head;
  queue->head = NULL;
  qset = queue->qset;
  taosThreadMutexUnlock(&queue->mutex);

  if (queue->qset) {
    taosRemoveFromQset(qset, queue);
  }

  // NOTE(review): nodes are released with taosMemoryFree rather than
  // taosFreeQitem, so tsRpcQueueMemoryUsed is not reduced for pending
  // RPC_QITEM items - confirm this is intended.
  while (pNode) {
    pTemp = pNode;
    pNode = pNode->next;
    taosMemoryFree(pTemp);
  }

  taosThreadMutexDestroy(&queue->mutex);

  // Log before freeing: the pointer value must not be used once the memory
  // it refers to has been released.
  uDebug("queue:%p is closed", queue);
  taosMemoryFree(queue);
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
75

76 77
// Reports whether the queue holds no items. A NULL queue counts as empty.
// The check is made under the queue mutex and requires head, tail and the
// item counter to all be clear; memOfItems is not consulted.
bool taosQueueEmpty(STaosQueue *queue) {
  if (queue == NULL) return true;

  taosThreadMutexLock(&queue->mutex);
  bool noNodes = (queue->head == NULL) && (queue->tail == NULL);
  bool isEmpty = noNodes && (queue->numOfItems == 0);
  taosThreadMutexUnlock(&queue->mutex);

  return isEmpty;
}

89 90 91 92 93 94 95 96
// Subtracts `items` from the queue's item counter while holding the queue
// mutex. A NULL queue is ignored.
// NOTE(review): the qset read paths leave numOfItems untouched, so this is
// presumably how their consumers settle the counter - verify against callers.
void taosUpdateItemSize(STaosQueue *queue, int32_t items) {
  if (queue != NULL) {
    taosThreadMutexLock(&queue->mutex);
    queue->numOfItems = queue->numOfItems - items;
    taosThreadMutexUnlock(&queue->mutex);
  }
}

97
// Returns the queue's item counter, read under the queue mutex.
// A NULL queue yields 0.
int32_t taosQueueItemSize(STaosQueue *queue) {
  if (queue == NULL) return 0;

  // Snapshot both counters while the lock is held so the trace below does not
  // read fields that another thread may be updating.
  taosThreadMutexLock(&queue->mutex);
  int32_t numOfItems = queue->numOfItems;
  int64_t memOfItems = queue->memOfItems;
  taosThreadMutexUnlock(&queue->mutex);

  uTrace("queue:%p, numOfItems:%d memOfItems:%" PRId64, queue, numOfItems, memOfItems);
  return numOfItems;
}

108
// Returns the queue's memOfItems counter, read under the queue mutex.
// A NULL queue yields 0, matching taosQueueItemSize.
int64_t taosQueueMemorySize(STaosQueue *queue) {
  if (queue == NULL) return 0;

  taosThreadMutexLock(&queue->mutex);
  int64_t memOfItems = queue->memOfItems;
  taosThreadMutexUnlock(&queue->mutex);
  return memOfItems;
}

S
Shengliang Guan 已提交
115
// Allocates a queue node with `size` payload bytes following the node header
// and returns a pointer to the payload (pNode->item), not to the node.
//
// size     - payload bytes allocated after the STaosQnode header
// itype    - item type; RPC_QITEM items are charged against
//            tsRpcQueueMemoryUsed and refused once the total would exceed
//            tsRpcQueueMemoryAllowed
// dataSize - extra amount recorded on the node and included in the accounting
//
// Returns NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY when the allocation
// fails, or to TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE when the RPC budget is
// exceeded.
void *taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize) {
  STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
  if (pNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pNode->dataSize = dataSize;
  pNode->size = size;
  pNode->itype = itype;
  pNode->timestamp = taosGetTimestampUs();

  if (itype == RPC_QITEM) {
    int64_t charged = size + dataSize;
    // Charge first, then check: the add is atomic, so a failed reservation
    // has to be rolled back explicitly.
    int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, charged);
    if (alloced > tsRpcQueueMemoryAllowed) {
      uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, charged, alloced,
             tsRpcQueueMemoryAllowed);
      atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, charged);
      taosMemoryFree(pNode);
      terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE;
      return NULL;
    }
    uTrace("item:%p, node:%p is allocated, alloc:%" PRId64, pNode->item, pNode, alloced);
  } else {
    uTrace("item:%p, node:%p is allocated", pNode->item, pNode);
  }

  return pNode->item;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
144

S
Shengliang Guan 已提交
145 146
void taosFreeQitem(void *pItem) {
  if (pItem == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
147

148
  STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
S
Shengliang Guan 已提交
149 150
  if (pNode->itype == RPC_QITEM) {
    int64_t alloced = atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, pNode->size + pNode->dataSize);
151 152 153 154 155 156
    uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced);
  } else {
    uTrace("item:%p, node:%p is freed", pItem, pNode);
  }

  taosMemoryFree(pNode);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
157
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
158

dengyihao's avatar
dengyihao 已提交
159 160
// Appends an item (a payload pointer from taosAllocateQitem) to the tail of
// the queue and, when the queue belongs to a qset, bumps the qset's item
// counter and posts its semaphore.
//
// Returns 0 on success. Returns TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY without
// enqueuing when the queue's memory limit or item limit would be exceeded;
// the item is not freed in that case.
int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
  int32_t     code = 0;
  STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode));
  pNode->next = NULL;

  taosThreadMutexLock(&queue->mutex);
  if (queue->memLimit > 0 && (queue->memOfItems + pNode->size + pNode->dataSize) > queue->memLimit) {
    code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
    uError("item:%p failed to put into queue:%p, queue mem limit: %" PRId64 ", reason: %s", pItem, queue,
           queue->memLimit, tstrerror(code));

    taosThreadMutexUnlock(&queue->mutex);
    return code;
  } else if (queue->itemLimit > 0 && queue->numOfItems + 1 > queue->itemLimit) {
    code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
    uError("item:%p failed to put into queue:%p, queue size limit: %" PRId64 ", reason: %s", pItem, queue,
           queue->itemLimit, tstrerror(code));
    taosThreadMutexUnlock(&queue->mutex);
    return code;
  }

  if (queue->tail) {
    queue->tail->next = pNode;
    queue->tail = pNode;
  } else {
    queue->head = pNode;
    queue->tail = pNode;
  }
  queue->numOfItems++;
  queue->memOfItems += (pNode->size + pNode->dataSize);

  // Snapshot the qset while the queue mutex is held so the post below uses
  // the same qset whose counter was incremented, and cannot dereference a
  // pointer that was cleared between the check and the use.
  STaosQset *qset = queue->qset;
  if (qset) atomic_add_fetch_32(&qset->numOfItems, 1);

  uTrace("item:%p is put into queue:%p, items:%d mem:%" PRId64, pItem, queue, queue->numOfItems, queue->memOfItems);

  taosThreadMutexUnlock(&queue->mutex);

  if (qset) tsem_post(&qset->sem);
  return code;
}

199
// Removes the head item of the queue, if there is one, and stores its payload
// pointer in *ppItem. The queue's item and memory counters, and the owning
// qset's item counter when the queue has a qset, are reduced accordingly.
//
// Returns 1 when an item was dequeued, 0 when the queue had no head node
// (*ppItem is left untouched in that case).
int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
  int32_t dequeued = 0;

  taosThreadMutexLock(&queue->mutex);

  STaosQnode *pFirst = queue->head;
  if (pFirst != NULL) {
    *ppItem = pFirst->item;

    queue->head = pFirst->next;
    if (queue->head == NULL) {
      queue->tail = NULL;
    }
    queue->numOfItems--;
    queue->memOfItems -= (pFirst->size + pFirst->dataSize);
    if (queue->qset) {
      atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
    }

    dequeued = 1;
    uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems,
           queue->memOfItems);
  }

  taosThreadMutexUnlock(&queue->mutex);

  return dequeued;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
222

223 224 225 226 227 228 229
// Allocates a zero-initialized STaosQall.
// Returns NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY when the allocation
// fails; terrno is left untouched on success.
STaosQall *taosAllocateQall() {
  STaosQall *qall = taosMemoryCalloc(1, sizeof(STaosQall));
  if (qall == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
  }
  return qall;
}
230

wafwerar's avatar
wafwerar 已提交
231
// Frees the STaosQall container only; the nodes it points at are not touched.
void taosFreeQall(STaosQall *qall) {
  taosMemoryFree(qall);
}
232

233
// Moves every node currently in the queue into `qall` in one step and leaves
// the queue empty with its counters reset to zero. When the queue belongs to
// a qset, the moved count is subtracted from the qset's item counter.
//
// Returns the number of items transferred (taken from queue->numOfItems).
// When the queue has no head node, `qall` is reset to empty and 0 is returned.
int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
  int32_t moved = 0;

  taosThreadMutexLock(&queue->mutex);

  STaosQnode *pFirst = queue->head;
  if (pFirst != NULL) {
    memset(qall, 0, sizeof(STaosQall));
    qall->current = pFirst;
    qall->start = pFirst;
    qall->numOfItems = queue->numOfItems;
    moved = qall->numOfItems;

    queue->head = NULL;
    queue->tail = NULL;
    queue->numOfItems = 0;
    queue->memOfItems = 0;
    uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, moved, queue, queue->numOfItems,
           queue->memOfItems);
    if (queue->qset) {
      atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
    }
  }

  taosThreadMutexUnlock(&queue->mutex);

  // Nothing was queued: hand back an empty qall as well.
  if (pFirst == NULL) {
    qall->current = NULL;
    qall->start = NULL;
    qall->numOfItems = 0;
  }

  return moved;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
266

267
// Yields the payload of the node at qall's cursor and advances the cursor.
// Returns 1 with *ppItem set to the payload, or 0 with *ppItem set to NULL
// once the cursor has run past the last node.
int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
  STaosQnode *pCursor = qall->current;

  if (pCursor == NULL) {
    *ppItem = NULL;
    return 0;
  }

  qall->current = pCursor->next;
  *ppItem = pCursor->item;
  uTrace("item:%p is fetched", *ppItem);
  return 1;
}

285
STaosQset *taosOpenQset() {
wafwerar's avatar
wafwerar 已提交
286
  STaosQset *qset = taosMemoryCalloc(sizeof(STaosQset), 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
287
  if (qset == NULL) {
S
Shengliang Guan 已提交
288
    terrno = TSDB_CODE_OUT_OF_MEMORY;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
289
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
290 291
  }

wafwerar's avatar
wafwerar 已提交
292
  taosThreadMutexInit(&qset->mutex, NULL);
293
  tsem_init(&qset->sem, 0, 0);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
294

S
Shengliang Guan 已提交
295
  uDebug("qset:%p is opened", qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
296 297
  return qset;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
298

299 300
// Destroys a queue set. Every queue still linked in it is unlinked (its qset
// and next pointers are cleared) but not freed. The set's mutex and semaphore
// are then destroyed and the set is freed. A NULL qset is ignored.
void taosCloseQset(STaosQset *qset) {
  if (qset == NULL) return;

  // remove all the queues from qset
  taosThreadMutexLock(&qset->mutex);
  while (qset->head) {
    STaosQueue *queue = qset->head;
    qset->head = qset->head->next;

    queue->qset = NULL;
    queue->next = NULL;
  }
  taosThreadMutexUnlock(&qset->mutex);

  taosThreadMutexDestroy(&qset->mutex);
  tsem_destroy(&qset->sem);

  // Log before freeing: the pointer value must not be used once the memory
  // it refers to has been released.
  uDebug("qset:%p is closed", qset);
  taosMemoryFree(qset);
}

319 320 321
// tsem_post 'qset->sem', so that reader threads waiting for it
// resumes execution and return, should only be used to signal the
// thread to exit.
322
void taosQsetThreadResume(STaosQset *qset) {
S
TD-1670  
Shengliang Guan 已提交
323
  uDebug("qset:%p, it will exit", qset);
324 325 326
  tsem_post(&qset->sem);
}

327
// Links `queue` at the head of `qset`'s queue list, records `ahandle` on the
// queue, and adds the queue's current item count to the qset's item counter.
//
// Returns 0 on success, or -1 (without changing anything) when the queue
// already belongs to a qset. That membership test is made before any lock is
// taken.
int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) {
  if (queue->qset != NULL) {
    return -1;
  }

  taosThreadMutexLock(&qset->mutex);

  // Push the queue onto the front of the set's singly linked list.
  queue->next = qset->head;
  queue->ahandle = ahandle;
  qset->head = queue;
  qset->numOfQueues += 1;

  // Lock order: qset mutex first, then the queue mutex.
  taosThreadMutexLock(&queue->mutex);
  atomic_add_fetch_32(&qset->numOfItems, queue->numOfItems);
  queue->qset = qset;
  taosThreadMutexUnlock(&queue->mutex);

  taosThreadMutexUnlock(&qset->mutex);

  uTrace("queue:%p is added into qset:%p", queue, qset);
  return 0;
}

348
// Unlinks `queue` from `qset`'s queue list if it is present. When found, the
// qset's round-robin cursor is moved off the queue if it pointed at it, the
// queue count is decremented, the queue's item count is subtracted from the
// qset's item counter, and the queue's qset/next pointers are cleared.
// The debug line is emitted whether or not the queue was found.
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
  taosThreadMutexLock(&qset->mutex);

  // Walk the link slots (head, then each node's next) so that removing the
  // first queue and removing a later one are handled by the same code.
  STaosQueue **ppLink = &qset->head;
  while (*ppLink != NULL && *ppLink != queue) {
    ppLink = &(*ppLink)->next;
  }

  if (*ppLink != NULL) {
    *ppLink = queue->next;

    if (qset->current == queue) {
      qset->current = queue->next;
    }
    qset->numOfQueues--;

    taosThreadMutexLock(&queue->mutex);
    atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems);
    queue->qset = NULL;
    queue->next = NULL;
    taosThreadMutexUnlock(&queue->mutex);
  }

  taosThreadMutexUnlock(&qset->mutex);

  uDebug("queue:%p is removed from qset:%p", queue, qset);
}

388
// Waits on the qset semaphore, then visits the set's queues round-robin
// (starting at qset->current, at most numOfQueues steps) and dequeues the
// head item of the first queue that has one.
//
// On success *ppItem receives the payload pointer and `qinfo` is filled with
// the queue, its ahandle, its itemFp callback and the node's timestamp.
// The queue's memory counter and the qset's item counter are reduced, but
// queue->numOfItems is deliberately left unchanged here.
// NOTE(review): presumably the consumer settles it later through
// taosUpdateItemSize - verify against callers.
//
// Returns 1 when an item was dequeued, 0 otherwise (*ppItem and `qinfo` are
// left untouched in that case).
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo) {
  int32_t code = 0;

  tsem_wait(&qset->sem);

  taosThreadMutexLock(&qset->mutex);

  for (int32_t visited = 0; visited < qset->numOfQueues; ++visited) {
    // Wrap the cursor around to the head of the list when it runs off the end.
    if (qset->current == NULL) qset->current = qset->head;

    STaosQueue *queue = qset->current;
    if (queue == NULL) break;
    qset->current = queue->next;

    // Unlocked peek to skip queues that look empty; re-checked under the lock.
    if (queue->head == NULL) continue;

    taosThreadMutexLock(&queue->mutex);

    STaosQnode *pFirst = queue->head;
    if (pFirst != NULL) {
      *ppItem = pFirst->item;
      qinfo->ahandle = queue->ahandle;
      qinfo->fp = queue->itemFp;
      qinfo->queue = queue;
      qinfo->timestamp = pFirst->timestamp;

      queue->head = pFirst->next;
      if (queue->head == NULL) {
        queue->tail = NULL;
      }
      queue->memOfItems -= (pFirst->size + pFirst->dataSize);
      atomic_sub_fetch_32(&qset->numOfItems, 1);
      code = 1;
      uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems - 1,
             queue->memOfItems);
    }

    taosThreadMutexUnlock(&queue->mutex);

    if (code != 0) break;
  }

  taosThreadMutexUnlock(&qset->mutex);

  return code;
}

432
// Waits on the qset semaphore, then visits the set's queues round-robin
// (starting at qset->current, at most numOfQueues steps) and moves the whole
// node list of the first non-empty queue into `qall`.
//
// On success `qinfo` is filled with the queue, its ahandle and its itemsFp
// callback. The queue's memory counter is zeroed and the batch size is
// subtracted from the qset's item counter, but queue->numOfItems is
// deliberately left unchanged here. The semaphore is waited on once more for
// every item in the batch beyond the first.
//
// Returns the batch size (queue->numOfItems at the time of the read), or 0
// when nothing was taken; `qall` and `qinfo` are not modified in that case.
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo) {
  int32_t code = 0;

  tsem_wait(&qset->sem);
  taosThreadMutexLock(&qset->mutex);

  for (int32_t visited = 0; visited < qset->numOfQueues; ++visited) {
    // Wrap the cursor around to the head of the list when it runs off the end.
    if (qset->current == NULL) qset->current = qset->head;

    STaosQueue *queue = qset->current;
    if (queue == NULL) break;
    qset->current = queue->next;

    // Unlocked peek to skip queues that look empty; re-checked under the lock.
    if (queue->head == NULL) continue;

    taosThreadMutexLock(&queue->mutex);

    STaosQnode *pFirst = queue->head;
    if (pFirst != NULL) {
      qall->current = pFirst;
      qall->start = pFirst;
      qall->numOfItems = queue->numOfItems;
      code = qall->numOfItems;
      qinfo->ahandle = queue->ahandle;
      qinfo->fp = queue->itemsFp;
      qinfo->queue = queue;

      queue->head = NULL;
      queue->tail = NULL;
      queue->memOfItems = 0;
      uTrace("read %d items from queue:%p, items:0 mem:%" PRId64, code, queue, queue->memOfItems);

      atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);

      // One wait was already done on entry; do one more for each further item.
      int32_t waited = 1;
      while (waited < qall->numOfItems) {
        tsem_wait(&qset->sem);
        ++waited;
      }
    }

    taosThreadMutexUnlock(&queue->mutex);

    if (code != 0) break;
  }

  taosThreadMutexUnlock(&qset->mutex);
  return code;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
477

S
Shengliang Guan 已提交
478
// Returns the item count recorded in the qall.
int32_t taosQallItemSize(STaosQall *qall) {
  return qall->numOfItems;
}
S
Shengliang Guan 已提交
479 480
// Rewinds the qall cursor to the first node so taosGetQitem starts over.
void taosResetQitems(STaosQall *qall) {
  qall->current = qall->start;
}
// Returns the number of queues currently counted in the qset (read without
// taking the qset mutex).
int32_t taosGetQueueNumber(STaosQset *qset) {
  return qset->numOfQueues;
}
S
Shengliang Guan 已提交
481 482 483

#if 0

// Disabled code, kept for reference: posts qset->sem once per item counted in
// the queue recorded on the item's node. A NULL item is ignored.
void taosResetQsetThread(STaosQset *qset, void *pItem) {
  if (pItem == NULL) return;
  STaosQnode *pHeader = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));

  taosThreadMutexLock(&qset->mutex);
  int32_t posted = 0;
  while (posted < pHeader->queue->numOfItems) {
    tsem_post(&qset->sem);
    ++posted;
  }
  taosThreadMutexUnlock(&qset->mutex);
}

#endif