ttimer.c 16.8 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18
#include "tlog.h"
#include "tsched.h"
weixin_48148422's avatar
weixin_48148422 已提交
19
#include "ttime.h"
H
hzcheng 已提交
20 21 22
#include "ttimer.h"
#include "tutil.h"

S
slguan 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35 36
// Error-level log for the timer module: printed with the "ERROR TMR " prefix,
// and only when the DEBUG_ERROR bit is set in tmrDebugFlag.
#define tmrError(...)                                 \
  do { if (tmrDebugFlag & DEBUG_ERROR) {              \
    taosPrintLog("ERROR TMR ", tmrDebugFlag, __VA_ARGS__); \
  } } while(0)

// Warning-level log for the timer module: printed with the "WARN  TMR " prefix,
// and only when the DEBUG_WARN bit is set in tmrDebugFlag.
#define tmrWarn(...)                                  \
  do { if (tmrDebugFlag & DEBUG_WARN) {               \
    taosPrintLog("WARN  TMR ", tmrDebugFlag, __VA_ARGS__); \
  } } while(0)

// Trace-level log for the timer module: printed with the "TMR " prefix,
// and only when the DEBUG_TRACE bit is set in tmrDebugFlag.
#define tmrTrace(...)                           \
  do { if (tmrDebugFlag & DEBUG_TRACE) {        \
    taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); \
  } } while(0)
H
hzcheng 已提交
37

weixin_48148422's avatar
weixin_48148422 已提交
38 39 40 41 42 43 44 45 46 47 48 49 50
// Values stored in tmr_obj_t.state.
// WAITING:  set by doStartTimer(); the callback has not been run yet.
// EXPIRED:  set by processExpiredTimer() right before it invokes the callback.
// STOPPED:  set by processExpiredTimer() after the callback has returned.
// CANCELED: set by taosTmrStop()/taosTmrReset() when they win the race against
//           expiration (state was still WAITING).
#define TIMER_STATE_WAITING 0
#define TIMER_STATE_EXPIRED 1
#define TIMER_STATE_STOPPED 2
#define TIMER_STATE_CANCELED 3

// A timer controller, i.e. the handle returned by taosTmrInit().
// The same 16 bytes serve two purposes:
//  - while the controller is handed out, `label` holds its NUL-terminated name
//    (label[0] == 0 is treated as "not in use" by the public functions);
//  - while it sits on the free list (unusedTmrCtrl), `next` links to the next
//    free controller.
typedef union _tmr_ctrl_t {
  char label[16];
  struct {
    // pad so that 'next' occupies the last pointer-sized bytes of the union
    char               padding[16 - sizeof(union _tmr_ctrl_t*)];
    union _tmr_ctrl_t* next;
  };
} tmr_ctrl_t;
H
hzcheng 已提交
51

weixin_48148422's avatar
weixin_48148422 已提交
52 53 54 55 56 57 58 59 60 61 62 63 64
// A single timer instance. It is linked into two structures at once:
// the id hash map (via `mnext`) and one slot of a time wheel (via `prev`/`next`).
typedef struct tmr_obj_t {
  uintptr_t         id;         // unique non-zero id, also used as the public tmr_h
  tmr_ctrl_t*       ctrl;       // controller the timer was started with (used for log labels)
  struct tmr_obj_t* mnext;      // next timer in the same hash-map bucket
  struct tmr_obj_t* prev;       // previous timer in the wheel slot / temporary expire list
  struct tmr_obj_t* next;       // next timer in the wheel slot / temporary expire list
  uint16_t          slot;       // index of the wheel slot holding this timer
  uint8_t           wheel;      // index into wheels[]; tListLen(wheels) means "not in any wheel"
  uint8_t           state;      // one of TIMER_STATE_*
  uint8_t           refCount;   // modified atomically; the object is freed when it drops to 0
  uint8_t           reserved1;
  uint16_t          reserved2;
  union {
    int64_t expireAt;           // expiration time in ms while the timer is waiting
    int64_t executedBy;         // id of the thread running the callback, set by processExpiredTimer()
  };
  TAOS_TMR_CALLBACK fp;         // callback invoked as fp(param, id) on expiration
  void*             param;      // opaque argument passed to the callback
} tmr_obj_t;
H
hzcheng 已提交
71

