ttimer.c 17.2 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18 19 20
#include "tlog.h"
#include "tsched.h"
#include "ttimer.h"
#include "tutil.h"
S
Shengliang Guan 已提交
21
#include "taoserror.h"
H
hzcheng 已提交
22

23 24 25 26
#define tmrFatal(...) { if (tmrDebugFlag & DEBUG_FATAL) { taosPrintLog("TMR FATAL ", tmrDebugFlag, __VA_ARGS__); }}
#define tmrError(...) { if (tmrDebugFlag & DEBUG_ERROR) { taosPrintLog("TMR ERROR ", tmrDebugFlag, __VA_ARGS__); }}
#define tmrWarn(...)  { if (tmrDebugFlag & DEBUG_WARN)  { taosPrintLog("TMR WARN ", tmrDebugFlag, __VA_ARGS__); }}
#define tmrInfo(...)  { if (tmrDebugFlag & DEBUG_INFO)  { taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); }}
S
Shengliang Guan 已提交
27 28
#define tmrDebug(...) { if (tmrDebugFlag & DEBUG_DEBUG) { taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); }}
#define tmrTrace(...) { if (tmrDebugFlag & DEBUG_TRACE) { taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); }}
H
hzcheng 已提交
29

weixin_48148422's avatar
weixin_48148422 已提交
30 31 32 33 34 35 36 37 38 39 40 41 42
#define TIMER_STATE_WAITING 0
#define TIMER_STATE_EXPIRED 1
#define TIMER_STATE_STOPPED 2
#define TIMER_STATE_CANCELED 3

typedef union _tmr_ctrl_t {
  char label[16];
  struct {
    // pad to ensure 'next' is the end of this union
    char               padding[16 - sizeof(union _tmr_ctrl_t*)];
    union _tmr_ctrl_t* next;
  };
} tmr_ctrl_t;
H
hzcheng 已提交
43

weixin_48148422's avatar
weixin_48148422 已提交
44 45 46 47 48 49 50 51 52 53 54 55 56
typedef struct tmr_obj_t {
  uintptr_t         id;
  tmr_ctrl_t*       ctrl;
  struct tmr_obj_t* mnext;
  struct tmr_obj_t* prev;
  struct tmr_obj_t* next;
  uint16_t          slot;
  uint8_t           wheel;
  uint8_t           state;
  uint8_t           refCount;
  uint8_t           reserved1;
  uint16_t          reserved2;
  union {
57 58
    int64_t expireAt;
    int64_t executedBy;
weixin_48148422's avatar
weixin_48148422 已提交
59 60 61 62
  };
  TAOS_TMR_CALLBACK fp;
  void*             param;
} tmr_obj_t;
H
hzcheng 已提交
63

weixin_48148422's avatar
weixin_48148422 已提交
64
typedef struct timer_list_t {
65
  int64_t    lockedBy;
weixin_48148422's avatar
weixin_48148422 已提交
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
  tmr_obj_t* timers;
} timer_list_t;

typedef struct timer_map_t {
  uint32_t      size;
  uint32_t      count;
  timer_list_t* slots;
} timer_map_t;

typedef struct time_wheel_t {
  pthread_mutex_t mutex;
  int64_t         nextScanAt;
  uint32_t        resolution;
  uint16_t        size;
  uint16_t        index;
  tmr_obj_t**     slots;
} time_wheel_t;

S
slguan 已提交
84
int32_t tmrDebugFlag = 131;
S
Shengliang Guan 已提交
85
uint32_t tsMaxTmrCtrl = 512;
weixin_48148422's avatar
weixin_48148422 已提交
86 87 88

static pthread_once_t  tmrModuleInit = PTHREAD_ONCE_INIT;
static pthread_mutex_t tmrCtrlMutex;
89
static tmr_ctrl_t*     tmrCtrls;
weixin_48148422's avatar
weixin_48148422 已提交
90
static tmr_ctrl_t*     unusedTmrCtrl = NULL;
91 92 93 94
static void*           tmrQhandle;
static int             numOfTmrCtrl = 0;

