tworker.c 8.8 KB
Newer Older
S
TD-2393  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "tworker.h"
S
Shengliang Guan 已提交
18
#include "taoserror.h"
S
Shengliang Guan 已提交
19
#include "ulog.h"
S
TD-2393  
Shengliang Guan 已提交
20

S
Shengliang Guan 已提交
21
// Entry-point type for worker threads; values of this type are handed to pthread_create() as the start routine.
typedef void *(*ThreadFp)(void *param);
22

S
Shengliang Guan 已提交
23
/*
 * Initialize a queue-worker pool.
 *
 * Opens the pool's shared qset, allocates pool->max zero-filled worker slots,
 * initializes the pool mutex and stamps every slot with its index and owning
 * pool. No thread is created here.
 *
 * pool: pool to set up; pool->max, pool->min and pool->name are read.
 *
 * Returns 0 on success. Returns -1 with terrno set on failure; in that case
 * everything acquired by this call has been released again.
 */
int32_t tQWorkerInit(SQWorkerPool *pool) {
  pool->qset = taosOpenQset();
  if (pool->qset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pool->workers = calloc(pool->max, sizeof(SQWorker));
  if (pool->workers == NULL) {
    // Do not leak the qset opened above.
    taosCloseQset(pool->qset);
    pool->qset = NULL;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (pthread_mutex_init(&pool->mutex, NULL) != 0) {
    // Undo both earlier acquisitions so a failed init leaves nothing behind.
    tfree(pool->workers);
    pool->workers = NULL;
    taosCloseQset(pool->qset);
    pool->qset = NULL;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  for (int32_t i = 0; i < pool->max; ++i) {
    SQWorker *worker = pool->workers + i;
    worker->id = i;
    worker->pool = pool;
  }

  uDebug("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
  return 0;
}

S
Shengliang Guan 已提交
46 47 48
/*
 * Shut down a queue-worker pool.
 *
 * Issues one qset resume call per slot that holds a valid thread, joins those
 * threads, then releases the worker array, the qset and the mutex.
 *
 * pool: pool to tear down.
 */
void tQWorkerCleanup(SQWorkerPool *pool) {
  // With no worker array there are no slots to inspect; offsetting from a
  // NULL base would produce invalid pointers for every index above zero.
  if (pool->workers != NULL) {
    // Pass 1: signal the qset once for every live thread.
    // NOTE(review): presumably this wakes a worker blocked in the qset read so
    // that the read returns 0 and the thread exits — confirm against tqueue.
    for (int32_t i = 0; i < pool->max; ++i) {
      SQWorker *worker = pool->workers + i;
      if (taosCheckPthreadValid(worker->thread)) {
        taosQsetThreadResume(pool->qset);
      }
    }

    // Pass 2: wait for every live thread to finish.
    for (int32_t i = 0; i < pool->max; ++i) {
      SQWorker *worker = pool->workers + i;
      if (taosCheckPthreadValid(worker->thread)) {
        pthread_join(worker->thread, NULL);
      }
    }
  }

  tfree(pool->workers);
  taosCloseQset(pool->qset);
  pthread_mutex_destroy(&pool->mutex);

  uDebug("worker:%s is closed", pool->name);
}

S
Shengliang Guan 已提交
70 71
static void *tQWorkerThreadFp(SQWorker *worker) {
  SQWorkerPool *pool = worker->pool;
S
Shengliang Guan 已提交
72
  FItem         fp = NULL;
73

S
Shengliang Guan 已提交
74 75
  void *  msg = NULL;
  void *  ahandle = NULL;
76 77 78 79
  int32_t code = 0;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
S
Shengliang Guan 已提交
80
  uDebug("worker:%s:%d is running", pool->name, worker->id);
81 82

  while (1) {
S
Shengliang Guan 已提交
83
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ahandle, &fp) == 0) {
S
Shengliang Guan 已提交
84
      uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
85 86 87
      break;
    }

S
Shengliang Guan 已提交
88
    if (fp != NULL) {
S
Shengliang Guan 已提交
89
      (*fp)(ahandle, msg);
S
Shengliang Guan 已提交
90
    }
91 92 93 94 95
  }

  return NULL;
}

S
Shengliang Guan 已提交
96
/*
 * Create a queue, add it to the pool's qset and make sure worker threads
 * exist to serve it.
 *
 * While the pool holds fewer than pool->max threads, at least one new thread
 * is started per call, and further threads are started until pool->num
 * reaches pool->min. The whole operation runs under the pool mutex.
 *
 * pool:     pool owning the qset and the worker slots.
 * ahandle:  opaque handle stored with the queue when it is added to the qset.
 * fp:       per-item callback registered on the queue.
 * threadFp: entry point used for every thread created by this call.
 *
 * Returns the new queue, or NULL with terrno set when the queue cannot be
 * opened or a thread cannot be created.
 */
STaosQueue *tWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp, ThreadFp threadFp) {
  pthread_mutex_lock(&pool->mutex);
  STaosQueue *queue = taosOpenQueue();
  if (queue == NULL) {
    pthread_mutex_unlock(&pool->mutex);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  taosSetQueueFp(queue, fp, NULL);
  taosAddIntoQset(pool->qset, queue, ahandle);

  // spawn a thread to process queue
  if (pool->num < pool->max) {
    do {
      SQWorker *worker = pool->workers + pool->num;

      pthread_attr_t thAttr;
      pthread_attr_init(&thAttr);
      pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

      // pthread_create() reports failure through its return value and does
      // not set errno, so keep the returned code for the error message.
      int32_t ret = pthread_create(&worker->thread, &thAttr, threadFp, worker);

      // The attribute object is no longer needed once the call has returned,
      // whether or not the thread was created.
      pthread_attr_destroy(&thAttr);

      if (ret != 0) {
        uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(ret));
        taosCloseQueue(queue);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        queue = NULL;
        break;
      }

      pool->num++;
      uDebug("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
    } while (pool->num < pool->min);
  }

  pthread_mutex_unlock(&pool->mutex);
  uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}

S
Shengliang Guan 已提交
137 138 139 140
// Allocate a queue on the pool via tWorkerAllocQueue(); any worker thread created for it runs tQWorkerThreadFp.
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
  return tWorkerAllocQueue(pool, ahandle, fp, (ThreadFp)tQWorkerThreadFp);
}

S
Shengliang Guan 已提交
141
/*
 * Close a queue and record the fact in the debug log.
 *
 * pool:  only read for its name, which labels the log line.
 * queue: queue to close; its address is logged after the close call.
 */
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
  taosCloseQueue(queue);

  uDebug("worker:%s, queue:%p is freed", pool->name, queue);
}

