tworker.c 9.4 KB
Newer Older
S
TD-2393  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "tworker.h"
S
Shengliang Guan 已提交
18
#include "taoserror.h"
S
log  
Shengliang Guan 已提交
19
#include "tlog.h"
S
TD-2393  
Shengliang Guan 已提交
20

S
Shengliang Guan 已提交
21
// Generic thread entry-point signature; the worker thread routines in this file are cast to it
// when they are handed to taosThreadCreate.
typedef void *(*ThreadFp)(void *param);
22

S
Shengliang Guan 已提交
23
/*
 * Prepare a queue-worker pool: open its queue set, allocate pool->max worker
 * slots, initialize the pool mutex and fill in each slot's id/back-pointer.
 * No thread is started here.
 *
 * pool->name, pool->min and pool->max must be set by the caller beforehand.
 *
 * Returns 0 on success. On failure returns -1 with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY; anything acquired before the failure is released.
 */
int32_t tQWorkerInit(SQWorkerPool *pool) {
  pool->qset = taosOpenQset();
  if (pool->qset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pool->workers = taosMemoryCalloc(pool->max, sizeof(SQWorker));
  if (pool->workers == NULL) {
    taosCloseQset(pool->qset);
    pool->qset = NULL;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (taosThreadMutexInit(&pool->mutex, NULL)) {
    // Do not leak the slot array or the queue set when the mutex cannot be created.
    taosMemoryFreeClear(pool->workers);
    taosCloseQset(pool->qset);
    pool->qset = NULL;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  for (int32_t i = 0; i < pool->max; ++i) {
    SQWorker *worker = pool->workers + i;
    worker->id = i;
    worker->pool = pool;
  }

  uDebug("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
  return 0;
}

S
Shengliang Guan 已提交
46 47 48
/*
 * Shut down a queue-worker pool: resume the qset once per running worker
 * thread, join every running thread, then release the slot array, the queue
 * set and the mutex.
 *
 * The worker array is checked once up front. A per-slot NULL test cannot
 * protect against a missing array, since only slot 0 would compare equal to
 * NULL.
 */
void tQWorkerCleanup(SQWorkerPool *pool) {
  if (pool->workers != NULL) {
    // First pass: one resume call per live thread (presumably wakes readers blocked on the qset).
    for (int32_t i = 0; i < pool->max; ++i) {
      SQWorker *worker = pool->workers + i;
      if (taosCheckPthreadValid(worker->thread)) {
        taosQsetThreadResume(pool->qset);
      }
    }

    // Second pass: wait for every live thread to return.
    for (int32_t i = 0; i < pool->max; ++i) {
      SQWorker *worker = pool->workers + i;
      if (taosCheckPthreadValid(worker->thread)) {
        taosThreadJoin(worker->thread, NULL);
      }
    }
  }

  taosMemoryFreeClear(pool->workers);
  taosCloseQset(pool->qset);
  taosThreadMutexDestroy(&pool->mutex);

  uDebug("worker:%s is closed", pool->name);
}

S
Shengliang Guan 已提交
70 71
/*
 * Thread body for one SQWorker. Reads one item at a time from the pool's
 * qset and, when the read also yields a callback, invokes it with a
 * SQueueInfo describing this worker. The loop ends when the read returns 0.
 * Always returns NULL.
 */
static void *tQWorkerThreadFp(SQWorker *worker) {
  SQWorkerPool *pool = worker->pool;
  FItem         handler = NULL;
  void         *item = NULL;
  void         *owner = NULL;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
  uDebug("worker:%s:%d is running", pool->name, worker->id);

  for (;;) {
    if (taosReadQitemFromQset(pool->qset, (void **)&item, &owner, &handler) == 0) {
      uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
      break;
    }

    // Items without a registered callback are skipped.
    if (handler == NULL) {
      continue;
    }

    SQueueInfo info = {0};
    info.ahandle = owner;
    info.workerId = worker->id;
    info.threadNum = pool->num;
    (*handler)(&info, item);
  }

  return NULL;
}

S
Shengliang Guan 已提交
97
/*
 * Create a queue bound to callback `fp`, add it to the pool's qset under
 * `ahandle`, and start worker threads if the pool has spare slots.
 *
 * When pool->num < pool->max at least one thread is started, and more are
 * started until pool->num reaches pool->min (never beyond pool->max).
 * The whole operation runs under pool->mutex.
 *
 * Returns the new queue, or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY
 * when the queue cannot be opened or a thread cannot be created (the queue is
 * closed again in the latter case).
 */
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
  taosThreadMutexLock(&pool->mutex);
  STaosQueue *queue = taosOpenQueue();
  if (queue == NULL) {
    taosThreadMutexUnlock(&pool->mutex);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  taosSetQueueFp(queue, fp, NULL);
  taosAddIntoQset(pool->qset, queue, ahandle);

  // spawn a thread to process queue
  if (pool->num < pool->max) {
    do {
      SQWorker *worker = pool->workers + pool->num;

      TdThreadAttr thAttr;
      taosThreadAttrInit(&thAttr);
      taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

      if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker) != 0) {
        uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno));
        // The attribute object must be released on the failure path as well.
        taosThreadAttrDestroy(&thAttr);
        taosCloseQueue(queue);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        queue = NULL;
        break;
      }

      taosThreadAttrDestroy(&thAttr);
      pool->num++;
      uDebug("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
      // Bound by max too, so a min larger than max cannot index past the worker array.
    } while (pool->num < pool->min && pool->num < pool->max);
  }

  taosThreadMutexUnlock(&pool->mutex);
  uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}

S
Shengliang Guan 已提交
138
/*
 * Close a queue obtained from tQWorkerAllocQueue and log the event.
 * `pool` is used for the log message only.
 */
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
  taosCloseQueue(queue);

  uDebug("worker:%s, queue:%p is freed", pool->name, queue);
}