weixin_48148422's avatar
weixin_48148422 已提交
72
typedef struct timer_list_t {
73
  int64_t    lockedBy;
weixin_48148422's avatar
weixin_48148422 已提交
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
  tmr_obj_t* timers;
} timer_list_t;

// Hash map from timer id to timer object; the bucket index is `id % size`.
typedef struct timer_map_t {
  uint32_t      size;   // number of buckets in `slots`
  uint32_t      count;  // set to 0 by taosTmrModuleInit(); not updated elsewhere in this file
  timer_list_t* slots;  // array of `size` buckets
} timer_map_t;

// One level of the timing wheel.
typedef struct time_wheel_t {
  pthread_mutex_t mutex;       // protects `index` and the lists hanging off `slots`
  int64_t         nextScanAt;  // time (ms) at which the next slot of this wheel is due to be scanned
  uint32_t        resolution;  // time span (ms) covered by one slot
  uint16_t        size;        // number of slots
  uint16_t        index;       // slot scanned most recently by taosTimerLoopFunc()
  tmr_obj_t**     slots;       // `size` list heads, timers chained through prev/next
} time_wheel_t;

S
slguan 已提交
92
int32_t tmrDebugFlag = 131;
93
uint32_t taosMaxTmrCtrl = 512;
weixin_48148422's avatar
weixin_48148422 已提交
94 95 96

static pthread_once_t  tmrModuleInit = PTHREAD_ONCE_INIT;
static pthread_mutex_t tmrCtrlMutex;
97
static tmr_ctrl_t*     tmrCtrls;
weixin_48148422's avatar
weixin_48148422 已提交
98
static tmr_ctrl_t*     unusedTmrCtrl = NULL;
99 100 101 102
static void*           tmrQhandle;
static int             numOfTmrCtrl = 0;

int taosTmrThreads = 1;
weixin_48148422's avatar
weixin_48148422 已提交
103 104 105 106 107 108 109 110 111 112 113 114
static uintptr_t nextTimerId = 0;

static time_wheel_t wheels[] = {
    {.resolution = MSECONDS_PER_TICK, .size = 4096},
    {.resolution = 1000, .size = 1024},
    {.resolution = 60000, .size = 1024},
};
static timer_map_t timerMap;

static uintptr_t getNextTimerId() {
  uintptr_t id;
  do {
weixin_48148422's avatar
weixin_48148422 已提交
115
    id = atomic_add_fetch_ptr(&nextTimerId, 1);
weixin_48148422's avatar
weixin_48148422 已提交
116 117
  } while (id == 0);
  return id;
H
hzcheng 已提交
118 119
}

weixin_48148422's avatar
weixin_48148422 已提交
120
// Take one more reference on `timer` (atomic increment of refCount).
static void timerAddRef(tmr_obj_t* timer) {
  atomic_add_fetch_8(&timer->refCount, 1);
}
H
hzcheng 已提交
121

weixin_48148422's avatar
weixin_48148422 已提交
122
// Drop one reference on `timer`; whoever drops the last one frees the object.
static void timerDecRef(tmr_obj_t* timer) {
  if (atomic_sub_fetch_8(&timer->refCount, 1) != 0) {
    return;  // still referenced elsewhere
  }
  free(timer);
}
H
hzcheng 已提交
127

