ttimer.c 17.1 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18 19 20 21
#include "tlog.h"
#include "tsched.h"
#include "ttimer.h"
#include "tutil.h"

S
Shengliang Guan 已提交
22
extern int8_t tscEmbedded;
S
slguan 已提交
23

24 25
#define tmrFatal(...) { if (tmrDebugFlag & DEBUG_FATAL) { taosPrintLog("TMR FATAL ", tscEmbedded ? 255 : tmrDebugFlag, __VA_ARGS__); }}
#define tmrError(...) { if (tmrDebugFlag & DEBUG_ERROR) { taosPrintLog("TMR ERROR ", tscEmbedded ? 255 : tmrDebugFlag, __VA_ARGS__); }}
S
Shengliang Guan 已提交
26 27 28 29
#define tmrWarn(...)  { if (tmrDebugFlag & DEBUG_WARN)  { taosPrintLog("TMR WARN ", tscEmbedded ? 255 : tmrDebugFlag, __VA_ARGS__); }}
#define tmrInfo(...)  { if (tmrDebugFlag & DEBUG_INFO)  { taosPrintLog("TMR ", tscEmbedded ? 255 : tmrDebugFlag, __VA_ARGS__); }}
#define tmrDebug(...) { if (tmrDebugFlag & DEBUG_DEBUG) { taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); }}
#define tmrTrace(...) { if (tmrDebugFlag & DEBUG_TRACE) { taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); }}
H
hzcheng 已提交
30

weixin_48148422's avatar
weixin_48148422 已提交
31 32 33 34 35 36 37 38 39 40 41 42 43
#define TIMER_STATE_WAITING 0
#define TIMER_STATE_EXPIRED 1
#define TIMER_STATE_STOPPED 2
#define TIMER_STATE_CANCELED 3

typedef union _tmr_ctrl_t {
  char label[16];
  struct {
    // pad to ensure 'next' is the end of this union
    char               padding[16 - sizeof(union _tmr_ctrl_t*)];
    union _tmr_ctrl_t* next;
  };
} tmr_ctrl_t;
H
hzcheng 已提交
44

weixin_48148422's avatar
weixin_48148422 已提交
45 46 47 48 49 50 51 52 53 54 55 56 57
typedef struct tmr_obj_t {
  uintptr_t         id;
  tmr_ctrl_t*       ctrl;
  struct tmr_obj_t* mnext;
  struct tmr_obj_t* prev;
  struct tmr_obj_t* next;
  uint16_t          slot;
  uint8_t           wheel;
  uint8_t           state;
  uint8_t           refCount;
  uint8_t           reserved1;
  uint16_t          reserved2;
  union {
58 59
    int64_t expireAt;
    int64_t executedBy;
weixin_48148422's avatar
weixin_48148422 已提交
60 61 62 63
  };
  TAOS_TMR_CALLBACK fp;
  void*             param;
} tmr_obj_t;
H
hzcheng 已提交
64

weixin_48148422's avatar
weixin_48148422 已提交
65
typedef struct timer_list_t {
66
  int64_t    lockedBy;
weixin_48148422's avatar
weixin_48148422 已提交
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
  tmr_obj_t* timers;
} timer_list_t;

typedef struct timer_map_t {
  uint32_t      size;
  uint32_t      count;
  timer_list_t* slots;
} timer_map_t;

typedef struct time_wheel_t {
  pthread_mutex_t mutex;
  int64_t         nextScanAt;
  uint32_t        resolution;
  uint16_t        size;
  uint16_t        index;
  tmr_obj_t**     slots;
} time_wheel_t;

S
slguan 已提交
85
int32_t tmrDebugFlag = 131;
S
Shengliang Guan 已提交
86
uint32_t tsMaxTmrCtrl = 512;
weixin_48148422's avatar
weixin_48148422 已提交
87 88 89

static pthread_once_t  tmrModuleInit = PTHREAD_ONCE_INIT;
static pthread_mutex_t tmrCtrlMutex;
90
static tmr_ctrl_t*     tmrCtrls;
weixin_48148422's avatar
weixin_48148422 已提交
91
static tmr_ctrl_t*     unusedTmrCtrl = NULL;
92 93 94 95
static void*           tmrQhandle;
static int             numOfTmrCtrl = 0;

