tworker.c 7.7 KB
Newer Older
S
TD-2393  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "tworker.h"
S
Shengliang Guan 已提交
18
#include "taoserror.h"
S
log  
Shengliang Guan 已提交
19
#include "tlog.h"
S
TD-2393  
Shengliang Guan 已提交
20

S
Shengliang Guan 已提交
21
typedef void *(*ThreadFp)(void *param);
22

S
Shengliang Guan 已提交
23
int32_t tQWorkerInit(SQWorkerPool *pool) {
24
  pool->qset = taosOpenQset();
S
Shengliang Guan 已提交
25
  pool->workers = calloc(sizeof(SQWorker), pool->max);
S
Shengliang Guan 已提交
26 27 28 29 30
  if (pool->workers == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

wafwerar's avatar
wafwerar 已提交
31
  if (taosThreadMutexInit(&pool->mutex, NULL)) {
S
Shengliang Guan 已提交
32
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
33 34 35 36 37
    return -1;
  }

  for (int32_t i = 0; i < pool->max; ++i) {
    SQWorker *worker = pool->workers + i;
38 39
    worker->id = i;
    worker->pool = pool;
S
TD-2393  
Shengliang Guan 已提交
40 41
  }

S
Shengliang Guan 已提交
42
  uDebug("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
S
TD-2393  
Shengliang Guan 已提交
43 44 45
  return 0;
}

S
Shengliang Guan 已提交
46 47 48
void tQWorkerCleanup(SQWorkerPool *pool) {
  for (int32_t i = 0; i < pool->max; ++i) {
    SQWorker *worker = pool->workers + i;
S
Shengliang Guan 已提交
49
    if (worker == NULL) continue;
50
    if (taosCheckPthreadValid(worker->thread)) {
51
      taosQsetThreadResume(pool->qset);
S
TD-2393  
Shengliang Guan 已提交
52 53 54
    }
  }

S
Shengliang Guan 已提交
55 56
  for (int32_t i = 0; i < pool->max; ++i) {
    SQWorker *worker = pool->workers + i;
S
Shengliang Guan 已提交
57
    if (worker == NULL) continue;
58
    if (taosCheckPthreadValid(worker->thread)) {
wafwerar's avatar
wafwerar 已提交
59
      taosThreadJoin(worker->thread, NULL);
S
TD-2393  
Shengliang Guan 已提交
60 61 62
    }
  }

63
  tfree(pool->workers);
64
  taosCloseQset(pool->qset);
wafwerar's avatar
wafwerar 已提交
65
  taosThreadMutexDestroy(&pool->mutex);
S
TD-2393  
Shengliang Guan 已提交
66

S
Shengliang Guan 已提交
67
  uDebug("worker:%s is closed", pool->name);
S
TD-2393  
Shengliang Guan 已提交
68 69
}

S
Shengliang Guan 已提交
70 71
static void *tQWorkerThreadFp(SQWorker *worker) {
  SQWorkerPool *pool = worker->pool;
S
Shengliang Guan 已提交
72
  FItem         fp = NULL;
73

S
Shengliang Guan 已提交
74 75
  void   *msg = NULL;
  void   *ahandle = NULL;
76 77 78 79
  int32_t code = 0;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
S
Shengliang Guan 已提交
80
  uDebug("worker:%s:%d is running", pool->name, worker->id);
81 82

  while (1) {
S
Shengliang Guan 已提交
83
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ahandle, &fp) == 0) {
S
Shengliang Guan 已提交
84
      uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
85 86 87
      break;
    }

S
Shengliang Guan 已提交
88
    if (fp != NULL) {
S
Shengliang Guan 已提交
89
      (*fp)(ahandle, msg);
S
Shengliang Guan 已提交
90
    }
91 92 93 94 95
  }

  return NULL;
}

S
Shengliang Guan 已提交
96
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
wafwerar's avatar
wafwerar 已提交
97
  taosThreadMutexLock(&pool->mutex);
98
  STaosQueue *queue = taosOpenQueue();
99
  if (queue == NULL) {
wafwerar's avatar
wafwerar 已提交
100
    taosThreadMutexUnlock(&pool->mutex);
S
Shengliang Guan 已提交
101
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
TD-2393  
Shengliang Guan 已提交
102 103 104
    return NULL;
  }

