tqueue.c 11.4 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
17
#include "tqueue.h"
S
Shengliang Guan 已提交
18
#include "taoserror.h"
S
log  
Shengliang Guan 已提交
19
#include "tlog.h"
20 21

typedef struct STaosQnode STaosQnode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
22

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
23
typedef struct STaosQnode {
24
  STaosQnode *next;
S
Shengliang Guan 已提交
25
  STaosQueue *queue;
26
  char        item[];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
27 28
} STaosQnode;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
29
typedef struct STaosQueue {
30 31
  int32_t         itemSize;
  int32_t         numOfItems;
S
Shengliang Guan 已提交
32
  int32_t         threadId;
S
queue  
Shengliang Guan 已提交
33 34 35 36 37
  STaosQnode     *head;
  STaosQnode     *tail;
  STaosQueue     *next;     // for queue set
  STaosQset      *qset;     // for queue set
  void           *ahandle;  // for queue set
S
Shengliang Guan 已提交
38 39
  FItem           itemFp;
  FItems          itemsFp;
wafwerar's avatar
wafwerar 已提交
40
  TdThreadMutex mutex;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
41 42
} STaosQueue;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
43
typedef struct STaosQset {
S
queue  
Shengliang Guan 已提交
44 45
  STaosQueue     *head;
  STaosQueue     *current;
wafwerar's avatar
wafwerar 已提交
46
  TdThreadMutex mutex;
S
Shengliang Guan 已提交
47 48 49
  int32_t         numOfQueues;
  int32_t         numOfItems;
  tsem_t          sem;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
50 51
} STaosQset;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
52
typedef struct STaosQall {
S
Shengliang Guan 已提交
53 54 55 56 57 58
  STaosQnode *current;
  STaosQnode *start;
  int32_t     itemSize;
  int32_t     numOfItems;
} STaosQall;