int taosTmrThreads = 1;
weixin_48148422's avatar
weixin_48148422 已提交
96 97 98 99 100 101 102 103 104 105 106 107
static uintptr_t nextTimerId = 0;

static time_wheel_t wheels[] = {
    {.resolution = MSECONDS_PER_TICK, .size = 4096},
    {.resolution = 1000, .size = 1024},
    {.resolution = 60000, .size = 1024},
};
static timer_map_t timerMap;

static uintptr_t getNextTimerId() {
  uintptr_t id;
  do {
weixin_48148422's avatar
weixin_48148422 已提交
108
    id = atomic_add_fetch_ptr(&nextTimerId, 1);
weixin_48148422's avatar
weixin_48148422 已提交
109 110
  } while (id == 0);
  return id;
H
hzcheng 已提交
111 112
}

weixin_48148422's avatar
weixin_48148422 已提交
113
static void timerAddRef(tmr_obj_t* timer) { atomic_add_fetch_8(&timer->refCount, 1); }
H
hzcheng 已提交
114

weixin_48148422's avatar
weixin_48148422 已提交
115
static void timerDecRef(tmr_obj_t* timer) {
weixin_48148422's avatar
weixin_48148422 已提交
116
  if (atomic_sub_fetch_8(&timer->refCount, 1) == 0) {
weixin_48148422's avatar
weixin_48148422 已提交
117
    free(timer);
H
hzcheng 已提交
118
  }
weixin_48148422's avatar
weixin_48148422 已提交
119
}
H
hzcheng 已提交
120

weixin_48148422's avatar
weixin_48148422 已提交
121
static void lockTimerList(timer_list_t* list) {
S
TD-2616  
Shengliang Guan 已提交
122
  int64_t tid = taosGetSelfPthreadId();
weixin_48148422's avatar
weixin_48148422 已提交
123
  int       i = 0;
weixin_48148422's avatar
weixin_48148422 已提交
124
  while (atomic_val_compare_exchange_64(&(list->lockedBy), 0, tid) != 0) {
weixin_48148422's avatar
weixin_48148422 已提交
125 126 127
    if (++i % 1000 == 0) {
      sched_yield();
    }
H
hzcheng 已提交
128
  }
weixin_48148422's avatar
weixin_48148422 已提交
129
}
H
hzcheng 已提交
130

weixin_48148422's avatar
weixin_48148422 已提交
131
static void unlockTimerList(timer_list_t* list) {
S
TD-2616  
Shengliang Guan 已提交
132
  int64_t tid = taosGetSelfPthreadId();
weixin_48148422's avatar
weixin_48148422 已提交
133
  if (atomic_val_compare_exchange_64(&(list->lockedBy), tid, 0) != tid) {
weixin_48148422's avatar
weixin_48148422 已提交
134
    assert(false);
H
Hui Li 已提交
135
    tmrError("%" PRId64 " trying to unlock a timer list not locked by current thread.", tid);
H
hzcheng 已提交
136 137 138
  }
}

weixin_48148422's avatar
weixin_48148422 已提交
139 140 141
static void addTimer(tmr_obj_t* timer) {
  timerAddRef(timer);
  timer->wheel = tListLen(wheels);
H
hzcheng 已提交
142

weixin_48148422's avatar
weixin_48148422 已提交
143 144
  uint32_t      idx = (uint32_t)(timer->id % timerMap.size);
  timer_list_t* list = timerMap.slots + idx;
H
hzcheng 已提交
145

weixin_48148422's avatar
weixin_48148422 已提交
146 147 148 149 150
  lockTimerList(list);
  timer->mnext = list->timers;
  list->timers = timer;
  unlockTimerList(list);
}
H
hzcheng 已提交
151

