ttimer.c 19.0 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
timer  
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
dengyihao's avatar
fix bug  
dengyihao 已提交
17 18
#include "ttimer.h"
#include "taoserror.h"
H
hzcheng 已提交
19 20 21
#include "tlog.h"
#include "tsched.h"

22 23 24 25 26
#define tmrFatal(...)                                                     \
  {                                                                       \
    if (tmrDebugFlag & DEBUG_FATAL) {                                     \
      taosPrintLog("TMR FATAL ", DEBUG_FATAL, tmrDebugFlag, __VA_ARGS__); \
    }                                                                     \
dengyihao's avatar
fix bug  
dengyihao 已提交
27
  }
28 29 30 31 32
#define tmrError(...)                                                     \
  {                                                                       \
    if (tmrDebugFlag & DEBUG_ERROR) {                                     \
      taosPrintLog("TMR ERROR ", DEBUG_ERROR, tmrDebugFlag, __VA_ARGS__); \
    }                                                                     \
dengyihao's avatar
fix bug  
dengyihao 已提交
33
  }
34 35 36 37 38
#define tmrWarn(...)                                                    \
  {                                                                     \
    if (tmrDebugFlag & DEBUG_WARN) {                                    \
      taosPrintLog("TMR WARN ", DEBUG_WARN, tmrDebugFlag, __VA_ARGS__); \
    }                                                                   \
dengyihao's avatar
fix bug  
dengyihao 已提交
39
  }
40 41 42 43 44
#define tmrInfo(...)                                               \
  {                                                                \
    if (tmrDebugFlag & DEBUG_INFO) {                               \
      taosPrintLog("TMR ", DEBUG_INFO, tmrDebugFlag, __VA_ARGS__); \
    }                                                              \
dengyihao's avatar
fix bug  
dengyihao 已提交
45
  }
46 47 48 49 50
#define tmrDebug(...)                                               \
  {                                                                 \
    if (tmrDebugFlag & DEBUG_DEBUG) {                               \
      taosPrintLog("TMR ", DEBUG_DEBUG, tmrDebugFlag, __VA_ARGS__); \
    }                                                               \
dengyihao's avatar
fix bug  
dengyihao 已提交
51
  }
52 53 54 55 56
#define tmrTrace(...)                                               \
  {                                                                 \
    if (tmrDebugFlag & DEBUG_TRACE) {                               \
      taosPrintLog("TMR ", DEBUG_TRACE, tmrDebugFlag, __VA_ARGS__); \
    }                                                               \
dengyihao's avatar
fix bug  
dengyihao 已提交
57
  }
H
hzcheng 已提交
58

S
timer  
Shengliang Guan 已提交
59 60 61
#define TIMER_STATE_WAITING  0
#define TIMER_STATE_EXPIRED  1
#define TIMER_STATE_STOPPED  2
weixin_48148422's avatar
weixin_48148422 已提交
62 63 64 65 66 67 68 69 70 71
#define TIMER_STATE_CANCELED 3

typedef union _tmr_ctrl_t {
  char label[16];
  struct {
    // pad to ensure 'next' is the end of this union
    char               padding[16 - sizeof(union _tmr_ctrl_t*)];
    union _tmr_ctrl_t* next;
  };
} tmr_ctrl_t;
H
hzcheng 已提交
72

weixin_48148422's avatar
weixin_48148422 已提交
73 74 75 76 77 78 79 80 81 82 83 84 85
typedef struct tmr_obj_t {
  uintptr_t         id;
  tmr_ctrl_t*       ctrl;
  struct tmr_obj_t* mnext;
  struct tmr_obj_t* prev;
  struct tmr_obj_t* next;
  uint16_t          slot;
  uint8_t           wheel;
  uint8_t           state;
  uint8_t           refCount;
  uint8_t           reserved1;
  uint16_t          reserved2;
  union {
86 87
    int64_t expireAt;
    int64_t executedBy;
weixin_48148422's avatar
weixin_48148422 已提交
88 89 90 91
  };
  TAOS_TMR_CALLBACK fp;
  void*             param;
} tmr_obj_t;
H
hzcheng 已提交
92

weixin_48148422's avatar
weixin_48148422 已提交
93
typedef struct timer_list_t {
94
  int64_t    lockedBy;
weixin_48148422's avatar
weixin_48148422 已提交
95 96 97 98 99 100 101 102 103 104
  tmr_obj_t* timers;
} timer_list_t;

typedef struct timer_map_t {
  uint32_t      size;
  uint32_t      count;
  timer_list_t* slots;
} timer_map_t;