S
Shengliang Guan 已提交
105
  taosSetQueueFp(queue, fp, NULL);
106
  taosAddIntoQset(pool->qset, queue, ahandle);
S
TD-2393  
Shengliang Guan 已提交
107 108

  // spawn a thread to process queue
109
  if (pool->num < pool->max) {
S
TD-2393  
Shengliang Guan 已提交
110
    do {
S
Shengliang Guan 已提交
111
      SQWorker *worker = pool->workers + pool->num;
S
TD-2393  
Shengliang Guan 已提交
112

wafwerar's avatar
wafwerar 已提交
113 114 115
      TdThreadAttr thAttr;
      taosThreadAttrInit(&thAttr);
      taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
S
TD-2393  
Shengliang Guan 已提交
116

S
Shengliang Guan 已提交
117
      if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker) != 0) {
S
Shengliang Guan 已提交
118
        uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno));
S
Shengliang Guan 已提交
119
        taosCloseQueue(queue);
S
Shengliang Guan 已提交
120
        terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
121 122
        queue = NULL;
        break;
S
TD-2393  
Shengliang Guan 已提交
123 124
      }

wafwerar's avatar
wafwerar 已提交
125
      taosThreadAttrDestroy(&thAttr);
126
      pool->num++;
S
Shengliang Guan 已提交
127
      uDebug("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
128
    } while (pool->num < pool->min);
S
TD-2393  
Shengliang Guan 已提交
129 130
  }

wafwerar's avatar
wafwerar 已提交
131
  taosThreadMutexUnlock(&pool->mutex);
S
Shengliang Guan 已提交
132
  uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
133 134 135 136

  return queue;
}

S
Shengliang Guan 已提交
137
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
138
  taosCloseQueue(queue);
S
Shengliang Guan 已提交
139
  uDebug("worker:%s, queue:%p is freed", pool->name, queue);
140 141
}

S
Shengliang Guan 已提交
142
int32_t tWWorkerInit(SWWorkerPool *pool) {
143
  pool->nextId = 0;
L
Liu Jicong 已提交
144
  pool->workers = calloc(pool->max, sizeof(SWWorker));
S
Shengliang Guan 已提交
145 146 147 148 149
  if (pool->workers == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

wafwerar's avatar
wafwerar 已提交
150
  if (taosThreadMutexInit(&pool->mutex, NULL) != 0) {
S
Shengliang Guan 已提交
151 152 153
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
154 155

  for (int32_t i = 0; i < pool->max; ++i) {
S
Shengliang Guan 已提交
156
    SWWorker *worker = pool->workers + i;
157 158 159 160 161 162
    worker->id = i;
    worker->qall = NULL;
    worker->qset = NULL;
    worker->pool = pool;
  }

S
Shengliang Guan 已提交
163
  uInfo("worker:%s is initialized, max:%d", pool->name, pool->max);
164 165 166
  return 0;
}

S
Shengliang Guan 已提交
167
void tWWorkerCleanup(SWWorkerPool *pool) {
168
  for (int32_t i = 0; i < pool->max; ++i) {
S
Shengliang Guan 已提交
169
    SWWorker *worker = pool->workers + i;
170
    if (taosCheckPthreadValid(worker->thread)) {
S
Shengliang Guan 已提交
171 172 173
      if (worker->qset) {
        taosQsetThreadResume(worker->qset);
      }
174 175 176 177
    }
  }

  for (int32_t i = 0; i < pool->max; ++i) {
S
Shengliang Guan 已提交
178
    SWWorker *worker = pool->workers + i;
179
    if (taosCheckPthreadValid(worker->thread)) {
wafwerar's avatar
wafwerar 已提交
180
      taosThreadJoin(worker->thread, NULL);
181 182 183 184 185
      taosFreeQall(worker->qall);
      taosCloseQset(worker->qset);
    }
  }

186
  tfree(pool->workers);
wafwerar's avatar
wafwerar 已提交
187
  taosThreadMutexDestroy(&pool->mutex);
188

S
Shengliang Guan 已提交
189
  uInfo("worker:%s is closed", pool->name);
190 191
}

S
Shengliang Guan 已提交
192
static void *tWWorkerThreadFp(SWWorker *worker) {
S
Shengliang Guan 已提交
193
  SWWorkerPool *pool = worker->pool;
S
Shengliang Guan 已提交
194
  FItems        fp = NULL;
195

S
Shengliang Guan 已提交
196 197
  void   *msg = NULL;
  void   *ahandle = NULL;
198 199 200 201 202
  int32_t numOfMsgs = 0;
  int32_t qtype = 0;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
S
Shengliang Guan 已提交
203
  uDebug("worker:%s:%d is running", pool->name, worker->id);
204 205

  while (1) {
S
Shengliang Guan 已提交
206
    numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &ahandle, &fp);
207
    if (numOfMsgs == 0) {
S
Shengliang Guan 已提交
208
      uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset);
209 210 211
      break;
    }

S
Shengliang Guan 已提交
212
    if (fp != NULL) {
S
Shengliang Guan 已提交
213
      (*fp)(ahandle, worker->qall, numOfMsgs);
214 215 216 217 218 219
    }
  }

  return NULL;
}

