diff --git a/src/plugins/http/inc/httpInt.h b/src/plugins/http/inc/httpInt.h index 31b06714d62986cb422dbe87b7442f5c9c00a9a1..00a7bcf6c5869ed9be847c8aa5f22017267a533e 100644 --- a/src/plugins/http/inc/httpInt.h +++ b/src/plugins/http/inc/httpInt.h @@ -177,6 +177,8 @@ typedef struct HttpServer { char label[HTTP_LABEL_SIZE]; uint32_t serverIp; uint16_t serverPort; + int8_t stop; + int8_t reserve; SOCKET fd; int32_t numOfThreads; int32_t methodScannerLen; diff --git a/src/plugins/http/src/httpServer.c b/src/plugins/http/src/httpServer.c index ee6addd6fa74e878b0748635ace5adc09b4bbace..5a04a021cda3a9e6e174af0c5ab3d715b8566123 100644 --- a/src/plugins/http/src/httpServer.c +++ b/src/plugins/http/src/httpServer.c @@ -171,6 +171,11 @@ static void *httpAcceptHttpConnection(void *arg) { while (1) { socklen_t addrlen = sizeof(clientAddr); connFd = (int32_t)accept(pServer->fd, (struct sockaddr *)&clientAddr, &addrlen); + if (pServer->stop) { + httpDebug("http server:%s socket stop, exiting...", pServer->label); + break; + } + if (connFd == -1) { if (errno == EINVAL) { httpDebug("http server:%s socket was shutdown, exiting...", pServer->label); diff --git a/src/plugins/http/src/httpSystem.c b/src/plugins/http/src/httpSystem.c index 34a70a658b49085312d046fe8799ac75b733c5d8..df39c080a5b0303f26034c66162f51181b081f40 100644 --- a/src/plugins/http/src/httpSystem.c +++ b/src/plugins/http/src/httpSystem.c @@ -89,7 +89,12 @@ int32_t httpStartSystem() { void httpStopSystem() { tsHttpServer.status = HTTP_SERVER_CLOSING; + tsHttpServer.stop = 1; +#ifdef WINDOWS + closesocket(tsHttpServer.fd); +#else shutdown(tsHttpServer.fd, SHUT_RD); +#endif tgCleanupHandle(); } diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 08ae4b5b8bc5d12bcbf2bc99dae3212d9251a81b..4cc7530784a0aee50f4c57f6bd9fb86ec6c42759 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -59,6 +59,8 @@ typedef struct { SOCKET fd; uint32_t ip; uint16_t port; + int8_t stop; + int8_t reserve; char label[TSDB_LABEL_LEN]; int numOfThreads; void * shandle; @@ -188,8 +190,15 @@ void taosStopTcpServer(void *handle) { SServerObj *pServerObj = handle; if (pServerObj == NULL) return; - if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD); + pServerObj->stop = 1; + if (pServerObj->fd >= 0) { +#ifdef WINDOWS + closesocket(pServerObj->fd); +#else + shutdown(pServerObj->fd, SHUT_RD); +#endif + } if (taosCheckPthreadValid(pServerObj->thread)) { if (taosComparePthread(pServerObj->thread, pthread_self())) { pthread_detach(pthread_self()); @@ -230,6 +239,11 @@ static void *taosAcceptTcpConnection(void *arg) { while (1) { socklen_t addrlen = sizeof(caddr); connFd = accept(pServerObj->fd, (struct sockaddr *)&caddr, &addrlen); + if (pServerObj->stop) { + tDebug("%s TCP server stop accepting new connections", pServerObj->label); + break; + } + if (connFd == -1) { if (errno == EINVAL) { tDebug("%s TCP server stop accepting new connections, exiting", pServerObj->label); diff --git a/src/sync/src/syncTcp.c b/src/sync/src/syncTcp.c index 829c9ceec6fb2271c58509c90d49cd55d5df9bed..72ba8e26b28ee44b2151aa83a50d95ff57b56fba 100644 --- a/src/sync/src/syncTcp.c +++ b/src/sync/src/syncTcp.c @@ -46,6 +46,7 @@ typedef struct SPoolObj { pthread_t thread; int32_t nextId; SOCKET acceptFd; // FD for accept new connection + int8_t stop; } SPoolObj; typedef struct { @@ -106,7 +107,14 @@ void syncCloseTcpThreadPool(void *param) { SPoolObj * pPool = param; SThreadObj *pThread; + pPool->stop = 1; + +#ifdef WINDOWS + closesocket(pPool->acceptFd); +#else shutdown(pPool->acceptFd, SHUT_RD); +#endif + pthread_join(pPool->thread, NULL); for (int32_t i = 0; i < pPool->info.numOfThreads; ++i) { @@ -257,6 +265,11 @@ static void *syncAcceptPeerTcpConnection(void *argv) { struct sockaddr_in clientAddr; socklen_t addrlen = sizeof(clientAddr); SOCKET connFd = accept(pPool->acceptFd, (struct sockaddr *)&clientAddr, &addrlen); + if (pPool->stop) { + sDebug("%p TCP server accept is stopped", pPool); + break; + } + if (connFd < 0) { if (errno == EINVAL) { sDebug("%p TCP server accept is exiting...", pPool);