typedef struct time_wheel_t {
wafwerar's avatar
wafwerar 已提交
105
  TdThreadMutex mutex;
weixin_48148422's avatar
weixin_48148422 已提交
106 107 108 109 110 111 112
  int64_t         nextScanAt;
  uint32_t        resolution;
  uint16_t        size;
  uint16_t        index;
  tmr_obj_t**     slots;
} time_wheel_t;

113
static int32_t tsMaxTmrCtrl = 512;
weixin_48148422's avatar
weixin_48148422 已提交
114

wafwerar's avatar
wafwerar 已提交
115 116
static TdThreadOnce  tmrModuleInit = PTHREAD_ONCE_INIT;
static TdThreadMutex tmrCtrlMutex;
117
static tmr_ctrl_t*     tmrCtrls;
weixin_48148422's avatar
weixin_48148422 已提交
118
static tmr_ctrl_t*     unusedTmrCtrl = NULL;
119
static void*           tmrQhandle;
S
timer  
Shengliang Guan 已提交
120
static int32_t         numOfTmrCtrl = 0;
121

S
timer  
Shengliang Guan 已提交
122
int32_t          taosTmrThreads = 1;
weixin_48148422's avatar
weixin_48148422 已提交
123 124 125 126 127 128 129 130 131 132 133 134
static uintptr_t nextTimerId = 0;

static time_wheel_t wheels[] = {
    {.resolution = MSECONDS_PER_TICK, .size = 4096},
    {.resolution = 1000, .size = 1024},
    {.resolution = 60000, .size = 1024},
};
static timer_map_t timerMap;

static uintptr_t getNextTimerId() {
  uintptr_t id;
  do {
wafwerar's avatar
wafwerar 已提交
135
    id = (uintptr_t)atomic_add_fetch_ptr((void **)&nextTimerId, (void*)1);
weixin_48148422's avatar
weixin_48148422 已提交
136 137
  } while (id == 0);
  return id;
H
hzcheng 已提交
138 139
}

weixin_48148422's avatar
weixin_48148422 已提交
140
static void timerAddRef(tmr_obj_t* timer) { atomic_add_fetch_8(&timer->refCount, 1); }
H
hzcheng 已提交
141

weixin_48148422's avatar
weixin_48148422 已提交
142
static void timerDecRef(tmr_obj_t* timer) {
weixin_48148422's avatar
weixin_48148422 已提交
143
  if (atomic_sub_fetch_8(&timer->refCount, 1) == 0) {
wafwerar's avatar
wafwerar 已提交
144
    taosMemoryFree(timer);
H
hzcheng 已提交
145
  }
weixin_48148422's avatar
weixin_48148422 已提交
146
}
H
hzcheng 已提交
147

weixin_48148422's avatar
weixin_48148422 已提交
148
static void lockTimerList(timer_list_t* list) {
S
TD-2616  
Shengliang Guan 已提交
149
  int64_t tid = taosGetSelfPthreadId();
S
timer  
Shengliang Guan 已提交
150
  int32_t i = 0;
weixin_48148422's avatar
weixin_48148422 已提交
151
  while (atomic_val_compare_exchange_64(&(list->lockedBy), 0, tid) != 0) {
weixin_48148422's avatar
weixin_48148422 已提交
152 153 154
    if (++i % 1000 == 0) {
      sched_yield();
    }
H
hzcheng 已提交
155
  }
weixin_48148422's avatar
weixin_48148422 已提交
156
}
H
hzcheng 已提交
157

weixin_48148422's avatar
weixin_48148422 已提交
158
static void unlockTimerList(timer_list_t* list) {
S
TD-2616  
Shengliang Guan 已提交
159
  int64_t tid = taosGetSelfPthreadId();
weixin_48148422's avatar
weixin_48148422 已提交
160
  if (atomic_val_compare_exchange_64(&(list->lockedBy), tid, 0) != tid) {
weixin_48148422's avatar
weixin_48148422 已提交
161
    assert(false);
H
Hui Li 已提交
162
    tmrError("%" PRId64 " trying to unlock a timer list not locked by current thread.", tid);
H
hzcheng 已提交
163 164 165
  }
}

weixin_48148422's avatar
weixin_48148422 已提交
166 167 168
static void addTimer(tmr_obj_t* timer) {
  timerAddRef(timer);
  timer->wheel = tListLen(wheels);
H
hzcheng 已提交
169

weixin_48148422's avatar
weixin_48148422 已提交
170 171
  uint32_t      idx = (uint32_t)(timer->id % timerMap.size);
  timer_list_t* list = timerMap.slots + idx;
H
hzcheng 已提交
172

weixin_48148422's avatar
weixin_48148422 已提交
173 174 175 176 177
  lockTimerList(list);
  timer->mnext = list->timers;
  list->timers = timer;
  unlockTimerList(list);
}
H
hzcheng 已提交
178

