tworker.c 13.1 KB
Newer Older
S
TD-2393  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "tworker.h"
S
Shengliang Guan 已提交
18
#include "taoserror.h"
S
log  
Shengliang Guan 已提交
19
#include "tlog.h"
S
TD-2393  
Shengliang Guan 已提交
20

S
Shengliang Guan 已提交
21
// Thread entry-point type; worker routines in this file are cast to it when passed to taosThreadCreate.
typedef void *(*ThreadFp)(void *param);
22

S
Shengliang Guan 已提交
23
// Prepare a queue-worker pool: open its queue set, allocate pool->max worker
// slots and initialize the pool mutex. No thread is started here; threads are
// launched by tQWorkerAllocQueue.
//
// pool: pool whose name/min/max have already been filled in by the caller.
// Returns 0 on success, or -1 with terrno set when a resource cannot be created.
int32_t tQWorkerInit(SQWorkerPool *pool) {
  pool->qset = taosOpenQset();
  if (pool->qset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pool->workers = taosMemoryCalloc(pool->max, sizeof(SQWorker));
  if (pool->workers == NULL) {
    // release the queue set opened above so a failed init leaks nothing
    taosCloseQset(pool->qset);
    pool->qset = NULL;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  (void)taosThreadMutexInit(&pool->mutex, NULL);

  for (int32_t i = 0; i < pool->max; ++i) {
    SQWorker *worker = pool->workers + i;
    worker->id = i;
    worker->pool = pool;
  }

  uInfo("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
  return 0;
}

S
Shengliang Guan 已提交
43 44 45
// Stop every thread of a queue-worker pool and release the pool's resources.
// The work is done in two passes over all pool->max slots: the first calls
// taosQsetThreadResume once per valid thread (presumably to wake threads
// blocked on the queue set), the second joins and clears each valid thread.
void tQWorkerCleanup(SQWorkerPool *pool) {
  for (int32_t idx = 0; idx < pool->max; ++idx) {
    if (!taosCheckPthreadValid(pool->workers[idx].thread)) continue;
    taosQsetThreadResume(pool->qset);
  }

  for (int32_t idx = 0; idx < pool->max; ++idx) {
    SQWorker *w = &pool->workers[idx];
    if (!taosCheckPthreadValid(w->thread)) continue;

    uInfo("worker:%s:%d is stopping", pool->name, w->id);
    taosThreadJoin(w->thread, NULL);
    taosThreadClear(&w->thread);
    uInfo("worker:%s:%d is stopped", pool->name, w->id);
  }

  // all threads are gone; tear down the shared state
  taosMemoryFreeClear(pool->workers);
  taosCloseQset(pool->qset);
  taosThreadMutexDestroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}

S
Shengliang Guan 已提交
68 69
static void *tQWorkerThreadFp(SQWorker *worker) {
  SQWorkerPool *pool = worker->pool;
70 71 72
  SQueueInfo    qinfo = {0};
  void         *msg = NULL;
  int32_t       code = 0;
73 74 75

  taosBlockSIGPIPE();
  setThreadName(pool->name);
S
Shengliang Guan 已提交
76 77
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
78 79

  while (1) {
H
Hongze Cheng 已提交
80
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
81
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
S
Shengliang Guan 已提交
82
            worker->pid);
83 84 85
      break;
    }

86 87 88 89
    if (qinfo.fp != NULL) {
      qinfo.workerId = worker->id;
      qinfo.threadNum = pool->num;
      (*((FItem)qinfo.fp))(&qinfo, msg);
S
Shengliang Guan 已提交
90
    }
91 92

    taosUpdateItemSize(qinfo.queue, 1);
93 94 95 96 97
  }

  return NULL;
}

S
Shengliang Guan 已提交
98
// Create a queue served by the pool and make sure enough threads are running.
//
// The new queue gets `fp` as its per-item callback and is added to the pool's
// queue set with `ahandle`. While holding the pool mutex, one thread is
// launched if pool->num is below pool->max, and launching continues until
// pool->num reaches pool->min.
//
// Returns the new queue, or NULL (terrno set) if the queue cannot be opened or
// a thread cannot be created.
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
  STaosQueue *queue = taosOpenQueue();
  if (queue == NULL) return NULL;

  taosThreadMutexLock(&pool->mutex);
  taosSetQueueFp(queue, fp, NULL);
  taosAddIntoQset(pool->qset, queue, ahandle);

  // spawn a thread to process queue
  if (pool->num < pool->max) {
    do {
      SQWorker *worker = pool->workers + pool->num;

      TdThreadAttr thAttr;
      taosThreadAttrInit(&thAttr);
      taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
      int32_t ret = taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker);
      // the attribute object is only needed for the create call; destroy it on
      // both the success and the failure path
      taosThreadAttrDestroy(&thAttr);

      if (ret != 0) {
        // a failed create may leave the handle indeterminate; clear it so the
        // validity check in tQWorkerCleanup does not try to join it
        taosThreadClear(&worker->thread);
        taosCloseQueue(queue);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        queue = NULL;
        break;
      }

      pool->num++;
      uInfo("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
    } while (pool->num < pool->min);
  }

  taosThreadMutexUnlock(&pool->mutex);
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}

