ttimer.c 19.0 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
timer  
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
dengyihao's avatar
fix bug  
dengyihao 已提交
17 18
#include "ttimer.h"
#include "taoserror.h"
H
hzcheng 已提交
19 20
#include "tlog.h"
#include "tsched.h"
21
#include "tdef.h"
H
hzcheng 已提交
22

23 24 25 26 27
#define tmrFatal(...)                                                     \
  {                                                                       \
    if (tmrDebugFlag & DEBUG_FATAL) {                                     \
      taosPrintLog("TMR FATAL ", DEBUG_FATAL, tmrDebugFlag, __VA_ARGS__); \
    }                                                                     \
dengyihao's avatar
fix bug  
dengyihao 已提交
28
  }
29 30 31 32 33
#define tmrError(...)                                                     \
  {                                                                       \
    if (tmrDebugFlag & DEBUG_ERROR) {                                     \
      taosPrintLog("TMR ERROR ", DEBUG_ERROR, tmrDebugFlag, __VA_ARGS__); \
    }                                                                     \
dengyihao's avatar
fix bug  
dengyihao 已提交
34
  }
35 36 37 38 39
#define tmrWarn(...)                                                    \
  {                                                                     \
    if (tmrDebugFlag & DEBUG_WARN) {                                    \
      taosPrintLog("TMR WARN ", DEBUG_WARN, tmrDebugFlag, __VA_ARGS__); \
    }                                                                   \
dengyihao's avatar
fix bug  
dengyihao 已提交
40
  }
41 42 43 44 45
#define tmrInfo(...)                                               \
  {                                                                \
    if (tmrDebugFlag & DEBUG_INFO) {                               \
      taosPrintLog("TMR ", DEBUG_INFO, tmrDebugFlag, __VA_ARGS__); \
    }                                                              \
dengyihao's avatar
fix bug  
dengyihao 已提交
46
  }
47 48 49 50 51
#define tmrDebug(...)                                               \
  {                                                                 \
    if (tmrDebugFlag & DEBUG_DEBUG) {                               \
      taosPrintLog("TMR ", DEBUG_DEBUG, tmrDebugFlag, __VA_ARGS__); \
    }                                                               \
dengyihao's avatar
fix bug  
dengyihao 已提交
52
  }
53 54 55 56 57
#define tmrTrace(...)                                               \
  {                                                                 \
    if (tmrDebugFlag & DEBUG_TRACE) {                               \
      taosPrintLog("TMR ", DEBUG_TRACE, tmrDebugFlag, __VA_ARGS__); \
    }                                                               \
dengyihao's avatar
fix bug  
dengyihao 已提交
58
  }
H
hzcheng 已提交
59

S
timer  
Shengliang Guan 已提交
60 61 62
#define TIMER_STATE_WAITING  0
#define TIMER_STATE_EXPIRED  1
#define TIMER_STATE_STOPPED  2
weixin_48148422's avatar
weixin_48148422 已提交
63 64 65 66 67 68 69 70 71 72
#define TIMER_STATE_CANCELED 3

typedef union _tmr_ctrl_t {
  char label[16];
  struct {
    // pad to ensure 'next' is the end of this union
    char               padding[16 - sizeof(union _tmr_ctrl_t*)];
    union _tmr_ctrl_t* next;
  };
} tmr_ctrl_t;
H
hzcheng 已提交
73

weixin_48148422's avatar
weixin_48148422 已提交
74 75 76 77 78 79 80 81 82 83 84 85 86
typedef struct tmr_obj_t {
  uintptr_t         id;
  tmr_ctrl_t*       ctrl;
  struct tmr_obj_t* mnext;
  struct tmr_obj_t* prev;
  struct tmr_obj_t* next;
  uint16_t          slot;
  uint8_t           wheel;
  uint8_t           state;
  uint8_t           refCount;
  uint8_t           reserved1;
  uint16_t          reserved2;
  union {
87 88
    int64_t expireAt;
    int64_t executedBy;
weixin_48148422's avatar
weixin_48148422 已提交
89 90 91 92
  };
  TAOS_TMR_CALLBACK fp;
  void*             param;
} tmr_obj_t;
H
hzcheng 已提交
93

weixin_48148422's avatar
weixin_48148422 已提交
94
typedef struct timer_list_t {
95
  int64_t    lockedBy;
weixin_48148422's avatar
weixin_48148422 已提交
96 97 98 99 100 101 102 103 104 105
  tmr_obj_t* timers;
} timer_list_t;

typedef struct timer_map_t {
  uint32_t      size;
  uint32_t      count;
  timer_list_t* slots;
} timer_map_t;

