tworker.c 8.9 KB
Newer Older
S
TD-2393  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "tworker.h"
S
Shengliang Guan 已提交
18
#include "taoserror.h"
S
log  
Shengliang Guan 已提交
19
#include "tlog.h"
S
TD-2393  
Shengliang Guan 已提交
20

S
Shengliang Guan 已提交
21
// Generic thread entry-point signature. The worker routines in this file are cast
// to this type when they are handed to taosThreadCreate().
typedef void *(*ThreadFp)(void *param);
22

S
Shengliang Guan 已提交
23
/*
 * Prepare a queue-worker pool for use.
 *
 * Opens the pool's shared queue set, allocates pool->max worker slots, initializes
 * the pool mutex and stamps every slot with its index and owning pool. No thread is
 * started here; threads are launched by tQWorkerAllocQueue().
 *
 * The caller must have filled in pool->name, pool->min and pool->max beforehand.
 *
 * Returns 0 on success. Returns -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY when
 * either the queue set or the worker array cannot be allocated; in that case nothing
 * is left allocated and pool->qset is NULL.
 */
int32_t tQWorkerInit(SQWorkerPool *pool) {
  pool->qset = taosOpenQset();
  if (pool->qset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pool->workers = taosMemoryCalloc(pool->max, sizeof(SQWorker));
  if (pool->workers == NULL) {
    // Release the queue set opened above so a failed init does not leak it.
    taosCloseQset(pool->qset);
    pool->qset = NULL;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  (void)taosThreadMutexInit(&pool->mutex, NULL);

  for (int32_t i = 0; i < pool->max; ++i) {
    SQWorker *worker = pool->workers + i;
    worker->id = i;
    worker->pool = pool;
  }

  uDebug("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
  return 0;
}

S
Shengliang Guan 已提交
43 44 45
/*
 * Shut down a queue-worker pool and release everything tQWorkerInit() set up.
 *
 * Runs in two passes over the pool->max slots so that every running thread is
 * signalled before any of them is joined, then frees the worker array, closes the
 * shared queue set and destroys the pool mutex.
 */
void tQWorkerCleanup(SQWorkerPool *pool) {
  // Pass 1: one resume call on the shared queue set per live thread
  // (presumably this unblocks a reader waiting on the set -- confirm in tqueue).
  for (int32_t idx = 0; idx < pool->max; ++idx) {
    if (!taosCheckPthreadValid(pool->workers[idx].thread)) continue;
    taosQsetThreadResume(pool->qset);
  }

  // Pass 2: wait for each live thread to finish and reset its handle.
  for (int32_t idx = 0; idx < pool->max; ++idx) {
    SQWorker *slot = &pool->workers[idx];
    if (!taosCheckPthreadValid(slot->thread)) continue;

    taosThreadJoin(slot->thread, NULL);
    taosThreadClear(&slot->thread);
  }

  taosMemoryFreeClear(pool->workers);
  taosCloseQset(pool->qset);
  taosThreadMutexDestroy(&pool->mutex);

  uDebug("worker:%s is closed", pool->name);
}

S
Shengliang Guan 已提交
66 67
static void *tQWorkerThreadFp(SQWorker *worker) {
  SQWorkerPool *pool = worker->pool;
68 69 70
  SQueueInfo    qinfo = {0};
  void         *msg = NULL;
  int32_t       code = 0;
71 72 73

  taosBlockSIGPIPE();
  setThreadName(pool->name);
S
Shengliang Guan 已提交
74 75
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
76 77

  while (1) {
H
Hongze Cheng 已提交
78
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
79
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
S
Shengliang Guan 已提交
80
            worker->pid);
81 82 83
      break;
    }

84 85 86 87
    if (qinfo.fp != NULL) {
      qinfo.workerId = worker->id;
      qinfo.threadNum = pool->num;
      (*((FItem)qinfo.fp))(&qinfo, msg);
S
Shengliang Guan 已提交
88
    }
89 90

    taosUpdateItemSize(qinfo.queue, 1);
91 92 93 94 95
  }

  return NULL;
}

S
Shengliang Guan 已提交
96
/*
 * Create a queue, attach it to the pool's shared queue set and make sure worker
 * threads exist to serve it.
 *
 * While the pool holds fewer than pool->max threads, at least one new joinable
 * thread is started per call, and more are started until pool->min is reached.
 *
 * pool:    pool previously set up with tQWorkerInit()
 * ahandle: opaque handle stored with the queue in the queue set
 * fp:      per-item callback installed on the queue
 *
 * Returns the new queue, or NULL when the queue cannot be opened or a thread cannot
 * be created (terrno is set to TSDB_CODE_OUT_OF_MEMORY in the latter case). Threads
 * that were started before a failure keep running.
 */
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
  STaosQueue *queue = taosOpenQueue();
  if (queue == NULL) return NULL;

  taosThreadMutexLock(&pool->mutex);
  taosSetQueueFp(queue, fp, NULL);
  taosAddIntoQset(pool->qset, queue, ahandle);

  // spawn a thread to process queue
  if (pool->num < pool->max) {
    do {
      SQWorker *worker = pool->workers + pool->num;

      TdThreadAttr thAttr;
      taosThreadAttrInit(&thAttr);
      taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

      int32_t ret = taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker);
      // The attribute object is only needed for the create call; destroy it on
      // both the success and the failure path so it is never leaked.
      taosThreadAttrDestroy(&thAttr);

      if (ret != 0) {
        taosCloseQueue(queue);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        queue = NULL;
        break;
      }

      pool->num++;
      uDebug("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
    } while (pool->num < pool->min);
  }

  taosThreadMutexUnlock(&pool->mutex);
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}

