tqueue.c 11.8 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
17
#include "tqueue.h"
S
Shengliang Guan 已提交
18
#include "taoserror.h"
S
log  
Shengliang Guan 已提交
19
#include "tlog.h"
20 21

typedef struct STaosQnode STaosQnode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
22

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
23
typedef struct STaosQnode {
24
  STaosQnode *next;
S
Shengliang Guan 已提交
25
  STaosQueue *queue;
26
  int32_t     size;
27
  char        item[];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
28 29
} STaosQnode;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
30
typedef struct STaosQueue {
31 32 33 34 35 36 37 38 39 40
  int32_t       memOfItems;
  int32_t       numOfItems;
  int32_t       threadId;
  STaosQnode   *head;
  STaosQnode   *tail;
  STaosQueue   *next;     // for queue set
  STaosQset    *qset;     // for queue set
  void         *ahandle;  // for queue set
  FItem         itemFp;
  FItems        itemsFp;
wafwerar's avatar
wafwerar 已提交
41
  TdThreadMutex mutex;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
42 43
} STaosQueue;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
44
typedef struct STaosQset {
45 46
  STaosQueue   *head;
  STaosQueue   *current;
wafwerar's avatar
wafwerar 已提交
47
  TdThreadMutex mutex;
48 49 50
  int32_t       numOfQueues;
  int32_t       numOfItems;
  tsem_t        sem;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
51 52
} STaosQset;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
53
typedef struct STaosQall {
S
Shengliang Guan 已提交
54 55 56 57 58
  STaosQnode *current;
  STaosQnode *start;
  int32_t     numOfItems;
} STaosQall;