weixin_48148422's avatar
weixin_48148422 已提交
152 153 154 155 156 157 158 159 160 161
static tmr_obj_t* findTimer(uintptr_t id) {
  tmr_obj_t* timer = NULL;
  if (id > 0) {
    uint32_t      idx = (uint32_t)(id % timerMap.size);
    timer_list_t* list = timerMap.slots + idx;
    lockTimerList(list);
    for (timer = list->timers; timer != NULL; timer = timer->mnext) {
      if (timer->id == id) {
        timerAddRef(timer);
        break;
H
hzcheng 已提交
162 163
      }
    }
weixin_48148422's avatar
weixin_48148422 已提交
164
    unlockTimerList(list);
H
hzcheng 已提交
165
  }
weixin_48148422's avatar
weixin_48148422 已提交
166
  return timer;
H
hzcheng 已提交
167 168
}

weixin_48148422's avatar
weixin_48148422 已提交
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
static void removeTimer(uintptr_t id) {
  tmr_obj_t*    prev = NULL;
  uint32_t      idx = (uint32_t)(id % timerMap.size);
  timer_list_t* list = timerMap.slots + idx;
  lockTimerList(list);
  for (tmr_obj_t* p = list->timers; p != NULL; p = p->mnext) {
    if (p->id == id) {
      if (prev == NULL) {
        list->timers = p->mnext;
      } else {
        prev->mnext = p->mnext;
      }
      timerDecRef(p);
      break;
    }
    prev = p;
H
hzcheng 已提交
185
  }
weixin_48148422's avatar
weixin_48148422 已提交
186 187
  unlockTimerList(list);
}
H
hzcheng 已提交
188

weixin_48148422's avatar
weixin_48148422 已提交
189 190 191 192 193 194 195 196 197
static void addToWheel(tmr_obj_t* timer, uint32_t delay) {
  timerAddRef(timer);
  // select a wheel for the timer, we are not an accurate timer,
  // but the inaccuracy should not be too large.
  timer->wheel = tListLen(wheels) - 1;
  for (uint8_t i = 0; i < tListLen(wheels); i++) {
    time_wheel_t* wheel = wheels + i;
    if (delay < wheel->resolution * wheel->size) {
      timer->wheel = i;
H
hzcheng 已提交
198 199 200 201
      break;
    }
  }

weixin_48148422's avatar
weixin_48148422 已提交
202 203
  time_wheel_t* wheel = wheels + timer->wheel;
  timer->prev = NULL;
204
  timer->expireAt = taosGetMonotonicMs() + delay;
H
hzcheng 已提交
205

weixin_48148422's avatar
weixin_48148422 已提交
206
  pthread_mutex_lock(&wheel->mutex);
H
hzcheng 已提交
207

weixin_48148422's avatar
weixin_48148422 已提交
208 209 210 211 212 213
  uint32_t idx = 0;
  if (timer->expireAt > wheel->nextScanAt) {
    // adjust delay according to next scan time of this wheel
    // so that the timer is not fired earlier than desired.
    delay = (uint32_t)(timer->expireAt - wheel->nextScanAt);
    idx = (delay + wheel->resolution - 1) / wheel->resolution;
H
hzcheng 已提交
214 215
  }

weixin_48148422's avatar
weixin_48148422 已提交
216 217 218 219 220 221 222
  timer->slot = (uint16_t)((wheel->index + idx + 1) % wheel->size);
  tmr_obj_t* p = wheel->slots[timer->slot];
  wheel->slots[timer->slot] = timer;
  timer->next = p;
  if (p != NULL) {
    p->prev = timer;
  }
H
hzcheng 已提交
223

weixin_48148422's avatar
weixin_48148422 已提交
224
  pthread_mutex_unlock(&wheel->mutex);
H
hzcheng 已提交
225 226
}