typedef struct time_wheel_t {
wafwerar's avatar
wafwerar 已提交
106
  TdThreadMutex mutex;
weixin_48148422's avatar
weixin_48148422 已提交
107 108 109 110 111 112 113
  int64_t         nextScanAt;
  uint32_t        resolution;
  uint16_t        size;
  uint16_t        index;
  tmr_obj_t**     slots;
} time_wheel_t;

114
static int32_t tsMaxTmrCtrl = TSDB_MAX_VNODES_PER_DB + 100;
weixin_48148422's avatar
weixin_48148422 已提交
115

wafwerar's avatar
wafwerar 已提交
116 117
static TdThreadOnce  tmrModuleInit = PTHREAD_ONCE_INIT;
static TdThreadMutex tmrCtrlMutex;
118
static tmr_ctrl_t*     tmrCtrls;
weixin_48148422's avatar
weixin_48148422 已提交
119
static tmr_ctrl_t*     unusedTmrCtrl = NULL;
120
static void*           tmrQhandle;
S
timer  
Shengliang Guan 已提交
121
static int32_t         numOfTmrCtrl = 0;
122

S
timer  
Shengliang Guan 已提交
123
int32_t          taosTmrThreads = 1;
weixin_48148422's avatar
weixin_48148422 已提交
124 125 126 127 128 129 130 131 132 133 134 135
static uintptr_t nextTimerId = 0;

static time_wheel_t wheels[] = {
    {.resolution = MSECONDS_PER_TICK, .size = 4096},
    {.resolution = 1000, .size = 1024},
    {.resolution = 60000, .size = 1024},
};
static timer_map_t timerMap;

static uintptr_t getNextTimerId() {
  uintptr_t id;
  do {
wafwerar's avatar
wafwerar 已提交
136
    id = (uintptr_t)atomic_add_fetch_ptr((void **)&nextTimerId, 1);
weixin_48148422's avatar
weixin_48148422 已提交
137 138
  } while (id == 0);
  return id;
H
hzcheng 已提交
139 140
}

weixin_48148422's avatar
weixin_48148422 已提交
141
static void timerAddRef(tmr_obj_t* timer) { atomic_add_fetch_8(&timer->refCount, 1); }
H
hzcheng 已提交
142

weixin_48148422's avatar
weixin_48148422 已提交
143
static void timerDecRef(tmr_obj_t* timer) {
weixin_48148422's avatar
weixin_48148422 已提交
144
  if (atomic_sub_fetch_8(&timer->refCount, 1) == 0) {
wafwerar's avatar
wafwerar 已提交
145
    taosMemoryFree(timer);
H
hzcheng 已提交
146
  }
weixin_48148422's avatar
weixin_48148422 已提交
147
}
H
hzcheng 已提交
148

weixin_48148422's avatar
weixin_48148422 已提交
149
static void lockTimerList(timer_list_t* list) {
S
TD-2616  
Shengliang Guan 已提交
150
  int64_t tid = taosGetSelfPthreadId();
S
timer  
Shengliang Guan 已提交
151
  int32_t i = 0;
weixin_48148422's avatar
weixin_48148422 已提交
152
  while (atomic_val_compare_exchange_64(&(list->lockedBy), 0, tid) != 0) {
weixin_48148422's avatar
weixin_48148422 已提交
153 154 155
    if (++i % 1000 == 0) {
      sched_yield();
    }
H
hzcheng 已提交
156
  }
weixin_48148422's avatar
weixin_48148422 已提交
157
}
H
hzcheng 已提交
158

weixin_48148422's avatar
weixin_48148422 已提交
159
static void unlockTimerList(timer_list_t* list) {
S
TD-2616  
Shengliang Guan 已提交
160
  int64_t tid = taosGetSelfPthreadId();
weixin_48148422's avatar
weixin_48148422 已提交
161
  if (atomic_val_compare_exchange_64(&(list->lockedBy), tid, 0) != tid) {
weixin_48148422's avatar
weixin_48148422 已提交
162
    assert(false);
H
Hui Li 已提交
163
    tmrError("%" PRId64 " trying to unlock a timer list not locked by current thread.", tid);
H
hzcheng 已提交
164 165 166
  }
}

weixin_48148422's avatar
weixin_48148422 已提交
167 168 169
static void addTimer(tmr_obj_t* timer) {
  timerAddRef(timer);
  timer->wheel = tListLen(wheels);
H
hzcheng 已提交
170

weixin_48148422's avatar
weixin_48148422 已提交
171 172
  uint32_t      idx = (uint32_t)(timer->id % timerMap.size);
  timer_list_t* list = timerMap.slots + idx;
H
hzcheng 已提交
173

weixin_48148422's avatar
weixin_48148422 已提交
174 175 176 177 178
  lockTimerList(list);
  timer->mnext = list->timers;
  list->timers = timer;
  unlockTimerList(list);
}
H
hzcheng 已提交
179

