tworker.c 7.1 KB
Newer Older
S
TD-2393  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "os.h"
S
Shengliang Guan 已提交
18
#include "ulog.h"
S
TD-2393  
Shengliang Guan 已提交
19 20 21
#include "tqueue.h"
#include "tworker.h"

22
// Signature of a thread start routine as handed to pthread_create() in this file.
typedef void* (*ThreadFp)(void *param);
23 24 25 26 27 28

/*
 * Prepare a worker pool: open its queue set, allocate pool->max worker slots
 * and initialize the pool mutex. No thread is started here; threads are
 * created by tWorkerAllocQueue().
 *
 * pool: caller-owned pool; pool->max, pool->min and pool->name are read.
 *
 * Returns 0 on success, -1 if the queue set or the worker array cannot be
 * allocated (same convention as tMWorkerInit). On failure nothing allocated
 * here is left behind and the mutex has not been initialized.
 */
int32_t tWorkerInit(SWorkerPool *pool) {
  pool->qset = taosOpenQset();
  if (pool->qset == NULL) return -1;

  // calloc zero-fills the slots, including each worker's thread handle
  pool->workers = calloc(pool->max, sizeof(SWorker));
  if (pool->workers == NULL) {
    taosCloseQset(pool->qset);
    pool->qset = NULL;
    return -1;
  }

  pthread_mutex_init(&pool->mutex, NULL);
  for (int i = 0; i < pool->max; ++i) {
    SWorker *worker = pool->workers + i;
    worker->id = i;
    worker->pool = pool;
  }

  uInfo("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
  return 0;
}

38 39
/*
 * Shut down a worker pool: signal the queue set once for every started
 * thread, join those threads, then release the worker array, the queue set
 * and the mutex.
 *
 * pool: pool previously set up by tWorkerInit().
 */
void tWorkerCleanup(SWorkerPool *pool) {
  // A NULL array has no slots to inspect; the element address alone can never
  // be used to detect this (workers + i is non-NULL for every i > 0).
  if (pool->workers != NULL) {
    // First pass: one resume call per started thread.
    // NOTE(review): presumably each call lets one thread blocked on the qset return - confirm.
    for (int i = 0; i < pool->max; ++i) {
      SWorker *worker = pool->workers + i;
      if (taosCheckPthreadValid(worker->thread)) {
        taosQsetThreadResume(pool->qset);
      }
    }

    // Second pass: wait for every started thread to finish.
    for (int i = 0; i < pool->max; ++i) {
      SWorker *worker = pool->workers + i;
      if (taosCheckPthreadValid(worker->thread)) {
        pthread_join(worker->thread, NULL);
      }
    }
  }

  tfree(pool->workers);
  taosCloseQset(pool->qset);
  pthread_mutex_destroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}

62
static void *tWorkerThreadFp(SWorker *worker) {
S
Shengliang Guan 已提交
63 64
  SWorkerPool *pool = worker->pool;
  FProcessItem fp = NULL;
65

S
Shengliang Guan 已提交
66 67
  void   *msg = NULL;
  void   *ahandle = NULL;
68 69 70 71 72 73 74
  int32_t code = 0;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
  uDebug("worker:%s:%d is running", pool->name, worker->id);

  while (1) {
S
Shengliang Guan 已提交
75
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ahandle, &fp) == 0) {
76 77 78 79
      uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
      break;
    }

S
Shengliang Guan 已提交
80
    if (fp) {
S
Shengliang Guan 已提交
81
      (*fp)(ahandle, msg);
S
Shengliang Guan 已提交
82
    }
83 84 85 86 87
  }

  return NULL;
}

S
Shengliang Guan 已提交
88
/*
 * Create a queue, attach it to the pool's queue set, and start worker threads
 * while the pool is below its limits.
 *
 * pool:    pool set up by tWorkerInit(); guarded by pool->mutex here.
 * ahandle: opaque value registered with the queue in the queue set; the worker
 *          thread passes the ahandle it reads back to the handler.
 * fp:      per-item handler stored on the queue.
 *
 * Returns the new queue, or NULL when the queue cannot be opened. A failure to
 * start a thread is logged but does not fail the call: the queue is still
 * returned, the slot is not counted, and a later call will retry it.
 */
taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp) {
  pthread_mutex_lock(&pool->mutex);
  taos_queue queue = taosOpenQueue();
  if (queue == NULL) {
    pthread_mutex_unlock(&pool->mutex);
    return NULL;
  }

  taosSetQueueFp(queue, fp, NULL);
  taosAddIntoQset(pool->qset, queue, ahandle);

  // spawn a thread to process queue
  if (pool->num < pool->max) {
    do {
      SWorker *worker = pool->workers + pool->num;

      pthread_attr_t thAttr;
      pthread_attr_init(&thAttr);
      pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

      // pthread_create reports failure through its return value, not errno
      int ret = pthread_create(&worker->thread, &thAttr, (ThreadFp)tWorkerThreadFp, worker);
      pthread_attr_destroy(&thAttr);

      if (ret != 0) {
        uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(ret));
        // The handle is undefined after a failed create; put back the all-zero
        // bytes the slot had after calloc() in tWorkerInit.
        // NOTE(review): assumes taosCheckPthreadValid() treats all-zero as "not started" - confirm.
        memset(&worker->thread, 0, sizeof(worker->thread));
        break;  // do not count or announce a thread that does not exist
      }

      pool->num++;
      uDebug("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
      // keep launching up to min, but never index past the max-sized array
    } while (pool->num < pool->min && pool->num < pool->max);
  }

  pthread_mutex_unlock(&pool->mutex);
  uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}

/*
 * Release a queue obtained from tWorkerAllocQueue().
 *
 * pool:  owning pool; only pool->name is read, for the log line.
 * queue: queue handed to taosCloseQueue(). It must not be used afterwards;
 *        the debug line below prints only its address.
 */
void tWorkerFreeQueue(SWorkerPool *pool, void *queue) {
  taosCloseQueue(queue);
  uDebug("worker:%s, queue:%p is freed", pool->name, queue);
}

S
Shengliang Guan 已提交
129
/*
 * Prepare a multi-queue-set worker pool: reset the round-robin cursor,
 * allocate pool->max worker slots, initialize the mutex and give every slot
 * its id and back-pointer. Each slot starts without a queue set or item batch;
 * those are created by tMWorkerAllocQueue().
 *
 * Returns 0 on success, -1 when the worker array cannot be allocated.
 */
int32_t tMWorkerInit(SMWorkerPool *pool) {
  pool->nextId = 0;

  pool->workers = calloc(pool->max, sizeof(SMWorker));
  if (pool->workers == NULL) {
    return -1;
  }

  pthread_mutex_init(&pool->mutex, NULL);

  int32_t idx = 0;
  while (idx < pool->max) {
    SMWorker *slot = &pool->workers[idx];
    slot->id = idx;
    slot->qall = NULL;
    slot->qset = NULL;
    slot->pool = pool;
    ++idx;
  }

  uInfo("worker:%s is initialized, max:%d", pool->name, pool->max);
  return 0;
}

S
Shengliang Guan 已提交
147
/*
 * Shut down a multi-queue-set worker pool: signal each started worker's queue
 * set, join the threads, free each worker's item batch and queue set, then
 * release the worker array and the mutex.
 *
 * Does nothing when pool->workers is NULL. tMWorkerInit() leaves it NULL (and
 * the mutex uninitialized) when it fails, so there is nothing to release.
 * NOTE(review): tfree() presumably resets the pointer to NULL, which would
 * also make a repeated cleanup a no-op - confirm against the macro.
 */
void tMWorkerCleanup(SMWorkerPool *pool) {
  if (pool->workers == NULL) return;

  // First pass: signal every started worker's own queue set.
  // NOTE(review): presumably this lets the blocked worker thread return - confirm.
  for (int32_t i = 0; i < pool->max; ++i) {
    SMWorker *worker = pool->workers + i;
    if (taosCheckPthreadValid(worker->thread)) {
      if (worker->qset) taosQsetThreadResume(worker->qset);
    }
  }

  // Second pass: wait for each thread, then drop the resources it was using.
  for (int32_t i = 0; i < pool->max; ++i) {
    SMWorker *worker = pool->workers + i;
    if (taosCheckPthreadValid(worker->thread)) {
      pthread_join(worker->thread, NULL);
      taosFreeQall(worker->qall);
      taosCloseQset(worker->qset);
      worker->qall = NULL;
      worker->qset = NULL;
    }
  }

  tfree(pool->workers);
  pthread_mutex_destroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}