59
STaosQueue *taosOpenQueue() {
wafwerar's avatar
wafwerar 已提交
60
  STaosQueue *queue = taosMemoryCalloc(1, sizeof(STaosQueue));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
61
  if (queue == NULL) {
S
Shengliang Guan 已提交
62
    terrno = TSDB_CODE_OUT_OF_MEMORY;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
63
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
64 65
  }

wafwerar's avatar
wafwerar 已提交
66
  if (taosThreadMutexInit(&queue->mutex, NULL) != 0) {
S
Shengliang Guan 已提交
67 68 69
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
70

S
Shengliang Guan 已提交
71
  uDebug("queue:%p is opened", queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
72 73
  return queue;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
74

S
Shengliang Guan 已提交
75
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp) {
76
  if (queue == NULL) return;
S
Shengliang Guan 已提交
77 78
  queue->itemFp = itemFp;
  queue->itemsFp = itemsFp;
S
Shengliang Guan 已提交
79 80
}

81 82
void taosCloseQueue(STaosQueue *queue) {
  if (queue == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
83
  STaosQnode *pTemp;
S
queue  
Shengliang Guan 已提交
84
  STaosQset  *qset;
85

wafwerar's avatar
wafwerar 已提交
86
  taosThreadMutexLock(&queue->mutex);
S
Shengliang Guan 已提交
87
  STaosQnode *pNode = queue->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
88
  queue->head = NULL;
89
  qset = queue->qset;
wafwerar's avatar
wafwerar 已提交
90
  taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
91

S
Shengliang Guan 已提交
92 93 94
  if (queue->qset) {
    taosRemoveFromQset(qset, queue);
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
95

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
96 97 98
  while (pNode) {
    pTemp = pNode;
    pNode = pNode->next;
wafwerar's avatar
wafwerar 已提交
99
    taosMemoryFree(pTemp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
100 101
  }

wafwerar's avatar
wafwerar 已提交
102
  taosThreadMutexDestroy(&queue->mutex);
wafwerar's avatar
wafwerar 已提交
103
  taosMemoryFree(queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
104

S
Shengliang Guan 已提交
105
  uDebug("queue:%p is closed", queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
106
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
107

108 109
bool taosQueueEmpty(STaosQueue *queue) {
  if (queue == NULL) return true;
S
Shengliang Guan 已提交
110 111

  bool empty = false;
wafwerar's avatar
wafwerar 已提交
112
  taosThreadMutexLock(&queue->mutex);
S
Shengliang Guan 已提交
113 114 115
  if (queue->head == NULL && queue->tail == NULL) {
    empty = true;
  }
wafwerar's avatar
wafwerar 已提交
116
  taosThreadMutexUnlock(&queue->mutex);
S
Shengliang Guan 已提交
117 118 119 120

  return empty;
}

121
int32_t taosQueueItemSize(STaosQueue *queue) {
wafwerar's avatar
wafwerar 已提交
122
  taosThreadMutexLock(&queue->mutex);
123
  int32_t numOfItems = queue->numOfItems;
wafwerar's avatar
wafwerar 已提交
124
  taosThreadMutexUnlock(&queue->mutex);
125 126 127
  return numOfItems;
}

128 129 130 131 132 133 134
int32_t taosQueueMemorySize(STaosQueue *queue) {
  taosThreadMutexLock(&queue->mutex);
  int32_t memOfItems = queue->memOfItems;
  taosThreadMutexUnlock(&queue->mutex);
  return memOfItems;
}

135
void *taosAllocateQitem(int32_t size) {
wafwerar's avatar
wafwerar 已提交
136
  STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
137
  pNode->size = size;
S
Shengliang Guan 已提交
138 139 140 141 142

  if (pNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
S
Shengliang Guan 已提交
143

144
  uTrace("item:%p, node:%p is allocated", pNode->item, pNode);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
145 146
  return (void *)pNode->item;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
147

S
Shengliang Guan 已提交
148 149
void taosFreeQitem(void *pItem) {
  if (pItem == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
150

S
Shengliang Guan 已提交
151
  char *temp = pItem;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
152
  temp -= sizeof(STaosQnode);
S
Shengliang Guan 已提交
153
  uTrace("item:%p, node:%p is freed", pItem, temp);
wafwerar's avatar
wafwerar 已提交
154
  taosMemoryFree(temp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
155
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
156

S
Shengliang Guan 已提交
157
void taosWriteQitem(STaosQueue *queue, void *pItem) {
158
  STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode));
159
  pNode->next = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
160

wafwerar's avatar
wafwerar 已提交
161
  taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
162

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
163 164 165 166 167
  if (queue->tail) {
    queue->tail->next = pNode;
    queue->tail = pNode;
  } else {
    queue->head = pNode;
S
Shengliang Guan 已提交
168
    queue->tail = pNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
169 170 171
  }

  queue->numOfItems++;
172
  queue->memOfItems += pNode->size;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
173
  if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
174
  uTrace("item:%p is put into queue:%p, items:%d mem:%" PRId64, pItem, queue, queue->numOfItems, queue->memOfItems);
175

wafwerar's avatar
wafwerar 已提交
176
  taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
177

178
  if (queue->qset) tsem_post(&queue->qset->sem);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
179 180
}

181
int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
182
  STaosQnode *pNode = NULL;
183
  int32_t     code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
184

wafwerar's avatar
wafwerar 已提交
185
  taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
186

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
187
  if (queue->head) {
S
Shengliang Guan 已提交
188
    pNode = queue->head;
189
    *ppItem = pNode->item;
S
Shengliang Guan 已提交
190 191 192
    queue->head = pNode->next;
    if (queue->head == NULL) queue->tail = NULL;
    queue->numOfItems--;
193
    queue->memOfItems -= pNode->size;
S
Shengliang Guan 已提交
194 195
    if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
    code = 1;
196 197
    uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems,
           queue->memOfItems);
S
Shengliang Guan 已提交
198
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
199

wafwerar's avatar
wafwerar 已提交
200
  taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
201

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
202 203
  return code;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
204

wafwerar's avatar
wafwerar 已提交
205
STaosQall *taosAllocateQall() { return taosMemoryCalloc(1, sizeof(STaosQall)); }
206

wafwerar's avatar
wafwerar 已提交
207
void taosFreeQall(STaosQall *qall) { taosMemoryFree(qall); }
208

209 210 211
int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
  int32_t code = 0;
  bool    empty;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
212

wafwerar's avatar
wafwerar 已提交
213
  taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
214

J
Jun Li 已提交
215 216
  empty = queue->head == NULL;
  if (!empty) {
217 218 219 220 221 222 223 224 225
    memset(qall, 0, sizeof(STaosQall));
    qall->current = queue->head;
    qall->start = queue->head;
    qall->numOfItems = queue->numOfItems;
    code = qall->numOfItems;

    queue->head = NULL;
    queue->tail = NULL;
    queue->numOfItems = 0;
226
    queue->memOfItems = 0;
227
    if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
J
Jun Li 已提交
228
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
229

wafwerar's avatar
wafwerar 已提交
230
  taosThreadMutexUnlock(&queue->mutex);
J
Jun Li 已提交
231 232 233 234 235 236 237 238

  // if source queue is empty, we set destination qall to empty too.
  if (empty) {
    qall->current = NULL;
    qall->start = NULL;
    qall->numOfItems = 0;
  }
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
239
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
240

241
int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
242
  STaosQnode *pNode;
243
  int32_t     num = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
244

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
245
  pNode = qall->current;
S
Shengliang Guan 已提交
246 247
  if (pNode) qall->current = pNode->next;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
248
  if (pNode) {
249
    *ppItem = pNode->item;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
250
    num = 1;
251
    uTrace("item:%p is fetched", *ppItem);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
252 253
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
254
  return num;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
255 256
}

257
void taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
258

259
STaosQset *taosOpenQset() {
wafwerar's avatar
wafwerar 已提交
260
  STaosQset *qset = taosMemoryCalloc(sizeof(STaosQset), 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
261
  if (qset == NULL) {
S
Shengliang Guan 已提交
262
    terrno = TSDB_CODE_OUT_OF_MEMORY;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
263
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
264 265
  }

wafwerar's avatar
wafwerar 已提交
266
  taosThreadMutexInit(&qset->mutex, NULL);
267
  tsem_init(&qset->sem, 0, 0);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
268

S
Shengliang Guan 已提交
269
  uDebug("qset:%p is opened", qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
270 271
  return qset;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
272

273 274
void taosCloseQset(STaosQset *qset) {
  if (qset == NULL) return;
275 276

  // remove all the queues from qset
wafwerar's avatar
wafwerar 已提交
277
  taosThreadMutexLock(&qset->mutex);
278 279 280 281 282 283 284
  while (qset->head) {
    STaosQueue *queue = qset->head;
    qset->head = qset->head->next;

    queue->qset = NULL;
    queue->next = NULL;
  }
wafwerar's avatar
wafwerar 已提交
285
  taosThreadMutexUnlock(&qset->mutex);
286

wafwerar's avatar
wafwerar 已提交
287
  taosThreadMutexDestroy(&qset->mutex);
288
  tsem_destroy(&qset->sem);
wafwerar's avatar
wafwerar 已提交
289
  taosMemoryFree(qset);
S
Shengliang Guan 已提交
290
  uDebug("qset:%p is closed", qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
291 292
}

293 294 295
// tsem_post 'qset->sem', so that reader threads waiting for it
// resumes execution and return, should only be used to signal the
// thread to exit.
296
void taosQsetThreadResume(STaosQset *qset) {
S
TD-1670  
Shengliang Guan 已提交
297
  uDebug("qset:%p, it will exit", qset);
298 299 300
  tsem_post(&qset->sem);
}

301
int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) {
S
Shengliang Guan 已提交
302
  if (queue->qset) return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
303

wafwerar's avatar
wafwerar 已提交
304
  taosThreadMutexLock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
305 306

  queue->next = qset->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
307
  queue->ahandle = ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
308 309 310
  qset->head = queue;
  qset->numOfQueues++;

wafwerar's avatar
wafwerar 已提交
311
  taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
312 313
  atomic_add_fetch_32(&qset->numOfItems, queue->numOfItems);
  queue->qset = qset;
wafwerar's avatar
wafwerar 已提交
314
  taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
315

wafwerar's avatar
wafwerar 已提交
316
  taosThreadMutexUnlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
317

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
318
  uTrace("queue:%p is added into qset:%p", queue, qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
319 320 321
  return 0;
}

322
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
323
  STaosQueue *tqueue = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
324

wafwerar's avatar
wafwerar 已提交
325
  taosThreadMutexLock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
326 327 328 329

  if (qset->head) {
    if (qset->head == queue) {
      qset->head = qset->head->next;
330
      tqueue = queue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
331 332 333 334
    } else {
      STaosQueue *prev = qset->head;
      tqueue = qset->head->next;
      while (tqueue) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
335
        assert(tqueue->qset);
S
Shengliang Guan 已提交
336
        if (tqueue == queue) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
337
          prev->next = tqueue->next;
338
          break;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
339 340 341 342 343 344
        } else {
          prev = tqueue;
          tqueue = tqueue->next;
        }
      }
    }
345 346 347 348 349

    if (tqueue) {
      if (qset->current == queue) qset->current = tqueue->next;
      qset->numOfQueues--;

wafwerar's avatar
wafwerar 已提交
350
      taosThreadMutexLock(&queue->mutex);
351 352
      atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems);
      queue->qset = NULL;
353
      queue->next = NULL;
wafwerar's avatar
wafwerar 已提交
354
      taosThreadMutexUnlock(&queue->mutex);
355
    }
S
Shengliang Guan 已提交
356 357
  }

wafwerar's avatar
wafwerar 已提交
358
  taosThreadMutexUnlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
359

S
Shengliang Guan 已提交
360
  uDebug("queue:%p is removed from qset:%p", queue, qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
361 362
}

363
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
364

S
Shengliang Guan 已提交
365
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
366
  STaosQnode *pNode = NULL;
367
  int32_t     code = 0;
S
Shengliang Guan 已提交
368

369 370
  tsem_wait(&qset->sem);

wafwerar's avatar
wafwerar 已提交
371
  taosThreadMutexLock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
372

373
  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
S
Shengliang Guan 已提交
374
    if (qset->current == NULL) qset->current = qset->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
375
    STaosQueue *queue = qset->current;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
376 377
    if (queue) qset->current = queue->next;
    if (queue == NULL) break;
J
jtao1735 已提交
378
    if (queue->head == NULL) continue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
379

wafwerar's avatar
wafwerar 已提交
380
    taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
381 382

    if (queue->head) {
S
Shengliang Guan 已提交
383
      pNode = queue->head;
384
      *ppItem = pNode->item;
S
Shengliang Guan 已提交
385
      if (ahandle) *ahandle = queue->ahandle;
S
Shengliang Guan 已提交
386
      if (itemFp) *itemFp = queue->itemFp;
S
Shengliang Guan 已提交
387

S
Shengliang Guan 已提交
388 389 390
      queue->head = pNode->next;
      if (queue->head == NULL) queue->tail = NULL;
      queue->numOfItems--;
391
      queue->memOfItems -= pNode->size;
S
Shengliang Guan 已提交
392 393
      atomic_sub_fetch_32(&qset->numOfItems, 1);
      code = 1;
394 395
      uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems,
             queue->memOfItems);
S
Shengliang Guan 已提交
396
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
397

wafwerar's avatar
wafwerar 已提交
398
    taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
399 400 401
    if (pNode) break;
  }

wafwerar's avatar
wafwerar 已提交
402
  taosThreadMutexUnlock(&qset->mutex);
J
Jeff Tao 已提交
403

S
Shengliang Guan 已提交
404
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
405 406
}

S
Shengliang Guan 已提交
407
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
408
  STaosQueue *queue;
409
  int32_t     code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
410

411
  tsem_wait(&qset->sem);
wafwerar's avatar
wafwerar 已提交
412
  taosThreadMutexLock(&qset->mutex);
J
Jeff Tao 已提交
413

414 415
  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
    if (qset->current == NULL) qset->current = qset->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
416
    queue = qset->current;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
417 418
    if (queue) qset->current = queue->next;
    if (queue == NULL) break;
J
jtao1735 已提交
419
    if (queue->head == NULL) continue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
420

wafwerar's avatar
wafwerar 已提交
421
    taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
422 423

    if (queue->head) {
424 425 426 427
      qall->current = queue->head;
      qall->start = queue->head;
      qall->numOfItems = queue->numOfItems;
      code = qall->numOfItems;
S
Shengliang Guan 已提交
428
      if (ahandle) *ahandle = queue->ahandle;
S
Shengliang Guan 已提交
429
      if (itemsFp) *itemsFp = queue->itemsFp;
S
Shengliang Guan 已提交
430

431 432 433
      queue->head = NULL;
      queue->tail = NULL;
      queue->numOfItems = 0;
434
      queue->memOfItems = 0;
435
      atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
S
Shengliang Guan 已提交
436 437 438
      for (int32_t j = 1; j < qall->numOfItems; ++j) {
        tsem_wait(&qset->sem);
      }
439
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
440

wafwerar's avatar
wafwerar 已提交
441
    taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
442

443
    if (code != 0) break;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
444
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
445

wafwerar's avatar
wafwerar 已提交
446
  taosThreadMutexUnlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
447 448
  return code;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
449

S
Shengliang Guan 已提交
450 451 452 453
void taosResetQsetThread(STaosQset *qset, void *pItem) {
  if (pItem == NULL) return;
  STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));

wafwerar's avatar
wafwerar 已提交
454
  taosThreadMutexLock(&qset->mutex);
S
Shengliang Guan 已提交
455 456 457
  for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) {
    tsem_post(&qset->sem);
  }
wafwerar's avatar
wafwerar 已提交
458
  taosThreadMutexUnlock(&qset->mutex);
S
Shengliang Guan 已提交
459 460
}

461
int32_t taosGetQueueItemsNumber(STaosQueue *queue) {
J
Jun Li 已提交
462 463
  if (!queue) return 0;

464
  int32_t num;
wafwerar's avatar
wafwerar 已提交
465
  taosThreadMutexLock(&queue->mutex);
J
Jun Li 已提交
466
  num = queue->numOfItems;
wafwerar's avatar
wafwerar 已提交
467
  taosThreadMutexUnlock(&queue->mutex);
J
Jun Li 已提交
468
  return num;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
469 470
}

471
int32_t taosGetQsetItemsNumber(STaosQset *qset) {
J
Jun Li 已提交
472 473
  if (!qset) return 0;

474
  int32_t num = 0;
wafwerar's avatar
wafwerar 已提交
475
  taosThreadMutexLock(&qset->mutex);
J
Jun Li 已提交
476
  num = qset->numOfItems;
wafwerar's avatar
wafwerar 已提交
477
  taosThreadMutexUnlock(&qset->mutex);
J
Jun Li 已提交
478
  return num;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
479
}