tqueue.c 11.4 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
17
#include "tqueue.h"
S
Shengliang Guan 已提交
18
#include "taoserror.h"
S
log  
Shengliang Guan 已提交
19
#include "tlog.h"
20 21

typedef struct STaosQnode STaosQnode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
22

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
23
typedef struct STaosQnode {
24
  STaosQnode *next;
S
Shengliang Guan 已提交
25
  STaosQueue *queue;
26
  char        item[];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
27 28
} STaosQnode;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
29
typedef struct STaosQueue {
30 31
  int32_t         itemSize;
  int32_t         numOfItems;
S
Shengliang Guan 已提交
32
  int32_t         threadId;
S
queue  
Shengliang Guan 已提交
33 34 35 36 37
  STaosQnode     *head;
  STaosQnode     *tail;
  STaosQueue     *next;     // for queue set
  STaosQset      *qset;     // for queue set
  void           *ahandle;  // for queue set
S
Shengliang Guan 已提交
38 39
  FItem           itemFp;
  FItems          itemsFp;
wafwerar's avatar
wafwerar 已提交
40
  TdThreadMutex mutex;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
41 42
} STaosQueue;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
43
typedef struct STaosQset {
S
queue  
Shengliang Guan 已提交
44 45
  STaosQueue     *head;
  STaosQueue     *current;
wafwerar's avatar
wafwerar 已提交
46
  TdThreadMutex mutex;
S
Shengliang Guan 已提交
47 48 49
  int32_t         numOfQueues;
  int32_t         numOfItems;
  tsem_t          sem;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
50 51
} STaosQset;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
52
typedef struct STaosQall {
S
Shengliang Guan 已提交
53 54 55 56 57 58
  STaosQnode *current;
  STaosQnode *start;
  int32_t     itemSize;
  int32_t     numOfItems;
} STaosQall;