weixin_48148422's avatar
weixin_48148422 已提交
227
static bool removeFromWheel(tmr_obj_t* timer) {
228 229
  uint8_t wheelIdx = timer->wheel;
  if (wheelIdx >= tListLen(wheels)) {
weixin_48148422's avatar
weixin_48148422 已提交
230 231
    return false;
  }
232
  time_wheel_t* wheel = wheels + wheelIdx;
weixin_48148422's avatar
weixin_48148422 已提交
233 234 235 236 237 238 239

  bool removed = false;
  pthread_mutex_lock(&wheel->mutex);
  // other thread may modify timer->wheel, check again.
  if (timer->wheel < tListLen(wheels)) {
    if (timer->prev != NULL) {
      timer->prev->next = timer->next;
S
slguan 已提交
240
    }
weixin_48148422's avatar
weixin_48148422 已提交
241 242
    if (timer->next != NULL) {
      timer->next->prev = timer->prev;
S
slguan 已提交
243
    }
weixin_48148422's avatar
weixin_48148422 已提交
244 245 246 247 248 249 250 251
    if (timer == wheel->slots[timer->slot]) {
      wheel->slots[timer->slot] = timer->next;
    }
    timer->wheel = tListLen(wheels);
    timer->next = NULL;
    timer->prev = NULL;
    timerDecRef(timer);
    removed = true;
S
slguan 已提交
252
  }
weixin_48148422's avatar
weixin_48148422 已提交
253
  pthread_mutex_unlock(&wheel->mutex);
S
slguan 已提交
254

weixin_48148422's avatar
weixin_48148422 已提交
255
  return removed;
S
slguan 已提交
256 257
}

weixin_48148422's avatar
weixin_48148422 已提交
258 259
static void processExpiredTimer(void* handle, void* arg) {
  tmr_obj_t* timer = (tmr_obj_t*)handle;
S
TD-2616  
Shengliang Guan 已提交
260
  timer->executedBy = taosGetSelfPthreadId();
weixin_48148422's avatar
weixin_48148422 已提交
261
  uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED);
weixin_48148422's avatar
weixin_48148422 已提交
262
  if (state == TIMER_STATE_WAITING) {
weixin_48148422's avatar
weixin_48148422 已提交
263
    const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution start.";
264
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
H
hzcheng 已提交
265

weixin_48148422's avatar
weixin_48148422 已提交
266 267
    (*timer->fp)(timer->param, (tmr_h)timer->id);
    atomic_store_8(&timer->state, TIMER_STATE_STOPPED);
H
hzcheng 已提交
268

weixin_48148422's avatar
weixin_48148422 已提交
269
    fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution end.";
270
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
weixin_48148422's avatar
weixin_48148422 已提交
271 272 273 274
  }
  removeTimer(timer->id);
  timerDecRef(timer);
}
H
hzcheng 已提交
275

weixin_48148422's avatar
weixin_48148422 已提交
276
static void addToExpired(tmr_obj_t* head) {
weixin_48148422's avatar
weixin_48148422 已提交
277
  const char* fmt = "%s adding expired timer[id=%" PRIuPTR ", fp=%p, param=%p] to queue.";
H
hzcheng 已提交
278

weixin_48148422's avatar
weixin_48148422 已提交
279
  while (head != NULL) {
weixin_48148422's avatar
weixin_48148422 已提交
280
    uintptr_t id = head->id;
weixin_48148422's avatar
weixin_48148422 已提交
281
    tmr_obj_t* next = head->next;
282
    tmrDebug(fmt, head->ctrl->label, id, head->fp, head->param);
weixin_48148422's avatar
weixin_48148422 已提交
283

weixin_48148422's avatar
weixin_48148422 已提交
284 285 286
    SSchedMsg  schedMsg;
    schedMsg.fp = NULL;
    schedMsg.tfp = processExpiredTimer;
H
Hui Li 已提交
287
    schedMsg.msg = NULL;
weixin_48148422's avatar
weixin_48148422 已提交
288 289 290
    schedMsg.ahandle = head;
    schedMsg.thandle = NULL;
    taosScheduleTask(tmrQhandle, &schedMsg);
weixin_48148422's avatar
weixin_48148422 已提交
291

292
    tmrDebug("timer[id=%" PRIuPTR "] has been added to queue.", id);
weixin_48148422's avatar
weixin_48148422 已提交
293 294 295
    head = next;
  }
}
H
hzcheng 已提交
296