S
Shengliang Guan 已提交
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
// Initialize an F-worker pool by delegating to tQWorkerInit(); the pool pointer is cast to SQWorkerPool * and the result is returned unchanged.
int32_t tFWorkerInit(SFWorkerPool *pool) { return tQWorkerInit((SQWorkerPool *)pool); }

// Shut down an F-worker pool; all work is delegated to tQWorkerCleanup().
// NOTE(review): the pool is passed without a cast, unlike tFWorkerInit() — presumably SFWorkerPool is an alias of SQWorkerPool; confirm in tworker.h.
void tFWorkerCleanup(SFWorkerPool *pool) { tQWorkerCleanup(pool); }

static void *tFWorkerThreadFp(SQWorker *worker) {
  SQWorkerPool *pool = worker->pool;
  FItem         fp = NULL;

  void *  msg = NULL;
  void *  ahandle = NULL;
  int32_t code = 0;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
  uDebug("worker:%s:%d is running", pool->name, worker->id);

  while (1) {
    if (taosReadQitemFromQsetByThread(pool->qset, (void **)&msg, &ahandle, &fp, worker->id) == 0) {
      uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
      break;
    }

    if (fp != NULL) {
      (*fp)(ahandle, msg);
    }

    taosResetQsetThread(pool->qset, msg);
  }

  return NULL;
}

// Allocate a queue on the pool via tWorkerAllocQueue(); any worker thread created for it runs tFWorkerThreadFp.
STaosQueue *tFWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
  return tWorkerAllocQueue(pool, ahandle, fp, (ThreadFp)tFWorkerThreadFp);
}

// Close a queue belonging to an F-worker pool; delegates to tQWorkerFreeQueue().
void tFWorkerFreeQueue(SFWorkerPool *pool, STaosQueue *queue) { tQWorkerFreeQueue(pool, queue); }

S
Shengliang Guan 已提交
184
/*
 * Initialize a write-worker pool.
 *
 * Resets the round-robin cursor, allocates pool->max zero-filled worker slots,
 * initializes the pool mutex and gives every slot its index, its owning pool
 * and empty qall/qset pointers. No thread is created here.
 *
 * pool: pool to set up; pool->max and pool->name are read.
 *
 * Returns 0 on success. Returns -1 with terrno set on failure; in that case
 * the worker array has been released again.
 */
