tworker.c 13.0 KB
Newer Older
S
TD-2393  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "tworker.h"
S
Shengliang Guan 已提交
18
#include "taoserror.h"
S
log  
Shengliang Guan 已提交
19
#include "tlog.h"
S
TD-2393  
Shengliang Guan 已提交
20

S
Shengliang Guan 已提交
21
typedef void *(*ThreadFp)(void *param);
22

S
Shengliang Guan 已提交
23
int32_t tQWorkerInit(SQWorkerPool *pool) {
24
  pool->qset = taosOpenQset();
wafwerar's avatar
wafwerar 已提交
25
  pool->workers = taosMemoryCalloc(pool->max, sizeof(SQWorker));
S
Shengliang Guan 已提交
26 27 28 29 30
  if (pool->workers == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
31
  (void)taosThreadMutexInit(&pool->mutex, NULL);
S
Shengliang Guan 已提交
32 33 34

  for (int32_t i = 0; i < pool->max; ++i) {
    SQWorker *worker = pool->workers + i;
35 36
    worker->id = i;
    worker->pool = pool;
S
TD-2393  
Shengliang Guan 已提交
37 38
  }

39
  uInfo("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
S
TD-2393  
Shengliang Guan 已提交
40 41 42
  return 0;
}

S
Shengliang Guan 已提交
43 44 45
void tQWorkerCleanup(SQWorkerPool *pool) {
  for (int32_t i = 0; i < pool->max; ++i) {
    SQWorker *worker = pool->workers + i;
46
    if (taosCheckPthreadValid(worker->thread)) {
47
      taosQsetThreadResume(pool->qset);
S
TD-2393  
Shengliang Guan 已提交
48 49 50
    }
  }

S
Shengliang Guan 已提交
51 52
  for (int32_t i = 0; i < pool->max; ++i) {
    SQWorker *worker = pool->workers + i;
53
    if (taosCheckPthreadValid(worker->thread)) {
54
      uInfo("worker:%s:%d is stopping", pool->name, worker->id);
wafwerar's avatar
wafwerar 已提交
55
      taosThreadJoin(worker->thread, NULL);
56
      taosThreadClear(&worker->thread);
57
      uInfo("worker:%s:%d is stopped", pool->name, worker->id);
S
TD-2393  
Shengliang Guan 已提交
58 59 60
    }
  }

wafwerar's avatar
wafwerar 已提交
61
  taosMemoryFreeClear(pool->workers);
62
  taosCloseQset(pool->qset);
wafwerar's avatar
wafwerar 已提交
63
  taosThreadMutexDestroy(&pool->mutex);
S
TD-2393  
Shengliang Guan 已提交
64

65
  uInfo("worker:%s is closed", pool->name);
S
TD-2393  
Shengliang Guan 已提交
66 67
}

S
Shengliang Guan 已提交
68 69
static void *tQWorkerThreadFp(SQWorker *worker) {
  SQWorkerPool *pool = worker->pool;
70 71 72
  SQueueInfo    qinfo = {0};
  void         *msg = NULL;
  int32_t       code = 0;
73 74 75

  taosBlockSIGPIPE();
  setThreadName(pool->name);
S
Shengliang Guan 已提交
76 77
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
78 79

  while (1) {
H
Hongze Cheng 已提交
80
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
81
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
S
Shengliang Guan 已提交
82
            worker->pid);
83 84 85
      break;
    }

86 87 88 89
    if (qinfo.fp != NULL) {
      qinfo.workerId = worker->id;
      qinfo.threadNum = pool->num;
      (*((FItem)qinfo.fp))(&qinfo, msg);
S
Shengliang Guan 已提交
90
    }
91 92

    taosUpdateItemSize(qinfo.queue, 1);
93 94 95 96 97
  }

  return NULL;
}

S
Shengliang Guan 已提交
98
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
99
  STaosQueue *queue = taosOpenQueue();
S
Shengliang Guan 已提交
100
  if (queue == NULL) return NULL;
S
TD-2393  
Shengliang Guan 已提交
101

S
Shengliang Guan 已提交
102
  taosThreadMutexLock(&pool->mutex);
S
Shengliang Guan 已提交
103
  taosSetQueueFp(queue, fp, NULL);
104
  taosAddIntoQset(pool->qset, queue, ahandle);
S
TD-2393  
Shengliang Guan 已提交
105 106

  // spawn a thread to process queue
