diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d64df9b0f3aea486560479f55a02274d3ff47162..199ee43694e0a36659b34e5db9a867c453d1f65d 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -45,13 +45,13 @@ typedef struct SCliThrdObj { pthread_t thread; uv_loop_t* loop; uv_async_t* cliAsync; // - uv_timer_t* pTimer; + uv_timer_t* timer; void* pool; // conn pool queue msg; pthread_mutex_t msgMtx; uint64_t nextTimeout; // next timeout void* pTransInst; // - + bool quit; } SCliThrdObj; typedef struct SClientObj { @@ -94,6 +94,8 @@ static void clientHandleResp(SCliConn* conn); static void clientHandleExcept(SCliConn* conn); // handle req from app static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); +static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); +static void clientSendQuit(SCliThrdObj* thrd); static void destroyUserdata(SRpcMsg* userdata); @@ -136,8 +138,8 @@ static void clientHandleResp(SCliConn* conn) { destroyCmsg(pMsg); conn->data = NULL; // start thread's timer of conn pool if not active - if (!uv_is_active((uv_handle_t*)pThrd->pTimer) && pRpc->idleTime > 0) { - uv_timer_start((uv_timer_t*)pThrd->pTimer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); + if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) { + uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } } static void clientHandleExcept(SCliConn* pConn) { @@ -155,7 +157,7 @@ static void clientHandleExcept(SCliConn* pConn) { SRpcMsg rpcMsg = {0}; rpcMsg.ahandle = pCtx->ahandle; - rpcMsg.code = -1; + rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; // SRpcInfo* pRpc = pMsg->ctx->pRpc; (pCtx->pTransInst->cfp)(NULL, &rpcMsg, NULL); pConn->notifyCount += 1; @@ -332,9 +334,8 @@ static void clientWriteCb(uv_write_t* req, int status) { tDebug("conn %p data already was written out", pConn); SCliMsg* pMsg = pConn->data; if (pMsg == NULL) { - destroy - // handle - return; + // handle + return; } destroyUserdata(&pMsg->msg); } else { @@ -375,6 +376,15 @@ static void clientConnCb(uv_connect_t* req, int status) { clientWrite(pConn); } +static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { + tDebug("thread %p start to quit", pThrd); + destroyCmsg(pMsg); + uv_close((uv_handle_t*)pThrd->cliAsync, NULL); + uv_timer_stop(pThrd->timer); + pThrd->quit = true; + // uv__async_stop(pThrd->cliAsync); + uv_stop(pThrd->loop); +} static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { uint64_t et = taosGetTimestampUs(); uint64_t el = et - pMsg->st; @@ -389,7 +399,13 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { conn->data = pMsg; conn->writeReq->data = conn; transDestroyBuffer(&conn->readBuf); + + if (pThrd->quit) { + clientHandleExcept(conn); + return; + } clientWrite(conn); + } else { SCliConn* conn = calloc(1, sizeof(SCliConn)); conn->ref++; @@ -430,7 +446,12 @@ static void clientAsyncCb(uv_async_t* handle) { QUEUE_REMOVE(h); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); - clientHandleReq(pMsg, pThrd); + if (pMsg->ctx == NULL) { + clientHandleQuit(pMsg, pThrd); + } else { + clientHandleReq(pMsg, pThrd); + } + // clientHandleReq(pMsg, pThrd); count++; } if (count >= 2) { @@ -458,7 +479,7 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd)); if (err == 0) { - tDebug("sucess to create tranport-client thread %d", i); + tDebug("success to create tranport-client thread %d", i); } cli->pThreadObj[i] = pThrd; } @@ -492,11 +513,13 @@ static SCliThrdObj* createThrdObj() { uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb); pThrd->cliAsync->data = pThrd; - pThrd->pTimer = malloc(sizeof(uv_timer_t)); - uv_timer_init(pThrd->loop, pThrd->pTimer); - pThrd->pTimer->data = pThrd; + pThrd->timer = malloc(sizeof(uv_timer_t)); + uv_timer_init(pThrd->loop, pThrd->timer); + pThrd->timer->data = pThrd; pThrd->pool = creatConnPool(1); + + pThrd->quit = false; return pThrd; } static void destroyThrdObj(SCliThrdObj* pThrd) { @@ -506,6 +529,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd) { pthread_join(pThrd->thread, NULL); pthread_mutex_destroy(&pThrd->msgMtx); free(pThrd->cliAsync); + free(pThrd->timer); free(pThrd->loop); free(pThrd); } @@ -517,10 +541,22 @@ static void transDestroyConnCtx(STransConnCtx* ctx) { free(ctx); } // +static void clientSendQuit(SCliThrdObj* thrd) { + // cli can stop gracefully + SCliMsg* msg = calloc(1, sizeof(SCliMsg)); + msg->ctx = NULL; // + + pthread_mutex_lock(&thrd->msgMtx); + QUEUE_PUSH(&thrd->msg, &msg->q); + pthread_mutex_unlock(&thrd->msgMtx); + + uv_async_send(thrd->cliAsync); +} void taosCloseClient(void* arg) { // impl later SClientObj* cli = arg; for (int i = 0; i < cli->numOfThreads; i++) { + clientSendQuit(cli->pThreadObj[i]); destroyThrdObj(cli->pThreadObj[i]); } free(cli->pThreadObj); diff --git a/source/libs/transport/test/CMakeLists.txt b/source/libs/transport/test/CMakeLists.txt index c61f688060b2e0ec85ddaf2b31534e7830e961bf..3d9c396336a0c8a17cc0e43996f276ba7843c5db 100644 --- a/source/libs/transport/test/CMakeLists.txt +++ b/source/libs/transport/test/CMakeLists.txt @@ -1,6 +1,12 @@ add_executable(transportTest "") add_executable(client "") add_executable(server "") +add_executable(transUT "") + +target_sources(transUT + PRIVATE + "transUT.cc" +) target_sources(transportTest PRIVATE @@ -28,6 +34,13 @@ target_link_libraries (transportTest gtest_main transport ) +target_link_libraries (transUT + os + util + common + gtest_main + transport +) target_include_directories(client PUBLIC @@ -48,6 +61,13 @@ target_include_directories(server "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(transUT + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/transport" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) + + target_link_libraries (server os util diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc new file mode 100644 index 0000000000000000000000000000000000000000..8b6fd9c45e0a341ab6b28b59512a609d21e74bcc --- /dev/null +++ b/source/libs/transport/test/transUT.cc @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include +#include +#include +#include "trpc.h" +using namespace std; + +class TransObj { + public: + TransObj() { + const char *label = "APP"; + const char *secret = "secret"; + const char *user = "user"; + const char *ckey = "ckey"; + + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = 0; + rpcInit.label = (char *)label; + rpcInit.numOfThreads = 1; + rpcInit.cfp = NULL; + rpcInit.sessions = 100; + rpcInit.idleTime = 100; + rpcInit.user = (char *)user; + rpcInit.secret = (char *)secret; + rpcInit.ckey = (char *)ckey; + rpcInit.spi = 1; + rpcInit.connType = TAOS_CONN_CLIENT; + + trans = rpcOpen(&rpcInit); + } + bool stop() { + rpcClose(trans); + return true; + } + + private: + void * trans; + SRpcInit rpcInit; +}; +class TransEnv : public ::testing::Test { + protected: + virtual void SetUp() { + // set up trans obj + tr = new TransObj(); + } + virtual void TearDown() { + // tear down + delete tr; + } + + TransObj *tr = NULL; +}; +TEST_F(TransEnv, test_start_stop) { assert(tr->stop()); }