weixin_48148422's avatar
weixin_48148422 已提交
297 298 299 300 301 302 303 304 305
static uintptr_t doStartTimer(tmr_obj_t* timer, TAOS_TMR_CALLBACK fp, int mseconds, void* param, tmr_ctrl_t* ctrl) {
  uintptr_t id = getNextTimerId();
  timer->id = id;
  timer->state = TIMER_STATE_WAITING;
  timer->fp = fp;
  timer->param = param;
  timer->ctrl = ctrl;
  addTimer(timer);

weixin_48148422's avatar
weixin_48148422 已提交
306
  const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] started";
307
  tmrDebug(fmt, ctrl->label, timer->id, timer->fp, timer->param);
weixin_48148422's avatar
weixin_48148422 已提交
308 309 310 311 312

  if (mseconds == 0) {
    timer->wheel = tListLen(wheels);
    timerAddRef(timer);
    addToExpired(timer);
H
hzcheng 已提交
313
  } else {
weixin_48148422's avatar
weixin_48148422 已提交
314
    addToWheel(timer, mseconds);
H
hzcheng 已提交
315 316
  }

weixin_48148422's avatar
weixin_48148422 已提交
317 318
  // note: use `timer->id` here is unsafe as `timer` may already be freed
  return id;
H
hzcheng 已提交
319 320
}

weixin_48148422's avatar
weixin_48148422 已提交
321 322 323 324 325
tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle) {
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
  if (ctrl == NULL || ctrl->label[0] == 0) {
    return NULL;
  }
H
hzcheng 已提交
326

weixin_48148422's avatar
weixin_48148422 已提交
327 328
  tmr_obj_t* timer = (tmr_obj_t*)calloc(1, sizeof(tmr_obj_t));
  if (timer == NULL) {
weixin_48148422's avatar
weixin_48148422 已提交
329
    tmrError("%s failed to allocated memory for new timer object.", ctrl->label);
weixin_48148422's avatar
weixin_48148422 已提交
330 331
    return NULL;
  }
H
hzcheng 已提交
332

weixin_48148422's avatar
weixin_48148422 已提交
333 334
  return (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);
}
H
hzcheng 已提交
335

weixin_48148422's avatar
weixin_48148422 已提交
336
static void taosTimerLoopFunc(int signo) {
337
  int64_t now = taosGetMonotonicMs();
weixin_48148422's avatar
weixin_48148422 已提交
338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382

  for (int i = 0; i < tListLen(wheels); i++) {
    // `expried` is a temporary expire list.
    // expired timers are first add to this list, then move
    // to expired queue as a batch to improve performance.
    // note this list is used as a stack in this function.
    tmr_obj_t* expired = NULL;

    time_wheel_t* wheel = wheels + i;
    while (now >= wheel->nextScanAt) {
      pthread_mutex_lock(&wheel->mutex);
      wheel->index = (wheel->index + 1) % wheel->size;
      tmr_obj_t* timer = wheel->slots[wheel->index];
      while (timer != NULL) {
        tmr_obj_t* next = timer->next;
        if (now < timer->expireAt) {
          timer = next;
          continue;
        }

        // remove from the wheel
        if (timer->prev == NULL) {
          wheel->slots[wheel->index] = next;
          if (next != NULL) {
            next->prev = NULL;
          }
        } else {
          timer->prev->next = next;
          if (next != NULL) {
            next->prev = timer->prev;
          }
        }
        timer->wheel = tListLen(wheels);

        // add to temporary expire list
        timer->next = expired;
        timer->prev = NULL;
        if (expired != NULL) {
          expired->prev = timer;
        }
        expired = timer;

        timer = next;
      }
      wheel->nextScanAt += wheel->resolution;
B
Bomin Zhang 已提交
383
      pthread_mutex_unlock(&wheel->mutex);
H
hzcheng 已提交
384 385
    }

weixin_48148422's avatar
weixin_48148422 已提交
386
    addToExpired(expired);
H
hzcheng 已提交
387
  }
weixin_48148422's avatar
weixin_48148422 已提交
388
}
H
hzcheng 已提交
389