int taosTmrThreads = 1;
weixin_48148422's avatar
weixin_48148422 已提交
95 96 97 98 99 100 101 102 103 104 105 106
static uintptr_t nextTimerId = 0;

static time_wheel_t wheels[] = {
    {.resolution = MSECONDS_PER_TICK, .size = 4096},
    {.resolution = 1000, .size = 1024},
    {.resolution = 60000, .size = 1024},
};
static timer_map_t timerMap;

static uintptr_t getNextTimerId() {
  uintptr_t id;
  do {
weixin_48148422's avatar
weixin_48148422 已提交
107
    id = atomic_add_fetch_ptr(&nextTimerId, 1);
weixin_48148422's avatar
weixin_48148422 已提交
108 109
  } while (id == 0);
  return id;
H
hzcheng 已提交
110 111
}

weixin_48148422's avatar
weixin_48148422 已提交
112
static void timerAddRef(tmr_obj_t* timer) { atomic_add_fetch_8(&timer->refCount, 1); }
H
hzcheng 已提交
113

weixin_48148422's avatar
weixin_48148422 已提交
114
static void timerDecRef(tmr_obj_t* timer) {
weixin_48148422's avatar
weixin_48148422 已提交
115
  if (atomic_sub_fetch_8(&timer->refCount, 1) == 0) {
weixin_48148422's avatar
weixin_48148422 已提交
116
    free(timer);
H
hzcheng 已提交
117
  }
weixin_48148422's avatar
weixin_48148422 已提交
118
}
H
hzcheng 已提交
119

weixin_48148422's avatar
weixin_48148422 已提交
120
static void lockTimerList(timer_list_t* list) {
S
TD-2616  
Shengliang Guan 已提交
121
  int64_t tid = taosGetSelfPthreadId();
weixin_48148422's avatar
weixin_48148422 已提交
122
  int       i = 0;
weixin_48148422's avatar
weixin_48148422 已提交
123
  while (atomic_val_compare_exchange_64(&(list->lockedBy), 0, tid) != 0) {
weixin_48148422's avatar
weixin_48148422 已提交
124 125 126
    if (++i % 1000 == 0) {
      sched_yield();
    }
H
hzcheng 已提交
127
  }
weixin_48148422's avatar
weixin_48148422 已提交
128
}
H
hzcheng 已提交
129

weixin_48148422's avatar
weixin_48148422 已提交
130
static void unlockTimerList(timer_list_t* list) {
S
TD-2616  
Shengliang Guan 已提交
131
  int64_t tid = taosGetSelfPthreadId();
weixin_48148422's avatar
weixin_48148422 已提交
132
  if (atomic_val_compare_exchange_64(&(list->lockedBy), tid, 0) != tid) {
weixin_48148422's avatar
weixin_48148422 已提交
133
    assert(false);
H
Hui Li 已提交
134
    tmrError("%" PRId64 " trying to unlock a timer list not locked by current thread.", tid);
H
hzcheng 已提交
135 136 137
  }
}

weixin_48148422's avatar
weixin_48148422 已提交
138 139 140
static void addTimer(tmr_obj_t* timer) {
  timerAddRef(timer);
  timer->wheel = tListLen(wheels);
H
hzcheng 已提交
141

weixin_48148422's avatar
weixin_48148422 已提交
142 143
  uint32_t      idx = (uint32_t)(timer->id % timerMap.size);
  timer_list_t* list = timerMap.slots + idx;
H
hzcheng 已提交
144

weixin_48148422's avatar
weixin_48148422 已提交
145 146 147 148 149
  lockTimerList(list);
  timer->mnext = list->timers;
  list->timers = timer;
  unlockTimerList(list);
}
H
hzcheng 已提交
150

