ttimer.c 18.4 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
fix bug  
dengyihao 已提交
16
#include "ttimer.h"
S
slguan 已提交
17
#include "os.h"
dengyihao's avatar
fix bug  
dengyihao 已提交
18
#include "taoserror.h"
H
hzcheng 已提交
19 20 21 22
#include "tlog.h"
#include "tsched.h"
#include "tutil.h"

dengyihao's avatar
fix bug  
dengyihao 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
// Logging helpers for the timer module. Each macro forwards its arguments to
// taosPrintLog() only when the matching DEBUG_* bit is set in tmrDebugFlag.
// The bodies are wrapped in do { ... } while (0) so that an invocation followed
// by a semicolon forms exactly one statement; a bare { ... } block would break
// constructs such as `if (x) tmrDebug(...); else ...`.
#define tmrFatal(...)                                        \
  do {                                                       \
    if (tmrDebugFlag & DEBUG_FATAL) {                        \
      taosPrintLog("TMR FATAL ", tmrDebugFlag, __VA_ARGS__); \
    }                                                        \
  } while (0)
#define tmrError(...)                                        \
  do {                                                       \
    if (tmrDebugFlag & DEBUG_ERROR) {                        \
      taosPrintLog("TMR ERROR ", tmrDebugFlag, __VA_ARGS__); \
    }                                                        \
  } while (0)
#define tmrWarn(...)                                        \
  do {                                                      \
    if (tmrDebugFlag & DEBUG_WARN) {                        \
      taosPrintLog("TMR WARN ", tmrDebugFlag, __VA_ARGS__); \
    }                                                       \
  } while (0)
#define tmrInfo(...)                                   \
  do {                                                 \
    if (tmrDebugFlag & DEBUG_INFO) {                   \
      taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); \
    }                                                  \
  } while (0)
#define tmrDebug(...)                                  \
  do {                                                 \
    if (tmrDebugFlag & DEBUG_DEBUG) {                  \
      taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); \
    }                                                  \
  } while (0)
#define tmrTrace(...)                                  \
  do {                                                 \
    if (tmrDebugFlag & DEBUG_TRACE) {                  \
      taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); \
    }                                                  \
  } while (0)
H
hzcheng 已提交
59

weixin_48148422's avatar
weixin_48148422 已提交
60 61 62 63 64 65 66 67 68 69 70 71 72
// Life-cycle states stored in tmr_obj_t::state.
// WAITING:  set by doStartTimer(); the timer has not fired or been cancelled.
// EXPIRED:  set by processExpiredTimer() just before the callback is invoked.
// STOPPED:  set by processExpiredTimer() after the callback has returned.
// CANCELED: set by taosTmrStop()/taosTmrReset() when they win the race
//           against expiry (compare-and-swap from WAITING).
#define TIMER_STATE_WAITING 0
#define TIMER_STATE_EXPIRED 1
#define TIMER_STATE_STOPPED 2
#define TIMER_STATE_CANCELED 3

// A timer controller. While the controller is handed out to a caller the
// 16 bytes hold its label (a non-zero label[0] marks the controller as in
// use; taosTmrCleanUp() clears label[0]). While the controller sits on the
// free list, the trailing pointer-sized bytes are reused as the `next` link.
typedef union _tmr_ctrl_t {
  char label[16];
  struct {
    // pad to ensure 'next' is the end of this union
    char               padding[16 - sizeof(union _tmr_ctrl_t*)];
    union _tmr_ctrl_t* next;  // next unused controller in the free list
  };
} tmr_ctrl_t;
H
hzcheng 已提交
73

