tworker.c 8.9 KB
Newer Older
S
TD-2393  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "tworker.h"
S
Shengliang Guan 已提交
18
#include "taoserror.h"
S
log  
Shengliang Guan 已提交
19
#include "tlog.h"
S
TD-2393  
Shengliang Guan 已提交
20

S
Shengliang Guan 已提交
21
// Thread entry-point signature; the worker thread functions in this file are
// cast to this type when they are handed to taosThreadCreate().
typedef void *(*ThreadFp)(void *param);
22

S
Shengliang Guan 已提交
23
/*
 * Initialize a queue-worker pool.
 *
 * Opens the pool's shared queue set, allocates pool->max worker slots,
 * initializes the pool mutex and stamps every slot with its index and a
 * back-pointer to the pool. No thread is started here; threads are launched
 * by tQWorkerAllocQueue().
 *
 * pool: pool whose name/min/max were filled in by the caller.
 *
 * Returns 0 on success and -1 on failure. When the slot array cannot be
 * allocated terrno is set to TSDB_CODE_OUT_OF_MEMORY and the queue set opened
 * above is released again. When the queue set itself cannot be opened terrno
 * is left untouched (presumably set by taosOpenQset -- TODO confirm).
 */
int32_t tQWorkerInit(SQWorkerPool *pool) {
  pool->qset = taosOpenQset();
  if (pool->qset == NULL) {
    return -1;
  }

  pool->workers = taosMemoryCalloc(pool->max, sizeof(SQWorker));
  if (pool->workers == NULL) {
    // do not leak the queue set opened above
    taosCloseQset(pool->qset);
    pool->qset = NULL;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  (void)taosThreadMutexInit(&pool->mutex, NULL);

  for (int32_t i = 0; i < pool->max; ++i) {
    SQWorker *worker = pool->workers + i;
    worker->id = i;
    worker->pool = pool;
  }

  uDebug("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
  return 0;
}

S
Shengliang Guan 已提交
43 44 45
/*
 * Tear down a queue-worker pool.
 *
 * First issues one taosQsetThreadResume() on the shared queue set per slot
 * that holds a valid thread, then joins and clears each of those threads.
 * Finally the slot array, the queue set and the pool mutex are released.
 *
 * pool: pool previously set up by tQWorkerInit().
 */
void tQWorkerCleanup(SQWorkerPool *pool) {
  int32_t idx;

  // pass 1: one resume call per live worker thread
  for (idx = 0; idx < pool->max; idx++) {
    if (!taosCheckPthreadValid(pool->workers[idx].thread)) continue;
    taosQsetThreadResume(pool->qset);
  }

  // pass 2: wait for each live thread and reset its handle
  for (idx = 0; idx < pool->max; idx++) {
    SQWorker *pSlot = &pool->workers[idx];
    if (!taosCheckPthreadValid(pSlot->thread)) continue;

    taosThreadJoin(pSlot->thread, NULL);
    taosThreadClear(&pSlot->thread);
  }

  taosMemoryFreeClear(pool->workers);
  taosCloseQset(pool->qset);
  taosThreadMutexDestroy(&pool->mutex);

  uDebug("worker:%s is closed", pool->name);
}

S
Shengliang Guan 已提交
68 69
static void *tQWorkerThreadFp(SQWorker *worker) {
  SQWorkerPool *pool = worker->pool;
70 71 72
  SQueueInfo    qinfo = {0};
  void         *msg = NULL;
  int32_t       code = 0;
73 74 75

  taosBlockSIGPIPE();
  setThreadName(pool->name);
76
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, taosGetSelfPthreadId());
77 78

  while (1) {
H
Hongze Cheng 已提交
79
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
80 81
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
            taosGetSelfPthreadId());
82 83 84
      break;
    }

85 86 87 88
    if (qinfo.fp != NULL) {
      qinfo.workerId = worker->id;
      qinfo.threadNum = pool->num;
      (*((FItem)qinfo.fp))(&qinfo, msg);
S
Shengliang Guan 已提交
89
    }
90 91

    taosUpdateItemSize(qinfo.queue, 1);
92 93 94 95 96
  }

  return NULL;
}