weixin_48148422's avatar
weixin_48148422 已提交
390 391
static bool doStopTimer(tmr_obj_t* timer, uint8_t state) {
  if (state == TIMER_STATE_WAITING) {
weixin_48148422's avatar
weixin_48148422 已提交
392
    bool reusable = false;
weixin_48148422's avatar
weixin_48148422 已提交
393 394 395 396 397 398
    if (removeFromWheel(timer)) {
      removeTimer(timer->id);
      // only safe to reuse the timer when timer is removed from the wheel.
      // we cannot guarantee the thread safety of the timr in all other cases.
      reusable = true;
    }
weixin_48148422's avatar
weixin_48148422 已提交
399
    const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is cancelled.";
400
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
weixin_48148422's avatar
weixin_48148422 已提交
401 402
    return reusable;
  }
403

weixin_48148422's avatar
weixin_48148422 已提交
404
  if (state != TIMER_STATE_EXPIRED) {
weixin_48148422's avatar
weixin_48148422 已提交
405
    // timer already stopped or cancelled, has nothing to do in this case
weixin_48148422's avatar
weixin_48148422 已提交
406 407
    return false;
  }
408

S
TD-2616  
Shengliang Guan 已提交
409
  if (timer->executedBy == taosGetSelfPthreadId()) {
weixin_48148422's avatar
weixin_48148422 已提交
410 411 412
    // taosTmrReset is called in the timer callback, should do nothing in this
    // case to avoid dead lock. note taosTmrReset must be the last statement
    // of the callback funtion, will be a bug otherwise.
weixin_48148422's avatar
weixin_48148422 已提交
413
    return false;
H
hzcheng 已提交
414
  }
415

weixin_48148422's avatar
weixin_48148422 已提交
416 417 418
  // timer callback is executing in another thread, we SHOULD wait it to stop,
  // BUT this may result in dead lock if current thread are holding a lock which
  // the timer callback need to acquire. so, we HAVE TO return directly.
weixin_48148422's avatar
weixin_48148422 已提交
419
  const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is executing and cannot be stopped.";
420
  tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
weixin_48148422's avatar
weixin_48148422 已提交
421
  return false;
weixin_48148422's avatar
weixin_48148422 已提交
422
}
H
hzcheng 已提交
423

weixin_48148422's avatar
weixin_48148422 已提交
424 425
bool taosTmrStop(tmr_h timerId) {
  uintptr_t id = (uintptr_t)timerId;
H
hzcheng 已提交
426

weixin_48148422's avatar
weixin_48148422 已提交
427 428
  tmr_obj_t* timer = findTimer(id);
  if (timer == NULL) {
429
    tmrDebug("timer[id=%" PRIuPTR "] does not exist", id);
weixin_48148422's avatar
weixin_48148422 已提交
430 431
    return false;
  }
H
hzcheng 已提交
432

weixin_48148422's avatar
weixin_48148422 已提交
433
  uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
weixin_48148422's avatar
weixin_48148422 已提交
434 435
  doStopTimer(timer, state);
  timerDecRef(timer);
H
hzcheng 已提交
436

weixin_48148422's avatar
weixin_48148422 已提交
437
  return state == TIMER_STATE_WAITING;
H
hzcheng 已提交
438 439
}

weixin_48148422's avatar
weixin_48148422 已提交
440 441 442 443
bool taosTmrStopA(tmr_h* timerId) {
  bool ret = taosTmrStop(*timerId);
  *timerId = NULL;
  return ret;
H
hzcheng 已提交
444 445
}

