ttimer.c 17.0 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18
#include "tlog.h"
#include "tsched.h"
weixin_48148422's avatar
weixin_48148422 已提交
19
#include "ttime.h"
H
hzcheng 已提交
20 21 22
#include "ttimer.h"
#include "tutil.h"

S
slguan 已提交
23 24 25 26 27 28 29
// Logging helpers for the timer module. Each macro forwards its arguments to
// taosPrintLog() only when the matching level bit is set in tmrDebugFlag.
#define tmrError(...)                                        \
  do {                                                       \
    if (tmrDebugFlag & DEBUG_ERROR) {                        \
      taosPrintLog("ERROR TMR ", tmrDebugFlag, __VA_ARGS__); \
    }                                                        \
  } while (0)

#define tmrWarn(...)                                        \
  do {                                                      \
    if (tmrDebugFlag & DEBUG_WARN) {                        \
      taosPrintLog("WARN TMR ", tmrDebugFlag, __VA_ARGS__); \
    }                                                       \
  } while (0)

#define tmrTrace(...)                                  \
  do {                                                 \
    if (tmrDebugFlag & DEBUG_TRACE) {                  \
      taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); \
    }                                                  \
  } while (0)
H
hzcheng 已提交
37

weixin_48148422's avatar
weixin_48148422 已提交
38 39 40 41 42 43 44 45 46 47 48 49 50
// Values stored in tmr_obj_t.state; transitions are made with atomic operations.
// WAITING:  set by doStartTimer(); the callback has not started yet.
// EXPIRED:  set by processExpiredTimer() right before the callback is invoked.
// STOPPED:  set by processExpiredTimer() after the callback has returned.
// CANCELED: set by taosTmrStop()/taosTmrReset() when they win the race
//           against expiry (WAITING -> CANCELED).
#define TIMER_STATE_WAITING 0
#define TIMER_STATE_EXPIRED 1
#define TIMER_STATE_STOPPED 2
#define TIMER_STATE_CANCELED 3

// A timer controller. While a controller is handed out to a caller the 16
// bytes hold its label (taosTmrStart() and friends treat label[0] == 0 as
// "not in use"); while it sits on the free list the trailing pointer-sized
// bytes are reused as the link to the next free controller.
typedef union _tmr_ctrl_t {
  char label[16];
  struct {
    // padding places 'next' in the last bytes of the union, so that writing
    // the link never touches label[0]
    char               padding[16 - sizeof(union _tmr_ctrl_t*)];
    union _tmr_ctrl_t* next;
  };
} tmr_ctrl_t;
H
hzcheng 已提交
51

weixin_48148422's avatar
weixin_48148422 已提交
52 53 54 55 56 57 58 59 60 61 62 63 64
// One timer instance. A timer is linked into the id hash map (via 'mnext')
// and, while pending, into one slot of a time wheel (via 'prev'/'next').
typedef struct tmr_obj_t {
  uintptr_t         id;      // id produced by getNextTimerId(), doubles as the tmr_h handle
  tmr_ctrl_t*       ctrl;    // controller the timer was started with
  struct tmr_obj_t* mnext;   // next timer in the same hash map slot
  struct tmr_obj_t* prev;    // previous timer in the wheel slot list
  struct tmr_obj_t* next;    // next timer in the wheel slot / temporary expired list
  uint16_t          slot;    // index of the wheel slot holding this timer
  uint8_t           wheel;   // index into wheels[]; tListLen(wheels) means "on no wheel"
  uint8_t           state;   // one of TIMER_STATE_*
  uint8_t           refCount;  // the object is freed when this drops to zero
  uint8_t           reserved1;
  uint16_t          reserved2;
  union {
    int64_t expireAt;    // expiry timestamp in ms, set by addToWheel()
    int64_t executedBy;  // id of the thread running the callback, set by processExpiredTimer()
  };
  TAOS_TMR_CALLBACK fp;     // user callback
  void*             param;  // opaque argument passed to 'fp'
} tmr_obj_t;
H
hzcheng 已提交
71