S
Shengliang Guan 已提交
134
// Log and close a queue previously returned by tQWorkerAllocQueue.
// `pool` is only used for the log line.
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
  uInfo("worker:%s, queue:%p is freed", pool->name, queue);

  taosCloseQueue(queue);
}

// Prepare an auto-scaling worker pool: open its queue set, create the (empty)
// array of worker pointers and initialize the pool mutex. Threads are created
// later by tAutoQWorkerAllocQueue.
//
// Returns 0 on success, or -1 with terrno set when a resource cannot be created.
int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) {
  pool->qset = taosOpenQset();
  if (pool->qset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pool->workers = taosArrayInit(2, sizeof(SQWorker *));
  if (pool->workers == NULL) {
    // release the queue set opened above so a failed init leaks nothing
    taosCloseQset(pool->qset);
    pool->qset = NULL;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  (void)taosThreadMutexInit(&pool->mutex, NULL);

  uInfo("worker:%s is initialized as auto", pool->name);
  return 0;
}

// Stop every thread of an auto-scaling pool and release the pool's resources.
// The worker count is sampled once up front. The first pass calls
// taosQsetThreadResume once per valid thread; the second joins each valid
// thread and frees every worker record, whether or not its thread was valid.
void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) {
  int32_t numWorkers = taosArrayGetSize(pool->workers);

  for (int32_t idx = 0; idx < numWorkers; ++idx) {
    SQWorker *w = taosArrayGetP(pool->workers, idx);
    if (!taosCheckPthreadValid(w->thread)) continue;
    taosQsetThreadResume(pool->qset);
  }

  for (int32_t idx = 0; idx < numWorkers; ++idx) {
    SQWorker *w = taosArrayGetP(pool->workers, idx);

    if (taosCheckPthreadValid(w->thread)) {
      uInfo("worker:%s:%d is stopping", pool->name, w->id);
      taosThreadJoin(w->thread, NULL);
      taosThreadClear(&w->thread);
      uInfo("worker:%s:%d is stopped", pool->name, w->id);
    }

    // records are heap-allocated one by one in tAutoQWorkerAllocQueue
    taosMemoryFree(w);
  }

  taosArrayDestroy(pool->workers);
  taosCloseQset(pool->qset);
  taosThreadMutexDestroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}

static void *tAutoQWorkerThreadFp(SQWorker *worker) {
  SAutoQWorkerPool *pool = worker->pool;
  SQueueInfo        qinfo = {0};
  void             *msg = NULL;
  int32_t           code = 0;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);

  while (1) {
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
            worker->pid);
      break;
    }

    if (qinfo.fp != NULL) {
      qinfo.workerId = worker->id;
      qinfo.threadNum = taosArrayGetSize(pool->workers);
      (*((FItem)qinfo.fp))(&qinfo, msg);
    }

    taosUpdateItemSize(qinfo.queue, 1);
  }

  return NULL;
}