weixin_48148422's avatar
weixin_48148422 已提交
446 447 448
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle, tmr_h* pTmrId) {
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
  if (ctrl == NULL || ctrl->label[0] == 0) {
S
TD-1037  
Shengliang Guan 已提交
449
    return false;
H
hzcheng 已提交
450 451
  }

weixin_48148422's avatar
weixin_48148422 已提交
452 453 454 455
  uintptr_t  id = (uintptr_t)*pTmrId;
  bool       stopped = false;
  tmr_obj_t* timer = findTimer(id);
  if (timer == NULL) {
456
    tmrDebug("%s timer[id=%" PRIuPTR "] does not exist", ctrl->label, id);
H
hzcheng 已提交
457
  } else {
weixin_48148422's avatar
weixin_48148422 已提交
458
    uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
weixin_48148422's avatar
weixin_48148422 已提交
459 460 461 462 463
    if (!doStopTimer(timer, state)) {
      timerDecRef(timer);
      timer = NULL;
    }
    stopped = state == TIMER_STATE_WAITING;
H
hzcheng 已提交
464 465
  }

weixin_48148422's avatar
weixin_48148422 已提交
466 467 468
  if (timer == NULL) {
    *pTmrId = taosTmrStart(fp, mseconds, param, handle);
    return stopped;
H
hzcheng 已提交
469 470
  }

471
  tmrDebug("%s timer[id=%" PRIuPTR "] is reused", ctrl->label, timer->id);
weixin_48148422's avatar
weixin_48148422 已提交
472 473 474 475 476 477 478 479

  // wait until there's no other reference to this timer,
  // so that we can reuse this timer safely.
  for (int i = 1; atomic_load_8(&timer->refCount) > 1; ++i) {
    if (i % 1000 == 0) {
      sched_yield();
    }
  }
H
hzcheng 已提交
480

weixin_48148422's avatar
weixin_48148422 已提交
481 482 483
  assert(timer->refCount == 1);
  memset(timer, 0, sizeof(*timer));
  *pTmrId = (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);
H
hzcheng 已提交
484

weixin_48148422's avatar
weixin_48148422 已提交
485
  return stopped;
H
hzcheng 已提交
486 487
}

weixin_48148422's avatar
weixin_48148422 已提交
488
static void taosTmrModuleInit(void) {
S
Shengliang Guan 已提交
489
  tmrCtrls = malloc(sizeof(tmr_ctrl_t) * tsMaxTmrCtrl);
490 491 492 493 494
  if (tmrCtrls == NULL) {
    tmrError("failed to allocate memory for timer controllers.");
    return;
  }

S
TD-1037  
Shengliang Guan 已提交
495
  for (uint32_t i = 0; i < tsMaxTmrCtrl - 1; ++i) {
weixin_48148422's avatar
weixin_48148422 已提交
496 497 498
    tmr_ctrl_t* ctrl = tmrCtrls + i;
    ctrl->next = ctrl + 1;
  }
S
Shengliang Guan 已提交
499
  (tmrCtrls + tsMaxTmrCtrl - 1)->next = NULL;
weixin_48148422's avatar
weixin_48148422 已提交
500
  unusedTmrCtrl = tmrCtrls;
H
hzcheng 已提交
501

weixin_48148422's avatar
weixin_48148422 已提交
502 503
  pthread_mutex_init(&tmrCtrlMutex, NULL);

504
  int64_t now = taosGetMonotonicMs();
weixin_48148422's avatar
weixin_48148422 已提交
505 506 507 508 509 510 511 512 513 514 515 516 517 518
  for (int i = 0; i < tListLen(wheels); i++) {
    time_wheel_t* wheel = wheels + i;
    if (pthread_mutex_init(&wheel->mutex, NULL) != 0) {
      tmrError("failed to create the mutex for wheel, reason:%s", strerror(errno));
      return;
    }
    wheel->nextScanAt = now + wheel->resolution;
    wheel->index = 0;
    wheel->slots = (tmr_obj_t**)calloc(wheel->size, sizeof(tmr_obj_t*));
    if (wheel->slots == NULL) {
      tmrError("failed to allocate wheel slots");
      return;
    }
    timerMap.size += wheel->size;
H
hzcheng 已提交
519 520
  }

weixin_48148422's avatar
weixin_48148422 已提交
521 522 523 524 525 526
  timerMap.count = 0;
  timerMap.slots = (timer_list_t*)calloc(timerMap.size, sizeof(timer_list_t));
  if (timerMap.slots == NULL) {
    tmrError("failed to allocate hash map");
    return;
  }
H
hzcheng 已提交
527

weixin_48148422's avatar
weixin_48148422 已提交
528
  tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr");
weixin_48148422's avatar
weixin_48148422 已提交
529
  taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK);
