From 5e8cb50a20f296fb3f631c13dc4e2f93558faeb4 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 20 Jan 2022 20:01:27 +0800 Subject: [PATCH] refactor rpc --- source/libs/transport/inc/transComm.h | 2 ++ source/libs/transport/src/transCli.c | 24 +++++++++++++++++++----- source/libs/transport/src/transComm.c | 5 +++++ source/libs/transport/src/transSrv.c | 12 ++++++++++++ 4 files changed, 38 insertions(+), 5 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 506b085ecd..4d3a1b70cb 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -199,4 +199,6 @@ void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey); bool transCompressMsg(char* msg, int32_t len, int32_t* flen); bool transDecompressMsg(char* msg, int32_t len, int32_t* flen); +void transConnCtxDestroy(STransConnCtx* ctx); + #endif diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 8622db9b3f..c96a0f81e0 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -63,6 +63,8 @@ static void clientAsyncCb(uv_async_t* handle); static void clientDestroy(uv_handle_t* handle); static void clientConnDestroy(SCliConn* pConn); +static void clientMsgDestroy(SCliMsg* pMsg); + static void* clientThread(void* arg); static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); @@ -235,9 +237,22 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, } return cli; } +static void clientMsgDestroy(SCliMsg* pMsg) { + // impl later + free(pMsg); +} void taosCloseClient(void* arg) { // impl later SClientObj* cli = arg; + for (int i = 0; i < cli->numOfThreads; i++) { + SCliThrdObj* pThrd = cli->pThreadObj[i]; + pthread_join(pThrd->thread, NULL); + pthread_mutex_destroy(&pThrd->msgMtx); + free(pThrd->loop); + free(pThrd); + } + free(cli->pThreadObj); + free(cli); } void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { @@ -247,19 +262,18 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* SRpcInfo* pRpc = (SRpcInfo*)shandle; - int len = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); + int32_t flen = 0; + if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) { + // imp later + } STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); pCtx->pRpc = (SRpcInfo*)shandle; pCtx->ahandle = pMsg->ahandle; - // pContext->contLen = len; - // pContext->pCont = pMsg->pCont; pCtx->msgType = pMsg->msgType; pCtx->ip = strdup(ip); pCtx->port = port; - // pContext->epSet = *pEpSet; - // pContext->oldInUse = pEpSet->inUse; assert(pRpc->connType == TAOS_CONN_CLIENT); // atomic or not diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 27cff80586..617abeea39 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -107,6 +107,7 @@ int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { } bool transCompressMsg(char* msg, int32_t len, int32_t* flen) { + return false; // SRpcHead* pHead = rpcHeadFromCont(pCont); bool succ = false; int overhead = sizeof(STransCompMsg); @@ -186,4 +187,8 @@ SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead) { return pHead; } +void transConnCtxDestroy(STransConnCtx* ctx) { + free(ctx->ip); + free(ctx); +} #endif diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 46caffd93c..f4625dc0b3 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -496,6 +496,7 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, for (int i = 0; i < srv->numOfThreads; i++) { SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj)); + srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); int fds[2]; if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { @@ -530,6 +531,17 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void taosCloseServer(void* arg) { // impl later SServerObj* srv = arg; + for (int i = 0; i < srv->numOfThreads; i++) { + SWorkThrdObj* pThrd = srv->pThreadObj[i]; + pthread_join(pThrd->thread, NULL); + free(srv->pipe[i]); + free(pThrd->loop); + free(pThrd); + } + free(srv->loop); + free(srv->pipe); + free(srv->pThreadObj); + pthread_join(srv->thread, NULL); } void rpcSendResponse(const SRpcMsg* pMsg) { -- GitLab