tqueue.c 12.9 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
17
#include "tqueue.h"
S
Shengliang Guan 已提交
18
#include "taoserror.h"
S
log  
Shengliang Guan 已提交
19
#include "tlog.h"
20

21 22 23
int64_t tsRpcQueueMemoryAllowed = 0;
int64_t tsRpcQueueMemoryUsed = 0;

24
typedef struct STaosQnode STaosQnode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
25

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
26
typedef struct STaosQnode {
27
  STaosQnode *next;
S
Shengliang Guan 已提交
28
  STaosQueue *queue;
D
dapan1121 已提交
29
  int64_t     timestamp;
30
  int32_t     size;
31 32
  int8_t      itype;
  int8_t      reserved[3];
33
  char        item[];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
34 35
} STaosQnode;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
36
typedef struct STaosQueue {
L
Liu Jicong 已提交
37 38 39 40 41
  STaosQnode   *head;
  STaosQnode   *tail;
  STaosQueue   *next;     // for queue set
  STaosQset    *qset;     // for queue set
  void         *ahandle;  // for queue set
42 43
  FItem         itemFp;
  FItems        itemsFp;
wafwerar's avatar
wafwerar 已提交
44
  TdThreadMutex mutex;
45 46
  int64_t       memOfItems;
  int32_t       numOfItems;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
47 48
} STaosQueue;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
49
typedef struct STaosQset {
L
Liu Jicong 已提交
50 51
  STaosQueue   *head;
  STaosQueue   *current;
wafwerar's avatar
wafwerar 已提交
52
  TdThreadMutex mutex;
53
  tsem_t        sem;
54 55
  int32_t       numOfQueues;
  int32_t       numOfItems;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
56 57
} STaosQset;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
58
typedef struct STaosQall {
S
Shengliang Guan 已提交
59 60 61 62 63
  STaosQnode *current;
  STaosQnode *start;
  int32_t     numOfItems;
} STaosQall;