weixin_48148422's avatar
weixin_48148422 已提交
179 180 181 182 183 184 185 186 187 188
static tmr_obj_t* findTimer(uintptr_t id) {
  tmr_obj_t* timer = NULL;
  if (id > 0) {
    uint32_t      idx = (uint32_t)(id % timerMap.size);
    timer_list_t* list = timerMap.slots + idx;
    lockTimerList(list);
    for (timer = list->timers; timer != NULL; timer = timer->mnext) {
      if (timer->id == id) {
        timerAddRef(timer);
        break;
H
hzcheng 已提交
189 190
      }
    }
weixin_48148422's avatar
weixin_48148422 已提交
191
    unlockTimerList(list);
H
hzcheng 已提交
192
  }
weixin_48148422's avatar
weixin_48148422 已提交
193
  return timer;
H
hzcheng 已提交
194 195
}

weixin_48148422's avatar
weixin_48148422 已提交
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
static void removeTimer(uintptr_t id) {
  tmr_obj_t*    prev = NULL;
  uint32_t      idx = (uint32_t)(id % timerMap.size);
  timer_list_t* list = timerMap.slots + idx;
  lockTimerList(list);
  for (tmr_obj_t* p = list->timers; p != NULL; p = p->mnext) {
    if (p->id == id) {
      if (prev == NULL) {
        list->timers = p->mnext;
      } else {
        prev->mnext = p->mnext;
      }
      timerDecRef(p);
      break;
    }
    prev = p;
H
hzcheng 已提交
212
  }
weixin_48148422's avatar
weixin_48148422 已提交
213 214
  unlockTimerList(list);
}
H
hzcheng 已提交
215

weixin_48148422's avatar
weixin_48148422 已提交
216 217 218 219 220 221 222 223 224
static void addToWheel(tmr_obj_t* timer, uint32_t delay) {
  timerAddRef(timer);
  // select a wheel for the timer, we are not an accurate timer,
  // but the inaccuracy should not be too large.
  timer->wheel = tListLen(wheels) - 1;
  for (uint8_t i = 0; i < tListLen(wheels); i++) {
    time_wheel_t* wheel = wheels + i;
    if (delay < wheel->resolution * wheel->size) {
      timer->wheel = i;
H
hzcheng 已提交
225 226 227 228
      break;
    }
  }

weixin_48148422's avatar
weixin_48148422 已提交
229 230
  time_wheel_t* wheel = wheels + timer->wheel;
  timer->prev = NULL;
231
  timer->expireAt = taosGetMonotonicMs() + delay;
H
hzcheng 已提交
232

wafwerar's avatar
wafwerar 已提交
233
  taosThreadMutexLock(&wheel->mutex);
H
hzcheng 已提交
234

weixin_48148422's avatar
weixin_48148422 已提交
235 236 237 238 239 240
  uint32_t idx = 0;
  if (timer->expireAt > wheel->nextScanAt) {
    // adjust delay according to next scan time of this wheel
    // so that the timer is not fired earlier than desired.
    delay = (uint32_t)(timer->expireAt - wheel->nextScanAt);
    idx = (delay + wheel->resolution - 1) / wheel->resolution;
H
hzcheng 已提交
241 242
  }

weixin_48148422's avatar
weixin_48148422 已提交
243 244 245 246 247 248 249
  timer->slot = (uint16_t)((wheel->index + idx + 1) % wheel->size);
  tmr_obj_t* p = wheel->slots[timer->slot];
  wheel->slots[timer->slot] = timer;
  timer->next = p;
  if (p != NULL) {
    p->prev = timer;
  }
H
hzcheng 已提交
250

wafwerar's avatar
wafwerar 已提交
251
  taosThreadMutexUnlock(&wheel->mutex);
H
hzcheng 已提交
252 253
}

