tworker.c 13.2 KB
Newer Older
S
TD-2393  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "tworker.h"
S
Shengliang Guan 已提交
18
#include "taoserror.h"
S
log  
Shengliang Guan 已提交
19
#include "tlog.h"
S
TD-2393  
Shengliang Guan 已提交
20

S
Shengliang Guan 已提交
21
// Thread entry-point signature; the typed worker routines in this file are cast to it when handed to taosThreadCreate().
typedef void *(*ThreadFp)(void *param);
22

S
Shengliang Guan 已提交
23
/**
 * Prepare a queue-worker pool: open its queue set, allocate pool->max worker
 * slots and initialize the pool mutex. No thread is started here; threads are
 * launched by tQWorkerAllocQueue().
 *
 * The caller is expected to have filled in pool->name, pool->min and pool->max.
 *
 * @param pool pool to initialize
 * @return 0 on success; -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY when the
 *         queue set or the worker array cannot be created.
 */
int32_t tQWorkerInit(SQWorkerPool *pool) {
  pool->qset = taosOpenQset();
  if (pool->qset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pool->workers = taosMemoryCalloc(pool->max, sizeof(SQueueWorker));
  if (pool->workers == NULL) {
    // do not leak the queue set opened above
    taosCloseQset(pool->qset);
    pool->qset = NULL;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  (void)taosThreadMutexInit(&pool->mutex, NULL);

  // every slot knows its index and owning pool before any thread is started
  for (int32_t i = 0; i < pool->max; ++i) {
    SQueueWorker *worker = pool->workers + i;
    worker->id = i;
    worker->pool = pool;
  }

  uInfo("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
  return 0;
}

S
Shengliang Guan 已提交
43 44
/**
 * Stop every running thread of the pool and release the pool's resources
 * (worker array, queue set, mutex).
 *
 * @param pool pool previously set up by tQWorkerInit()
 */
void tQWorkerCleanup(SQWorkerPool *pool) {
  // Pass 1: issue one resume on the shared qset per live thread.
  // NOTE(review): presumably this unblocks a thread waiting in
  // taosReadQitemFromQset() so it can take its exit branch - confirm.
  for (int32_t idx = 0; idx < pool->max; ++idx) {
    if (!taosCheckPthreadValid(pool->workers[idx].thread)) continue;
    taosQsetThreadResume(pool->qset);
  }

  // Pass 2: join each live thread and clear its handle.
  for (int32_t idx = 0; idx < pool->max; ++idx) {
    SQueueWorker *pWorker = &pool->workers[idx];
    if (!taosCheckPthreadValid(pWorker->thread)) continue;

    uInfo("worker:%s:%d is stopping", pool->name, pWorker->id);
    taosThreadJoin(pWorker->thread, NULL);
    taosThreadClear(&pWorker->thread);
    uInfo("worker:%s:%d is stopped", pool->name, pWorker->id);
  }

  taosMemoryFreeClear(pool->workers);
  taosCloseQset(pool->qset);
  taosThreadMutexDestroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}

68
/**
 * Thread body of a queue worker: repeatedly reads one item from the pool's
 * shared queue set and hands it to the callback stored in the item's queue
 * info. The loop ends when taosReadQitemFromQset() returns 0.
 *
 * @param worker slot describing this thread; worker->pid is filled in here
 * @return always NULL
 */
static void *tQWorkerThreadFp(SQueueWorker *worker) {
  SQWorkerPool *pool = worker->pool;
  SQueueInfo    qinfo = {0};
  void         *msg = NULL;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);

  while (1) {
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
            worker->pid);
      break;
    }

    if (qinfo.fp != NULL) {
      // tell the callback which worker runs it and how many threads the pool has
      qinfo.workerId = worker->id;
      qinfo.threadNum = pool->num;
      (*((FItem)qinfo.fp))(&qinfo, msg);
    }

    // one item was consumed from qinfo.queue
    taosUpdateItemSize(qinfo.queue, 1);
  }

  return NULL;
}