weixin_48148422's avatar
weixin_48148422 已提交
128
// Acquire the spin lock of a hash-map bucket by swapping the calling thread's
// id into `lockedBy` (0 means unlocked). Yields the CPU every 1000 failed tries.
static void lockTimerList(timer_list_t* list) {
  const int64_t self = taosGetPthreadId();
  int           attempts = 0;

  for (;;) {
    if (atomic_val_compare_exchange_64(&(list->lockedBy), 0, self) == 0) {
      return;  // lock acquired
    }
    attempts++;
    if (attempts % 1000 == 0) {
      sched_yield();
    }
  }
}
H
hzcheng 已提交
137

weixin_48148422's avatar
weixin_48148422 已提交
138
// Release the spin lock of a hash-map bucket. The lock word must hold the
// calling thread's id; anything else means the caller does not own the lock.
static void unlockTimerList(timer_list_t* list) {
  int64_t tid = taosGetPthreadId();
  if (atomic_val_compare_exchange_64(&(list->lockedBy), tid, 0) != tid) {
    // log first so the message is emitted even when the assert aborts;
    // tid is 64-bit, so it needs PRId64 rather than %d.
    tmrError("%" PRId64 " trying to unlock a timer list not locked by current thread.", tid);
    assert(false);
  }
}

weixin_48148422's avatar
weixin_48148422 已提交
146 147 148
// Register `timer` in the id hash map. The map holds one reference, and the
// timer is marked as "not in any wheel" until it is actually queued.
static void addTimer(tmr_obj_t* timer) {
  timerAddRef(timer);
  timer->wheel = tListLen(wheels);

  timer_list_t* bucket = &timerMap.slots[(uint32_t)(timer->id % timerMap.size)];

  // push at the head of the bucket's list
  lockTimerList(bucket);
  timer->mnext = bucket->timers;
  bucket->timers = timer;
  unlockTimerList(bucket);
}
H
hzcheng 已提交
158

weixin_48148422's avatar
weixin_48148422 已提交
159 160 161 162 163 164 165 166 167 168
static tmr_obj_t* findTimer(uintptr_t id) {
  tmr_obj_t* timer = NULL;
  if (id > 0) {
    uint32_t      idx = (uint32_t)(id % timerMap.size);
    timer_list_t* list = timerMap.slots + idx;
    lockTimerList(list);
    for (timer = list->timers; timer != NULL; timer = timer->mnext) {
      if (timer->id == id) {
        timerAddRef(timer);
        break;
H
hzcheng 已提交
169 170
      }
    }
weixin_48148422's avatar
weixin_48148422 已提交
171
    unlockTimerList(list);
H
hzcheng 已提交
172
  }
weixin_48148422's avatar
weixin_48148422 已提交
173
  return timer;
H
hzcheng 已提交
174 175
}

weixin_48148422's avatar
weixin_48148422 已提交
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
// Unregister the timer with the given id from the hash map and drop the
// reference the map was holding. Does nothing if the id is not present.
static void removeTimer(uintptr_t id) {
  timer_list_t* bucket = &timerMap.slots[(uint32_t)(id % timerMap.size)];

  lockTimerList(bucket);
  // `link` always points at the pointer that leads to the current node,
  // so unlinking is the same for the head and for inner nodes.
  tmr_obj_t** link = &bucket->timers;
  while (*link != NULL) {
    tmr_obj_t* cur = *link;
    if (cur->id == id) {
      *link = cur->mnext;
      timerDecRef(cur);
      break;
    }
    link = &cur->mnext;
  }
  unlockTimerList(bucket);
}
H
hzcheng 已提交
195