S
Shengliang Guan 已提交
97
/*
 * Create a queue served by the pool and make sure worker threads exist.
 *
 * The new queue gets `fp` as its per-item handler and is added to the pool's
 * shared queue set together with `ahandle`. If the pool has fewer than
 * pool->max threads, at least one more thread is started, and further threads
 * are started until pool->num reaches pool->min.
 *
 * pool:    pool set up by tQWorkerInit().
 * ahandle: opaque handle stored with the queue in the queue set.
 * fp:      per-item handler installed on the queue.
 *
 * Returns the new queue, or NULL if the queue could not be opened or a thread
 * could not be created (in the latter case the queue is closed and terrno is
 * set to TSDB_CODE_OUT_OF_MEMORY).
 */
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
  STaosQueue *queue = taosOpenQueue();
  if (queue == NULL) return NULL;

  taosThreadMutexLock(&pool->mutex);
  taosSetQueueFp(queue, fp, NULL);
  taosAddIntoQset(pool->qset, queue, ahandle);

  // spawn a thread to process queue
  if (pool->num < pool->max) {
    do {
      SQWorker *worker = pool->workers + pool->num;

      TdThreadAttr thAttr;
      taosThreadAttrInit(&thAttr);
      taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

      int32_t ret = taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker);
      // the attribute object is no longer needed whether or not creation succeeded
      taosThreadAttrDestroy(&thAttr);

      if (ret != 0) {
        // keep the unused slot's handle invalid so cleanup never joins it
        taosThreadClear(&worker->thread);
        taosCloseQueue(queue);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        queue = NULL;
        break;
      }

      pool->num++;
      uDebug("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
    } while (pool->num < pool->min);
  }

  taosThreadMutexUnlock(&pool->mutex);
  uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}

S
Shengliang Guan 已提交
133
/*
 * Release a queue obtained from tQWorkerAllocQueue().
 *
 * Logs the event and closes the queue; the pool is only used for its name in
 * the log line.
 */
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
  // log first, while the queue pointer still refers to a live object
  uDebug("worker:%s, queue:%p is freed", pool->name, queue);

  taosCloseQueue(queue);
}

S
Shengliang Guan 已提交
138
/*
 * Initialize a write-worker pool.
 *
 * Resets the round-robin cursor (nextId), allocates pool->max worker slots,
 * initializes the pool mutex and fills each slot with its index, empty
 * qall/qset pointers and a back-pointer to the pool. No thread is started.
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * slot array cannot be allocated.
 */
int32_t tWWorkerInit(SWWorkerPool *pool) {
  pool->nextId = 0;

  SWWorker *slots = taosMemoryCalloc(pool->max, sizeof(SWWorker));
  pool->workers = slots;
  if (slots == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  (void)taosThreadMutexInit(&pool->mutex, NULL);

  int32_t idx = 0;
  while (idx < pool->max) {
    slots[idx].id = idx;
    slots[idx].qall = NULL;
    slots[idx].qset = NULL;
    slots[idx].pool = pool;
    idx++;
  }

  uDebug("worker:%s is initialized, max:%d", pool->name, pool->max);
  return 0;
}

S
Shengliang Guan 已提交
160
/*
 * Tear down a write-worker pool.
 *
 * For every slot with a valid thread: resume its private queue set (if it has
 * one), then in a second pass join the thread, clear the handle and release
 * the slot's qall and qset. Finally the slot array and the pool mutex are
 * released.
 */
void tWWorkerCleanup(SWWorkerPool *pool) {
  int32_t idx;

  // pass 1: resume the queue set of each live worker
  for (idx = 0; idx < pool->max; idx++) {
    SWWorker *pSlot = &pool->workers[idx];
    if (taosCheckPthreadValid(pSlot->thread) && pSlot->qset != NULL) {
      taosQsetThreadResume(pSlot->qset);
    }
  }

  // pass 2: join each live worker and free its per-worker resources
  for (idx = 0; idx < pool->max; idx++) {
    SWWorker *pSlot = &pool->workers[idx];
    if (!taosCheckPthreadValid(pSlot->thread)) continue;

    taosThreadJoin(pSlot->thread, NULL);
    taosThreadClear(&pSlot->thread);
    taosFreeQall(pSlot->qall);
    taosCloseQset(pSlot->qset);
  }

  taosMemoryFreeClear(pool->workers);
  taosThreadMutexDestroy(&pool->mutex);

  uDebug("worker:%s is closed", pool->name);
}

S
Shengliang Guan 已提交
186
static void *tWWorkerThreadFp(SWWorker *worker) {
S
Shengliang Guan 已提交
187
  SWWorkerPool *pool = worker->pool;
188 189 190 191
  SQueueInfo    qinfo = {0};
  void         *msg = NULL;
  int32_t       code = 0;
  int32_t       numOfMsgs = 0;
192 193 194

  taosBlockSIGPIPE();
  setThreadName(pool->name);
195
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, taosGetSelfPthreadId());
196 197

  while (1) {
198
    numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &qinfo);
199
    if (numOfMsgs == 0) {
200 201
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, worker->qset,
            taosGetSelfPthreadId());
202 203 204
      break;
    }