weixin_48148422's avatar
weixin_48148422 已提交
151 152 153 154 155 156 157 158 159 160
static tmr_obj_t* findTimer(uintptr_t id) {
  tmr_obj_t* timer = NULL;
  if (id > 0) {
    uint32_t      idx = (uint32_t)(id % timerMap.size);
    timer_list_t* list = timerMap.slots + idx;
    lockTimerList(list);
    for (timer = list->timers; timer != NULL; timer = timer->mnext) {
      if (timer->id == id) {
        timerAddRef(timer);
        break;
H
hzcheng 已提交
161 162
      }
    }
weixin_48148422's avatar
weixin_48148422 已提交
163
    unlockTimerList(list);
H
hzcheng 已提交
164
  }
weixin_48148422's avatar
weixin_48148422 已提交
165
  return timer;
H
hzcheng 已提交
166 167
}

weixin_48148422's avatar
weixin_48148422 已提交
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
static void removeTimer(uintptr_t id) {
  tmr_obj_t*    prev = NULL;
  uint32_t      idx = (uint32_t)(id % timerMap.size);
  timer_list_t* list = timerMap.slots + idx;
  lockTimerList(list);
  for (tmr_obj_t* p = list->timers; p != NULL; p = p->mnext) {
    if (p->id == id) {
      if (prev == NULL) {
        list->timers = p->mnext;
      } else {
        prev->mnext = p->mnext;
      }
      timerDecRef(p);
      break;
    }
    prev = p;
H
hzcheng 已提交
184
  }
weixin_48148422's avatar
weixin_48148422 已提交
185 186
  unlockTimerList(list);
}
H
hzcheng 已提交
187

weixin_48148422's avatar
weixin_48148422 已提交
188 189 190 191 192 193 194 195 196
static void addToWheel(tmr_obj_t* timer, uint32_t delay) {
  timerAddRef(timer);
  // select a wheel for the timer, we are not an accurate timer,
  // but the inaccuracy should not be too large.
  timer->wheel = tListLen(wheels) - 1;
  for (uint8_t i = 0; i < tListLen(wheels); i++) {
    time_wheel_t* wheel = wheels + i;
    if (delay < wheel->resolution * wheel->size) {
      timer->wheel = i;
H
hzcheng 已提交
197 198 199 200
      break;
    }
  }

weixin_48148422's avatar
weixin_48148422 已提交
201 202
  time_wheel_t* wheel = wheels + timer->wheel;
  timer->prev = NULL;
203
  timer->expireAt = taosGetMonotonicMs() + delay;
H
hzcheng 已提交
204

weixin_48148422's avatar
weixin_48148422 已提交
205
  pthread_mutex_lock(&wheel->mutex);
H
hzcheng 已提交
206

weixin_48148422's avatar
weixin_48148422 已提交
207 208 209 210 211 212
  uint32_t idx = 0;
  if (timer->expireAt > wheel->nextScanAt) {
    // adjust delay according to next scan time of this wheel
    // so that the timer is not fired earlier than desired.
    delay = (uint32_t)(timer->expireAt - wheel->nextScanAt);
    idx = (delay + wheel->resolution - 1) / wheel->resolution;
H
hzcheng 已提交
213 214
  }

weixin_48148422's avatar
weixin_48148422 已提交
215 216 217 218 219 220 221
  timer->slot = (uint16_t)((wheel->index + idx + 1) % wheel->size);
  tmr_obj_t* p = wheel->slots[timer->slot];
  wheel->slots[timer->slot] = timer;
  timer->next = p;
  if (p != NULL) {
    p->prev = timer;
  }
H
hzcheng 已提交
222

weixin_48148422's avatar
weixin_48148422 已提交
223
  pthread_mutex_unlock(&wheel->mutex);
H
hzcheng 已提交
224 225
}