weixin_48148422's avatar
weixin_48148422 已提交
196 197 198 199 200 201 202 203 204
// Queue `timer` on a time wheel so that it expires about `delay` ms from now.
// The wheel holds one reference on the timer.
static void addToWheel(tmr_obj_t* timer, uint32_t delay) {
  timerAddRef(timer);

  // Pick the finest wheel whose total span covers the delay; fall back to the
  // coarsest wheel otherwise. This is not a precise timer, but the error
  // should stay small.
  timer->wheel = tListLen(wheels) - 1;
  uint8_t level = 0;
  while (level < tListLen(wheels)) {
    if (delay < wheels[level].resolution * wheels[level].size) {
      timer->wheel = level;
      break;
    }
    level++;
  }

  time_wheel_t* target = &wheels[timer->wheel];
  timer->prev = NULL;
  timer->expireAt = taosGetTimestampMs() + delay;

  pthread_mutex_lock(&target->mutex);

  // Measure the delay from the wheel's next scan time (rounded up to whole
  // slots) so the timer does not fire earlier than requested.
  uint32_t ticks = 0;
  if (timer->expireAt > target->nextScanAt) {
    uint32_t remaining = (uint32_t)(timer->expireAt - target->nextScanAt);
    ticks = (remaining + target->resolution - 1) / target->resolution;
  }
  timer->slot = (uint16_t)((target->index + ticks + 1) % target->size);

  // push at the head of the slot's doubly linked list
  tmr_obj_t* oldHead = target->slots[timer->slot];
  timer->next = oldHead;
  if (oldHead != NULL) {
    oldHead->prev = timer;
  }
  target->slots[timer->slot] = timer;

  pthread_mutex_unlock(&target->mutex);
}

weixin_48148422's avatar
weixin_48148422 已提交
234 235 236 237 238 239 240 241 242 243 244 245
// Take `timer` out of the wheel slot it is queued in and drop the wheel's
// reference. Returns true if this call removed it, false if the timer was not
// (or no longer) in a wheel.
static bool removeFromWheel(tmr_obj_t* timer) {
  if (timer->wheel >= tListLen(wheels)) {
    return false;
  }
  time_wheel_t* owner = &wheels[timer->wheel];

  pthread_mutex_lock(&owner->mutex);
  // another thread may have changed timer->wheel before we got the lock
  bool stillQueued = timer->wheel < tListLen(wheels);
  if (stillQueued) {
    tmr_obj_t* before = timer->prev;
    tmr_obj_t* after = timer->next;
    if (before != NULL) {
      before->next = after;
    }
    if (after != NULL) {
      after->prev = before;
    }
    if (owner->slots[timer->slot] == timer) {
      owner->slots[timer->slot] = after;
    }
    timer->wheel = tListLen(wheels);
    timer->next = NULL;
    timer->prev = NULL;
    timerDecRef(timer);
  }
  pthread_mutex_unlock(&owner->mutex);

  return stillQueued;
}

weixin_48148422's avatar
weixin_48148422 已提交
264 265
static void processExpiredTimer(void* handle, void* arg) {
  tmr_obj_t* timer = (tmr_obj_t*)handle;
266
  timer->executedBy = taosGetPthreadId();
weixin_48148422's avatar
weixin_48148422 已提交
267
  uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED);
weixin_48148422's avatar
weixin_48148422 已提交
268
  if (state == TIMER_STATE_WAITING) {
weixin_48148422's avatar
weixin_48148422 已提交
269
    const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution start.";
weixin_48148422's avatar
weixin_48148422 已提交
270
    tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
H
hzcheng 已提交
271

weixin_48148422's avatar
weixin_48148422 已提交
272 273
    (*timer->fp)(timer->param, (tmr_h)timer->id);
    atomic_store_8(&timer->state, TIMER_STATE_STOPPED);
H
hzcheng 已提交
274

weixin_48148422's avatar
weixin_48148422 已提交
275
    fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution end.";
weixin_48148422's avatar
weixin_48148422 已提交
276 277 278 279 280
    tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
  }
  removeTimer(timer->id);
  timerDecRef(timer);
}
H
hzcheng 已提交
281