205 206 207 208
    if (qinfo.fp != NULL) {
      qinfo.workerId = worker->id;
      qinfo.threadNum = pool->num;
      (*((FItems)qinfo.fp))(&qinfo, worker->qall, numOfMsgs);
209
    }
210
    taosUpdateItemSize(qinfo.queue, numOfMsgs);
211 212 213 214 215
  }

  return NULL;
}

S
Shengliang Guan 已提交
216
/*
 * Create a queue and attach it to the next write worker in round-robin order.
 *
 * The slot at pool->nextId is chosen. If that slot has no queue set yet, a
 * queue set and a qall are created for it and its thread is started;
 * otherwise the new queue is simply added to the slot's existing queue set.
 * On success nextId advances (modulo pool->max) and queue->threadId records
 * the serving thread.
 *
 * On failure only resources created by THIS call are released: an already
 * running worker keeps its qset/qall, and released pointers are reset to NULL
 * so later calls and tWWorkerCleanup() never see dangling values. All of this
 * happens while the pool mutex is still held.
 *
 * pool:    pool set up by tWWorkerInit().
 * ahandle: opaque handle stored with the queue in the queue set.
 * fp:      batch handler installed on the queue.
 *
 * Returns the new queue, or NULL on failure.
 */
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
  taosThreadMutexLock(&pool->mutex);
  SWWorker *worker = pool->workers + pool->nextId;
  int32_t   code = -1;
  // non-zero when this call is the one that sets the slot up for the first time
  int32_t   firstUse = (worker->qset == NULL);

  STaosQueue *queue = taosOpenQueue();
  if (queue == NULL) goto _OVER;

  taosSetQueueFp(queue, NULL, fp);
  if (firstUse) {
    worker->qset = taosOpenQset();
    if (worker->qset == NULL) goto _OVER;

    taosAddIntoQset(worker->qset, queue, ahandle);
    worker->qall = taosAllocateQall();
    if (worker->qall == NULL) goto _OVER;

    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    int32_t ret = taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker);
    // the attribute object is no longer needed whether or not creation succeeded
    taosThreadAttrDestroy(&thAttr);
    if (ret != 0) {
      // keep the slot's handle invalid so cleanup never joins it
      taosThreadClear(&worker->thread);
      goto _OVER;
    }

    uDebug("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);

    pool->num++;
    if (pool->num > pool->max) pool->num = pool->max;
  } else {
    taosAddIntoQset(worker->qset, queue, ahandle);
  }

  pool->nextId = (pool->nextId + 1) % pool->max;

  queue->threadId = taosGetPthreadId(worker->thread);
  uDebug("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle, queue->threadId);
  code = 0;

_OVER:
  if (code != 0) {
    if (queue != NULL) {
      taosCloseQueue(queue);
      queue = NULL;
    }
    if (firstUse) {
      if (worker->qset != NULL) {
        taosCloseQset(worker->qset);
        worker->qset = NULL;
      }
      if (worker->qall != NULL) {
        taosFreeQall(worker->qall);
        worker->qall = NULL;
      }
    }
  }

  taosThreadMutexUnlock(&pool->mutex);
  return queue;
}

