diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 199ee43694e0a36659b34e5db9a867c453d1f65d..069ebaeb8acb427a0a70366fd7a349504d9da767 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -526,6 +526,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd) { if (pThrd == NULL) { return; } + uv_stop(pThrd->loop); pthread_join(pThrd->thread, NULL); pthread_mutex_destroy(&pThrd->msgMtx); free(pThrd->cliAsync); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 802f2cac9ddd229a64e05757811c11caea2534e4..475ef32b46c549d3e631a917cd7fad119feef2fb 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -70,6 +70,7 @@ typedef struct SServerObj { uv_pipe_t** pipe; uint32_t ip; uint32_t port; + uv_async_t* pAcceptAsync; // just to quit from from accept thread } SServerObj; static const char* notify = "a"; @@ -88,9 +89,11 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status); static void uvOnAcceptCb(uv_stream_t* stream, int status); static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); static void uvWorkerAsyncCb(uv_async_t* handle); +static void uvAcceptAsyncCb(uv_async_t* handle); static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb); static void uvStartSendResp(SSrvMsg* msg); + static void destroySmsg(SSrvMsg* smsg); // check whether already read complete packet static bool readComplete(SConnBuffer* buf); @@ -389,7 +392,13 @@ void uvWorkerAsyncCb(uv_async_t* handle) { tError("except occurred, continue"); continue; } - uvStartSendResp(msg); + if (msg->pConn == NULL) { + // + free(msg); + uv_stop(pThrd->loop); + } else { + uvStartSendResp(msg); + } // uv_buf_t wb; // uvPrepareSendData(msg, &wb); // uv_timer_stop(conn->pTimer); @@ -397,6 +406,10 @@ void uvWorkerAsyncCb(uv_async_t* handle) { // uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb); } } +static void uvAcceptAsyncCb(uv_async_t* async) { + SServerObj* srv = async->data; + uv_stop(srv->loop); +} void uvOnAcceptCb(uv_stream_t* stream, int status) { if (status == -1) { @@ -517,8 +530,12 @@ static bool addHandleToAcceptloop(void* arg) { return false; } - struct sockaddr_in bind_addr; + // register an async here to quit server gracefully + srv->pAcceptAsync = calloc(1, sizeof(uv_async_t)); + uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb); + srv->pAcceptAsync->data = srv; + struct sockaddr_in bind_addr; uv_ip4_addr("0.0.0.0", srv->port, &bind_addr); if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) { tError("failed to bind: %s", uv_err_name(err)); @@ -646,24 +663,43 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { if (pThrd == NULL) { return; } - uv_stop(pThrd->loop); pthread_join(pThrd->thread, NULL); - // free(srv->pipe[i]); free(pThrd->loop); - pthread_mutex_destroy(&pThrd->msgMtx); + free(pThrd->workerAsync); free(pThrd); } +void sendQuitToWorkThrd(SWorkThrdObj* pThrd) { + SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); + + pthread_mutex_lock(&pThrd->msgMtx); + QUEUE_PUSH(&pThrd->msg, &srvMsg->q); + pthread_mutex_unlock(&pThrd->msgMtx); + tDebug("send quit msg to work thread"); + + uv_async_send(pThrd->workerAsync); +} + void taosCloseServer(void* arg) { // impl later SServerObj* srv = arg; for (int i = 0; i < srv->numOfThreads; i++) { + sendQuitToWorkThrd(srv->pThreadObj[i]); destroyWorkThrd(srv->pThreadObj[i]); } - uv_stop(srv->loop); + + tDebug("send quit msg to accept thread"); + uv_async_send(srv->pAcceptAsync); + pthread_join(srv->thread, NULL); + + free(srv->pThreadObj); + free(srv->pAcceptAsync); free(srv->loop); + + for (int i = 0; i < srv->numOfThreads; i++) { + free(srv->pipe[i]); + } free(srv->pipe); - free(srv->pThreadObj); - pthread_join(srv->thread, NULL); + free(srv); } diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc index 8b6fd9c45e0a341ab6b28b59512a609d21e74bcc..08c683590b56d6c070b28bed5702cac3a5560eec 100644 --- a/source/libs/transport/test/transUT.cc +++ b/source/libs/transport/test/transUT.cc @@ -29,7 +29,7 @@ class TransObj { memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = 0; rpcInit.label = (char *)label; - rpcInit.numOfThreads = 1; + rpcInit.numOfThreads = 5; rpcInit.cfp = NULL; rpcInit.sessions = 100; rpcInit.idleTime = 100; @@ -37,12 +37,22 @@ class TransObj { rpcInit.secret = (char *)secret; rpcInit.ckey = (char *)ckey; rpcInit.spi = 1; + } + bool startCli() { + trans = NULL; rpcInit.connType = TAOS_CONN_CLIENT; - trans = rpcOpen(&rpcInit); + return trans != NULL ? true : false; + } + bool startSrv() { + trans = NULL; + rpcInit.connType = TAOS_CONN_SERVER; + trans = rpcOpen(&rpcInit); + return trans != NULL ? true : false; } bool stop() { rpcClose(trans); + trans = NULL; return true; } @@ -63,4 +73,10 @@ class TransEnv : public ::testing::Test { TransObj *tr = NULL; }; -TEST_F(TransEnv, test_start_stop) { assert(tr->stop()); } +TEST_F(TransEnv, test_start_stop) { + assert(tr->startCli()); + assert(tr->stop()); + + assert(tr->startSrv()); + assert(tr->stop()); +}