S
Shengliang Guan 已提交
98
/**
 * Create a queue, attach it to the pool's queue set and, while the pool has
 * fewer than pool->max threads, launch worker threads (at least one per call,
 * continuing until pool->min is reached).
 *
 * @param pool    pool set up by tQWorkerInit()
 * @param ahandle opaque handle stored with the queue in the queue set
 * @param fp      per-item callback installed on the queue
 * @return the new queue, or NULL when the queue cannot be opened or a thread
 *         cannot be created (terrno = TSDB_CODE_OUT_OF_MEMORY in the latter case).
 */
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
  STaosQueue *queue = taosOpenQueue();
  if (queue == NULL) return NULL;

  taosThreadMutexLock(&pool->mutex);
  taosSetQueueFp(queue, fp, NULL);
  taosAddIntoQset(pool->qset, queue, ahandle);

  // spawn a thread to process queue
  if (pool->num < pool->max) {
    do {
      SQueueWorker *worker = pool->workers + pool->num;

      TdThreadAttr thAttr;
      taosThreadAttrInit(&thAttr);
      taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

      int32_t ret = taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker);
      // the attribute object is only needed for the create call; release it on
      // both the success and the failure path
      taosThreadAttrDestroy(&thAttr);

      if (ret != 0) {
        // NOTE(review): assumes taosThreadClear() resets the handle so that
        // taosCheckPthreadValid() in tQWorkerCleanup() will not try to join a
        // thread that never started - confirm against the OS wrapper.
        taosThreadClear(&worker->thread);
        taosCloseQueue(queue);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        queue = NULL;
        break;
      }

      pool->num++;
      uInfo("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
    } while (pool->num < pool->min);
  }

  taosThreadMutexUnlock(&pool->mutex);
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}

S
Shengliang Guan 已提交
134
/**
 * Log the release of a queue and close it with taosCloseQueue().
 *
 * @param pool  owning pool; only its name is used, for the log line
 * @param queue queue to close
 */
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
  const char *poolName = pool->name;

  uInfo("worker:%s, queue:%p is freed", poolName, queue);
  taosCloseQueue(queue);
}

/**
 * Prepare an auto-sizing queue-worker pool: open its queue set, create the
 * (initially empty) array of worker pointers and initialize the pool mutex.
 * Threads are created later by tAutoQWorkerAllocQueue().
 *
 * @param pool pool to initialize
 * @return 0 on success; -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY when the
 *         queue set or the worker array cannot be created.
 */
int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) {
  pool->qset = taosOpenQset();
  if (pool->qset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pool->workers = taosArrayInit(2, sizeof(SQueueWorker *));
  if (pool->workers == NULL) {
    // do not leak the queue set opened above
    taosCloseQset(pool->qset);
    pool->qset = NULL;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  (void)taosThreadMutexInit(&pool->mutex, NULL);

  uInfo("worker:%s is initialized as auto", pool->name);
  return 0;
}

/**
 * Stop every running thread of an auto-sizing pool, free each worker record,
 * then release the worker array, the queue set and the mutex.
 *
 * @param pool pool previously set up by tAutoQWorkerInit()
 */
void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) {
  int32_t numWorkers = taosArrayGetSize(pool->workers);

  // Pass 1: issue one resume on the shared qset per live thread.
  for (int32_t idx = 0; idx < numWorkers; ++idx) {
    SQueueWorker *pWorker = taosArrayGetP(pool->workers, idx);
    if (!taosCheckPthreadValid(pWorker->thread)) continue;
    taosQsetThreadResume(pool->qset);
  }

  // Pass 2: join live threads; the worker record is freed whether or not its
  // thread was still valid.
  for (int32_t idx = 0; idx < numWorkers; ++idx) {
    SQueueWorker *pWorker = taosArrayGetP(pool->workers, idx);
    if (taosCheckPthreadValid(pWorker->thread)) {
      uInfo("worker:%s:%d is stopping", pool->name, pWorker->id);
      taosThreadJoin(pWorker->thread, NULL);
      taosThreadClear(&pWorker->thread);
      uInfo("worker:%s:%d is stopped", pool->name, pWorker->id);
    }
    taosMemoryFree(pWorker);
  }

  taosArrayDestroy(pool->workers);
  taosCloseQset(pool->qset);
  taosThreadMutexDestroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}

180
/**
 * Thread body of an auto-pool worker: repeatedly reads one item from the
 * pool's shared queue set and hands it to the callback stored in the item's
 * queue info. The loop ends when taosReadQitemFromQset() returns 0.
 *
 * @param worker record describing this thread; worker->pid is filled in here
 * @return always NULL
 */
static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
  SAutoQWorkerPool *pool = worker->pool;
  SQueueInfo        qinfo = {0};
  void             *msg = NULL;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);

  while (1) {
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
            worker->pid);
      break;
    }

    if (qinfo.fp != NULL) {
      // tell the callback which worker runs it and the current number of workers
      qinfo.workerId = worker->id;
      qinfo.threadNum = taosArrayGetSize(pool->workers);
      (*((FItem)qinfo.fp))(&qinfo, msg);
    }

    // one item was consumed from qinfo.queue
    taosUpdateItemSize(qinfo.queue, 1);
  }

  return NULL;
}