weixin_48148422's avatar
weixin_48148422 已提交
180 181 182 183 184 185 186 187 188 189
static tmr_obj_t* findTimer(uintptr_t id) {
  tmr_obj_t* timer = NULL;
  if (id > 0) {
    uint32_t      idx = (uint32_t)(id % timerMap.size);
    timer_list_t* list = timerMap.slots + idx;
    lockTimerList(list);
    for (timer = list->timers; timer != NULL; timer = timer->mnext) {
      if (timer->id == id) {
        timerAddRef(timer);
        break;
H
hzcheng 已提交
190 191
      }
    }
weixin_48148422's avatar
weixin_48148422 已提交
192
    unlockTimerList(list);
H
hzcheng 已提交
193
  }
weixin_48148422's avatar
weixin_48148422 已提交
194
  return timer;
H
hzcheng 已提交
195 196
}

weixin_48148422's avatar
weixin_48148422 已提交
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
static void removeTimer(uintptr_t id) {
  tmr_obj_t*    prev = NULL;
  uint32_t      idx = (uint32_t)(id % timerMap.size);
  timer_list_t* list = timerMap.slots + idx;
  lockTimerList(list);
  for (tmr_obj_t* p = list->timers; p != NULL; p = p->mnext) {
    if (p->id == id) {
      if (prev == NULL) {
        list->timers = p->mnext;
      } else {
        prev->mnext = p->mnext;
      }
      timerDecRef(p);
      break;
    }
    prev = p;
H
hzcheng 已提交
213
  }
weixin_48148422's avatar
weixin_48148422 已提交
214 215
  unlockTimerList(list);
}
H
hzcheng 已提交
216

weixin_48148422's avatar
weixin_48148422 已提交
217 218 219 220 221 222 223 224 225
static void addToWheel(tmr_obj_t* timer, uint32_t delay) {
  timerAddRef(timer);
  // select a wheel for the timer, we are not an accurate timer,
  // but the inaccuracy should not be too large.
  timer->wheel = tListLen(wheels) - 1;
  for (uint8_t i = 0; i < tListLen(wheels); i++) {
    time_wheel_t* wheel = wheels + i;
    if (delay < wheel->resolution * wheel->size) {
      timer->wheel = i;
H
hzcheng 已提交
226 227 228 229
      break;
    }
  }

weixin_48148422's avatar
weixin_48148422 已提交
230 231
  time_wheel_t* wheel = wheels + timer->wheel;
  timer->prev = NULL;
232
  timer->expireAt = taosGetMonotonicMs() + delay;
H
hzcheng 已提交
233

wafwerar's avatar
wafwerar 已提交
234
  taosThreadMutexLock(&wheel->mutex);
H
hzcheng 已提交
235

weixin_48148422's avatar
weixin_48148422 已提交
236 237 238 239 240 241
  uint32_t idx = 0;
  if (timer->expireAt > wheel->nextScanAt) {
    // adjust delay according to next scan time of this wheel
    // so that the timer is not fired earlier than desired.
    delay = (uint32_t)(timer->expireAt - wheel->nextScanAt);
    idx = (delay + wheel->resolution - 1) / wheel->resolution;
H
hzcheng 已提交
242 243
  }

weixin_48148422's avatar
weixin_48148422 已提交
244 245 246 247 248 249 250
  timer->slot = (uint16_t)((wheel->index + idx + 1) % wheel->size);
  tmr_obj_t* p = wheel->slots[timer->slot];
  wheel->slots[timer->slot] = timer;
  timer->next = p;
  if (p != NULL) {
    p->prev = timer;
  }
H
hzcheng 已提交
251

wafwerar's avatar
wafwerar 已提交
252
  taosThreadMutexUnlock(&wheel->mutex);
H
hzcheng 已提交
253 254
}

