tworker.c 13.1 KB
Newer Older
S
TD-2393  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "tworker.h"
S
Shengliang Guan 已提交
18
#include "taoserror.h"
S
log  
Shengliang Guan 已提交
19
#include "tlog.h"
S
TD-2393  
Shengliang Guan 已提交
20

S
Shengliang Guan 已提交
21
typedef void *(*ThreadFp)(void *param);
22

S
Shengliang Guan 已提交
23
int32_t tQWorkerInit(SQWorkerPool *pool) {
24
  pool->qset = taosOpenQset();
wafwerar's avatar
wafwerar 已提交
25
  pool->workers = taosMemoryCalloc(pool->max, sizeof(SQWorker));
S
Shengliang Guan 已提交
26 27 28 29 30
  if (pool->workers == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
31
  (void)taosThreadMutexInit(&pool->mutex, NULL);
S
Shengliang Guan 已提交
32 33 34

  for (int32_t i = 0; i < pool->max; ++i) {
    SQWorker *worker = pool->workers + i;
35 36
    worker->id = i;
    worker->pool = pool;
S
TD-2393  
Shengliang Guan 已提交
37 38
  }

39
  uInfo("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
S
TD-2393  
Shengliang Guan 已提交
40 41 42
  return 0;
}

S
Shengliang Guan 已提交
43 44 45
void tQWorkerCleanup(SQWorkerPool *pool) {
  for (int32_t i = 0; i < pool->max; ++i) {
    SQWorker *worker = pool->workers + i;
46
    if (taosCheckPthreadValid(worker->thread)) {
47
      taosQsetThreadResume(pool->qset);
S
TD-2393  
Shengliang Guan 已提交
48 49 50
    }
  }

S
Shengliang Guan 已提交
51 52
  for (int32_t i = 0; i < pool->max; ++i) {
    SQWorker *worker = pool->workers + i;
53
    if (taosCheckPthreadValid(worker->thread)) {
54
      uInfo("worker:%s:%d is stopping", pool->name, worker->id);
wafwerar's avatar
wafwerar 已提交
55
      taosThreadJoin(worker->thread, NULL);
56
      taosThreadClear(&worker->thread);
57
      uInfo("worker:%s:%d is stopped", pool->name, worker->id);
S
TD-2393  
Shengliang Guan 已提交
58 59 60
    }
  }

wafwerar's avatar
wafwerar 已提交
61
  taosMemoryFreeClear(pool->workers);
62
  taosCloseQset(pool->qset);
wafwerar's avatar
wafwerar 已提交
63
  taosThreadMutexDestroy(&pool->mutex);
S
TD-2393  
Shengliang Guan 已提交
64

65
  uInfo("worker:%s is closed", pool->name);
S
TD-2393  
Shengliang Guan 已提交
66 67
}

S
Shengliang Guan 已提交
68 69
static void *tQWorkerThreadFp(SQWorker *worker) {
  SQWorkerPool *pool = worker->pool;
70 71 72
  SQueueInfo    qinfo = {0};
  void         *msg = NULL;
  int32_t       code = 0;
73 74 75

  taosBlockSIGPIPE();
  setThreadName(pool->name);
S
Shengliang Guan 已提交
76 77
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
78 79

  while (1) {
H
Hongze Cheng 已提交
80
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
81
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
S
Shengliang Guan 已提交
82
            worker->pid);
83 84 85
      break;
    }

86 87 88 89
    if (qinfo.fp != NULL) {
      qinfo.workerId = worker->id;
      qinfo.threadNum = pool->num;
      (*((FItem)qinfo.fp))(&qinfo, msg);
S
Shengliang Guan 已提交
90
    }
91 92

    taosUpdateItemSize(qinfo.queue, 1);
93 94 95 96 97
  }

  return NULL;
}

S
Shengliang Guan 已提交
98
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
99
  STaosQueue *queue = taosOpenQueue();
S
Shengliang Guan 已提交
100
  if (queue == NULL) return NULL;
S
TD-2393  
Shengliang Guan 已提交
101

S
Shengliang Guan 已提交
102
  taosThreadMutexLock(&pool->mutex);
S
Shengliang Guan 已提交
103
  taosSetQueueFp(queue, fp, NULL);
104
  taosAddIntoQset(pool->qset, queue, ahandle);
S
TD-2393  
Shengliang Guan 已提交
105 106

  // spawn a thread to process queue