/**
 * Create a queue, attach it to the pool's queue set, and grow the pool so the
 * number of worker threads reaches ceilf(queueNum * pool->ratio), with a
 * minimum of one.
 *
 * @param pool    pool set up by tAutoQWorkerInit()
 * @param ahandle opaque handle stored with the queue in the queue set
 * @param fp      per-item callback installed on the queue
 * @return the new queue, or NULL on failure. terrno is set to
 *         TSDB_CODE_OUT_OF_MEMORY when a worker record or thread cannot be
 *         created. The pool mutex is released on every return path.
 */
STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp) {
  STaosQueue *queue = taosOpenQueue();
  if (queue == NULL) return NULL;

  taosThreadMutexLock(&pool->mutex);
  taosSetQueueFp(queue, fp, NULL);
  taosAddIntoQset(pool->qset, queue, ahandle);

  int32_t queueNum = taosGetQueueNumber(pool->qset);
  int32_t curWorkerNum = taosArrayGetSize(pool->workers);
  int32_t dstWorkerNum = ceilf(queueNum * pool->ratio);
  if (dstWorkerNum < 1) dstWorkerNum = 1;

  // spawn a thread to process queue
  while (curWorkerNum < dstWorkerNum) {
    SQueueWorker *worker = taosMemoryCalloc(1, sizeof(SQueueWorker));
    if (worker == NULL || taosArrayPush(pool->workers, &worker) == NULL) {
      uError("worker:%s:%d failed to create", pool->name, curWorkerNum);
      taosMemoryFree(worker);
      taosCloseQueue(queue);
      taosThreadMutexUnlock(&pool->mutex);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    worker->id = curWorkerNum;
    worker->pool = pool;

    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

    int32_t ret = taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tAutoQWorkerThreadFp, worker);
    // the attribute object is only needed for the create call; release it on
    // both the success and the failure path
    taosThreadAttrDestroy(&thAttr);

    if (ret != 0) {
      uError("worker:%s:%d failed to create thread, total:%d", pool->name, worker->id, curWorkerNum);
      (void)taosArrayPop(pool->workers);
      taosMemoryFree(worker);
      taosCloseQueue(queue);
      // must release the pool mutex here, otherwise the next caller deadlocks
      taosThreadMutexUnlock(&pool->mutex);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }

    int32_t numOfThreads = taosArrayGetSize(pool->workers);
    uInfo("worker:%s:%d is launched, total:%d, expect:%d", pool->name, worker->id, numOfThreads, dstWorkerNum);

    curWorkerNum++;
  }

  taosThreadMutexUnlock(&pool->mutex);
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}

/**
 * Log the release of a queue and close it with taosCloseQueue().
 *
 * @param pool  owning auto pool; only its name is used, for the log line
 * @param queue queue to close
 */
void tAutoQWorkerFreeQueue(SAutoQWorkerPool *pool, STaosQueue *queue) {
  const char *poolName = pool->name;

  uInfo("worker:%s, queue:%p is freed", poolName, queue);
  taosCloseQueue(queue);
}

S
Shengliang Guan 已提交
268
/**
 * Prepare a write-worker pool: allocate pool->max worker slots, initialize the
 * pool mutex and reset the round-robin cursor (pool->nextId). Each slot starts
 * without a queue set or qall; those are created on first use by
 * tWWorkerAllocQueue().
 *
 * @param pool pool to initialize; pool->name and pool->max must already be set
 * @return 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if the worker
 *         array cannot be allocated.
 */