weixin_48148422's avatar
weixin_48148422 已提交
282
// Hand every timer of the list starting at `head` (chained through `next`)
// to the scheduler, where processExpiredTimer() will run it.
static void addToExpired(tmr_obj_t* head) {
  const char* fmt = "%s adding expired timer[id=%" PRIuPTR ", fp=%p, param=%p] to queue.";

  while (head != NULL) {
    // Read everything needed from the timer before scheduling it: once queued,
    // a worker thread may run and free it at any moment.
    uintptr_t  id = head->id;
    tmr_obj_t* next = head->next;
    tmrTrace(fmt, head->ctrl->label, id, head->fp, head->param);

    // The designated initializer zero-fills any SSchedMsg member not named
    // here, so no indeterminate bytes are passed to the scheduler.
    SSchedMsg schedMsg = {
        .fp = NULL,
        .tfp = processExpiredTimer,
        .ahandle = head,
        .thandle = NULL,
    };
    taosScheduleTask(tmrQhandle, &schedMsg);

    tmrTrace("timer[id=%" PRIuPTR "] has been added to queue.", id);
    head = next;
  }
}
H
hzcheng 已提交
301

weixin_48148422's avatar
weixin_48148422 已提交
302 303 304 305 306 307 308 309 310
// Initialize a zeroed timer object, register it in the map and arm it.
// A delay of 0 queues the timer for execution right away; any other value
// places it on a time wheel. Returns the id assigned to the timer.
static uintptr_t doStartTimer(tmr_obj_t* timer, TAOS_TMR_CALLBACK fp, int mseconds, void* param, tmr_ctrl_t* ctrl) {
  const uintptr_t id = getNextTimerId();

  timer->id = id;
  timer->ctrl = ctrl;
  timer->fp = fp;
  timer->param = param;
  timer->state = TIMER_STATE_WAITING;
  addTimer(timer);

  tmrTrace("%s timer[id=%" PRIuPTR ", fp=%p, param=%p] started", ctrl->label, timer->id, timer->fp, timer->param);

  // From here on the timer may run and be freed by another thread at any
  // time, so the id is returned from the local copy, never from `timer->id`.
  if (mseconds != 0) {
    addToWheel(timer, mseconds);
    return id;
  }

  timer->wheel = tListLen(wheels);
  timerAddRef(timer);  // reference owned by the expired queue
  addToExpired(timer);
  return id;
}

weixin_48148422's avatar
weixin_48148422 已提交
326 327 328 329 330
// Start a new timer that calls fp(param, id) after `mseconds` milliseconds.
// `handle` is a controller obtained from taosTmrInit().
// Returns the timer id as a tmr_h, or NULL if the controller is invalid or
// memory for the timer could not be allocated.
tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle) {
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
  bool        usable = (ctrl != NULL) && (ctrl->label[0] != 0);
  if (!usable) {
    return NULL;
  }

  tmr_obj_t* timer = (tmr_obj_t*)calloc(1, sizeof(*timer));
  if (timer != NULL) {
    return (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);
  }

  tmrError("%s failed to allocated memory for new timer object.", ctrl->label);
  return NULL;
}
H
hzcheng 已提交
340

weixin_48148422's avatar
weixin_48148422 已提交
341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388
// Periodic tick handler registered through taosInitTimer(). For every wheel
// it advances through all slots that are due, unlinks the timers whose
// expiration time has passed and queues them for execution.
// `signo` is unused.
static void taosTimerLoopFunc(int signo) {
  (void)signo;
  int64_t now = taosGetTimestampMs();

  for (int i = 0; i < tListLen(wheels); i++) {
    // `expired` is a temporary list of expired timers. Timers are first
    // collected here and then moved to the expired queue as a batch to
    // improve performance. The list is used as a stack in this function.
    tmr_obj_t* expired = NULL;

    time_wheel_t* wheel = wheels + i;
    // NOTE(review): nextScanAt is read here without the mutex; this is fine as
    // long as this function is its only writer — confirm it is never run
    // concurrently.
    while (now >= wheel->nextScanAt) {
      pthread_mutex_lock(&wheel->mutex);
      wheel->index = (wheel->index + 1) % wheel->size;
      tmr_obj_t* timer = wheel->slots[wheel->index];
      while (timer != NULL) {
        tmr_obj_t* next = timer->next;
        if (now < timer->expireAt) {
          // not due yet (e.g. it needs more turns of the wheel), keep it
          timer = next;
          continue;
        }

        // remove from the wheel
        if (timer->prev == NULL) {
          wheel->slots[wheel->index] = next;
          if (next != NULL) {
            next->prev = NULL;
          }
        } else {
          timer->prev->next = next;
          if (next != NULL) {
            next->prev = timer->prev;
          }
        }
        timer->wheel = tListLen(wheels);

        // add to temporary expire list
        timer->next = expired;
        timer->prev = NULL;
        if (expired != NULL) {
          expired->prev = timer;
        }
        expired = timer;

        timer = next;
      }
      // Advance the scan time while still holding the mutex: addToWheel()
      // reads `index` and `nextScanAt` together under this lock and must see
      // a consistent pair.
      wheel->nextScanAt += wheel->resolution;
      pthread_mutex_unlock(&wheel->mutex);
    }

    addToExpired(expired);
  }
}
H
hzcheng 已提交
394

