tworker.c 9.3 KB
Newer Older
S
TD-2393  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "tworker.h"
S
Shengliang Guan 已提交
18
#include "taoserror.h"
S
log  
Shengliang Guan 已提交
19
#include "tlog.h"
S
TD-2393  
Shengliang Guan 已提交
20

S
Shengliang Guan 已提交
21
typedef void *(*ThreadFp)(void *param);
22

S
Shengliang Guan 已提交
23
int32_t tQWorkerInit(SQWorkerPool *pool) {
24
  pool->qset = taosOpenQset();
S
Shengliang Guan 已提交
25
  pool->workers = calloc(pool->max, sizeof(SQWorker));
S
Shengliang Guan 已提交
26 27 28 29 30
  if (pool->workers == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

wafwerar's avatar
wafwerar 已提交
31
  if (taosThreadMutexInit(&pool->mutex, NULL)) {
S
Shengliang Guan 已提交
32
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
33 34 35 36 37
    return -1;
  }

  for (int32_t i = 0; i < pool->max; ++i) {
    SQWorker *worker = pool->workers + i;
38 39
    worker->id = i;
    worker->pool = pool;
S
TD-2393  
Shengliang Guan 已提交
40 41
  }

S
Shengliang Guan 已提交
42
  uDebug("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
S
TD-2393  
Shengliang Guan 已提交
43 44 45
  return 0;
}

S
Shengliang Guan 已提交
46 47 48
void tQWorkerCleanup(SQWorkerPool *pool) {
  for (int32_t i = 0; i < pool->max; ++i) {
    SQWorker *worker = pool->workers + i;
S
Shengliang Guan 已提交
49
    if (worker == NULL) continue;
50
    if (taosCheckPthreadValid(worker->thread)) {
51
      taosQsetThreadResume(pool->qset);
S
TD-2393  
Shengliang Guan 已提交
52 53 54
    }
  }

S
Shengliang Guan 已提交
55 56
  for (int32_t i = 0; i < pool->max; ++i) {
    SQWorker *worker = pool->workers + i;
S
Shengliang Guan 已提交
57
    if (worker == NULL) continue;
58
    if (taosCheckPthreadValid(worker->thread)) {
wafwerar's avatar
wafwerar 已提交
59
      taosThreadJoin(worker->thread, NULL);
S
TD-2393  
Shengliang Guan 已提交
60 61 62
    }
  }

63
  tfree(pool->workers);
64
  taosCloseQset(pool->qset);
wafwerar's avatar
wafwerar 已提交
65
  taosThreadMutexDestroy(&pool->mutex);
S
TD-2393  
Shengliang Guan 已提交
66

S
Shengliang Guan 已提交
67
  uDebug("worker:%s is closed", pool->name);
S
TD-2393  
Shengliang Guan 已提交
68 69
}

S
Shengliang Guan 已提交
70 71
static void *tQWorkerThreadFp(SQWorker *worker) {
  SQWorkerPool *pool = worker->pool;
S
Shengliang Guan 已提交
72
  FItem         fp = NULL;
73

S
Shengliang Guan 已提交
74 75
  void   *msg = NULL;
  void   *ahandle = NULL;
76 77 78 79
  int32_t code = 0;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
S
Shengliang Guan 已提交
80
  uDebug("worker:%s:%d is running", pool->name, worker->id);
81 82

  while (1) {
S
Shengliang Guan 已提交
83
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ahandle, &fp) == 0) {
S
Shengliang Guan 已提交
84
      uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
85 86 87
      break;
    }

S
Shengliang Guan 已提交
88
    if (fp != NULL) {
S
Shengliang Guan 已提交
89 90
      SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num};
      (*fp)(&info, msg);
S
Shengliang Guan 已提交
91
    }
92 93 94 95 96
  }

  return NULL;
}

S
Shengliang Guan 已提交
97
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
wafwerar's avatar
wafwerar 已提交
98
  taosThreadMutexLock(&pool->mutex);
99
  STaosQueue *queue = taosOpenQueue();
100
  if (queue == NULL) {
wafwerar's avatar
wafwerar 已提交
101
    taosThreadMutexUnlock(&pool->mutex);
S
Shengliang Guan 已提交
102
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
TD-2393  
Shengliang Guan 已提交
103 104 105
    return NULL;
  }

S
Shengliang Guan 已提交
106
  taosSetQueueFp(queue, fp, NULL);
107
  taosAddIntoQset(pool->qset, queue, ahandle);
S
TD-2393  
Shengliang Guan 已提交
108 109

  // spawn a thread to process queue