107
  if (pool->num < pool->max) {
S
TD-2393  
Shengliang Guan 已提交
108
    do {
S
Shengliang Guan 已提交
109
      SQWorker *worker = pool->workers + pool->num;
S
TD-2393  
Shengliang Guan 已提交
110

wafwerar's avatar
wafwerar 已提交
111 112 113
      TdThreadAttr thAttr;
      taosThreadAttrInit(&thAttr);
      taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
S
TD-2393  
Shengliang Guan 已提交
114

S
Shengliang Guan 已提交
115
      if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker) != 0) {
S
Shengliang Guan 已提交
116
        taosCloseQueue(queue);
S
Shengliang Guan 已提交
117
        terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
118 119
        queue = NULL;
        break;
S
TD-2393  
Shengliang Guan 已提交
120 121
      }

wafwerar's avatar
wafwerar 已提交
122
      taosThreadAttrDestroy(&thAttr);
123
      pool->num++;
124
      uInfo("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
125
    } while (pool->num < pool->min);
S
TD-2393  
Shengliang Guan 已提交
126 127
  }

wafwerar's avatar
wafwerar 已提交
128
  taosThreadMutexUnlock(&pool->mutex);
S
Shengliang Guan 已提交
129
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
130 131 132 133

  return queue;
}

S
Shengliang Guan 已提交
134
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260
  uInfo("worker:%s, queue:%p is freed", pool->name, queue);
  taosCloseQueue(queue);
}

int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) {
  pool->qset = taosOpenQset();
  pool->workers = taosArrayInit(2, sizeof(SQWorker));
  if (pool->workers == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  (void)taosThreadMutexInit(&pool->mutex, NULL);

  uInfo("worker:%s is initialized as auto", pool->name);
  return 0;
}

void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) {
  int32_t size = taosArrayGetSize(pool->workers);
  for (int32_t i = 0; i < size; ++i) {
    SQWorker *worker = taosArrayGet(pool->workers, i);
    if (taosCheckPthreadValid(worker->thread)) {
      taosQsetThreadResume(pool->qset);
    }
  }

  for (int32_t i = 0; i < size; ++i) {
    SQWorker *worker = taosArrayGet(pool->workers, i);
    if (taosCheckPthreadValid(worker->thread)) {
      uInfo("worker:%s:%d is stopping", pool->name, worker->id);
      taosThreadJoin(worker->thread, NULL);
      taosThreadClear(&worker->thread);
      uInfo("worker:%s:%d is stopped", pool->name, worker->id);
    }
  }

  taosArrayDestroy(pool->workers);
  taosCloseQset(pool->qset);
  taosThreadMutexDestroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}

static void *tAutoQWorkerThreadFp(SQWorker *worker) {
  SAutoQWorkerPool *pool = worker->pool;
  SQueueInfo        qinfo = {0};
  void             *msg = NULL;
  int32_t           code = 0;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);

  while (1) {
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
            worker->pid);
      break;
    }

    if (qinfo.fp != NULL) {
      qinfo.workerId = worker->id;
      qinfo.threadNum = taosArrayGetSize(pool->workers);
      (*((FItem)qinfo.fp))(&qinfo, msg);
    }

    taosUpdateItemSize(qinfo.queue, 1);
  }

  return NULL;
}

STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp) {
  STaosQueue *queue = taosOpenQueue();
  if (queue == NULL) return NULL;

  taosThreadMutexLock(&pool->mutex);
  taosSetQueueFp(queue, fp, NULL);
  taosAddIntoQset(pool->qset, queue, ahandle);

  int32_t queueNum = taosGetQueueNumber(pool->qset);
  int32_t curWorkerNum = taosArrayGetSize(pool->workers);
  int32_t dstWorkerNum = ceil(queueNum * pool->ratio);
  if (dstWorkerNum < 1) dstWorkerNum = 1;
  // spawn a thread to process queue

  while (curWorkerNum < dstWorkerNum) {
    SQWorker wobj = {
        .id = (int32_t)taosArrayGetSize(pool->workers),
        .pool = pool,
    };
    SQWorker *worker = taosArrayPush(pool->workers, &wobj);
    if (worker == NULL) {
      uError("worker:%s:%d failed to create, total:%d", pool->name, wobj.id, (int32_t)taosArrayGetSize(pool->workers));
      taosCloseQueue(queue);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

    if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tAutoQWorkerThreadFp, worker) != 0) {
      uError("worker:%s:%d failed to create thread, total:%d", pool->name, wobj.id,
             (int32_t)taosArrayGetSize(pool->workers));
      taosCloseQueue(queue);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }

    taosThreadAttrDestroy(&thAttr);
    uInfo("worker:%s:%d is launched, total:%d", pool->name, worker->id, (int32_t)taosArrayGetSize(pool->workers));

    curWorkerNum++;
  }

  taosThreadMutexUnlock(&pool->mutex);
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}