S
Shengliang Guan 已提交
220
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
wafwerar's avatar
wafwerar 已提交
221
  taosThreadMutexLock(&pool->mutex);
S
Shengliang Guan 已提交
222
  SWWorker *worker = pool->workers + pool->nextId;
223

224
  STaosQueue *queue = taosOpenQueue();
225
  if (queue == NULL) {
wafwerar's avatar
wafwerar 已提交
226
    taosThreadMutexUnlock(&pool->mutex);
S
Shengliang Guan 已提交
227
    terrno = TSDB_CODE_OUT_OF_MEMORY;
228 229 230
    return NULL;
  }

S
Shengliang Guan 已提交
231 232
  taosSetQueueFp(queue, NULL, fp);

233 234 235 236
  if (worker->qset == NULL) {
    worker->qset = taosOpenQset();
    if (worker->qset == NULL) {
      taosCloseQueue(queue);
wafwerar's avatar
wafwerar 已提交
237
      taosThreadMutexUnlock(&pool->mutex);
238 239 240 241 242 243 244 245
      return NULL;
    }

    taosAddIntoQset(worker->qset, queue, ahandle);
    worker->qall = taosAllocateQall();
    if (worker->qall == NULL) {
      taosCloseQset(worker->qset);
      taosCloseQueue(queue);
wafwerar's avatar
wafwerar 已提交
246
      taosThreadMutexUnlock(&pool->mutex);
S
Shengliang Guan 已提交
247
      terrno = TSDB_CODE_OUT_OF_MEMORY;
248 249
      return NULL;
    }
wafwerar's avatar
wafwerar 已提交
250 251 252
    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
253

wafwerar's avatar
wafwerar 已提交
254
    if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker) != 0) {
S
Shengliang Guan 已提交
255
      uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno));
256 257 258
      taosFreeQall(worker->qall);
      taosCloseQset(worker->qset);
      taosCloseQueue(queue);
S
Shengliang Guan 已提交
259
      terrno = TSDB_CODE_OUT_OF_MEMORY;
260 261
      queue = NULL;
    } else {
S
Shengliang Guan 已提交
262
      uDebug("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
263 264 265
      pool->nextId = (pool->nextId + 1) % pool->max;
    }

wafwerar's avatar
wafwerar 已提交
266
    taosThreadAttrDestroy(&thAttr);
267 268 269 270 271
  } else {
    taosAddIntoQset(worker->qset, queue, ahandle);
    pool->nextId = (pool->nextId + 1) % pool->max;
  }

wafwerar's avatar
wafwerar 已提交
272
  taosThreadMutexUnlock(&pool->mutex);
S
Shengliang Guan 已提交
273
  uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
S
TD-2393  
Shengliang Guan 已提交
274

275
  return queue;
S
TD-2393  
Shengliang Guan 已提交
276 277
}

S
Shengliang Guan 已提交
278
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
279
  taosCloseQueue(queue);
S
Shengliang Guan 已提交
280
  uDebug("worker:%s, queue:%p is freed", pool->name, queue);
S
TD-2393  
Shengliang Guan 已提交
281
}