diff --git a/src/util/src/tsched.c b/src/util/src/tsched.c index cea84f048a9cd1db4173aecc39ed144454b9cd4a..69aa269ce3236024c02de325a09269d608d01bbb 100644 --- a/src/util/src/tsched.c +++ b/src/util/src/tsched.c @@ -38,17 +38,19 @@ typedef struct { SSchedMsg * queue; } SSchedQueue; -void (*taosSchedFp[128])(SSchedMsg *msg) = {0}; void *taosProcessSchedQueue(void *param); void taosCleanUpScheduler(void *param); void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) { pthread_attr_t attr; SSchedQueue * pSched = (SSchedQueue *)malloc(sizeof(SSchedQueue)); + if (pSched == NULL) { + pError("%s: no enough memory for pSched, reason: %s", label, strerror(errno)); + goto _error; + } memset(pSched, 0, sizeof(SSchedQueue)); pSched->queueSize = queueSize; - pSched->numOfThreads = numOfThreads; strncpy(pSched->label, label, sizeof(pSched->label)); // fix buffer overflow pSched->label[sizeof(pSched->label)-1] = '\0'; @@ -76,16 +78,21 @@ void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) { pSched->fullSlot = 0; pSched->emptySlot = 0; - pSched->qthread = malloc(sizeof(pthread_t) * (size_t)pSched->numOfThreads); + pSched->qthread = malloc(sizeof(pthread_t) * (size_t)numOfThreads); + if (pSched->qthread == NULL) { + pError("%s: no enough memory for qthread, reason: %s", pSched->label, strerror(errno)); + goto _error; + } pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - for (int i = 0; i < pSched->numOfThreads; ++i) { + for (int i = 0; i < numOfThreads; ++i) { if (pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched) != 0) { pError("%s: failed to create rpc thread, reason:%s", pSched->label, strerror(errno)); goto _error; } + ++pSched->numOfThreads; } pTrace("%s scheduler is initialized, numOfThreads:%d", pSched->label, pSched->numOfThreads); @@ -103,11 +110,12 @@ void *taosProcessSchedQueue(void *param) { while (1) { if (sem_wait(&pSched->fullSem) != 0) { - pError("wait %s fullSem failed, errno:%d, reason:%s", pSched->label, errno, strerror(errno)); if (errno == EINTR) { /* sem_wait is interrupted by interrupt, ignore and continue */ + pTrace("wait %s fullSem was interrupted", pSched->label); continue; } + pError("wait %s fullSem failed, errno:%d, reason:%s", pSched->label, errno, strerror(errno)); } if (pthread_mutex_lock(&pSched->queueMutex) != 0) @@ -137,7 +145,13 @@ int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) { return 0; } - if (sem_wait(&pSched->emptySem) != 0) pError("wait %s emptySem failed, reason:%s", pSched->label, strerror(errno)); + while (sem_wait(&pSched->emptySem) != 0) { + if (errno != EINTR) { + pError("wait %s emptySem failed, reason:%s", pSched->label, strerror(errno)); + break; + } + pTrace("wait %s emptySem was interrupted", pSched->label); + } if (pthread_mutex_lock(&pSched->queueMutex) != 0) pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno)); @@ -159,6 +173,8 @@ void taosCleanUpScheduler(void *param) { for (int i = 0; i < pSched->numOfThreads; ++i) { pthread_cancel(pSched->qthread[i]); + } + for (int i = 0; i < pSched->numOfThreads; ++i) { pthread_join(pSched->qthread[i], NULL); }