tworker.c 9.0 KB
Newer Older
S
TD-2393  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "tworker.h"
S
Shengliang Guan 已提交
18
#include "taoserror.h"
S
Shengliang Guan 已提交
19
#include "ulog.h"
S
TD-2393  
Shengliang Guan 已提交
20

S
Shengliang Guan 已提交
21
// Signature of a worker thread entry routine; values of this type are handed to pthread_create().
typedef void *(*ThreadFp)(void *param);
22

S
Shengliang Guan 已提交
23
/*
 * Prepare a queue-worker pool: allocate pool->max zero-filled worker slots,
 * create the pool mutex, open the shared queue set and stamp every slot with
 * its index and owning pool. No thread is started here.
 *
 * Returns 0 on success. On failure returns -1 with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY; anything acquired before the failing step is
 * released again so a failed init does not leak.
 */
int32_t tQWorkerInit(SQWorkerPool *pool) {
  // The queue set is opened last so the earlier failure paths have nothing of it to undo.
  pool->qset = NULL;

  pool->workers = calloc(pool->max, sizeof(SQWorker));
  if (pool->workers == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (pthread_mutex_init(&pool->mutex, NULL) != 0) {
    free(pool->workers);
    pool->workers = NULL;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pool->qset = taosOpenQset();
  if (pool->qset == NULL) {
    pthread_mutex_destroy(&pool->mutex);
    free(pool->workers);
    pool->workers = NULL;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  for (int32_t i = 0; i < pool->max; ++i) {
    SQWorker *worker = pool->workers + i;
    worker->id = i;
    worker->pool = pool;
  }

  uDebug("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
  return 0;
}

S
Shengliang Guan 已提交
46 47 48
/*
 * Tear down a queue-worker pool.
 *
 * Pass 1 calls taosQsetThreadResume() on the shared qset once per worker whose
 * thread handle is valid (presumably to release threads blocked on the qset --
 * confirm against the qset implementation). Pass 2 joins those threads.
 * Afterwards the worker array is freed, the qset is closed and the mutex is
 * destroyed.
 */
void tQWorkerCleanup(SQWorkerPool *pool) {
  int32_t idx;

  for (idx = 0; idx < pool->max; idx++) {
    SQWorker *w = pool->workers + idx;
    if (w != NULL && taosCheckPthreadValid(w->thread)) {
      taosQsetThreadResume(pool->qset);
    }
  }

  // Joining happens in a separate pass, after every resume call has been issued.
  for (idx = 0; idx < pool->max; idx++) {
    SQWorker *w = pool->workers + idx;
    if (w != NULL && taosCheckPthreadValid(w->thread)) {
      pthread_join(w->thread, NULL);
    }
  }

  tfree(pool->workers);
  taosCloseQset(pool->qset);
  pthread_mutex_destroy(&pool->mutex);

  uDebug("worker:%s is closed", pool->name);
}

S
Shengliang Guan 已提交
70 71
/*
 * Thread body for a queue worker.
 *
 * Repeatedly reads one item from the pool's shared qset and, when the read
 * yields a non-NULL callback, invokes it with the queue's ahandle and the
 * item. The loop ends as soon as taosReadQitemFromQset() returns 0.
 * Always returns NULL.
 */
static void *tQWorkerThreadFp(SQWorker *worker) {
  SQWorkerPool *pool = worker->pool;
  FItem         fp = NULL;

  void *msg = NULL;
  void *ahandle = NULL;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
  uDebug("worker:%s:%d is running", pool->name, worker->id);

  while (1) {
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ahandle, &fp) == 0) {
      uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
      break;
    }

    if (fp != NULL) {
      (*fp)(ahandle, msg);
    }
  }

  return NULL;
}

S
Shengliang Guan 已提交
96
/*
 * Open a new queue, register it (with ahandle) in the pool's shared qset and,
 * while the pool has fewer than pool->max threads, start worker threads
 * running threadFp until at least pool->min exist (always at least one per
 * call when below max).
 *
 * pool      worker pool; pool->mutex is held for the whole operation
 * ahandle   opaque handle stored alongside the queue in the qset
 * fp        per-item callback installed on the queue
 * threadFp  thread entry routine, called with the SQWorker slot as argument
 *
 * Returns the new queue, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when
 * the queue cannot be opened or a thread cannot be created.
 */
STaosQueue *tWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp, ThreadFp threadFp) {
  pthread_mutex_lock(&pool->mutex);
  STaosQueue *queue = taosOpenQueue();
  if (queue == NULL) {
    pthread_mutex_unlock(&pool->mutex);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  taosSetQueueFp(queue, fp, NULL);
  taosAddIntoQset(pool->qset, queue, ahandle);

  // spawn a thread to process queue
  if (pool->num < pool->max) {
    do {
      SQWorker *worker = pool->workers + pool->num;

      pthread_attr_t thAttr;
      pthread_attr_init(&thAttr);
      pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

      // pthread_create reports failure through its return value, not errno.
      int32_t ret = pthread_create(&worker->thread, &thAttr, threadFp, worker);

      // The attribute object is no longer needed whether or not the thread started.
      pthread_attr_destroy(&thAttr);

      if (ret != 0) {
        uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(ret));
        taosCloseQueue(queue);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        queue = NULL;
        break;
      }

      pool->num++;
      uDebug("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
    } while (pool->num < pool->min);
  }

  pthread_mutex_unlock(&pool->mutex);
  uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}

S
Shengliang Guan 已提交
137 138 139 140
/*
 * Allocate a queue on a queue-worker pool: forwards to tWorkerAllocQueue()
 * with tQWorkerThreadFp as the thread entry routine.
 */
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
  ThreadFp entry = (ThreadFp)tQWorkerThreadFp;
  return tWorkerAllocQueue(pool, ahandle, fp, entry);
}

S
Shengliang Guan 已提交
141
/*
 * Close a queue obtained from a queue-worker pool and log the event.
 * The pool is only consulted for its name in the log line.
 */
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
  const char *poolName = pool->name;

  taosCloseQueue(queue);
  uDebug("worker:%s, queue:%p is freed", poolName, queue);
}

