tworker.c 7.2 KB
Newer Older
S
TD-2393  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "tworker.h"
S
Shengliang Guan 已提交
18
#include "ulog.h"
S
TD-2393  
Shengliang Guan 已提交
19

S
Shengliang Guan 已提交
20
typedef void *(*ThreadFp)(void *param);
21

S
Shengliang Guan 已提交
22
int32_t tQWorkerInit(SQWorkerPool *pool) {
23
  pool->qset = taosOpenQset();
S
Shengliang Guan 已提交
24 25 26 27 28 29 30
  pool->workers = calloc(sizeof(SQWorker), pool->max);
  if (pthread_mutex_init(&pool->mutex, NULL)) {
    return -1;
  }

  for (int32_t i = 0; i < pool->max; ++i) {
    SQWorker *worker = pool->workers + i;
31 32
    worker->id = i;
    worker->pool = pool;
S
TD-2393  
Shengliang Guan 已提交
33 34
  }

S
Shengliang Guan 已提交
35
  uDebug("qworker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
S
TD-2393  
Shengliang Guan 已提交
36 37 38
  return 0;
}

S
Shengliang Guan 已提交
39 40 41
void tQWorkerCleanup(SQWorkerPool *pool) {
  for (int32_t i = 0; i < pool->max; ++i) {
    SQWorker *worker = pool->workers + i;
S
Shengliang Guan 已提交
42
    if (worker == NULL) continue;
43
    if (taosCheckPthreadValid(worker->thread)) {
44
      taosQsetThreadResume(pool->qset);
S
TD-2393  
Shengliang Guan 已提交
45 46 47
    }
  }

S
Shengliang Guan 已提交
48 49
  for (int32_t i = 0; i < pool->max; ++i) {
    SQWorker *worker = pool->workers + i;
S
Shengliang Guan 已提交
50
    if (worker == NULL) continue;
51 52
    if (taosCheckPthreadValid(worker->thread)) {
      pthread_join(worker->thread, NULL);
S
TD-2393  
Shengliang Guan 已提交
53 54 55
    }
  }

56
  tfree(pool->workers);
57 58
  taosCloseQset(pool->qset);
  pthread_mutex_destroy(&pool->mutex);
S
TD-2393  
Shengliang Guan 已提交
59

S
Shengliang Guan 已提交
60
  uDebug("qworker:%s is closed", pool->name);
S
TD-2393  
Shengliang Guan 已提交
61 62
}

S
Shengliang Guan 已提交
63 64 65
static void *tQWorkerThreadFp(SQWorker *worker) {
  SQWorkerPool *pool = worker->pool;
  FProcessItem  fp = NULL;
66

S
Shengliang Guan 已提交
67 68
  void *  msg = NULL;
  void *  ahandle = NULL;
69 70 71 72
  int32_t code = 0;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
S
Shengliang Guan 已提交
73
  uDebug("qworker:%s:%d is running", pool->name, worker->id);
74 75

  while (1) {
S
Shengliang Guan 已提交
76
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ahandle, &fp) == 0) {
S
Shengliang Guan 已提交
77
      uDebug("qworker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
78 79 80
      break;
    }

S
Shengliang Guan 已提交
81
    if (fp != NULL) {
S
Shengliang Guan 已提交
82
      (*fp)(ahandle, msg);
S
Shengliang Guan 已提交
83
    }
84 85 86 87 88
  }

  return NULL;
}

S
Shengliang Guan 已提交
89
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem fp) {
90
  pthread_mutex_lock(&pool->mutex);
91
  STaosQueue *queue = taosOpenQueue();
92
  if (queue == NULL) {
93
    pthread_mutex_unlock(&pool->mutex);
S
TD-2393  
Shengliang Guan 已提交
94 95 96
    return NULL;
  }

S
Shengliang Guan 已提交
97
  taosSetQueueFp(queue, fp, NULL);
98
  taosAddIntoQset(pool->qset, queue, ahandle);
S
TD-2393  
Shengliang Guan 已提交
99 100

  // spawn a thread to process queue
101
  if (pool->num < pool->max) {
S
TD-2393  
Shengliang Guan 已提交
102
    do {
S
Shengliang Guan 已提交
103
      SQWorker *worker = pool->workers + pool->num;
S
TD-2393  
Shengliang Guan 已提交
104 105 106 107 108

      pthread_attr_t thAttr;
      pthread_attr_init(&thAttr);
      pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

S
Shengliang Guan 已提交
109 110 111 112 113
      if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker) != 0) {
        uError("qworker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno));
        taosCloseQueue(queue);
        queue = NULL;
        break;
S
TD-2393  
Shengliang Guan 已提交
114 115 116
      }

      pthread_attr_destroy(&thAttr);
117
      pool->num++;
S
Shengliang Guan 已提交
118
      uDebug("qworker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
119
    } while (pool->num < pool->min);
S
TD-2393  
Shengliang Guan 已提交
120 121
  }

122
  pthread_mutex_unlock(&pool->mutex);
S
Shengliang Guan 已提交
123
  uDebug("qworker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
124 125 126 127

  return queue;
}

S
Shengliang Guan 已提交
128
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
129
  taosCloseQueue(queue);
S
Shengliang Guan 已提交
130
  uDebug("qworker:%s, queue:%p is freed", pool->name, queue);
131 132
}

