From c78517549edb501a1ab5a57bb551a2b9b394f59f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 16 May 2022 14:13:07 +0800 Subject: [PATCH] refactor: adjust SRpcMsg --- include/libs/transport/trpc.h | 52 +++++++++-------- source/libs/transport/src/transCli.c | 75 +++++++++++++------------ source/libs/transport/src/transSrv.c | 74 ++++++++++++------------ source/libs/transport/test/pushServer.c | 6 +- source/libs/transport/test/rclient.c | 4 +- source/libs/transport/test/rserver.c | 2 +- source/libs/transport/test/transUT.cpp | 45 ++++++++------- 7 files changed, 133 insertions(+), 125 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index a7d1522d12..c6896def22 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -26,39 +26,47 @@ extern "C" { #define TAOS_CONN_SERVER 0 #define TAOS_CONN_CLIENT 1 +#define IsReq(pMsg) (pMsg->msgType & 1U) extern int tsRpcHeadSize; -typedef struct SRpcConnInfo { +typedef struct { uint32_t clientIp; uint16_t clientPort; - uint32_t serverIp; char user[TSDB_USER_LEN]; } SRpcConnInfo; -typedef struct SRpcMsg { - tmsg_t msgType; - void * pCont; - int contLen; - int32_t code; - void * handle; // rpc handle returned to app - void * ahandle; // app handle set by client - int64_t refId; // refid, used by server - int noResp; // has response or not(default 0, 0: resp, 1: no resp); - int persistHandle; // persist handle or not +typedef struct { + // rpc info + struct { + void *handle; // rpc handle returned to app + int64_t refId; // refid, used by server + int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp); + int32_t persistHandle; // persist handle or not + }; + // app info + struct { + void *ahandle; // app handle set by client + void *proc; // proc handle + void *wrapper; // wrapper handle + void *node; // node mgmt handle + }; + // resp info + struct { + void *rsp; + int32_t rspLen; + }; +} SRpcHandleInfo; +typedef struct SRpcMsg { + tmsg_t msgType; + void *pCont; + int32_t contLen; + int32_t code; + SRpcHandleInfo info; + SRpcConnInfo conn; } SRpcMsg; -typedef struct { - char user[TSDB_USER_LEN]; - uint32_t clientIp; - uint16_t clientPort; - SRpcMsg rpcMsg; - int32_t rspLen; - void * pRsp; - void * pNode; -} SNodeMsg; - typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf); typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey); /// diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 26f3f689aa..d2d38bd9bc 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -223,8 +223,8 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); #define CONN_RELEASE_BY_SERVER(conn) \ (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) -#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1) -#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1) +#define REQUEST_NO_RESP(msg) ((msg)->info.noResp == 1) +#define REQUEST_PERSIS_HANDLE(msg) ((msg)->info.persistHandle == 1) #define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release) #define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn) @@ -255,7 +255,7 @@ void cliHandleResp(SCliConn* conn) { transMsg.pCont = transContFromHead((char*)pHead); transMsg.code = pHead->code; transMsg.msgType = pHead->msgType; - transMsg.ahandle = NULL; + transMsg.info.ahandle = NULL; SCliMsg* pMsg = NULL; STransConnCtx* pCtx = NULL; @@ -265,37 +265,38 @@ void cliHandleResp(SCliConn* conn) { pMsg = transQueuePop(&conn->cliMsgs); pCtx = pMsg ? pMsg->ctx : NULL; if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { - transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); - if (transMsg.ahandle == NULL) { - transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); + transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); + if (transMsg.info.ahandle == NULL) { + transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); } - tDebug("cli conn %p construct ahandle %p, persist: 0", conn, transMsg.ahandle); + tDebug("cli conn %p construct ahandle %p, persist: 0", conn, transMsg.info.ahandle); } else { - transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; - tDebug("cli conn %p get ahandle %p, persist: 0", conn, transMsg.ahandle); + transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; + tDebug("cli conn %p get ahandle %p, persist: 0", conn, transMsg.info.ahandle); } } else { uint64_t ahandle = (uint64_t)pHead->ahandle; CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); if (pMsg == NULL) { - transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); - tDebug("cli conn %p construct ahandle %p by %s, persist: 1", conn, transMsg.ahandle, TMSG_INFO(transMsg.msgType)); - if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.ahandle == NULL) { + transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); + tDebug("cli conn %p construct ahandle %p by %s, persist: 1", conn, transMsg.info.ahandle, + TMSG_INFO(transMsg.msgType)); + if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) { transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); - tDebug("cli conn %p construct ahandle %p due brokenlink, persist: 1", conn, transMsg.ahandle); + transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); + tDebug("cli conn %p construct ahandle %p due brokenlink, persist: 1", conn, transMsg.info.ahandle); } } else { pCtx = pMsg ? pMsg->ctx : NULL; - transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; - tDebug("cli conn %p get ahandle %p, persist: 1", conn, transMsg.ahandle); + transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; + tDebug("cli conn %p get ahandle %p, persist: 1", conn, transMsg.info.ahandle); } } // buf's mem alread translated to transMsg.pCont transClearBuffer(&conn->readBuf); if (!CONN_NO_PERSIST_BY_APP(conn)) { - transMsg.handle = conn; + transMsg.info.handle = conn; tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); } @@ -308,7 +309,7 @@ void cliHandleResp(SCliConn* conn) { // transUnrefCliHandle(conn); return; } - if (CONN_RELEASE_BY_SERVER(conn) && transMsg.ahandle == NULL) { + if (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) { tTrace("except, server continue send while cli ignore it"); // transUnrefCliHandle(conn); return; @@ -357,24 +358,24 @@ void cliHandleExcept(SCliConn* pConn) { STransMsg transMsg = {0}; transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; - transMsg.ahandle = NULL; - transMsg.handle = pConn; + transMsg.info.ahandle = NULL; + transMsg.info.handle = pConn; if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { - transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); - tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle, + transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); + tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.info.ahandle, TMSG_INFO(transMsg.msgType)); - if (transMsg.ahandle == NULL) { - transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType)); + if (transMsg.info.ahandle == NULL) { + transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType)); tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn, - transMsg.ahandle); + transMsg.info.ahandle); } } else { - transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; + transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; } if (pCtx == NULL || pCtx->pSem == NULL) { - if (transMsg.ahandle == NULL) { + if (transMsg.info.ahandle == NULL) { once = true; continue; } @@ -668,7 +669,7 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { // uv_stop(pThrd->loop); } static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { - SCliConn* conn = pMsg->msg.handle; + SCliConn* conn = pMsg->msg.info.handle; tDebug("%s cli conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn); if (T_REF_VAL_GET(conn) == 2) { @@ -685,8 +686,8 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = NULL; - if (pMsg->msg.handle != NULL) { - conn = (SCliConn*)(pMsg->msg.handle); + if (pMsg->msg.info.handle != NULL) { + conn = (SCliConn*)(pMsg->msg.info.handle); if (conn != NULL) { tTrace("%s cli conn %p reused", CONN_GET_INST_LABEL(conn), conn); } @@ -995,7 +996,7 @@ void transReleaseCliHandle(void* handle) { return; } - STransMsg tmsg = {.handle = handle}; + STransMsg tmsg = {.info.handle = handle}; SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg)); cmsg->msg = tmsg; cmsg->type = Release; @@ -1005,14 +1006,14 @@ void transReleaseCliHandle(void* handle) { void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { STrans* pTransInst = (STrans*)shandle; - int index = CONN_HOST_THREAD_INDEX((SCliConn*)pReq->handle); + int index = CONN_HOST_THREAD_INDEX((SCliConn*)pReq->info.handle); if (index == -1) { index = cliRBChoseIdx(pTransInst); } STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); pCtx->epSet = *pEpSet; - pCtx->ahandle = pReq->ahandle; + pCtx->ahandle = pReq->info.ahandle; pCtx->msgType = pReq->msgType; pCtx->hThrdIdx = index; @@ -1030,20 +1031,20 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; tDebug("send request at thread:%d, threadID: %" PRId64 ", msg: %p, dst: %s:%d, app:%p", index, thrd->thread, pReq, - EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->ahandle); + EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0); } void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { STrans* pTransInst = (STrans*)shandle; - int index = CONN_HOST_THREAD_INDEX(pReq->handle); + int index = CONN_HOST_THREAD_INDEX(pReq->info.handle); if (index == -1) { index = cliRBChoseIdx(pTransInst); } STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); pCtx->epSet = *pEpSet; - pCtx->ahandle = pReq->ahandle; + pCtx->ahandle = pReq->info.ahandle; pCtx->msgType = pReq->msgType; pCtx->hThrdIdx = index; pCtx->pSem = taosMemoryCalloc(1, sizeof(tsem_t)); @@ -1058,7 +1059,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; tDebug("send request at thread:%d, threadID:%" PRId64 ", msg: %p, dst: %s:%d, app:%p", index, thrd->thread, pReq, - EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->ahandle); + EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); transSendAsync(thrd->asyncPool, &(cliMsg->q)); tsem_t* pSem = pCtx->pSem; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index fc840691b6..8b3ed50c71 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -167,26 +167,26 @@ static void* transAcceptThread(void* arg); static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName); static bool addHandleToAcceptloop(void* arg); -#define CONN_SHOULD_RELEASE(conn, head) \ - do { \ - if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ - conn->status = ConnRelease; \ - transClearBuffer(&conn->readBuf); \ - transFreeMsg(transContFromHead((char*)head)); \ - tTrace("server conn %p received release request", conn); \ - \ - STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \ - SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \ - srvMsg->msg = tmsg; \ - srvMsg->type = Release; \ - srvMsg->pConn = conn; \ - reallocConnRefHandle(conn); \ - if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ - return; \ - } \ - uvStartSendRespInternal(srvMsg); \ - return; \ - } \ +#define CONN_SHOULD_RELEASE(conn, head) \ + do { \ + if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ + conn->status = ConnRelease; \ + transClearBuffer(&conn->readBuf); \ + transFreeMsg(transContFromHead((char*)head)); \ + tTrace("server conn %p received release request", conn); \ + \ + STransMsg tmsg = {.code = 0, .info.handle = (void*)conn, .info.ahandle = NULL}; \ + SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \ + srvMsg->msg = tmsg; \ + srvMsg->type = Release; \ + srvMsg->pConn = conn; \ + reallocConnRefHandle(conn); \ + if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ + return; \ + } \ + uvStartSendRespInternal(srvMsg); \ + return; \ + } \ } while (0) #define SRV_RELEASE_UV(loop) \ @@ -266,8 +266,8 @@ static void uvHandleReq(SSrvConn* pConn) { transMsg.pCont = pHead->content; transMsg.msgType = pHead->msgType; transMsg.code = pHead->code; - transMsg.ahandle = (void*)pHead->ahandle; - transMsg.handle = NULL; + transMsg.info.ahandle = (void*)pHead->ahandle; + transMsg.info.handle = NULL; // transDestroyBuffer(&pConn->readBuf); transClearBuffer(&pConn->readBuf); @@ -296,12 +296,12 @@ static void uvHandleReq(SSrvConn* pConn) { // 2. once send out data, cli conn released to conn pool immediately // 3. not mixed with persist - transMsg.handle = (void*)uvAcquireExHandle(pConn->refId); - tTrace("server handle %p conn: %p translated to app, refId: %" PRIu64 "", transMsg.handle, pConn, pConn->refId); - transMsg.refId = pConn->refId; - assert(transMsg.handle != NULL); + transMsg.info.handle = (void*)uvAcquireExHandle(pConn->refId); + tTrace("server handle %p conn: %p translated to app, refId: %" PRIu64 "", transMsg.info.handle, pConn, pConn->refId); + transMsg.info.refId = pConn->refId; + assert(transMsg.info.handle != NULL); if (pHead->noResp == 1) { - transMsg.refId = -1; + transMsg.info.refId = -1; } uvReleaseExHandle(pConn->refId); @@ -421,7 +421,7 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { pMsg->contLen = 0; } STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); - pHead->ahandle = (uint64_t)pMsg->ahandle; + pHead->ahandle = (uint64_t)pMsg->info.ahandle; if (pConn->status == ConnNormal) { pHead->msgType = pConn->inType + 1; @@ -525,8 +525,8 @@ void uvWorkerAsyncCb(uv_async_t* handle) { } else { STransMsg transMsg = msg->msg; - SExHandle* exh1 = transMsg.handle; - int64_t refId = transMsg.refId; + SExHandle* exh1 = transMsg.info.handle; + int64_t refId = transMsg.info.refId; SExHandle* exh2 = uvAcquireExHandle(refId); if (exh2 == NULL || exh1 != exh2) { tTrace("server handle except msg %p, ignore it", exh1); @@ -1103,7 +1103,7 @@ void transReleaseSrvHandle(void* handle) { SWorkThrdObj* pThrd = exh->pThrd; ASYNC_ERR_JRET(pThrd); - STransMsg tmsg = {.code = 0, .handle = exh, .ahandle = NULL, .refId = refId}; + STransMsg tmsg = {.code = 0, .info.handle = exh, .info.ahandle = NULL, .info.refId = refId}; SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); srvMsg->msg = tmsg; @@ -1122,13 +1122,13 @@ _return2: return; } void transSendResponse(const STransMsg* msg) { - SExHandle* exh = msg->handle; - int64_t refId = msg->refId; + SExHandle* exh = msg->info.handle; + int64_t refId = msg->info.refId; ASYNC_CHECK_HANDLE(exh, refId); assert(refId != 0); STransMsg tmsg = *msg; - tmsg.refId = refId; + tmsg.info.refId = refId; SWorkThrdObj* pThrd = exh->pThrd; ASYNC_ERR_JRET(pThrd); @@ -1151,12 +1151,12 @@ _return2: return; } void transRegisterMsg(const STransMsg* msg) { - SExHandle* exh = msg->handle; - int64_t refId = msg->refId; + SExHandle* exh = msg->info.handle; + int64_t refId = msg->info.refId; ASYNC_CHECK_HANDLE(exh, refId); STransMsg tmsg = *msg; - tmsg.refId = refId; + tmsg.info.refId = refId; SWorkThrdObj* pThrd = exh->pThrd; ASYNC_ERR_JRET(pThrd); diff --git a/source/libs/transport/test/pushServer.c b/source/libs/transport/test/pushServer.c index d8d78c2842..61f3431b77 100644 --- a/source/libs/transport/test/pushServer.c +++ b/source/libs/transport/test/pushServer.c @@ -69,11 +69,11 @@ void processShellMsg() { memset(&rpcMsg, 0, sizeof(rpcMsg)); rpcMsg.pCont = rpcMallocCont(msgSize); rpcMsg.contLen = msgSize; - rpcMsg.handle = pRpcMsg->handle; + rpcMsg.info = pRpcMsg->info; rpcMsg.code = 0; rpcSendResponse(&rpcMsg); - void *handle = pRpcMsg->handle; + void *handle = pRpcMsg->info.handle; taosFreeQitem(pRpcMsg); { @@ -81,7 +81,7 @@ void processShellMsg() { SRpcMsg nRpcMsg = {0}; nRpcMsg.pCont = rpcMallocCont(msgSize); nRpcMsg.contLen = msgSize; - nRpcMsg.handle = handle; + nRpcMsg.info.handle = handle; nRpcMsg.code = TSDB_CODE_CTG_NOT_READY; rpcSendResponse(&nRpcMsg); } diff --git a/source/libs/transport/test/rclient.c b/source/libs/transport/test/rclient.c index 2b1c4fa623..78964e5324 100644 --- a/source/libs/transport/test/rclient.c +++ b/source/libs/transport/test/rclient.c @@ -32,7 +32,7 @@ typedef struct { void * pRpc; } SInfo; static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { - SInfo *pInfo = (SInfo *)pMsg->ahandle; + SInfo *pInfo = (SInfo *)pMsg->info.ahandle; // tError("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, // pMsg->code); @@ -61,7 +61,7 @@ static void *sendRequest(void *param) { pInfo->num++; rpcMsg.pCont = rpcMallocCont(pInfo->msgSize); rpcMsg.contLen = pInfo->msgSize; - rpcMsg.ahandle = pInfo; + rpcMsg.info.ahandle = pInfo; rpcMsg.msgType = 1; // tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); int64_t start = taosGetTimestampUs(); diff --git a/source/libs/transport/test/rserver.c b/source/libs/transport/test/rserver.c index 80785340d1..e852b1e6e2 100644 --- a/source/libs/transport/test/rserver.c +++ b/source/libs/transport/test/rserver.c @@ -69,7 +69,7 @@ void processShellMsg() { memset(&rpcMsg, 0, sizeof(rpcMsg)); rpcMsg.pCont = rpcMallocCont(msgSize); rpcMsg.contLen = msgSize; - rpcMsg.handle = pRpcMsg->handle; + rpcMsg.info = pRpcMsg->info; rpcMsg.code = 0; rpcSendResponse(&rpcMsg); diff --git a/source/libs/transport/test/transUT.cpp b/source/libs/transport/test/transUT.cpp index 9dbebd6cfe..51a0299374 100644 --- a/source/libs/transport/test/transUT.cpp +++ b/source/libs/transport/test/transUT.cpp @@ -83,9 +83,9 @@ class Client { *resp = this->resp; } void SendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { - if (req->handle != NULL) { - rpcReleaseHandle(req->handle, TAOS_CONN_CLIENT); - req->handle = NULL; + if (req->info.handle != NULL) { + rpcReleaseHandle(req->info.handle, TAOS_CONN_CLIENT); + req->info.handle = NULL; } SendAndRecv(req, resp); } @@ -154,7 +154,7 @@ static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { SRpcMsg rpcMsg = {0}; rpcMsg.pCont = rpcMallocCont(100); rpcMsg.contLen = 100; - rpcMsg.handle = pMsg->handle; + rpcMsg.info = pMsg->info; rpcMsg.code = 0; rpcSendResponse(&rpcMsg); } @@ -164,7 +164,7 @@ static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { SRpcMsg rpcMsg = {0}; rpcMsg.pCont = rpcMallocCont(100); rpcMsg.contLen = 100; - rpcMsg.handle = pMsg->handle; + rpcMsg.info = pMsg->info; rpcMsg.code = 0; rpcSendResponse(&rpcMsg); } @@ -173,19 +173,18 @@ static void processReleaseHandleCb(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) SRpcMsg rpcMsg = {0}; rpcMsg.pCont = rpcMallocCont(100); rpcMsg.contLen = 100; - rpcMsg.handle = pMsg->handle; + rpcMsg.info = pMsg->info; rpcMsg.code = 0; rpcSendResponse(&rpcMsg); - rpcReleaseHandle(pMsg->handle, TAOS_CONN_SERVER); + rpcReleaseHandle(pMsg->info.handle, TAOS_CONN_SERVER); } static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { - void *handle = pMsg->handle; { SRpcMsg rpcMsg1 = {0}; rpcMsg1.pCont = rpcMallocCont(100); rpcMsg1.contLen = 100; - rpcMsg1.handle = handle; + rpcMsg1.info = pMsg->info; rpcMsg1.code = 0; rpcRegisterBrokenLinkArg(&rpcMsg1); } @@ -194,7 +193,7 @@ static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) SRpcMsg rpcMsg = {0}; rpcMsg.pCont = rpcMallocCont(100); rpcMsg.contLen = 100; - rpcMsg.handle = pMsg->handle; + rpcMsg.info = pMsg->info; rpcMsg.code = 0; rpcSendResponse(&rpcMsg); } @@ -334,8 +333,8 @@ TEST_F(TransEnv, cliPersistHandle) { void * handle = NULL; for (int i = 0; i < 10; i++) { SRpcMsg req = {0}; - req.handle = resp.handle; - req.persistHandle = 1; + req.info = resp.info; + req.info.persistHandle = 1; req.msgType = 1; req.pCont = rpcMallocCont(10); @@ -348,7 +347,7 @@ TEST_F(TransEnv, cliPersistHandle) { // if (i >= 6) { // EXPECT_TRUE(resp.code != 0); //} - handle = resp.handle; + handle = resp.info.handle; } rpcReleaseHandle(handle, TAOS_CONN_CLIENT); for (int i = 0; i < 10; i++) { @@ -371,8 +370,8 @@ TEST_F(TransEnv, srvReleaseHandle) { SRpcMsg req = {0}; for (int i = 0; i < 1; i++) { memset(&req, 0, sizeof(req)); - req.handle = resp.handle; - req.persistHandle = 1; + req.info = resp.info; + req.info.persistHandle = 1; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -387,8 +386,8 @@ TEST_F(TransEnv, cliReleaseHandleExcept) { SRpcMsg req = {0}; for (int i = 0; i < 3; i++) { memset(&req, 0, sizeof(req)); - req.handle = resp.handle; - req.persistHandle = 1; + req.info = resp.info; + req.info.persistHandle = 1; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -424,7 +423,7 @@ TEST_F(TransEnv, srvPersistHandleExcept) { SRpcMsg req = {0}; for (int i = 0; i < 5; i++) { memset(&req, 0, sizeof(req)); - req.handle = resp.handle; + req.info = resp.info; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -444,7 +443,7 @@ TEST_F(TransEnv, cliPersistHandleExcept) { SRpcMsg req = {0}; for (int i = 0; i < 5; i++) { memset(&req, 0, sizeof(req)); - req.handle = resp.handle; + req.info = resp.info; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -468,14 +467,14 @@ TEST_F(TransEnv, queryExcept) { SRpcMsg req = {0}; for (int i = 0; i < 5; i++) { memset(&req, 0, sizeof(req)); - req.handle = resp.handle; - req.persistHandle = 1; + req.info = resp.info; + req.info.persistHandle = 1; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; tr->cliSendAndRecv(&req, &resp); if (i == 2) { - rpcReleaseHandle(resp.handle, TAOS_CONN_CLIENT); + rpcReleaseHandle(resp.info.handle, TAOS_CONN_CLIENT); tr->StopCli(); break; } @@ -487,7 +486,7 @@ TEST_F(TransEnv, noResp) { SRpcMsg req = {0}; for (int i = 0; i < 5; i++) { memset(&req, 0, sizeof(req)); - req.noResp = 1; + req.info.noResp = 1; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; -- GitLab