tworker.c 7.0 KB
Newer Older
S
TD-2393  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "os.h"
S
Shengliang Guan 已提交
18
#include "ulog.h"
S
TD-2393  
Shengliang Guan 已提交
19 20 21
#include "tqueue.h"
#include "tworker.h"

22
/* Start-routine signature used when the worker entry points in this file are
 * passed to pthread_create(). */
typedef void* (*ThreadFp)(void *param);
23 24 25 26 27 28

/*
 * Prepare a worker pool whose workers all read from one shared queue set.
 *
 * Opens the shared queue set, allocates pool->max zero-initialized worker
 * slots, initializes the pool mutex and fills in each slot's id and back
 * pointer. No thread is started here. pool->name, pool->min and pool->max
 * are read, so the caller must set them before calling.
 *
 * Returns 0 on success, or -1 when the queue set or the worker array cannot
 * be allocated; in that case nothing allocated here is left behind.
 */
int32_t tWorkerInit(SWorkerPool *pool) {
  pool->qset = taosOpenQset();
  if (pool->qset == NULL) return -1;

  pool->workers = calloc(pool->max, sizeof(SWorker));
  if (pool->workers == NULL) {
    // undo the queue set so a failed init does not leak it
    taosCloseQset(pool->qset);
    pool->qset = NULL;
    return -1;
  }

  pthread_mutex_init(&pool->mutex, NULL);
  for (int i = 0; i < pool->max; ++i) {
    SWorker *worker = pool->workers + i;
    worker->id = i;
    worker->pool = pool;
  }

  uInfo("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
  return 0;
}

38 39
/*
 * Tear down a pool set up by tWorkerInit().
 *
 * Pass 1 calls taosQsetThreadResume() on the shared queue set once for every
 * slot whose thread handle is valid; pass 2 joins each of those threads.
 * Afterwards the worker array, the queue set and the mutex are released.
 */
void tWorkerCleanup(SWorkerPool *pool) {
  int idx;

  // pass 1: one resume call per valid worker thread
  for (idx = 0; idx < pool->max; ++idx) {
    SWorker *slot = &pool->workers[idx];
    if (!taosCheckPthreadValid(slot->thread)) {
      continue;
    }
    taosQsetThreadResume(pool->qset);
  }

  // pass 2: wait for every valid worker thread to finish
  for (idx = 0; idx < pool->max; ++idx) {
    SWorker *slot = &pool->workers[idx];
    if (!taosCheckPthreadValid(slot->thread)) {
      continue;
    }
    pthread_join(slot->thread, NULL);
  }

  free(pool->workers);
  taosCloseQset(pool->qset);
  pthread_mutex_destroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}

60
static void *tWorkerThreadFp(SWorker *worker) {
S
Shengliang Guan 已提交
61 62
  SWorkerPool *pool = worker->pool;
  FProcessItem fp = NULL;
63

S
Shengliang Guan 已提交
64 65
  void   *msg = NULL;
  void   *ahandle = NULL;
66 67 68 69 70 71 72
  int32_t code = 0;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
  uDebug("worker:%s:%d is running", pool->name, worker->id);

  while (1) {
S
Shengliang Guan 已提交
73
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ahandle, &fp) == 0) {
74 75 76 77
      uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
      break;
    }

S
Shengliang Guan 已提交
78 79 80
    if (fp) {
      (*fp)(msg, ahandle);
    }
81 82 83 84 85
  }

  return NULL;
}

S
Shengliang Guan 已提交
86
/*
 * Create a queue, attach it to the pool's shared queue set and make sure
 * worker threads exist to serve it.
 *
 * fp is installed as the queue's per-item handler and ahandle is stored with
 * the queue in the queue set. While the pool has fewer than pool->max threads,
 * at least one new thread is started, and more until pool->min is reached.
 *
 * Returns the new queue, or NULL when the queue cannot be opened or when
 * thread creation fails and the pool has no running worker at all (the queue
 * is closed again in that case). If thread creation fails while other workers
 * already exist, the queue is kept because those workers can still serve it.
 */
taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp) {
  pthread_mutex_lock(&pool->mutex);
  taos_queue queue = taosOpenQueue();
  if (queue == NULL) {
    pthread_mutex_unlock(&pool->mutex);
    return NULL;
  }

  taosSetQueueFp(queue, fp, NULL);
  taosAddIntoQset(pool->qset, queue, ahandle);

  // spawn a thread to process queue
  if (pool->num < pool->max) {
    do {
      SWorker *worker = pool->workers + pool->num;

      pthread_attr_t thAttr;
      pthread_attr_init(&thAttr);
      pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

      // Create into a local handle: on failure the output of pthread_create()
      // is undefined and must not reach worker->thread, which cleanup joins.
      pthread_t thread;
      int       ret = pthread_create(&thread, &thAttr, (ThreadFp)tWorkerThreadFp, worker);
      pthread_attr_destroy(&thAttr);

      if (ret != 0) {
        // pthread_create() reports the error through its return value, not errno
        uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(ret));
        if (pool->num == 0) {
          // nobody could ever process this queue, so fail the allocation
          taosCloseQueue(queue);
          queue = NULL;
        }
        break;
      }

      worker->thread = thread;
      pool->num++;
      uDebug("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
      // the max bound keeps the index inside pool->workers even if min > max
    } while (pool->num < pool->min && pool->num < pool->max);
  }

  pthread_mutex_unlock(&pool->mutex);
  if (queue != NULL) {
    uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
  }

  return queue;
}

/*
 * Release a queue obtained from tWorkerAllocQueue().
 *
 * The queue is handed to taosCloseQueue(); pool is only used for the pool
 * name in the debug log line.
 */
void tWorkerFreeQueue(SWorkerPool *pool, void *queue) {
  taosCloseQueue(queue);

  uDebug("worker:%s, queue:%p is freed", pool->name, queue);
}

S
Shengliang Guan 已提交
127
/*
 * Prepare a multi-worker pool in which every worker has its own queue set.
 *
 * Resets the round-robin cursor (pool->nextId), allocates pool->max worker
 * slots, initializes the pool mutex and gives each slot its id, a back
 * pointer to the pool, and empty qall/qset pointers. No thread is started.
 *
 * Returns 0 on success, -1 when the worker array cannot be allocated.
 */
int32_t tMWorkerInit(SMWorkerPool *pool) {
  pool->nextId = 0;

  pool->workers = calloc(sizeof(SMWorker), pool->max);
  if (pool->workers == NULL) {
    return -1;
  }

  pthread_mutex_init(&pool->mutex, NULL);

  int32_t idx = 0;
  while (idx < pool->max) {
    SMWorker *slot = &pool->workers[idx];
    slot->id = idx;
    slot->qall = NULL;
    slot->qset = NULL;
    slot->pool = pool;
    ++idx;
  }

  uInfo("worker:%s is initialized, max:%d", pool->name, pool->max);
  return 0;
}

S
Shengliang Guan 已提交
145
/*
 * Tear down a pool set up by tMWorkerInit().
 *
 * Pass 1 calls taosQsetThreadResume() on the queue set of every slot that has
 * both a valid thread handle and a queue set. Pass 2 joins each valid thread
 * and then releases that slot's qall and qset. Finally the worker array and
 * the mutex are released.
 */
void tMWorkerCleanup(SMWorkerPool *pool) {
  int32_t idx;

  // pass 1: resume each worker's own queue set
  for (idx = 0; idx < pool->max; ++idx) {
    SMWorker *slot = &pool->workers[idx];
    if (taosCheckPthreadValid(slot->thread) && slot->qset) {
      taosQsetThreadResume(slot->qset);
    }
  }

  // pass 2: join the thread first, then drop the resources it was using
  for (idx = 0; idx < pool->max; ++idx) {
    SMWorker *slot = &pool->workers[idx];
    if (!taosCheckPthreadValid(slot->thread)) {
      continue;
    }
    pthread_join(slot->thread, NULL);
    taosFreeQall(slot->qall);
    taosCloseQset(slot->qset);
  }

  free(pool->workers);
  pthread_mutex_destroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}

