未验证 提交 33160b6b 编写于 作者: S slguan 提交者: GitHub

Merge pull request #759 from taosdata/hotfix/tbase-1217

fix TBASE-1217
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "os.h" #include "os.h"
#include <inttypes.h>
#include "tlog.h" #include "tlog.h"
#include "tsched.h" #include "tsched.h"
#include "ttime.h" #include "ttime.h"
...@@ -253,13 +254,13 @@ static void processExpiredTimer(void* handle, void* arg) { ...@@ -253,13 +254,13 @@ static void processExpiredTimer(void* handle, void* arg) {
timer->executedBy = taosGetPthreadId(); timer->executedBy = taosGetPthreadId();
uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED); uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED);
if (state == TIMER_STATE_WAITING) { if (state == TIMER_STATE_WAITING) {
const char* fmt = "%s timer[id=%lld, fp=%p, param=%p] execution start."; const char* fmt = "%s timer[id=" PRIuPTR ", fp=%p, param=%p] execution start.";
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param); tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
(*timer->fp)(timer->param, (tmr_h)timer->id); (*timer->fp)(timer->param, (tmr_h)timer->id);
atomic_store_8(&timer->state, TIMER_STATE_STOPPED); atomic_store_8(&timer->state, TIMER_STATE_STOPPED);
fmt = "%s timer[id=%lld, fp=%p, param=%p] execution end."; fmt = "%s timer[id=" PRIuPTR ", fp=%p, param=%p] execution end.";
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param); tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
} }
removeTimer(timer->id); removeTimer(timer->id);
...@@ -267,7 +268,7 @@ static void processExpiredTimer(void* handle, void* arg) { ...@@ -267,7 +268,7 @@ static void processExpiredTimer(void* handle, void* arg) {
} }
static void addToExpired(tmr_obj_t* head) { static void addToExpired(tmr_obj_t* head) {
const char* fmt = "%s adding expired timer[id=%lld, fp=%p, param=%p] to queue."; const char* fmt = "%s adding expired timer[id=" PRIuPTR ", fp=%p, param=%p] to queue.";
while (head != NULL) { while (head != NULL) {
uintptr_t id = head->id; uintptr_t id = head->id;
...@@ -281,7 +282,7 @@ static void addToExpired(tmr_obj_t* head) { ...@@ -281,7 +282,7 @@ static void addToExpired(tmr_obj_t* head) {
schedMsg.thandle = NULL; schedMsg.thandle = NULL;
taosScheduleTask(tmrQhandle, &schedMsg); taosScheduleTask(tmrQhandle, &schedMsg);
tmrTrace("timer[id=%lld] has been added to queue.", id); tmrTrace("timer[id=" PRIuPTR "] has been added to queue.", id);
head = next; head = next;
} }
} }
...@@ -295,7 +296,7 @@ static uintptr_t doStartTimer(tmr_obj_t* timer, TAOS_TMR_CALLBACK fp, int msecon ...@@ -295,7 +296,7 @@ static uintptr_t doStartTimer(tmr_obj_t* timer, TAOS_TMR_CALLBACK fp, int msecon
timer->ctrl = ctrl; timer->ctrl = ctrl;
addTimer(timer); addTimer(timer);
const char* fmt = "%s timer[id=%lld, fp=%p, param=%p] started"; const char* fmt = "%s timer[id=" PRIuPTR ", fp=%p, param=%p] started";
tmrTrace(fmt, ctrl->label, timer->id, timer->fp, timer->param); tmrTrace(fmt, ctrl->label, timer->id, timer->fp, timer->param);
if (mseconds == 0) { if (mseconds == 0) {
...@@ -380,40 +381,37 @@ static void taosTimerLoopFunc(int signo) { ...@@ -380,40 +381,37 @@ static void taosTimerLoopFunc(int signo) {
} }
static bool doStopTimer(tmr_obj_t* timer, uint8_t state) { static bool doStopTimer(tmr_obj_t* timer, uint8_t state) {
bool reusable = false;
if (state == TIMER_STATE_WAITING) { if (state == TIMER_STATE_WAITING) {
bool reusable = false;
if (removeFromWheel(timer)) { if (removeFromWheel(timer)) {
removeTimer(timer->id); removeTimer(timer->id);
// only safe to reuse the timer when timer is removed from the wheel. // only safe to reuse the timer when timer is removed from the wheel.
// we cannot guarantee the thread safety of the timr in all other cases. // we cannot guarantee the thread safety of the timr in all other cases.
reusable = true; reusable = true;
} }
const char* fmt = "%s timer[id=%lld, fp=%p, param=%p] is cancelled."; const char* fmt = "%s timer[id=" PRIuPTR ", fp=%p, param=%p] is cancelled.";
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param); tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
} else if (state != TIMER_STATE_EXPIRED) { return reusable;
}
if (state != TIMER_STATE_EXPIRED) {
// timer already stopped or cancelled, has nothing to do in this case // timer already stopped or cancelled, has nothing to do in this case
} else if (timer->executedBy == taosGetPthreadId()) { return false;
}
if (timer->executedBy == taosGetPthreadId()) {
// taosTmrReset is called in the timer callback, should do nothing in this // taosTmrReset is called in the timer callback, should do nothing in this
// case to avoid dead lock. note taosTmrReset must be the last statement // case to avoid dead lock. note taosTmrReset must be the last statement
// of the callback funtion, will be a bug otherwise. // of the callback funtion, will be a bug otherwise.
} else { return false;
assert(timer->executedBy != taosGetPthreadId());
const char* fmt = "%s timer[id=%lld, fp=%p, param=%p] fired, waiting...";
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
for (int i = 1; atomic_load_8(&timer->state) != TIMER_STATE_STOPPED; i++) {
if (i % 1000 == 0) {
sched_yield();
}
}
fmt = "%s timer[id=%lld, fp=%p, param=%p] stopped.";
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
} }
return reusable; // timer callback is executing in another thread, we SHOULD wait it to stop,
// BUT this may result in dead lock if current thread are holding a lock which
// the timer callback need to acquire. so, we HAVE TO return directly.
const char* fmt = "%s timer[id=" PRIuPTR ", fp=%p, param=%p] is executing and cannot be stopped.";
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
return false;
} }
bool taosTmrStop(tmr_h timerId) { bool taosTmrStop(tmr_h timerId) {
...@@ -421,7 +419,7 @@ bool taosTmrStop(tmr_h timerId) { ...@@ -421,7 +419,7 @@ bool taosTmrStop(tmr_h timerId) {
tmr_obj_t* timer = findTimer(id); tmr_obj_t* timer = findTimer(id);
if (timer == NULL) { if (timer == NULL) {
tmrTrace("timer[id=%lld] does not exist", id); tmrTrace("timer[id=" PRIuPTR "] does not exist", id);
return false; return false;
} }
...@@ -448,7 +446,7 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle, ...@@ -448,7 +446,7 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle,
bool stopped = false; bool stopped = false;
tmr_obj_t* timer = findTimer(id); tmr_obj_t* timer = findTimer(id);
if (timer == NULL) { if (timer == NULL) {
tmrTrace("%s timer[id=%lld] does not exist", ctrl->label, id); tmrTrace("%s timer[id=" PRIuPTR "] does not exist", ctrl->label, id);
} else { } else {
uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED); uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
if (!doStopTimer(timer, state)) { if (!doStopTimer(timer, state)) {
...@@ -463,7 +461,7 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle, ...@@ -463,7 +461,7 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle,
return stopped; return stopped;
} }
tmrTrace("%s timer[id=%lld] is reused", ctrl->label, timer->id); tmrTrace("%s timer[id=" PRIuPTR "] is reused", ctrl->label, timer->id);
// wait until there's no other reference to this timer, // wait until there's no other reference to this timer,
// so that we can reuse this timer safely. // so that we can reuse this timer safely.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册