weixin_48148422's avatar
weixin_48148422 已提交
74 75 76 77 78 79 80 81 82 83 84 85 86
// One timer instance. A timer is referenced from the id hash map (via mnext)
// and, while waiting, from a slot list of one time wheel (via prev/next).
typedef struct tmr_obj_t {
  uintptr_t         id;     // unique non-zero id, also the public tmr_h value
  tmr_ctrl_t*       ctrl;   // controller this timer was started with
  struct tmr_obj_t* mnext;  // next timer in the same hash-map bucket
  struct tmr_obj_t* prev;   // previous timer in the wheel slot / expired list
  struct tmr_obj_t* next;   // next timer in the wheel slot / expired list
  uint16_t          slot;   // slot index inside the owning wheel
  uint8_t           wheel;  // index into wheels[]; tListLen(wheels) == on no wheel
  uint8_t           state;  // one of TIMER_STATE_*
  uint8_t           refCount;  // atomic reference count, object is freed at 0
  uint8_t           reserved1;
  uint16_t          reserved2;
  union {
    int64_t expireAt;    // monotonic time (ms) at which the timer is due
    int64_t executedBy;  // id of the thread running the callback
  };
  TAOS_TMR_CALLBACK fp;     // user callback
  void*             param;  // opaque argument passed to the callback
} tmr_obj_t;
H
hzcheng 已提交
93

weixin_48148422's avatar
weixin_48148422 已提交
94
typedef struct timer_list_t {
95
  int64_t    lockedBy;
weixin_48148422's avatar
weixin_48148422 已提交
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
  tmr_obj_t* timers;
} timer_list_t;

// Hash map from timer id to timer object; the bucket is `id % size`.
typedef struct timer_map_t {
  uint32_t      size;   // number of buckets in `slots`
  uint32_t      count;  // zeroed at module init; not updated by the visible code
  timer_list_t* slots;  // array of `size` buckets
} timer_map_t;

// A single time wheel: a ring of slots, each holding a doubly linked list of
// waiting timers. The mutex is held while slot lists are modified in
// addToWheel(), removeFromWheel() and taosTimerLoopFunc().
typedef struct time_wheel_t {
  pthread_mutex_t mutex;
  int64_t         nextScanAt;  // monotonic time (ms) of the next slot scan
  uint32_t        resolution;  // time span covered by one slot, in ms
  uint16_t        size;        // number of slots
  uint16_t        index;       // slot that was scanned most recently
  tmr_obj_t**     slots;       // array of `size` list heads
} time_wheel_t;

S
Shengliang Guan 已提交
114
int32_t tsMaxTmrCtrl = 512;
weixin_48148422's avatar
weixin_48148422 已提交
115 116 117

static pthread_once_t  tmrModuleInit = PTHREAD_ONCE_INIT;
static pthread_mutex_t tmrCtrlMutex;
118
static tmr_ctrl_t*     tmrCtrls;
weixin_48148422's avatar
weixin_48148422 已提交
119
static tmr_ctrl_t*     unusedTmrCtrl = NULL;
120 121 122
static void*           tmrQhandle;
static int             numOfTmrCtrl = 0;

dengyihao's avatar
fix bug  
dengyihao 已提交
123
int              taosTmrThreads = 1;
weixin_48148422's avatar
weixin_48148422 已提交
124 125 126 127 128 129 130 131 132 133 134 135
static uintptr_t nextTimerId = 0;

static time_wheel_t wheels[] = {
    {.resolution = MSECONDS_PER_TICK, .size = 4096},
    {.resolution = 1000, .size = 1024},
    {.resolution = 60000, .size = 1024},
};
static timer_map_t timerMap;

static uintptr_t getNextTimerId() {
  uintptr_t id;
  do {
weixin_48148422's avatar
weixin_48148422 已提交
136
    id = atomic_add_fetch_ptr(&nextTimerId, 1);
weixin_48148422's avatar
weixin_48148422 已提交
137 138
  } while (id == 0);
  return id;
H
hzcheng 已提交
139 140
}

weixin_48148422's avatar
weixin_48148422 已提交
141
// Atomically takes one additional reference on `timer`.
static void timerAddRef(tmr_obj_t* timer) {
  atomic_add_fetch_8(&timer->refCount, 1);
}
H
hzcheng 已提交
142

weixin_48148422's avatar
weixin_48148422 已提交
143
// Atomically drops one reference on `timer`; the object is freed when the
// last reference goes away, so the caller must not touch it afterwards.
static void timerDecRef(tmr_obj_t* timer) {
  if (atomic_sub_fetch_8(&timer->refCount, 1) != 0) {
    return;
  }
  free(timer);
}
H
hzcheng 已提交
148