S
Shengliang Guan 已提交
133
int32_t tWWorkerInit(SWWorkerPool *pool) {
134
  pool->nextId = 0;
S
Shengliang Guan 已提交
135
  pool->workers = calloc(sizeof(SWWorker), pool->max);
136 137 138 139
  if (pool->workers == NULL) return -1;

  pthread_mutex_init(&pool->mutex, NULL);
  for (int32_t i = 0; i < pool->max; ++i) {
S
Shengliang Guan 已提交
140
    SWWorker *worker = pool->workers + i;
141 142 143 144 145 146
    worker->id = i;
    worker->qall = NULL;
    worker->qset = NULL;
    worker->pool = pool;
  }

S
Shengliang Guan 已提交
147
  uInfo("wworker:%s is initialized, max:%d", pool->name, pool->max);
148 149 150
  return 0;
}

S
Shengliang Guan 已提交
151
void tWWorkerCleanup(SWWorkerPool *pool) {
152
  for (int32_t i = 0; i < pool->max; ++i) {
S
Shengliang Guan 已提交
153
    SWWorker *worker = pool->workers + i;
154
    if (taosCheckPthreadValid(worker->thread)) {
S
Shengliang Guan 已提交
155 156 157
      if (worker->qset) {
        taosQsetThreadResume(worker->qset);
      }
158 159 160 161
    }
  }

  for (int32_t i = 0; i < pool->max; ++i) {
S
Shengliang Guan 已提交
162
    SWWorker *worker = pool->workers + i;
163 164 165 166 167 168 169
    if (taosCheckPthreadValid(worker->thread)) {
      pthread_join(worker->thread, NULL);
      taosFreeQall(worker->qall);
      taosCloseQset(worker->qset);
    }
  }

170
  tfree(pool->workers);
171 172
  pthread_mutex_destroy(&pool->mutex);

S
Shengliang Guan 已提交
173
  uInfo("wworker:%s is closed", pool->name);
174 175
}

S
Shengliang Guan 已提交
176 177
static void *tWriteWorkerThreadFp(SWWorker *worker) {
  SWWorkerPool *pool = worker->pool;
S
Shengliang Guan 已提交
178
  FProcessItems fp = NULL;
179

S
Shengliang Guan 已提交
180 181
  void *  msg = NULL;
  void *  ahandle = NULL;
182 183 184 185 186
  int32_t numOfMsgs = 0;
  int32_t qtype = 0;

  taosBlockSIGPIPE();
  setThreadName(pool->name);
S
Shengliang Guan 已提交
187
  uDebug("wworker:%s:%d is running", pool->name, worker->id);
188 189

  while (1) {
S
Shengliang Guan 已提交
190
    numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &ahandle, &fp);
191
    if (numOfMsgs == 0) {
S
Shengliang Guan 已提交
192
      uDebug("wworker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset);
193 194 195
      break;
    }

S
Shengliang Guan 已提交
196
    if (fp != NULL) {
S
Shengliang Guan 已提交
197
      (*fp)(ahandle, worker->qall, numOfMsgs);
198 199 200 201 202 203
    }
  }

  return NULL;
}

S
Shengliang Guan 已提交
204
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems fp) {
205
  pthread_mutex_lock(&pool->mutex);
S
Shengliang Guan 已提交
206
  SWWorker *worker = pool->workers + pool->nextId;
207

208
  STaosQueue *queue = taosOpenQueue();
209 210 211 212 213
  if (queue == NULL) {
    pthread_mutex_unlock(&pool->mutex);
    return NULL;
  }

S
Shengliang Guan 已提交
214 215
  taosSetQueueFp(queue, NULL, fp);

216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236
  if (worker->qset == NULL) {
    worker->qset = taosOpenQset();
    if (worker->qset == NULL) {
      taosCloseQueue(queue);
      pthread_mutex_unlock(&pool->mutex);
      return NULL;
    }

    taosAddIntoQset(worker->qset, queue, ahandle);
    worker->qall = taosAllocateQall();
    if (worker->qall == NULL) {
      taosCloseQset(worker->qset);
      taosCloseQueue(queue);
      pthread_mutex_unlock(&pool->mutex);
      return NULL;
    }
    pthread_attr_t thAttr;
    pthread_attr_init(&thAttr);
    pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

    if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tWriteWorkerThreadFp, worker) != 0) {
S
Shengliang Guan 已提交
237
      uError("wworker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno));
238 239 240 241 242
      taosFreeQall(worker->qall);
      taosCloseQset(worker->qset);
      taosCloseQueue(queue);
      queue = NULL;
    } else {
S
Shengliang Guan 已提交
243
      uDebug("wworker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
244 245 246 247 248 249 250 251 252 253
      pool->nextId = (pool->nextId + 1) % pool->max;
    }

    pthread_attr_destroy(&thAttr);
  } else {
    taosAddIntoQset(worker->qset, queue, ahandle);
    pool->nextId = (pool->nextId + 1) % pool->max;
  }

  pthread_mutex_unlock(&pool->mutex);
S
Shengliang Guan 已提交
254
  uDebug("wworker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
S
TD-2393  
Shengliang Guan 已提交
255

256
  return queue;
S
TD-2393  
Shengliang Guan 已提交
257 258
}

S
Shengliang Guan 已提交
259
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
260
  taosCloseQueue(queue);
S
Shengliang Guan 已提交
261
  uDebug("wworker:%s, queue:%p is freed", pool->name, queue);
S
TD-2393  
Shengliang Guan 已提交
262
}