int32_t tWWorkerInit(SWWorkerPool *pool) {
  pool->nextId = 0;

  pool->workers = taosMemoryCalloc(pool->max, sizeof(SWWorker));
  if (pool->workers == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  (void)taosThreadMutexInit(&pool->mutex, NULL);

  for (int32_t idx = 0; idx < pool->max; ++idx) {
    SWWorker *pWorker = &pool->workers[idx];
    pWorker->pool = pool;
    pWorker->id = idx;
    pWorker->qset = NULL;
    pWorker->qall = NULL;
  }

  uInfo("worker:%s is initialized, max:%d", pool->name, pool->max);
  return 0;
}

S
Shengliang Guan 已提交
290
/**
 * Stop every running thread of a write-worker pool, release each running
 * worker's qall and queue set, then free the worker array and the mutex.
 *
 * @param pool pool previously set up by tWWorkerInit()
 */
void tWWorkerCleanup(SWWorkerPool *pool) {
  // Pass 1: issue a resume on the private qset of every live worker that has one.
  for (int32_t idx = 0; idx < pool->max; ++idx) {
    SWWorker *pWorker = &pool->workers[idx];
    if (taosCheckPthreadValid(pWorker->thread) && pWorker->qset != NULL) {
      taosQsetThreadResume(pWorker->qset);
    }
  }

  // Pass 2: join each live thread, then drop the resources it was using.
  for (int32_t idx = 0; idx < pool->max; ++idx) {
    SWWorker *pWorker = &pool->workers[idx];
    if (!taosCheckPthreadValid(pWorker->thread)) continue;

    uInfo("worker:%s:%d is stopping", pool->name, pWorker->id);
    taosThreadJoin(pWorker->thread, NULL);
    taosThreadClear(&pWorker->thread);
    taosFreeQall(pWorker->qall);
    taosCloseQset(pWorker->qset);
    uInfo("worker:%s:%d is stopped", pool->name, pWorker->id);
  }

  taosMemoryFreeClear(pool->workers);
  taosThreadMutexDestroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}

S
Shengliang Guan 已提交
318
/**
 * Thread body of a write worker: repeatedly drains all pending items of one
 * queue from the worker's private queue set into worker->qall and hands the
 * batch to the callback stored in the queue info. The loop ends when
 * taosReadAllQitemsFromQset() returns 0.
 *
 * @param worker slot describing this thread; worker->pid is filled in here
 * @return always NULL
 */
static void *tWWorkerThreadFp(SWWorker *worker) {
  SWWorkerPool *pool = worker->pool;
  SQueueInfo    qinfo = {0};
  int32_t       numOfMsgs = 0;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);

  while (1) {
    numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &qinfo);
    if (numOfMsgs == 0) {
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, worker->qset,
            worker->pid);
      break;
    }

    if (qinfo.fp != NULL) {
      // tell the callback which worker runs it and how many threads the pool has
      qinfo.workerId = worker->id;
      qinfo.threadNum = pool->num;
      (*((FItems)qinfo.fp))(&qinfo, worker->qall, numOfMsgs);
    }

    // the whole batch was consumed from qinfo.queue
    taosUpdateItemSize(qinfo.queue, numOfMsgs);
  }

  return NULL;
}

S
Shengliang Guan 已提交
349
/**
 * Create a queue and bind it to the worker slot at pool->nextId, then advance
 * pool->nextId modulo pool->max.
 *
 * On the first use of a slot its private queue set, qall and thread are
 * created; on later uses the queue is simply added to the slot's existing
 * queue set. On success the call waits until the worker thread has published
 * its id in worker->pid and stores that id in queue->threadId.
 *
 * On failure only the resources created by THIS call are released: a queue
 * set and qall that already belong to a running worker are left untouched,
 * and pointers to released objects are reset to NULL. The teardown runs while
 * the pool mutex is still held.
 *
 * @param pool    pool set up by tWWorkerInit()
 * @param ahandle opaque handle stored with the queue in the queue set
 * @param fp      batch callback installed on the queue
 * @return the new queue, or NULL on failure
 */
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
  taosThreadMutexLock(&pool->mutex);
  SWWorker *worker = pool->workers + pool->nextId;
  int32_t   code = -1;
  int32_t   ownsQset = 0;  // non-zero once this call has created worker->qset

  STaosQueue *queue = taosOpenQueue();
  if (queue == NULL) goto _OVER;

  taosSetQueueFp(queue, NULL, fp);
  if (worker->qset == NULL) {
    worker->qset = taosOpenQset();
    if (worker->qset == NULL) goto _OVER;
    ownsQset = 1;

    taosAddIntoQset(worker->qset, queue, ahandle);
    worker->qall = taosAllocateQall();
    if (worker->qall == NULL) goto _OVER;

    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    int32_t ret = taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker);
    // the attribute object is only needed for the create call; release it on
    // both the success and the failure path
    taosThreadAttrDestroy(&thAttr);
    if (ret != 0) {
      // NOTE(review): assumes taosThreadClear() resets the handle so that
      // taosCheckPthreadValid() in tWWorkerCleanup() will not try to join a
      // thread that never started - confirm against the OS wrapper.
      taosThreadClear(&worker->thread);
      goto _OVER;
    }

    uInfo("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
    pool->nextId = (pool->nextId + 1) % pool->max;

    pool->num++;
    if (pool->num > pool->max) pool->num = pool->max;
  } else {
    taosAddIntoQset(worker->qset, queue, ahandle);
    pool->nextId = (pool->nextId + 1) % pool->max;
  }

  code = 0;