S
Shengliang Guan 已提交
143
/*
 * Prepare a write-worker pool: reset the round-robin cursor, allocate
 * pool->max worker slots, initialize the pool mutex and fill in every slot.
 * Each slot starts without a qset or qall; those are created when a queue is
 * first assigned to the slot. No thread is started here.
 *
 * pool->name and pool->max must be set by the caller beforehand.
 *
 * Returns 0 on success, or -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY.
 */
int32_t tWWorkerInit(SWWorkerPool *pool) {
  pool->nextId = 0;
  pool->workers = taosMemoryCalloc(pool->max, sizeof(SWWorker));
  if (pool->workers == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (taosThreadMutexInit(&pool->mutex, NULL) != 0) {
    // Release the slot array instead of leaking it when the mutex cannot be created.
    taosMemoryFreeClear(pool->workers);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  for (int32_t i = 0; i < pool->max; ++i) {
    SWWorker *worker = pool->workers + i;
    worker->id = i;
    worker->qall = NULL;
    worker->qset = NULL;
    worker->pool = pool;
  }

  uInfo("worker:%s is initialized, max:%d", pool->name, pool->max);
  return 0;
}

S
Shengliang Guan 已提交
168
/*
 * Shut down a write-worker pool. Every slot with a valid thread first has
 * its qset resumed; in a second pass each such thread is joined and the
 * slot's qall and qset are released. Finally the slot array and the mutex
 * are destroyed.
 */
void tWWorkerCleanup(SWWorkerPool *pool) {
  int32_t idx;

  // Pass 1: resume the qset of every running worker.
  for (idx = 0; idx < pool->max; ++idx) {
    SWWorker *slot = &pool->workers[idx];
    if (!taosCheckPthreadValid(slot->thread)) continue;
    if (slot->qset == NULL) continue;
    taosQsetThreadResume(slot->qset);
  }

  // Pass 2: wait for each thread, then drop its per-worker resources.
  for (idx = 0; idx < pool->max; ++idx) {
    SWWorker *slot = &pool->workers[idx];
    if (!taosCheckPthreadValid(slot->thread)) continue;

    taosThreadJoin(slot->thread, NULL);
    taosFreeQall(slot->qall);
    taosCloseQset(slot->qset);
  }

  taosMemoryFreeClear(pool->workers);
  taosThreadMutexDestroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}

S
Shengliang Guan 已提交
193
/*
 * Thread body for one SWWorker. Repeatedly reads a batch of items from the
 * worker's own qset into its qall and, when the read also yields a callback,
 * passes the whole batch to it. The loop ends when a read reports zero
 * items. Always returns NULL.
 */
static void *tWWorkerThreadFp(SWWorker *worker) {
  SWWorkerPool *pool = worker->pool;
  FItems        handler = NULL;
  void         *owner = NULL;
  int32_t       batch = 0;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
  uDebug("worker:%s:%d is running", pool->name, worker->id);

  for (;;) {
    batch = taosReadAllQitemsFromQset(worker->qset, worker->qall, &owner, &handler);
    if (batch == 0) {
      uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset);
      break;
    }

    // Batches without a registered callback are skipped.
    if (handler == NULL) {
      continue;
    }

    SQueueInfo info = {0};
    info.ahandle = owner;
    info.workerId = worker->id;
    info.threadNum = pool->num;
    (*handler)(&info, worker->qall, batch);
  }

  return NULL;
}

S
Shengliang Guan 已提交
222
/*
 * Create a queue bound to batch callback `fp` and attach it to the worker
 * slot selected by pool->nextId (advanced round-robin on success).
 *
 * If the slot has no qset yet, its qset and qall are created and its thread
 * is started; otherwise the queue is simply added to the existing qset.
 * The whole operation runs under pool->mutex.
 *
 * Returns the new queue, or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY.
 * On every failure path the slot's qset/qall pointers are reset to NULL so a
 * later call does not reuse freed objects, and pool->num only counts threads
 * that were actually started.
 */
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
  taosThreadMutexLock(&pool->mutex);
  SWWorker *worker = pool->workers + pool->nextId;

  STaosQueue *queue = taosOpenQueue();
  if (queue == NULL) {
    taosThreadMutexUnlock(&pool->mutex);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  taosSetQueueFp(queue, NULL, fp);

  if (worker->qset == NULL) {
    worker->qset = taosOpenQset();
    if (worker->qset == NULL) {
      taosCloseQueue(queue);
      taosThreadMutexUnlock(&pool->mutex);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }

    taosAddIntoQset(worker->qset, queue, ahandle);
    worker->qall = taosAllocateQall();
    if (worker->qall == NULL) {
      taosCloseQset(worker->qset);
      worker->qset = NULL;  // keep the slot reusable; a stale pointer would be taken for a live qset
      taosCloseQueue(queue);
      taosThreadMutexUnlock(&pool->mutex);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }

    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

    if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker) != 0) {
      uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno));
      taosFreeQall(worker->qall);
      taosCloseQset(worker->qset);
      taosCloseQueue(queue);
      // Reset the slot so the next allocation starts from scratch instead of touching freed memory.
      worker->qall = NULL;
      worker->qset = NULL;
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      queue = NULL;
    } else {
      uDebug("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
      pool->nextId = (pool->nextId + 1) % pool->max;
      // Count only threads that really exist.
      pool->num++;
      if (pool->num > pool->max) pool->num = pool->max;
    }

    taosThreadAttrDestroy(&thAttr);
  } else {
    taosAddIntoQset(worker->qset, queue, ahandle);
    pool->nextId = (pool->nextId + 1) % pool->max;
  }

  taosThreadMutexUnlock(&pool->mutex);
  uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}

