/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "tworker.h"
#include "taoserror.h"
#include "tlog.h"

/* Entry-point type the worker routines are converted to when they are handed to taosThreadCreate(). */
typedef void *(*ThreadFp)(void *param);

/*
 * Prepare a queue-worker pool: open its shared queue set, allocate pool->max
 * worker slots and initialise the pool mutex. No thread is started here;
 * threads are launched from tQWorkerAllocQueue().
 *
 * The caller fills in pool->name, pool->min and pool->max beforehand.
 *
 * Returns 0 on success. On failure returns -1 with terrno set; whatever was
 * acquired up to that point is released again and pool->qset / pool->workers
 * are left NULL.
 */
int32_t tQWorkerInit(SQWorkerPool *pool) {
  // No worker thread exists yet; tQWorkerAllocQueue() counts launches in pool->num.
  pool->num = 0;
  pool->workers = NULL;

  pool->qset = taosOpenQset();
  if (pool->qset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pool->workers = calloc(pool->max, sizeof(SQWorker));
  if (pool->workers == NULL) {
    taosCloseQset(pool->qset);
    pool->qset = NULL;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (taosThreadMutexInit(&pool->mutex, NULL)) {
    tfree(pool->workers);
    pool->workers = NULL;
    taosCloseQset(pool->qset);
    pool->qset = NULL;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  for (int32_t i = 0; i < pool->max; ++i) {
    SQWorker *worker = pool->workers + i;
    worker->id = i;
    worker->pool = pool;
  }

  uDebug("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
  return 0;
}

/*
 * Tear down a queue-worker pool: stop and join every started worker thread,
 * then release the worker array, the queue set and the pool mutex.
 *
 * A pool whose worker array is NULL has no threads to stop, so both thread
 * passes are skipped instead of dereferencing the missing array.
 */
void tQWorkerCleanup(SQWorkerPool *pool) {
  SQWorker *workers = pool->workers;

  if (workers != NULL) {
    // One resume call per started thread. The worker loop exits when its
    // queue-set read returns 0 -- presumably what the resume triggers.
    for (int32_t i = 0; i < pool->max; ++i) {
      if (taosCheckPthreadValid(workers[i].thread)) {
        taosQsetThreadResume(pool->qset);
      }
    }

    // Wait for every started thread before its slot and the qset are released.
    for (int32_t i = 0; i < pool->max; ++i) {
      if (taosCheckPthreadValid(workers[i].thread)) {
        taosThreadJoin(workers[i].thread, NULL);
      }
    }
  }

  tfree(pool->workers);
  taosCloseQset(pool->qset);
  taosThreadMutexDestroy(&pool->mutex);

  uDebug("worker:%s is closed", pool->name);
}

static void *tQWorkerThreadFp(SQWorker *worker) {
  SQWorkerPool *pool = worker->pool;
S
Shengliang Guan 已提交
72
  FItem         fp = NULL;
73

S
Shengliang Guan 已提交
74 75
  void   *msg = NULL;
  void   *ahandle = NULL;
76 77 78 79
  int32_t code = 0;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
S
Shengliang Guan 已提交
80
  uDebug("worker:%s:%d is running", pool->name, worker->id);
81 82

  while (1) {
S
Shengliang Guan 已提交
83
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ahandle, &fp) == 0) {
S
Shengliang Guan 已提交
84
      uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
85 86 87
      break;
    }

S
Shengliang Guan 已提交
88
    if (fp != NULL) {
S
Shengliang Guan 已提交
89
      (*fp)(ahandle, msg);
S
Shengliang Guan 已提交
90
    }
91 92 93 94 95
  }

  return NULL;
}

/*
 * Create a queue served by the pool's worker threads.
 *
 * The new queue gets fp as its per-item callback and is added to the pool's
 * queue set with ahandle. While the pool runs fewer than pool->max threads one
 * more is launched, and launching continues until pool->min threads are
 * running (never beyond the pool->max slots that were allocated).
 *
 * Returns the queue, or NULL with terrno set when the queue cannot be created
 * or a thread cannot be started. Threads launched before a failing launch keep
 * running.
 */
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
  taosThreadMutexLock(&pool->mutex);
  STaosQueue *queue = taosOpenQueue();
  if (queue == NULL) {
    taosThreadMutexUnlock(&pool->mutex);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  taosSetQueueFp(queue, fp, NULL);
  taosAddIntoQset(pool->qset, queue, ahandle);

  // spawn a thread to process queue
  if (pool->num < pool->max) {
    do {
      SQWorker *worker = pool->workers + pool->num;

      TdThreadAttr thAttr;
      taosThreadAttrInit(&thAttr);
      taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

      if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker) != 0) {
        // NOTE(review): strerror(errno) assumes taosThreadCreate() reports through errno -- confirm.
        uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno));
        // The attribute object must be released on the failure path as well.
        taosThreadAttrDestroy(&thAttr);
        taosCloseQueue(queue);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        queue = NULL;
        break;
      }

      taosThreadAttrDestroy(&thAttr);
      pool->num++;
      uDebug("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
      // Stop at max even when min is configured larger: only max slots exist.
    } while (pool->num < pool->min && pool->num < pool->max);
  }

  taosThreadMutexUnlock(&pool->mutex);
  uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}

