From 3cc9979a9979ced5d9816fc6998e0d6b308a5ed2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 18 Mar 2022 09:17:56 +0800 Subject: [PATCH] handle except --- include/libs/transport/trpc.h | 13 +-- source/client/src/clientEnv.c | 1 - source/libs/qcom/src/queryUtil.c | 4 + source/libs/transport/inc/transComm.h | 12 ++- source/libs/transport/src/trans.c | 2 - source/libs/transport/src/transCli.c | 75 ++++++++------ source/libs/transport/src/transSrv.c | 135 ++++++++++++++++++-------- source/libs/transport/test/transUT.cc | 23 +---- 8 files changed, 163 insertions(+), 102 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 6bc28e3ea0..d8dcf72bed 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -42,9 +42,10 @@ typedef struct SRpcMsg { void * pCont; int contLen; int32_t code; - void * handle; // rpc handle returned to app - void * ahandle; // app handle set by client - int noResp; // has response or not(default 0 indicate resp); + void * handle; // rpc handle returned to app + void * ahandle; // app handle set by client + int noResp; // has response or not(default 0 indicate resp); + int persistHandle; // persist handle or not } SRpcMsg; @@ -69,15 +70,9 @@ typedef struct SRpcInit { // call back to retrieve the client auth info, for server app only int (*afp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey); - // call back to keep conn or not - bool (*pfp)(void *parent, tmsg_t msgType); - // to support Send messages multiple times on a link void *(*mfp)(void *parent, tmsg_t msgType); - // call back to handle except when query/fetch in progress - bool (*efp)(void *parent, tmsg_t msgType); - void *parent; } SRpcInit; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 525c5f9fb8..2e9a707dd3 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -90,7 +90,6 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { rpcInit.label = "TSC"; rpcInit.numOfThreads = numOfThread; rpcInit.cfp = processMsgFromServer; - rpcInit.pfp = persistConnForSpecificMsg; rpcInit.sessions = tsMaxConnections; rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.user = (char *)user; diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 63fbf59c06..b39d3e6e37 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -155,6 +155,10 @@ int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransp .ahandle = (void*)pInfo, .handle = pInfo->msgInfo.handle, .code = 0}; + if (pInfo->msgType == TDMT_VND_QUERY || pInfo->msgType == TDMT_VND_FETCH || + pInfo->msgType == TDMT_VND_QUERY_CONTINUE) { + rpcMsg.persistHandle = 1; + } assert(pInfo->fp != NULL); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 8ea65b193d..a60531a429 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -150,11 +150,12 @@ typedef struct { typedef struct { char version : 4; // RPC version - char comp : 4; // compression algorithm, 0:no compression 1:lz4 - char resflag : 2; // reserved bits - char spi : 1; // security parameter index + char comp : 2; // compression algorithm, 0:no compression 1:lz4 + char noResp : 2; // noResp bits, 0: resp, 1: resp + char persist : 2; // persist handle,0: no persit, 1: persist handle + char release : 2; char secured : 2; - char encrypt : 3; // encrypt algorithm, 0: no encryption + char spi : 2; uint32_t code; // del later uint32_t msgType; @@ -179,6 +180,9 @@ typedef struct { #pragma pack(pop) +typedef enum { Normal, Quit, Release } STransMsgType; +typedef enum { ConnNormal, ConnAcquire, ConnRelease } ConnStatus; + #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) #define RPC_RESERVE_SIZE (sizeof(STranConnCtx)) diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 2cab03f133..a688e9981e 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -39,9 +39,7 @@ void* rpcOpen(const SRpcInit* pInit) { // register callback handle pRpc->cfp = pInit->cfp; pRpc->afp = pInit->afp; - pRpc->pfp = pInit->pfp; pRpc->mfp = pInit->mfp; - pRpc->efp = pInit->efp; if (pInit->connType == TAOS_CONN_SERVER) { pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 31097a591f..7100c34845 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -17,11 +17,6 @@ #include "transComm.h" -// Normal(default): send/recv msg -// Quit: quit rpc inst -// Release: release handle to rpc inst -typedef enum { Normal, Quit, Release } SCliMsgType; - typedef struct SCliConn { T_REF_DECLARE() uv_connect_t connReq; @@ -36,7 +31,8 @@ typedef struct SCliConn { int hThrdIdx; bool broken; // link broken or not - int persist; // + ConnStatus status; // + int release; // 1: release // spi configure char spi; char secured; @@ -55,7 +51,7 @@ typedef struct SCliMsg { STransMsg msg; queue q; uint64_t st; - SCliMsgType type; + STransMsgType type; } SCliMsg; typedef struct SCliThrdObj { @@ -113,10 +109,12 @@ static void cliSend(SCliConn* pConn); static void cliHandleResp(SCliConn* conn); // handle except about conn static void cliHandleExcept(SCliConn* conn); + // handle req from app static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd); +static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrdObj* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease}; static void cliSendQuit(SCliThrdObj* thrd); static void destroyUserdata(STransMsg* userdata); @@ -133,6 +131,20 @@ static void destroyThrdObj(SCliThrdObj* pThrd); #define CONN_PERSIST_TIME(para) (para * 1000 * 10) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label) +#define CONN_SHOULD_RELEASE(conn, head) \ + do { \ + if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ + conn->status = ConnRelease; \ + transClearBuffer(&conn->readBuf); \ + transFreeMsg(transContFromHead((char*)head)); \ + if (T_REF_VAL_GET(conn) == 1) { \ + SCliThrdObj* thrd = conn->hostThrd; \ + addConnToPool(thrd->pool, conn); \ + } \ + goto _RETURN; \ + } \ + } while (0) + #define CONN_HANDLE_THREAD_QUIT(conn, thrd) \ do { \ if (thrd->quit) { \ @@ -151,14 +163,15 @@ static void destroyThrdObj(SCliThrdObj* pThrd); #define CONN_SET_PERSIST_BY_APP(conn) \ do { \ - if (conn->persist == false) { \ - conn->persist = true; \ + if (conn->status == ConnNormal) { \ + conn->status = ConnAcquire; \ transRefCliHandle(conn); \ } \ } while (0) -#define CONN_NO_PERSIST_BY_APP(conn) ((conn)->persist == false) +#define CONN_NO_PERSIST_BY_APP(conn) ((conn)->status == ConnNormal && T_REF_VAL_GET(conn) == 1) #define REQUEST_NO_RESP(msg) ((msg)->noResp == 1) +#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1) static void* cliWorkThread(void* arg); @@ -177,7 +190,6 @@ void cliHandleResp(SCliConn* conn) { STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf); pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); - STransMsg transMsg = {0}; transMsg.contLen = transContLenFromMsg(pHead->msgLen); transMsg.pCont = transContFromHead((char*)pHead); @@ -185,6 +197,8 @@ void cliHandleResp(SCliConn* conn) { transMsg.msgType = pHead->msgType; transMsg.ahandle = NULL; + CONN_SHOULD_RELEASE(conn, pHead); + SCliMsg* pMsg = NULL; if (taosArrayGetSize(conn->cliMsgs) > 0) { pMsg = taosArrayGetP(conn->cliMsgs, 0); @@ -200,9 +214,8 @@ void cliHandleResp(SCliConn* conn) { // buf's mem alread translated to transMsg.pCont transClearBuffer(&conn->readBuf); - if (pTransInst->pfp != NULL && (*pTransInst->pfp)(pTransInst->parent, transMsg.msgType)) { + if (!CONN_NO_PERSIST_BY_APP(conn)) { transMsg.handle = conn; - CONN_SET_PERSIST_BY_APP(conn); tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); } @@ -241,6 +254,8 @@ void cliHandleResp(SCliConn* conn) { if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) { // uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } +_RETURN: + return; } void cliHandleExcept(SCliConn* pConn) { @@ -367,6 +382,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); + conn->status = ConnNormal; // list already create before assert(plist != NULL); QUEUE_PUSH(&plist->conn, &conn->conn); @@ -423,8 +439,8 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) { QUEUE_INIT(&conn->conn); conn->hostThrd = pThrd; - conn->persist = false; - conn->broken = false; + conn->status = ConnNormal; + conn->broken = 0; transRefCliHandle(conn); return conn; } @@ -513,7 +529,9 @@ void cliSend(SCliConn* pConn) { msgLen += sizeof(STransUserMsg); } - pHead->resflag = REQUEST_NO_RESP(pMsg) ? 1 : 0; + pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0; + + pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0; pHead->msgType = pMsg->msgType; pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); @@ -522,6 +540,9 @@ void cliSend(SCliConn* pConn) { TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); + if (pHead->persist == 1) { + CONN_SET_PERSIST_BY_APP(pConn); + } pConn->writeReq.data = pConn; uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); @@ -571,12 +592,12 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { } transDestroyBuffer(&conn->readBuf); - if (conn->persist && T_REF_VAL_GET(conn) >= 2) { - conn->persist = false; + conn->status = ConnRelease; + int ref = T_REF_VAL_GET(conn); + if (ref == 2) { transUnrefCliHandle(conn); + } else if (ref == 1) { addConnToPool(pThrd->pool, conn); - } else { - transUnrefCliHandle(conn); } } @@ -652,14 +673,10 @@ static void cliAsyncCb(uv_async_t* handle) { QUEUE_REMOVE(h); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); - - if (pMsg->type == Normal) { - cliHandleReq(pMsg, pThrd); - } else if (pMsg->type == Quit) { - cliHandleQuit(pMsg, pThrd); - } else if (pMsg->type == Release) { - cliHandleRelease(pMsg, pThrd); + if (pMsg == NULL) { + continue; } + (*cliAsyncHandle[pMsg->type])(pMsg, pThrd); count++; } if (count >= 2) { @@ -802,8 +819,8 @@ void transReleaseCliHandle(void* handle) { STransMsg tmsg = {.handle = handle}; SCliMsg* cmsg = calloc(1, sizeof(SCliMsg)); - cmsg->type = Release; cmsg->msg = tmsg; + cmsg->type = Release; transSendAsync(thrd->asyncPool, &cmsg->q); } @@ -833,6 +850,7 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p cliMsg->ctx = pCtx; cliMsg->msg = *pMsg; cliMsg->st = taosGetTimestampUs(); + cliMsg->type = Normal; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; transSendAsync(thrd->asyncPool, &(cliMsg->q)); @@ -858,6 +876,7 @@ void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq cliMsg->ctx = pCtx; cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); + cliMsg->type = Normal; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; transSendAsync(thrd->asyncPool, &(cliMsg->q)); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 2efdb109aa..321a3489b7 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -35,6 +35,7 @@ typedef struct SSrvConn { bool broken; // conn broken; + ConnStatus status; struct sockaddr_in addr; struct sockaddr_in locaddr; @@ -47,18 +48,18 @@ typedef struct SSrvConn { } SSrvConn; typedef struct SSrvMsg { - SSrvConn* pConn; - STransMsg msg; - queue q; + SSrvConn* pConn; + STransMsg msg; + queue q; + STransMsgType type; } SSrvMsg; typedef struct SWorkThrdObj { - pthread_t thread; - uv_pipe_t* pipe; - uv_os_fd_t fd; - uv_loop_t* loop; - SAsyncPool* asyncPool; - + pthread_t thread; + uv_pipe_t* pipe; + uv_os_fd_t fd; + uv_loop_t* loop; + SAsyncPool* asyncPool; queue msg; pthread_mutex_t msgMtx; @@ -113,6 +114,11 @@ static void destroySmsg(SSrvMsg* smsg); static SSrvConn* createConn(void* hThrd); static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/); +static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd); +static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd); +static void uvHandleSendResp(SSrvMsg* msg, SWorkThrdObj* thrd); +static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleSendResp, uvHandleQuit, uvHandleRelease}; + static void uvDestroyConn(uv_handle_t* handle); // server and worker thread @@ -217,7 +223,6 @@ static void uvHandleReq(SSrvConn* pConn) { if (pHead->secured == 1) { pHead->msgLen -= sizeof(STransUserMsg); } - // } STransMsg transMsg; @@ -230,24 +235,32 @@ static void uvHandleReq(SSrvConn* pConn) { transClearBuffer(&pConn->readBuf); pConn->inType = pHead->msgType; - - if (pHead->resflag == 0) { + if (pConn->status == ConnNormal) { + if (pHead->persist == 1) { + pConn->status = ConnAcquire; + transRefSrvHandle(pConn); + } + } + if (pConn->status == ConnNormal && pHead->noResp == 0) { transRefSrvHandle(pConn); - transMsg.handle = pConn; tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen); } else { - tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, no resp ", pConn, + tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn, TMSG_INFO(transMsg.msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), - inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen); + inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp); + // no ref here + } + + if (pHead->noResp == 0) { + transMsg.handle = pConn; } STrans* pTransInst = (STrans*)p->shandle; (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // auth - // validate msg type } void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { @@ -272,7 +285,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { tError("server conn %p read error: %s", conn, uv_err_name(nread)); if (nread < 0) { conn->broken = true; - uvNotifyLinkBrokenToApp(conn); + // uvNotifyLinkBrokenToApp(conn); // STrans* pTransInst = conn->pTransInst; // if (pTransInst->efp != NULL && (pTransInst->efp)(NULL, conn->inType)) { @@ -301,8 +314,11 @@ void uvOnSendCb(uv_write_t* req, int status) { SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0); tTrace("server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs)); taosArrayRemove(conn->srvMsgs, 0); + if (msg->type == Release && conn->status != ConnNormal) { + conn->status = ConnNormal; + transUnrefSrvHandle(conn); + } destroySmsg(msg); - // send second data, just use for push if (taosArrayGetSize(conn->srvMsgs) > 0) { tTrace("resent server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs)); @@ -339,6 +355,7 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { pHead->secured = pMsg->code == 0 ? 1 : 0; // pHead->msgType = smsg->pConn->inType + 1; + pHead->release = smsg->type == Release ? 1 : 0; pHead->code = htonl(pMsg->code); // add more info char* msg = (char*)pHead; @@ -371,10 +388,12 @@ static void uvStartSendResp(SSrvMsg* smsg) { transUnrefSrvHandle(pConn); return; } - transUnrefSrvHandle(pConn); + if (pConn->status == ConnNormal) { + transUnrefSrvHandle(pConn); + } if (taosArrayGetSize(pConn->srvMsgs) > 0) { - tDebug("server conn %p push data to client %s:%d, local info: %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr), + tDebug("server conn %p send data to client %s:%d, local info: %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); taosArrayPush(pConn->srvMsgs, &smsg); return; @@ -408,6 +427,9 @@ static void destroyAllConn(SWorkThrdObj* pThrd) { QUEUE_INIT(h); SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue); + while (T_REF_VAL_GET(c) >= 2) { + transUnrefSrvHandle(c); + } transUnrefSrvHandle(c); } } @@ -431,20 +453,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) { tError("unexcept occurred, continue"); continue; } - if (msg->pConn == NULL) { - free(msg); - bool noConn = QUEUE_IS_EMPTY(&pThrd->conn); - if (noConn == true) { - uv_loop_close(pThrd->loop); - uv_stop(pThrd->loop); - } else { - destroyAllConn(pThrd); - // uv_loop_close(pThrd->loop); - pThrd->quit = true; - } - } else { - uvStartSendResp(msg); - } + (*transAsyncHandle[msg->type])(msg, pThrd); } } static void uvAcceptAsyncCb(uv_async_t* async) { @@ -633,6 +642,7 @@ static SSrvConn* createConn(void* hThrd) { tTrace("conn %p created", pConn); pConn->broken = false; + pConn->status = ConnNormal; transRefSrvHandle(pConn); return pConn; @@ -748,7 +758,38 @@ End: transCloseServer(srv); return NULL; } - +void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) { + if (QUEUE_IS_EMPTY(&thrd->conn)) { + uv_loop_close(thrd->loop); + uv_stop(thrd->loop); + } else { + destroyAllConn(thrd); + thrd->quit = true; + } + free(msg); +} +void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) { + // release handle to rpc init + SSrvConn* conn = msg->pConn; + if (conn->status == ConnAcquire) { + if (taosArrayGetSize(conn->srvMsgs) > 0) { + taosArrayPush(conn->srvMsgs, &msg); + } + taosArrayPush(conn->srvMsgs, &msg); + uvStartSendRespInternal(msg); + return; + } else if (conn->status == ConnRelease) { + // already release by server app, do nothing + } else if (conn->status == ConnNormal) { + // no nothing + // user should not call this rpcRelease handle; + } + free(msg); +} +void uvHandleSendResp(SSrvMsg* msg, SWorkThrdObj* thrd) { + // send msg to client + uvStartSendResp(msg); +} void destroyWorkThrd(SWorkThrdObj* pThrd) { if (pThrd == NULL) { return; @@ -759,10 +800,10 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { free(pThrd); } void sendQuitToWorkThrd(SWorkThrdObj* pThrd) { - SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); + SSrvMsg* msg = calloc(1, sizeof(SSrvMsg)); + msg->type = Quit; tDebug("server send quit msg to work thread"); - - transSendAsync(pThrd->asyncPool, &srvMsg->q); + transSendAsync(pThrd->asyncPool, &msg->q); } void transCloseServer(void* arg) { @@ -813,8 +854,21 @@ void transUnrefSrvHandle(void* handle) { } void transReleaseSrvHandle(void* handle) { - // do nothing currently - // + if (handle == NULL) { + return; + } + SSrvConn* pConn = handle; + SWorkThrdObj* pThrd = pConn->hostThrd; + + STransMsg tmsg = {.handle = handle, .code = 0}; + + SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); + srvMsg->msg = tmsg; + srvMsg->type = Release; + srvMsg->pConn = pConn; + + tTrace("server conn %p start to release", pConn); + transSendAsync(pThrd->asyncPool, &srvMsg->q); } void transSendResponse(const STransMsg* pMsg) { if (pMsg->handle == NULL) { @@ -826,6 +880,7 @@ void transSendResponse(const STransMsg* pMsg) { SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); srvMsg->pConn = pConn; srvMsg->msg = *pMsg; + srvMsg->type = Normal; tTrace("server conn %p start to send resp", pConn); transSendAsync(pThrd->asyncPool, &srvMsg->q); } diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc index ec89d695a2..31015359f4 100644 --- a/source/libs/transport/test/transUT.cc +++ b/source/libs/transport/test/transUT.cc @@ -31,11 +31,6 @@ class Server; int port = 7000; // server process -static bool cliPersistHandle(void *parent, tmsg_t msgType) { - // client persist handle - return msgType == 2 || msgType == 4; -} - typedef struct CbArgs { tmsg_t msgType; } CbArgs; @@ -93,7 +88,6 @@ class Client { } void SetPersistFP(bool (*pfp)(void *parent, tmsg_t msgType)) { rpcClose(this->transCli); - rpcInit_.pfp = pfp; this->transCli = rpcOpen(&rpcInit_); } void SetConstructFP(void *(*mfp)(void *parent, tmsg_t msgType)) { @@ -103,8 +97,6 @@ class Client { } void SetPAndMFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) { rpcClose(this->transCli); - - rpcInit_.pfp = pfp; rpcInit_.mfp = mfp; this->transCli = rpcOpen(&rpcInit_); } @@ -149,7 +141,6 @@ class Server { rpcInit_.label = (char *)label; rpcInit_.numOfThreads = 5; rpcInit_.cfp = processReq; - rpcInit_.efp = NULL; rpcInit_.user = (char *)user; rpcInit_.secret = (char *)secret; rpcInit_.ckey = (char *)ckey; @@ -167,7 +158,6 @@ class Server { } void SetExceptFp(bool (*efp)(void *parent, tmsg_t msgType)) { this->Stop(); - rpcInit_.efp = efp; this->Start(); } void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) { @@ -358,10 +348,10 @@ TEST_F(TransEnv, clientUserDefined) { } TEST_F(TransEnv, cliPersistHandle) { - tr->SetCliPersistFp(cliPersistHandle); + // tr->SetCliPersistFp(cliPersistHandle); SRpcMsg resp = {0}; for (int i = 0; i < 10; i++) { - SRpcMsg req = {.handle = resp.handle, .noResp = 0}; + SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -378,11 +368,9 @@ TEST_F(TransEnv, cliPersistHandle) { } TEST_F(TransEnv, cliReleaseHandle) { - tr->SetCliPersistFp(cliPersistHandle); - SRpcMsg resp = {0}; for (int i = 0; i < 10; i++) { - SRpcMsg req = {.handle = resp.handle}; + SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -398,7 +386,7 @@ TEST_F(TransEnv, cliReleaseHandle) { ////////////////// } TEST_F(TransEnv, cliReleaseHandleExcept) { - tr->SetCliPersistFp(cliPersistHandle); + // tr->SetCliPersistFp(cliPersistHandle); SRpcMsg resp = {0}; for (int i = 0; i < 10; i++) { @@ -431,7 +419,7 @@ TEST_F(TransEnv, srvContinueSend) { TEST_F(TransEnv, srvPersistHandleExcept) { tr->SetSrvContinueSend(processContinueSend); - tr->SetCliPersistFp(cliPersistHandle); + // tr->SetCliPersistFp(cliPersistHandle); SRpcMsg resp = {0}; for (int i = 0; i < 5; i++) { SRpcMsg req = {.handle = resp.handle}; @@ -450,7 +438,6 @@ TEST_F(TransEnv, srvPersistHandleExcept) { } TEST_F(TransEnv, cliPersistHandleExcept) { tr->SetSrvContinueSend(processContinueSend); - tr->SetCliPersistFp(cliPersistHandle); SRpcMsg resp = {0}; for (int i = 0; i < 5; i++) { SRpcMsg req = {.handle = resp.handle}; -- GitLab