weixin_48148422's avatar
weixin_48148422 已提交
149
// Acquires the spin lock of a hash-map bucket by atomically swapping the
// calling thread's id into `lockedBy` (0 means unlocked). The thread yields
// the CPU after every 1000 failed attempts.
static void lockTimerList(timer_list_t* list) {
  int64_t tid = taosGetSelfPthreadId();
  int     spins = 0;
  while (atomic_val_compare_exchange_64(&(list->lockedBy), 0, tid) != 0) {
    // reset the counter on every yield so that it can never overflow,
    // however long the lock stays contended.
    if (++spins == 1000) {
      spins = 0;
      sched_yield();
    }
  }
}
H
hzcheng 已提交
158

weixin_48148422's avatar
weixin_48148422 已提交
159
// Releases the spin lock of a hash-map bucket. Only the thread that holds the
// lock may release it; a mismatch is a programming error which is logged and
// then trapped by the assertion.
static void unlockTimerList(timer_list_t* list) {
  int64_t tid = taosGetSelfPthreadId();
  if (atomic_val_compare_exchange_64(&(list->lockedBy), tid, 0) != tid) {
    // log first: the assertion aborts the process in assert-enabled builds,
    // and the message would otherwise never be written.
    tmrError("%" PRId64 " trying to unlock a timer list not locked by current thread.", tid);
    assert(false);
  }
}

weixin_48148422's avatar
weixin_48148422 已提交
167 168 169
// Registers `timer` in the id hash map. The map holds its own reference on the
// timer, and the timer is marked as not being on any wheel yet.
static void addTimer(tmr_obj_t* timer) {
  timerAddRef(timer);
  timer->wheel = tListLen(wheels);

  timer_list_t* bucket = &timerMap.slots[(uint32_t)(timer->id % timerMap.size)];

  lockTimerList(bucket);
  // push at the head of the bucket chain
  timer->mnext = bucket->timers;
  bucket->timers = timer;
  unlockTimerList(bucket);
}
H
hzcheng 已提交
179

weixin_48148422's avatar
weixin_48148422 已提交
180 181 182 183 184 185 186 187 188 189
static tmr_obj_t* findTimer(uintptr_t id) {
  tmr_obj_t* timer = NULL;
  if (id > 0) {
    uint32_t      idx = (uint32_t)(id % timerMap.size);
    timer_list_t* list = timerMap.slots + idx;
    lockTimerList(list);
    for (timer = list->timers; timer != NULL; timer = timer->mnext) {
      if (timer->id == id) {
        timerAddRef(timer);
        break;
H
hzcheng 已提交
190 191
      }
    }
weixin_48148422's avatar
weixin_48148422 已提交
192
    unlockTimerList(list);
H
hzcheng 已提交
193
  }
weixin_48148422's avatar
weixin_48148422 已提交
194
  return timer;
H
hzcheng 已提交
195 196
}

weixin_48148422's avatar
weixin_48148422 已提交
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
// Unlinks the timer with the given id from the hash map and drops the map's
// reference on it. Does nothing when no such timer is registered.
static void removeTimer(uintptr_t id) {
  timer_list_t* bucket = &timerMap.slots[(uint32_t)(id % timerMap.size)];

  lockTimerList(bucket);
  // `link` always points at the pointer that refers to the current node,
  // so the head and interior cases are handled uniformly.
  for (tmr_obj_t** link = &bucket->timers; *link != NULL; link = &(*link)->mnext) {
    tmr_obj_t* cur = *link;
    if (cur->id == id) {
      *link = cur->mnext;
      timerDecRef(cur);
      break;
    }
  }
  unlockTimerList(bucket);
}
H
hzcheng 已提交
216