59
STaosQueue *taosOpenQueue() {
wafwerar's avatar
wafwerar 已提交
60
  STaosQueue *queue = taosMemoryCalloc(1, sizeof(STaosQueue));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
61
  if (queue == NULL) {
S
Shengliang Guan 已提交
62
    terrno = TSDB_CODE_OUT_OF_MEMORY;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
63
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
64 65
  }

wafwerar's avatar
wafwerar 已提交
66
  if (taosThreadMutexInit(&queue->mutex, NULL) != 0) {
S
Shengliang Guan 已提交
67 68 69
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
70

S
Shengliang Guan 已提交
71
  uDebug("queue:%p is opened", queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
72 73
  return queue;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
74

S
Shengliang Guan 已提交
75
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp) {
76
  if (queue == NULL) return;
S
Shengliang Guan 已提交
77 78
  queue->itemFp = itemFp;
  queue->itemsFp = itemsFp;
S
Shengliang Guan 已提交
79 80
}

81 82
void taosCloseQueue(STaosQueue *queue) {
  if (queue == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
83
  STaosQnode *pTemp;
S
queue  
Shengliang Guan 已提交
84
  STaosQset  *qset;
85

wafwerar's avatar
wafwerar 已提交
86
  taosThreadMutexLock(&queue->mutex);
S
Shengliang Guan 已提交
87
  STaosQnode *pNode = queue->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
88
  queue->head = NULL;
89
  qset = queue->qset;
wafwerar's avatar
wafwerar 已提交
90
  taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
91

S
Shengliang Guan 已提交
92 93 94
  if (queue->qset) {
    taosRemoveFromQset(qset, queue);
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
95

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
96 97 98
  while (pNode) {
    pTemp = pNode;
    pNode = pNode->next;
wafwerar's avatar
wafwerar 已提交
99
    taosMemoryFree(pTemp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
100 101
  }

wafwerar's avatar
wafwerar 已提交
102
  taosThreadMutexDestroy(&queue->mutex);
wafwerar's avatar
wafwerar 已提交
103
  taosMemoryFree(queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
104

S
Shengliang Guan 已提交
105
  uDebug("queue:%p is closed", queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
106
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
107

108 109
bool taosQueueEmpty(STaosQueue *queue) {
  if (queue == NULL) return true;
S
Shengliang Guan 已提交
110 111

  bool empty = false;
wafwerar's avatar
wafwerar 已提交
112
  taosThreadMutexLock(&queue->mutex);
S
Shengliang Guan 已提交
113 114 115
  if (queue->head == NULL && queue->tail == NULL) {
    empty = true;
  }
wafwerar's avatar
wafwerar 已提交
116
  taosThreadMutexUnlock(&queue->mutex);
S
Shengliang Guan 已提交
117 118 119 120

  return empty;
}

121
int32_t taosQueueSize(STaosQueue *queue) {
wafwerar's avatar
wafwerar 已提交
122
  taosThreadMutexLock(&queue->mutex);
123
  int32_t numOfItems = queue->numOfItems;
wafwerar's avatar
wafwerar 已提交
124
  taosThreadMutexUnlock(&queue->mutex);
125 126 127
  return numOfItems;
}

128
void *taosAllocateQitem(int32_t size) {
wafwerar's avatar
wafwerar 已提交
129
  STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
S
Shengliang Guan 已提交
130 131 132 133 134

  if (pNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
S
Shengliang Guan 已提交
135

136
  uTrace("item:%p, node:%p is allocated", pNode->item, pNode);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
137 138
  return (void *)pNode->item;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
139

S
Shengliang Guan 已提交
140 141
void taosFreeQitem(void *pItem) {
  if (pItem == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
142

S
Shengliang Guan 已提交
143
  char *temp = pItem;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
144
  temp -= sizeof(STaosQnode);
S
Shengliang Guan 已提交
145
  uTrace("item:%p, node:%p is freed", pItem, temp);
wafwerar's avatar
wafwerar 已提交
146
  taosMemoryFree(temp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
147
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
148

S
Shengliang Guan 已提交
149
void taosWriteQitem(STaosQueue *queue, void *pItem) {
150
  STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode));
151
  pNode->next = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
152

wafwerar's avatar
wafwerar 已提交
153
  taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
154

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
155 156 157 158 159
  if (queue->tail) {
    queue->tail->next = pNode;
    queue->tail = pNode;
  } else {
    queue->head = pNode;
S
Shengliang Guan 已提交
160
    queue->tail = pNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
161 162 163 164
  }

  queue->numOfItems++;
  if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
165
  uTrace("item:%p is put into queue:%p, items:%d", pItem, queue, queue->numOfItems);
166

wafwerar's avatar
wafwerar 已提交
167
  taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
168

169
  if (queue->qset) tsem_post(&queue->qset->sem);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
170 171
}

172
int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
173
  STaosQnode *pNode = NULL;
174
  int32_t     code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
175

wafwerar's avatar
wafwerar 已提交
176
  taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
177

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
178
  if (queue->head) {
S
Shengliang Guan 已提交
179
    pNode = queue->head;
180
    *ppItem = pNode->item;
S
Shengliang Guan 已提交
181 182 183 184 185
    queue->head = pNode->next;
    if (queue->head == NULL) queue->tail = NULL;
    queue->numOfItems--;
    if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
    code = 1;
S
Shengliang Guan 已提交
186
    uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
S
Shengliang Guan 已提交
187
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
188

wafwerar's avatar
wafwerar 已提交
189
  taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
190

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
191 192
  return code;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
193

wafwerar's avatar
wafwerar 已提交
194
STaosQall *taosAllocateQall() { return taosMemoryCalloc(1, sizeof(STaosQall)); }
195

wafwerar's avatar
wafwerar 已提交
196
void taosFreeQall(STaosQall *qall) { taosMemoryFree(qall); }
197

198 199 200
int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
  int32_t code = 0;
  bool    empty;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
201

wafwerar's avatar
wafwerar 已提交
202
  taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
203

J
Jun Li 已提交
204 205
  empty = queue->head == NULL;
  if (!empty) {
206 207 208 209 210 211 212 213 214 215 216
    memset(qall, 0, sizeof(STaosQall));
    qall->current = queue->head;
    qall->start = queue->head;
    qall->numOfItems = queue->numOfItems;
    qall->itemSize = queue->itemSize;
    code = qall->numOfItems;

    queue->head = NULL;
    queue->tail = NULL;
    queue->numOfItems = 0;
    if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
J
Jun Li 已提交
217
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
218

wafwerar's avatar
wafwerar 已提交
219
  taosThreadMutexUnlock(&queue->mutex);
J
Jun Li 已提交
220 221 222 223 224 225 226 227

  // if source queue is empty, we set destination qall to empty too.
  if (empty) {
    qall->current = NULL;
    qall->start = NULL;
    qall->numOfItems = 0;
  }
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
228
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
229

230
int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
231
  STaosQnode *pNode;
232
  int32_t     num = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
233

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
234
  pNode = qall->current;
S
Shengliang Guan 已提交
235 236
  if (pNode) qall->current = pNode->next;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
237
  if (pNode) {
238
    *ppItem = pNode->item;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
239
    num = 1;
240
    uTrace("item:%p is fetched", *ppItem);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
241 242
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
243
  return num;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
244 245
}

246
void taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
247

248
STaosQset *taosOpenQset() {
wafwerar's avatar
wafwerar 已提交
249
  STaosQset *qset = taosMemoryCalloc(sizeof(STaosQset), 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
250
  if (qset == NULL) {
S
Shengliang Guan 已提交
251
    terrno = TSDB_CODE_OUT_OF_MEMORY;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
252
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
253 254
  }

wafwerar's avatar
wafwerar 已提交
255
  taosThreadMutexInit(&qset->mutex, NULL);
256
  tsem_init(&qset->sem, 0, 0);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
257

S
Shengliang Guan 已提交
258
  uDebug("qset:%p is opened", qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
259 260
  return qset;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
261

262 263
void taosCloseQset(STaosQset *qset) {
  if (qset == NULL) return;
264 265

  // remove all the queues from qset
wafwerar's avatar
wafwerar 已提交
266
  taosThreadMutexLock(&qset->mutex);
267 268 269 270 271 272 273
  while (qset->head) {
    STaosQueue *queue = qset->head;
    qset->head = qset->head->next;

    queue->qset = NULL;
    queue->next = NULL;
  }
wafwerar's avatar
wafwerar 已提交
274
  taosThreadMutexUnlock(&qset->mutex);
275

wafwerar's avatar
wafwerar 已提交
276
  taosThreadMutexDestroy(&qset->mutex);
277
  tsem_destroy(&qset->sem);
wafwerar's avatar
wafwerar 已提交
278
  taosMemoryFree(qset);
S
Shengliang Guan 已提交
279
  uDebug("qset:%p is closed", qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
280 281
}

282 283 284
// tsem_post 'qset->sem', so that reader threads waiting for it
// resumes execution and return, should only be used to signal the
// thread to exit.
285
void taosQsetThreadResume(STaosQset *qset) {
S
TD-1670  
Shengliang Guan 已提交
286
  uDebug("qset:%p, it will exit", qset);
287 288 289
  tsem_post(&qset->sem);
}

290
int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) {
S
Shengliang Guan 已提交
291
  if (queue->qset) return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
292

wafwerar's avatar
wafwerar 已提交
293
  taosThreadMutexLock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
294 295

  queue->next = qset->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
296
  queue->ahandle = ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
297 298 299
  qset->head = queue;
  qset->numOfQueues++;

wafwerar's avatar
wafwerar 已提交
300
  taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
301 302
  atomic_add_fetch_32(&qset->numOfItems, queue->numOfItems);
  queue->qset = qset;
wafwerar's avatar
wafwerar 已提交
303
  taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
304

wafwerar's avatar
wafwerar 已提交
305
  taosThreadMutexUnlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
306

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
307
  uTrace("queue:%p is added into qset:%p", queue, qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
308 309 310
  return 0;
}

311
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
312
  STaosQueue *tqueue = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
313

wafwerar's avatar
wafwerar 已提交
314
  taosThreadMutexLock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
315 316 317 318

  if (qset->head) {
    if (qset->head == queue) {
      qset->head = qset->head->next;
319
      tqueue = queue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
320 321 322 323
    } else {
      STaosQueue *prev = qset->head;
      tqueue = qset->head->next;
      while (tqueue) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
324
        assert(tqueue->qset);
S
Shengliang Guan 已提交
325
        if (tqueue == queue) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
326
          prev->next = tqueue->next;
327
          break;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
328 329 330 331 332 333
        } else {
          prev = tqueue;
          tqueue = tqueue->next;
        }
      }
    }
334 335 336 337 338

    if (tqueue) {
      if (qset->current == queue) qset->current = tqueue->next;
      qset->numOfQueues--;

wafwerar's avatar
wafwerar 已提交
339
      taosThreadMutexLock(&queue->mutex);
340 341
      atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems);
      queue->qset = NULL;
342
      queue->next = NULL;
wafwerar's avatar
wafwerar 已提交
343
      taosThreadMutexUnlock(&queue->mutex);
344
    }
S
Shengliang Guan 已提交
345 346
  }

wafwerar's avatar
wafwerar 已提交
347
  taosThreadMutexUnlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
348

S
Shengliang Guan 已提交
349
  uDebug("queue:%p is removed from qset:%p", queue, qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
350 351
}

352
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
353

S
Shengliang Guan 已提交
354
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
355
  STaosQnode *pNode = NULL;
356
  int32_t     code = 0;
S
Shengliang Guan 已提交
357

358 359
  tsem_wait(&qset->sem);

wafwerar's avatar
wafwerar 已提交
360
  taosThreadMutexLock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
361

362
  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
S
Shengliang Guan 已提交
363
    if (qset->current == NULL) qset->current = qset->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
364
    STaosQueue *queue = qset->current;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
365 366
    if (queue) qset->current = queue->next;
    if (queue == NULL) break;
J
jtao1735 已提交
367
    if (queue->head == NULL) continue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
368

wafwerar's avatar
wafwerar 已提交
369
    taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
370 371

    if (queue->head) {
S
Shengliang Guan 已提交
372
      pNode = queue->head;
373
      *ppItem = pNode->item;
S
Shengliang Guan 已提交
374
      if (ahandle) *ahandle = queue->ahandle;
S
Shengliang Guan 已提交
375
      if (itemFp) *itemFp = queue->itemFp;
S
Shengliang Guan 已提交
376

S
Shengliang Guan 已提交
377 378 379 380 381
      queue->head = pNode->next;
      if (queue->head == NULL) queue->tail = NULL;
      queue->numOfItems--;
      atomic_sub_fetch_32(&qset->numOfItems, 1);
      code = 1;
382
      uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
S
Shengliang Guan 已提交
383
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
384

wafwerar's avatar
wafwerar 已提交
385
    taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
386 387 388
    if (pNode) break;
  }

wafwerar's avatar
wafwerar 已提交
389
  taosThreadMutexUnlock(&qset->mutex);
J
Jeff Tao 已提交
390

S
Shengliang Guan 已提交
391
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
392 393
}

S
Shengliang Guan 已提交
394
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
395
  STaosQueue *queue;
396
  int32_t     code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
397

398
  tsem_wait(&qset->sem);
wafwerar's avatar
wafwerar 已提交
399
  taosThreadMutexLock(&qset->mutex);
J
Jeff Tao 已提交
400

401 402
  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
    if (qset->current == NULL) qset->current = qset->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
403
    queue = qset->current;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
404 405
    if (queue) qset->current = queue->next;
    if (queue == NULL) break;
J
jtao1735 已提交
406
    if (queue->head == NULL) continue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
407

wafwerar's avatar
wafwerar 已提交
408
    taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
409 410

    if (queue->head) {
411 412 413 414 415
      qall->current = queue->head;
      qall->start = queue->head;
      qall->numOfItems = queue->numOfItems;
      qall->itemSize = queue->itemSize;
      code = qall->numOfItems;
S
Shengliang Guan 已提交
416
      if (ahandle) *ahandle = queue->ahandle;
S
Shengliang Guan 已提交
417
      if (itemsFp) *itemsFp = queue->itemsFp;
S
Shengliang Guan 已提交
418

419 420 421 422
      queue->head = NULL;
      queue->tail = NULL;
      queue->numOfItems = 0;
      atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
S
Shengliang Guan 已提交
423 424 425
      for (int32_t j = 1; j < qall->numOfItems; ++j) {
        tsem_wait(&qset->sem);
      }
426
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
427

wafwerar's avatar
wafwerar 已提交
428
    taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
429

430
    if (code != 0) break;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
431
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
432

wafwerar's avatar
wafwerar 已提交
433
  taosThreadMutexUnlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
434 435
  return code;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
436

S
Shengliang Guan 已提交
437 438 439 440
void taosResetQsetThread(STaosQset *qset, void *pItem) {
  if (pItem == NULL) return;
  STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));

wafwerar's avatar
wafwerar 已提交
441
  taosThreadMutexLock(&qset->mutex);
S
Shengliang Guan 已提交
442 443 444
  for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) {
    tsem_post(&qset->sem);
  }
wafwerar's avatar
wafwerar 已提交
445
  taosThreadMutexUnlock(&qset->mutex);
S
Shengliang Guan 已提交
446 447
}

448
int32_t taosGetQueueItemsNumber(STaosQueue *queue) {
J
Jun Li 已提交
449 450
  if (!queue) return 0;

451
  int32_t num;
wafwerar's avatar
wafwerar 已提交
452
  taosThreadMutexLock(&queue->mutex);
J
Jun Li 已提交
453
  num = queue->numOfItems;
wafwerar's avatar
wafwerar 已提交
454
  taosThreadMutexUnlock(&queue->mutex);
J
Jun Li 已提交
455
  return num;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
456 457
}

458
int32_t taosGetQsetItemsNumber(STaosQset *qset) {
J
Jun Li 已提交
459 460
  if (!qset) return 0;

461
  int32_t num = 0;
wafwerar's avatar
wafwerar 已提交
462
  taosThreadMutexLock(&qset->mutex);
J
Jun Li 已提交
463
  num = qset->numOfItems;
wafwerar's avatar
wafwerar 已提交
464
  taosThreadMutexUnlock(&qset->mutex);
J
Jun Li 已提交
465
  return num;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
466
}