From 9a06248432a70b29ce76b4d77331e1056ba21726 Mon Sep 17 00:00:00 2001 From: Jun Li Date: Sun, 27 Jun 2021 06:13:33 -0700 Subject: [PATCH] Rename file and add some comment/changes (#6587) 1. rename semphone to semaphore 2. add comment for tsched.h 3. change the function signature for taosSchedulerTask, changing from return int to void. We currently don't check any return code of the function. 4. add some error handlings. For fatal error, just exit the process because the program may run into a random state. --- src/os/inc/os.h | 2 +- src/os/inc/{osSemphone.h => osSemaphore.h} | 4 +- .../darwin/{dwSemphone.c => dwSemaphore.c} | 0 .../detail/{osSemphone.c => osSemaphore.c} | 0 .../src/windows/{wSemphone.c => wSemaphore.c} | 0 src/util/inc/tsched.h | 39 +++++++++-- src/util/src/tsched.c | 67 ++++++++++++------- 7 files changed, 79 insertions(+), 33 deletions(-) rename src/os/inc/{osSemphone.h => osSemaphore.h} (97%) rename src/os/src/darwin/{dwSemphone.c => dwSemaphore.c} (100%) rename src/os/src/detail/{osSemphone.c => osSemaphore.c} (100%) rename src/os/src/windows/{wSemphone.c => wSemaphore.c} (100%) diff --git a/src/os/inc/os.h b/src/os/inc/os.h index 6731ca6d7d..903e80d5c7 100644 --- a/src/os/inc/os.h +++ b/src/os/inc/os.h @@ -29,7 +29,7 @@ extern "C" { #include "osMath.h" #include "osMemory.h" #include "osRand.h" -#include "osSemphone.h" +#include "osSemaphore.h" #include "osSignal.h" #include "osSleep.h" #include "osSocket.h" diff --git a/src/os/inc/osSemphone.h b/src/os/inc/osSemaphore.h similarity index 97% rename from src/os/inc/osSemphone.h rename to src/os/inc/osSemaphore.h index fe59095205..10d14700e0 100644 --- a/src/os/inc/osSemphone.h +++ b/src/os/inc/osSemaphore.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef TDENGINE_OS_SEMPHONE_H -#define TDENGINE_OS_SEMPHONE_H +#ifndef TDENGINE_OS_SEMAPHORE_H +#define TDENGINE_OS_SEMAPHORE_H #ifdef __cplusplus extern "C" { diff --git a/src/os/src/darwin/dwSemphone.c b/src/os/src/darwin/dwSemaphore.c similarity index 100% rename from src/os/src/darwin/dwSemphone.c rename to src/os/src/darwin/dwSemaphore.c diff --git a/src/os/src/detail/osSemphone.c b/src/os/src/detail/osSemaphore.c similarity index 100% rename from src/os/src/detail/osSemphone.c rename to src/os/src/detail/osSemaphore.c diff --git a/src/os/src/windows/wSemphone.c b/src/os/src/windows/wSemaphore.c similarity index 100% rename from src/os/src/windows/wSemphone.c rename to src/os/src/windows/wSemaphore.c diff --git a/src/util/inc/tsched.h b/src/util/inc/tsched.h index 3e481cbc32..a1591512c1 100644 --- a/src/util/inc/tsched.h +++ b/src/util/inc/tsched.h @@ -28,10 +28,41 @@ typedef struct SSchedMsg { void *thandle; } SSchedMsg; -void *taosInitScheduler(int queueSize, int numOfThreads, const char *label); -void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *label, void *tmrCtrl); -int taosScheduleTask(void *qhandle, SSchedMsg *pMsg); -void taosCleanUpScheduler(void *param); +/** + * Create a thread-safe ring-buffer based task queue and return the instance. A thread + * pool will be created to consume the messages in the queue. + * @param capacity the queue capacity + * @param numOfThreads the number of threads for the thread pool + * @param label the label of the queue + * @return the created queue scheduler + */ +void *taosInitScheduler(int capacity, int numOfThreads, const char *label); + +/** + * Create a thread-safe ring-buffer based task queue and return the instance. + * Same as taosInitScheduler, and it also print the queue status every 1 minite. + * @param capacity the queue capacity + * @param numOfThreads the number of threads for the thread pool + * @param label the label of the queue + * @param tmrCtrl the timer controller, tmr_ctrl_t* + * @return the created queue scheduler + */ +void *taosInitSchedulerWithInfo(int capacity, int numOfThreads, const char *label, void *tmrCtrl); + +/** + * Clean up the queue scheduler instance and free the memory. + * @param queueScheduler the queue scheduler to free + */ +void taosCleanUpScheduler(void *queueScheduler); + +/** + * Schedule a new task to run, the task is described by pMsg. + * The function may be blocked if no thread is available to execute the task. + * That may happen when all threads are busy. + * @param queueScheduler the queue scheduler instance + * @param pMsg the message for the task + */ +void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg); #ifdef __cplusplus } diff --git a/src/util/src/tsched.c b/src/util/src/tsched.c index f014dd0fab..16142470c9 100644 --- a/src/util/src/tsched.c +++ b/src/util/src/tsched.c @@ -108,39 +108,47 @@ void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) { void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *label, void *tmrCtrl) { SSchedQueue* pSched = taosInitScheduler(queueSize, numOfThreads, label); - + if (tmrCtrl != NULL && pSched != NULL) { pSched->pTmrCtrl = tmrCtrl; taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer); } - + return pSched; } -void *taosProcessSchedQueue(void *param) { +void *taosProcessSchedQueue(void *scheduler) { SSchedMsg msg; - SSchedQueue *pSched = (SSchedQueue *)param; + SSchedQueue *pSched = (SSchedQueue *)scheduler; + int ret = 0; while (1) { - if (tsem_wait(&pSched->fullSem) != 0) { - uError("wait %s fullSem failed(%s)", pSched->label, strerror(errno)); + if ((ret = tsem_wait(&pSched->fullSem)) != 0) { + uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(errno)); + exit(ret); } if (pSched->stop) { break; } - if (pthread_mutex_lock(&pSched->queueMutex) != 0) - uError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno)); + if ((ret = pthread_mutex_lock(&pSched->queueMutex)) != 0) { + uFatal("lock %s queueMutex failed(%s)", pSched->label, strerror(errno)); + exit(ret); + } msg = pSched->queue[pSched->fullSlot]; memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg)); pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize; - if (pthread_mutex_unlock(&pSched->queueMutex) != 0) - uError("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno)); + if ((ret = pthread_mutex_unlock(&pSched->queueMutex)) != 0) { + uFatal("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno)); + exit(ret); + } - if (tsem_post(&pSched->emptySem) != 0) - uError("post %s emptySem failed(%s)", pSched->label, strerror(errno)); + if ((ret = tsem_post(&pSched->emptySem)) != 0) { + uFatal("post %s emptySem failed(%s)", pSched->label, strerror(errno)); + exit(ret); + } if (msg.fp) (*(msg.fp))(&msg); @@ -151,30 +159,37 @@ void *taosProcessSchedQueue(void *param) { return NULL; } -int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) { - SSchedQueue *pSched = (SSchedQueue *)qhandle; +void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) { + SSchedQueue *pSched = (SSchedQueue *)queueScheduler; + int ret = 0; + if (pSched == NULL) { uError("sched is not ready, msg:%p is dropped", pMsg); - return 0; + return; } - if (tsem_wait(&pSched->emptySem) != 0) { - uError("wait %s emptySem failed(%s)", pSched->label, strerror(errno)); + if ((ret = tsem_wait(&pSched->emptySem)) != 0) { + uFatal("wait %s emptySem failed(%s)", pSched->label, strerror(errno)); + exit(ret); } - if (pthread_mutex_lock(&pSched->queueMutex) != 0) - uError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno)); + if ((ret = pthread_mutex_lock(&pSched->queueMutex)) != 0) { + uFatal("lock %s queueMutex failed(%s)", pSched->label, strerror(errno)); + exit(ret); + } pSched->queue[pSched->emptySlot] = *pMsg; pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize; - if (pthread_mutex_unlock(&pSched->queueMutex) != 0) - uError("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno)); - - if (tsem_post(&pSched->fullSem) != 0) - uError("post %s fullSem failed(%s)", pSched->label, strerror(errno)); + if ((ret = pthread_mutex_unlock(&pSched->queueMutex)) != 0) { + uFatal("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno)); + exit(ret); + } - return 0; + if ((ret = tsem_post(&pSched->fullSem)) != 0) { + uFatal("post %s fullSem failed(%s)", pSched->label, strerror(errno)); + exit(ret); + } } void taosCleanUpScheduler(void *param) { @@ -219,4 +234,4 @@ void taosDumpSchedulerStatus(void *qhandle, void *tmrId) { } taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer); -} +} \ No newline at end of file -- GitLab