weixin_48148422's avatar
weixin_48148422 已提交
217 218 219 220 221 222 223 224 225
// Puts `timer` on the finest wheel whose full span covers `delay` (ms); the
// coarsest wheel is used when none does. The wheel holds its own reference on
// the timer. The slot is chosen relative to the wheel's next scan time.
static void addToWheel(tmr_obj_t* timer, uint32_t delay) {
  timerAddRef(timer);

  // this is not an accurate timer, but the inaccuracy should stay small:
  // default to the coarsest wheel, then prefer the first one that fits.
  timer->wheel = tListLen(wheels) - 1;
  for (uint8_t w = 0; w < tListLen(wheels); w++) {
    if (delay < wheels[w].resolution * wheels[w].size) {
      timer->wheel = w;
      break;
    }
  }

  time_wheel_t* target = &wheels[timer->wheel];
  timer->prev = NULL;
  timer->expireAt = taosGetMonotonicMs() + delay;

  pthread_mutex_lock(&target->mutex);

  // measure the delay from the wheel's next scan and round it up to whole
  // slots, so that the timer never fires earlier than requested.
  uint32_t ahead = 0;
  if (timer->expireAt > target->nextScanAt) {
    delay = (uint32_t)(timer->expireAt - target->nextScanAt);
    ahead = (delay + target->resolution - 1) / target->resolution;
  }
  timer->slot = (uint16_t)((target->index + ahead + 1) % target->size);

  // insert at the head of the slot's doubly linked list
  tmr_obj_t* oldHead = target->slots[timer->slot];
  target->slots[timer->slot] = timer;
  timer->next = oldHead;
  if (oldHead != NULL) {
    oldHead->prev = timer;
  }

  pthread_mutex_unlock(&target->mutex);
}

weixin_48148422's avatar
weixin_48148422 已提交
255
// Takes `timer` off the wheel it is waiting on and drops the wheel's
// reference. Returns true when the timer was unlinked by this call, false
// when it was not on any wheel (for example because it already expired).
static bool removeFromWheel(tmr_obj_t* timer) {
  uint8_t wheelIdx = timer->wheel;
  if (wheelIdx >= tListLen(wheels)) {
    return false;
  }
  time_wheel_t* owner = &wheels[wheelIdx];

  pthread_mutex_lock(&owner->mutex);
  // another thread may have changed timer->wheel before the lock was taken,
  // so check it again.
  bool stillLinked = timer->wheel < tListLen(wheels);
  if (stillLinked) {
    tmr_obj_t* before = timer->prev;
    tmr_obj_t* after = timer->next;
    if (before != NULL) {
      before->next = after;
    }
    if (after != NULL) {
      after->prev = before;
    }
    if (owner->slots[timer->slot] == timer) {
      owner->slots[timer->slot] = after;
    }
    timer->wheel = tListLen(wheels);
    timer->next = NULL;
    timer->prev = NULL;
    timerDecRef(timer);
  }
  pthread_mutex_unlock(&owner->mutex);

  return stillLinked;
}

weixin_48148422's avatar
weixin_48148422 已提交
286 287
static void processExpiredTimer(void* handle, void* arg) {
  tmr_obj_t* timer = (tmr_obj_t*)handle;
S
TD-2616  
Shengliang Guan 已提交
288
  timer->executedBy = taosGetSelfPthreadId();
weixin_48148422's avatar
weixin_48148422 已提交
289
  uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED);
weixin_48148422's avatar
weixin_48148422 已提交
290
  if (state == TIMER_STATE_WAITING) {
weixin_48148422's avatar
weixin_48148422 已提交
291
    const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution start.";
292
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
H
hzcheng 已提交
293

weixin_48148422's avatar
weixin_48148422 已提交
294 295
    (*timer->fp)(timer->param, (tmr_h)timer->id);
    atomic_store_8(&timer->state, TIMER_STATE_STOPPED);
H
hzcheng 已提交
296

weixin_48148422's avatar
weixin_48148422 已提交
297
    fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution end.";
298
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
weixin_48148422's avatar
weixin_48148422 已提交
299 300 301 302
  }
  removeTimer(timer->id);
  timerDecRef(timer);
}
H
hzcheng 已提交
303