weixin_48148422's avatar
weixin_48148422 已提交
226
static bool removeFromWheel(tmr_obj_t* timer) {
227 228
  uint8_t wheelIdx = timer->wheel;
  if (wheelIdx >= tListLen(wheels)) {
weixin_48148422's avatar
weixin_48148422 已提交
229 230
    return false;
  }
231
  time_wheel_t* wheel = wheels + wheelIdx;
weixin_48148422's avatar
weixin_48148422 已提交
232 233 234 235 236 237 238

  bool removed = false;
  pthread_mutex_lock(&wheel->mutex);
  // other thread may modify timer->wheel, check again.
  if (timer->wheel < tListLen(wheels)) {
    if (timer->prev != NULL) {
      timer->prev->next = timer->next;
S
slguan 已提交
239
    }
weixin_48148422's avatar
weixin_48148422 已提交
240 241
    if (timer->next != NULL) {
      timer->next->prev = timer->prev;
S
slguan 已提交
242
    }
weixin_48148422's avatar
weixin_48148422 已提交
243 244 245 246 247 248 249 250
    if (timer == wheel->slots[timer->slot]) {
      wheel->slots[timer->slot] = timer->next;
    }
    timer->wheel = tListLen(wheels);
    timer->next = NULL;
    timer->prev = NULL;
    timerDecRef(timer);
    removed = true;
S
slguan 已提交
251
  }
weixin_48148422's avatar
weixin_48148422 已提交
252
  pthread_mutex_unlock(&wheel->mutex);
S
slguan 已提交
253

weixin_48148422's avatar
weixin_48148422 已提交
254
  return removed;
S
slguan 已提交
255 256
}

weixin_48148422's avatar
weixin_48148422 已提交
257 258
static void processExpiredTimer(void* handle, void* arg) {
  tmr_obj_t* timer = (tmr_obj_t*)handle;
S
TD-2616  
Shengliang Guan 已提交
259
  timer->executedBy = taosGetSelfPthreadId();
weixin_48148422's avatar
weixin_48148422 已提交
260
  uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED);
weixin_48148422's avatar
weixin_48148422 已提交
261
  if (state == TIMER_STATE_WAITING) {
weixin_48148422's avatar
weixin_48148422 已提交
262
    const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution start.";
263
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
H
hzcheng 已提交
264

weixin_48148422's avatar
weixin_48148422 已提交
265 266
    (*timer->fp)(timer->param, (tmr_h)timer->id);
    atomic_store_8(&timer->state, TIMER_STATE_STOPPED);
H
hzcheng 已提交
267

weixin_48148422's avatar
weixin_48148422 已提交
268
    fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution end.";
269
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
weixin_48148422's avatar
weixin_48148422 已提交
270 271 272 273
  }
  removeTimer(timer->id);
  timerDecRef(timer);
}
H
hzcheng 已提交
274

weixin_48148422's avatar
weixin_48148422 已提交
275
static void addToExpired(tmr_obj_t* head) {
weixin_48148422's avatar
weixin_48148422 已提交
276
  const char* fmt = "%s adding expired timer[id=%" PRIuPTR ", fp=%p, param=%p] to queue.";
H
hzcheng 已提交
277

weixin_48148422's avatar
weixin_48148422 已提交
278
  while (head != NULL) {
weixin_48148422's avatar
weixin_48148422 已提交
279
    uintptr_t id = head->id;
weixin_48148422's avatar
weixin_48148422 已提交
280
    tmr_obj_t* next = head->next;
281
    tmrDebug(fmt, head->ctrl->label, id, head->fp, head->param);
weixin_48148422's avatar
weixin_48148422 已提交
282

weixin_48148422's avatar
weixin_48148422 已提交
283 284 285
    SSchedMsg  schedMsg;
    schedMsg.fp = NULL;
    schedMsg.tfp = processExpiredTimer;
H
Hui Li 已提交
286
    schedMsg.msg = NULL;
weixin_48148422's avatar
weixin_48148422 已提交
287 288 289
    schedMsg.ahandle = head;
    schedMsg.thandle = NULL;
    taosScheduleTask(tmrQhandle, &schedMsg);
weixin_48148422's avatar
weixin_48148422 已提交
290

291
    tmrDebug("timer[id=%" PRIuPTR "] has been added to queue.", id);
weixin_48148422's avatar
weixin_48148422 已提交
292 293 294
    head = next;
  }
}
H
hzcheng 已提交
295