weixin_48148422's avatar
weixin_48148422 已提交
254
static bool removeFromWheel(tmr_obj_t* timer) {
255 256
  uint8_t wheelIdx = timer->wheel;
  if (wheelIdx >= tListLen(wheels)) {
weixin_48148422's avatar
weixin_48148422 已提交
257 258
    return false;
  }
259
  time_wheel_t* wheel = wheels + wheelIdx;
weixin_48148422's avatar
weixin_48148422 已提交
260 261

  bool removed = false;
wafwerar's avatar
wafwerar 已提交
262
  taosThreadMutexLock(&wheel->mutex);
weixin_48148422's avatar
weixin_48148422 已提交
263 264 265 266
  // other thread may modify timer->wheel, check again.
  if (timer->wheel < tListLen(wheels)) {
    if (timer->prev != NULL) {
      timer->prev->next = timer->next;
S
slguan 已提交
267
    }
weixin_48148422's avatar
weixin_48148422 已提交
268 269
    if (timer->next != NULL) {
      timer->next->prev = timer->prev;
S
slguan 已提交
270
    }
weixin_48148422's avatar
weixin_48148422 已提交
271 272 273 274 275 276 277 278
    if (timer == wheel->slots[timer->slot]) {
      wheel->slots[timer->slot] = timer->next;
    }
    timer->wheel = tListLen(wheels);
    timer->next = NULL;
    timer->prev = NULL;
    timerDecRef(timer);
    removed = true;
S
slguan 已提交
279
  }
wafwerar's avatar
wafwerar 已提交
280
  taosThreadMutexUnlock(&wheel->mutex);
S
slguan 已提交
281

weixin_48148422's avatar
weixin_48148422 已提交
282
  return removed;
S
slguan 已提交
283 284
}

weixin_48148422's avatar
weixin_48148422 已提交
285 286
static void processExpiredTimer(void* handle, void* arg) {
  tmr_obj_t* timer = (tmr_obj_t*)handle;
S
TD-2616  
Shengliang Guan 已提交
287
  timer->executedBy = taosGetSelfPthreadId();
weixin_48148422's avatar
weixin_48148422 已提交
288
  uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED);
weixin_48148422's avatar
weixin_48148422 已提交
289
  if (state == TIMER_STATE_WAITING) {
weixin_48148422's avatar
weixin_48148422 已提交
290
    const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution start.";
291
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
H
hzcheng 已提交
292

weixin_48148422's avatar
weixin_48148422 已提交
293 294
    (*timer->fp)(timer->param, (tmr_h)timer->id);
    atomic_store_8(&timer->state, TIMER_STATE_STOPPED);
H
hzcheng 已提交
295

weixin_48148422's avatar
weixin_48148422 已提交
296
    fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution end.";
297
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
weixin_48148422's avatar
weixin_48148422 已提交
298 299 300 301
  }
  removeTimer(timer->id);
  timerDecRef(timer);
}
H
hzcheng 已提交
302

weixin_48148422's avatar
weixin_48148422 已提交
303
static void addToExpired(tmr_obj_t* head) {
weixin_48148422's avatar
weixin_48148422 已提交
304
  const char* fmt = "%s adding expired timer[id=%" PRIuPTR ", fp=%p, param=%p] to queue.";
H
hzcheng 已提交
305

weixin_48148422's avatar
weixin_48148422 已提交
306
  while (head != NULL) {
dengyihao's avatar
fix bug  
dengyihao 已提交
307
    uintptr_t  id = head->id;
weixin_48148422's avatar
weixin_48148422 已提交
308
    tmr_obj_t* next = head->next;
309
    tmrDebug(fmt, head->ctrl->label, id, head->fp, head->param);
weixin_48148422's avatar
weixin_48148422 已提交
310

dengyihao's avatar
fix bug  
dengyihao 已提交
311
    SSchedMsg schedMsg;
weixin_48148422's avatar
weixin_48148422 已提交
312 313
    schedMsg.fp = NULL;
    schedMsg.tfp = processExpiredTimer;
H
Hui Li 已提交
314
    schedMsg.msg = NULL;
weixin_48148422's avatar
weixin_48148422 已提交
315 316 317
    schedMsg.ahandle = head;
    schedMsg.thandle = NULL;
    taosScheduleTask(tmrQhandle, &schedMsg);
weixin_48148422's avatar
weixin_48148422 已提交
318

319
    tmrDebug("timer[id=%" PRIuPTR "] has been added to queue.", id);
weixin_48148422's avatar
weixin_48148422 已提交
320 321 322
    head = next;
  }
}
H
hzcheng 已提交
323

S
timer  
Shengliang Guan 已提交
324
static uintptr_t doStartTimer(tmr_obj_t* timer, TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, tmr_ctrl_t* ctrl) {
weixin_48148422's avatar
weixin_48148422 已提交
325 326 327 328 329 330 331 332
  uintptr_t id = getNextTimerId();
  timer->id = id;
  timer->state = TIMER_STATE_WAITING;
  timer->fp = fp;
  timer->param = param;
  timer->ctrl = ctrl;
  addTimer(timer);

weixin_48148422's avatar
weixin_48148422 已提交
333
  const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] started";