S
Shengliang Guan 已提交
132
/*
 * Release a queue obtained from tQWorkerAllocQueue().
 *
 * The pool is only used to name the worker in the debug log; the queue itself is
 * closed with taosCloseQueue().
 */
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
  // Log first: the pointer value is still meaningful before the queue is closed.
  uDebug("worker:%s, queue:%p is freed", pool->name, queue);

  taosCloseQueue(queue);
}

S
Shengliang Guan 已提交
137
/*
 * Prepare a write-worker pool for use.
 *
 * Resets the round-robin cursor, allocates pool->max worker slots, initializes the
 * pool mutex and gives every slot its index, its owning pool and empty qall/qset
 * pointers. Threads and queue sets are created later by tWWorkerAllocQueue().
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the slot
 * array cannot be allocated.
 */
int32_t tWWorkerInit(SWWorkerPool *pool) {
  pool->nextId = 0;

  SWWorker *slots = taosMemoryCalloc(pool->max, sizeof(SWWorker));
  pool->workers = slots;
  if (slots == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  (void)taosThreadMutexInit(&pool->mutex, NULL);

  for (int32_t idx = 0; idx < pool->max; ++idx) {
    SWWorker *slot = &slots[idx];
    slot->id = idx;
    slot->qall = NULL;
    slot->qset = NULL;
    slot->pool = pool;
  }

  uDebug("worker:%s is initialized, max:%d", pool->name, pool->max);
  return 0;
}

S
Shengliang Guan 已提交
159
/*
 * Shut down a write-worker pool and release everything it owns.
 *
 * Every slot with a live thread is first signalled through its own queue set, then
 * joined; its qall and qset are released right after the join. Finally the slot
 * array is freed and the pool mutex destroyed.
 */
void tWWorkerCleanup(SWWorkerPool *pool) {
  // Pass 1: signal each live worker through its private queue set, if it has one
  // (presumably this unblocks a reader waiting on the set -- confirm in tqueue).
  for (int32_t idx = 0; idx < pool->max; ++idx) {
    SWWorker *slot = &pool->workers[idx];
    if (!taosCheckPthreadValid(slot->thread)) continue;
    if (slot->qset) {
      taosQsetThreadResume(slot->qset);
    }
  }

  // Pass 2: join each live worker, then release the resources only it was using.
  for (int32_t idx = 0; idx < pool->max; ++idx) {
    SWWorker *slot = &pool->workers[idx];
    if (!taosCheckPthreadValid(slot->thread)) continue;

    taosThreadJoin(slot->thread, NULL);
    taosThreadClear(&slot->thread);
    taosFreeQall(slot->qall);
    taosCloseQset(slot->qset);
  }

  taosMemoryFreeClear(pool->workers);
  taosThreadMutexDestroy(&pool->mutex);

  uDebug("worker:%s is closed", pool->name);
}

S
Shengliang Guan 已提交
185
static void *tWWorkerThreadFp(SWWorker *worker) {
S
Shengliang Guan 已提交
186
  SWWorkerPool *pool = worker->pool;
187 188 189 190
  SQueueInfo    qinfo = {0};
  void         *msg = NULL;
  int32_t       code = 0;
  int32_t       numOfMsgs = 0;
191 192 193

  taosBlockSIGPIPE();
  setThreadName(pool->name);
S
Shengliang Guan 已提交
194 195
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
196 197

  while (1) {
198
    numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &qinfo);
199
    if (numOfMsgs == 0) {
200
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, worker->qset,
S
Shengliang Guan 已提交
201
            worker->pid);
202 203 204
      break;
    }

205 206 207 208
    if (qinfo.fp != NULL) {
      qinfo.workerId = worker->id;
      qinfo.threadNum = pool->num;
      (*((FItems)qinfo.fp))(&qinfo, worker->qall, numOfMsgs);
209
    }
210
    taosUpdateItemSize(qinfo.queue, numOfMsgs);
211 212 213 214 215
  }

  return NULL;
}