/*
 * Close a queue obtained from tQWorkerAllocQueue() and log the release.
 * The pool is only consulted for its name in the log line.
 */
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
  taosCloseQueue(queue);

  // queue is printed as an address only; it is not dereferenced after the close.
  uDebug("worker:%s, queue:%p is freed", pool->name, queue);
}

/*
 * Prepare a write-worker pool: allocate pool->max worker slots, initialise
 * the pool mutex and reset the slot cursor used by tWWorkerAllocQueue().
 * Each slot starts without a queue set, item buffer or thread; those are
 * created by tWWorkerAllocQueue().
 *
 * The caller fills in pool->name and pool->max beforehand.
 *
 * Returns 0 on success. On failure returns -1 with terrno set and
 * pool->workers left NULL (nothing remains allocated).
 */
int32_t tWWorkerInit(SWWorkerPool *pool) {
  pool->nextId = 0;
  pool->workers = calloc(pool->max, sizeof(SWWorker));
  if (pool->workers == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (taosThreadMutexInit(&pool->mutex, NULL) != 0) {
    // Give the slot array back; otherwise it would be lost on this failure path.
    tfree(pool->workers);
    pool->workers = NULL;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  for (int32_t i = 0; i < pool->max; ++i) {
    SWWorker *worker = pool->workers + i;
    worker->id = i;
    worker->qall = NULL;
    worker->qset = NULL;
    worker->pool = pool;
  }

  uInfo("worker:%s is initialized, max:%d", pool->name, pool->max);
  return 0;
}

/*
 * Tear down a write-worker pool: stop and join every started worker thread,
 * release each started worker's item buffer and queue set, then free the
 * worker array and destroy the pool mutex.
 *
 * A pool whose worker array is NULL has no threads to stop, so both thread
 * passes are skipped instead of dereferencing the missing array.
 */
void tWWorkerCleanup(SWWorkerPool *pool) {
  SWWorker *workers = pool->workers;

  if (workers != NULL) {
    // Resume each started worker's queue set. The worker loop exits when its
    // read returns 0 -- presumably what the resume triggers.
    for (int32_t i = 0; i < pool->max; ++i) {
      SWWorker *worker = &workers[i];
      if (taosCheckPthreadValid(worker->thread) && worker->qset != NULL) {
        taosQsetThreadResume(worker->qset);
      }
    }

    // Join first, then release the per-worker resources the thread was using.
    for (int32_t i = 0; i < pool->max; ++i) {
      SWWorker *worker = &workers[i];
      if (taosCheckPthreadValid(worker->thread)) {
        taosThreadJoin(worker->thread, NULL);
        taosFreeQall(worker->qall);
        taosCloseQset(worker->qset);
      }
    }
  }

  tfree(pool->workers);
  taosThreadMutexDestroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}

static void *tWWorkerThreadFp(SWWorker *worker) {
S
Shengliang Guan 已提交
193
  SWWorkerPool *pool = worker->pool;
S
Shengliang Guan 已提交
194
  FItems        fp = NULL;
195

S
Shengliang Guan 已提交
196 197
  void   *msg = NULL;
  void   *ahandle = NULL;
198 199 200 201 202
  int32_t numOfMsgs = 0;
  int32_t qtype = 0;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
S
Shengliang Guan 已提交
203
  uDebug("worker:%s:%d is running", pool->name, worker->id);
204 205

  while (1) {
S
Shengliang Guan 已提交
206
    numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &ahandle, &fp);
207
    if (numOfMsgs == 0) {
S
Shengliang Guan 已提交
208
      uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset);
209 210 211
      break;
    }

S
Shengliang Guan 已提交
212
    if (fp != NULL) {
S
Shengliang Guan 已提交
213
      (*fp)(ahandle, worker->qall, numOfMsgs);
214 215 216 217 218 219
    }
  }

  return NULL;
}