S
Shengliang Guan 已提交
282
/*
 * Close a queue obtained from tWWorkerAllocQueue and log the event.
 * `pool` is used for the log message only.
 */
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
  taosCloseQueue(queue);

  uDebug("worker:%s, queue:%p is freed", pool->name, queue);
}
S
Shengliang Guan 已提交
286

S
Shengliang Guan 已提交
287
/*
 * Set up a single-queue worker: copy name/min/max from the config into the
 * embedded SQWorkerPool, initialize the pool, and allocate its one queue
 * with the configured callback and parameter.
 *
 * Returns 0 on success, or -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY
 * when either the pool or the queue cannot be created.
 */
int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg) {
  SQWorkerPool *pool = &pWorker->pool;

  pool->name = pCfg->name;
  pool->min = pCfg->min;
  pool->max = pCfg->max;

  if (tQWorkerInit(pool) == 0) {
    pWorker->queue = tQWorkerAllocQueue(pool, pCfg->param, pCfg->fp);
    if (pWorker->queue != NULL) {
      pWorker->name = pCfg->name;
      return 0;
    }
  }

  // Both failure causes are reported with the same error code.
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

S
Shengliang Guan 已提交
305
/*
 * Tear down a single-queue worker. Does nothing when no queue was allocated.
 * Otherwise polls every 10 ms until the queue is empty, shuts down the pool,
 * and frees the queue.
 *
 * pWorker->queue is reset to NULL afterwards so that a repeated call returns
 * early instead of polling a freed queue.
 */
void tSingleWorkerCleanup(SSingleWorker *pWorker) {
  if (pWorker->queue == NULL) return;

  // Let the workers drain pending items before stopping them.
  while (!taosQueueEmpty(pWorker->queue)) {
    taosMsleep(10);
  }

  tQWorkerCleanup(&pWorker->pool);
  tQWorkerFreeQueue(&pWorker->pool, pWorker->queue);
  pWorker->queue = NULL;
}

S
Shengliang Guan 已提交
316
/*
 * Set up a batch (multi-item) worker: copy name/max from the config into the
 * embedded SWWorkerPool, initialize the pool, and allocate its one queue
 * with the configured batch callback and parameter.
 *
 * Returns 0 on success, or -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY
 * when either the pool or the queue cannot be created.
 */
int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg) {
  SWWorkerPool *pool = &pWorker->pool;

  pool->name = pCfg->name;
  pool->max = pCfg->max;

  if (tWWorkerInit(pool) == 0) {
    pWorker->queue = tWWorkerAllocQueue(pool, pCfg->param, pCfg->fp);
    if (pWorker->queue != NULL) {
      pWorker->name = pCfg->name;
      return 0;
    }
  }

  // Both failure causes are reported with the same error code.
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

S
Shengliang Guan 已提交
333
/*
 * Tear down a batch worker. Does nothing when no queue was allocated.
 * Otherwise polls every 10 ms until the queue is empty, shuts down the pool,
 * and frees the queue.
 *
 * pWorker->queue is reset to NULL afterwards so that a repeated call returns
 * early instead of polling a freed queue.
 */
void tMultiWorkerCleanup(SMultiWorker *pWorker) {
  if (pWorker->queue == NULL) return;

  // Let the workers drain pending items before stopping them.
  while (!taosQueueEmpty(pWorker->queue)) {
    taosMsleep(10);
  }

  tWWorkerCleanup(&pWorker->pool);
  tWWorkerFreeQueue(&pWorker->pool, pWorker->queue);
  pWorker->queue = NULL;
}