weixin_48148422's avatar
weixin_48148422 已提交
255
static bool removeFromWheel(tmr_obj_t* timer) {
256 257
  uint8_t wheelIdx = timer->wheel;
  if (wheelIdx >= tListLen(wheels)) {
weixin_48148422's avatar
weixin_48148422 已提交
258 259
    return false;
  }
260
  time_wheel_t* wheel = wheels + wheelIdx;
weixin_48148422's avatar
weixin_48148422 已提交
261 262

  bool removed = false;
wafwerar's avatar
wafwerar 已提交
263
  taosThreadMutexLock(&wheel->mutex);
weixin_48148422's avatar
weixin_48148422 已提交
264 265 266 267
  // other thread may modify timer->wheel, check again.
  if (timer->wheel < tListLen(wheels)) {
    if (timer->prev != NULL) {
      timer->prev->next = timer->next;
S
slguan 已提交
268
    }
weixin_48148422's avatar
weixin_48148422 已提交
269 270
    if (timer->next != NULL) {
      timer->next->prev = timer->prev;
S
slguan 已提交
271
    }
weixin_48148422's avatar
weixin_48148422 已提交
272 273 274 275 276 277 278 279
    if (timer == wheel->slots[timer->slot]) {
      wheel->slots[timer->slot] = timer->next;
    }
    timer->wheel = tListLen(wheels);
    timer->next = NULL;
    timer->prev = NULL;
    timerDecRef(timer);
    removed = true;
S
slguan 已提交
280
  }
wafwerar's avatar
wafwerar 已提交
281
  taosThreadMutexUnlock(&wheel->mutex);
S
slguan 已提交
282

weixin_48148422's avatar
weixin_48148422 已提交
283
  return removed;
S
slguan 已提交
284 285
}

weixin_48148422's avatar
weixin_48148422 已提交
286 287
static void processExpiredTimer(void* handle, void* arg) {
  tmr_obj_t* timer = (tmr_obj_t*)handle;
S
TD-2616  
Shengliang Guan 已提交
288
  timer->executedBy = taosGetSelfPthreadId();
weixin_48148422's avatar
weixin_48148422 已提交
289
  uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED);
weixin_48148422's avatar
weixin_48148422 已提交
290
  if (state == TIMER_STATE_WAITING) {
weixin_48148422's avatar
weixin_48148422 已提交
291
    const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution start.";
292
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
H
hzcheng 已提交
293

weixin_48148422's avatar
weixin_48148422 已提交
294 295
    (*timer->fp)(timer->param, (tmr_h)timer->id);
    atomic_store_8(&timer->state, TIMER_STATE_STOPPED);
H
hzcheng 已提交
296

weixin_48148422's avatar
weixin_48148422 已提交
297
    fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution end.";
298
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
weixin_48148422's avatar
weixin_48148422 已提交
299 300 301 302
  }
  removeTimer(timer->id);
  timerDecRef(timer);
}
H
hzcheng 已提交
303

weixin_48148422's avatar
weixin_48148422 已提交
304
static void addToExpired(tmr_obj_t* head) {
weixin_48148422's avatar
weixin_48148422 已提交
305
  const char* fmt = "%s adding expired timer[id=%" PRIuPTR ", fp=%p, param=%p] to queue.";
H
hzcheng 已提交
306

weixin_48148422's avatar
weixin_48148422 已提交
307
  while (head != NULL) {
dengyihao's avatar
fix bug  
dengyihao 已提交
308
    uintptr_t  id = head->id;
weixin_48148422's avatar
weixin_48148422 已提交
309
    tmr_obj_t* next = head->next;
310
    tmrDebug(fmt, head->ctrl->label, id, head->fp, head->param);
weixin_48148422's avatar
weixin_48148422 已提交
311

dengyihao's avatar
fix bug  
dengyihao 已提交
312
    SSchedMsg schedMsg;
weixin_48148422's avatar
weixin_48148422 已提交
313 314
    schedMsg.fp = NULL;
    schedMsg.tfp = processExpiredTimer;
H
Hui Li 已提交
315
    schedMsg.msg = NULL;
weixin_48148422's avatar
weixin_48148422 已提交
316 317 318
    schedMsg.ahandle = head;
    schedMsg.thandle = NULL;
    taosScheduleTask(tmrQhandle, &schedMsg);
weixin_48148422's avatar
weixin_48148422 已提交
319

320
    tmrDebug("timer[id=%" PRIuPTR "] has been added to queue.", id);
weixin_48148422's avatar
weixin_48148422 已提交
321 322 323
    head = next;
  }
}
H
hzcheng 已提交
324

S
timer  
Shengliang Guan 已提交
325
static uintptr_t doStartTimer(tmr_obj_t* timer, TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, tmr_ctrl_t* ctrl) {
weixin_48148422's avatar
weixin_48148422 已提交
326 327 328 329 330 331 332 333
  uintptr_t id = getNextTimerId();
  timer->id = id;
  timer->state = TIMER_STATE_WAITING;
  timer->fp = fp;
  timer->param = param;
  timer->ctrl = ctrl;
  addTimer(timer);

weixin_48148422's avatar
weixin_48148422 已提交
334
  const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] started";