334
  tmrDebug(fmt, ctrl->label, timer->id, timer->fp, timer->param);
weixin_48148422's avatar
weixin_48148422 已提交
335 336 337 338 339

  if (mseconds == 0) {
    timer->wheel = tListLen(wheels);
    timerAddRef(timer);
    addToExpired(timer);
H
hzcheng 已提交
340
  } else {
weixin_48148422's avatar
weixin_48148422 已提交
341
    addToWheel(timer, mseconds);
H
hzcheng 已提交
342 343
  }

weixin_48148422's avatar
weixin_48148422 已提交
344 345
  // note: use `timer->id` here is unsafe as `timer` may already be freed
  return id;
H
hzcheng 已提交
346 347
}

S
timer  
Shengliang Guan 已提交
348
tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle) {
weixin_48148422's avatar
weixin_48148422 已提交
349 350 351 352
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
  if (ctrl == NULL || ctrl->label[0] == 0) {
    return NULL;
  }
H
hzcheng 已提交
353

wafwerar's avatar
wafwerar 已提交
354
  tmr_obj_t* timer = (tmr_obj_t*)taosMemoryCalloc(1, sizeof(tmr_obj_t));
weixin_48148422's avatar
weixin_48148422 已提交
355
  if (timer == NULL) {
weixin_48148422's avatar
weixin_48148422 已提交
356
    tmrError("%s failed to allocated memory for new timer object.", ctrl->label);
weixin_48148422's avatar
weixin_48148422 已提交
357 358
    return NULL;
  }
H
hzcheng 已提交
359

weixin_48148422's avatar
weixin_48148422 已提交
360 361
  return (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);
}
H
hzcheng 已提交
362

S
timer  
Shengliang Guan 已提交
363
static void taosTimerLoopFunc(int32_t signo) {
364
  int64_t now = taosGetMonotonicMs();
weixin_48148422's avatar
weixin_48148422 已提交
365

S
timer  
Shengliang Guan 已提交
366
  for (int32_t i = 0; i < tListLen(wheels); i++) {
weixin_48148422's avatar
weixin_48148422 已提交
367 368 369 370 371 372 373 374
    // `expried` is a temporary expire list.
    // expired timers are first add to this list, then move
    // to expired queue as a batch to improve performance.
    // note this list is used as a stack in this function.
    tmr_obj_t* expired = NULL;

    time_wheel_t* wheel = wheels + i;
    while (now >= wheel->nextScanAt) {
wafwerar's avatar
wafwerar 已提交
375
      taosThreadMutexLock(&wheel->mutex);
weixin_48148422's avatar
weixin_48148422 已提交
376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409
      wheel->index = (wheel->index + 1) % wheel->size;
      tmr_obj_t* timer = wheel->slots[wheel->index];
      while (timer != NULL) {
        tmr_obj_t* next = timer->next;
        if (now < timer->expireAt) {
          timer = next;
          continue;
        }

        // remove from the wheel
        if (timer->prev == NULL) {
          wheel->slots[wheel->index] = next;
          if (next != NULL) {
            next->prev = NULL;
          }
        } else {
          timer->prev->next = next;
          if (next != NULL) {
            next->prev = timer->prev;
          }
        }
        timer->wheel = tListLen(wheels);

        // add to temporary expire list
        timer->next = expired;
        timer->prev = NULL;
        if (expired != NULL) {
          expired->prev = timer;
        }
        expired = timer;

        timer = next;
      }
      wheel->nextScanAt += wheel->resolution;
wafwerar's avatar
wafwerar 已提交
410
      taosThreadMutexUnlock(&wheel->mutex);
H
hzcheng 已提交
411 412
    }

weixin_48148422's avatar
weixin_48148422 已提交
413
    addToExpired(expired);
H
hzcheng 已提交
414
  }
weixin_48148422's avatar
weixin_48148422 已提交
415
}
H
hzcheng 已提交
416