weixin_48148422's avatar
weixin_48148422 已提交
296 297 298 299 300 301 302 303 304
static uintptr_t doStartTimer(tmr_obj_t* timer, TAOS_TMR_CALLBACK fp, int mseconds, void* param, tmr_ctrl_t* ctrl) {
  uintptr_t id = getNextTimerId();
  timer->id = id;
  timer->state = TIMER_STATE_WAITING;
  timer->fp = fp;
  timer->param = param;
  timer->ctrl = ctrl;
  addTimer(timer);

weixin_48148422's avatar
weixin_48148422 已提交
305
  const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] started";
306
  tmrDebug(fmt, ctrl->label, timer->id, timer->fp, timer->param);
weixin_48148422's avatar
weixin_48148422 已提交
307 308 309 310 311

  if (mseconds == 0) {
    timer->wheel = tListLen(wheels);
    timerAddRef(timer);
    addToExpired(timer);
H
hzcheng 已提交
312
  } else {
weixin_48148422's avatar
weixin_48148422 已提交
313
    addToWheel(timer, mseconds);
H
hzcheng 已提交
314 315
  }

weixin_48148422's avatar
weixin_48148422 已提交
316 317
  // note: use `timer->id` here is unsafe as `timer` may already be freed
  return id;
H
hzcheng 已提交
318 319
}

weixin_48148422's avatar
weixin_48148422 已提交
320 321 322 323 324
tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle) {
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
  if (ctrl == NULL || ctrl->label[0] == 0) {
    return NULL;
  }
H
hzcheng 已提交
325

weixin_48148422's avatar
weixin_48148422 已提交
326 327
  tmr_obj_t* timer = (tmr_obj_t*)calloc(1, sizeof(tmr_obj_t));
  if (timer == NULL) {
weixin_48148422's avatar
weixin_48148422 已提交
328
    tmrError("%s failed to allocated memory for new timer object.", ctrl->label);
weixin_48148422's avatar
weixin_48148422 已提交
329 330
    return NULL;
  }
H
hzcheng 已提交
331

weixin_48148422's avatar
weixin_48148422 已提交
332 333
  return (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);
}
H
hzcheng 已提交
334

weixin_48148422's avatar
weixin_48148422 已提交
335
static void taosTimerLoopFunc(int signo) {
336
  int64_t now = taosGetMonotonicMs();
weixin_48148422's avatar
weixin_48148422 已提交
337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381

  for (int i = 0; i < tListLen(wheels); i++) {
    // `expried` is a temporary expire list.
    // expired timers are first add to this list, then move
    // to expired queue as a batch to improve performance.
    // note this list is used as a stack in this function.
    tmr_obj_t* expired = NULL;

    time_wheel_t* wheel = wheels + i;
    while (now >= wheel->nextScanAt) {
      pthread_mutex_lock(&wheel->mutex);
      wheel->index = (wheel->index + 1) % wheel->size;
      tmr_obj_t* timer = wheel->slots[wheel->index];
      while (timer != NULL) {
        tmr_obj_t* next = timer->next;
        if (now < timer->expireAt) {
          timer = next;
          continue;
        }

        // remove from the wheel
        if (timer->prev == NULL) {
          wheel->slots[wheel->index] = next;
          if (next != NULL) {
            next->prev = NULL;
          }
        } else {
          timer->prev->next = next;
          if (next != NULL) {
            next->prev = timer->prev;
          }
        }
        timer->wheel = tListLen(wheels);

        // add to temporary expire list
        timer->next = expired;
        timer->prev = NULL;
        if (expired != NULL) {
          expired->prev = timer;
        }
        expired = timer;

        timer = next;
      }
      wheel->nextScanAt += wheel->resolution;
B
Bomin Zhang 已提交
382
      pthread_mutex_unlock(&wheel->mutex);
H
hzcheng 已提交
383 384
    }

weixin_48148422's avatar
weixin_48148422 已提交
385
    addToExpired(expired);
H
hzcheng 已提交
386
  }
weixin_48148422's avatar
weixin_48148422 已提交
387
}
H
hzcheng 已提交
388

