提交 47ae0423 编写于 作者: S slguan

Merge branch 'master' of https://github.com/taosdata/TDengine

...@@ -78,8 +78,13 @@ typedef struct { ...@@ -78,8 +78,13 @@ typedef struct {
int count; int count;
} tmr_list_t; } tmr_list_t;
#define TMR_CTRL_STAGE_UNUSED 0
#define TMR_CTRL_STAGE_IN_USE 1
#define TMR_CTRL_STAGE_CLEANNING_UP 2
typedef struct _tmr_ctrl_t { typedef struct _tmr_ctrl_t {
void * signature; int stage; /* life cycle stage of this tmr ctrl */
pthread_mutex_t mutex; /* mutex to protect critical resource */ pthread_mutex_t mutex; /* mutex to protect critical resource */
int resolution; /* resolution in mseconds */ int resolution; /* resolution in mseconds */
int numOfPeriods; /* total number of periods */ int numOfPeriods; /* total number of periods */
...@@ -110,15 +115,37 @@ void *taosTimerLoopFunc(int signo) { ...@@ -110,15 +115,37 @@ void *taosTimerLoopFunc(int signo) {
for (int i = 1; i < maxNumOfTmrCtrl; ++i) { for (int i = 1; i < maxNumOfTmrCtrl; ++i) {
pCtrl = tmrCtrl + i; pCtrl = tmrCtrl + i;
if (pCtrl->signature) { // save 'stage' to a local variable so that all later code can
count++; // use the same 'stage'. acquire semantic is required to ensure
// 'stage' is load before other 'pCtrl' fields.
int stage = __atomic_load_n(&pCtrl->stage, __ATOMIC_ACQUIRE);
if (stage == TMR_CTRL_STAGE_IN_USE) {
pCtrl->ticks++; pCtrl->ticks++;
if (pCtrl->ticks >= pCtrl->maxTicks) { if (pCtrl->ticks >= pCtrl->maxTicks) {
taosTmrProcessList(pCtrl); taosTmrProcessList(pCtrl);
pCtrl->ticks = 0; pCtrl->ticks = 0;
} }
if (count >= numOfTmrCtrl) break; } else if (stage == TMR_CTRL_STAGE_CLEANNING_UP) {
__atomic_store_n(&pCtrl->stage, TMR_CTRL_STAGE_UNUSED, __ATOMIC_RELEASE);
pthread_mutex_destroy(&pCtrl->mutex);
tfree(pCtrl->tmrList);
tmrMemPoolCleanUp(pCtrl->poolHandle);
// decrease 'numOfTmrCtrl', need to be atomic but relaxed semantic is fine
// because we don't need (unable to guarantee either) an accurate counter.
int num = __atomic_add_fetch(&numOfTmrCtrl, -1, __ATOMIC_RELAXED);
tmrTrace("%s is cleaned up, numOfTmrCtrls:%d", pCtrl->label, num);
// return 'id' to the poool and then this timer controller can be reused,
// this must be the last step.
taosFreeId(tmrIdPool, pCtrl->tmrCtrlId);
} else {
continue;
} }
if (++count >= __atomic_load_n(&numOfTmrCtrl, __ATOMIC_RELAXED))
break;
} }
return NULL; return NULL;
...@@ -206,9 +233,6 @@ void *taosTmrInit(int maxNumOfTmrs, int resolution, int longest, char *label) { ...@@ -206,9 +233,6 @@ void *taosTmrInit(int maxNumOfTmrs, int resolution, int longest, char *label) {
} }
pCtrl = tmrCtrl + tmrCtrlId; pCtrl = tmrCtrl + tmrCtrlId;
tfree(pCtrl->tmrList);
tmrMemPoolCleanUp(pCtrl->poolHandle);
memset(pCtrl, 0, sizeof(tmr_ctrl_t)); memset(pCtrl, 0, sizeof(tmr_ctrl_t));
pCtrl->tmrCtrlId = tmrCtrlId; pCtrl->tmrCtrlId = tmrCtrlId;
...@@ -217,18 +241,25 @@ void *taosTmrInit(int maxNumOfTmrs, int resolution, int longest, char *label) { ...@@ -217,18 +241,25 @@ void *taosTmrInit(int maxNumOfTmrs, int resolution, int longest, char *label) {
if ((pCtrl->poolHandle = tmrMemPoolInit(maxNumOfTmrs + 10, sizeof(tmr_obj_t))) == NULL) { if ((pCtrl->poolHandle = tmrMemPoolInit(maxNumOfTmrs + 10, sizeof(tmr_obj_t))) == NULL) {
tmrError("%s failed to allocate mem pool", label); tmrError("%s failed to allocate mem pool", label);
tmrMemPoolCleanUp(pCtrl->poolHandle); taosFreeId(tmrIdPool, tmrCtrlId);
return NULL; return NULL;
} }
if (resolution < MSECONDS_PER_TICK) resolution = MSECONDS_PER_TICK; if (resolution < MSECONDS_PER_TICK) resolution = MSECONDS_PER_TICK;
pCtrl->resolution = resolution; pCtrl->resolution = resolution;
pCtrl->maxTicks = resolution / MSECONDS_PER_TICK; pCtrl->maxTicks = resolution / MSECONDS_PER_TICK;
pCtrl->ticks = rand() / pCtrl->maxTicks; pCtrl->ticks = rand() % pCtrl->maxTicks;
pCtrl->numOfPeriods = longest / resolution; pCtrl->numOfPeriods = longest / resolution;
if (pCtrl->numOfPeriods < 10) pCtrl->numOfPeriods = 10; if (pCtrl->numOfPeriods < 10) pCtrl->numOfPeriods = 10;
pCtrl->tmrList = (tmr_list_t *)malloc(sizeof(tmr_list_t) * pCtrl->numOfPeriods); pCtrl->tmrList = (tmr_list_t *)malloc(sizeof(tmr_list_t) * pCtrl->numOfPeriods);
if (pCtrl->tmrList == NULL) {
tmrError("%s failed to allocate tmrList", label);
tmrMemPoolCleanUp(pCtrl->poolHandle);
taosFreeId(tmrIdPool, tmrCtrlId);
return NULL;
}
for (int i = 0; i < pCtrl->numOfPeriods; i++) { for (int i = 0; i < pCtrl->numOfPeriods; i++) {
pCtrl->tmrList[i].head = NULL; pCtrl->tmrList[i].head = NULL;
pCtrl->tmrList[i].count = 0; pCtrl->tmrList[i].count = 0;
...@@ -236,12 +267,20 @@ void *taosTmrInit(int maxNumOfTmrs, int resolution, int longest, char *label) { ...@@ -236,12 +267,20 @@ void *taosTmrInit(int maxNumOfTmrs, int resolution, int longest, char *label) {
if (pthread_mutex_init(&pCtrl->mutex, NULL) < 0) { if (pthread_mutex_init(&pCtrl->mutex, NULL) < 0) {
tmrError("%s failed to create the mutex, reason:%s", label, strerror(errno)); tmrError("%s failed to create the mutex, reason:%s", label, strerror(errno));
taosTmrCleanUp(pCtrl); free(pCtrl->tmrList);
tmrMemPoolCleanUp(pCtrl->poolHandle);
taosFreeId(tmrIdPool, tmrCtrlId);
return NULL; return NULL;
} }
pCtrl->signature = pCtrl; // set 'stage' to 'in use' to mark the completion of initialization,
numOfTmrCtrl++; // release semantic is required to ensure all operations prior this
// are visible to other threads first.
__atomic_store_n(&pCtrl->stage, TMR_CTRL_STAGE_IN_USE, __ATOMIC_RELEASE);
// increase 'numOfTmrCtrl', need to be atomic but relaxed semantic is fine
__atomic_add_fetch(&numOfTmrCtrl, 1, __ATOMIC_RELAXED);
tmrTrace("%s timer ctrl is initialized, index:%d", label, tmrCtrlId); tmrTrace("%s timer ctrl is initialized, index:%d", label, tmrCtrlId);
return pCtrl; return pCtrl;
} }
...@@ -293,12 +332,14 @@ void taosTmrProcessList(tmr_ctrl_t *pCtrl) { ...@@ -293,12 +332,14 @@ void taosTmrProcessList(tmr_ctrl_t *pCtrl) {
void taosTmrCleanUp(void *handle) { void taosTmrCleanUp(void *handle) {
tmr_ctrl_t *pCtrl = (tmr_ctrl_t *)handle; tmr_ctrl_t *pCtrl = (tmr_ctrl_t *)handle;
if (pCtrl == NULL || pCtrl->signature != pCtrl) return; if (pCtrl == NULL)
return;
pCtrl->signature = NULL; // set 'stage' to 'cleanning up' if it is 'in use' atomically,
taosFreeId(tmrIdPool, pCtrl->tmrCtrlId); // actual cleanning up will be done in 'taosTimerLoopFunc'.
numOfTmrCtrl--; int oldStage = TMR_CTRL_STAGE_IN_USE;
tmrTrace("%s is cleaned up, numOfTmrs:%d", pCtrl->label, numOfTmrCtrl); __atomic_compare_exchange_n(&pCtrl->stage, &oldStage, TMR_CTRL_STAGE_CLEANNING_UP,
false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
} }
tmr_h taosTmrStart(void (*fp)(void *, void *), int mseconds, void *param1, void *handle) { tmr_h taosTmrStart(void (*fp)(void *, void *), int mseconds, void *param1, void *handle) {
...@@ -538,7 +579,6 @@ mpool_h tmrMemPoolInit(int numOfBlock, int blockSize) { ...@@ -538,7 +579,6 @@ mpool_h tmrMemPoolInit(int numOfBlock, int blockSize) {
pool_p->blockSize = blockSize; pool_p->blockSize = blockSize;
pool_p->numOfBlock = numOfBlock; pool_p->numOfBlock = numOfBlock;
pool_p->pool = (char *)malloc(blockSize * numOfBlock); pool_p->pool = (char *)malloc(blockSize * numOfBlock);
memset(pool_p->pool, 0, blockSize * numOfBlock);
pool_p->freeList = (int *)malloc(sizeof(int) * numOfBlock); pool_p->freeList = (int *)malloc(sizeof(int) * numOfBlock);
if (pool_p->pool == NULL || pool_p->freeList == NULL) { if (pool_p->pool == NULL || pool_p->freeList == NULL) {
...@@ -596,6 +636,6 @@ void tmrMemPoolCleanUp(mpool_h handle) { ...@@ -596,6 +636,6 @@ void tmrMemPoolCleanUp(mpool_h handle) {
if (pool_p->pool) free(pool_p->pool); if (pool_p->pool) free(pool_p->pool);
if (pool_p->freeList) free(pool_p->freeList); if (pool_p->freeList) free(pool_p->freeList);
memset(&pool_p, 0, sizeof(pool_p)); memset(pool_p, 0, sizeof(*pool_p));
free(pool_p); free(pool_p);
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册