weixin_48148422's avatar
weixin_48148422 已提交
72
typedef struct timer_list_t {
73
  int64_t    lockedBy;
weixin_48148422's avatar
weixin_48148422 已提交
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
  tmr_obj_t* timers;
} timer_list_t;

// Hash map from timer id to timer object; a timer lives in slot (id % size).
typedef struct timer_map_t {
  uint32_t      size;   // number of entries in 'slots'
  uint32_t      count;  // reset to 0 by taosTmrModuleInit(); not otherwise updated in this file
  timer_list_t* slots;  // array of 'size' lists
} timer_map_t;

// A single time wheel: a ring of slots that taosTimerLoopFunc() advances by
// one slot every 'resolution' milliseconds.
typedef struct time_wheel_t {
  pthread_mutex_t mutex;       // protects 'index' and the slot lists
  int64_t         nextScanAt;  // timestamp (ms) at which the next slot is due to be scanned
  uint32_t        resolution;  // milliseconds covered by one slot
  uint16_t        size;        // number of slots
  uint16_t        index;       // slot scanned most recently
  tmr_obj_t**     slots;       // array of 'size' list heads
} time_wheel_t;

S
slguan 已提交
92
int32_t tmrDebugFlag = 131;
93
uint32_t taosMaxTmrCtrl = 512;
weixin_48148422's avatar
weixin_48148422 已提交
94 95 96

static pthread_once_t  tmrModuleInit = PTHREAD_ONCE_INIT;
static pthread_mutex_t tmrCtrlMutex;
97
static tmr_ctrl_t*     tmrCtrls;
weixin_48148422's avatar
weixin_48148422 已提交
98
static tmr_ctrl_t*     unusedTmrCtrl = NULL;
99 100 101 102
static void*           tmrQhandle;
static int             numOfTmrCtrl = 0;

int taosTmrThreads = 1;
weixin_48148422's avatar
weixin_48148422 已提交
103 104 105 106 107 108 109 110 111 112 113 114
static uintptr_t nextTimerId = 0;

static time_wheel_t wheels[] = {
    {.resolution = MSECONDS_PER_TICK, .size = 4096},
    {.resolution = 1000, .size = 1024},
    {.resolution = 60000, .size = 1024},
};
static timer_map_t timerMap;

static uintptr_t getNextTimerId() {
  uintptr_t id;
  do {
weixin_48148422's avatar
weixin_48148422 已提交
115
    id = atomic_add_fetch_ptr(&nextTimerId, 1);
weixin_48148422's avatar
weixin_48148422 已提交
116 117
  } while (id == 0);
  return id;
H
hzcheng 已提交
118 119
}

weixin_48148422's avatar
weixin_48148422 已提交
120
// Atomically takes one more reference on `timer`.
static void timerAddRef(tmr_obj_t* timer) {
  atomic_add_fetch_8(&timer->refCount, 1);
}
H
hzcheng 已提交
121

weixin_48148422's avatar
weixin_48148422 已提交
122
// Atomically drops one reference on `timer`; the object is freed by whoever
// releases the last reference, so `timer` must not be used after this call.
static void timerDecRef(tmr_obj_t* timer) {
  if (atomic_sub_fetch_8(&timer->refCount, 1) != 0) {
    return;
  }
  free(timer);
}
H
hzcheng 已提交
127

weixin_48148422's avatar
weixin_48148422 已提交
128
// Spin lock acquire: repeatedly tries to swing list->lockedBy from 0 to the
// current thread id, yielding the CPU on every 1000th failed attempt.
static void lockTimerList(timer_list_t* list) {
  const int64_t self = taosGetPthreadId();
  int           attempts = 0;

  for (;;) {
    if (atomic_val_compare_exchange_64(&(list->lockedBy), 0, self) == 0) {
      return;  // lock acquired
    }
    attempts++;
    if (attempts % 1000 == 0) {
      sched_yield();
    }
  }
}
H
hzcheng 已提交
137