weixin_48148422's avatar
weixin_48148422 已提交
389 390
static bool doStopTimer(tmr_obj_t* timer, uint8_t state) {
  if (state == TIMER_STATE_WAITING) {
weixin_48148422's avatar
weixin_48148422 已提交
391
    bool reusable = false;
weixin_48148422's avatar
weixin_48148422 已提交
392 393 394 395 396 397
    if (removeFromWheel(timer)) {
      removeTimer(timer->id);
      // only safe to reuse the timer when timer is removed from the wheel.
      // we cannot guarantee the thread safety of the timr in all other cases.
      reusable = true;
    }
weixin_48148422's avatar
weixin_48148422 已提交
398
    const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is cancelled.";
399
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
weixin_48148422's avatar
weixin_48148422 已提交
400 401
    return reusable;
  }
402

weixin_48148422's avatar
weixin_48148422 已提交
403
  if (state != TIMER_STATE_EXPIRED) {
weixin_48148422's avatar
weixin_48148422 已提交
404
    // timer already stopped or cancelled, has nothing to do in this case
weixin_48148422's avatar
weixin_48148422 已提交
405 406
    return false;
  }
407

S
TD-2616  
Shengliang Guan 已提交
408
  if (timer->executedBy == taosGetSelfPthreadId()) {
weixin_48148422's avatar
weixin_48148422 已提交
409 410 411
    // taosTmrReset is called in the timer callback, should do nothing in this
    // case to avoid dead lock. note taosTmrReset must be the last statement
    // of the callback funtion, will be a bug otherwise.
weixin_48148422's avatar
weixin_48148422 已提交
412
    return false;
H
hzcheng 已提交
413
  }
414

weixin_48148422's avatar
weixin_48148422 已提交
415 416 417
  // timer callback is executing in another thread, we SHOULD wait it to stop,
  // BUT this may result in dead lock if current thread are holding a lock which
  // the timer callback need to acquire. so, we HAVE TO return directly.
weixin_48148422's avatar
weixin_48148422 已提交
418
  const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is executing and cannot be stopped.";
419
  tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
weixin_48148422's avatar
weixin_48148422 已提交
420
  return false;
weixin_48148422's avatar
weixin_48148422 已提交
421
}
H
hzcheng 已提交
422

weixin_48148422's avatar
weixin_48148422 已提交
423 424
bool taosTmrStop(tmr_h timerId) {
  uintptr_t id = (uintptr_t)timerId;
H
hzcheng 已提交
425

weixin_48148422's avatar
weixin_48148422 已提交
426 427
  tmr_obj_t* timer = findTimer(id);
  if (timer == NULL) {
428
    tmrDebug("timer[id=%" PRIuPTR "] does not exist", id);
weixin_48148422's avatar
weixin_48148422 已提交
429 430
    return false;
  }
H
hzcheng 已提交
431

weixin_48148422's avatar
weixin_48148422 已提交
432
  uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
weixin_48148422's avatar
weixin_48148422 已提交
433 434
  doStopTimer(timer, state);
  timerDecRef(timer);
H
hzcheng 已提交
435

weixin_48148422's avatar
weixin_48148422 已提交
436
  return state == TIMER_STATE_WAITING;
H
hzcheng 已提交
437 438
}

weixin_48148422's avatar
weixin_48148422 已提交
439 440 441 442
bool taosTmrStopA(tmr_h* timerId) {
  bool ret = taosTmrStop(*timerId);
  *timerId = NULL;
  return ret;
H
hzcheng 已提交
443 444
}

