tsched.c 7.7 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16 17
#define _DEFAULT_SOURCE
#include "tsched.h"
18
#include "tdef.h"
S
log  
Shengliang Guan 已提交
19
#include "tlog.h"
20
#include "ttimer.h"
S
Shengliang Guan 已提交
21
#include "tutil.h"
22

S
Shengliang Guan 已提交
23
#define DUMP_SCHEDULER_TIME_WINDOW 30000  // every 30sec, take a snap shot of task queue.
H
hzcheng 已提交
24

25
static void *taosProcessSchedQueue(void *param);
S
Shengliang Guan 已提交
26
static void  taosDumpSchedulerStatus(void *qhandle, void *tmrId);
H
hzcheng 已提交
27

D
dapan1121 已提交
28
void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *label, SSchedQueue *pSched) {
D
dapan1121 已提交
29 30
  bool schedMalloced = false;
  
D
dapan1121 已提交
31 32 33 34 35 36
  if (NULL == pSched) {
    pSched = (SSchedQueue *)taosMemoryCalloc(sizeof(SSchedQueue), 1);
    if (pSched == NULL) {
      uError("%s: no enough memory for pSched", label);
      return NULL;
    }
D
dapan1121 已提交
37 38

    schedMalloced = true;
J
Jeff Tao 已提交
39 40
  }

wafwerar's avatar
wafwerar 已提交
41
  pSched->queue = (SSchedMsg *)taosMemoryCalloc(sizeof(SSchedMsg), queueSize);
J
Jeff Tao 已提交
42
  if (pSched->queue == NULL) {
S
slguan 已提交
43
    uError("%s: no enough memory for queue", label);
J
Jeff Tao 已提交
44
    taosCleanUpScheduler(pSched);
D
dapan1121 已提交
45 46 47
    if (schedMalloced) {
      taosMemoryFree(pSched);
    }
J
Jeff Tao 已提交
48 49 50
    return NULL;
  }

wafwerar's avatar
wafwerar 已提交
51
  pSched->qthread = taosMemoryCalloc(sizeof(TdThread), numOfThreads);
J
Jeff Tao 已提交
52
  if (pSched->qthread == NULL) {
S
slguan 已提交
53
    uError("%s: no enough memory for qthread", label);
J
Jeff Tao 已提交
54
    taosCleanUpScheduler(pSched);
D
dapan1121 已提交
55 56 57
    if (schedMalloced) {
      taosMemoryFree(pSched);
    }
J
Jeff Tao 已提交
58
    return NULL;
59
  }
H
hzcheng 已提交
60 61

  pSched->queueSize = queueSize;
S
Shengliang Guan 已提交
62
  tstrncpy(pSched->label, label, sizeof(pSched->label));  // fix buffer overflow
H
hzcheng 已提交
63

J
Jeff Tao 已提交
64 65 66
  pSched->fullSlot = 0;
  pSched->emptySlot = 0;

wafwerar's avatar
wafwerar 已提交
67
  if (taosThreadMutexInit(&pSched->queueMutex, NULL) < 0) {
S
slguan 已提交
68
    uError("init %s:queueMutex failed(%s)", label, strerror(errno));
J
Jeff Tao 已提交
69
    taosCleanUpScheduler(pSched);
D
dapan1121 已提交
70 71 72
    if (schedMalloced) {
      taosMemoryFree(pSched);
    }
J
Jeff Tao 已提交
73
    return NULL;
H
hzcheng 已提交
74 75
  }

S
Shengliang Guan 已提交
76
  if (tsem_init(&pSched->emptySem, 0, (uint32_t)pSched->queueSize) != 0) {
S
slguan 已提交
77
    uError("init %s:empty semaphore failed(%s)", label, strerror(errno));
J
Jeff Tao 已提交
78
    taosCleanUpScheduler(pSched);
D
dapan1121 已提交
79 80 81
    if (schedMalloced) {
      taosMemoryFree(pSched);
    }
J
Jeff Tao 已提交
82
    return NULL;
H
hzcheng 已提交
83 84
  }

S
slguan 已提交
85
  if (tsem_init(&pSched->fullSem, 0, 0) != 0) {
S
slguan 已提交
86
    uError("init %s:full semaphore failed(%s)", label, strerror(errno));
J
Jeff Tao 已提交
87
    taosCleanUpScheduler(pSched);
D
dapan1121 已提交
88 89 90
    if (schedMalloced) {
      taosMemoryFree(pSched);
    }
J
Jeff Tao 已提交
91
    return NULL;
H
hzcheng 已提交
92 93
  }

D
dapan1121 已提交
94
  atomic_store_8(&pSched->stop, 0);
S
Shengliang Guan 已提交
95
  for (int32_t i = 0; i < numOfThreads; ++i) {
wafwerar's avatar
wafwerar 已提交
96 97 98 99 100
    TdThreadAttr attr;
    taosThreadAttrInit(&attr);
    taosThreadAttrSetDetachState(&attr, PTHREAD_CREATE_JOINABLE);
    int32_t code = taosThreadCreate(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched);
    taosThreadAttrDestroy(&attr);
J
Jeff Tao 已提交
101
    if (code != 0) {
S
slguan 已提交
102
      uError("%s: failed to create rpc thread(%s)", label, strerror(errno));
J
Jeff Tao 已提交
103
      taosCleanUpScheduler(pSched);
D
dapan1121 已提交
104 105 106
      if (schedMalloced) {
        taosMemoryFree(pSched);
      }
J
Jeff Tao 已提交
107
      return NULL;
H
hzcheng 已提交
108
    }
109
    ++pSched->numOfThreads;
H
hzcheng 已提交
110 111
  }

112
  uDebug("%s scheduler is initialized, numOfThreads:%d", label, pSched->numOfThreads);
H
hzcheng 已提交
113 114 115 116

  return (void *)pSched;
}