S
Shengliang Guan 已提交
170
static void *tWriteWorkerThreadFp(SMWorker *worker) {
S
Shengliang Guan 已提交
171 172
  SMWorkerPool *pool = worker->pool;
  FProcessItems fp = NULL;
173

S
Shengliang Guan 已提交
174 175
  void   *msg = NULL;
  void   *ahandle = NULL;
176 177 178 179 180 181 182 183
  int32_t numOfMsgs = 0;
  int32_t qtype = 0;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
  uDebug("worker:%s:%d is running", pool->name, worker->id);

  while (1) {
S
Shengliang Guan 已提交
184
    numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &ahandle, &fp);
185 186 187 188 189
    if (numOfMsgs == 0) {
      uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset);
      break;
    }

S
Shengliang Guan 已提交
190
    if (fp) {
S
Shengliang Guan 已提交
191
      (*fp)(ahandle, worker->qall, numOfMsgs);
192 193 194 195 196 197
    }
  }

  return NULL;
}

S
Shengliang Guan 已提交
198
/*
 * Create a queue and attach it to the queue set of the worker selected by
 * pool->nextId. The first queue assigned to a worker also creates that
 * worker's queue set, its item batch and its thread. After a successful
 * assignment the cursor advances round-robin over pool->max workers.
 *
 * pool:    pool set up by tMWorkerInit(); guarded by pool->mutex here.
 * ahandle: opaque value registered with the queue in the queue set; the worker
 *          thread passes the ahandle it reads back to the handler.
 * fp:      batch handler stored on the queue.
 *
 * Returns the new queue, or NULL on any allocation or thread-creation failure.
 * On failure the worker slot is reset to its unused state (qset and qall NULL)
 * so a later call can set it up again instead of touching freed objects.
 */
taos_queue tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp) {
  pthread_mutex_lock(&pool->mutex);
  SMWorker *worker = pool->workers + pool->nextId;

  taos_queue queue = taosOpenQueue();
  if (queue == NULL) {
    pthread_mutex_unlock(&pool->mutex);
    return NULL;
  }

  taosSetQueueFp(queue, NULL, fp);

  if (worker->qset == NULL) {
    // first queue for this worker: build its qset, qall and thread
    worker->qset = taosOpenQset();
    if (worker->qset == NULL) {
      taosCloseQueue(queue);
      pthread_mutex_unlock(&pool->mutex);
      return NULL;
    }

    taosAddIntoQset(worker->qset, queue, ahandle);
    worker->qall = taosAllocateQall();
    if (worker->qall == NULL) {
      taosCloseQset(worker->qset);
      worker->qset = NULL;  // do not leave a dangling qset for the next caller
      taosCloseQueue(queue);
      pthread_mutex_unlock(&pool->mutex);
      return NULL;
    }

    pthread_attr_t thAttr;
    pthread_attr_init(&thAttr);
    pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

    // pthread_create reports failure through its return value, not errno
    int ret = pthread_create(&worker->thread, &thAttr, (ThreadFp)tWriteWorkerThreadFp, worker);
    pthread_attr_destroy(&thAttr);

    if (ret != 0) {
      uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(ret));
      // The handle is undefined after a failed create; put back the all-zero
      // bytes the slot had after calloc() in tMWorkerInit.
      // NOTE(review): assumes taosCheckPthreadValid() treats all-zero as "not started" - confirm.
      memset(&worker->thread, 0, sizeof(worker->thread));
      taosFreeQall(worker->qall);
      taosCloseQset(worker->qset);
      worker->qall = NULL;
      worker->qset = NULL;
      taosCloseQueue(queue);
      queue = NULL;
    } else {
      uDebug("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
      pool->nextId = (pool->nextId + 1) % pool->max;
    }
  } else {
    // worker already running: just hand it one more queue
    taosAddIntoQset(worker->qset, queue, ahandle);
    pool->nextId = (pool->nextId + 1) % pool->max;
  }

  pthread_mutex_unlock(&pool->mutex);
  uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}

S
Shengliang Guan 已提交
253
/*
 * Release a queue obtained from tMWorkerAllocQueue().
 *
 * pool:  owning pool; only pool->name is read, for the log line.
 * queue: queue handed to taosCloseQueue(). It must not be used afterwards;
 *        the debug line prints only its address.
 */
void tMWorkerFreeQueue(SMWorkerPool *pool, taos_queue queue) {
  taosCloseQueue(queue);
  uDebug("worker:%s, queue:%p is freed", pool->name, queue);
}