weixin_48148422's avatar
weixin_48148422 已提交
138
// Spin lock release: swings list->lockedBy from the current thread id back to
// 0. Releasing a list that is not locked by the calling thread is a
// programming error; it is logged and then trapped by an assertion.
static void unlockTimerList(timer_list_t* list) {
  int64_t tid = taosGetPthreadId();
  if (atomic_val_compare_exchange_64(&(list->lockedBy), tid, 0) != tid) {
    // log first: with assertions enabled the assert below aborts the process,
    // so the message would otherwise never be written
    tmrError("%" PRId64 " trying to unlock a timer list not locked by current thread.", tid);
    assert(false);
  }
}

weixin_48148422's avatar
weixin_48148422 已提交
146 147 148
// Registers `timer` in the id hash map. The map holds its own reference on
// the timer, and the timer is marked as not being on any wheel yet.
static void addTimer(tmr_obj_t* timer) {
  timerAddRef(timer);
  timer->wheel = tListLen(wheels);

  uint32_t      slotIdx = (uint32_t)(timer->id % timerMap.size);
  timer_list_t* bucket = &timerMap.slots[slotIdx];

  // push to the front of the bucket
  lockTimerList(bucket);
  timer->mnext = bucket->timers;
  bucket->timers = timer;
  unlockTimerList(bucket);
}
H
hzcheng 已提交
158

weixin_48148422's avatar
weixin_48148422 已提交
159 160 161 162 163 164 165 166 167 168
static tmr_obj_t* findTimer(uintptr_t id) {
  tmr_obj_t* timer = NULL;
  if (id > 0) {
    uint32_t      idx = (uint32_t)(id % timerMap.size);
    timer_list_t* list = timerMap.slots + idx;
    lockTimerList(list);
    for (timer = list->timers; timer != NULL; timer = timer->mnext) {
      if (timer->id == id) {
        timerAddRef(timer);
        break;
H
hzcheng 已提交
169 170
      }
    }
weixin_48148422's avatar
weixin_48148422 已提交
171
    unlockTimerList(list);
H
hzcheng 已提交
172
  }
weixin_48148422's avatar
weixin_48148422 已提交
173
  return timer;
H
hzcheng 已提交
174 175
}

weixin_48148422's avatar
weixin_48148422 已提交
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
// Unlinks the timer with the given id from the hash map and drops the
// reference the map held on it. Does nothing if the id is not registered.
static void removeTimer(uintptr_t id) {
  timer_list_t* bucket = &timerMap.slots[(uint32_t)(id % timerMap.size)];

  lockTimerList(bucket);
  // `link` always points at the pointer that references the current node,
  // so unlinking the head and an inner node are the same operation
  for (tmr_obj_t** link = &bucket->timers; *link != NULL; link = &(*link)->mnext) {
    tmr_obj_t* cur = *link;
    if (cur->id == id) {
      *link = cur->mnext;
      timerDecRef(cur);  // may free `cur`
      break;
    }
  }
  unlockTimerList(bucket);
}
H
hzcheng 已提交
195

weixin_48148422's avatar
weixin_48148422 已提交
196 197 198 199 200 201 202 203 204
// Schedules `timer` to expire `delay` milliseconds from now by linking it
// into a slot of a suitable wheel. The wheel holds its own reference.
static void addToWheel(tmr_obj_t* timer, uint32_t delay) {
  timerAddRef(timer);

  // Use the finest wheel whose full span (resolution * size) still covers
  // the delay; fall back to the coarsest wheel for anything longer.
  timer->wheel = tListLen(wheels) - 1;
  for (uint8_t w = 0; w < tListLen(wheels); w++) {
    if (delay < wheels[w].resolution * wheels[w].size) {
      timer->wheel = w;
      break;
    }
  }

  time_wheel_t* target = &wheels[timer->wheel];
  timer->prev = NULL;
  timer->expireAt = taosGetTimestampMs() + delay;

  pthread_mutex_lock(&target->mutex);

  // Number of slots past the next one to be scanned. It is measured from the
  // wheel's next scan time and rounded up, so that the timer never fires
  // earlier than requested.
  uint32_t ticks = 0;
  if (timer->expireAt > target->nextScanAt) {
    delay = (uint32_t)(timer->expireAt - target->nextScanAt);
    ticks = (delay + target->resolution - 1) / target->resolution;
  }
  timer->slot = (uint16_t)((target->index + ticks + 1) % target->size);

  // push to the front of the slot's doubly linked list
  tmr_obj_t* oldHead = target->slots[timer->slot];
  timer->next = oldHead;
  if (oldHead != NULL) {
    oldHead->prev = timer;
  }
  target->slots[timer->slot] = timer;

  pthread_mutex_unlock(&target->mutex);
}