/*
 * Create a queue and attach it to the worker slot selected by pool->nextId.
 *
 * The new queue gets fp as its batch callback. If the selected slot has no
 * queue set yet, the slot's queue set, item buffer and thread are created
 * first; otherwise the queue simply joins the slot's existing queue set.
 * pool->nextId advances (wrapping at pool->max) only when the queue was
 * attached successfully.
 *
 * Returns the queue, or NULL with terrno set on failure. On every failure
 * path the slot's qset/qall pointers are reset to NULL after being released,
 * so a later call sets the slot up from scratch instead of reusing freed
 * objects.
 */
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
  taosThreadMutexLock(&pool->mutex);
  SWWorker *worker = pool->workers + pool->nextId;

  STaosQueue *queue = taosOpenQueue();
  if (queue == NULL) {
    taosThreadMutexUnlock(&pool->mutex);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  taosSetQueueFp(queue, NULL, fp);

  if (worker->qset == NULL) {
    worker->qset = taosOpenQset();
    if (worker->qset == NULL) {
      taosCloseQueue(queue);
      taosThreadMutexUnlock(&pool->mutex);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }

    taosAddIntoQset(worker->qset, queue, ahandle);
    worker->qall = taosAllocateQall();
    if (worker->qall == NULL) {
      taosCloseQset(worker->qset);
      worker->qset = NULL;  // do not leave a dangling qset in the slot
      taosCloseQueue(queue);
      taosThreadMutexUnlock(&pool->mutex);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }

    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

    if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker) != 0) {
      // NOTE(review): strerror(errno) assumes taosThreadCreate() reports through errno -- confirm.
      uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno));
      taosFreeQall(worker->qall);
      taosCloseQset(worker->qset);
      taosCloseQueue(queue);
      // The slot keeps its position in nextId, so it must look unused again.
      worker->qall = NULL;
      worker->qset = NULL;
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      queue = NULL;
    } else {
      uDebug("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
      pool->nextId = (pool->nextId + 1) % pool->max;
    }

    taosThreadAttrDestroy(&thAttr);
  } else {
    taosAddIntoQset(worker->qset, queue, ahandle);
    pool->nextId = (pool->nextId + 1) % pool->max;
  }

  taosThreadMutexUnlock(&pool->mutex);
  uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}

/*
 * Close a queue obtained from tWWorkerAllocQueue() and log the release.
 * The pool is only consulted for its name in the log line.
 */
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
  taosCloseQueue(queue);

  // queue is printed as an address only; it is not dereferenced after the close.
  uDebug("worker:%s, queue:%p is freed", pool->name, queue);
}

/*
 * Set up a self-contained queue worker: configure and initialise the embedded
 * pool from pCfg (name, minNum, maxNum), then allocate its single queue with
 * pCfg->param as handle and pCfg->fp as callback.
 *
 * Returns 0 on success, -1 with terrno set on failure. pWorker->queue is NULL
 * after any failure, which is the state tQWorkerAllCleanup() treats as
 * "nothing to tear down".
 *
 * NOTE(review): when the queue allocation fails, the pool initialised just
 * before is not torn down here -- confirm whether callers release it.
 */
int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg) {
  SQWorkerPool *pPool = &pWorker->pool;

  // Make the "not initialised" marker explicit before anything can fail.
  pWorker->queue = NULL;

  pPool->name = pCfg->name;
  pPool->min = pCfg->minNum;
  pPool->max = pCfg->maxNum;
  if (tQWorkerInit(pPool) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pWorker->queue = tQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
  if (pWorker->queue == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pWorker->name = pCfg->name;
  return 0;
}

/*
 * Shut down a worker created by tQWorkerAllInit(). Does nothing when no queue
 * is attached. Otherwise it polls every 10 ms until the queue is empty, tears
 * down the pool and closes the queue.
 *
 * The queue pointer is cleared afterwards so that a repeated call hits the
 * NULL guard instead of touching the closed queue.
 */
void tQWorkerAllCleanup(SQWorkerAll *pWorker) {
  if (pWorker->queue == NULL) return;

  // Let the workers drain pending items before the threads are stopped.
  while (!taosQueueEmpty(pWorker->queue)) {
    taosMsleep(10);
  }

  tQWorkerCleanup(&pWorker->pool);
  tQWorkerFreeQueue(&pWorker->pool, pWorker->queue);
  pWorker->queue = NULL;
}

/*
 * Set up a self-contained write worker: configure and initialise the embedded
 * pool from pCfg (name, maxNum), then allocate its single queue with
 * pCfg->param as handle and pCfg->fp as batch callback.
 *
 * Returns 0 on success, -1 with terrno set on failure. pWorker->queue is NULL
 * after any failure, which is the state tWWorkerAllCleanup() treats as
 * "nothing to tear down".
 *
 * NOTE(review): when the queue allocation fails, the pool initialised just
 * before is not torn down here -- confirm whether callers release it.
 */
int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg) {
  SWWorkerPool *pPool = &pWorker->pool;

  // Make the "not initialised" marker explicit before anything can fail.
  pWorker->queue = NULL;

  pPool->name = pCfg->name;
  pPool->max = pCfg->maxNum;
  if (tWWorkerInit(pPool) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pWorker->queue = tWWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
  if (pWorker->queue == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pWorker->name = pCfg->name;
  return 0;
}

/*
 * Shut down a worker created by tWWorkerAllInit(). Does nothing when no queue
 * is attached. Otherwise it polls every 10 ms until the queue is empty, tears
 * down the pool and closes the queue.
 *
 * The queue pointer is cleared afterwards so that a repeated call hits the
 * NULL guard instead of touching the closed queue.
 */
void tWWorkerAllCleanup(SWWorkerAll *pWorker) {
  if (pWorker->queue == NULL) return;

  // Let the worker drain pending items before the thread is stopped.
  while (!taosQueueEmpty(pWorker->queue)) {
    taosMsleep(10);
  }

  tWWorkerCleanup(&pWorker->pool);
  tWWorkerFreeQueue(&pWorker->pool, pWorker->queue);
  pWorker->queue = NULL;
}