S
Shengliang Guan 已提交
266
/*
 * Release a queue obtained from tWWorkerAllocQueue().
 *
 * Logs the event and closes the queue; the pool is only used for its name in
 * the log line.
 */
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
  // log first, while the queue pointer still refers to a live object
  uDebug("worker:%s, queue:%p is freed", pool->name, queue);

  taosCloseQueue(queue);
}
S
Shengliang Guan 已提交
270

S
Shengliang Guan 已提交
271
/*
 * Set up a single-queue worker: one SQWorkerPool plus one queue on it.
 *
 * Copies name/min/max from the configuration into the embedded pool,
 * initializes the pool and allocates its queue with the configured handler
 * and parameter. The worker's name is recorded only after both steps succeed.
 *
 * Returns 0 on success, -1 if pool initialization or queue allocation fails.
 */
int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg) {
  SQWorkerPool *qpool = &pWorker->pool;

  qpool->name = pCfg->name;
  qpool->min = pCfg->min;
  qpool->max = pCfg->max;

  int32_t rc = tQWorkerInit(qpool);
  if (rc != 0) {
    return -1;
  }

  pWorker->queue = tQWorkerAllocQueue(qpool, pCfg->param, pCfg->fp);
  if (pWorker->queue == NULL) {
    return -1;
  }

  pWorker->name = pCfg->name;
  return 0;
}

S
Shengliang Guan 已提交
285
/*
 * Shut down a single-queue worker.
 *
 * Does nothing when the worker has no queue. Otherwise polls every 10 ms
 * until the queue is empty, tears down the pool and frees the queue. The
 * queue pointer is reset afterwards so that the guard at the top makes a
 * repeated call a no-op instead of touching freed memory.
 */
void tSingleWorkerCleanup(SSingleWorker *pWorker) {
  if (pWorker->queue == NULL) return;

  // let the worker threads drain whatever is still queued
  while (!taosQueueEmpty(pWorker->queue)) {
    taosMsleep(10);
  }

  tQWorkerCleanup(&pWorker->pool);
  tQWorkerFreeQueue(&pWorker->pool, pWorker->queue);
  pWorker->queue = NULL;
}

S
Shengliang Guan 已提交
296
/*
 * Set up a multi-item worker: one SWWorkerPool plus one queue on it.
 *
 * Copies name/max from the configuration into the embedded pool, initializes
 * the pool and allocates its queue with the configured batch handler and
 * parameter. The worker's name is recorded only after both steps succeed.
 *
 * Returns 0 on success, -1 if pool initialization or queue allocation fails.
 */
int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg) {
  SWWorkerPool *wpool = &pWorker->pool;

  wpool->name = pCfg->name;
  wpool->max = pCfg->max;

  int32_t rc = tWWorkerInit(wpool);
  if (rc != 0) {
    return -1;
  }

  pWorker->queue = tWWorkerAllocQueue(wpool, pCfg->param, pCfg->fp);
  if (pWorker->queue == NULL) {
    return -1;
  }

  pWorker->name = pCfg->name;
  return 0;
}

S
Shengliang Guan 已提交
309
/*
 * Shut down a multi-item worker.
 *
 * Does nothing when the worker has no queue. Otherwise polls every 10 ms
 * until the queue is empty, tears down the pool and frees the queue. The
 * queue pointer is reset afterwards so that the guard at the top makes a
 * repeated call a no-op instead of touching freed memory.
 */
void tMultiWorkerCleanup(SMultiWorker *pWorker) {
  if (pWorker->queue == NULL) return;

  // let the worker thread drain whatever is still queued
  while (!taosQueueEmpty(pWorker->queue)) {
    taosMsleep(10);
  }

  tWWorkerCleanup(&pWorker->pool);
  tWWorkerFreeQueue(&pWorker->pool, pWorker->queue);
  pWorker->queue = NULL;
}