void tAutoQWorkerFreeQueue(SAutoQWorkerPool *pool, STaosQueue *queue) {
  uInfo("worker:%s, queue:%p is freed", pool->name, queue);
S
Shengliang Guan 已提交
261
  taosCloseQueue(queue);
262 263
}

S
Shengliang Guan 已提交
264
int32_t tWWorkerInit(SWWorkerPool *pool) {
265
  pool->nextId = 0;
wafwerar's avatar
wafwerar 已提交
266
  pool->workers = taosMemoryCalloc(pool->max, sizeof(SWWorker));
S
Shengliang Guan 已提交
267 268 269 270 271
  if (pool->workers == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
272
  (void)taosThreadMutexInit(&pool->mutex, NULL);
273 274

  for (int32_t i = 0; i < pool->max; ++i) {
S
Shengliang Guan 已提交
275
    SWWorker *worker = pool->workers + i;
276 277 278 279 280 281
    worker->id = i;
    worker->qall = NULL;
    worker->qset = NULL;
    worker->pool = pool;
  }

282
  uInfo("worker:%s is initialized, max:%d", pool->name, pool->max);
283 284 285
  return 0;
}

S
Shengliang Guan 已提交
286
void tWWorkerCleanup(SWWorkerPool *pool) {
287
  for (int32_t i = 0; i < pool->max; ++i) {
S
Shengliang Guan 已提交
288
    SWWorker *worker = pool->workers + i;
289
    if (taosCheckPthreadValid(worker->thread)) {
S
Shengliang Guan 已提交
290 291 292
      if (worker->qset) {
        taosQsetThreadResume(worker->qset);
      }
293 294 295 296
    }
  }

  for (int32_t i = 0; i < pool->max; ++i) {
S
Shengliang Guan 已提交
297
    SWWorker *worker = pool->workers + i;
298
    if (taosCheckPthreadValid(worker->thread)) {
299
      uInfo("worker:%s:%d is stopping", pool->name, worker->id);
wafwerar's avatar
wafwerar 已提交
300
      taosThreadJoin(worker->thread, NULL);
301
      taosThreadClear(&worker->thread);
302 303
      taosFreeQall(worker->qall);
      taosCloseQset(worker->qset);
304
      uInfo("worker:%s:%d is stopped", pool->name, worker->id);
305 306 307
    }
  }

wafwerar's avatar
wafwerar 已提交
308
  taosMemoryFreeClear(pool->workers);
wafwerar's avatar
wafwerar 已提交
309
  taosThreadMutexDestroy(&pool->mutex);
310

311
  uInfo("worker:%s is closed", pool->name);
312 313
}

S
Shengliang Guan 已提交
314
static void *tWWorkerThreadFp(SWWorker *worker) {
S
Shengliang Guan 已提交
315
  SWWorkerPool *pool = worker->pool;
316 317 318 319
  SQueueInfo    qinfo = {0};
  void         *msg = NULL;
  int32_t       code = 0;
  int32_t       numOfMsgs = 0;
320 321 322

  taosBlockSIGPIPE();
  setThreadName(pool->name);
S
Shengliang Guan 已提交
323 324
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
325 326

  while (1) {
327
    numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &qinfo);
328
    if (numOfMsgs == 0) {
329
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, worker->qset,
S
Shengliang Guan 已提交
330
            worker->pid);
331 332 333
      break;
    }

334 335 336 337
    if (qinfo.fp != NULL) {
      qinfo.workerId = worker->id;
      qinfo.threadNum = pool->num;
      (*((FItems)qinfo.fp))(&qinfo, worker->qall, numOfMsgs);
338
    }
339
    taosUpdateItemSize(qinfo.queue, numOfMsgs);
340 341 342 343 344
  }

  return NULL;
}

S
Shengliang Guan 已提交
345
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
wafwerar's avatar
wafwerar 已提交
346
  taosThreadMutexLock(&pool->mutex);
S
Shengliang Guan 已提交
347
  SWWorker *worker = pool->workers + pool->nextId;
S
Shengliang Guan 已提交
348
  int32_t   code = -1;
349

350
  STaosQueue *queue = taosOpenQueue();
S
Shengliang Guan 已提交
351
  if (queue == NULL) goto _OVER;
352

S
Shengliang Guan 已提交
353
  taosSetQueueFp(queue, NULL, fp);