weixin_48148422's avatar
weixin_48148422 已提交
234 235 236 237 238 239 240 241 242 243 244 245
// Unlinks `timer` from the wheel it is on and drops the wheel's reference.
// Returns true if the timer was on a wheel and has been removed by this call,
// false if it was not on any wheel (timer->wheel == tListLen(wheels)).
static bool removeFromWheel(tmr_obj_t* timer) {
  if (timer->wheel >= tListLen(wheels)) {
    return false;
  }
  time_wheel_t* wheel = &wheels[timer->wheel];

  pthread_mutex_lock(&wheel->mutex);

  // another thread may have taken the timer off the wheel before we got the
  // lock, so the check is repeated here
  bool onWheel = timer->wheel < tListLen(wheels);
  if (onWheel) {
    tmr_obj_t* before = timer->prev;
    tmr_obj_t* after = timer->next;

    if (before != NULL) {
      before->next = after;
    }
    if (after != NULL) {
      after->prev = before;
    }
    if (wheel->slots[timer->slot] == timer) {
      wheel->slots[timer->slot] = after;
    }

    timer->wheel = tListLen(wheels);
    timer->next = NULL;
    timer->prev = NULL;
    timerDecRef(timer);
  }

  pthread_mutex_unlock(&wheel->mutex);

  return onWheel;
}

weixin_48148422's avatar
weixin_48148422 已提交
264 265
static void processExpiredTimer(void* handle, void* arg) {
  tmr_obj_t* timer = (tmr_obj_t*)handle;
266
  timer->executedBy = taosGetPthreadId();
weixin_48148422's avatar
weixin_48148422 已提交
267
  uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED);
weixin_48148422's avatar
weixin_48148422 已提交
268
  if (state == TIMER_STATE_WAITING) {
weixin_48148422's avatar
weixin_48148422 已提交
269
    const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution start.";
weixin_48148422's avatar
weixin_48148422 已提交
270
    tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
H
hzcheng 已提交
271

weixin_48148422's avatar
weixin_48148422 已提交
272 273
    (*timer->fp)(timer->param, (tmr_h)timer->id);
    atomic_store_8(&timer->state, TIMER_STATE_STOPPED);
H
hzcheng 已提交
274

weixin_48148422's avatar
weixin_48148422 已提交
275
    fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution end.";
weixin_48148422's avatar
weixin_48148422 已提交
276 277 278 279 280
    tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
  }
  removeTimer(timer->id);
  timerDecRef(timer);
}
H
hzcheng 已提交
281

weixin_48148422's avatar
weixin_48148422 已提交
282
// Queues every timer of the list starting at `head` (linked through
// tmr_obj_t.next) on the timer scheduler, where processExpiredTimer() will
// run it. A NULL `head` is a no-op.
static void addToExpired(tmr_obj_t* head) {
  tmr_obj_t* following = NULL;

  for (tmr_obj_t* cur = head; cur != NULL; cur = following) {
    // read everything needed from `cur` before it is handed over:
    // processExpiredTimer() releases a reference and may free the object
    following = cur->next;
    uintptr_t id = cur->id;

    tmrTrace("%s adding expired timer[id=%" PRIuPTR ", fp=%p, param=%p] to queue.", cur->ctrl->label, id, cur->fp,
             cur->param);

    SSchedMsg task;
    task.fp = NULL;
    task.tfp = processExpiredTimer;
    task.msg = NULL;
    task.ahandle = cur;
    task.thandle = NULL;
    taosScheduleTask(tmrQhandle, &task);

    tmrTrace("timer[id=%" PRIuPTR "] has been added to queue.", id);
  }
}
H
hzcheng 已提交
302