weixin_48148422's avatar
weixin_48148422 已提交
304
// Hands every timer of the list starting at `head` (linked through `next`)
// to the scheduler queue, where processExpiredTimer() will run it.
static void addToExpired(tmr_obj_t* head) {
  tmr_obj_t* following = NULL;

  for (; head != NULL; head = following) {
    // read everything that is needed from the timer up front: once it has
    // been scheduled, a worker thread may run and free it at any time.
    uintptr_t id = head->id;
    following = head->next;
    tmrDebug("%s adding expired timer[id=%" PRIuPTR ", fp=%p, param=%p] to queue.", head->ctrl->label, id, head->fp,
             head->param);

    SSchedMsg task;
    task.fp = NULL;
    task.tfp = processExpiredTimer;
    task.msg = NULL;
    task.ahandle = head;
    task.thandle = NULL;
    taosScheduleTask(tmrQhandle, &task);

    tmrDebug("timer[id=%" PRIuPTR "] has been added to queue.", id);
  }
}
H
hzcheng 已提交
324

weixin_48148422's avatar
weixin_48148422 已提交
325 326 327 328 329 330 331 332 333
// Initialises `timer` with a new id and the given callback, registers it in
// the hash map and arms it. A delay of 0 ms sends the timer straight to the
// expired queue; any other value places it on a time wheel.
// Returns the id assigned to the timer.
static uintptr_t doStartTimer(tmr_obj_t* timer, TAOS_TMR_CALLBACK fp, int mseconds, void* param, tmr_ctrl_t* ctrl) {
  const uintptr_t newId = getNextTimerId();

  timer->id = newId;
  timer->state = TIMER_STATE_WAITING;
  timer->fp = fp;
  timer->param = param;
  timer->ctrl = ctrl;
  addTimer(timer);

  tmrDebug("%s timer[id=%" PRIuPTR ", fp=%p, param=%p] started", ctrl->label, timer->id, timer->fp, timer->param);

  if (mseconds != 0) {
    addToWheel(timer, mseconds);
  } else {
    // fire immediately: the extra reference belongs to the expired queue
    timer->wheel = tListLen(wheels);
    timerAddRef(timer);
    addToExpired(timer);
  }

  // return the saved id: reading `timer->id` here would be unsafe because
  // `timer` may already have been freed.
  return newId;
}

weixin_48148422's avatar
weixin_48148422 已提交
349 350 351 352 353
// Starts a new timer that calls `fp(param, id)` after `mseconds` milliseconds.
// `handle` is a controller returned by taosTmrInit().
// Returns the timer id as a tmr_h, or NULL when the controller is invalid or
// the timer object cannot be allocated.
tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle) {
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
  bool        usable = (ctrl != NULL) && (ctrl->label[0] != 0);
  if (!usable) {
    return NULL;
  }

  tmr_obj_t* timer = (tmr_obj_t*)calloc(1, sizeof(tmr_obj_t));
  if (timer != NULL) {
    return (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);
  }

  tmrError("%s failed to allocated memory for new timer object.", ctrl->label);
  return NULL;
}
H
hzcheng 已提交
363

weixin_48148422's avatar
weixin_48148422 已提交
364
// Periodic tick handler registered with taosInitTimer(). For every wheel it
// advances the current slot until the wheel has caught up with the present
// time, collects the timers that are due and passes them to the expired queue.
// `signo` is unused.
static void taosTimerLoopFunc(int signo) {
  int64_t now = taosGetMonotonicMs();

  for (int w = 0; w < tListLen(wheels); w++) {
    time_wheel_t* wheel = &wheels[w];

    // temporary list of timers that are due. It is filled while the wheel is
    // locked and handed to the expired queue as one batch afterwards; new
    // entries are pushed at the head, so it behaves like a stack.
    tmr_obj_t* due = NULL;

    while (now >= wheel->nextScanAt) {
      pthread_mutex_lock(&wheel->mutex);
      wheel->index = (wheel->index + 1) % wheel->size;

      tmr_obj_t* following = NULL;
      for (tmr_obj_t* timer = wheel->slots[wheel->index]; timer != NULL; timer = following) {
        following = timer->next;
        if (now < timer->expireAt) {
          // not due yet, stays in its slot
          continue;
        }

        // unlink the timer from the slot list
        if (timer->prev != NULL) {
          timer->prev->next = following;
        } else {
          wheel->slots[wheel->index] = following;
        }
        if (following != NULL) {
          following->prev = timer->prev;
        }
        timer->wheel = tListLen(wheels);

        // push it onto the temporary list
        timer->next = due;
        timer->prev = NULL;
        if (due != NULL) {
          due->prev = timer;
        }
        due = timer;
      }

      wheel->nextScanAt += wheel->resolution;
      pthread_mutex_unlock(&wheel->mutex);
    }

    addToExpired(due);
  }
}
H
hzcheng 已提交
417

