From abfac9ab09de6fed1fbc971872dead0741d24c55 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 17 Dec 2019 14:13:07 +0800 Subject: [PATCH] dump the number of tasks in the queue every 30sec. --- src/inc/tsched.h | 2 ++ src/util/src/tsched.c | 40 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/src/inc/tsched.h b/src/inc/tsched.h index dffd7a298a..827ecbbb42 100644 --- a/src/inc/tsched.h +++ b/src/inc/tsched.h @@ -32,6 +32,8 @@ typedef struct _sched_msg { void *taosInitScheduler(int queueSize, int numOfThreads, const char *label); +void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *label, void *tmrCtrl); + int taosScheduleTask(void *qhandle, SSchedMsg *pMsg); void taosCleanUpScheduler(void *param); diff --git a/src/util/src/tsched.c b/src/util/src/tsched.c index bd49c670f6..4eefd2ad95 100644 --- a/src/util/src/tsched.c +++ b/src/util/src/tsched.c @@ -16,6 +16,9 @@ #include "os.h" #include "tlog.h" #include "tsched.h" +#include "ttimer.h" + +#define DUMP_SCHEDULER_TIME_WINDOW 30000 //every 30sec, take a snap shot of task queue. typedef struct { char label[16]; @@ -28,10 +31,13 @@ typedef struct { int numOfThreads; pthread_t * qthread; SSchedMsg * queue; + + void* pTmrCtrl; + void* pTimer; } SSchedQueue; -void *taosProcessSchedQueue(void *param); -void taosCleanUpScheduler(void *param); +static void *taosProcessSchedQueue(void *param); +static void taosDumpSchedulerStatus(void *qhandle, void *tmrId); void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) { pthread_attr_t attr; @@ -96,6 +102,17 @@ _error: return NULL; } +void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *label, void *tmrCtrl) { + SSchedQueue* pSched = taosInitScheduler(queueSize, numOfThreads, label); + + if (tmrCtrl != NULL && pSched != NULL) { + pSched->pTmrCtrl = tmrCtrl; + taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer); + } + + return pSched; +} + void *taosProcessSchedQueue(void *param) { SSchedMsg msg; SSchedQueue *pSched = (SSchedQueue *)param; @@ -173,8 +190,27 @@ void taosCleanUpScheduler(void *param) { tsem_destroy(&pSched->emptySem); tsem_destroy(&pSched->fullSem); pthread_mutex_destroy(&pSched->queueMutex); + + if (pSched->pTimer) { + taosTmrStopA(&pSched->pTimer); + } free(pSched->queue); free(pSched->qthread); free(pSched); // fix memory leak } + +// for debug purpose, dump the scheduler status every 1min. +void taosDumpSchedulerStatus(void *qhandle, void *tmrId) { + SSchedQueue *pSched = (SSchedQueue *)qhandle; + if (pSched == NULL || pSched->pTimer == NULL || pSched->pTimer != tmrId) { + return; + } + + int32_t size = ((pSched->emptySlot - pSched->fullSlot) + pSched->queueSize) % pSched->queueSize; + if (size > 0) { + pTrace("scheduler:%s, current tasks in queue:%d, task thread:%d", pSched->label, size, pSched->numOfThreads); + } + + taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer); +} -- GitLab