S
Shengliang Guan 已提交
146 147 148 149 150 151 152
/* Initialize an F-worker pool by delegating to tQWorkerInit() on the same object. */
int32_t tFWorkerInit(SFWorkerPool *pool) {
  SQWorkerPool *qpool = (SQWorkerPool *)pool;
  return tQWorkerInit(qpool);
}

/* Tear down an F-worker pool by delegating to tQWorkerCleanup(). */
void tFWorkerCleanup(SFWorkerPool *pool) {
  tQWorkerCleanup(pool);
}

/*
 * Thread body for an F-worker.
 *
 * Each iteration calls taosReadQitemFromQsetByThread() with this worker's id:
 *   - negative result: log and leave the loop;
 *   - zero: nothing to handle, try again;
 *   - positive: run the queue's callback (when non-NULL) on the item, then
 *     call taosResetQsetThread() for that item.
 * Always returns NULL.
 */
static void *tFWorkerThreadFp(SQWorker *worker) {
  SQWorkerPool *pool = worker->pool;

  FItem handler = NULL;
  void *item = NULL;
  void *owner = NULL;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
  uDebug("worker:%s:%d is running", pool->name, worker->id);

  for (;;) {
    int32_t rc = taosReadQitemFromQsetByThread(pool->qset, (void **)&item, &owner, &handler, worker->id);

    if (rc < 0) {
      uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
      break;
    }

    if (rc == 0) {
      continue;
    }

    if (handler != NULL) {
      (*handler)(owner, item);
    }

    taosResetQsetThread(pool->qset, item);
  }

  return NULL;
}

/*
 * Allocate a queue on an F-worker pool: forwards to tWorkerAllocQueue() with
 * the F-worker thread routine (tFWorkerThreadFp), so threads started for this
 * pool read items via taosReadQitemFromQsetByThread() rather than the plain
 * Q-worker loop.
 */
STaosQueue *tFWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
  return tWorkerAllocQueue(pool, ahandle, fp, (ThreadFp)tFWorkerThreadFp);
}

/* Release a queue of an F-worker pool by delegating to tQWorkerFreeQueue(). */
void tFWorkerFreeQueue(SFWorkerPool *pool, STaosQueue *queue) {
  tQWorkerFreeQueue(pool, queue);
}

S
Shengliang Guan 已提交
189
/*
 * Prepare a W-worker pool: reset the round-robin cursor (nextId), allocate
 * pool->max zero-filled worker slots, create the pool mutex and give every
 * slot its index, owning pool and empty qall/qset pointers. Threads and
 * per-worker queue sets are created later, in tWWorkerAllocQueue().
 *
 * Returns 0 on success. On failure returns -1 with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY; the worker array is released if the mutex cannot
 * be created.
 */