S
Shengliang Guan 已提交
216
/*
 * Create a queue and bind it to the next write worker in round-robin order.
 *
 * If the selected slot has no queue set yet, this call brings the worker to life:
 * it opens the slot's queue set and qall and starts a joinable thread. Otherwise
 * the queue is simply added to the slot's existing queue set. In both cases the
 * call waits until the worker thread has published its id and records it in
 * queue->threadId.
 *
 * pool:    pool previously set up with tWWorkerInit()
 * ahandle: opaque handle stored with the queue in the queue set
 * fp:      batch callback installed on the queue
 *
 * Returns the new queue, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY on failure.
 * On failure only resources created by this very call are released, and the slot's
 * qset/qall pointers are reset to NULL so the slot can be retried. A queue set that
 * belongs to an already-running worker is never touched.
 */
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
  taosThreadMutexLock(&pool->mutex);
  SWWorker *worker = pool->workers + pool->nextId;
  int32_t   code = -1;
  // Non-zero when this call is the one creating the slot's qset/qall/thread; only
  // then is the failure path allowed to tear those down.
  int32_t isNewWorker = (worker->qset == NULL);

  STaosQueue *queue = taosOpenQueue();
  if (queue == NULL) goto _OVER;

  taosSetQueueFp(queue, NULL, fp);
  if (isNewWorker) {
    worker->qset = taosOpenQset();
    if (worker->qset == NULL) goto _OVER;

    taosAddIntoQset(worker->qset, queue, ahandle);
    worker->qall = taosAllocateQall();
    if (worker->qall == NULL) goto _OVER;

    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    int32_t ret = taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker);
    // The attribute object is only needed for the create call; destroy it on
    // both the success and the failure path so it is never leaked.
    taosThreadAttrDestroy(&thAttr);
    if (ret != 0) goto _OVER;

    uDebug("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
    pool->nextId = (pool->nextId + 1) % pool->max;

    pool->num++;
    if (pool->num > pool->max) pool->num = pool->max;
  } else {
    taosAddIntoQset(worker->qset, queue, ahandle);
    pool->nextId = (pool->nextId + 1) % pool->max;
  }

  // The worker thread stores its id in worker->pid once it is running; poll until
  // it has done so.
  while (worker->pid <= 0) taosMsleep(10);
  queue->threadId = worker->pid;
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle, queue->threadId);
  code = 0;

_OVER:
  // Undo partial work while still holding the mutex, since it mutates the slot.
  if (code != 0) {
    if (queue != NULL) {
      taosCloseQueue(queue);
      queue = NULL;
    }
    if (isNewWorker) {
      if (worker->qset != NULL) {
        taosCloseQset(worker->qset);
        worker->qset = NULL;
      }
      if (worker->qall != NULL) {
        taosFreeQall(worker->qall);
        worker->qall = NULL;
      }
    }
    terrno = TSDB_CODE_OUT_OF_MEMORY;
  }

  taosThreadMutexUnlock(&pool->mutex);
  return queue;
}