weixin_48148422's avatar
weixin_48148422 已提交
418 419
// Completes a stop request. `state` is the state the timer was in when the
// caller tried to switch it from WAITING to CANCELED.
// Returns true only when the timer was taken off its wheel by this call, in
// which case the caller may safely reuse the timer object.
static bool doStopTimer(tmr_obj_t* timer, uint8_t state) {
  switch (state) {
    case TIMER_STATE_WAITING: {
      // reuse is only safe when the timer was removed from the wheel here;
      // thread safety cannot be guaranteed in any other case.
      bool reusable = removeFromWheel(timer);
      if (reusable) {
        removeTimer(timer->id);
      }
      tmrDebug("%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is cancelled.", timer->ctrl->label, timer->id, timer->fp,
               timer->param);
      return reusable;
    }

    case TIMER_STATE_EXPIRED:
      // When the stop request comes from inside the timer callback
      // (taosTmrReset called by the callback) nothing is done, to avoid a
      // dead lock. Note that taosTmrReset must then be the last statement of
      // the callback function, anything else is a bug.
      //
      // When the callback is running on another thread we SHOULD wait for it
      // to finish, BUT that may dead lock if the current thread holds a lock
      // the callback needs. So we HAVE TO return directly.
      if (timer->executedBy != taosGetSelfPthreadId()) {
        tmrDebug("%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is executing and cannot be stopped.",
                 timer->ctrl->label, timer->id, timer->fp, timer->param);
      }
      return false;

    default:
      // timer already stopped or cancelled, nothing to do
      return false;
  }
}
H
hzcheng 已提交
451

weixin_48148422's avatar
weixin_48148422 已提交
452 453
// Stops the timer identified by `timerId`.
// Returns true when the timer was still waiting and has been cancelled, false
// when it does not exist, has already fired or was stopped before.
bool taosTmrStop(tmr_h timerId) {
  uintptr_t  id = (uintptr_t)timerId;
  tmr_obj_t* timer = findTimer(id);

  if (timer != NULL) {
    uint8_t prior = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
    doStopTimer(timer, prior);
    timerDecRef(timer);  // release the reference taken by findTimer()
    return prior == TIMER_STATE_WAITING;
  }

  tmrDebug("timer[id=%" PRIuPTR "] does not exist", id);
  return false;
}

weixin_48148422's avatar
weixin_48148422 已提交
468 469 470 471
// Stops the timer referred to by `*timerId` and then resets `*timerId` to
// NULL. Returns the result of taosTmrStop().
bool taosTmrStopA(tmr_h* timerId) {
  const bool stopped = taosTmrStop(*timerId);
  *timerId = NULL;
  return stopped;
}

weixin_48148422's avatar
weixin_48148422 已提交
474 475 476
// Stops the timer `*pTmrId` (if it exists) and starts a new one with the given
// callback, delay and parameter; `*pTmrId` receives the id of the new timer.
// When the old timer could be taken off its wheel, its object is reused for
// the new timer, otherwise a fresh object is allocated via taosTmrStart().
// Returns true when the old timer was still waiting and has been cancelled;
// returns false (and leaves `*pTmrId` untouched) for an invalid controller.
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle, tmr_h* pTmrId) {
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
  if (ctrl == NULL || ctrl->label[0] == 0) {
    return false;
  }

  uintptr_t  id = (uintptr_t)*pTmrId;
  bool       stopped = false;
  tmr_obj_t* timer = findTimer(id);
  if (timer == NULL) {
    tmrDebug("%s timer[id=%" PRIuPTR "] does not exist", ctrl->label, id);
  } else {
    uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
    if (!doStopTimer(timer, state)) {
      // not reusable: just release the reference taken by findTimer()
      timerDecRef(timer);
      timer = NULL;
    }
    stopped = state == TIMER_STATE_WAITING;
  }

  if (timer == NULL) {
    *pTmrId = taosTmrStart(fp, mseconds, param, handle);
    return stopped;
  }

  tmrDebug("%s timer[id=%" PRIuPTR "] is reused", ctrl->label, timer->id);

  // wait until there's no other reference to this timer, so that it can be
  // reused safely. Yield every 1000 iterations; the counter is reset on each
  // yield so that it cannot overflow, however long the wait lasts.
  int spins = 0;
  while (atomic_load_8(&timer->refCount) > 1) {
    if (++spins == 1000) {
      spins = 0;
      sched_yield();
    }
  }

  assert(timer->refCount == 1);
  // the remaining reference (ours) is dropped implicitly by wiping the object;
  // doStartTimer() takes the references the restarted timer needs.
  memset(timer, 0, sizeof(*timer));
  *pTmrId = (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);

  return stopped;
}