335
  tmrDebug(fmt, ctrl->label, timer->id, timer->fp, timer->param);
weixin_48148422's avatar
weixin_48148422 已提交
336 337 338 339 340

  if (mseconds == 0) {
    timer->wheel = tListLen(wheels);
    timerAddRef(timer);
    addToExpired(timer);
H
hzcheng 已提交
341
  } else {
weixin_48148422's avatar
weixin_48148422 已提交
342
    addToWheel(timer, mseconds);
H
hzcheng 已提交
343 344
  }

weixin_48148422's avatar
weixin_48148422 已提交
345 346
  // note: use `timer->id` here is unsafe as `timer` may already be freed
  return id;
H
hzcheng 已提交
347 348
}

S
timer  
Shengliang Guan 已提交
349
tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle) {
weixin_48148422's avatar
weixin_48148422 已提交
350 351 352 353
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
  if (ctrl == NULL || ctrl->label[0] == 0) {
    return NULL;
  }
H
hzcheng 已提交
354

wafwerar's avatar
wafwerar 已提交
355
  tmr_obj_t* timer = (tmr_obj_t*)taosMemoryCalloc(1, sizeof(tmr_obj_t));
weixin_48148422's avatar
weixin_48148422 已提交
356
  if (timer == NULL) {
weixin_48148422's avatar
weixin_48148422 已提交
357
    tmrError("%s failed to allocated memory for new timer object.", ctrl->label);
weixin_48148422's avatar
weixin_48148422 已提交
358 359
    return NULL;
  }
H
hzcheng 已提交
360

weixin_48148422's avatar
weixin_48148422 已提交
361 362
  return (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);
}
H
hzcheng 已提交
363

S
timer  
Shengliang Guan 已提交
364
static void taosTimerLoopFunc(int32_t signo) {
365
  int64_t now = taosGetMonotonicMs();
weixin_48148422's avatar
weixin_48148422 已提交
366

S
timer  
Shengliang Guan 已提交
367
  for (int32_t i = 0; i < tListLen(wheels); i++) {
weixin_48148422's avatar
weixin_48148422 已提交
368 369 370 371 372 373 374 375
    // `expried` is a temporary expire list.
    // expired timers are first add to this list, then move
    // to expired queue as a batch to improve performance.
    // note this list is used as a stack in this function.
    tmr_obj_t* expired = NULL;

    time_wheel_t* wheel = wheels + i;
    while (now >= wheel->nextScanAt) {
wafwerar's avatar
wafwerar 已提交
376
      taosThreadMutexLock(&wheel->mutex);
weixin_48148422's avatar
weixin_48148422 已提交
377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410
      wheel->index = (wheel->index + 1) % wheel->size;
      tmr_obj_t* timer = wheel->slots[wheel->index];
      while (timer != NULL) {
        tmr_obj_t* next = timer->next;
        if (now < timer->expireAt) {
          timer = next;
          continue;
        }

        // remove from the wheel
        if (timer->prev == NULL) {
          wheel->slots[wheel->index] = next;
          if (next != NULL) {
            next->prev = NULL;
          }
        } else {
          timer->prev->next = next;
          if (next != NULL) {
            next->prev = timer->prev;
          }
        }
        timer->wheel = tListLen(wheels);

        // add to temporary expire list
        timer->next = expired;
        timer->prev = NULL;
        if (expired != NULL) {
          expired->prev = timer;
        }
        expired = timer;

        timer = next;
      }
      wheel->nextScanAt += wheel->resolution;
wafwerar's avatar
wafwerar 已提交
411
      taosThreadMutexUnlock(&wheel->mutex);
H
hzcheng 已提交
412 413
    }

weixin_48148422's avatar
weixin_48148422 已提交
414
    addToExpired(expired);
H
hzcheng 已提交
415
  }
weixin_48148422's avatar
weixin_48148422 已提交
416
}
H
hzcheng 已提交
417