weixin_48148422's avatar
weixin_48148422 已提交
395 396
// Second half of stopping a timer. `state` is the state the timer was in when
// the caller tried to switch it from WAITING to CANCELED.
// Returns true only when the timer was pulled out of its wheel by this call,
// which is the only case in which the caller may safely reuse the object.
static bool doStopTimer(tmr_obj_t* timer, uint8_t state) {
  switch (state) {
    case TIMER_STATE_WAITING: {
      // Reuse is only safe when the timer is removed from the wheel here;
      // thread safety of the timer cannot be guaranteed in any other case.
      bool reusable = removeFromWheel(timer);
      if (reusable) {
        removeTimer(timer->id);
      }
      tmrTrace("%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is cancelled.", timer->ctrl->label, timer->id,
               timer->fp, timer->param);
      return reusable;
    }

    case TIMER_STATE_EXPIRED:
      // If the callback runs on this very thread, taosTmrReset was called from
      // inside the callback: do nothing to avoid a deadlock. Note that
      // taosTmrReset must then be the last statement of the callback function.
      //
      // Otherwise the callback is executing in another thread. We SHOULD wait
      // for it to finish, BUT that could deadlock if the current thread holds
      // a lock the callback needs, so we HAVE TO return directly.
      if (timer->executedBy != taosGetPthreadId()) {
        tmrTrace("%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is executing and cannot be stopped.",
                 timer->ctrl->label, timer->id, timer->fp, timer->param);
      }
      return false;

    default:
      // timer already stopped or cancelled, nothing to do
      return false;
  }
}
H
hzcheng 已提交
428

weixin_48148422's avatar
weixin_48148422 已提交
429 430
// Stop the timer identified by `timerId`.
// Returns true if the timer was still waiting and has been cancelled, false
// if it does not exist or has already expired, stopped or been cancelled.
bool taosTmrStop(tmr_h timerId) {
  uintptr_t  id = (uintptr_t)timerId;
  tmr_obj_t* timer = findTimer(id);

  if (timer != NULL) {
    uint8_t prior = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
    doStopTimer(timer, prior);
    timerDecRef(timer);  // release the reference taken by findTimer()
    return prior == TIMER_STATE_WAITING;
  }

  tmrTrace("timer[id=%" PRIuPTR "] does not exist", id);
  return false;
}

weixin_48148422's avatar
weixin_48148422 已提交
445 446 447 448
// Stop the timer stored in `*timerId` and then clear the caller's handle.
// Returns the result of taosTmrStop().
bool taosTmrStopA(tmr_h* timerId) {
  tmr_h target = *timerId;
  bool  stopped = taosTmrStop(target);
  *timerId = NULL;
  return stopped;
}