S
Shengliang Guan 已提交
267
/*
 * Release a queue obtained from tWWorkerAllocQueue().
 *
 * The pool is only used to name the worker in the debug log; the queue itself is
 * closed with taosCloseQueue().
 */
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
  // Log first: the pointer value is still meaningful before the queue is closed.
  uDebug("worker:%s, queue:%p is freed", pool->name, queue);

  taosCloseQueue(queue);
}
S
Shengliang Guan 已提交
271

S
Shengliang Guan 已提交
272
/*
 * Set up a single-queue worker: one SQWorkerPool plus the one queue it serves.
 *
 * Copies name/min/max from the configuration into the embedded pool, initializes
 * the pool and allocates its queue with the configured handle and callback.
 *
 * Returns 0 on success and -1 on failure. When queue allocation fails the pool is
 * torn down again before returning, because tSingleWorkerCleanup() does nothing
 * while pWorker->queue is NULL and would otherwise never release it.
 */
int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg) {
  SQWorkerPool *pPool = &pWorker->pool;
  pPool->name = pCfg->name;
  pPool->min = pCfg->min;
  pPool->max = pCfg->max;
  if (tQWorkerInit(pPool) != 0) return -1;

  pWorker->queue = tQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
  if (pWorker->queue == NULL) {
    // Keep the error reported by the allocation across the teardown.
    int32_t code = terrno;
    tQWorkerCleanup(pPool);
    terrno = code;
    return -1;
  }

  pWorker->name = pCfg->name;
  return 0;
}

S
Shengliang Guan 已提交
286
/*
 * Tear down a worker created by tSingleWorkerInit().
 *
 * Does nothing when the worker has no queue. Otherwise polls every 10 ms until the
 * queue is empty, then shuts the pool down and frees the queue.
 */
void tSingleWorkerCleanup(SSingleWorker *pWorker) {
  if (pWorker->queue == NULL) return;

  // Let the worker threads drain whatever is still pending before stopping them.
  for (;;) {
    if (taosQueueEmpty(pWorker->queue)) break;
    taosMsleep(10);
  }

  SQWorkerPool *pPool = &pWorker->pool;
  tQWorkerCleanup(pPool);
  tQWorkerFreeQueue(pPool, pWorker->queue);
}

S
Shengliang Guan 已提交
297
/*
 * Set up a batch (multi-item) worker: one SWWorkerPool plus the one queue it serves.
 *
 * Copies name/max from the configuration into the embedded pool, initializes the
 * pool and allocates its queue with the configured handle and batch callback.
 *
 * Returns 0 on success and -1 on failure. When queue allocation fails the pool is
 * torn down again before returning, because tMultiWorkerCleanup() does nothing
 * while pWorker->queue is NULL and would otherwise never release it.
 */
int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg) {
  SWWorkerPool *pPool = &pWorker->pool;
  pPool->name = pCfg->name;
  pPool->max = pCfg->max;
  if (tWWorkerInit(pPool) != 0) return -1;

  pWorker->queue = tWWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
  if (pWorker->queue == NULL) {
    // Keep whatever error the allocation reported across the teardown.
    int32_t code = terrno;
    tWWorkerCleanup(pPool);
    terrno = code;
    return -1;
  }

  pWorker->name = pCfg->name;
  return 0;
}

S
Shengliang Guan 已提交
310
/*
 * Tear down a worker created by tMultiWorkerInit().
 *
 * Does nothing when the worker has no queue. Otherwise polls every 10 ms until the
 * queue is empty, then shuts the pool down and frees the queue.
 */
void tMultiWorkerCleanup(SMultiWorker *pWorker) {
  if (pWorker->queue == NULL) return;

  // Let the worker thread drain whatever is still pending before stopping it.
  for (;;) {
    if (taosQueueEmpty(pWorker->queue)) break;
    taosMsleep(10);
  }

  SWWorkerPool *pPool = &pWorker->pool;
  tWWorkerCleanup(pPool);
  tWWorkerFreeQueue(pPool, pWorker->queue);
}