weixin_48148422's avatar
weixin_48148422 已提交
516
static void taosTmrModuleInit(void) {
S
Shengliang Guan 已提交
517
  tmrCtrls = malloc(sizeof(tmr_ctrl_t) * tsMaxTmrCtrl);
518 519 520 521 522
  if (tmrCtrls == NULL) {
    tmrError("failed to allocate memory for timer controllers.");
    return;
  }

dengyihao's avatar
fix bug  
dengyihao 已提交
523 524
  memset(&timerMap, 0, sizeof(timerMap));

S
TD-1037  
Shengliang Guan 已提交
525
  for (uint32_t i = 0; i < tsMaxTmrCtrl - 1; ++i) {
weixin_48148422's avatar
weixin_48148422 已提交
526 527 528
    tmr_ctrl_t* ctrl = tmrCtrls + i;
    ctrl->next = ctrl + 1;
  }
S
Shengliang Guan 已提交
529
  (tmrCtrls + tsMaxTmrCtrl - 1)->next = NULL;
weixin_48148422's avatar
weixin_48148422 已提交
530
  unusedTmrCtrl = tmrCtrls;
H
hzcheng 已提交
531

weixin_48148422's avatar
weixin_48148422 已提交
532 533
  pthread_mutex_init(&tmrCtrlMutex, NULL);

534
  int64_t now = taosGetMonotonicMs();
weixin_48148422's avatar
weixin_48148422 已提交
535 536 537 538 539 540 541 542 543 544 545 546 547 548
  for (int i = 0; i < tListLen(wheels); i++) {
    time_wheel_t* wheel = wheels + i;
    if (pthread_mutex_init(&wheel->mutex, NULL) != 0) {
      tmrError("failed to create the mutex for wheel, reason:%s", strerror(errno));
      return;
    }
    wheel->nextScanAt = now + wheel->resolution;
    wheel->index = 0;
    wheel->slots = (tmr_obj_t**)calloc(wheel->size, sizeof(tmr_obj_t*));
    if (wheel->slots == NULL) {
      tmrError("failed to allocate wheel slots");
      return;
    }
    timerMap.size += wheel->size;
H
hzcheng 已提交
549 550
  }

weixin_48148422's avatar
weixin_48148422 已提交
551 552 553 554 555 556
  timerMap.count = 0;
  timerMap.slots = (timer_list_t*)calloc(timerMap.size, sizeof(timer_list_t));
  if (timerMap.slots == NULL) {
    tmrError("failed to allocate hash map");
    return;
  }
H
hzcheng 已提交
557

weixin_48148422's avatar
weixin_48148422 已提交
558
  tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr");
weixin_48148422's avatar
weixin_48148422 已提交
559
  taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK);
H
hzcheng 已提交
560

561
  tmrDebug("timer module is initialized, number of threads: %d", taosTmrThreads);
weixin_48148422's avatar
weixin_48148422 已提交
562
}
H
hzcheng 已提交
563

