提交 9cd7c54e 编写于 作者: D dapan1121

fix: fix task sched crash issue

上级 b85c06b4
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#define _TD_UTIL_SCHED_H_ #define _TD_UTIL_SCHED_H_
#include "os.h" #include "os.h"
#include "tdef.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -30,6 +31,24 @@ typedef struct SSchedMsg { ...@@ -30,6 +31,24 @@ typedef struct SSchedMsg {
void *thandle; void *thandle;
} SSchedMsg; } SSchedMsg;
typedef struct {
char label[TSDB_LABEL_LEN];
tsem_t emptySem;
tsem_t fullSem;
TdThreadMutex queueMutex;
int32_t fullSlot;
int32_t emptySlot;
int32_t queueSize;
int32_t numOfThreads;
TdThread *qthread;
SSchedMsg *queue;
int8_t stop;
void *pTmrCtrl;
void *pTimer;
} SSchedQueue;
/** /**
* Create a thread-safe ring-buffer based task queue and return the instance. A thread * Create a thread-safe ring-buffer based task queue and return the instance. A thread
* pool will be created to consume the messages in the queue. * pool will be created to consume the messages in the queue.
...@@ -38,7 +57,7 @@ typedef struct SSchedMsg { ...@@ -38,7 +57,7 @@ typedef struct SSchedMsg {
* @param label the label of the queue * @param label the label of the queue
* @return the created queue scheduler * @return the created queue scheduler
*/ */
void *taosInitScheduler(int32_t capacity, int32_t numOfThreads, const char *label); void *taosInitScheduler(int32_t capacity, int32_t numOfThreads, const char *label, SSchedQueue* pSched);
/** /**
* Create a thread-safe ring-buffer based task queue and return the instance. * Create a thread-safe ring-buffer based task queue and return the instance.
......
...@@ -62,7 +62,7 @@ static void indexDestroy(void* sIdx); ...@@ -62,7 +62,7 @@ static void indexDestroy(void* sIdx);
void indexInit() { void indexInit() {
// refactor later // refactor later
indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index"); indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index", NULL);
indexRefMgt = taosOpenRef(1000, indexDestroy); indexRefMgt = taosOpenRef(1000, indexDestroy);
} }
void indexCleanup() { void indexCleanup() {
......
...@@ -96,12 +96,12 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag ...@@ -96,12 +96,12 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag
return true; return true;
} }
static void* pTaskQueue = NULL; static SSchedQueue pTaskQueue = {0};
int32_t initTaskQueue() { int32_t initTaskQueue() {
int32_t queueSize = tsMaxShellConns * 2; int32_t queueSize = tsMaxShellConns * 2;
pTaskQueue = taosInitScheduler(queueSize, tsNumOfTaskQueueThreads, "tsc"); void *p = taosInitScheduler(queueSize, tsNumOfTaskQueueThreads, "tsc", &pTaskQueue);
if (NULL == pTaskQueue) { if (NULL == p) {
qError("failed to init task queue"); qError("failed to init task queue");
return -1; return -1;
} }
...@@ -111,7 +111,7 @@ int32_t initTaskQueue() { ...@@ -111,7 +111,7 @@ int32_t initTaskQueue() {
} }
int32_t cleanupTaskQueue() { int32_t cleanupTaskQueue() {
taosCleanUpScheduler(pTaskQueue); taosCleanUpScheduler(&pTaskQueue);
return 0; return 0;
} }
...@@ -134,7 +134,7 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) ...@@ -134,7 +134,7 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code)
schedMsg.thandle = execParam; schedMsg.thandle = execParam;
schedMsg.msg = code; schedMsg.msg = code;
taosScheduleTask(pTaskQueue, &schedMsg); taosScheduleTask(&pTaskQueue, &schedMsg);
return 0; return 0;
} }
......
...@@ -22,30 +22,16 @@ ...@@ -22,30 +22,16 @@
#define DUMP_SCHEDULER_TIME_WINDOW 30000 // every 30sec, take a snap shot of task queue. #define DUMP_SCHEDULER_TIME_WINDOW 30000 // every 30sec, take a snap shot of task queue.
typedef struct {
char label[TSDB_LABEL_LEN];
tsem_t emptySem;
tsem_t fullSem;
TdThreadMutex queueMutex;
int32_t fullSlot;
int32_t emptySlot;
int32_t queueSize;
int32_t numOfThreads;
TdThread *qthread;
SSchedMsg *queue;
bool stop;
void *pTmrCtrl;
void *pTimer;
} SSchedQueue;
static void *taosProcessSchedQueue(void *param); static void *taosProcessSchedQueue(void *param);
static void taosDumpSchedulerStatus(void *qhandle, void *tmrId); static void taosDumpSchedulerStatus(void *qhandle, void *tmrId);
void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *label) { void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *label, SSchedQueue *pSched) {
SSchedQueue *pSched = (SSchedQueue *)taosMemoryCalloc(sizeof(SSchedQueue), 1); if (NULL == pSched) {
if (pSched == NULL) { pSched = (SSchedQueue *)taosMemoryCalloc(sizeof(SSchedQueue), 1);
uError("%s: no enough memory for pSched", label); if (pSched == NULL) {
return NULL; uError("%s: no enough memory for pSched", label);
return NULL;
}
} }
pSched->queue = (SSchedMsg *)taosMemoryCalloc(sizeof(SSchedMsg), queueSize); pSched->queue = (SSchedMsg *)taosMemoryCalloc(sizeof(SSchedMsg), queueSize);
...@@ -86,7 +72,7 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab ...@@ -86,7 +72,7 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab
return NULL; return NULL;
} }
pSched->stop = false; atomic_store_8(&pSched->stop, 0);
for (int32_t i = 0; i < numOfThreads; ++i) { for (int32_t i = 0; i < numOfThreads; ++i) {
TdThreadAttr attr; TdThreadAttr attr;
taosThreadAttrInit(&attr); taosThreadAttrInit(&attr);
...@@ -107,7 +93,7 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab ...@@ -107,7 +93,7 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab
} }
void *taosInitSchedulerWithInfo(int32_t queueSize, int32_t numOfThreads, const char *label, void *tmrCtrl) { void *taosInitSchedulerWithInfo(int32_t queueSize, int32_t numOfThreads, const char *label, void *tmrCtrl) {
SSchedQueue *pSched = taosInitScheduler(queueSize, numOfThreads, label); SSchedQueue *pSched = taosInitScheduler(queueSize, numOfThreads, label, NULL);
if (tmrCtrl != NULL && pSched != NULL) { if (tmrCtrl != NULL && pSched != NULL) {
pSched->pTmrCtrl = tmrCtrl; pSched->pTmrCtrl = tmrCtrl;
...@@ -131,7 +117,7 @@ void *taosProcessSchedQueue(void *scheduler) { ...@@ -131,7 +117,7 @@ void *taosProcessSchedQueue(void *scheduler) {
uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(errno)); uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
ASSERT(0); ASSERT(0);
} }
if (pSched->stop) { if (atomic_load_8(&pSched->stop)) {
break; break;
} }
...@@ -172,6 +158,11 @@ void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) { ...@@ -172,6 +158,11 @@ void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) {
return; return;
} }
if (atomic_load_8(&pSched->stop)) {
uError("sched is already stopped, msg:%p is dropped", pMsg);
return;
}
if ((ret = tsem_wait(&pSched->emptySem)) != 0) { if ((ret = tsem_wait(&pSched->emptySem)) != 0) {
uFatal("wait %s emptySem failed(%s)", pSched->label, strerror(errno)); uFatal("wait %s emptySem failed(%s)", pSched->label, strerror(errno));
ASSERT(0); ASSERT(0);
...@@ -202,7 +193,10 @@ void taosCleanUpScheduler(void *param) { ...@@ -202,7 +193,10 @@ void taosCleanUpScheduler(void *param) {
uDebug("start to cleanup %s schedQsueue", pSched->label); uDebug("start to cleanup %s schedQsueue", pSched->label);
pSched->stop = true; atomic_store_8(&pSched->stop, 1);
taosMsleep(200);
for (int32_t i = 0; i < pSched->numOfThreads; ++i) { for (int32_t i = 0; i < pSched->numOfThreads; ++i) {
if (taosCheckPthreadValid(pSched->qthread[i])) { if (taosCheckPthreadValid(pSched->qthread[i])) {
tsem_post(&pSched->fullSem); tsem_post(&pSched->fullSem);
...@@ -226,7 +220,7 @@ void taosCleanUpScheduler(void *param) { ...@@ -226,7 +220,7 @@ void taosCleanUpScheduler(void *param) {
if (pSched->queue) taosMemoryFree(pSched->queue); if (pSched->queue) taosMemoryFree(pSched->queue);
if (pSched->qthread) taosMemoryFree(pSched->qthread); if (pSched->qthread) taosMemoryFree(pSched->qthread);
taosMemoryFree(pSched); // fix memory leak //taosMemoryFree(pSched);
} }
// for debug purpose, dump the scheduler status every 1min. // for debug purpose, dump the scheduler status every 1min.
......
...@@ -555,7 +555,7 @@ static void taosTmrModuleInit(void) { ...@@ -555,7 +555,7 @@ static void taosTmrModuleInit(void) {
return; return;
} }
tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr"); tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr", NULL);
taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK); taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK);
tmrDebug("timer module is initialized, number of threads: %d", taosTmrThreads); tmrDebug("timer module is initialized, number of threads: %d", taosTmrThreads);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册