S
Shengliang Guan 已提交
117
void *taosInitSchedulerWithInfo(int32_t queueSize, int32_t numOfThreads, const char *label, void *tmrCtrl) {
D
dapan1121 已提交
118
  SSchedQueue *pSched = taosInitScheduler(queueSize, numOfThreads, label, NULL);
119

120 121 122 123
  if (tmrCtrl != NULL && pSched != NULL) {
    pSched->pTmrCtrl = tmrCtrl;
    taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer);
  }
124

125 126 127
  return pSched;
}

128
void *taosProcessSchedQueue(void *scheduler) {
H
hzcheng 已提交
129
  SSchedMsg    msg;
130
  SSchedQueue *pSched = (SSchedQueue *)scheduler;
S
Shengliang Guan 已提交
131
  int32_t      ret = 0;
H
hzcheng 已提交
132

H
Haojun Liao 已提交
133 134 135
  char name[16] = {0};
  snprintf(name, tListLen(name), "%s-taskQ", pSched->label);
  setThreadName(name);
136

H
hzcheng 已提交
137
  while (1) {
138 139
    if ((ret = tsem_wait(&pSched->fullSem)) != 0) {
      uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
D
dapan1121 已提交
140
      ASSERT(0);
H
hzcheng 已提交
141
    }
D
dapan1121 已提交
142
    if (atomic_load_8(&pSched->stop)) {
143 144
      break;
    }
H
hzcheng 已提交
145

wafwerar's avatar
wafwerar 已提交
146
    if ((ret = taosThreadMutexLock(&pSched->queueMutex)) != 0) {
147
      uFatal("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
D
dapan1121 已提交
148
      ASSERT(0);
149
    }
H
hzcheng 已提交
150 151 152 153 154

    msg = pSched->queue[pSched->fullSlot];
    memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg));
    pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize;

wafwerar's avatar
wafwerar 已提交
155
    if ((ret = taosThreadMutexUnlock(&pSched->queueMutex)) != 0) {
156
      uFatal("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
D
dapan1121 已提交
157
      ASSERT(0);
158
    }
H
hzcheng 已提交
159

160 161
    if ((ret = tsem_post(&pSched->emptySem)) != 0) {
      uFatal("post %s emptySem failed(%s)", pSched->label, strerror(errno));
D
dapan1121 已提交
162
      ASSERT(0);
163
    }
H
hzcheng 已提交
164 165 166 167 168 169

    if (msg.fp)
      (*(msg.fp))(&msg);
    else if (msg.tfp)
      (*(msg.tfp))(msg.ahandle, msg.thandle);
  }
S
slguan 已提交
170 171

  return NULL;
H
hzcheng 已提交
172 173
}