// Create a queue served by the auto-scaling pool and grow the thread count to
// match. The target thread count is ceil(number of queues * pool->ratio), and
// never less than 1.
//
// Returns the new queue, or NULL (terrno set) if the queue cannot be opened, a
// worker record cannot be allocated or stored, or a thread cannot be created.
// The pool mutex is released on every return path.
STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp) {
  STaosQueue *queue = taosOpenQueue();
  if (queue == NULL) return NULL;

  taosThreadMutexLock(&pool->mutex);
  taosSetQueueFp(queue, fp, NULL);
  taosAddIntoQset(pool->qset, queue, ahandle);

  int32_t queueNum = taosGetQueueNumber(pool->qset);
  int32_t curWorkerNum = taosArrayGetSize(pool->workers);
  int32_t dstWorkerNum = ceil(queueNum * pool->ratio);
  if (dstWorkerNum < 1) dstWorkerNum = 1;

  int32_t failed = 0;

  // spawn a thread to process queue
  while (curWorkerNum < dstWorkerNum) {
    SQWorker *worker = taosMemoryCalloc(1, sizeof(SQWorker));
    if (worker == NULL || taosArrayPush(pool->workers, &worker) == NULL) {
      uError("worker:%s:%d failed to create", pool->name, curWorkerNum);
      taosMemoryFree(worker);
      failed = 1;
      break;
    }
    worker->id = curWorkerNum;
    worker->pool = pool;

    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    int32_t ret = taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tAutoQWorkerThreadFp, worker);
    // the attribute object is only needed for the create call; destroy it on
    // both the success and the failure path
    taosThreadAttrDestroy(&thAttr);

    if (ret != 0) {
      uError("worker:%s:%d failed to create thread, total:%d", pool->name, worker->id, curWorkerNum);
      // undo the push above before freeing the record
      (void)taosArrayPop(pool->workers);
      taosMemoryFree(worker);
      failed = 1;
      break;
    }

    uInfo("worker:%s:%d is launched, total:%d", pool->name, worker->id, (int32_t)taosArrayGetSize(pool->workers));

    curWorkerNum++;
  }

  if (failed) {
    taosCloseQueue(queue);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // always drop the lock before reporting failure, otherwise the next
    // caller would block forever
    taosThreadMutexUnlock(&pool->mutex);
    return NULL;
  }

  taosThreadMutexUnlock(&pool->mutex);
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}

// Log and close a queue previously returned by tAutoQWorkerAllocQueue.
// `pool` is only used for the log line.
void tAutoQWorkerFreeQueue(SAutoQWorkerPool *pool, STaosQueue *queue) {
  uInfo("worker:%s, queue:%p is freed", pool->name, queue);

  taosCloseQueue(queue);
}