H
hzcheng 已提交
530

531
  tmrDebug("timer module is initialized, number of threads: %d", taosTmrThreads);
weixin_48148422's avatar
weixin_48148422 已提交
532
}
H
hzcheng 已提交
533

weixin_48148422's avatar
weixin_48148422 已提交
534 535
void* taosTmrInit(int maxNumOfTmrs, int resolution, int longest, const char* label) {
  pthread_once(&tmrModuleInit, taosTmrModuleInit);
H
hzcheng 已提交
536

weixin_48148422's avatar
weixin_48148422 已提交
537 538 539 540
  pthread_mutex_lock(&tmrCtrlMutex);
  tmr_ctrl_t* ctrl = unusedTmrCtrl;
  if (ctrl != NULL) {
    unusedTmrCtrl = ctrl->next;
541
    numOfTmrCtrl++;
H
hzcheng 已提交
542
  }
weixin_48148422's avatar
weixin_48148422 已提交
543 544 545
  pthread_mutex_unlock(&tmrCtrlMutex);

  if (ctrl == NULL) {
weixin_48148422's avatar
weixin_48148422 已提交
546
    tmrError("%s too many timer controllers, failed to create timer controller.", label);
weixin_48148422's avatar
weixin_48148422 已提交
547 548 549
    return NULL;
  }

B
Bomin Zhang 已提交
550
  tstrncpy(ctrl->label, label, sizeof(ctrl->label));
551
  tmrDebug("%s timer controller is initialized, number of timer controllers: %d.", label, numOfTmrCtrl);
weixin_48148422's avatar
weixin_48148422 已提交
552
  return ctrl;
H
hzcheng 已提交
553 554
}

weixin_48148422's avatar
weixin_48148422 已提交
555 556
void taosTmrCleanUp(void* handle) {
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
weixin_48148422's avatar
weixin_48148422 已提交
557 558 559
  if (ctrl == NULL || ctrl->label[0] == 0) {
    return;
  }
weixin_48148422's avatar
weixin_48148422 已提交
560

561
  tmrDebug("%s timer controller is cleaned up.", ctrl->label);
weixin_48148422's avatar
weixin_48148422 已提交
562
  ctrl->label[0] = 0;
H
hzcheng 已提交
563

weixin_48148422's avatar
weixin_48148422 已提交
564 565
  pthread_mutex_lock(&tmrCtrlMutex);
  ctrl->next = unusedTmrCtrl;
566
  numOfTmrCtrl--;
weixin_48148422's avatar
weixin_48148422 已提交
567 568
  unusedTmrCtrl = ctrl;
  pthread_mutex_unlock(&tmrCtrlMutex);
569 570

  if (numOfTmrCtrl <=0) {
weixin_48148422's avatar
weixin_48148422 已提交
571
    taosUninitTimer();
572

weixin_48148422's avatar
weixin_48148422 已提交
573 574
    taosCleanUpScheduler(tmrQhandle);

575 576
    for (int i = 0; i < tListLen(wheels); i++) {
      time_wheel_t* wheel = wheels + i;
577
      pthread_mutex_destroy(&wheel->mutex);
578 579 580
      free(wheel->slots);
    }

weixin_48148422's avatar
weixin_48148422 已提交
581 582 583 584 585 586 587 588 589 590 591 592
    pthread_mutex_destroy(&tmrCtrlMutex);

    for (size_t i = 0; i < timerMap.size; i++) {
      timer_list_t* list = timerMap.slots + i;
      tmr_obj_t* t = list->timers;
      while (t != NULL) {
        tmr_obj_t* next = t->mnext;
        free(t);
        t = next;
      }
    }
    free(timerMap.slots);
593 594
    free(tmrCtrls);

595
    tmrDebug("timer module is cleaned up");
596
  }
H
hzcheng 已提交
597
}