64
STaosQueue *taosOpenQueue() {
wafwerar's avatar
wafwerar 已提交
65
  STaosQueue *queue = taosMemoryCalloc(1, sizeof(STaosQueue));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
66
  if (queue == NULL) {
S
Shengliang Guan 已提交
67
    terrno = TSDB_CODE_OUT_OF_MEMORY;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
68
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
69 70
  }

wafwerar's avatar
wafwerar 已提交
71
  if (taosThreadMutexInit(&queue->mutex, NULL) != 0) {
S
Shengliang Guan 已提交
72 73 74
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
75

S
Shengliang Guan 已提交
76
  uDebug("queue:%p is opened", queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
77 78
  return queue;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
79

S
Shengliang Guan 已提交
80
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp) {
81
  if (queue == NULL) return;
S
Shengliang Guan 已提交
82 83
  queue->itemFp = itemFp;
  queue->itemsFp = itemsFp;
S
Shengliang Guan 已提交
84 85
}

86 87
void taosCloseQueue(STaosQueue *queue) {
  if (queue == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
88
  STaosQnode *pTemp;
L
Liu Jicong 已提交
89
  STaosQset  *qset;
90

wafwerar's avatar
wafwerar 已提交
91
  taosThreadMutexLock(&queue->mutex);
S
Shengliang Guan 已提交
92
  STaosQnode *pNode = queue->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
93
  queue->head = NULL;
94
  qset = queue->qset;
wafwerar's avatar
wafwerar 已提交
95
  taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
96

S
Shengliang Guan 已提交
97 98 99
  if (queue->qset) {
    taosRemoveFromQset(qset, queue);
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
100

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
101 102 103
  while (pNode) {
    pTemp = pNode;
    pNode = pNode->next;
wafwerar's avatar
wafwerar 已提交
104
    taosMemoryFree(pTemp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
105 106
  }

wafwerar's avatar
wafwerar 已提交
107
  taosThreadMutexDestroy(&queue->mutex);
wafwerar's avatar
wafwerar 已提交
108
  taosMemoryFree(queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
109

S
Shengliang Guan 已提交
110
  uDebug("queue:%p is closed", queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
111
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
112

113 114
bool taosQueueEmpty(STaosQueue *queue) {
  if (queue == NULL) return true;
S
Shengliang Guan 已提交
115 116

  bool empty = false;
wafwerar's avatar
wafwerar 已提交
117
  taosThreadMutexLock(&queue->mutex);
118
  if (queue->head == NULL && queue->tail == NULL && queue->numOfItems == 0 && queue->memOfItems == 0) {
S
Shengliang Guan 已提交
119 120
    empty = true;
  }
wafwerar's avatar
wafwerar 已提交
121
  taosThreadMutexUnlock(&queue->mutex);
S
Shengliang Guan 已提交
122 123 124 125

  return empty;
}

126 127 128 129 130 131 132 133
void taosUpdateItemSize(STaosQueue *queue, int32_t items) {
  if (queue == NULL) return;

  taosThreadMutexLock(&queue->mutex);
  queue->numOfItems -= items;
  taosThreadMutexUnlock(&queue->mutex);
}

134
int32_t taosQueueItemSize(STaosQueue *queue) {
135 136
  if (queue == NULL) return 0;

wafwerar's avatar
wafwerar 已提交
137
  taosThreadMutexLock(&queue->mutex);
138
  int32_t numOfItems = queue->numOfItems;
wafwerar's avatar
wafwerar 已提交
139
  taosThreadMutexUnlock(&queue->mutex);
140 141 142
  return numOfItems;
}

143
int64_t taosQueueMemorySize(STaosQueue *queue) {
144
  taosThreadMutexLock(&queue->mutex);
145
  int64_t memOfItems = queue->memOfItems;
146 147 148 149
  taosThreadMutexUnlock(&queue->mutex);
  return memOfItems;
}

150
void *taosAllocateQitem(int32_t size, EQItype itype) {
wafwerar's avatar
wafwerar 已提交
151
  STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
S
Shengliang Guan 已提交
152 153 154 155
  if (pNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
S
Shengliang Guan 已提交
156

S
Shengliang Guan 已提交
157 158 159 160
  pNode->size = size;
  pNode->itype = itype;
  pNode->timestamp = taosGetTimestampUs();

161 162
  if (itype == RPC_QITEM) {
    int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size);
dengyihao's avatar
dengyihao 已提交
163
    if (alloced > tsRpcQueueMemoryAllowed) {
164 165 166 167 168 169 170 171 172
      taosMemoryFree(pNode);
      terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE;
      return NULL;
    }
    uTrace("item:%p, node:%p is allocated, alloc:%" PRId64, pNode->item, pNode, alloced);
  } else {
    uTrace("item:%p, node:%p is allocated", pNode->item, pNode);
  }

S
Shengliang Guan 已提交
173
  return pNode->item;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
174
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
175

S
Shengliang Guan 已提交
176 177
void taosFreeQitem(void *pItem) {
  if (pItem == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
178

179 180 181 182 183 184 185 186 187
  STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
  if (pNode->itype > 0) {
    int64_t alloced = atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, pNode->size);
    uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced);
  } else {
    uTrace("item:%p, node:%p is freed", pItem, pNode);
  }

  taosMemoryFree(pNode);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
188
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
189

S
Shengliang Guan 已提交
190
void taosWriteQitem(STaosQueue *queue, void *pItem) {
191
  STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode));
192
  pNode->next = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
193

wafwerar's avatar
wafwerar 已提交
194
  taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
195

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
196 197 198 199 200
  if (queue->tail) {
    queue->tail->next = pNode;
    queue->tail = pNode;
  } else {
    queue->head = pNode;
S
Shengliang Guan 已提交
201
    queue->tail = pNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
202 203 204
  }

  queue->numOfItems++;
205
  queue->memOfItems += pNode->size;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
206
  if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
207
  uTrace("item:%p is put into queue:%p, items:%d mem:%" PRId64, pItem, queue, queue->numOfItems, queue->memOfItems);
208

wafwerar's avatar
wafwerar 已提交
209
  taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
210

211
  if (queue->qset) tsem_post(&queue->qset->sem);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
212 213
}

214
int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
215
  STaosQnode *pNode = NULL;
216
  int32_t     code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
217

wafwerar's avatar
wafwerar 已提交
218
  taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
219

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
220
  if (queue->head) {
S
Shengliang Guan 已提交
221
    pNode = queue->head;
222
    *ppItem = pNode->item;
S
Shengliang Guan 已提交
223 224 225
    queue->head = pNode->next;
    if (queue->head == NULL) queue->tail = NULL;
    queue->numOfItems--;
226
    queue->memOfItems -= pNode->size;
S
Shengliang Guan 已提交
227 228
    if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
    code = 1;
229 230
    uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems,
           queue->memOfItems);
S
Shengliang Guan 已提交
231
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
232

wafwerar's avatar
wafwerar 已提交
233
  taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
234

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
235 236
  return code;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
237

238 239 240 241 242 243 244
STaosQall *taosAllocateQall() {
  STaosQall *qall = taosMemoryCalloc(1, sizeof(STaosQall));
  if (qall != NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
  }
  return qall;
}
245

wafwerar's avatar
wafwerar 已提交
246
void taosFreeQall(STaosQall *qall) { taosMemoryFree(qall); }
247

248 249 250
int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
  int32_t code = 0;
  bool    empty;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
251

wafwerar's avatar
wafwerar 已提交
252
  taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
253

J
Jun Li 已提交
254 255
  empty = queue->head == NULL;
  if (!empty) {
256 257 258 259 260 261 262 263 264
    memset(qall, 0, sizeof(STaosQall));
    qall->current = queue->head;
    qall->start = queue->head;
    qall->numOfItems = queue->numOfItems;
    code = qall->numOfItems;

    queue->head = NULL;
    queue->tail = NULL;
    queue->numOfItems = 0;
265
    queue->memOfItems = 0;
266
    uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, code, queue, queue->numOfItems, queue->memOfItems);
267
    if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
J
Jun Li 已提交
268
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
269

wafwerar's avatar
wafwerar 已提交
270
  taosThreadMutexUnlock(&queue->mutex);
J
Jun Li 已提交
271 272 273 274 275 276 277 278

  // if source queue is empty, we set destination qall to empty too.
  if (empty) {
    qall->current = NULL;
    qall->start = NULL;
    qall->numOfItems = 0;
  }
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
279
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
280

281
int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
282
  STaosQnode *pNode;
283
  int32_t     num = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
284

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
285
  pNode = qall->current;
S
Shengliang Guan 已提交
286 287
  if (pNode) qall->current = pNode->next;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
288
  if (pNode) {
289
    *ppItem = pNode->item;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
290
    num = 1;
291
    uTrace("item:%p is fetched", *ppItem);
L
Liu Jicong 已提交
292 293
  } else {
    *ppItem = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
294 295
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
296
  return num;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
297 298
}

299
STaosQset *taosOpenQset() {
wafwerar's avatar
wafwerar 已提交
300
  STaosQset *qset = taosMemoryCalloc(sizeof(STaosQset), 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
301
  if (qset == NULL) {
S
Shengliang Guan 已提交
302
    terrno = TSDB_CODE_OUT_OF_MEMORY;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
303
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
304 305
  }

wafwerar's avatar
wafwerar 已提交
306
  taosThreadMutexInit(&qset->mutex, NULL);
307
  tsem_init(&qset->sem, 0, 0);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
308

S
Shengliang Guan 已提交
309
  uDebug("qset:%p is opened", qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
310 311
  return qset;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
312

313 314
void taosCloseQset(STaosQset *qset) {
  if (qset == NULL) return;
315 316

  // remove all the queues from qset
wafwerar's avatar
wafwerar 已提交
317
  taosThreadMutexLock(&qset->mutex);
318 319 320 321 322 323 324
  while (qset->head) {
    STaosQueue *queue = qset->head;
    qset->head = qset->head->next;

    queue->qset = NULL;
    queue->next = NULL;
  }
wafwerar's avatar
wafwerar 已提交
325
  taosThreadMutexUnlock(&qset->mutex);
326

wafwerar's avatar
wafwerar 已提交
327
  taosThreadMutexDestroy(&qset->mutex);
328
  tsem_destroy(&qset->sem);
wafwerar's avatar
wafwerar 已提交
329
  taosMemoryFree(qset);
S
Shengliang Guan 已提交
330
  uDebug("qset:%p is closed", qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
331 332
}

333 334 335
// tsem_post 'qset->sem', so that reader threads waiting for it
// resumes execution and return, should only be used to signal the
// thread to exit.
336
void taosQsetThreadResume(STaosQset *qset) {
S
TD-1670  
Shengliang Guan 已提交
337
  uDebug("qset:%p, it will exit", qset);
338 339 340
  tsem_post(&qset->sem);
}

341
int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) {
S
Shengliang Guan 已提交
342
  if (queue->qset) return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
343

wafwerar's avatar
wafwerar 已提交
344
  taosThreadMutexLock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
345 346

  queue->next = qset->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
347
  queue->ahandle = ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
348 349 350
  qset->head = queue;
  qset->numOfQueues++;

wafwerar's avatar
wafwerar 已提交
351
  taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
352 353
  atomic_add_fetch_32(&qset->numOfItems, queue->numOfItems);
  queue->qset = qset;
wafwerar's avatar
wafwerar 已提交
354
  taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
355

wafwerar's avatar
wafwerar 已提交
356
  taosThreadMutexUnlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
357

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
358
  uTrace("queue:%p is added into qset:%p", queue, qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
359 360 361
  return 0;
}

362
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
363
  STaosQueue *tqueue = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
364

wafwerar's avatar
wafwerar 已提交
365
  taosThreadMutexLock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
366 367 368 369

  if (qset->head) {
    if (qset->head == queue) {
      qset->head = qset->head->next;
370
      tqueue = queue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
371 372 373 374
    } else {
      STaosQueue *prev = qset->head;
      tqueue = qset->head->next;
      while (tqueue) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
375
        assert(tqueue->qset);
S
Shengliang Guan 已提交
376
        if (tqueue == queue) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
377
          prev->next = tqueue->next;
378
          break;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
379 380 381 382 383 384
        } else {
          prev = tqueue;
          tqueue = tqueue->next;
        }
      }
    }
385 386 387 388 389

    if (tqueue) {
      if (qset->current == queue) qset->current = tqueue->next;
      qset->numOfQueues--;

wafwerar's avatar
wafwerar 已提交
390
      taosThreadMutexLock(&queue->mutex);
391 392
      atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems);
      queue->qset = NULL;
393
      queue->next = NULL;
wafwerar's avatar
wafwerar 已提交
394
      taosThreadMutexUnlock(&queue->mutex);
395
    }
S
Shengliang Guan 已提交
396 397
  }

wafwerar's avatar
wafwerar 已提交
398
  taosThreadMutexUnlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
399

S
Shengliang Guan 已提交
400
  uDebug("queue:%p is removed from qset:%p", queue, qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
401 402
}

403
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
404
  STaosQnode *pNode = NULL;
405
  int32_t     code = 0;
S
Shengliang Guan 已提交
406

407 408
  tsem_wait(&qset->sem);

wafwerar's avatar
wafwerar 已提交
409
  taosThreadMutexLock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
410

411
  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
S
Shengliang Guan 已提交
412
    if (qset->current == NULL) qset->current = qset->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
413
    STaosQueue *queue = qset->current;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
414 415
    if (queue) qset->current = queue->next;
    if (queue == NULL) break;
J
jtao1735 已提交
416
    if (queue->head == NULL) continue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
417

wafwerar's avatar
wafwerar 已提交
418
    taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
419 420

    if (queue->head) {
S
Shengliang Guan 已提交
421
      pNode = queue->head;
422
      *ppItem = pNode->item;
423 424 425 426
      qinfo->ahandle = queue->ahandle;
      qinfo->fp = queue->itemFp;
      qinfo->queue = queue;
      qinfo->timestamp = pNode->timestamp;
S
Shengliang Guan 已提交
427

S
Shengliang Guan 已提交
428 429
      queue->head = pNode->next;
      if (queue->head == NULL) queue->tail = NULL;
430
      // queue->numOfItems--;
431
      queue->memOfItems -= pNode->size;
S
Shengliang Guan 已提交
432 433
      atomic_sub_fetch_32(&qset->numOfItems, 1);
      code = 1;
434
      uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems - 1,
435
             queue->memOfItems);
S
Shengliang Guan 已提交
436
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
437

wafwerar's avatar
wafwerar 已提交
438
    taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
439 440 441
    if (pNode) break;
  }

wafwerar's avatar
wafwerar 已提交
442
  taosThreadMutexUnlock(&qset->mutex);
J
Jeff Tao 已提交
443

S
Shengliang Guan 已提交
444
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
445 446
}

447
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
448
  STaosQueue *queue;
449
  int32_t     code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
450

451
  tsem_wait(&qset->sem);
wafwerar's avatar
wafwerar 已提交
452
  taosThreadMutexLock(&qset->mutex);
J
Jeff Tao 已提交
453

454 455
  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
    if (qset->current == NULL) qset->current = qset->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
456
    queue = qset->current;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
457 458
    if (queue) qset->current = queue->next;
    if (queue == NULL) break;
J
jtao1735 已提交
459
    if (queue->head == NULL) continue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
460

wafwerar's avatar
wafwerar 已提交
461
    taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
462 463

    if (queue->head) {
464 465 466 467
      qall->current = queue->head;
      qall->start = queue->head;
      qall->numOfItems = queue->numOfItems;
      code = qall->numOfItems;
468 469 470
      qinfo->ahandle = queue->ahandle;
      qinfo->fp = queue->itemsFp;
      qinfo->queue = queue;
S
Shengliang Guan 已提交
471

472 473
      queue->head = NULL;
      queue->tail = NULL;
474
      // queue->numOfItems = 0;
475
      queue->memOfItems = 0;
476
      uTrace("read %d items from queue:%p, items:0 mem:%" PRId64, code, queue, queue->memOfItems);
477

478
      atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
S
Shengliang Guan 已提交
479 480 481
      for (int32_t j = 1; j < qall->numOfItems; ++j) {
        tsem_wait(&qset->sem);
      }
482
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
483

wafwerar's avatar
wafwerar 已提交
484
    taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
485

486
    if (code != 0) break;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
487
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
488

wafwerar's avatar
wafwerar 已提交
489
  taosThreadMutexUnlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
490 491
  return code;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
492

S
Shengliang Guan 已提交
493
int32_t taosQallItemSize(STaosQall *qall) { return qall->numOfItems; }
S
Shengliang Guan 已提交
494 495
void    taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
S
Shengliang Guan 已提交
496 497 498

#if 0

S
Shengliang Guan 已提交
499 500 501 502
void taosResetQsetThread(STaosQset *qset, void *pItem) {
  if (pItem == NULL) return;
  STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));

wafwerar's avatar
wafwerar 已提交
503
  taosThreadMutexLock(&qset->mutex);
S
Shengliang Guan 已提交
504 505 506
  for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) {
    tsem_post(&qset->sem);
  }
wafwerar's avatar
wafwerar 已提交
507
  taosThreadMutexUnlock(&qset->mutex);
S
Shengliang Guan 已提交
508
}
S
Shengliang Guan 已提交
509 510

#endif