weixin_48148422's avatar
weixin_48148422 已提交
303 304 305 306 307 308 309 310 311
// Initializes `timer` with a fresh id and the given callback, registers it in
// the hash map and schedules it: a zero delay queues it for execution right
// away, any other delay puts it on a time wheel.
// Returns the id assigned to the timer.
static uintptr_t doStartTimer(tmr_obj_t* timer, TAOS_TMR_CALLBACK fp, int mseconds, void* param, tmr_ctrl_t* ctrl) {
  const uintptr_t id = getNextTimerId();

  timer->id = id;
  timer->state = TIMER_STATE_WAITING;
  timer->fp = fp;
  timer->param = param;
  timer->ctrl = ctrl;
  addTimer(timer);

  tmrTrace("%s timer[id=%" PRIuPTR ", fp=%p, param=%p] started", ctrl->label, id, fp, param);

  // From here on the timer may fire and be freed at any moment, so the value
  // returned is the local copy of the id rather than timer->id.
  if (mseconds != 0) {
    addToWheel(timer, mseconds);
    return id;
  }

  // no delay: skip the wheels; the extra reference belongs to the queued task
  timer->wheel = tListLen(wheels);
  timerAddRef(timer);
  addToExpired(timer);
  return id;
}

weixin_48148422's avatar
weixin_48148422 已提交
327 328 329 330 331
// Starts a one-shot timer that invokes fp(param, timerId) after `mseconds`
// milliseconds. `handle` is a controller obtained from taosTmrInit().
// Returns the timer handle, or NULL if the controller is invalid or the timer
// object cannot be allocated.
tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle) {
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
  bool        validCtrl = (ctrl != NULL) && (ctrl->label[0] != 0);
  if (!validCtrl) {
    return NULL;
  }

  tmr_obj_t* timer = (tmr_obj_t*)calloc(1, sizeof(tmr_obj_t));
  if (timer != NULL) {
    return (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);
  }

  tmrError("%s failed to allocated memory for new timer object.", ctrl->label);
  return NULL;
}
H
hzcheng 已提交
341

weixin_48148422's avatar
weixin_48148422 已提交
342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389
// Periodic tick handler registered with taosInitTimer(); `signo` is unused.
// For each wheel it advances the wheel, one slot per elapsed resolution
// interval, up to the current time. Every timer in a visited slot whose
// expiry time has been reached is unlinked and queued for execution.
static void taosTimerLoopFunc(int signo) {
  int64_t now = taosGetTimestampMs();

  for (int i = 0; i < tListLen(wheels); i++) {
    // `expired` is a temporary expire list.
    // expired timers are first added to this list, then moved
    // to the expired queue as a batch to improve performance.
    // note this list is used as a stack in this function.
    tmr_obj_t* expired = NULL;

    time_wheel_t* wheel = wheels + i;
    // NOTE(review): nextScanAt is read here without the mutex; this relies on
    // this function being its only writer after initialization - confirm that
    // the tick handler is never run concurrently with itself.
    while (now >= wheel->nextScanAt) {
      pthread_mutex_lock(&wheel->mutex);
      wheel->index = (wheel->index + 1) % wheel->size;
      tmr_obj_t* timer = wheel->slots[wheel->index];
      while (timer != NULL) {
        tmr_obj_t* next = timer->next;
        // a slot may also hold timers that are due in a later round of the wheel
        if (now < timer->expireAt) {
          timer = next;
          continue;
        }

        // remove from the wheel
        if (timer->prev == NULL) {
          wheel->slots[wheel->index] = next;
          if (next != NULL) {
            next->prev = NULL;
          }
        } else {
          timer->prev->next = next;
          if (next != NULL) {
            next->prev = timer->prev;
          }
        }
        timer->wheel = tListLen(wheels);

        // add to temporary expire list
        timer->next = expired;
        timer->prev = NULL;
        if (expired != NULL) {
          expired->prev = timer;
        }
        expired = timer;

        timer = next;
      }
      // advance the scan time while still holding the mutex: addToWheel()
      // derives the target slot from `index` and `nextScanAt` together under
      // this mutex and must see a consistent pair
      wheel->nextScanAt += wheel->resolution;
      pthread_mutex_unlock(&wheel->mutex);
    }

    addToExpired(expired);
  }
}
H
hzcheng 已提交
395