S
Shengliang Guan 已提交
168
static void *tWriteWorkerThreadFp(SMWorker *worker) {
S
Shengliang Guan 已提交
169 170
  SMWorkerPool *pool = worker->pool;
  FProcessItems fp = NULL;
171

S
Shengliang Guan 已提交
172 173
  void   *msg = NULL;
  void   *ahandle = NULL;
174 175 176 177 178 179 180 181
  int32_t numOfMsgs = 0;
  int32_t qtype = 0;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
  uDebug("worker:%s:%d is running", pool->name, worker->id);

  while (1) {
S
Shengliang Guan 已提交
182
    numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &ahandle, &fp);
183 184 185 186 187
    if (numOfMsgs == 0) {
      uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset);
      break;
    }

S
Shengliang Guan 已提交
188 189
    if (fp) {
      (*fp)(worker->qall, numOfMsgs, ahandle);
190 191 192 193 194 195
    }
  }

  return NULL;
}

S
Shengliang Guan 已提交
196
/*
 * Create a queue and assign it to the next worker in round-robin order.
 *
 * fp is installed as the queue's batch handler and ahandle is stored with the
 * queue in the chosen worker's queue set. The first time a worker slot is
 * used, its queue set, its qall and its thread are created here; later calls
 * that land on the same slot only add the queue to the existing queue set.
 * pool->nextId advances only when the queue was successfully assigned.
 *
 * Returns the new queue, or NULL on any failure. On failure everything
 * created by this call is released and the slot's qset/qall pointers are
 * reset to NULL so the slot can be set up again by a later call.
 */
taos_queue tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp) {
  pthread_mutex_lock(&pool->mutex);
  SMWorker *worker = pool->workers + pool->nextId;

  taos_queue queue = taosOpenQueue();
  if (queue == NULL) {
    pthread_mutex_unlock(&pool->mutex);
    return NULL;
  }

  taosSetQueueFp(queue, NULL, fp);

  if (worker->qset == NULL) {
    worker->qset = taosOpenQset();
    if (worker->qset == NULL) {
      taosCloseQueue(queue);
      pthread_mutex_unlock(&pool->mutex);
      return NULL;
    }

    taosAddIntoQset(worker->qset, queue, ahandle);
    worker->qall = taosAllocateQall();
    if (worker->qall == NULL) {
      taosCloseQset(worker->qset);
      worker->qset = NULL;  // do not leave a dangling qset for the next call
      taosCloseQueue(queue);
      pthread_mutex_unlock(&pool->mutex);
      return NULL;
    }
    pthread_attr_t thAttr;
    pthread_attr_init(&thAttr);
    pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

    // Create into a local handle: on failure the output of pthread_create()
    // is undefined and must not reach worker->thread, which cleanup joins.
    pthread_t thread;
    int       ret = pthread_create(&thread, &thAttr, (ThreadFp)tWriteWorkerThreadFp, worker);
    pthread_attr_destroy(&thAttr);

    if (ret != 0) {
      // pthread_create() reports the error through its return value, not errno
      uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(ret));
      taosFreeQall(worker->qall);
      worker->qall = NULL;
      taosCloseQset(worker->qset);
      worker->qset = NULL;
      taosCloseQueue(queue);
      queue = NULL;
    } else {
      worker->thread = thread;
      uDebug("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
      pool->nextId = (pool->nextId + 1) % pool->max;
    }
  } else {
    taosAddIntoQset(worker->qset, queue, ahandle);
    pool->nextId = (pool->nextId + 1) % pool->max;
  }

  pthread_mutex_unlock(&pool->mutex);
  if (queue != NULL) {
    uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
  }

  return queue;
}

S
Shengliang Guan 已提交
251
/*
 * Release a queue obtained from tMWorkerAllocQueue().
 *
 * The queue is handed to taosCloseQueue(); pool is only used for the pool
 * name in the debug log line.
 */
void tMWorkerFreeQueue(SMWorkerPool *pool, taos_queue queue) {
  taosCloseQueue(queue);

  uDebug("worker:%s, queue:%p is freed", pool->name, queue);
}