107
  if (pool->num < pool->max) {
S
TD-2393  
Shengliang Guan 已提交
108
    do {
S
Shengliang Guan 已提交
109
      SQWorker *worker = pool->workers + pool->num;
S
TD-2393  
Shengliang Guan 已提交
110

wafwerar's avatar
wafwerar 已提交
111 112 113
      TdThreadAttr thAttr;
      taosThreadAttrInit(&thAttr);
      taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
S
TD-2393  
Shengliang Guan 已提交
114

S
Shengliang Guan 已提交
115
      if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker) != 0) {
S
Shengliang Guan 已提交
116
        taosCloseQueue(queue);
S
Shengliang Guan 已提交
117
        terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
118 119
        queue = NULL;
        break;
S
TD-2393  
Shengliang Guan 已提交
120 121
      }

wafwerar's avatar
wafwerar 已提交
122
      taosThreadAttrDestroy(&thAttr);
123
      pool->num++;
124
      uInfo("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
125
    } while (pool->num < pool->min);
S
TD-2393  
Shengliang Guan 已提交
126 127
  }

wafwerar's avatar
wafwerar 已提交
128
  taosThreadMutexUnlock(&pool->mutex);
S
Shengliang Guan 已提交
129
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
130 131 132 133

  return queue;
}

S
Shengliang Guan 已提交
134
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
135 136 137 138 139 140
  uInfo("worker:%s, queue:%p is freed", pool->name, queue);
  taosCloseQueue(queue);
}

int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) {
  pool->qset = taosOpenQset();
141
  pool->workers = taosArrayInit(2, sizeof(SQWorker *));
142 143 144 145 146 147 148 149 150 151 152 153 154 155
  if (pool->workers == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  (void)taosThreadMutexInit(&pool->mutex, NULL);

  uInfo("worker:%s is initialized as auto", pool->name);
  return 0;
}

void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) {
  int32_t size = taosArrayGetSize(pool->workers);
  for (int32_t i = 0; i < size; ++i) {
156
    SQWorker *worker = taosArrayGetP(pool->workers, i);
157 158 159 160 161 162
    if (taosCheckPthreadValid(worker->thread)) {
      taosQsetThreadResume(pool->qset);
    }
  }

  for (int32_t i = 0; i < size; ++i) {
163
    SQWorker *worker = taosArrayGetP(pool->workers, i);
164 165 166 167 168 169
    if (taosCheckPthreadValid(worker->thread)) {
      uInfo("worker:%s:%d is stopping", pool->name, worker->id);
      taosThreadJoin(worker->thread, NULL);
      taosThreadClear(&worker->thread);
      uInfo("worker:%s:%d is stopped", pool->name, worker->id);
    }
170
    taosMemoryFree(worker);
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
  }

  taosArrayDestroy(pool->workers);
  taosCloseQset(pool->qset);
  taosThreadMutexDestroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}

static void *tAutoQWorkerThreadFp(SQWorker *worker) {
  SAutoQWorkerPool *pool = worker->pool;
  SQueueInfo        qinfo = {0};
  void             *msg = NULL;
  int32_t           code = 0;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);

  while (1) {
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
            worker->pid);
      break;
    }

    if (qinfo.fp != NULL) {
      qinfo.workerId = worker->id;
      qinfo.threadNum = taosArrayGetSize(pool->workers);
      (*((FItem)qinfo.fp))(&qinfo, msg);
    }

    taosUpdateItemSize(qinfo.queue, 1);
  }

  return NULL;
}

STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp) {
  STaosQueue *queue = taosOpenQueue();
  if (queue == NULL) return NULL;

  taosThreadMutexLock(&pool->mutex);
  taosSetQueueFp(queue, fp, NULL);
  taosAddIntoQset(pool->qset, queue, ahandle);

  int32_t queueNum = taosGetQueueNumber(pool->qset);
  int32_t curWorkerNum = taosArrayGetSize(pool->workers);
  int32_t dstWorkerNum = ceil(queueNum * pool->ratio);
  if (dstWorkerNum < 1) dstWorkerNum = 1;

223
  // spawn a thread to process queue
224
  while (curWorkerNum < dstWorkerNum) {
225 226 227 228
    SQWorker *worker = taosMemoryCalloc(1, sizeof(SQWorker));
    if (worker == NULL || taosArrayPush(pool->workers, &worker) == NULL) {
      uError("worker:%s:%d failed to create", pool->name, curWorkerNum);
      taosMemoryFree(worker);
229 230 231 232
      taosCloseQueue(queue);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
233 234 235
    worker->id = curWorkerNum;
    worker->pool = pool;

236 237 238 239 240
    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

    if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tAutoQWorkerThreadFp, worker) != 0) {
241 242 243
      uError("worker:%s:%d failed to create thread, total:%d", pool->name, worker->id, curWorkerNum);
      (void)taosArrayPop(pool->workers);
      taosMemoryFree(worker);
244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
      taosCloseQueue(queue);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }

    taosThreadAttrDestroy(&thAttr);
    uInfo("worker:%s:%d is launched, total:%d", pool->name, worker->id, (int32_t)taosArrayGetSize(pool->workers));

    curWorkerNum++;
  }

  taosThreadMutexUnlock(&pool->mutex);
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}