weixin_48148422's avatar
weixin_48148422 已提交
396 397
// Second half of stopping a timer. `state` is the state the timer was in when
// the caller tried to switch it from WAITING to CANCELED.
// Returns true only if the timer was still waiting and this call took it off
// its wheel; only then may the caller reuse the timer object.
static bool doStopTimer(tmr_obj_t* timer, uint8_t state) {
  if (state == TIMER_STATE_EXPIRED) {
    // The callback is running. If it runs in the current thread, this call
    // comes from inside the callback (taosTmrReset) and nothing must be done,
    // to avoid a dead lock; note taosTmrReset must then be the last statement
    // of the callback function, it will be a bug otherwise.
    // If it runs in another thread we SHOULD wait for it to finish, BUT that
    // may dead lock when the current thread holds a lock the callback needs,
    // so we HAVE TO return directly.
    if (timer->executedBy != taosGetPthreadId()) {
      tmrTrace("%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is executing and cannot be stopped.", timer->ctrl->label,
               timer->id, timer->fp, timer->param);
    }
    return false;
  }

  if (state != TIMER_STATE_WAITING) {
    // timer already stopped or cancelled, nothing to do
    return false;
  }

  // It is only safe to reuse the timer when it has been removed from the
  // wheel by this call; thread safety cannot be guaranteed in any other case.
  bool reusable = removeFromWheel(timer);
  if (reusable) {
    removeTimer(timer->id);
  }

  tmrTrace("%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is cancelled.", timer->ctrl->label, timer->id, timer->fp,
           timer->param);
  return reusable;
}
H
hzcheng 已提交
429

weixin_48148422's avatar
weixin_48148422 已提交
430 431
// Cancels the timer identified by `timerId`.
// Returns true if the timer was still waiting and has been cancelled, false
// if it does not exist or its callback has already started or finished.
bool taosTmrStop(tmr_h timerId) {
  const uintptr_t id = (uintptr_t)timerId;

  tmr_obj_t* timer = findTimer(id);
  if (timer == NULL) {
    tmrTrace("timer[id=%" PRIuPTR "] does not exist", id);
    return false;
  }

  // whoever moves the state away from WAITING first wins: either this cancel
  // or processExpiredTimer()
  uint8_t prior = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
  bool    cancelled = (prior == TIMER_STATE_WAITING);

  doStopTimer(timer, prior);
  timerDecRef(timer);  // release the reference taken by findTimer()

  return cancelled;
}

weixin_48148422's avatar
weixin_48148422 已提交
446 447 448 449
// Same as taosTmrStop(), but takes the handle by address and clears it
// afterwards so that the caller is not left with a stale handle.
bool taosTmrStopA(tmr_h* timerId) {
  const bool cancelled = taosTmrStop(*timerId);
  *timerId = NULL;
  return cancelled;
}