weixin_48148422's avatar
weixin_48148422 已提交
417 418
static bool doStopTimer(tmr_obj_t* timer, uint8_t state) {
  if (state == TIMER_STATE_WAITING) {
weixin_48148422's avatar
weixin_48148422 已提交
419
    bool reusable = false;
weixin_48148422's avatar
weixin_48148422 已提交
420 421 422 423 424 425
    if (removeFromWheel(timer)) {
      removeTimer(timer->id);
      // only safe to reuse the timer when timer is removed from the wheel.
      // we cannot guarantee the thread safety of the timr in all other cases.
      reusable = true;
    }
weixin_48148422's avatar
weixin_48148422 已提交
426
    const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is cancelled.";
427
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
weixin_48148422's avatar
weixin_48148422 已提交
428 429
    return reusable;
  }
430

weixin_48148422's avatar
weixin_48148422 已提交
431
  if (state != TIMER_STATE_EXPIRED) {
weixin_48148422's avatar
weixin_48148422 已提交
432
    // timer already stopped or cancelled, has nothing to do in this case
weixin_48148422's avatar
weixin_48148422 已提交
433 434
    return false;
  }
435

S
TD-2616  
Shengliang Guan 已提交
436
  if (timer->executedBy == taosGetSelfPthreadId()) {
weixin_48148422's avatar
weixin_48148422 已提交
437 438 439
    // taosTmrReset is called in the timer callback, should do nothing in this
    // case to avoid dead lock. note taosTmrReset must be the last statement
    // of the callback funtion, will be a bug otherwise.
weixin_48148422's avatar
weixin_48148422 已提交
440
    return false;
H
hzcheng 已提交
441
  }
442

weixin_48148422's avatar
weixin_48148422 已提交
443 444 445
  // timer callback is executing in another thread, we SHOULD wait it to stop,
  // BUT this may result in dead lock if current thread are holding a lock which
  // the timer callback need to acquire. so, we HAVE TO return directly.
weixin_48148422's avatar
weixin_48148422 已提交
446
  const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is executing and cannot be stopped.";
447
  tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
weixin_48148422's avatar
weixin_48148422 已提交
448
  return false;
weixin_48148422's avatar
weixin_48148422 已提交
449
}
H
hzcheng 已提交
450

weixin_48148422's avatar
weixin_48148422 已提交
451 452
bool taosTmrStop(tmr_h timerId) {
  uintptr_t id = (uintptr_t)timerId;
H
hzcheng 已提交
453

weixin_48148422's avatar
weixin_48148422 已提交
454 455
  tmr_obj_t* timer = findTimer(id);
  if (timer == NULL) {
456
    tmrDebug("timer[id=%" PRIuPTR "] does not exist", id);
weixin_48148422's avatar
weixin_48148422 已提交
457 458
    return false;
  }
H
hzcheng 已提交
459

weixin_48148422's avatar
weixin_48148422 已提交
460
  uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
weixin_48148422's avatar
weixin_48148422 已提交
461 462
  doStopTimer(timer, state);
  timerDecRef(timer);
H
hzcheng 已提交
463

weixin_48148422's avatar
weixin_48148422 已提交
464
  return state == TIMER_STATE_WAITING;
H
hzcheng 已提交
465 466
}

weixin_48148422's avatar
weixin_48148422 已提交
467 468 469 470
bool taosTmrStopA(tmr_h* timerId) {
  bool ret = taosTmrStop(*timerId);
  *timerId = NULL;
  return ret;
H
hzcheng 已提交
471 472
}

S
timer  
Shengliang Guan 已提交
473
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId) {
weixin_48148422's avatar
weixin_48148422 已提交
474 475
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
  if (ctrl == NULL || ctrl->label[0] == 0) {
S
TD-1037  
Shengliang Guan 已提交
476
    return false;
H
hzcheng 已提交
477 478
  }

weixin_48148422's avatar
weixin_48148422 已提交
479 480 481 482
  uintptr_t  id = (uintptr_t)*pTmrId;
  bool       stopped = false;
  tmr_obj_t* timer = findTimer(id);
  if (timer == NULL) {
483
    tmrDebug("%s timer[id=%" PRIuPTR "] does not exist", ctrl->label, id);
H
hzcheng 已提交
484
  } else {
weixin_48148422's avatar
weixin_48148422 已提交
485
    uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
weixin_48148422's avatar
weixin_48148422 已提交
486 487 488 489 490
    if (!doStopTimer(timer, state)) {
      timerDecRef(timer);
      timer = NULL;
    }
    stopped = state == TIMER_STATE_WAITING;
H
hzcheng 已提交
491 492
  }

weixin_48148422's avatar
weixin_48148422 已提交
493 494 495
  if (timer == NULL) {
    *pTmrId = taosTmrStart(fp, mseconds, param, handle);
    return stopped;
H
hzcheng 已提交
496 497
  }

498
  tmrDebug("%s timer[id=%" PRIuPTR "] is reused", ctrl->label, timer->id);
weixin_48148422's avatar
weixin_48148422 已提交
499 500 501

  // wait until there's no other reference to this timer,
  // so that we can reuse this timer safely.
S
timer  
Shengliang Guan 已提交
502
  for (int32_t i = 1; atomic_load_8(&timer->refCount) > 1; ++i) {
weixin_48148422's avatar
weixin_48148422 已提交
503 504 505 506
    if (i % 1000 == 0) {
      sched_yield();
    }
  }
H
hzcheng 已提交
507

weixin_48148422's avatar
weixin_48148422 已提交
508 509 510
  assert(timer->refCount == 1);
  memset(timer, 0, sizeof(*timer));
  *pTmrId = (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);
H
hzcheng 已提交
511

weixin_48148422's avatar
weixin_48148422 已提交
512
  return stopped;
H
hzcheng 已提交
513 514
}