354 355
  if (worker->qset == NULL) {
    worker->qset = taosOpenQset();
S
Shengliang Guan 已提交
356
    if (worker->qset == NULL) goto _OVER;
357 358 359

    taosAddIntoQset(worker->qset, queue, ahandle);
    worker->qall = taosAllocateQall();
S
Shengliang Guan 已提交
360 361
    if (worker->qall == NULL) goto _OVER;

wafwerar's avatar
wafwerar 已提交
362 363 364
    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
S
Shengliang Guan 已提交
365
    if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker) != 0) goto _OVER;
366

367
    uInfo("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
S
Shengliang Guan 已提交
368
    pool->nextId = (pool->nextId + 1) % pool->max;
369

wafwerar's avatar
wafwerar 已提交
370
    taosThreadAttrDestroy(&thAttr);
S
Shengliang Guan 已提交
371 372
    pool->num++;
    if (pool->num > pool->max) pool->num = pool->max;
373 374 375 376 377
  } else {
    taosAddIntoQset(worker->qset, queue, ahandle);
    pool->nextId = (pool->nextId + 1) % pool->max;
  }

S
Shengliang Guan 已提交
378
  code = 0;
S
TD-2393  
Shengliang Guan 已提交
379

S
Shengliang Guan 已提交
380 381 382 383 384 385 386 387 388
_OVER:
  taosThreadMutexUnlock(&pool->mutex);

  if (code == -1) {
    if (queue != NULL) taosCloseQueue(queue);
    if (worker->qset != NULL) taosCloseQset(worker->qset);
    if (worker->qall != NULL) taosFreeQall(worker->qall);
    return NULL;
  } else {
S
Shengliang Guan 已提交
389 390
    while (worker->pid <= 0) taosMsleep(10);
    queue->threadId = worker->pid;
391 392
    uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle,
          queue->threadId);
S
Shengliang Guan 已提交
393 394
    return queue;
  }
S
TD-2393  
Shengliang Guan 已提交
395 396
}

S
Shengliang Guan 已提交
397
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
398
  uInfo("worker:%s, queue:%p is freed", pool->name, queue);
S
Shengliang Guan 已提交
399
  taosCloseQueue(queue);
S
TD-2393  
Shengliang Guan 已提交
400
}
S
Shengliang Guan 已提交
401

S
Shengliang Guan 已提交
402
int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg) {
S
Shengliang Guan 已提交
403 404
  SQWorkerPool *pPool = &pWorker->pool;
  pPool->name = pCfg->name;
S
shm  
Shengliang Guan 已提交
405 406
  pPool->min = pCfg->min;
  pPool->max = pCfg->max;
S
Shengliang Guan 已提交
407 408
  if (tQWorkerInit(pPool) != 0) return -1;

S
Shengliang Guan 已提交
409
  pWorker->queue = tQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
S
Shengliang Guan 已提交
410 411
  if (pWorker->queue == NULL) return -1;

S
Shengliang Guan 已提交
412 413 414 415
  pWorker->name = pCfg->name;
  return 0;
}

S
Shengliang Guan 已提交
416
void tSingleWorkerCleanup(SSingleWorker *pWorker) {
S
Shengliang Guan 已提交
417 418 419 420 421 422 423 424 425 426
  if (pWorker->queue == NULL) return;

  while (!taosQueueEmpty(pWorker->queue)) {
    taosMsleep(10);
  }

  tQWorkerCleanup(&pWorker->pool);
  tQWorkerFreeQueue(&pWorker->pool, pWorker->queue);
}

S
Shengliang Guan 已提交
427
int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg) {
S
Shengliang Guan 已提交
428 429
  SWWorkerPool *pPool = &pWorker->pool;
  pPool->name = pCfg->name;
S
shm  
Shengliang Guan 已提交
430
  pPool->max = pCfg->max;
S
Shengliang Guan 已提交
431 432
  if (tWWorkerInit(pPool) != 0) return -1;

S
Shengliang Guan 已提交
433
  pWorker->queue = tWWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
S
Shengliang Guan 已提交
434 435
  if (pWorker->queue == NULL) return -1;

S
Shengliang Guan 已提交
436 437 438 439
  pWorker->name = pCfg->name;
  return 0;
}

S
Shengliang Guan 已提交
440
void tMultiWorkerCleanup(SMultiWorker *pWorker) {
S
Shengliang Guan 已提交
441 442 443 444 445 446 447 448 449
  if (pWorker->queue == NULL) return;

  while (!taosQueueEmpty(pWorker->queue)) {
    taosMsleep(10);
  }

  tWWorkerCleanup(&pWorker->pool);
  tWWorkerFreeQueue(&pWorker->pool, pWorker->queue);
}