void tAutoQWorkerFreeQueue(SAutoQWorkerPool *pool, STaosQueue *queue) {
  uInfo("worker:%s, queue:%p is freed", pool->name, queue);
S
Shengliang Guan 已提交
263
  taosCloseQueue(queue);
264 265
}

S
Shengliang Guan 已提交
266
int32_t tWWorkerInit(SWWorkerPool *pool) {
267
  pool->nextId = 0;
wafwerar's avatar
wafwerar 已提交
268
  pool->workers = taosMemoryCalloc(pool->max, sizeof(SWWorker));
S
Shengliang Guan 已提交
269 270 271 272 273
  if (pool->workers == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
274
  (void)taosThreadMutexInit(&pool->mutex, NULL);
275 276

  for (int32_t i = 0; i < pool->max; ++i) {
S
Shengliang Guan 已提交
277
    SWWorker *worker = pool->workers + i;
278 279 280 281 282 283
    worker->id = i;
    worker->qall = NULL;
    worker->qset = NULL;
    worker->pool = pool;
  }

284
  uInfo("worker:%s is initialized, max:%d", pool->name, pool->max);
285 286 287
  return 0;
}

S
Shengliang Guan 已提交
288
void tWWorkerCleanup(SWWorkerPool *pool) {
289
  for (int32_t i = 0; i < pool->max; ++i) {
S
Shengliang Guan 已提交
290
    SWWorker *worker = pool->workers + i;
291
    if (taosCheckPthreadValid(worker->thread)) {
S
Shengliang Guan 已提交
292 293 294
      if (worker->qset) {
        taosQsetThreadResume(worker->qset);
      }
295 296 297 298
    }
  }

  for (int32_t i = 0; i < pool->max; ++i) {
S
Shengliang Guan 已提交
299
    SWWorker *worker = pool->workers + i;
300
    if (taosCheckPthreadValid(worker->thread)) {
301
      uInfo("worker:%s:%d is stopping", pool->name, worker->id);
wafwerar's avatar
wafwerar 已提交
302
      taosThreadJoin(worker->thread, NULL);
303
      taosThreadClear(&worker->thread);
304 305
      taosFreeQall(worker->qall);
      taosCloseQset(worker->qset);
306
      uInfo("worker:%s:%d is stopped", pool->name, worker->id);
307 308 309
    }
  }

wafwerar's avatar
wafwerar 已提交
310
  taosMemoryFreeClear(pool->workers);
wafwerar's avatar
wafwerar 已提交
311
  taosThreadMutexDestroy(&pool->mutex);
312

313
  uInfo("worker:%s is closed", pool->name);
314 315
}

S
Shengliang Guan 已提交
316
static void *tWWorkerThreadFp(SWWorker *worker) {
S
Shengliang Guan 已提交
317
  SWWorkerPool *pool = worker->pool;
318 319 320 321
  SQueueInfo    qinfo = {0};
  void         *msg = NULL;
  int32_t       code = 0;
  int32_t       numOfMsgs = 0;
322 323 324

  taosBlockSIGPIPE();
  setThreadName(pool->name);
S
Shengliang Guan 已提交
325 326
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
327 328

  while (1) {
329
    numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &qinfo);
330
    if (numOfMsgs == 0) {
331
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, worker->qset,
S
Shengliang Guan 已提交
332
            worker->pid);
333 334 335
      break;
    }

336 337 338 339
    if (qinfo.fp != NULL) {
      qinfo.workerId = worker->id;
      qinfo.threadNum = pool->num;
      (*((FItems)qinfo.fp))(&qinfo, worker->qall, numOfMsgs);
340
    }
341
    taosUpdateItemSize(qinfo.queue, numOfMsgs);
342 343 344 345 346
  }

  return NULL;
}

S
Shengliang Guan 已提交
347
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
wafwerar's avatar
wafwerar 已提交
348
  taosThreadMutexLock(&pool->mutex);
S
Shengliang Guan 已提交
349
  SWWorker *worker = pool->workers + pool->nextId;
S
Shengliang Guan 已提交
350
  int32_t   code = -1;
351

352
  STaosQueue *queue = taosOpenQueue();
S
Shengliang Guan 已提交
353
  if (queue == NULL) goto _OVER;
354

S
Shengliang Guan 已提交
355
  taosSetQueueFp(queue, NULL, fp);
