tqueue.c 11.4 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "os.h"
S
Shengliang Guan 已提交
17
#include "ulog.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
18
#include "taoserror.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
19 20
#include "tqueue.h"

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
21
typedef struct STaosQnode {
S
Shengliang Guan 已提交
22 23
  struct STaosQnode *next;
  char               item[];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
24 25
} STaosQnode;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
26
typedef struct STaosQueue {
S
Shengliang Guan 已提交
27 28 29 30 31 32 33
  int32_t            itemSize;
  int32_t            numOfItems;
  struct STaosQnode *head;
  struct STaosQnode *tail;
  struct STaosQueue *next;     // for queue set
  struct STaosQset  *qset;     // for queue set
  void              *ahandle;  // for queue set
S
Shengliang Guan 已提交
34 35
  FProcessItem       itemFp;
  FProcessItems      itemsFp;
S
Shengliang Guan 已提交
36
  pthread_mutex_t    mutex;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
37 38
} STaosQueue;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
39
typedef struct STaosQset {
S
Shengliang Guan 已提交
40 41 42 43 44 45
  STaosQueue     *head;
  STaosQueue     *current;
  pthread_mutex_t mutex;
  int32_t         numOfQueues;
  int32_t         numOfItems;
  tsem_t          sem;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
46 47
} STaosQset;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
48
typedef struct STaosQall {
S
Shengliang Guan 已提交
49 50 51 52 53 54
  STaosQnode *current;
  STaosQnode *start;
  int32_t     itemSize;
  int32_t     numOfItems;
} STaosQall;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
55
taos_queue taosOpenQueue() {
S
Shengliang Guan 已提交
56
  STaosQueue *queue = (STaosQueue *)calloc(sizeof(STaosQueue), 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
57
  if (queue == NULL) {
S
Shengliang Guan 已提交
58
    terrno = TSDB_CODE_OUT_OF_MEMORY;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
59
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
60 61
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
62
  pthread_mutex_init(&queue->mutex, NULL);
63

S
TD-1843  
Shengliang Guan 已提交
64
  uTrace("queue:%p is opened", queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
65 66
  return queue;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
67

S
Shengliang Guan 已提交
68
void taosSetQueueFp(taos_queue param, FProcessItem itemFp, FProcessItems itemsFp) {
S
Shengliang Guan 已提交
69 70
  if (param == NULL) return;
  STaosQueue *queue = (STaosQueue *)param;
S
Shengliang Guan 已提交
71 72
  queue->itemFp = itemFp;
  queue->itemsFp = itemsFp;
S
Shengliang Guan 已提交
73 74
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
75
void taosCloseQueue(taos_queue param) {
76
  if (param == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
77 78
  STaosQueue *queue = (STaosQueue *)param;
  STaosQnode *pTemp;
79 80 81
  STaosQset  *qset;

  pthread_mutex_lock(&queue->mutex);
S
Shengliang Guan 已提交
82
  STaosQnode *pNode = queue->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
83
  queue->head = NULL;
84 85
  qset = queue->qset;
  pthread_mutex_unlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
86

S
Shengliang Guan 已提交
87
  if (queue->qset) taosRemoveFromQset(qset, queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
88

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
89 90 91
  while (pNode) {
    pTemp = pNode;
    pNode = pNode->next;
S
Shengliang Guan 已提交
92
    free(pTemp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
93 94
  }

95
  pthread_mutex_destroy(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
96
  free(queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
97 98

  uTrace("queue:%p is closed", queue);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
99
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
100

S
Shengliang Guan 已提交
101 102 103 104 105 106 107 108 109
bool taosQueueEmpty(taos_queue param) {
  if (param == NULL) return true;
  STaosQueue *queue = (STaosQueue *)param;

  bool empty = false;
  pthread_mutex_lock(&queue->mutex);
  if (queue->head == NULL && queue->tail == NULL) {
    empty = true;
  }
110
  pthread_mutex_unlock(&queue->mutex);
S
Shengliang Guan 已提交
111 112 113 114

  return empty;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
115 116
void *taosAllocateQitem(int size) {
  STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1);
S
Shengliang Guan 已提交
117

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
118
  if (pNode == NULL) return NULL;
119
  uTrace("item:%p, node:%p is allocated", pNode->item, pNode);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
120 121
  return (void *)pNode->item;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
122

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
123 124 125 126 127
void taosFreeQitem(void *param) {
  if (param == NULL) return;

  char *temp = (char *)param;
  temp -= sizeof(STaosQnode);
128
  uTrace("item:%p, node:%p is freed", param, temp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
129 130
  free(temp);
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
131

S
Shengliang Guan 已提交
132
int taosWriteQitem(taos_queue param, void *item) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
133
  STaosQueue *queue = (STaosQueue *)param;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
134
  STaosQnode *pNode = (STaosQnode *)(((char *)item) - sizeof(STaosQnode));
135
  pNode->next = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
136

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
137
  pthread_mutex_lock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
138

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
139 140 141 142 143
  if (queue->tail) {
    queue->tail->next = pNode;
    queue->tail = pNode;
  } else {
    queue->head = pNode;
S
Shengliang Guan 已提交
144
    queue->tail = pNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
145 146 147 148
  }

  queue->numOfItems++;
  if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
S
Shengliang Guan 已提交
149
  uTrace("item:%p is put into queue:%p, items:%d", item, queue, queue->numOfItems);
150

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
151
  pthread_mutex_unlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
152

153 154
  if (queue->qset) tsem_post(&queue->qset->sem);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
155
  return 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
156 157
}

S
Shengliang Guan 已提交
158
int taosReadQitem(taos_queue param, void **pitem) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
159 160 161
  STaosQueue *queue = (STaosQueue *)param;
  STaosQnode *pNode = NULL;
  int         code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
162

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
163
  pthread_mutex_lock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
164

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
165
  if (queue->head) {
S
Shengliang Guan 已提交
166 167 168 169 170 171 172 173 174
    pNode = queue->head;
    *pitem = pNode->item;
    queue->head = pNode->next;
    if (queue->head == NULL) queue->tail = NULL;
    queue->numOfItems--;
    if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
    code = 1;
    uDebug("item:%p is read out from queue:%p, items:%d", *pitem, queue, queue->numOfItems);
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
175

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
176
  pthread_mutex_unlock(&queue->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
177

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
178 179
  return code;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
180

181
void *taosAllocateQall() {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
182
  void *p = calloc(sizeof(STaosQall), 1);
183 184 185
  return p;
}

S
Shengliang Guan 已提交
186
void taosFreeQall(void *param) { free(param); }
187 188

int taosReadAllQitems(taos_queue param, taos_qall p2) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
189
  STaosQueue *queue = (STaosQueue *)param;
190
  STaosQall  *qall = (STaosQall *)p2;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
191
  int         code = 0;
J
Jun Li 已提交
192
  bool        empty;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
193 194 195

  pthread_mutex_lock(&queue->mutex);

J
Jun Li 已提交
196 197
  empty = queue->head == NULL;
  if (!empty) {
198 199 200 201 202 203 204 205 206 207 208
    memset(qall, 0, sizeof(STaosQall));
    qall->current = queue->head;
    qall->start = queue->head;
    qall->numOfItems = queue->numOfItems;
    qall->itemSize = queue->itemSize;
    code = qall->numOfItems;

    queue->head = NULL;
    queue->tail = NULL;
    queue->numOfItems = 0;
    if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
J
Jun Li 已提交
209
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
210

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
211
  pthread_mutex_unlock(&queue->mutex);
J
Jun Li 已提交
212 213 214 215 216 217 218 219

  // if source queue is empty, we set destination qall to empty too.
  if (empty) {
    qall->current = NULL;
    qall->start = NULL;
    qall->numOfItems = 0;
  }
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
220
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
221

S
Shengliang Guan 已提交
222
int taosGetQitem(taos_qall param, void **pitem) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
223 224 225
  STaosQall  *qall = (STaosQall *)param;
  STaosQnode *pNode;
  int         num = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
226

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
227
  pNode = qall->current;
S
Shengliang Guan 已提交
228 229
  if (pNode) qall->current = pNode->next;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
230
  if (pNode) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
231
    *pitem = pNode->item;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
232
    num = 1;
S
Shengliang Guan 已提交
233
    uTrace("item:%p is fetched", *pitem);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
234 235
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
236
  return num;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
237 238
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
239
void taosResetQitems(taos_qall param) {
S
Shengliang Guan 已提交
240
  STaosQall *qall = (STaosQall *)param;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
241 242 243 244
  qall->current = qall->start;
}

taos_qset taosOpenQset() {
S
Shengliang Guan 已提交
245
  STaosQset *qset = (STaosQset *)calloc(sizeof(STaosQset), 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
246
  if (qset == NULL) {
S
Shengliang Guan 已提交
247
    terrno = TSDB_CODE_OUT_OF_MEMORY;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
248
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
249 250
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
251
  pthread_mutex_init(&qset->mutex, NULL);
252
  tsem_init(&qset->sem, 0, 0);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
253

S
TD-1843  
Shengliang Guan 已提交
254
  uTrace("qset:%p is opened", qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
255 256
  return qset;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
257

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
258
void taosCloseQset(taos_qset param) {
259
  if (param == NULL) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
260
  STaosQset *qset = (STaosQset *)param;
261 262 263 264 265 266 267 268 269 270 271 272

  // remove all the queues from qset
  pthread_mutex_lock(&qset->mutex);
  while (qset->head) {
    STaosQueue *queue = qset->head;
    qset->head = qset->head->next;

    queue->qset = NULL;
    queue->next = NULL;
  }
  pthread_mutex_unlock(&qset->mutex);

273 274
  pthread_mutex_destroy(&qset->mutex);
  tsem_destroy(&qset->sem);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
275
  free(qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
276
  uTrace("qset:%p is closed", qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
277 278
}

279 280 281 282 283
// tsem_post 'qset->sem', so that reader threads waiting for it
// resumes execution and return, should only be used to signal the
// thread to exit.
void taosQsetThreadResume(taos_qset param) {
  STaosQset *qset = (STaosQset *)param;
S
TD-1670  
Shengliang Guan 已提交
284
  uDebug("qset:%p, it will exit", qset);
285 286 287
  tsem_post(&qset->sem);
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
288
int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
289 290 291
  STaosQueue *queue = (STaosQueue *)p2;
  STaosQset  *qset = (STaosQset *)p1;

S
Shengliang Guan 已提交
292
  if (queue->qset) return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
293 294 295 296

  pthread_mutex_lock(&qset->mutex);

  queue->next = qset->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
297
  queue->ahandle = ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
298 299 300 301 302 303 304 305 306
  qset->head = queue;
  qset->numOfQueues++;

  pthread_mutex_lock(&queue->mutex);
  atomic_add_fetch_32(&qset->numOfItems, queue->numOfItems);
  queue->qset = qset;
  pthread_mutex_unlock(&queue->mutex);

  pthread_mutex_unlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
307

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
308
  uTrace("queue:%p is added into qset:%p", queue, qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
309 310 311
  return 0;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
312 313 314
void taosRemoveFromQset(taos_qset p1, taos_queue p2) {
  STaosQueue *queue = (STaosQueue *)p2;
  STaosQset  *qset = (STaosQset *)p1;
S
Shengliang Guan 已提交
315

316
  STaosQueue *tqueue = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
317 318 319 320 321 322

  pthread_mutex_lock(&qset->mutex);

  if (qset->head) {
    if (qset->head == queue) {
      qset->head = qset->head->next;
323
      tqueue = queue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
324 325 326 327
    } else {
      STaosQueue *prev = qset->head;
      tqueue = qset->head->next;
      while (tqueue) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
328
        assert(tqueue->qset);
S
Shengliang Guan 已提交
329
        if (tqueue == queue) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
330
          prev->next = tqueue->next;
331
          break;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
332 333 334 335 336 337
        } else {
          prev = tqueue;
          tqueue = tqueue->next;
        }
      }
    }
338 339 340 341 342 343 344 345

    if (tqueue) {
      if (qset->current == queue) qset->current = tqueue->next;
      qset->numOfQueues--;

      pthread_mutex_lock(&queue->mutex);
      atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems);
      queue->qset = NULL;
346
      queue->next = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
347
      pthread_mutex_unlock(&queue->mutex);
348
    }
S
Shengliang Guan 已提交
349 350
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
351
  pthread_mutex_unlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
352 353

  uTrace("queue:%p is removed from qset:%p", queue, qset);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
354 355
}

S
Shengliang Guan 已提交
356
int taosGetQueueNumber(taos_qset param) { return ((STaosQset *)param)->numOfQueues; }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
357

S
Shengliang Guan 已提交
358
int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProcessItem *itemFp) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
359 360 361
  STaosQset  *qset = (STaosQset *)param;
  STaosQnode *pNode = NULL;
  int         code = 0;
S
Shengliang Guan 已提交
362

363 364
  tsem_wait(&qset->sem);

J
Jeff Tao 已提交
365
  pthread_mutex_lock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
366

S
Shengliang Guan 已提交
367 368
  for (int i = 0; i < qset->numOfQueues; ++i) {
    if (qset->current == NULL) qset->current = qset->head;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
369
    STaosQueue *queue = qset->current;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
370 371
    if (queue) qset->current = queue->next;
    if (queue == NULL) break;
J
jtao1735 已提交
372
    if (queue->head == NULL) continue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
373 374 375 376

    pthread_mutex_lock(&queue->mutex);

    if (queue->head) {
S
Shengliang Guan 已提交
377 378 379
      pNode = queue->head;
      *pitem = pNode->item;
      if (ahandle) *ahandle = queue->ahandle;
S
Shengliang Guan 已提交
380
      if (itemFp) *itemFp = queue->itemFp;
S
Shengliang Guan 已提交
381 382 383 384 385 386 387
      queue->head = pNode->next;
      if (queue->head == NULL) queue->tail = NULL;
      queue->numOfItems--;
      atomic_sub_fetch_32(&qset->numOfItems, 1);
      code = 1;
      uTrace("item:%p is read out from queue:%p, items:%d", *pitem, queue, queue->numOfItems);
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
388 389 390 391 392

    pthread_mutex_unlock(&queue->mutex);
    if (pNode) break;
  }

J
Jeff Tao 已提交
393 394
  pthread_mutex_unlock(&qset->mutex);

S
Shengliang Guan 已提交
395
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
396 397
}

S
Shengliang Guan 已提交
398
int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **ahandle, FProcessItems *itemsFp) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
399 400
  STaosQset  *qset = (STaosQset *)param;
  STaosQueue *queue;
401
  STaosQall  *qall = (STaosQall *)p2;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
402 403
  int         code = 0;

404
  tsem_wait(&qset->sem);
J
Jeff Tao 已提交
405 406
  pthread_mutex_lock(&qset->mutex);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
407 408 409 410
  for(int i=0; i<qset->numOfQueues; ++i) {
    if (qset->current == NULL) 
      qset->current = qset->head;   
    queue = qset->current;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
411 412
    if (queue) qset->current = queue->next;
    if (queue == NULL) break;
J
jtao1735 已提交
413
    if (queue->head == NULL) continue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
414 415 416 417

    pthread_mutex_lock(&queue->mutex);

    if (queue->head) {
418 419 420 421 422
      qall->current = queue->head;
      qall->start = queue->head;
      qall->numOfItems = queue->numOfItems;
      qall->itemSize = queue->itemSize;
      code = qall->numOfItems;
S
Shengliang Guan 已提交
423
      if (ahandle) *ahandle = queue->ahandle;
S
Shengliang Guan 已提交
424
      if (itemsFp) *itemsFp = queue->itemsFp;
S
Shengliang Guan 已提交
425

426 427 428 429
      queue->head = NULL;
      queue->tail = NULL;
      queue->numOfItems = 0;
      atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
430
      for (int j=1; j<qall->numOfItems; ++j) tsem_wait(&qset->sem);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
431 432 433 434 435 436
    } 

    pthread_mutex_unlock(&queue->mutex);

    if (code != 0) break;  
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
437

J
Jeff Tao 已提交
438
  pthread_mutex_unlock(&qset->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
439 440
  return code;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
441

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
442 443
int taosGetQueueItemsNumber(taos_queue param) {
  STaosQueue *queue = (STaosQueue *)param;
J
Jun Li 已提交
444 445 446 447 448 449 450
  if (!queue) return 0;

  int num;
  pthread_mutex_lock(&queue->mutex);
  num = queue->numOfItems;
  pthread_mutex_unlock(&queue->mutex);
  return num;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
451 452
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
453 454
int taosGetQsetItemsNumber(taos_qset param) {
  STaosQset *qset = (STaosQset *)param;
J
Jun Li 已提交
455 456 457 458 459 460 461
  if (!qset) return 0;

  int num = 0;
  pthread_mutex_lock(&qset->mutex);
  num = qset->numOfItems;
  pthread_mutex_unlock(&qset->mutex);
  return num;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
462
}