weixin_48148422's avatar
weixin_48148422 已提交
515
static void taosTmrModuleInit(void) {
wafwerar's avatar
wafwerar 已提交
516
  tmrCtrls = taosMemoryMalloc(sizeof(tmr_ctrl_t) * tsMaxTmrCtrl);
517 518 519 520 521
  if (tmrCtrls == NULL) {
    tmrError("failed to allocate memory for timer controllers.");
    return;
  }

dengyihao's avatar
fix bug  
dengyihao 已提交
522 523
  memset(&timerMap, 0, sizeof(timerMap));

S
TD-1037  
Shengliang Guan 已提交
524
  for (uint32_t i = 0; i < tsMaxTmrCtrl - 1; ++i) {
weixin_48148422's avatar
weixin_48148422 已提交
525 526 527
    tmr_ctrl_t* ctrl = tmrCtrls + i;
    ctrl->next = ctrl + 1;
  }
S
Shengliang Guan 已提交
528
  (tmrCtrls + tsMaxTmrCtrl - 1)->next = NULL;
weixin_48148422's avatar
weixin_48148422 已提交
529
  unusedTmrCtrl = tmrCtrls;
H
hzcheng 已提交
530

wafwerar's avatar
wafwerar 已提交
531
  taosThreadMutexInit(&tmrCtrlMutex, NULL);
weixin_48148422's avatar
weixin_48148422 已提交
532

533
  int64_t now = taosGetMonotonicMs();
S
timer  
Shengliang Guan 已提交
534
  for (int32_t i = 0; i < tListLen(wheels); i++) {
weixin_48148422's avatar
weixin_48148422 已提交
535
    time_wheel_t* wheel = wheels + i;
wafwerar's avatar
wafwerar 已提交
536
    if (taosThreadMutexInit(&wheel->mutex, NULL) != 0) {
weixin_48148422's avatar
weixin_48148422 已提交
537 538 539 540 541
      tmrError("failed to create the mutex for wheel, reason:%s", strerror(errno));
      return;
    }
    wheel->nextScanAt = now + wheel->resolution;
    wheel->index = 0;
wafwerar's avatar
wafwerar 已提交
542
    wheel->slots = (tmr_obj_t**)taosMemoryCalloc(wheel->size, sizeof(tmr_obj_t*));
weixin_48148422's avatar
weixin_48148422 已提交
543 544 545 546 547
    if (wheel->slots == NULL) {
      tmrError("failed to allocate wheel slots");
      return;
    }
    timerMap.size += wheel->size;
H
hzcheng 已提交
548 549
  }

weixin_48148422's avatar
weixin_48148422 已提交
550
  timerMap.count = 0;
wafwerar's avatar
wafwerar 已提交
551
  timerMap.slots = (timer_list_t*)taosMemoryCalloc(timerMap.size, sizeof(timer_list_t));
weixin_48148422's avatar
weixin_48148422 已提交
552 553 554 555
  if (timerMap.slots == NULL) {
    tmrError("failed to allocate hash map");
    return;
  }
H
hzcheng 已提交
556

weixin_48148422's avatar
weixin_48148422 已提交
557
  tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr");
weixin_48148422's avatar
weixin_48148422 已提交
558
  taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK);
H
hzcheng 已提交
559

560
  tmrDebug("timer module is initialized, number of threads: %d", taosTmrThreads);
weixin_48148422's avatar
weixin_48148422 已提交
561
}
H
hzcheng 已提交
562