weixin_48148422's avatar
weixin_48148422 已提交
451 452 453
// Stop the timer stored in `*pTmrId` (if any) and start a new one with the
// given callback, delay and parameter; the new id is written to `*pTmrId`.
// When the old timer could be pulled out of its wheel, its object is reused
// instead of allocating a new one.
// Returns true if the old timer was still waiting and has been cancelled,
// false otherwise (including when the controller `handle` is invalid, in
// which case nothing is started and `*pTmrId` is left untouched).
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle, tmr_h* pTmrId) {
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
  if (ctrl == NULL || ctrl->label[0] == 0) {
    return false;
  }

  uintptr_t  id = (uintptr_t)*pTmrId;
  bool       stopped = false;
  tmr_obj_t* timer = findTimer(id);
  if (timer == NULL) {
    tmrTrace("%s timer[id=%" PRIuPTR "] does not exist", ctrl->label, id);
  } else {
    uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
    if (!doStopTimer(timer, state)) {
      // not reusable: release the reference taken by findTimer()
      timerDecRef(timer);
      timer = NULL;
    }
    stopped = state == TIMER_STATE_WAITING;
  }

  if (timer == NULL) {
    *pTmrId = taosTmrStart(fp, mseconds, param, handle);
    return stopped;
  }

  tmrTrace("%s timer[id=%" PRIuPTR "] is reused", ctrl->label, timer->id);

  // wait until there's no other reference to this timer,
  // so that we can reuse this timer safely.
  for (int i = 1; atomic_load_8(&timer->refCount) > 1; ++i) {
    if (i % 1000 == 0) {
      sched_yield();
    }
  }

  assert(timer->refCount == 1);
  // wiping the object also clears refCount; doStartTimer() takes fresh
  // references for the map and the wheel.
  memset(timer, 0, sizeof(*timer));
  *pTmrId = (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);

  return stopped;
}

weixin_48148422's avatar
weixin_48148422 已提交
493
static void taosTmrModuleInit(void) {
494 495 496 497 498 499 500
  tmrCtrls = malloc(sizeof(tmr_ctrl_t) * taosMaxTmrCtrl);
  if (tmrCtrls == NULL) {
    tmrError("failed to allocate memory for timer controllers.");
    return;
  }

  for (int i = 0; i < taosMaxTmrCtrl - 1; ++i) {
weixin_48148422's avatar
weixin_48148422 已提交
501 502 503 504
    tmr_ctrl_t* ctrl = tmrCtrls + i;
    ctrl->next = ctrl + 1;
  }
  unusedTmrCtrl = tmrCtrls;
H
hzcheng 已提交
505

weixin_48148422's avatar
weixin_48148422 已提交
506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522
  pthread_mutex_init(&tmrCtrlMutex, NULL);

  int64_t now = taosGetTimestampMs();
  for (int i = 0; i < tListLen(wheels); i++) {
    time_wheel_t* wheel = wheels + i;
    if (pthread_mutex_init(&wheel->mutex, NULL) != 0) {
      tmrError("failed to create the mutex for wheel, reason:%s", strerror(errno));
      return;
    }
    wheel->nextScanAt = now + wheel->resolution;
    wheel->index = 0;
    wheel->slots = (tmr_obj_t**)calloc(wheel->size, sizeof(tmr_obj_t*));
    if (wheel->slots == NULL) {
      tmrError("failed to allocate wheel slots");
      return;
    }
    timerMap.size += wheel->size;
H
hzcheng 已提交
523 524
  }

weixin_48148422's avatar
weixin_48148422 已提交
525 526 527 528 529 530
  timerMap.count = 0;
  timerMap.slots = (timer_list_t*)calloc(timerMap.size, sizeof(timer_list_t));
  if (timerMap.slots == NULL) {
    tmrError("failed to allocate hash map");
    return;
  }
H
hzcheng 已提交
531

weixin_48148422's avatar
weixin_48148422 已提交
532
  tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr");
weixin_48148422's avatar
weixin_48148422 已提交
533
  taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK);
H
hzcheng 已提交
534