weixin_48148422's avatar
weixin_48148422 已提交
452 453 454
// Stops the timer referenced by *pTmrId (if it still exists) and starts a new
// one with the given callback, delay and parameter; the handle of the new
// timer is stored back into *pTmrId. When the old timer could be taken off
// its wheel its object is reused, otherwise a new object is allocated.
//
// fp / mseconds / param: same meaning as for taosTmrStart().
// handle:                controller obtained from taosTmrInit().
// pTmrId:                in: timer to stop, out: handle of the new timer.
//
// Returns true if the old timer was still waiting and has been cancelled,
// false otherwise (including when `handle` is not a valid controller, in
// which case *pTmrId is left untouched and no timer is started).
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle, tmr_h* pTmrId) {
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
  if (ctrl == NULL || ctrl->label[0] == 0) {
    // the function returns bool, so report failure as `false` (was `NULL`)
    return false;
  }

  uintptr_t  id = (uintptr_t)*pTmrId;
  bool       stopped = false;
  tmr_obj_t* timer = findTimer(id);
  if (timer == NULL) {
    tmrTrace("%s timer[id=%" PRIuPTR "] does not exist", ctrl->label, id);
  } else {
    uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
    if (!doStopTimer(timer, state)) {
      // the old object cannot be reused; release the reference taken by findTimer()
      timerDecRef(timer);
      timer = NULL;
    }
    stopped = state == TIMER_STATE_WAITING;
  }

  if (timer == NULL) {
    *pTmrId = taosTmrStart(fp, mseconds, param, handle);
    return stopped;
  }

  tmrTrace("%s timer[id=%" PRIuPTR "] is reused", ctrl->label, timer->id);

  // wait until there's no other reference to this timer,
  // so that we can reuse this timer safely.
  for (int i = 1; atomic_load_8(&timer->refCount) > 1; ++i) {
    if (i % 1000 == 0) {
      sched_yield();
    }
  }

  assert(timer->refCount == 1);
  // Wipe the object, including refCount: the reference taken by findTimer()
  // is given up here and doStartTimer() takes fresh ones.
  memset(timer, 0, sizeof(*timer));
  *pTmrId = (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);

  return stopped;
}

weixin_48148422's avatar
weixin_48148422 已提交
494
static void taosTmrModuleInit(void) {
495 496 497 498 499 500 501
  tmrCtrls = malloc(sizeof(tmr_ctrl_t) * taosMaxTmrCtrl);
  if (tmrCtrls == NULL) {
    tmrError("failed to allocate memory for timer controllers.");
    return;
  }

  for (int i = 0; i < taosMaxTmrCtrl - 1; ++i) {
weixin_48148422's avatar
weixin_48148422 已提交
502 503 504
    tmr_ctrl_t* ctrl = tmrCtrls + i;
    ctrl->next = ctrl + 1;
  }
505
  (tmrCtrls + taosMaxTmrCtrl - 1)->next = NULL;
weixin_48148422's avatar
weixin_48148422 已提交
506
  unusedTmrCtrl = tmrCtrls;
H
hzcheng 已提交
507

weixin_48148422's avatar
weixin_48148422 已提交
508 509 510 511 512 513 514 515 516
  pthread_mutex_init(&tmrCtrlMutex, NULL);

  int64_t now = taosGetTimestampMs();
  for (int i = 0; i < tListLen(wheels); i++) {
    time_wheel_t* wheel = wheels + i;
    if (pthread_mutex_init(&wheel->mutex, NULL) != 0) {
      tmrError("failed to create the mutex for wheel, reason:%s", strerror(errno));
      return;
    }
H
Hui Li 已提交
517
    pthread_mutex_lock(&wheel->mutex);
weixin_48148422's avatar
weixin_48148422 已提交
518 519 520 521 522
    wheel->nextScanAt = now + wheel->resolution;
    wheel->index = 0;
    wheel->slots = (tmr_obj_t**)calloc(wheel->size, sizeof(tmr_obj_t*));
    if (wheel->slots == NULL) {
      tmrError("failed to allocate wheel slots");
H
Hui Li 已提交
523
      pthread_mutex_unlock(&wheel->mutex);
weixin_48148422's avatar
weixin_48148422 已提交
524 525 526
      return;
    }
    timerMap.size += wheel->size;
H
Hui Li 已提交
527
    pthread_mutex_unlock(&wheel->mutex);
H
hzcheng 已提交
528 529
  }

weixin_48148422's avatar
weixin_48148422 已提交
530 531 532 533 534 535
  timerMap.count = 0;
  timerMap.slots = (timer_list_t*)calloc(timerMap.size, sizeof(timer_list_t));
  if (timerMap.slots == NULL) {
    tmrError("failed to allocate hash map");
    return;
  }
H
hzcheng 已提交
536

weixin_48148422's avatar
weixin_48148422 已提交
537
  tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr");
weixin_48148422's avatar
weixin_48148422 已提交
538
  taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK);