S
timer  
Shengliang Guan 已提交
563
void* taosTmrInit(int32_t maxNumOfTmrs, int32_t resolution, int32_t longest, const char* label) {
564 565 566
  const char* ret = taosMonotonicInit();
  tmrDebug("ttimer monotonic clock source:%s", ret);

wafwerar's avatar
wafwerar 已提交
567
  taosThreadOnce(&tmrModuleInit, taosTmrModuleInit);
H
hzcheng 已提交
568

wafwerar's avatar
wafwerar 已提交
569
  taosThreadMutexLock(&tmrCtrlMutex);
weixin_48148422's avatar
weixin_48148422 已提交
570 571 572
  tmr_ctrl_t* ctrl = unusedTmrCtrl;
  if (ctrl != NULL) {
    unusedTmrCtrl = ctrl->next;
573
    numOfTmrCtrl++;
H
hzcheng 已提交
574
  }
wafwerar's avatar
wafwerar 已提交
575
  taosThreadMutexUnlock(&tmrCtrlMutex);
weixin_48148422's avatar
weixin_48148422 已提交
576 577

  if (ctrl == NULL) {
weixin_48148422's avatar
weixin_48148422 已提交
578
    tmrError("%s too many timer controllers, failed to create timer controller.", label);
S
Shengliang Guan 已提交
579
    terrno = TSDB_CODE_OUT_OF_MEMORY;
weixin_48148422's avatar
weixin_48148422 已提交
580 581 582
    return NULL;
  }

B
Bomin Zhang 已提交
583
  tstrncpy(ctrl->label, label, sizeof(ctrl->label));
584
  tmrDebug("%s timer controller is initialized, number of timer controllers: %d.", label, numOfTmrCtrl);
weixin_48148422's avatar
weixin_48148422 已提交
585
  return ctrl;
H
hzcheng 已提交
586 587
}

weixin_48148422's avatar
weixin_48148422 已提交
588 589
void taosTmrCleanUp(void* handle) {
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
weixin_48148422's avatar
weixin_48148422 已提交
590 591 592
  if (ctrl == NULL || ctrl->label[0] == 0) {
    return;
  }
weixin_48148422's avatar
weixin_48148422 已提交
593

594
  tmrDebug("%s timer controller is cleaned up.", ctrl->label);
weixin_48148422's avatar
weixin_48148422 已提交
595
  ctrl->label[0] = 0;
H
hzcheng 已提交
596

wafwerar's avatar
wafwerar 已提交
597
  taosThreadMutexLock(&tmrCtrlMutex);
weixin_48148422's avatar
weixin_48148422 已提交
598
  ctrl->next = unusedTmrCtrl;
599
  numOfTmrCtrl--;
weixin_48148422's avatar
weixin_48148422 已提交
600
  unusedTmrCtrl = ctrl;
wafwerar's avatar
wafwerar 已提交
601
  taosThreadMutexUnlock(&tmrCtrlMutex);
602

dengyihao's avatar
fix bug  
dengyihao 已提交
603 604
  tmrDebug("time controller's tmr ctrl size:  %d", numOfTmrCtrl);
  if (numOfTmrCtrl <= 0) {
weixin_48148422's avatar
weixin_48148422 已提交
605
    taosUninitTimer();
606

weixin_48148422's avatar
weixin_48148422 已提交
607 608
    taosCleanUpScheduler(tmrQhandle);

S
timer  
Shengliang Guan 已提交
609
    for (int32_t i = 0; i < tListLen(wheels); i++) {
610
      time_wheel_t* wheel = wheels + i;
wafwerar's avatar
wafwerar 已提交
611
      taosThreadMutexDestroy(&wheel->mutex);
wafwerar's avatar
wafwerar 已提交
612
      taosMemoryFree(wheel->slots);
613 614
    }

wafwerar's avatar
wafwerar 已提交
615
    taosThreadMutexDestroy(&tmrCtrlMutex);
weixin_48148422's avatar
weixin_48148422 已提交
616 617 618

    for (size_t i = 0; i < timerMap.size; i++) {
      timer_list_t* list = timerMap.slots + i;
dengyihao's avatar
fix bug  
dengyihao 已提交
619
      tmr_obj_t*    t = list->timers;
weixin_48148422's avatar
weixin_48148422 已提交
620 621
      while (t != NULL) {
        tmr_obj_t* next = t->mnext;
wafwerar's avatar
wafwerar 已提交
622
        taosMemoryFree(t);
weixin_48148422's avatar
weixin_48148422 已提交
623 624 625
        t = next;
      }
    }
wafwerar's avatar
wafwerar 已提交
626 627
    taosMemoryFree(timerMap.slots);
    taosMemoryFree(tmrCtrls);
628

dengyihao's avatar
fix bug  
dengyihao 已提交
629 630
    tmrCtrls = NULL;
    unusedTmrCtrl = NULL;
631
#if defined(LINUX)
dengyihao's avatar
fix bug  
dengyihao 已提交
632
    tmrModuleInit = PTHREAD_ONCE_INIT;  // to support restart
633
#endif
634
  }
H
hzcheng 已提交
635
}