weixin_48148422's avatar
weixin_48148422 已提交
445 446 447
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle, tmr_h* pTmrId) {
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
  if (ctrl == NULL || ctrl->label[0] == 0) {
S
TD-1037  
Shengliang Guan 已提交
448
    return false;
H
hzcheng 已提交
449 450
  }

weixin_48148422's avatar
weixin_48148422 已提交
451 452 453 454
  uintptr_t  id = (uintptr_t)*pTmrId;
  bool       stopped = false;
  tmr_obj_t* timer = findTimer(id);
  if (timer == NULL) {
455
    tmrDebug("%s timer[id=%" PRIuPTR "] does not exist", ctrl->label, id);
H
hzcheng 已提交
456
  } else {
weixin_48148422's avatar
weixin_48148422 已提交
457
    uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
weixin_48148422's avatar
weixin_48148422 已提交
458 459 460 461 462
    if (!doStopTimer(timer, state)) {
      timerDecRef(timer);
      timer = NULL;
    }
    stopped = state == TIMER_STATE_WAITING;
H
hzcheng 已提交
463 464
  }

weixin_48148422's avatar
weixin_48148422 已提交
465 466 467
  if (timer == NULL) {
    *pTmrId = taosTmrStart(fp, mseconds, param, handle);
    return stopped;
H
hzcheng 已提交
468 469
  }

470
  tmrDebug("%s timer[id=%" PRIuPTR "] is reused", ctrl->label, timer->id);
weixin_48148422's avatar
weixin_48148422 已提交
471 472 473 474 475 476 477 478

  // wait until there's no other reference to this timer,
  // so that we can reuse this timer safely.
  for (int i = 1; atomic_load_8(&timer->refCount) > 1; ++i) {
    if (i % 1000 == 0) {
      sched_yield();
    }
  }
H
hzcheng 已提交
479

weixin_48148422's avatar
weixin_48148422 已提交
480 481 482
  assert(timer->refCount == 1);
  memset(timer, 0, sizeof(*timer));
  *pTmrId = (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);
H
hzcheng 已提交
483

weixin_48148422's avatar
weixin_48148422 已提交
484
  return stopped;
H
hzcheng 已提交
485 486
}

weixin_48148422's avatar
weixin_48148422 已提交
487
static void taosTmrModuleInit(void) {
S
Shengliang Guan 已提交
488
  tmrCtrls = malloc(sizeof(tmr_ctrl_t) * tsMaxTmrCtrl);
489 490 491 492 493
  if (tmrCtrls == NULL) {
    tmrError("failed to allocate memory for timer controllers.");
    return;
  }

S
TD-1037  
Shengliang Guan 已提交
494
  for (uint32_t i = 0; i < tsMaxTmrCtrl - 1; ++i) {
weixin_48148422's avatar
weixin_48148422 已提交
495 496 497
    tmr_ctrl_t* ctrl = tmrCtrls + i;
    ctrl->next = ctrl + 1;
  }
S
Shengliang Guan 已提交
498
  (tmrCtrls + tsMaxTmrCtrl - 1)->next = NULL;
weixin_48148422's avatar
weixin_48148422 已提交
499
  unusedTmrCtrl = tmrCtrls;
H
hzcheng 已提交
500

weixin_48148422's avatar
weixin_48148422 已提交
501 502
  pthread_mutex_init(&tmrCtrlMutex, NULL);

503
  int64_t now = taosGetMonotonicMs();
weixin_48148422's avatar
weixin_48148422 已提交
504 505 506 507 508 509 510 511 512 513 514 515 516 517
  for (int i = 0; i < tListLen(wheels); i++) {
    time_wheel_t* wheel = wheels + i;
    if (pthread_mutex_init(&wheel->mutex, NULL) != 0) {
      tmrError("failed to create the mutex for wheel, reason:%s", strerror(errno));
      return;
    }
    wheel->nextScanAt = now + wheel->resolution;
    wheel->index = 0;
    wheel->slots = (tmr_obj_t**)calloc(wheel->size, sizeof(tmr_obj_t*));
    if (wheel->slots == NULL) {
      tmrError("failed to allocate wheel slots");
      return;
    }
    timerMap.size += wheel->size;
H
hzcheng 已提交
518 519
  }

weixin_48148422's avatar
weixin_48148422 已提交
520 521 522 523 524 525
  timerMap.count = 0;
  timerMap.slots = (timer_list_t*)calloc(timerMap.size, sizeof(timer_list_t));
  if (timerMap.slots == NULL) {
    tmrError("failed to allocate hash map");
    return;
  }
H
hzcheng 已提交
526

weixin_48148422's avatar
weixin_48148422 已提交
527
  tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr");
weixin_48148422's avatar
weixin_48148422 已提交
528
  taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK);
H
hzcheng 已提交
529