59
STaosQueue *taosOpenQueue() {
wafwerar's avatar
wafwerar 已提交
60
  STaosQueue *queue = taosMemoryCalloc(1, sizeof(STaosQueue));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
61
  if (queue == NULL) {
S
Shengliang Guan 已提交
62
    terrno = TSDB_CODE_OUT_OF_MEMORY;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
63
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
64 65
  }

wafwerar's avatar
wafwerar 已提交
66
  if (taosThreadMutexInit(&queue->mutex, NULL) != 0) {
S
Shengliang Guan 已提交
67 68 69
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
70

S
Shengliang Guan 已提交
71
  uDebug("queue:%p is opened", queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
72 73
  return queue;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
74

S
Shengliang Guan 已提交
75
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp) {
76
  if (queue == NULL) return;
S
Shengliang Guan 已提交
77 78
  queue->itemFp = itemFp;
  queue->itemsFp = itemsFp;
S
Shengliang Guan 已提交
79 80
}

81 82
void taosCloseQueue(STaosQueue *queue) {
  if (queue == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
83
  STaosQnode *pTemp;
S
queue  
Shengliang Guan 已提交
84
  STaosQset  *qset;
85

wafwerar's avatar
wafwerar 已提交
86
  taosThreadMutexLock(&queue->mutex);
S
Shengliang Guan 已提交
87
  STaosQnode *pNode = queue->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
88
  queue->head = NULL;
89
  qset = queue->qset;
wafwerar's avatar
wafwerar 已提交
90
  taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
91

S
Shengliang Guan 已提交
92 93 94
  if (queue->qset) {
    taosRemoveFromQset(qset, queue);
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
95

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
96 97 98
  while (pNode) {
    pTemp = pNode;
    pNode = pNode->next;
wafwerar's avatar
wafwerar 已提交
99
    taosMemoryFree(pTemp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
100 101
  }

wafwerar's avatar
wafwerar 已提交
102
  taosThreadMutexDestroy(&queue->mutex);
wafwerar's avatar
wafwerar 已提交
103
  taosMemoryFree(queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
104

S
Shengliang Guan 已提交
105
  uDebug("queue:%p is closed", queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
106
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
107

108 109
bool taosQueueEmpty(STaosQueue *queue) {
  if (queue == NULL) return true;
S
Shengliang Guan 已提交
110 111

  bool empty = false;
wafwerar's avatar
wafwerar 已提交
112
  taosThreadMutexLock(&queue->mutex);
S
Shengliang Guan 已提交
113 114 115
  if (queue->head == NULL && queue->tail == NULL) {
    empty = true;
  }
wafwerar's avatar
wafwerar 已提交
116
  taosThreadMutexUnlock(&queue->mutex);
S
Shengliang Guan 已提交
117 118 119 120

  return empty;
}

121
int32_t taosQueueSize(STaosQueue *queue) {
wafwerar's avatar
wafwerar 已提交
122
  taosThreadMutexLock(&queue->mutex);
123
  int32_t numOfItems = queue->numOfItems;
wafwerar's avatar
wafwerar 已提交
124
  taosThreadMutexUnlock(&queue->mutex);
125 126 127
  return numOfItems;
}

128
void *taosAllocateQitem(int32_t size) {
wafwerar's avatar
wafwerar 已提交
129
  STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
S
Shengliang Guan 已提交
130 131 132 133 134

  if (pNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
S
Shengliang Guan 已提交
135

136
  uTrace("item:%p, node:%p is allocated", pNode->item, pNode);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
137 138
  return (void *)pNode->item;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
139

S
Shengliang Guan 已提交
140 141
void taosFreeQitem(void *pItem) {
  if (pItem == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
142

S
Shengliang Guan 已提交
143
  char *temp = pItem;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
144
  temp -= sizeof(STaosQnode);
S
Shengliang Guan 已提交
145
  uTrace("item:%p, node:%p is freed", pItem, temp);
wafwerar's avatar
wafwerar 已提交
146
  taosMemoryFree(temp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
147
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
148

149 150
int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
  STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode));
151
  pNode->next = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
152

wafwerar's avatar
wafwerar 已提交
153
  taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
154

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
155 156 157 158 159
  if (queue->tail) {
    queue->tail->next = pNode;
    queue->tail = pNode;
  } else {
    queue->head = pNode;
S
Shengliang Guan 已提交
160
    queue->tail = pNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
161 162 163 164
  }

  queue->numOfItems++;
  if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
165
  uTrace("item:%p is put into queue:%p, items:%d", pItem, queue, queue->numOfItems);
166

wafwerar's avatar
wafwerar 已提交
167
  taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
168

169 170
  if (queue->qset) tsem_post(&queue->qset->sem);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
171
  return 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
172 173
}

174
int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
175
  STaosQnode *pNode = NULL;
176
  int32_t     code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
177

wafwerar's avatar
wafwerar 已提交
178
  taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
179

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
180
  if (queue->head) {
S
Shengliang Guan 已提交
181
    pNode = queue->head;
182
    *ppItem = pNode->item;
S
Shengliang Guan 已提交
183 184 185 186 187
    queue->head = pNode->next;
    if (queue->head == NULL) queue->tail = NULL;
    queue->numOfItems--;
    if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
    code = 1;
S
Shengliang Guan 已提交
188
    uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
S
Shengliang Guan 已提交
189
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
190

wafwerar's avatar
wafwerar 已提交
191
  taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
192

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
193 194
  return code;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
195

wafwerar's avatar
wafwerar 已提交
196
STaosQall *taosAllocateQall() { return taosMemoryCalloc(1, sizeof(STaosQall)); }
197

wafwerar's avatar
wafwerar 已提交
198
void taosFreeQall(STaosQall *qall) { taosMemoryFree(qall); }
199

200 201 202
int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
  int32_t code = 0;
  bool    empty;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
203

wafwerar's avatar
wafwerar 已提交
204
  taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
205

J
Jun Li 已提交
206 207
  empty = queue->head == NULL;
  if (!empty) {
208 209 210 211 212 213 214 215 216 217 218
    memset(qall, 0, sizeof(STaosQall));
    qall->current = queue->head;
    qall->start = queue->head;
    qall->numOfItems = queue->numOfItems;
    qall->itemSize = queue->itemSize;
    code = qall->numOfItems;

    queue->head = NULL;
    queue->tail = NULL;
    queue->numOfItems = 0;
    if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
J
Jun Li 已提交
219
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
220

wafwerar's avatar
wafwerar 已提交
221
  taosThreadMutexUnlock(&queue->mutex);
J
Jun Li 已提交
222 223 224 225 226 227 228 229

  // if source queue is empty, we set destination qall to empty too.
  if (empty) {
    qall->current = NULL;
    qall->start = NULL;
    qall->numOfItems = 0;
  }
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
230
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
231

232
int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
233
  STaosQnode *pNode;
234
  int32_t     num = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
235

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
236
  pNode = qall->current;
S
Shengliang Guan 已提交
237 238
  if (pNode) qall->current = pNode->next;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
239
  if (pNode) {
240
    *ppItem = pNode->item;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
241
    num = 1;
242
    uTrace("item:%p is fetched", *ppItem);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
243 244
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
245
  return num;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
246 247
}

248
void taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
249

250
STaosQset *taosOpenQset() {
wafwerar's avatar
wafwerar 已提交
251
  STaosQset *qset = taosMemoryCalloc(sizeof(STaosQset), 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
252
  if (qset == NULL) {
S
Shengliang Guan 已提交
253
    terrno = TSDB_CODE_OUT_OF_MEMORY;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
254
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
255 256
  }

wafwerar's avatar
wafwerar 已提交
257
  taosThreadMutexInit(&qset->mutex, NULL);
258
  tsem_init(&qset->sem, 0, 0);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
259

S
Shengliang Guan 已提交
260
  uDebug("qset:%p is opened", qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
261 262
  return qset;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
263

264 265
void taosCloseQset(STaosQset *qset) {
  if (qset == NULL) return;
266 267

  // remove all the queues from qset
wafwerar's avatar
wafwerar 已提交
268
  taosThreadMutexLock(&qset->mutex);
269 270 271 272 273 274 275
  while (qset->head) {
    STaosQueue *queue = qset->head;
    qset->head = qset->head->next;

    queue->qset = NULL;
    queue->next = NULL;
  }
wafwerar's avatar
wafwerar 已提交
276
  taosThreadMutexUnlock(&qset->mutex);
277

wafwerar's avatar
wafwerar 已提交
278
  taosThreadMutexDestroy(&qset->mutex);
279
  tsem_destroy(&qset->sem);
wafwerar's avatar
wafwerar 已提交
280
  taosMemoryFree(qset);
S
Shengliang Guan 已提交
281
  uDebug("qset:%p is closed", qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
282 283
}

284 285 286
// tsem_post 'qset->sem', so that reader threads waiting for it
// resumes execution and return, should only be used to signal the
// thread to exit.
287
void taosQsetThreadResume(STaosQset *qset) {
S
TD-1670  
Shengliang Guan 已提交
288
  uDebug("qset:%p, it will exit", qset);
289 290 291
  tsem_post(&qset->sem);
}

292
int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) {
S
Shengliang Guan 已提交
293
  if (queue->qset) return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
294

wafwerar's avatar
wafwerar 已提交
295
  taosThreadMutexLock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
296 297

  queue->next = qset->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
298
  queue->ahandle = ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
299 300 301
  qset->head = queue;
  qset->numOfQueues++;

wafwerar's avatar
wafwerar 已提交
302
  taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
303 304
  atomic_add_fetch_32(&qset->numOfItems, queue->numOfItems);
  queue->qset = qset;
wafwerar's avatar
wafwerar 已提交
305
  taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
306

wafwerar's avatar
wafwerar 已提交
307
  taosThreadMutexUnlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
308

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
309
  uTrace("queue:%p is added into qset:%p", queue, qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
310 311 312
  return 0;
}

313
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
314
  STaosQueue *tqueue = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
315

wafwerar's avatar
wafwerar 已提交
316
  taosThreadMutexLock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
317 318 319 320

  if (qset->head) {
    if (qset->head == queue) {
      qset->head = qset->head->next;
321
      tqueue = queue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
322 323 324 325
    } else {
      STaosQueue *prev = qset->head;
      tqueue = qset->head->next;
      while (tqueue) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
326
        assert(tqueue->qset);
S
Shengliang Guan 已提交
327
        if (tqueue == queue) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
328
          prev->next = tqueue->next;
329
          break;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
330 331 332 333 334 335
        } else {
          prev = tqueue;
          tqueue = tqueue->next;
        }
      }
    }
336 337 338 339 340

    if (tqueue) {
      if (qset->current == queue) qset->current = tqueue->next;
      qset->numOfQueues--;

wafwerar's avatar
wafwerar 已提交
341
      taosThreadMutexLock(&queue->mutex);
342 343
      atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems);
      queue->qset = NULL;
344
      queue->next = NULL;
wafwerar's avatar
wafwerar 已提交
345
      taosThreadMutexUnlock(&queue->mutex);
346
    }
S
Shengliang Guan 已提交
347 348
  }

wafwerar's avatar
wafwerar 已提交
349
  taosThreadMutexUnlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
350

S
Shengliang Guan 已提交
351
  uDebug("queue:%p is removed from qset:%p", queue, qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
352 353
}

354
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
355

S
Shengliang Guan 已提交
356
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
357
  STaosQnode *pNode = NULL;
358
  int32_t     code = 0;
S
Shengliang Guan 已提交
359

360 361
  tsem_wait(&qset->sem);

wafwerar's avatar
wafwerar 已提交
362
  taosThreadMutexLock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
363

364
  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
S
Shengliang Guan 已提交
365
    if (qset->current == NULL) qset->current = qset->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
366
    STaosQueue *queue = qset->current;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
367 368
    if (queue) qset->current = queue->next;
    if (queue == NULL) break;
J
jtao1735 已提交
369
    if (queue->head == NULL) continue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
370

wafwerar's avatar
wafwerar 已提交
371
    taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
372 373

    if (queue->head) {
S
Shengliang Guan 已提交
374
      pNode = queue->head;
375
      *ppItem = pNode->item;
S
Shengliang Guan 已提交
376
      if (ahandle) *ahandle = queue->ahandle;
S
Shengliang Guan 已提交
377
      if (itemFp) *itemFp = queue->itemFp;
S
Shengliang Guan 已提交
378

S
Shengliang Guan 已提交
379 380 381 382 383
      queue->head = pNode->next;
      if (queue->head == NULL) queue->tail = NULL;
      queue->numOfItems--;
      atomic_sub_fetch_32(&qset->numOfItems, 1);
      code = 1;
384
      uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
S
Shengliang Guan 已提交
385
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
386

wafwerar's avatar
wafwerar 已提交
387
    taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
388 389 390
    if (pNode) break;
  }

wafwerar's avatar
wafwerar 已提交
391
  taosThreadMutexUnlock(&qset->mutex);
J
Jeff Tao 已提交
392

S
Shengliang Guan 已提交
393
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
394 395
}

S
Shengliang Guan 已提交
396
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
397
  STaosQueue *queue;
398
  int32_t     code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
399

400
  tsem_wait(&qset->sem);
wafwerar's avatar
wafwerar 已提交
401
  taosThreadMutexLock(&qset->mutex);
J
Jeff Tao 已提交
402

403 404
  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
    if (qset->current == NULL) qset->current = qset->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
405
    queue = qset->current;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
406 407
    if (queue) qset->current = queue->next;
    if (queue == NULL) break;
J
jtao1735 已提交
408
    if (queue->head == NULL) continue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
409

wafwerar's avatar
wafwerar 已提交
410
    taosThreadMutexLock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
411 412

    if (queue->head) {
413 414 415 416 417
      qall->current = queue->head;
      qall->start = queue->head;
      qall->numOfItems = queue->numOfItems;
      qall->itemSize = queue->itemSize;
      code = qall->numOfItems;
S
Shengliang Guan 已提交
418
      if (ahandle) *ahandle = queue->ahandle;
S
Shengliang Guan 已提交
419
      if (itemsFp) *itemsFp = queue->itemsFp;
S
Shengliang Guan 已提交
420

421 422 423 424
      queue->head = NULL;
      queue->tail = NULL;
      queue->numOfItems = 0;
      atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
S
Shengliang Guan 已提交
425 426 427
      for (int32_t j = 1; j < qall->numOfItems; ++j) {
        tsem_wait(&qset->sem);
      }
428
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
429

wafwerar's avatar
wafwerar 已提交
430
    taosThreadMutexUnlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
431

432
    if (code != 0) break;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
433
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
434

wafwerar's avatar
wafwerar 已提交
435
  taosThreadMutexUnlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
436 437
  return code;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
438

S
Shengliang Guan 已提交
439 440 441 442
void taosResetQsetThread(STaosQset *qset, void *pItem) {
  if (pItem == NULL) return;
  STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));

wafwerar's avatar
wafwerar 已提交
443
  taosThreadMutexLock(&qset->mutex);
S
Shengliang Guan 已提交
444 445 446
  for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) {
    tsem_post(&qset->sem);
  }
wafwerar's avatar
wafwerar 已提交
447
  taosThreadMutexUnlock(&qset->mutex);
S
Shengliang Guan 已提交
448 449
}

450
int32_t taosGetQueueItemsNumber(STaosQueue *queue) {
J
Jun Li 已提交
451 452
  if (!queue) return 0;

453
  int32_t num;
wafwerar's avatar
wafwerar 已提交
454
  taosThreadMutexLock(&queue->mutex);
J
Jun Li 已提交
455
  num = queue->numOfItems;
wafwerar's avatar
wafwerar 已提交
456
  taosThreadMutexUnlock(&queue->mutex);
J
Jun Li 已提交
457
  return num;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
458 459
}

460
int32_t taosGetQsetItemsNumber(STaosQset *qset) {
J
Jun Li 已提交
461 462
  if (!qset) return 0;

463
  int32_t num = 0;
wafwerar's avatar
wafwerar 已提交
464
  taosThreadMutexLock(&qset->mutex);
J
Jun Li 已提交
465
  num = qset->numOfItems;
wafwerar's avatar
wafwerar 已提交
466
  taosThreadMutexUnlock(&qset->mutex);
J
Jun Li 已提交
467
  return num;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
468
}