int32_t tWWorkerInit(SWWorkerPool *pool) {
  pool->nextId = 0;
  pool->workers = calloc(pool->max, sizeof(SWWorker));
  if (pool->workers == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (pthread_mutex_init(&pool->mutex, NULL) != 0) {
    // Do not leak the slots allocated above.
    free(pool->workers);
    pool->workers = NULL;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  for (int32_t i = 0; i < pool->max; ++i) {
    SWWorker *worker = pool->workers + i;
    worker->id = i;
    worker->qall = NULL;
    worker->qset = NULL;
    worker->pool = pool;
  }

  uInfo("worker:%s is initialized, max:%d", pool->name, pool->max);
  return 0;
}

S
Shengliang Guan 已提交
214
/*
 * Tear down a W-worker pool.
 *
 * Pass 1: for every worker with a valid thread handle and a qset, call
 * taosQsetThreadResume() on that qset. Pass 2: for every worker with a valid
 * thread handle, join the thread, then free its qall and close its qset.
 * Finally the worker array is freed and the mutex destroyed.
 */
void tWWorkerCleanup(SWWorkerPool *pool) {
  int32_t idx;

  for (idx = 0; idx < pool->max; idx++) {
    SWWorker *w = pool->workers + idx;
    if (taosCheckPthreadValid(w->thread) && w->qset) {
      taosQsetThreadResume(w->qset);
    }
  }

  for (idx = 0; idx < pool->max; idx++) {
    SWWorker *w = pool->workers + idx;
    if (!taosCheckPthreadValid(w->thread)) {
      continue;
    }

    // Per-worker resources are released only after the thread has finished.
    pthread_join(w->thread, NULL);
    taosFreeQall(w->qall);
    taosCloseQset(w->qset);
  }

  tfree(pool->workers);
  pthread_mutex_destroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}

S
Shengliang Guan 已提交
239
/*
 * Thread body for a W-worker.
 *
 * Repeatedly drains the worker's own qset into worker->qall with
 * taosReadAllQitemsFromQset() and, when the read yields a non-NULL batch
 * callback, invokes it with the queue's ahandle, the qall and the number of
 * items read. The loop ends as soon as a read returns 0 items.
 * Always returns NULL.
 */
static void *tWWorkerThreadFp(SWWorker *worker) {
  SWWorkerPool *pool = worker->pool;
  FItems        fp = NULL;

  void *  ahandle = NULL;
  int32_t numOfMsgs = 0;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
  uDebug("worker:%s:%d is running", pool->name, worker->id);

  while (1) {
    numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &ahandle, &fp);
    if (numOfMsgs == 0) {
      uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset);
      break;
    }

    if (fp != NULL) {
      (*fp)(ahandle, worker->qall, numOfMsgs);
    }
  }

  return NULL;
}

S
Shengliang Guan 已提交
267
/*
 * Open a new queue and attach it to the W-worker selected by pool->nextId.
 *
 * If that worker has no qset yet, this call also opens its qset, allocates
 * its qall and starts its thread (tWWorkerThreadFp). Otherwise the queue is
 * simply added to the worker's existing qset. On success pool->nextId advances
 * round-robin modulo pool->max. pool->mutex is held for the whole operation.
 *
 * pool     W-worker pool
 * ahandle  opaque handle stored alongside the queue in the qset
 * fp       batch callback installed on the queue
 *
 * Returns the new queue, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY on any
 * failure. On failure the worker's qset/qall pointers are reset to NULL after
 * being released, so a later call starts that worker from scratch instead of
 * touching freed objects.
 */
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
  pthread_mutex_lock(&pool->mutex);
  SWWorker *worker = pool->workers + pool->nextId;

  STaosQueue *queue = taosOpenQueue();
  if (queue == NULL) {
    pthread_mutex_unlock(&pool->mutex);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  taosSetQueueFp(queue, NULL, fp);

  if (worker->qset == NULL) {
    worker->qset = taosOpenQset();
    if (worker->qset == NULL) {
      taosCloseQueue(queue);
      pthread_mutex_unlock(&pool->mutex);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }

    taosAddIntoQset(worker->qset, queue, ahandle);
    worker->qall = taosAllocateQall();
    if (worker->qall == NULL) {
      taosCloseQset(worker->qset);
      worker->qset = NULL;  // keep the slot reusable; do not leave a dangling qset
      taosCloseQueue(queue);
      pthread_mutex_unlock(&pool->mutex);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }

    pthread_attr_t thAttr;
    pthread_attr_init(&thAttr);
    pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

    // pthread_create reports failure through its return value, not errno.
    int32_t ret = pthread_create(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker);
    pthread_attr_destroy(&thAttr);

    if (ret != 0) {
      uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(ret));
      taosFreeQall(worker->qall);
      worker->qall = NULL;
      taosCloseQset(worker->qset);
      worker->qset = NULL;
      taosCloseQueue(queue);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      queue = NULL;
    } else {
      uDebug("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
      pool->nextId = (pool->nextId + 1) % pool->max;
    }
  } else {
    taosAddIntoQset(worker->qset, queue, ahandle);
    pool->nextId = (pool->nextId + 1) % pool->max;
  }

  pthread_mutex_unlock(&pool->mutex);
  uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}

S
Shengliang Guan 已提交
325
/*
 * Close a queue obtained from a W-worker pool and log the event.
 * The pool is only consulted for its name in the log line.
 */
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
  const char *poolName = pool->name;

  taosCloseQueue(queue);
  uDebug("worker:%s, queue:%p is freed", poolName, queue);
}