weixin_48148422's avatar
weixin_48148422 已提交
418 419
static bool doStopTimer(tmr_obj_t* timer, uint8_t state) {
  if (state == TIMER_STATE_WAITING) {
weixin_48148422's avatar
weixin_48148422 已提交
420
    bool reusable = false;
weixin_48148422's avatar
weixin_48148422 已提交
421 422 423 424 425 426
    if (removeFromWheel(timer)) {
      removeTimer(timer->id);
      // only safe to reuse the timer when timer is removed from the wheel.
      // we cannot guarantee the thread safety of the timr in all other cases.
      reusable = true;
    }
weixin_48148422's avatar
weixin_48148422 已提交
427
    const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is cancelled.";
428
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
weixin_48148422's avatar
weixin_48148422 已提交
429 430
    return reusable;
  }
431

weixin_48148422's avatar
weixin_48148422 已提交
432
  if (state != TIMER_STATE_EXPIRED) {
weixin_48148422's avatar
weixin_48148422 已提交
433
    // timer already stopped or cancelled, has nothing to do in this case
weixin_48148422's avatar
weixin_48148422 已提交
434 435
    return false;
  }
436

S
TD-2616  
Shengliang Guan 已提交
437
  if (timer->executedBy == taosGetSelfPthreadId()) {
weixin_48148422's avatar
weixin_48148422 已提交
438 439 440
    // taosTmrReset is called in the timer callback, should do nothing in this
    // case to avoid dead lock. note taosTmrReset must be the last statement
    // of the callback funtion, will be a bug otherwise.
weixin_48148422's avatar
weixin_48148422 已提交
441
    return false;
H
hzcheng 已提交
442
  }
443

weixin_48148422's avatar
weixin_48148422 已提交
444 445 446
  // timer callback is executing in another thread, we SHOULD wait it to stop,
  // BUT this may result in dead lock if current thread are holding a lock which
  // the timer callback need to acquire. so, we HAVE TO return directly.
weixin_48148422's avatar
weixin_48148422 已提交
447
  const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is executing and cannot be stopped.";
448
  tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
weixin_48148422's avatar
weixin_48148422 已提交
449
  return false;
weixin_48148422's avatar
weixin_48148422 已提交
450
}
H
hzcheng 已提交
451

weixin_48148422's avatar
weixin_48148422 已提交
452 453
bool taosTmrStop(tmr_h timerId) {
  uintptr_t id = (uintptr_t)timerId;
H
hzcheng 已提交
454

weixin_48148422's avatar
weixin_48148422 已提交
455 456
  tmr_obj_t* timer = findTimer(id);
  if (timer == NULL) {
457
    tmrDebug("timer[id=%" PRIuPTR "] does not exist", id);
weixin_48148422's avatar
weixin_48148422 已提交
458 459
    return false;
  }
H
hzcheng 已提交
460

weixin_48148422's avatar
weixin_48148422 已提交
461
  uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
weixin_48148422's avatar
weixin_48148422 已提交
462 463
  doStopTimer(timer, state);
  timerDecRef(timer);
H
hzcheng 已提交
464

weixin_48148422's avatar
weixin_48148422 已提交
465
  return state == TIMER_STATE_WAITING;
H
hzcheng 已提交
466 467
}

weixin_48148422's avatar
weixin_48148422 已提交
468 469 470 471
bool taosTmrStopA(tmr_h* timerId) {
  bool ret = taosTmrStop(*timerId);
  *timerId = NULL;
  return ret;
H
hzcheng 已提交
472 473
}

S
timer  
Shengliang Guan 已提交
474
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId) {
weixin_48148422's avatar
weixin_48148422 已提交
475 476
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
  if (ctrl == NULL || ctrl->label[0] == 0) {
S
TD-1037  
Shengliang Guan 已提交
477
    return false;
H
hzcheng 已提交
478 479
  }

weixin_48148422's avatar
weixin_48148422 已提交
480 481 482 483
  uintptr_t  id = (uintptr_t)*pTmrId;
  bool       stopped = false;
  tmr_obj_t* timer = findTimer(id);
  if (timer == NULL) {
484
    tmrDebug("%s timer[id=%" PRIuPTR "] does not exist", ctrl->label, id);
H
hzcheng 已提交
485
  } else {
weixin_48148422's avatar
weixin_48148422 已提交
486
    uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
weixin_48148422's avatar
weixin_48148422 已提交
487 488 489 490 491
    if (!doStopTimer(timer, state)) {
      timerDecRef(timer);
      timer = NULL;
    }
    stopped = state == TIMER_STATE_WAITING;
H
hzcheng 已提交
492 493
  }

weixin_48148422's avatar
weixin_48148422 已提交
494 495 496
  if (timer == NULL) {
    *pTmrId = taosTmrStart(fp, mseconds, param, handle);
    return stopped;
H
hzcheng 已提交
497 498
  }

499
  tmrDebug("%s timer[id=%" PRIuPTR "] is reused", ctrl->label, timer->id);
weixin_48148422's avatar
weixin_48148422 已提交
500 501 502

  // wait until there's no other reference to this timer,
  // so that we can reuse this timer safely.
S
timer  
Shengliang Guan 已提交
503
  for (int32_t i = 1; atomic_load_8(&timer->refCount) > 1; ++i) {
weixin_48148422's avatar
weixin_48148422 已提交
504 505 506 507
    if (i % 1000 == 0) {
      sched_yield();
    }
  }
H
hzcheng 已提交
508

weixin_48148422's avatar
weixin_48148422 已提交
509 510 511
  assert(timer->refCount == 1);
  memset(timer, 0, sizeof(*timer));
  *pTmrId = (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);
H
hzcheng 已提交
512

weixin_48148422's avatar
weixin_48148422 已提交
513
  return stopped;
H
hzcheng 已提交
514 515
}