356 357
  if (worker->qset == NULL) {
    worker->qset = taosOpenQset();
S
Shengliang Guan 已提交
358
    if (worker->qset == NULL) goto _OVER;
359 360 361

    taosAddIntoQset(worker->qset, queue, ahandle);
    worker->qall = taosAllocateQall();
S
Shengliang Guan 已提交
362 363
    if (worker->qall == NULL) goto _OVER;

wafwerar's avatar
wafwerar 已提交
364 365 366
    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
S
Shengliang Guan 已提交
367
    if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker) != 0) goto _OVER;
368

369
    uInfo("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
S
Shengliang Guan 已提交
370
    pool->nextId = (pool->nextId + 1) % pool->max;
371

wafwerar's avatar
wafwerar 已提交
372
    taosThreadAttrDestroy(&thAttr);
S
Shengliang Guan 已提交
373 374
    pool->num++;
    if (pool->num > pool->max) pool->num = pool->max;
375 376 377 378 379
  } else {
    taosAddIntoQset(worker->qset, queue, ahandle);
    pool->nextId = (pool->nextId + 1) % pool->max;
  }

S
Shengliang Guan 已提交
380
  code = 0;
S
TD-2393  
Shengliang Guan 已提交
381

S
Shengliang Guan 已提交
382 383 384 385 386 387 388 389 390
_OVER:
  taosThreadMutexUnlock(&pool->mutex);

  if (code == -1) {
    if (queue != NULL) taosCloseQueue(queue);
    if (worker->qset != NULL) taosCloseQset(worker->qset);
    if (worker->qall != NULL) taosFreeQall(worker->qall);
    return NULL;
  } else {
S
Shengliang Guan 已提交
391 392
    while (worker->pid <= 0) taosMsleep(10);
    queue->threadId = worker->pid;
393 394
    uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle,
          queue->threadId);
S
Shengliang Guan 已提交
395 396
    return queue;
  }
S
TD-2393  
Shengliang Guan 已提交
397 398
}

S
Shengliang Guan 已提交
399
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
400
  uInfo("worker:%s, queue:%p is freed", pool->name, queue);
S
Shengliang Guan 已提交
401
  taosCloseQueue(queue);
S
TD-2393  
Shengliang Guan 已提交
402
}
S
Shengliang Guan 已提交
403

S
Shengliang Guan 已提交
404
int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg) {
S
Shengliang Guan 已提交
405 406
  SQWorkerPool *pPool = &pWorker->pool;
  pPool->name = pCfg->name;
S
shm  
Shengliang Guan 已提交
407 408
  pPool->min = pCfg->min;
  pPool->max = pCfg->max;
S
Shengliang Guan 已提交
409 410
  if (tQWorkerInit(pPool) != 0) return -1;

S
Shengliang Guan 已提交
411
  pWorker->queue = tQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
S
Shengliang Guan 已提交
412 413
  if (pWorker->queue == NULL) return -1;

S
Shengliang Guan 已提交
414 415 416 417
  pWorker->name = pCfg->name;
  return 0;
}

S
Shengliang Guan 已提交
418
void tSingleWorkerCleanup(SSingleWorker *pWorker) {
S
Shengliang Guan 已提交
419 420 421 422 423 424 425 426 427 428
  if (pWorker->queue == NULL) return;

  while (!taosQueueEmpty(pWorker->queue)) {
    taosMsleep(10);
  }

  tQWorkerCleanup(&pWorker->pool);
  tQWorkerFreeQueue(&pWorker->pool, pWorker->queue);
}

S
Shengliang Guan 已提交
429
int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg) {
S
Shengliang Guan 已提交
430 431
  SWWorkerPool *pPool = &pWorker->pool;
  pPool->name = pCfg->name;
S
shm  
Shengliang Guan 已提交
432
  pPool->max = pCfg->max;
S
Shengliang Guan 已提交
433 434
  if (tWWorkerInit(pPool) != 0) return -1;

S
Shengliang Guan 已提交
435
  pWorker->queue = tWWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
S
Shengliang Guan 已提交
436 437
  if (pWorker->queue == NULL) return -1;

S
Shengliang Guan 已提交
438 439 440 441
  pWorker->name = pCfg->name;
  return 0;
}

S
Shengliang Guan 已提交
442
void tMultiWorkerCleanup(SMultiWorker *pWorker) {
S
Shengliang Guan 已提交
443 444 445 446 447 448 449 450 451
  if (pWorker->queue == NULL) return;

  while (!taosQueueEmpty(pWorker->queue)) {
    taosMsleep(10);
  }

  tWWorkerCleanup(&pWorker->pool);
  tWWorkerFreeQueue(&pWorker->pool, pWorker->queue);
}