H
hzcheng 已提交
539

weixin_48148422's avatar
weixin_48148422 已提交
540 541
  tmrTrace("timer module is initialized, number of threads: %d", taosTmrThreads);
}
H
hzcheng 已提交
542

weixin_48148422's avatar
weixin_48148422 已提交
543 544
void* taosTmrInit(int maxNumOfTmrs, int resolution, int longest, const char* label) {
  pthread_once(&tmrModuleInit, taosTmrModuleInit);
H
hzcheng 已提交
545

weixin_48148422's avatar
weixin_48148422 已提交
546 547 548 549
  pthread_mutex_lock(&tmrCtrlMutex);
  tmr_ctrl_t* ctrl = unusedTmrCtrl;
  if (ctrl != NULL) {
    unusedTmrCtrl = ctrl->next;
550
    numOfTmrCtrl++;
H
hzcheng 已提交
551
  }
weixin_48148422's avatar
weixin_48148422 已提交
552 553 554
  pthread_mutex_unlock(&tmrCtrlMutex);

  if (ctrl == NULL) {
weixin_48148422's avatar
weixin_48148422 已提交
555
    tmrError("%s too many timer controllers, failed to create timer controller.", label);
weixin_48148422's avatar
weixin_48148422 已提交
556 557 558
    return NULL;
  }

B
Bomin Zhang 已提交
559
  tstrncpy(ctrl->label, label, sizeof(ctrl->label));
weixin_48148422's avatar
weixin_48148422 已提交
560
  tmrTrace("%s timer controller is initialized, number of timer controllers: %d.", label, numOfTmrCtrl);
weixin_48148422's avatar
weixin_48148422 已提交
561
  return ctrl;
H
hzcheng 已提交
562 563
}

weixin_48148422's avatar
weixin_48148422 已提交
564 565
void taosTmrCleanUp(void* handle) {
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
weixin_48148422's avatar
weixin_48148422 已提交
566 567 568
  if (ctrl == NULL || ctrl->label[0] == 0) {
    return;
  }
weixin_48148422's avatar
weixin_48148422 已提交
569

weixin_48148422's avatar
weixin_48148422 已提交
570
  tmrTrace("%s timer controller is cleaned up.", ctrl->label);
weixin_48148422's avatar
weixin_48148422 已提交
571
  ctrl->label[0] = 0;
H
hzcheng 已提交
572

weixin_48148422's avatar
weixin_48148422 已提交
573 574
  pthread_mutex_lock(&tmrCtrlMutex);
  ctrl->next = unusedTmrCtrl;
575
  numOfTmrCtrl--;
weixin_48148422's avatar
weixin_48148422 已提交
576 577
  unusedTmrCtrl = ctrl;
  pthread_mutex_unlock(&tmrCtrlMutex);
578 579

  if (numOfTmrCtrl <=0) {
weixin_48148422's avatar
weixin_48148422 已提交
580
    taosUninitTimer();
581

weixin_48148422's avatar
weixin_48148422 已提交
582 583
    taosCleanUpScheduler(tmrQhandle);

584 585
    for (int i = 0; i < tListLen(wheels); i++) {
      time_wheel_t* wheel = wheels + i;
586
      pthread_mutex_destroy(&wheel->mutex);
587 588 589
      free(wheel->slots);
    }

weixin_48148422's avatar
weixin_48148422 已提交
590 591 592 593 594 595 596 597 598 599 600 601
    pthread_mutex_destroy(&tmrCtrlMutex);

    for (size_t i = 0; i < timerMap.size; i++) {
      timer_list_t* list = timerMap.slots + i;
      tmr_obj_t* t = list->timers;
      while (t != NULL) {
        tmr_obj_t* next = t->mnext;
        free(t);
        t = next;
      }
    }
    free(timerMap.slots);
602 603 604 605
    free(tmrCtrls);

    tmrTrace("timer module is cleaned up");
  }
H
hzcheng 已提交
606
}