diff --git a/src/dnode/src/dnodeSystem.c b/src/dnode/src/dnodeSystem.c index 683328db29e4a4e072549d91e5e9cc7da9e6e53d..a7bfc2d7d2224287dec353b7a40b239307d93a74 100644 --- a/src/dnode/src/dnodeSystem.c +++ b/src/dnode/src/dnodeSystem.c @@ -22,6 +22,7 @@ #include "dnodeMain.h" static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context); +static sem_t exitSem; int32_t main(int32_t argc, char *argv[]) { // Set global configuration file @@ -65,6 +66,11 @@ int32_t main(int32_t argc, char *argv[]) { #endif } + if (sem_init(&exitSem, 0, 0) != 0) { + printf("failed to create exit semphore\n"); + exit(EXIT_FAILURE); + } + /* Set termination handler. */ struct sigaction act = {{0}}; act.sa_flags = SA_SIGINFO; @@ -90,9 +96,19 @@ int32_t main(int32_t argc, char *argv[]) { syslog(LOG_INFO, "Started TDengine service successfully."); - while (1) { - sleep(1000); + for (int res = sem_wait(&exitSem); res != 0; res = sem_wait(&exitSem)) { + if (res != EINTR) { + syslog(LOG_ERR, "failed to wait exit semphore: %d", res); + break; + } } + + dnodeCleanUpSystem(); + // close the syslog + syslog(LOG_INFO, "Shut down TDengine service successfully"); + dPrint("TDengine is shut down!"); + closelog(); + return EXIT_SUCCESS; } static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) { @@ -104,14 +120,21 @@ static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) { taosCfgDynamicOptions("resetlog"); return; } + syslog(LOG_INFO, "Shut down signal is %d", signum); syslog(LOG_INFO, "Shutting down TDengine service..."); // clean the system. dPrint("shut down signal is %d, sender PID:%d", signum, sigInfo->si_pid); - dnodeCleanUpSystem(); - // close the syslog - syslog(LOG_INFO, "Shut down TDengine service successfully"); - dPrint("TDengine is shut down!"); - closelog(); - exit(EXIT_SUCCESS); + + // protect the application from receive another signal + struct sigaction act = {{0}}; + act.sa_handler = SIG_IGN; + sigaction(SIGTERM, &act, NULL); + sigaction(SIGHUP, &act, NULL); + sigaction(SIGINT, &act, NULL); + sigaction(SIGUSR1, &act, NULL); + sigaction(SIGUSR2, &act, NULL); + + // inform main thread to exit + sem_post(&exitSem); } diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index aa8cd997858c7ad27a02d751b00e765fc176d327..22505f2780bd220740b8523bca769ca8cb699a54 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -17,6 +17,7 @@ #include "os.h" #include "taoserror.h" #include "taosmsg.h" +#include "tutil.h" #include "tqueue.h" #include "trpc.h" #include "twal.h" @@ -71,11 +72,16 @@ int32_t dnodeInitRead() { } void dnodeCleanupRead() { + for (int i=0; i < readPool.max; ++i) { + SReadWorker *pWorker = readPool.readWorker + i; + if (pWorker->thread) { + taosQsetThreadResume(readQset); + } + } for (int i=0; i < readPool.max; ++i) { SReadWorker *pWorker = readPool.readWorker + i; if (pWorker->thread) { - pthread_cancel(pWorker->thread); pthread_join(pWorker->thread, NULL); } } @@ -201,15 +207,14 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { } static void *dnodeProcessReadQueue(void *param) { - SReadWorker *pWorker = param; SReadMsg *pReadMsg; int type; void *pVnode; while (1) { if (taosReadQitemFromQset(readQset, &type, (void **)&pReadMsg, &pVnode) == 0) { - dnodeHandleIdleReadWorker(pWorker); - continue; + dTrace("dnodeProcessReadQueee: got no message from qset, exiting..."); + break; } dTrace("%p, msg:%s will be processed", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]); @@ -221,6 +226,8 @@ static void *dnodeProcessReadQueue(void *param) { return NULL; } + +UNUSED_FUNC static void dnodeHandleIdleReadWorker(SReadWorker *pWorker) { int32_t num = taosGetQueueNumber(readQset); diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index 879082f223e27537f8f0dbd8a4e6cc53fce5117b..a1531433ef4569634ebc530f87dd7b8b7dd640d9 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -17,6 +17,7 @@ #include "os.h" #include "taosmsg.h" #include "taoserror.h" +#include "tutil.h" #include "tqueue.h" #include "trpc.h" #include "tsdb.h" @@ -67,11 +68,16 @@ int32_t dnodeInitWrite() { } void dnodeCleanupWrite() { + for (int32_t i = 0; i < wWorkerPool.max; ++i) { + SWriteWorker *pWorker = wWorkerPool.writeWorker + i; + if (pWorker->thread) { + taosQsetThreadResume(pWorker->qset); + } + } for (int32_t i = 0; i < wWorkerPool.max; ++i) { SWriteWorker *pWorker = wWorkerPool.writeWorker + i; if (pWorker->thread) { - pthread_cancel(pWorker->thread); pthread_join(pWorker->thread, NULL); taosFreeQall(pWorker->qall); taosCloseQset(pWorker->qset); @@ -186,9 +192,9 @@ static void *dnodeProcessWriteQueue(void *param) { while (1) { numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode); - if (numOfMsgs <=0) { - dnodeHandleIdleWorker(pWorker); // thread exit if no queues anymore - continue; + if (numOfMsgs ==0) { + dTrace("dnodeProcessWriteQueee: got no message from qset, exiting..."); + break; } for (int32_t i = 0; i < numOfMsgs; ++i) { @@ -228,6 +234,7 @@ static void *dnodeProcessWriteQueue(void *param) { return NULL; } +UNUSED_FUNC static void dnodeHandleIdleWorker(SWriteWorker *pWorker) { int32_t num = taosGetQueueNumber(pWorker->qset); diff --git a/src/os/linux/inc/os.h b/src/os/linux/inc/os.h index 40fcf834318aa90baf3b9ddf575618f402124623..35d2236001cf27f29864baf2e48e725ab5f4a507 100644 --- a/src/os/linux/inc/os.h +++ b/src/os/linux/inc/os.h @@ -53,6 +53,7 @@ extern "C" { #include #include #include +#include #include #include #include diff --git a/src/plugins/http/inc/httpHandle.h b/src/plugins/http/inc/httpHandle.h index d8b3b8eade94c86fb4a9b95a54d6195e52d39445..9be2796a9629055fa5fa5b77b5bdb689970542da 100644 --- a/src/plugins/http/inc/httpHandle.h +++ b/src/plugins/http/inc/httpHandle.h @@ -199,7 +199,7 @@ typedef struct HttpThread { pthread_t thread; HttpContext * pHead; pthread_mutex_t threadMutex; - pthread_cond_t fdReady; + bool stop; int pollFd; int numOfFds; int threadId; @@ -212,6 +212,8 @@ typedef struct HttpServer { char label[HTTP_LABEL_SIZE]; uint32_t serverIp; uint16_t serverPort; + bool online; + int fd; int cacheContext; int sessionExpire; int numOfThreads; @@ -226,7 +228,6 @@ typedef struct HttpServer { bool (*processData)(HttpContext *pContext); int requestNum; void *timerHandle; - bool online; } HttpServer; // http util method diff --git a/src/plugins/http/src/httpServer.c b/src/plugins/http/src/httpServer.c index 51795a570d2a25360561938642cd175623244bd7..11da145463c6eb43dde0b99120889973e7346355 100644 --- a/src/plugins/http/src/httpServer.c +++ b/src/plugins/http/src/httpServer.c @@ -258,28 +258,45 @@ void httpCloseContextByServerForExpired(void *param, void *tmrId) { httpCloseContextByServer(pContext->pThread, pContext); } -void httpCleanUpConnect(HttpServer *pServer) { - int i; - HttpThread *pThread; - if (pServer == NULL) return; +static void httpStopThread(HttpThread* pThread) { + pThread->stop = true; - pthread_cancel(pServer->thread); - pthread_join(pServer->thread, NULL); + // signal the thread to stop, try graceful method first, + // and use pthread_cancel when failed + struct epoll_event event = { .events = EPOLLIN }; + eventfd_t fd = eventfd(1, 0); + if (fd == -1) { + pthread_cancel(pThread->thread); + } else if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { + pthread_cancel(pThread->thread); + } - for (i = 0; i < pServer->numOfThreads; ++i) { - pThread = pServer->pThreads + i; - if (pThread == NULL) continue; - //taosCloseSocket(pThread->pollFd); + pthread_join(pThread->thread, NULL); + if (fd != -1) { + close(fd); + } - //while (pThread->pHead) { - // httpCleanUpContext(pThread->pHead, 0); - //} + close(pThread->pollFd); + pthread_mutex_destroy(&(pThread->threadMutex)); - pthread_cancel(pThread->thread); - pthread_join(pThread->thread, NULL); - pthread_cond_destroy(&(pThread->fdReady)); - pthread_mutex_destroy(&(pThread->threadMutex)); + //while (pThread->pHead) { + // httpCleanUpContext(pThread->pHead, 0); + //} +} + + +void httpCleanUpConnect(HttpServer *pServer) { + if (pServer == NULL) return; + + shutdown(pServer->fd, SHUT_RD); + pthread_join(pServer->thread, NULL); + + for (int i = 0; i < pServer->numOfThreads; ++i) { + HttpThread* pThread = pServer->pThreads + i; + if (pThread != NULL) { + httpStopThread(pThread); + } } tfree(pServer->pThreads); @@ -412,15 +429,13 @@ void httpProcessHttpData(void *param) { pthread_sigmask(SIG_SETMASK, &set, NULL); while (1) { - pthread_mutex_lock(&pThread->threadMutex); - if (pThread->numOfFds < 1) { - pthread_cond_wait(&pThread->fdReady, &pThread->threadMutex); - } - pthread_mutex_unlock(&pThread->threadMutex); - struct epoll_event events[HTTP_MAX_EVENTS]; //-1 means uncertainty, 0-nowait, 1-wait 1 ms, set it from -1 to 1 fdNum = epoll_wait(pThread->pollFd, events, HTTP_MAX_EVENTS, 1); + if (pThread->stop) { + httpTrace("%p, http thread get stop event, exiting...", pThread); + break; + } if (fdNum <= 0) continue; for (int i = 0; i < fdNum; ++i) { @@ -485,10 +500,9 @@ void httpProcessHttpData(void *param) { } } -void httpAcceptHttpConnection(void *arg) { +void* httpAcceptHttpConnection(void *arg) { int connFd = -1; struct sockaddr_in clientAddr; - int sockFd; int threadId = 0; HttpThread * pThread; HttpServer * pServer; @@ -502,12 +516,12 @@ void httpAcceptHttpConnection(void *arg) { sigaddset(&set, SIGPIPE); pthread_sigmask(SIG_SETMASK, &set, NULL); - sockFd = taosOpenTcpServerSocket(pServer->serverIp, pServer->serverPort); + pServer->fd = taosOpenTcpServerSocket(pServer->serverIp, pServer->serverPort); - if (sockFd < 0) { + if (pServer->fd < 0) { httpError("http server:%s, failed to open http socket, ip:%s:%u error:%s", pServer->label, taosIpStr(pServer->serverIp), pServer->serverPort, strerror(errno)); - return; + return NULL; } else { httpPrint("http service init success at %u", pServer->serverPort); pServer->online = true; @@ -515,9 +529,12 @@ void httpAcceptHttpConnection(void *arg) { while (1) { socklen_t addrlen = sizeof(clientAddr); - connFd = (int)accept(sockFd, (struct sockaddr *)&clientAddr, &addrlen); - - if (connFd < 3) { + connFd = (int)accept(pServer->fd, (struct sockaddr *)&clientAddr, &addrlen); + if (connFd == -1) { + if (errno == EINVAL) { + httpTrace("%s HTTP server socket was shutdown, exiting...", pServer->label); + break; + } httpError("http server:%s, accept connect failure, errno:%d, reason:%s", pServer->label, errno, strerror(errno)); continue; } @@ -579,7 +596,6 @@ void httpAcceptHttpConnection(void *arg) { pThread->pHead = pContext; pThread->numOfFds++; - pthread_cond_signal(&pThread->fdReady); pthread_mutex_unlock(&(pThread->threadMutex)); @@ -587,6 +603,9 @@ void httpAcceptHttpConnection(void *arg) { threadId++; threadId = threadId % pServer->numOfThreads; } + + close(pServer->fd); + return NULL; } bool httpInitConnect(HttpServer *pServer) { @@ -612,11 +631,6 @@ bool httpInitConnect(HttpServer *pServer) { return false; } - if (pthread_cond_init(&(pThread->fdReady), NULL) != 0) { - httpError("http thread:%s, init HTTP condition variable failed, reason:%s", pThread->label, strerror(errno)); - return false; - } - pThread->pollFd = epoll_create(HTTP_MAX_EVENTS); // size does not matter if (pThread->pollFd < 0) { httpError("http thread:%s, failed to create HTTP epoll", pThread->label); diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index dcbfbcf9ac873ddeb741a0b70b089b1a47a373a0..ecd2ec83674b0869d2333e037192dd0ba563bbd8 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -889,12 +889,12 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { rpcSendErrorMsgToPeer(pRecv, code); tTrace("%s %p %p, %s is sent with error code:%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code); } - } else { // parsing OK + } else { // msg is passed to app only parsing is ok rpcProcessIncomingMsg(pConn, pHead); } } - if (code) rpcFreeMsg(pRecv->msg); + if (code) rpcFreeMsg(pRecv->msg); // parsing failed, msg shall be freed return pConn; } diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 38458c71d21fed1b0eadd67afd80a9d542934de6..739b86d7a57ab0027cb3cea4382d93add3db7ab1 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -39,8 +39,8 @@ typedef struct SThreadObj { pthread_t thread; SFdObj * pHead; pthread_mutex_t mutex; - pthread_cond_t fdReady; uint32_t ip; + bool stop; int pollFd; int numOfFds; int threadId; @@ -50,6 +50,7 @@ typedef struct SThreadObj { } SThreadObj; typedef struct { + int fd; uint32_t ip; uint16_t port; char label[12]; @@ -63,7 +64,7 @@ static void *taosProcessTcpData(void *param); static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd); static void taosFreeFdObj(SFdObj *pFdObj); static void taosReportBrokenLink(SFdObj *pFdObj); -static void taosAcceptTcpConnection(void *arg); +static void* taosAcceptTcpConnection(void *arg); void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) { SServerObj *pServerObj; @@ -95,12 +96,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread break;; } - code = pthread_cond_init(&(pThreadObj->fdReady), NULL); - if (code != 0) { - tError("%s init TCP condition variable failed(%s)", label, strerror(errno)); - break; - } - pThreadObj->pollFd = epoll_create(10); // size does not matter if (pThreadObj->pollFd < 0) { tError("%s failed to create TCP epoll", label); @@ -144,28 +139,45 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread return (void *)pServerObj; } +static void taosStopTcpThread(SThreadObj* pThreadObj) { + pThreadObj->stop = true; + + // signal the thread to stop, try graceful method first, + // and use pthread_cancel when failed + struct epoll_event event = { .events = EPOLLIN }; + eventfd_t fd = eventfd(1, 0); + if (fd == -1) { + pthread_cancel(pThreadObj->thread); + } else if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { + pthread_cancel(pThreadObj->thread); + } + + pthread_join(pThreadObj->thread, NULL); + close(pThreadObj->pollFd); + if (fd != -1) { + close(fd); + } + + while (pThreadObj->pHead) { + SFdObj *pFdObj = pThreadObj->pHead; + pThreadObj->pHead = pFdObj->next; + taosFreeFdObj(pFdObj); + } +} + + void taosCleanUpTcpServer(void *handle) { SServerObj *pServerObj = handle; SThreadObj *pThreadObj; if (pServerObj == NULL) return; - pthread_cancel(pServerObj->thread); + shutdown(pServerObj->fd, SHUT_RD); pthread_join(pServerObj->thread, NULL); for (int i = 0; i < pServerObj->numOfThreads; ++i) { pThreadObj = pServerObj->pThreadObj + i; - - while (pThreadObj->pHead) { - SFdObj *pFdObj = pThreadObj->pHead; - pThreadObj->pHead = pFdObj->next; - taosFreeFdObj(pFdObj); - } - - close(pThreadObj->pollFd); - pthread_cancel(pThreadObj->thread); - pthread_join(pThreadObj->thread, NULL); - pthread_cond_destroy(&(pThreadObj->fdReady)); + taosStopTcpThread(pThreadObj); pthread_mutex_destroy(&(pThreadObj->mutex)); } @@ -175,26 +187,28 @@ void taosCleanUpTcpServer(void *handle) { tfree(pServerObj); } -static void taosAcceptTcpConnection(void *arg) { +static void* taosAcceptTcpConnection(void *arg) { int connFd = -1; struct sockaddr_in caddr; - int sockFd; int threadId = 0; SThreadObj *pThreadObj; SServerObj *pServerObj; pServerObj = (SServerObj *)arg; - sockFd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port); - if (sockFd < 0) return; + pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port); + if (pServerObj->fd < 0) return NULL; tTrace("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); while (1) { socklen_t addrlen = sizeof(caddr); - connFd = accept(sockFd, (struct sockaddr *)&caddr, &addrlen); - - if (connFd < 0) { + connFd = accept(pServerObj->fd, (struct sockaddr *)&caddr, &addrlen); + if (connFd == -1) { + if (errno == EINVAL) { + tTrace("%s TCP server socket was shutdown, exiting...", pServerObj->label); + break; + } tError("%s TCP accept failure(%s)", pServerObj->label, errno, strerror(errno)); continue; } @@ -220,6 +234,9 @@ static void taosAcceptTcpConnection(void *arg) { threadId++; threadId = threadId % pServerObj->numOfThreads; } + + close(pServerObj->fd); + return NULL; } void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle) { @@ -237,11 +254,6 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void * return NULL; } - if (pthread_cond_init(&(pThreadObj->fdReady), NULL) != 0) { - tError("%s init TCP condition variable failed(%s)", label, strerror(errno)); - return NULL; - } - pThreadObj->pollFd = epoll_create(10); // size does not matter if (pThreadObj->pollFd < 0) { tError("%s failed to create TCP client epoll", label); @@ -268,17 +280,7 @@ void taosCleanUpTcpClient(void *chandle) { SThreadObj *pThreadObj = chandle; if (pThreadObj == NULL) return; - while (pThreadObj->pHead) { - SFdObj *pFdObj = pThreadObj->pHead; - pThreadObj->pHead = pFdObj->next; - taosFreeFdObj(pFdObj); - } - - close(pThreadObj->pollFd); - - pthread_cancel(pThreadObj->thread); - pthread_join(pThreadObj->thread, NULL); - + taosStopTcpThread(pThreadObj); tTrace (":%s, all connections are cleaned up", pThreadObj->label); tfree(pThreadObj); @@ -350,13 +352,11 @@ static void *taosProcessTcpData(void *param) { SRpcHead rpcHead; while (1) { - pthread_mutex_lock(&pThreadObj->mutex); - if (pThreadObj->numOfFds < 1) { - pthread_cond_wait(&pThreadObj->fdReady, &pThreadObj->mutex); - } - pthread_mutex_unlock(&pThreadObj->mutex); - int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1); + if (pThreadObj->stop) { + tTrace("%s, tcp thread get stop event, exiting...", pThreadObj->label); + break; + } if (fdNum < 0) continue; for (int i = 0; i < fdNum; ++i) { @@ -444,7 +444,6 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd) { if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj; pThreadObj->pHead = pFdObj; pThreadObj->numOfFds++; - pthread_cond_signal(&pThreadObj->fdReady); pthread_mutex_unlock(&(pThreadObj->mutex)); return pFdObj; @@ -492,5 +491,3 @@ static void taosFreeFdObj(SFdObj *pFdObj) { tfree(pFdObj); } - - diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 3a40f27e26f124a5c3caa40c27481f21a187eb14..6f6014b678f6f3440b5193e74abc12710119b956 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -135,14 +135,15 @@ void taosCleanUpUdpConnection(void *handle) { for (int i = 0; i < pSet->threads; ++i) { pConn = pSet->udpConn + i; pConn->signature = NULL; - free(pConn->buffer); - pthread_cancel(pConn->thread); - taosCloseSocket(pConn->fd); + // shutdown to signal the thread to exit + shutdown(pConn->fd, SHUT_RD); } for (int i = 0; i < pSet->threads; ++i) { pConn = pSet->udpConn + i; pthread_join(pConn->thread, NULL); + free(pConn->buffer); + taosCloseSocket(pConn->fd); tTrace("chandle:%p is closed", pConn); } @@ -177,6 +178,11 @@ static void *taosRecvUdpData(void *param) { while (1) { dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen); + if(dataLen == 0) { + tTrace("data length is 0, socket was closed, exiting"); + break; + } + port = ntohs(sourceAdd.sin_port); if (dataLen < sizeof(SRpcHead)) { diff --git a/src/util/inc/tqueue.h b/src/util/inc/tqueue.h index f4086dcd126e5961383feb5368517f670b561489..8493a64315966aa32b58359652d0f429e8e0916a 100644 --- a/src/util/inc/tqueue.h +++ b/src/util/inc/tqueue.h @@ -39,6 +39,7 @@ void taosResetQitems(taos_qall); taos_qset taosOpenQset(); void taosCloseQset(); +void taosQsetThreadResume(taos_qset param); int taosAddIntoQset(taos_qset, taos_queue, void *ahandle); void taosRemoveFromQset(taos_qset, taos_queue); int taosGetQueueNumber(taos_qset); diff --git a/src/util/src/tqueue.c b/src/util/src/tqueue.c index 8a203ed9176687ecf5ea03973b979bc3607b8630..1e248c9e459c8615fc61fc8368a794bd3a9ad3a1 100644 --- a/src/util/src/tqueue.c +++ b/src/util/src/tqueue.c @@ -230,6 +230,14 @@ void taosCloseQset(taos_qset param) { free(qset); } +// tsem_post 'qset->sem', so that reader threads waiting for it +// resumes execution and return, should only be used to signal the +// thread to exit. +void taosQsetThreadResume(taos_qset param) { + STaosQset *qset = (STaosQset *)param; + tsem_post(&qset->sem); +} + int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) { STaosQueue *queue = (STaosQueue *)p2; STaosQset *qset = (STaosQset *)p1; diff --git a/src/util/src/tsched.c b/src/util/src/tsched.c index 39d8816310edda5d931ade50af03f21e9859931b..25893969e47c69abe11cc5651b1f02eccb5e61bf 100644 --- a/src/util/src/tsched.c +++ b/src/util/src/tsched.c @@ -31,7 +31,7 @@ typedef struct { int numOfThreads; pthread_t * qthread; SSchedMsg * queue; - + bool stop; void* pTmrCtrl; void* pTimer; } SSchedQueue; @@ -85,6 +85,7 @@ void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) { return NULL; } + pSched->stop = false; for (int i = 0; i < numOfThreads; ++i) { pthread_attr_t attr; pthread_attr_init(&attr); @@ -128,6 +129,9 @@ void *taosProcessSchedQueue(void *param) { } uError("wait %s fullSem failed(%s)", pSched->label, strerror(errno)); } + if (pSched->stop) { + break; + } if (pthread_mutex_lock(&pSched->queueMutex) != 0) uError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno)); @@ -185,13 +189,16 @@ void taosCleanUpScheduler(void *param) { SSchedQueue *pSched = (SSchedQueue *)param; if (pSched == NULL) return; + pSched->stop = true; for (int i = 0; i < pSched->numOfThreads; ++i) { - if (pSched->qthread[i]) - pthread_cancel(pSched->qthread[i]); + if (pSched->qthread[i]) { + tsem_post(&pSched->fullSem); + } } for (int i = 0; i < pSched->numOfThreads; ++i) { - if (pSched->qthread[i]) + if (pSched->qthread[i]) { pthread_join(pSched->qthread[i], NULL); + } } tsem_destroy(&pSched->emptySem);