weixin_48148422's avatar
weixin_48148422 已提交
564
void* taosTmrInit(int maxNumOfTmrs, int resolution, int longest, const char* label) {
565 566 567
  const char* ret = taosMonotonicInit();
  tmrDebug("ttimer monotonic clock source:%s", ret);

weixin_48148422's avatar
weixin_48148422 已提交
568
  pthread_once(&tmrModuleInit, taosTmrModuleInit);
H
hzcheng 已提交
569

weixin_48148422's avatar
weixin_48148422 已提交
570 571 572 573
  pthread_mutex_lock(&tmrCtrlMutex);
  tmr_ctrl_t* ctrl = unusedTmrCtrl;
  if (ctrl != NULL) {
    unusedTmrCtrl = ctrl->next;
574
    numOfTmrCtrl++;
H
hzcheng 已提交
575
  }
weixin_48148422's avatar
weixin_48148422 已提交
576 577 578
  pthread_mutex_unlock(&tmrCtrlMutex);

  if (ctrl == NULL) {
weixin_48148422's avatar
weixin_48148422 已提交
579
    tmrError("%s too many timer controllers, failed to create timer controller.", label);
S
Shengliang Guan 已提交
580
    terrno = TSDB_CODE_OUT_OF_MEMORY;
weixin_48148422's avatar
weixin_48148422 已提交
581 582 583
    return NULL;
  }

B
Bomin Zhang 已提交
584
  tstrncpy(ctrl->label, label, sizeof(ctrl->label));
585
  tmrDebug("%s timer controller is initialized, number of timer controllers: %d.", label, numOfTmrCtrl);
weixin_48148422's avatar
weixin_48148422 已提交
586
  return ctrl;
H
hzcheng 已提交
587 588
}

weixin_48148422's avatar
weixin_48148422 已提交
589 590
void taosTmrCleanUp(void* handle) {
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
weixin_48148422's avatar
weixin_48148422 已提交
591 592 593
  if (ctrl == NULL || ctrl->label[0] == 0) {
    return;
  }
weixin_48148422's avatar
weixin_48148422 已提交
594

595
  tmrDebug("%s timer controller is cleaned up.", ctrl->label);
weixin_48148422's avatar
weixin_48148422 已提交
596
  ctrl->label[0] = 0;
H
hzcheng 已提交
597

weixin_48148422's avatar
weixin_48148422 已提交
598 599
  pthread_mutex_lock(&tmrCtrlMutex);
  ctrl->next = unusedTmrCtrl;
600
  numOfTmrCtrl--;
weixin_48148422's avatar
weixin_48148422 已提交
601 602
  unusedTmrCtrl = ctrl;
  pthread_mutex_unlock(&tmrCtrlMutex);
603

dengyihao's avatar
fix bug  
dengyihao 已提交
604 605
  tmrDebug("time controller's tmr ctrl size:  %d", numOfTmrCtrl);
  if (numOfTmrCtrl <= 0) {
weixin_48148422's avatar
weixin_48148422 已提交
606
    taosUninitTimer();
607

weixin_48148422's avatar
weixin_48148422 已提交
608 609
    taosCleanUpScheduler(tmrQhandle);

610 611
    for (int i = 0; i < tListLen(wheels); i++) {
      time_wheel_t* wheel = wheels + i;
612
      pthread_mutex_destroy(&wheel->mutex);
613 614 615
      free(wheel->slots);
    }

weixin_48148422's avatar
weixin_48148422 已提交
616 617 618 619
    pthread_mutex_destroy(&tmrCtrlMutex);

    for (size_t i = 0; i < timerMap.size; i++) {
      timer_list_t* list = timerMap.slots + i;
dengyihao's avatar
fix bug  
dengyihao 已提交
620
      tmr_obj_t*    t = list->timers;
weixin_48148422's avatar
weixin_48148422 已提交
621 622 623 624 625 626 627
      while (t != NULL) {
        tmr_obj_t* next = t->mnext;
        free(t);
        t = next;
      }
    }
    free(timerMap.slots);
628 629
    free(tmrCtrls);

dengyihao's avatar
fix bug  
dengyihao 已提交
630 631 632
    tmrCtrls = NULL;
    unusedTmrCtrl = NULL;
    tmrModuleInit = PTHREAD_ONCE_INIT;  // to support restart
633
  }
H
hzcheng 已提交
634
}