S
Shengliang Guan 已提交
267
// Prepare a write-worker pool: reset the round-robin cursor, allocate
// pool->max worker slots and initialize the pool mutex. Each slot starts with
// no queue set and no qall; those are created by tWWorkerAllocQueue.
//
// Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
// slot array cannot be allocated.
int32_t tWWorkerInit(SWWorkerPool *pool) {
  pool->nextId = 0;

  SWWorker *slots = taosMemoryCalloc(pool->max, sizeof(SWWorker));
  pool->workers = slots;
  if (slots == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  (void)taosThreadMutexInit(&pool->mutex, NULL);

  for (int32_t idx = 0; idx < pool->max; ++idx) {
    SWWorker *w = &slots[idx];
    w->id = idx;
    w->qall = NULL;
    w->qset = NULL;
    w->pool = pool;
  }

  uInfo("worker:%s is initialized, max:%d", pool->name, pool->max);
  return 0;
}

S
Shengliang Guan 已提交
289
// Stop every thread of a write-worker pool and release the pool's resources.
// The first pass calls taosQsetThreadResume on the queue set of each valid
// thread that has one. The second pass joins each valid thread and then frees
// that worker's qall and queue set.
void tWWorkerCleanup(SWWorkerPool *pool) {
  for (int32_t idx = 0; idx < pool->max; ++idx) {
    SWWorker *w = &pool->workers[idx];
    if (taosCheckPthreadValid(w->thread) && w->qset != NULL) {
      taosQsetThreadResume(w->qset);
    }
  }

  for (int32_t idx = 0; idx < pool->max; ++idx) {
    SWWorker *w = &pool->workers[idx];
    if (!taosCheckPthreadValid(w->thread)) continue;

    uInfo("worker:%s:%d is stopping", pool->name, w->id);
    taosThreadJoin(w->thread, NULL);
    taosThreadClear(&w->thread);
    // per-worker resources are released only after its thread has exited
    taosFreeQall(w->qall);
    taosCloseQset(w->qset);
    uInfo("worker:%s:%d is stopped", pool->name, w->id);
  }

  taosMemoryFreeClear(pool->workers);
  taosThreadMutexDestroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}

S
Shengliang Guan 已提交
317
static void *tWWorkerThreadFp(SWWorker *worker) {
S
Shengliang Guan 已提交
318
  SWWorkerPool *pool = worker->pool;
319 320 321 322
  SQueueInfo    qinfo = {0};
  void         *msg = NULL;
  int32_t       code = 0;
  int32_t       numOfMsgs = 0;
323 324 325

  taosBlockSIGPIPE();
  setThreadName(pool->name);
S
Shengliang Guan 已提交
326 327
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
328 329

  while (1) {
330
    numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &qinfo);
331
    if (numOfMsgs == 0) {
332
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, worker->qset,
S
Shengliang Guan 已提交
333
            worker->pid);
334 335 336
      break;
    }

337 338 339 340
    if (qinfo.fp != NULL) {
      qinfo.workerId = worker->id;
      qinfo.threadNum = pool->num;
      (*((FItems)qinfo.fp))(&qinfo, worker->qall, numOfMsgs);
341
    }
342
    taosUpdateItemSize(qinfo.queue, numOfMsgs);
343 344 345 346 347
  }

  return NULL;
}

S
Shengliang Guan 已提交
348
// Create a queue and attach it to the next worker in round-robin order.
//
// If the chosen worker has no queue set yet, this call creates the worker's
// queue set and qall and launches its thread; otherwise the queue is simply
// added to the existing queue set. On success the queue's threadId is set to
// the worker's thread id, polling until the thread has published it.
//
// Returns the new queue, or NULL on failure. On failure only resources created
// by this call are torn down, while the pool mutex is still held, and the
// worker's pointers are reset so the slot can be reused and is never freed
// twice. A queue set that was already serving a running worker is left intact.
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
  taosThreadMutexLock(&pool->mutex);
  SWWorker *worker = pool->workers + pool->nextId;
  int32_t   code = -1;
  int32_t   createdQset = 0;  // non-zero once this call has opened worker->qset

  STaosQueue *queue = taosOpenQueue();
  if (queue == NULL) goto _OVER;

  taosSetQueueFp(queue, NULL, fp);
  if (worker->qset == NULL) {
    worker->qset = taosOpenQset();
    if (worker->qset == NULL) goto _OVER;
    createdQset = 1;

    taosAddIntoQset(worker->qset, queue, ahandle);
    worker->qall = taosAllocateQall();
    if (worker->qall == NULL) goto _OVER;

    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    int32_t ret = taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker);
    // the attribute object is only needed for the create call; destroy it on
    // both the success and the failure path
    taosThreadAttrDestroy(&thAttr);
    if (ret != 0) {
      // a failed create may leave the handle indeterminate; clear it so the
      // validity check in tWWorkerCleanup does not try to join it
      taosThreadClear(&worker->thread);
      goto _OVER;
    }

    uInfo("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
    pool->nextId = (pool->nextId + 1) % pool->max;

    pool->num++;
    if (pool->num > pool->max) pool->num = pool->max;
  } else {
    taosAddIntoQset(worker->qset, queue, ahandle);
    pool->nextId = (pool->nextId + 1) % pool->max;
  }

  code = 0;