110
  if (pool->num < pool->max) {
S
TD-2393  
Shengliang Guan 已提交
111
    do {
S
Shengliang Guan 已提交
112
      SQWorker *worker = pool->workers + pool->num;
S
TD-2393  
Shengliang Guan 已提交
113

wafwerar's avatar
wafwerar 已提交
114 115 116
      TdThreadAttr thAttr;
      taosThreadAttrInit(&thAttr);
      taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
S
TD-2393  
Shengliang Guan 已提交
117

S
Shengliang Guan 已提交
118
      if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker) != 0) {
S
Shengliang Guan 已提交
119
        uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno));
S
Shengliang Guan 已提交
120
        taosCloseQueue(queue);
S
Shengliang Guan 已提交
121
        terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
122 123
        queue = NULL;
        break;
S
TD-2393  
Shengliang Guan 已提交
124 125
      }

wafwerar's avatar
wafwerar 已提交
126
      taosThreadAttrDestroy(&thAttr);
127
      pool->num++;
S
Shengliang Guan 已提交
128
      uDebug("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
129
    } while (pool->num < pool->min);
S
TD-2393  
Shengliang Guan 已提交
130 131
  }

wafwerar's avatar
wafwerar 已提交
132
  taosThreadMutexUnlock(&pool->mutex);
S
Shengliang Guan 已提交
133
  uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
134 135 136 137

  return queue;
}

S
Shengliang Guan 已提交
138
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
139
  taosCloseQueue(queue);
S
Shengliang Guan 已提交
140
  uDebug("worker:%s, queue:%p is freed", pool->name, queue);
141 142
}

S
Shengliang Guan 已提交
143
int32_t tWWorkerInit(SWWorkerPool *pool) {
144
  pool->nextId = 0;
L
Liu Jicong 已提交
145
  pool->workers = calloc(pool->max, sizeof(SWWorker));
S
Shengliang Guan 已提交
146 147 148 149 150
  if (pool->workers == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

wafwerar's avatar
wafwerar 已提交
151
  if (taosThreadMutexInit(&pool->mutex, NULL) != 0) {
S
Shengliang Guan 已提交
152 153 154
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
155 156

  for (int32_t i = 0; i < pool->max; ++i) {
S
Shengliang Guan 已提交
157
    SWWorker *worker = pool->workers + i;
158 159 160 161 162 163
    worker->id = i;
    worker->qall = NULL;
    worker->qset = NULL;
    worker->pool = pool;
  }

S
Shengliang Guan 已提交
164
  uInfo("worker:%s is initialized, max:%d", pool->name, pool->max);
165 166 167
  return 0;
}

S
Shengliang Guan 已提交
168
void tWWorkerCleanup(SWWorkerPool *pool) {
169
  for (int32_t i = 0; i < pool->max; ++i) {
S
Shengliang Guan 已提交
170
    SWWorker *worker = pool->workers + i;
171
    if (taosCheckPthreadValid(worker->thread)) {
S
Shengliang Guan 已提交
172 173 174
      if (worker->qset) {
        taosQsetThreadResume(worker->qset);
      }
175 176 177 178
    }
  }

  for (int32_t i = 0; i < pool->max; ++i) {
S
Shengliang Guan 已提交
179
    SWWorker *worker = pool->workers + i;
180
    if (taosCheckPthreadValid(worker->thread)) {
wafwerar's avatar
wafwerar 已提交
181
      taosThreadJoin(worker->thread, NULL);
182 183 184 185 186
      taosFreeQall(worker->qall);
      taosCloseQset(worker->qset);
    }
  }

187
  tfree(pool->workers);
wafwerar's avatar
wafwerar 已提交
188
  taosThreadMutexDestroy(&pool->mutex);
189

S
Shengliang Guan 已提交
190
  uInfo("worker:%s is closed", pool->name);
191 192
}

S
Shengliang Guan 已提交
193
static void *tWWorkerThreadFp(SWWorker *worker) {
S
Shengliang Guan 已提交
194
  SWWorkerPool *pool = worker->pool;
S
Shengliang Guan 已提交
195
  FItems        fp = NULL;
196

S
Shengliang Guan 已提交
197 198
  void   *msg = NULL;
  void   *ahandle = NULL;
199 200 201 202 203
  int32_t numOfMsgs = 0;
  int32_t qtype = 0;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
S
Shengliang Guan 已提交
204
  uDebug("worker:%s:%d is running", pool->name, worker->id);
205 206

  while (1) {
S
Shengliang Guan 已提交
207
    numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &ahandle, &fp);
208
    if (numOfMsgs == 0) {
S
Shengliang Guan 已提交
209
      uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset);
210 211 212
      break;
    }

S
Shengliang Guan 已提交
213
    if (fp != NULL) {
S
Shengliang Guan 已提交
214
      SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num};
S
Shengliang Guan 已提交
215
      (*fp)(ahandle, worker->qall, numOfMsgs);
216 217 218 219 220 221
    }
  }

  return NULL;
}

