From affa8a7ef275b7304cfc7f7d321d0dc5b3971234 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 21 Jan 2022 22:51:59 +0800 Subject: [PATCH] refactor rpc --- source/libs/transport/src/transCli.c | 35 +++++++++++++++++----------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 00bc1b621f..bf395d62e5 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -17,6 +17,8 @@ #include "transComm.h" +#define CONN_PERSIST_TIME(para) (para * 1000 * 100) + typedef struct SCliConn { uv_connect_t connReq; uv_stream_t* stream; @@ -102,6 +104,9 @@ static void clientProcessData(SCliConn* conn) { SCliThrdObj* pThrd = conn->hostThrd; addConnToCache(pThrd->cache, pCtx->ip, pCtx->port, conn); + if (!uv_is_active((uv_handle_t*)pThrd->pTimer) && pRpc->idleTime > 0) { + uv_timer_start((uv_timer_t*)pThrd->pTimer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); + } free(pCtx->ip); free(pCtx); // impl @@ -112,6 +117,7 @@ static void clientTimeoutCb(uv_timer_t* handle) { SCliThrdObj* pThrd = handle->data; SRpcInfo* pRpc = pThrd->pTransInst; int64_t currentTime = pThrd->nextTimeout; + tDebug("timeout, try to remove expire conn from connCache"); SConnList* p = taosHashIterate((SHashObj*)pThrd->cache, NULL); while (p != NULL) { @@ -128,8 +134,8 @@ static void clientTimeoutCb(uv_timer_t* handle) { p = taosHashIterate((SHashObj*)pThrd->cache, p); } - pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10; - uv_timer_start(handle, clientTimeoutCb, pRpc->idleTime * 10, 0); + pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); + uv_timer_start(handle, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } static void* connCacheCreate(int size) { SHashObj* cache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); @@ -158,8 +164,7 @@ static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) { SConnList* plist = taosHashGet(pCache, key, strlen(key)); if (plist == NULL) { SConnList list; - plist = &list; - taosHashPut(pCache, key, strlen(key), plist, sizeof(*plist)); + taosHashPut(pCache, key, strlen(key), (void*)&list, sizeof(list)); plist = taosHashGet(pCache, key, strlen(key)); QUEUE_INIT(&plist->conn); } @@ -177,7 +182,7 @@ static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; - conn->expireTime = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10; + conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); SConnList* plist = taosHashGet((SHashObj*)cache, key, strlen(key)); // list already create before assert(plist != NULL); @@ -374,14 +379,13 @@ static void clientAsyncCb(uv_async_t* handle) { clientHandleReq(pMsg, pThrd); count++; if (count >= 2) { - tError("send batch size: %d", count); + tDebug("send batch size: %d", count); } } } static void* clientThread(void* arg) { SCliThrdObj* pThrd = (SCliThrdObj*)arg; - uv_run(pThrd->loop, UV_RUN_DEFAULT); } @@ -409,8 +413,8 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, pThrd->pTimer = malloc(sizeof(uv_timer_t)); uv_timer_init(pThrd->loop, pThrd->pTimer); pThrd->pTimer->data = pThrd; - pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10; + pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); pThrd->cache = connCacheCreate(1); pThrd->pTransInst = shandle; @@ -426,16 +430,19 @@ static void clientMsgDestroy(SCliMsg* pMsg) { // impl later free(pMsg); } +static void destroyThrdObj(SCliThrdObj* pThrd) { + pthread_join(pThrd->thread, NULL); + pthread_mutex_destroy(&pThrd->msgMtx); + free(pThrd->cliAsync); + free(pThrd->loop); + free(pThrd); +} +// void taosCloseClient(void* arg) { // impl later SClientObj* cli = arg; for (int i = 0; i < cli->numOfThreads; i++) { - SCliThrdObj* pThrd = cli->pThreadObj[i]; - pthread_join(pThrd->thread, NULL); - pthread_mutex_destroy(&pThrd->msgMtx); - free(pThrd->cliAsync); - free(pThrd->loop); - free(pThrd); + destroyThrdObj(cli->pThreadObj[i]); } free(cli->pThreadObj); free(cli); -- GitLab