dengyihao's avatar
dengyihao 已提交
174
int taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) {
175
  SSchedQueue *pSched = (SSchedQueue *)queueScheduler;
S
Shengliang Guan 已提交
176
  int32_t      ret = 0;
177

H
hzcheng 已提交
178
  if (pSched == NULL) {
S
slguan 已提交
179
    uError("sched is not ready, msg:%p is dropped", pMsg);
dengyihao's avatar
dengyihao 已提交
180
    return -1;
H
hzcheng 已提交
181 182
  }

D
dapan1121 已提交
183 184
  if (atomic_load_8(&pSched->stop)) {
    uError("sched is already stopped, msg:%p is dropped", pMsg);
dengyihao's avatar
dengyihao 已提交
185
    return -1;
D
dapan1121 已提交
186 187
  }

188 189
  if ((ret = tsem_wait(&pSched->emptySem)) != 0) {
    uFatal("wait %s emptySem failed(%s)", pSched->label, strerror(errno));
D
dapan1121 已提交
190
    ASSERT(0);
191
  }
H
hzcheng 已提交
192

wafwerar's avatar
wafwerar 已提交
193
  if ((ret = taosThreadMutexLock(&pSched->queueMutex)) != 0) {
194
    uFatal("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
D
dapan1121 已提交
195
    ASSERT(0);
196
  }
H
hzcheng 已提交
197 198 199 200

  pSched->queue[pSched->emptySlot] = *pMsg;
  pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize;

wafwerar's avatar
wafwerar 已提交
201
  if ((ret = taosThreadMutexUnlock(&pSched->queueMutex)) != 0) {
202
    uFatal("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
D
dapan1121 已提交
203
    ASSERT(0);
204
  }
H
hzcheng 已提交
205

206 207
  if ((ret = tsem_post(&pSched->fullSem)) != 0) {
    uFatal("post %s fullSem failed(%s)", pSched->label, strerror(errno));
D
dapan1121 已提交
208
    ASSERT(0);
209
  }
dengyihao's avatar
dengyihao 已提交
210
  return ret;
H
hzcheng 已提交
211 212 213 214 215 216
}

void taosCleanUpScheduler(void *param) {
  SSchedQueue *pSched = (SSchedQueue *)param;
  if (pSched == NULL) return;

D
dapan1121 已提交
217
  uDebug("start to cleanup %s schedQsueue", pSched->label);
dengyihao's avatar
dengyihao 已提交
218

D
dapan1121 已提交
219 220 221
  atomic_store_8(&pSched->stop, 1);

  taosMsleep(200);
dengyihao's avatar
dengyihao 已提交
222

S
Shengliang Guan 已提交
223 224
  for (int32_t i = 0; i < pSched->numOfThreads; ++i) {
    if (taosCheckPthreadValid(pSched->qthread[i])) {
225 226
      tsem_post(&pSched->fullSem);
    }
227
  }
S
Shengliang Guan 已提交
228
  for (int32_t i = 0; i < pSched->numOfThreads; ++i) {
S
TD-1037  
Shengliang Guan 已提交
229
    if (taosCheckPthreadValid(pSched->qthread[i])) {
wafwerar's avatar
wafwerar 已提交
230
      taosThreadJoin(pSched->qthread[i], NULL);
231
      taosThreadClear(&pSched->qthread[i]);
232
    }
H
hzcheng 已提交
233 234
  }

S
slguan 已提交
235 236
  tsem_destroy(&pSched->emptySem);
  tsem_destroy(&pSched->fullSem);
wafwerar's avatar
wafwerar 已提交
237
  taosThreadMutexDestroy(&pSched->queueMutex);
S
Shengliang Guan 已提交
238

239
  if (pSched->pTimer) {
L
Liu Jicong 已提交
240 241
    taosTmrStop(pSched->pTimer);
    pSched->pTimer = NULL;
242
  }
H
hzcheng 已提交
243

wafwerar's avatar
wafwerar 已提交
244 245
  if (pSched->queue) taosMemoryFree(pSched->queue);
  if (pSched->qthread) taosMemoryFree(pSched->qthread);
dengyihao's avatar
dengyihao 已提交
246
  // taosMemoryFree(pSched);
H
hzcheng 已提交
247
}
248 249 250 251 252 253 254

// for debug purpose, dump the scheduler status every 1min.
void taosDumpSchedulerStatus(void *qhandle, void *tmrId) {
  SSchedQueue *pSched = (SSchedQueue *)qhandle;
  if (pSched == NULL || pSched->pTimer == NULL || pSched->pTimer != tmrId) {
    return;
  }
S
Shengliang Guan 已提交
255

256 257
  int32_t size = ((pSched->emptySlot - pSched->fullSlot) + pSched->queueSize) % pSched->queueSize;
  if (size > 0) {
258
    uDebug("scheduler:%s, current tasks in queue:%d, task thread:%d", pSched->label, size, pSched->numOfThreads);
259
  }
S
Shengliang Guan 已提交
260

261
  taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer);
262
}