S
Shengliang Guan 已提交
222
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
wafwerar's avatar
wafwerar 已提交
223
  taosThreadMutexLock(&pool->mutex);
S
Shengliang Guan 已提交
224
  SWWorker *worker = pool->workers + pool->nextId;
225

226
  STaosQueue *queue = taosOpenQueue();
227
  if (queue == NULL) {
wafwerar's avatar
wafwerar 已提交
228
    taosThreadMutexUnlock(&pool->mutex);
S
Shengliang Guan 已提交
229
    terrno = TSDB_CODE_OUT_OF_MEMORY;
230 231 232
    return NULL;
  }

S
Shengliang Guan 已提交
233 234
  taosSetQueueFp(queue, NULL, fp);

235 236 237 238
  if (worker->qset == NULL) {
    worker->qset = taosOpenQset();
    if (worker->qset == NULL) {
      taosCloseQueue(queue);
wafwerar's avatar
wafwerar 已提交
239
      taosThreadMutexUnlock(&pool->mutex);
240 241 242 243 244 245 246 247
      return NULL;
    }

    taosAddIntoQset(worker->qset, queue, ahandle);
    worker->qall = taosAllocateQall();
    if (worker->qall == NULL) {
      taosCloseQset(worker->qset);
      taosCloseQueue(queue);
wafwerar's avatar
wafwerar 已提交
248
      taosThreadMutexUnlock(&pool->mutex);
S
Shengliang Guan 已提交
249
      terrno = TSDB_CODE_OUT_OF_MEMORY;
250 251
      return NULL;
    }
wafwerar's avatar
wafwerar 已提交
252 253 254
    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
255

wafwerar's avatar
wafwerar 已提交
256
    if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker) != 0) {
S
Shengliang Guan 已提交
257
      uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno));
258 259 260
      taosFreeQall(worker->qall);
      taosCloseQset(worker->qset);
      taosCloseQueue(queue);
S
Shengliang Guan 已提交
261
      terrno = TSDB_CODE_OUT_OF_MEMORY;
262 263
      queue = NULL;
    } else {
S
Shengliang Guan 已提交
264
      uDebug("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
265 266 267
      pool->nextId = (pool->nextId + 1) % pool->max;
    }

wafwerar's avatar
wafwerar 已提交
268
    taosThreadAttrDestroy(&thAttr);
S
Shengliang Guan 已提交
269 270
    pool->num++;
    if (pool->num > pool->max) pool->num = pool->max;
271 272 273 274 275
  } else {
    taosAddIntoQset(worker->qset, queue, ahandle);
    pool->nextId = (pool->nextId + 1) % pool->max;
  }

wafwerar's avatar
wafwerar 已提交
276
  taosThreadMutexUnlock(&pool->mutex);
S
Shengliang Guan 已提交
277
  uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
S
TD-2393  
Shengliang Guan 已提交
278

279
  return queue;
S
TD-2393  
Shengliang Guan 已提交
280 281
}

S
Shengliang Guan 已提交
282
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
283
  taosCloseQueue(queue);
S
Shengliang Guan 已提交
284
  uDebug("worker:%s, queue:%p is freed", pool->name, queue);
S
TD-2393  
Shengliang Guan 已提交
285
}
S
Shengliang Guan 已提交
286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342

int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg) {
  SQWorkerPool *pPool = &pWorker->pool;
  pPool->name = pCfg->name;
  pPool->min = pCfg->minNum;
  pPool->max = pCfg->maxNum;
  if (tQWorkerInit(pPool) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pWorker->queue = tQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
  if (pWorker->queue == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pWorker->name = pCfg->name;
  return 0;
}

void tQWorkerAllCleanup(SQWorkerAll *pWorker) {
  if (pWorker->queue == NULL) return;

  while (!taosQueueEmpty(pWorker->queue)) {
    taosMsleep(10);
  }

  tQWorkerCleanup(&pWorker->pool);
  tQWorkerFreeQueue(&pWorker->pool, pWorker->queue);
}

int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg) {
  SWWorkerPool *pPool = &pWorker->pool;
  pPool->name = pCfg->name;
  pPool->max = pCfg->maxNum;
  if (tWWorkerInit(pPool) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pWorker->queue = tWWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
  if (pWorker->queue == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pWorker->name = pCfg->name;
  return 0;
}

void tWWorkerAllCleanup(SWWorkerAll *pWorker) {
  if (pWorker->queue == NULL) return;

  while (!taosQueueEmpty(pWorker->queue)) {
    taosMsleep(10);
  }

  tWWorkerCleanup(&pWorker->pool);
  tWWorkerFreeQueue(&pWorker->pool, pWorker->queue);
}