int32_t tWWorkerInit(SWWorkerPool *pool) {
  pool->nextId = 0;
  pool->workers = calloc(pool->max, sizeof(SWWorker));
  if (pool->workers == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (pthread_mutex_init(&pool->mutex, NULL) != 0) {
    // Do not leak the worker array allocated above.
    tfree(pool->workers);
    pool->workers = NULL;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  for (int32_t i = 0; i < pool->max; ++i) {
    SWWorker *worker = pool->workers + i;
    worker->id = i;
    worker->qall = NULL;
    worker->qset = NULL;
    worker->pool = pool;
  }

  uInfo("worker:%s is initialized, max:%d", pool->name, pool->max);
  return 0;
}

S
Shengliang Guan 已提交
209
/*
 * Shut down a write-worker pool.
 *
 * Every slot holding a valid thread first gets a resume call on its own qset
 * (when it has one). Each such thread is then joined and its qall and qset
 * are released. Finally the worker array and the pool mutex are freed.
 *
 * pool: pool to tear down.
 */
void tWWorkerCleanup(SWWorkerPool *pool) {
  // Pass 1: signal the qset of each live worker.
  for (int32_t idx = 0; idx < pool->max; ++idx) {
    SWWorker *slot = &pool->workers[idx];
    if (taosCheckPthreadValid(slot->thread) && slot->qset) {
      taosQsetThreadResume(slot->qset);
    }
  }

  // Pass 2: wait for each live worker, then drop its per-worker resources.
  for (int32_t idx = 0; idx < pool->max; ++idx) {
    SWWorker *slot = &pool->workers[idx];
    if (taosCheckPthreadValid(slot->thread)) {
      pthread_join(slot->thread, NULL);
      taosFreeQall(slot->qall);
      taosCloseQset(slot->qset);
    }
  }

  tfree(pool->workers);
  pthread_mutex_destroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}

S
Shengliang Guan 已提交
234 235
static void *tWriteWorkerThreadFp(SWWorker *worker) {
  SWWorkerPool *pool = worker->pool;
S
Shengliang Guan 已提交
236
  FItems        fp = NULL;
237

S
Shengliang Guan 已提交
238 239
  void *  msg = NULL;
  void *  ahandle = NULL;
240 241 242 243 244
  int32_t numOfMsgs = 0;
  int32_t qtype = 0;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
S
Shengliang Guan 已提交
245
  uDebug("worker:%s:%d is running", pool->name, worker->id);
246 247

  while (1) {
S
Shengliang Guan 已提交
248
    numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &ahandle, &fp);
249
    if (numOfMsgs == 0) {
S
Shengliang Guan 已提交
250
      uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset);
251 252 253
      break;
    }

S
Shengliang Guan 已提交
254
    if (fp != NULL) {
S
Shengliang Guan 已提交
255
      (*fp)(ahandle, worker->qall, numOfMsgs);
256 257 258 259 260 261
    }
  }

  return NULL;
}

S
Shengliang Guan 已提交
262
/*
 * Create a queue and attach it to the write worker selected by pool->nextId.
 *
 * If that worker has no qset yet, a qset and a qall are created for it and
 * its thread is started. Otherwise the queue is simply added to the worker's
 * existing qset. On success the cursor pool->nextId advances round-robin over
 * pool->max slots. The whole operation runs under the pool mutex.
 *
 * pool:    write-worker pool.
 * ahandle: opaque handle stored with the queue when it is added to the qset.
 * fp:      batch callback registered on the queue.
 *
 * Returns the new queue, or NULL with terrno set on failure. After a failure
 * the selected worker holds no qset/qall, so a later call starts from scratch
 * on that slot instead of reusing released objects.
 */
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
  pthread_mutex_lock(&pool->mutex);
  SWWorker *worker = pool->workers + pool->nextId;

  STaosQueue *queue = taosOpenQueue();
  if (queue == NULL) {
    pthread_mutex_unlock(&pool->mutex);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  taosSetQueueFp(queue, NULL, fp);

  if (worker->qset == NULL) {
    worker->qset = taosOpenQset();
    if (worker->qset == NULL) {
      taosCloseQueue(queue);
      pthread_mutex_unlock(&pool->mutex);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }

    taosAddIntoQset(worker->qset, queue, ahandle);
    worker->qall = taosAllocateQall();
    if (worker->qall == NULL) {
      taosCloseQset(worker->qset);
      // Clear the slot so the next call does not add a queue to a closed qset.
      worker->qset = NULL;
      taosCloseQueue(queue);
      pthread_mutex_unlock(&pool->mutex);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    pthread_attr_t thAttr;
    pthread_attr_init(&thAttr);
    pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

    // pthread_create() reports failure through its return value and does not
    // set errno, so keep the returned code for the error message.
    int32_t ret = pthread_create(&worker->thread, &thAttr, (ThreadFp)tWriteWorkerThreadFp, worker);
    if (ret != 0) {
      uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(ret));
      taosFreeQall(worker->qall);
      taosCloseQset(worker->qset);
      taosCloseQueue(queue);
      // Clear the slot so neither a later allocation nor cleanup touches the
      // objects released just above.
      worker->qall = NULL;
      worker->qset = NULL;
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      queue = NULL;
    } else {
      uDebug("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
      pool->nextId = (pool->nextId + 1) % pool->max;
    }

    pthread_attr_destroy(&thAttr);
  } else {
    taosAddIntoQset(worker->qset, queue, ahandle);
    pool->nextId = (pool->nextId + 1) % pool->max;
  }

  pthread_mutex_unlock(&pool->mutex);
  uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}

S
Shengliang Guan 已提交
320
/*
 * Close a queue of a write-worker pool and record the fact in the debug log.
 *
 * pool:  only read for its name, which labels the log line.
 * queue: queue to close; its address is logged after the close call.
 */
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
  taosCloseQueue(queue);

  uDebug("worker:%s, queue:%p is freed", pool->name, queue);
}