weixin_48148422's avatar
weixin_48148422 已提交
535 536
  tmrTrace("timer module is initialized, number of threads: %d", taosTmrThreads);
}
H
hzcheng 已提交
537

weixin_48148422's avatar
weixin_48148422 已提交
538 539
void* taosTmrInit(int maxNumOfTmrs, int resolution, int longest, const char* label) {
  pthread_once(&tmrModuleInit, taosTmrModuleInit);
H
hzcheng 已提交
540

weixin_48148422's avatar
weixin_48148422 已提交
541 542 543 544
  pthread_mutex_lock(&tmrCtrlMutex);
  tmr_ctrl_t* ctrl = unusedTmrCtrl;
  if (ctrl != NULL) {
    unusedTmrCtrl = ctrl->next;
545
    numOfTmrCtrl++;
H
hzcheng 已提交
546
  }
weixin_48148422's avatar
weixin_48148422 已提交
547 548 549
  pthread_mutex_unlock(&tmrCtrlMutex);

  if (ctrl == NULL) {
weixin_48148422's avatar
weixin_48148422 已提交
550
    tmrError("%s too many timer controllers, failed to create timer controller.", label);
weixin_48148422's avatar
weixin_48148422 已提交
551 552 553 554 555
    return NULL;
  }

  strncpy(ctrl->label, label, sizeof(ctrl->label));
  ctrl->label[sizeof(ctrl->label) - 1] = 0;
weixin_48148422's avatar
weixin_48148422 已提交
556
  tmrTrace("%s timer controller is initialized, number of timer controllers: %d.", label, numOfTmrCtrl);
weixin_48148422's avatar
weixin_48148422 已提交
557
  return ctrl;
H
hzcheng 已提交
558 559
}

weixin_48148422's avatar
weixin_48148422 已提交
560 561
void taosTmrCleanUp(void* handle) {
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
weixin_48148422's avatar
weixin_48148422 已提交
562 563 564
  if (ctrl == NULL || ctrl->label[0] == 0) {
    return;
  }
weixin_48148422's avatar
weixin_48148422 已提交
565

weixin_48148422's avatar
weixin_48148422 已提交
566
  tmrTrace("%s timer controller is cleaned up.", ctrl->label);
weixin_48148422's avatar
weixin_48148422 已提交
567
  ctrl->label[0] = 0;
H
hzcheng 已提交
568

weixin_48148422's avatar
weixin_48148422 已提交
569 570
  pthread_mutex_lock(&tmrCtrlMutex);
  ctrl->next = unusedTmrCtrl;
571
  numOfTmrCtrl--;
weixin_48148422's avatar
weixin_48148422 已提交
572 573
  unusedTmrCtrl = ctrl;
  pthread_mutex_unlock(&tmrCtrlMutex);
574 575

  if (numOfTmrCtrl <=0) {
weixin_48148422's avatar
weixin_48148422 已提交
576
    taosUninitTimer();
577
    
weixin_48148422's avatar
weixin_48148422 已提交
578 579
    taosCleanUpScheduler(tmrQhandle);

580 581 582 583 584 585
    for (int i = 0; i < tListLen(wheels); i++) {
      time_wheel_t* wheel = wheels + i;
      pthread_mutex_destroy(&wheel->mutex); 
      free(wheel->slots);
    }

weixin_48148422's avatar
weixin_48148422 已提交
586 587 588 589 590 591 592 593 594 595 596 597
    pthread_mutex_destroy(&tmrCtrlMutex);

    for (size_t i = 0; i < timerMap.size; i++) {
      timer_list_t* list = timerMap.slots + i;
      tmr_obj_t* t = list->timers;
      while (t != NULL) {
        tmr_obj_t* next = t->mnext;
        free(t);
        t = next;
      }
    }
    free(timerMap.slots);
598 599 600 601
    free(tmrCtrls);

    tmrTrace("timer module is cleaned up");
  }
H
hzcheng 已提交
602
}