weixin_48148422's avatar
weixin_48148422 已提交
516
static void taosTmrModuleInit(void) {
wafwerar's avatar
wafwerar 已提交
517
  tmrCtrls = taosMemoryMalloc(sizeof(tmr_ctrl_t) * tsMaxTmrCtrl);
518 519 520 521 522
  if (tmrCtrls == NULL) {
    tmrError("failed to allocate memory for timer controllers.");
    return;
  }

dengyihao's avatar
fix bug  
dengyihao 已提交
523 524
  memset(&timerMap, 0, sizeof(timerMap));

S
TD-1037  
Shengliang Guan 已提交
525
  for (uint32_t i = 0; i < tsMaxTmrCtrl - 1; ++i) {
weixin_48148422's avatar
weixin_48148422 已提交
526 527 528
    tmr_ctrl_t* ctrl = tmrCtrls + i;
    ctrl->next = ctrl + 1;
  }
S
Shengliang Guan 已提交
529
  (tmrCtrls + tsMaxTmrCtrl - 1)->next = NULL;
weixin_48148422's avatar
weixin_48148422 已提交
530
  unusedTmrCtrl = tmrCtrls;
H
hzcheng 已提交
531

wafwerar's avatar
wafwerar 已提交
532
  taosThreadMutexInit(&tmrCtrlMutex, NULL);
weixin_48148422's avatar
weixin_48148422 已提交
533

534
  int64_t now = taosGetMonotonicMs();
S
timer  
Shengliang Guan 已提交
535
  for (int32_t i = 0; i < tListLen(wheels); i++) {
weixin_48148422's avatar
weixin_48148422 已提交
536
    time_wheel_t* wheel = wheels + i;
wafwerar's avatar
wafwerar 已提交
537
    if (taosThreadMutexInit(&wheel->mutex, NULL) != 0) {
weixin_48148422's avatar
weixin_48148422 已提交
538 539 540 541 542
      tmrError("failed to create the mutex for wheel, reason:%s", strerror(errno));
      return;
    }
    wheel->nextScanAt = now + wheel->resolution;
    wheel->index = 0;
wafwerar's avatar
wafwerar 已提交
543
    wheel->slots = (tmr_obj_t**)taosMemoryCalloc(wheel->size, sizeof(tmr_obj_t*));
weixin_48148422's avatar
weixin_48148422 已提交
544 545 546 547 548
    if (wheel->slots == NULL) {
      tmrError("failed to allocate wheel slots");
      return;
    }
    timerMap.size += wheel->size;
H
hzcheng 已提交
549 550
  }

weixin_48148422's avatar
weixin_48148422 已提交
551
  timerMap.count = 0;
wafwerar's avatar
wafwerar 已提交
552
  timerMap.slots = (timer_list_t*)taosMemoryCalloc(timerMap.size, sizeof(timer_list_t));
weixin_48148422's avatar
weixin_48148422 已提交
553 554 555 556
  if (timerMap.slots == NULL) {
    tmrError("failed to allocate hash map");
    return;
  }
H
hzcheng 已提交
557

D
dapan1121 已提交
558
  tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr", NULL);
weixin_48148422's avatar
weixin_48148422 已提交
559
  taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK);
H
hzcheng 已提交
560

561
  tmrDebug("timer module is initialized, number of threads: %d", taosTmrThreads);
weixin_48148422's avatar
weixin_48148422 已提交
562
}
H
hzcheng 已提交
563