530
  tmrDebug("timer module is initialized, number of threads: %d", taosTmrThreads);
weixin_48148422's avatar
weixin_48148422 已提交
531
}
H
hzcheng 已提交
532

weixin_48148422's avatar
weixin_48148422 已提交
533
void* taosTmrInit(int maxNumOfTmrs, int resolution, int longest, const char* label) {
534 535 536
  const char* ret = taosMonotonicInit();
  tmrDebug("ttimer monotonic clock source:%s", ret);

weixin_48148422's avatar
weixin_48148422 已提交
537
  pthread_once(&tmrModuleInit, taosTmrModuleInit);
H
hzcheng 已提交
538

weixin_48148422's avatar
weixin_48148422 已提交
539 540 541 542
  pthread_mutex_lock(&tmrCtrlMutex);
  tmr_ctrl_t* ctrl = unusedTmrCtrl;
  if (ctrl != NULL) {
    unusedTmrCtrl = ctrl->next;
543
    numOfTmrCtrl++;
H
hzcheng 已提交
544
  }
weixin_48148422's avatar
weixin_48148422 已提交
545 546 547
  pthread_mutex_unlock(&tmrCtrlMutex);

  if (ctrl == NULL) {
weixin_48148422's avatar
weixin_48148422 已提交
548
    tmrError("%s too many timer controllers, failed to create timer controller.", label);
S
Shengliang Guan 已提交
549
    terrno = TSDB_CODE_OUT_OF_MEMORY;
weixin_48148422's avatar
weixin_48148422 已提交
550 551 552
    return NULL;
  }

B
Bomin Zhang 已提交
553
  tstrncpy(ctrl->label, label, sizeof(ctrl->label));
554
  tmrDebug("%s timer controller is initialized, number of timer controllers: %d.", label, numOfTmrCtrl);
weixin_48148422's avatar
weixin_48148422 已提交
555
  return ctrl;
H
hzcheng 已提交
556 557
}

weixin_48148422's avatar
weixin_48148422 已提交
558 559
void taosTmrCleanUp(void* handle) {
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
weixin_48148422's avatar
weixin_48148422 已提交
560 561 562
  if (ctrl == NULL || ctrl->label[0] == 0) {
    return;
  }
weixin_48148422's avatar
weixin_48148422 已提交
563

564
  tmrDebug("%s timer controller is cleaned up.", ctrl->label);
weixin_48148422's avatar
weixin_48148422 已提交
565
  ctrl->label[0] = 0;
H
hzcheng 已提交
566

weixin_48148422's avatar
weixin_48148422 已提交
567 568
  pthread_mutex_lock(&tmrCtrlMutex);
  ctrl->next = unusedTmrCtrl;
569
  numOfTmrCtrl--;
weixin_48148422's avatar
weixin_48148422 已提交
570 571
  unusedTmrCtrl = ctrl;
  pthread_mutex_unlock(&tmrCtrlMutex);
572 573

  if (numOfTmrCtrl <=0) {
weixin_48148422's avatar
weixin_48148422 已提交
574
    taosUninitTimer();
575

weixin_48148422's avatar
weixin_48148422 已提交
576 577
    taosCleanUpScheduler(tmrQhandle);

578 579
    for (int i = 0; i < tListLen(wheels); i++) {
      time_wheel_t* wheel = wheels + i;
580
      pthread_mutex_destroy(&wheel->mutex);
581 582 583
      free(wheel->slots);
    }

weixin_48148422's avatar
weixin_48148422 已提交
584 585 586 587 588 589 590 591 592 593 594 595
    pthread_mutex_destroy(&tmrCtrlMutex);

    for (size_t i = 0; i < timerMap.size; i++) {
      timer_list_t* list = timerMap.slots + i;
      tmr_obj_t* t = list->timers;
      while (t != NULL) {
        tmr_obj_t* next = t->mnext;
        free(t);
        t = next;
      }
    }
    free(timerMap.slots);
596 597
    free(tmrCtrls);

598
    tmrDebug("timer module is cleaned up");
599
  }
H
hzcheng 已提交
600
}