_OVER:
  if (code == -1) {
    if (queue != NULL) taosCloseQueue(queue);
    if (createdQset) {
      taosCloseQset(worker->qset);
      worker->qset = NULL;
      if (worker->qall != NULL) {
        taosFreeQall(worker->qall);
        worker->qall = NULL;
      }
    }
    taosThreadMutexUnlock(&pool->mutex);
    return NULL;
  }

  taosThreadMutexUnlock(&pool->mutex);

  // wait until the worker thread has recorded its id in worker->pid
  while (worker->pid <= 0) taosMsleep(10);
  queue->threadId = worker->pid;
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle,
        queue->threadId);
  return queue;
}

S
Shengliang Guan 已提交
400
// Log and close a queue previously returned by tWWorkerAllocQueue.
// `pool` is only used for the log line.
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
  uInfo("worker:%s, queue:%p is freed", pool->name, queue);

  taosCloseQueue(queue);
}
S
Shengliang Guan 已提交
404

S
Shengliang Guan 已提交
405
// Set up a single-queue worker: copy name/min/max from the config into the
// embedded queue-worker pool, initialize the pool, then allocate its one queue
// with the configured callback and handle.
//
// Returns 0 on success, -1 if either the pool or the queue cannot be created.
// NOTE(review): when queue allocation fails the pool stays initialized, and
// tSingleWorkerCleanup returns early on a NULL queue, so nothing releases it.
int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg) {
  SQWorkerPool *qpool = &pWorker->pool;

  qpool->name = pCfg->name;
  qpool->min = pCfg->min;
  qpool->max = pCfg->max;
  if (tQWorkerInit(qpool) != 0) {
    return -1;
  }

  pWorker->queue = tQWorkerAllocQueue(qpool, pCfg->param, pCfg->fp);
  if (pWorker->queue == NULL) {
    return -1;
  }

  // the name is recorded only after everything else succeeded
  pWorker->name = pCfg->name;
  return 0;
}

S
Shengliang Guan 已提交
419
// Shut down a single-queue worker. Does nothing when no queue is attached.
// Otherwise it polls every 10 ms until the queue is empty, stops the pool's
// threads and closes the queue. The queue pointer is cleared afterwards so the
// NULL guard above also protects against a second cleanup call.
void tSingleWorkerCleanup(SSingleWorker *pWorker) {
  if (pWorker->queue == NULL) return;

  while (!taosQueueEmpty(pWorker->queue)) {
    taosMsleep(10);
  }

  tQWorkerCleanup(&pWorker->pool);
  tQWorkerFreeQueue(&pWorker->pool, pWorker->queue);
  pWorker->queue = NULL;
}

S
Shengliang Guan 已提交
430
// Set up a batch (write) worker: copy name/max from the config into the
// embedded write-worker pool, initialize the pool, then allocate its one queue
// with the configured batch callback and handle.
//
// Returns 0 on success, -1 if either the pool or the queue cannot be created.
// NOTE(review): when queue allocation fails the pool stays initialized, and
// tMultiWorkerCleanup returns early on a NULL queue, so nothing releases it.
int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg) {
  SWWorkerPool *wpool = &pWorker->pool;

  wpool->name = pCfg->name;
  wpool->max = pCfg->max;
  if (tWWorkerInit(wpool) != 0) {
    return -1;
  }

  pWorker->queue = tWWorkerAllocQueue(wpool, pCfg->param, pCfg->fp);
  if (pWorker->queue == NULL) {
    return -1;
  }

  // the name is recorded only after everything else succeeded
  pWorker->name = pCfg->name;
  return 0;
}

S
Shengliang Guan 已提交
443
// Shut down a batch (write) worker. Does nothing when no queue is attached.
// Otherwise it polls every 10 ms until the queue is empty, stops the pool's
// threads and closes the queue. The queue pointer is cleared afterwards so the
// NULL guard above also protects against a second cleanup call.
void tMultiWorkerCleanup(SMultiWorker *pWorker) {
  if (pWorker->queue == NULL) return;

  while (!taosQueueEmpty(pWorker->queue)) {
    taosMsleep(10);
  }

  tWWorkerCleanup(&pWorker->pool);
  tWWorkerFreeQueue(&pWorker->pool, pWorker->queue);
  pWorker->queue = NULL;
}