S
timer  
Shengliang Guan 已提交
564
void* taosTmrInit(int32_t maxNumOfTmrs, int32_t resolution, int32_t longest, const char* label) {
565 566 567
  const char* ret = taosMonotonicInit();
  tmrDebug("ttimer monotonic clock source:%s", ret);

wafwerar's avatar
wafwerar 已提交
568
  taosThreadOnce(&tmrModuleInit, taosTmrModuleInit);
H
hzcheng 已提交
569

wafwerar's avatar
wafwerar 已提交
570
  taosThreadMutexLock(&tmrCtrlMutex);
weixin_48148422's avatar
weixin_48148422 已提交
571 572 573
  tmr_ctrl_t* ctrl = unusedTmrCtrl;
  if (ctrl != NULL) {
    unusedTmrCtrl = ctrl->next;
574
    numOfTmrCtrl++;
H
hzcheng 已提交
575
  }
wafwerar's avatar
wafwerar 已提交
576
  taosThreadMutexUnlock(&tmrCtrlMutex);
weixin_48148422's avatar
weixin_48148422 已提交
577 578

  if (ctrl == NULL) {
weixin_48148422's avatar
weixin_48148422 已提交
579
    tmrError("%s too many timer controllers, failed to create timer controller.", label);
S
Shengliang Guan 已提交
580
    terrno = TSDB_CODE_OUT_OF_MEMORY;
weixin_48148422's avatar
weixin_48148422 已提交
581 582 583
    return NULL;
  }

B
Bomin Zhang 已提交
584
  tstrncpy(ctrl->label, label, sizeof(ctrl->label));
585
  tmrDebug("%s timer controller is initialized, number of timer controllers: %d.", label, numOfTmrCtrl);
weixin_48148422's avatar
weixin_48148422 已提交
586
  return ctrl;
H
hzcheng 已提交
587 588
}

weixin_48148422's avatar
weixin_48148422 已提交
589 590
void taosTmrCleanUp(void* handle) {
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
weixin_48148422's avatar
weixin_48148422 已提交
591 592 593
  if (ctrl == NULL || ctrl->label[0] == 0) {
    return;
  }
weixin_48148422's avatar
weixin_48148422 已提交
594

595
  tmrDebug("%s timer controller is cleaned up.", ctrl->label);
weixin_48148422's avatar
weixin_48148422 已提交
596
  ctrl->label[0] = 0;
H
hzcheng 已提交
597

wafwerar's avatar
wafwerar 已提交
598
  taosThreadMutexLock(&tmrCtrlMutex);
weixin_48148422's avatar
weixin_48148422 已提交
599
  ctrl->next = unusedTmrCtrl;
600
  numOfTmrCtrl--;
weixin_48148422's avatar
weixin_48148422 已提交
601
  unusedTmrCtrl = ctrl;
wafwerar's avatar
wafwerar 已提交
602
  taosThreadMutexUnlock(&tmrCtrlMutex);
603

dengyihao's avatar
fix bug  
dengyihao 已提交
604 605
  tmrDebug("time controller's tmr ctrl size:  %d", numOfTmrCtrl);
  if (numOfTmrCtrl <= 0) {
weixin_48148422's avatar
weixin_48148422 已提交
606
    taosUninitTimer();
607

weixin_48148422's avatar
weixin_48148422 已提交
608 609
    taosCleanUpScheduler(tmrQhandle);

S
timer  
Shengliang Guan 已提交
610
    for (int32_t i = 0; i < tListLen(wheels); i++) {
611
      time_wheel_t* wheel = wheels + i;
wafwerar's avatar
wafwerar 已提交
612
      taosThreadMutexDestroy(&wheel->mutex);
wafwerar's avatar
wafwerar 已提交
613
      taosMemoryFree(wheel->slots);
614 615
    }

wafwerar's avatar
wafwerar 已提交
616
    taosThreadMutexDestroy(&tmrCtrlMutex);
weixin_48148422's avatar
weixin_48148422 已提交
617 618 619

    for (size_t i = 0; i < timerMap.size; i++) {
      timer_list_t* list = timerMap.slots + i;
dengyihao's avatar
fix bug  
dengyihao 已提交
620
      tmr_obj_t*    t = list->timers;
weixin_48148422's avatar
weixin_48148422 已提交
621 622
      while (t != NULL) {
        tmr_obj_t* next = t->mnext;
wafwerar's avatar
wafwerar 已提交
623
        taosMemoryFree(t);
weixin_48148422's avatar
weixin_48148422 已提交
624 625 626
        t = next;
      }
    }
wafwerar's avatar
wafwerar 已提交
627 628
    taosMemoryFree(timerMap.slots);
    taosMemoryFree(tmrCtrls);
629

dengyihao's avatar
fix bug  
dengyihao 已提交
630 631
    tmrCtrls = NULL;
    unusedTmrCtrl = NULL;
632
#if defined(LINUX)
dengyihao's avatar
fix bug  
dengyihao 已提交
633
    tmrModuleInit = PTHREAD_ONCE_INIT;  // to support restart
634
#endif
635
  }
H
hzcheng 已提交
636
}