_OVER:
  if (code == -1) {
    if (queue != NULL) taosCloseQueue(queue);
    if (ownsQset) {
      // created by this call and no thread is using them: safe to drop, and
      // the slot must look unused again for the next attempt
      taosCloseQset(worker->qset);
      worker->qset = NULL;
      if (worker->qall != NULL) {
        taosFreeQall(worker->qall);
        worker->qall = NULL;
      }
    }
    taosThreadMutexUnlock(&pool->mutex);
    return NULL;
  }

  taosThreadMutexUnlock(&pool->mutex);

  // wait for the worker thread to record its id (set at the top of tWWorkerThreadFp)
  while (worker->pid <= 0) taosMsleep(10);
  queue->threadId = worker->pid;
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle,
        queue->threadId);
  return queue;
}

S
Shengliang Guan 已提交
401
/**
 * Log the release of a queue and close it with taosCloseQueue().
 *
 * @param pool  owning write-worker pool; only its name is used, for the log line
 * @param queue queue to close
 */
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
  const char *poolName = pool->name;

  uInfo("worker:%s, queue:%p is freed", poolName, queue);
  taosCloseQueue(queue);
}
S
Shengliang Guan 已提交
405

S
Shengliang Guan 已提交
406
/**
 * Set up a single-queue worker: copy name/min/max from the config into the
 * embedded queue-worker pool, initialize the pool, and allocate its one queue.
 *
 * @param pWorker worker object to initialize
 * @param pCfg    configuration (name, min, max, param, fp)
 * @return 0 on success, -1 if pool initialization or queue allocation fails
 */
int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg) {
  SQWorkerPool *pool = &pWorker->pool;

  pool->name = pCfg->name;
  pool->min = pCfg->min;
  pool->max = pCfg->max;

  int32_t rc = tQWorkerInit(pool);
  if (rc != 0) {
    return -1;
  }

  STaosQueue *queue = tQWorkerAllocQueue(pool, pCfg->param, pCfg->fp);
  pWorker->queue = queue;
  if (queue == NULL) {
    // NOTE(review): the pool initialized above is not torn down on this path
    return -1;
  }

  pWorker->name = pCfg->name;
  return 0;
}

S
Shengliang Guan 已提交
420
/**
 * Tear down a single-queue worker: wait (polling every 10 ms) until its queue
 * is empty, stop and release the pool, then free the queue.
 *
 * Does nothing when the worker has no queue. pWorker->queue is reset to NULL
 * afterwards so a repeated call hits that guard instead of touching the freed
 * queue.
 *
 * @param pWorker worker set up by tSingleWorkerInit()
 */
void tSingleWorkerCleanup(SSingleWorker *pWorker) {
  if (pWorker->queue == NULL) return;

  while (!taosQueueEmpty(pWorker->queue)) {
    taosMsleep(10);
  }

  tQWorkerCleanup(&pWorker->pool);
  tQWorkerFreeQueue(&pWorker->pool, pWorker->queue);
  pWorker->queue = NULL;
}

S
Shengliang Guan 已提交
431
/**
 * Set up a multi-item (batch) worker: copy name/max from the config into the
 * embedded write-worker pool, initialize the pool, and allocate its one queue.
 *
 * @param pWorker worker object to initialize
 * @param pCfg    configuration (name, max, param, fp)
 * @return 0 on success, -1 if pool initialization or queue allocation fails
 */
int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg) {
  SWWorkerPool *pool = &pWorker->pool;

  pool->name = pCfg->name;
  pool->max = pCfg->max;

  int32_t rc = tWWorkerInit(pool);
  if (rc != 0) {
    return -1;
  }

  STaosQueue *queue = tWWorkerAllocQueue(pool, pCfg->param, pCfg->fp);
  pWorker->queue = queue;
  if (queue == NULL) {
    // NOTE(review): the pool initialized above is not torn down on this path
    return -1;
  }

  pWorker->name = pCfg->name;
  return 0;
}

S
Shengliang Guan 已提交
444
/**
 * Tear down a multi-item worker: wait (polling every 10 ms) until its queue is
 * empty, stop and release the pool, then free the queue.
 *
 * Does nothing when the worker has no queue. pWorker->queue is reset to NULL
 * afterwards so a repeated call hits that guard instead of touching the freed
 * queue.
 *
 * @param pWorker worker set up by tMultiWorkerInit()
 */
void tMultiWorkerCleanup(SMultiWorker *pWorker) {
  if (pWorker->queue == NULL) return;

  while (!taosQueueEmpty(pWorker->queue)) {
    taosMsleep(10);
  }

  tWWorkerCleanup(&pWorker->pool);
  tWWorkerFreeQueue(&pWorker->pool, pWorker->queue);
  pWorker->queue = NULL;
}