diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 317f80c48d00b0da50505a059e56b3ea05ff309a..9d0fba488510a5c5492fda3795b007760b5d0936 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -144,7 +144,7 @@ void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle); } -void rpcRegisterBrokenLinkArg(SRpcMsg* msg) { rpcSendResponse(msg); } +void rpcRegisterBrokenLinkArg(SRpcMsg* msg) { transRegisterMsg(msg); } void rpcReleaseHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); (*transReleaseHandle[type])(handle); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 8415b4d83d13816c591a609f9ec240944e7b658b..fe5d8bd7f5de8ac85a8c4f34c98e3f0d2b3efb8c 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -132,27 +132,29 @@ static void destroyThrdObj(SCliThrdObj* pThrd); #define CONN_PERSIST_TIME(para) (para * 1000 * 10) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label) -#define CONN_SHOULD_RELEASE(conn, head) \ - do { \ - if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ - conn->status = ConnRelease; \ - transClearBuffer(&conn->readBuf); \ - transFreeMsg(transContFromHead((char*)head)); \ - tDebug("cli conn %p receive release request", conn); \ - if (T_REF_VAL_GET(conn) == 1) { \ - SCliThrdObj* thrd = conn->hostThrd; \ - addConnToPool(thrd->pool, conn); \ - } \ - return; \ - } \ +#define CONN_SHOULD_RELEASE(conn, head) \ + do { \ + if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ + conn->status = ConnRelease; \ + transClearBuffer(&conn->readBuf); \ + transFreeMsg(transContFromHead((char*)head)); \ + tDebug("cli conn %p receive release request, ref: %d", conn, T_REF_VAL_GET(conn)); \ + while (T_REF_VAL_GET(conn) > 1) { \ + transUnrefCliHandle(conn); \ + } \ + if (T_REF_VAL_GET(conn) == 1) { \ + SCliThrdObj* thrd = conn->hostThrd; \ + addConnToPool(thrd->pool, conn); \ + } \ + return; \ + } \ } while (0) -#define CONN_HANDLE_THREAD_QUIT(conn, thrd) \ - do { \ - if (thrd->quit) { \ - cliHandleExcept(conn); \ - return; \ - } \ +#define CONN_HANDLE_THREAD_QUIT(thrd) \ + do { \ + if (thrd->quit) { \ + return; \ + } \ } while (0) #define CONN_HANDLE_BROKEN(conn) \ @@ -375,6 +377,9 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { return conn; } static void addConnToPool(void* pool, SCliConn* conn) { + SCliThrdObj* thrd = conn->hostThrd; + CONN_HANDLE_THREAD_QUIT(thrd); + char key[128] = {0}; transCtxDestroy(&conn->ctx); @@ -539,7 +544,6 @@ void cliSend(SCliConn* pConn) { } pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0; - pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0; pHead->msgType = pMsg->msgType; pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); @@ -594,12 +598,17 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = pMsg->msg.handle; tDebug("%s cli conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn); - transUnrefCliHandle(conn); - taosArrayPush(conn->cliMsgs, &pMsg); - if (taosArrayGetSize(conn->cliMsgs) >= 2) { - return; // send one by one + if (T_REF_VAL_GET(conn) == 2) { + transUnrefCliHandle(conn); + taosArrayPush(conn->cliMsgs, &pMsg); + if (taosArrayGetSize(conn->cliMsgs) >= 2) { + return; // send one by one + } + cliSend(conn); + } else { + // conn already broken down + transUnrefCliHandle(conn); } - cliSend(conn); } SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { @@ -836,11 +845,7 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p if (index == -1) { index = cliRBChoseIdx(pTransInst); } - int32_t flen = 0; - if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) { - // imp later - } - tDebug("send request at thread:%d %p, dst: %s:%d", index, pMsg, ip, port); + STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); pCtx->ahandle = pMsg->ahandle; pCtx->msgType = pMsg->msgType; @@ -851,9 +856,7 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p if (ctx != NULL) { pCtx->appCtx = *ctx; } - assert(pTransInst->connType == TAOS_CONN_CLIENT); - // atomic or not SCliMsg* cliMsg = calloc(1, sizeof(SCliMsg)); cliMsg->ctx = pCtx; @@ -862,6 +865,8 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p cliMsg->type = Normal; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; + + tDebug("send request at thread:%d %p, dst: %s:%d", index, pMsg, ip, port); transSendAsync(thrd->asyncPool, &(cliMsg->q)); } diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index de96883b77d5a8c57fe927d7f27a4ef1e7fc7c3f..3daac3e6f5cbc44250363de1ba2aa98fa9ba1ccb 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -114,10 +114,6 @@ static const char* notify = "a"; return; \ } \ } while (0) -// refactor later -static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen); - -static int uvAuthMsg(SSrvConn* pConn, char* msg, int msgLen); static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); @@ -144,9 +140,9 @@ static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/) static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd); -static void uvHandleSendResp(SSrvMsg* msg, SWorkThrdObj* thrd); +static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd); -static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleSendResp, uvHandleQuit, uvHandleRelease, +static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, uvHandleRegister}; static void uvDestroyConn(uv_handle_t* handle); @@ -165,59 +161,6 @@ void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b transAllocBuffer(pBuf, buf); } -static int uvAuthMsg(SSrvConn* pConn, char* msg, int len) { - STransMsgHead* pHead = (STransMsgHead*)msg; - - int code = 0; - - if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) { - // secured link, or no authentication - pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); - // tTrace("%s, secured link, no auth is required", pConn->info); - return 0; - } - - if (!rpcIsReq(pHead->msgType)) { - // for response, if code is auth failure, it shall bypass the auth process - code = htonl(pHead->code); - if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE || - code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED || - code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) { - pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); - // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code); - return 0; - } - } - - code = 0; - if (pHead->spi == pConn->spi) { - // authentication - SRpcDigest* pDigest = (SRpcDigest*)((char*)pHead + len - sizeof(SRpcDigest)); - - int32_t delta; - delta = (int32_t)htonl(pDigest->timeStamp); - delta -= (int32_t)taosGetTimestampSec(); - if (abs(delta) > 900) { - tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta); - code = TSDB_CODE_RPC_INVALID_TIME_STAMP; - } else { - if (transAuthenticateMsg(pHead, len - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { - // tDebug("%s, authentication failed, msg discarded", pConn->info); - code = TSDB_CODE_RPC_AUTH_FAILURE; - } else { - pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest); - if (!rpcIsReq(pHead->msgType)) pConn->secured = 1; // link is secured for client - // tTrace("%s, message is authenticated", pConn->info); - } - } - } else { - tDebug("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi); - code = pHead->spi ? TSDB_CODE_RPC_AUTH_FAILURE : TSDB_CODE_RPC_AUTH_REQUIRED; - } - - return code; -} - // refers specifically to query or insert timeout static void uvHandleActivityTimeout(uv_timer_t* handle) { SSrvConn* conn = handle->data; @@ -225,34 +168,20 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) { } static void uvHandleReq(SSrvConn* pConn) { - SRecvInfo info; - SRecvInfo* p = &info; SConnBuffer* pBuf = &pConn->readBuf; - p->msg = pBuf->buf; - p->msgLen = pBuf->len; - p->ip = 0; - p->port = 0; - p->shandle = pConn->pTransInst; // - p->thandle = pConn; - p->chandle = NULL; - - STransMsgHead* pHead = (STransMsgHead*)p->msg; + char* msg = pBuf->buf; + uint32_t msgLen = pBuf->len; + + STransMsgHead* pHead = (STransMsgHead*)msg; if (pHead->secured == 1) { - STransUserMsg* uMsg = (STransUserMsg*)((char*)p->msg + p->msgLen - sizeof(STransUserMsg)); + STransUserMsg* uMsg = (STransUserMsg*)((char*)msg + msgLen - sizeof(STransUserMsg)); memcpy(pConn->user, uMsg->user, tListLen(uMsg->user)); memcpy(pConn->secret, uMsg->secret, tListLen(uMsg->secret)); } pHead->code = htonl(pHead->code); - - int32_t dlen = 0; - if (transDecompressMsg(NULL, 0, NULL)) { - // add compress later - // pHead = rpcDecompresSTransMsg(pHead); - } else { - pHead->msgLen = htonl(pHead->msgLen); - if (pHead->secured == 1) { - pHead->msgLen -= sizeof(STransUserMsg); - } + pHead->msgLen = htonl(pHead->msgLen); + if (pHead->secured == 1) { + pHead->msgLen -= sizeof(STransUserMsg); } CONN_SHOULD_RELEASE(pConn, pHead); @@ -289,10 +218,9 @@ static void uvHandleReq(SSrvConn* pConn) { transMsg.handle = pConn; } - STrans* pTransInst = (STrans*)p->shandle; + STrans* pTransInst = pConn->pTransInst; (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); - // auth } void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { @@ -351,24 +279,26 @@ void uvOnSendCb(uv_write_t* req, int status) { if (msg->type == Release && conn->status != ConnNormal) { conn->status = ConnNormal; transUnrefSrvHandle(conn); - } else if (msg->type == Register && conn->status == ConnAcquire) { - conn->regArg.notifyCount = 0; - conn->regArg.init = 1; - conn->regArg.msg = msg->msg; - if (conn->broken) { - STrans* pTransInst = conn->pTransInst; - (pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); - memset(&conn->regArg, 0, sizeof(conn->regArg)); - } - free(msg); - return; } destroySmsg(msg); // send second data, just use for push if (taosArrayGetSize(conn->srvMsgs) > 0) { tTrace("resent server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs)); msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0); - uvStartSendRespInternal(msg); + if (msg->type == Register && conn->status == ConnAcquire) { + conn->regArg.notifyCount = 0; + conn->regArg.init = 1; + conn->regArg.msg = msg->msg; + if (conn->broken) { + STrans* pTransInst = conn->pTransInst; + (pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); + memset(&conn->regArg, 0, sizeof(conn->regArg)); + } + taosArrayRemove(conn->srvMsgs, 0); + free(msg); + } else { + uvStartSendRespInternal(msg); + } } } } else { @@ -387,7 +317,6 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { } static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { - // impl later; tTrace("server conn %p prepare to send resp", smsg->pConn); SSrvConn* pConn = smsg->pConn; @@ -398,21 +327,27 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { } STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); - pHead->secured = pMsg->code == 0 ? 1 : 0; // - pHead->msgType = smsg->pConn->inType + 1; + // pHead->secured = pMsg->code == 0 ? 1 : 0; // + if (!pConn->secured) { + pConn->secured = pMsg->code == 0 ? 1 : 0; + } + pHead->secured = pConn->secured; + + if (pConn->status == ConnNormal) { + pHead->msgType = pConn->inType + 1; + } else { + pHead->msgType = smsg->type == Release ? 0 : pMsg->msgType; + } pHead->release = smsg->type == Release ? 1 : 0; pHead->code = htonl(pMsg->code); - // add more info + char* msg = (char*)pHead; int32_t len = transMsgLenFromCont(pMsg->contLen); - if (transCompressMsg(msg, len, NULL)) { - // impl later - } tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); - pHead->msgLen = htonl(len); + wb->base = msg; wb->len = len; } @@ -438,13 +373,12 @@ static void uvStartSendResp(SSrvMsg* smsg) { transUnrefSrvHandle(pConn); } - if (taosArrayGetSize(pConn->srvMsgs) > 0) { + taosArrayPush(pConn->srvMsgs, &smsg); + if (taosArrayGetSize(pConn->srvMsgs) > 1) { tDebug("server conn %p send data to client %s:%d, local info: %s:%d", pConn, taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); - taosArrayPush(pConn->srvMsgs, &smsg); return; } - taosArrayPush(pConn->srvMsgs, &smsg); uvStartSendRespInternal(smsg); return; } @@ -675,7 +609,7 @@ static SSrvConn* createConn(void* hThrd) { QUEUE_PUSH(&pThrd->conn, &pConn->queue); pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); // - tTrace("conn %p created", pConn); + tTrace("server conn %p created", pConn); memset(&pConn->regArg, 0, sizeof(pConn->regArg)); pConn->broken = false; @@ -697,7 +631,7 @@ static void destroyConn(SSrvConn* conn, bool clear) { } conn->srvMsgs = taosArrayDestroy(conn->srvMsgs); if (clear) { - tTrace("try to destroy conn %p", conn); + tTrace("server conn %p to be destroyed", conn); uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t)); uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb); } @@ -720,25 +654,6 @@ static void uvDestroyConn(uv_handle_t* handle) { uv_stop(thrd->loop); } } -static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen) { - STransMsgHead* pHead = (STransMsgHead*)msg; - - if (pConn->spi && pConn->secured == 0) { - // add auth part - pHead->spi = pConn->spi; - STransDigestMsg* pDigest = (STransDigestMsg*)(msg + msgLen); - pDigest->timeStamp = htonl(taosGetTimestampSec()); - msgLen += sizeof(SRpcDigest); - pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - // transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); - // transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); - } else { - pHead->spi = 0; - pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - } - - return msgLen; -} void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { SServerObj* srv = calloc(1, sizeof(SServerObj)); @@ -815,20 +730,19 @@ void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) { } uvStartSendRespInternal(msg); return; - } else if (conn->status == ConnRelease) { - // already release by server app, do nothing - } else if (conn->status == ConnNormal) { - // no nothing - // user should not call this rpcRelease handle; + } else if (conn->status == ConnRelease || conn->status == ConnNormal) { + tDebug("server conn %p already released, ignore release-msg", conn); } - free(msg); + destroySmsg(msg); } -void uvHandleSendResp(SSrvMsg* msg, SWorkThrdObj* thrd) { +void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd) { // send msg to client + tDebug("server conn %p start to send resp", msg->pConn); uvStartSendResp(msg); } void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) { SSrvConn* conn = msg->pConn; + tDebug("server conn %p register brokenlink callback", conn); if (conn->status == ConnAcquire) { if (taosArrayGetSize(conn->srvMsgs) > 0) { taosArrayPush(conn->srvMsgs, &msg); @@ -901,12 +815,10 @@ void transUnrefSrvHandle(void* handle) { return; } int ref = T_REF_DEC((SSrvConn*)handle); - tDebug("handle %p ref count: %d", handle, ref); - + tDebug("server conn %p ref count: %d", handle, ref); if (ref == 0) { destroyConn((SSrvConn*)handle, true); } - // unref srv handle } void transReleaseSrvHandle(void* handle) { @@ -951,7 +863,7 @@ void transRegisterMsg(const STransMsg* msg) { srvMsg->pConn = pConn; srvMsg->msg = *msg; srvMsg->type = Register; - tTrace("server conn %p start to send resp", pConn); + tTrace("server conn %p start to register brokenlink callback", pConn); transSendAsync(pThrd->asyncPool, &srvMsg->q); } int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) { diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc index 490437ac0db8c9ed148629e56b08b9d384436909..0b1b1834dfb0fae47e666f7806705a6658816b8e 100644 --- a/source/libs/transport/test/transUT.cc +++ b/source/libs/transport/test/transUT.cc @@ -35,6 +35,8 @@ int port = 7000; typedef void (*CB)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); +static void processReleaseHandleCb(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); +static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); // client process; static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); @@ -167,6 +169,35 @@ static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { rpcSendResponse(&rpcMsg); } } +static void processReleaseHandleCb(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = rpcMallocCont(100); + rpcMsg.contLen = 100; + rpcMsg.handle = pMsg->handle; + rpcMsg.code = 0; + rpcSendResponse(&rpcMsg); + + rpcReleaseHandle(pMsg->handle, TAOS_CONN_SERVER); +} +static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { + void *handle = pMsg->handle; + { + SRpcMsg rpcMsg1 = {0}; + rpcMsg1.pCont = rpcMallocCont(100); + rpcMsg1.contLen = 100; + rpcMsg1.handle = handle; + rpcMsg1.code = 0; + rpcRegisterBrokenLinkArg(&rpcMsg1); + } + taosMsleep(10); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = rpcMallocCont(100); + rpcMsg.contLen = 100; + rpcMsg.handle = pMsg->handle; + rpcMsg.code = 0; + rpcSendResponse(&rpcMsg); +} // client process; static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { Client *client = (Client *)parent; @@ -225,7 +256,7 @@ class TransObj { srv->SetSrvContinueSend(cfp); } void RestartSrv() { srv->Restart(); } - void cliStop() { + void StopCli() { /////// cli->Stop(); } @@ -329,32 +360,35 @@ TEST_F(TransEnv, cliPersistHandle) { ////////////////// } -TEST_F(TransEnv, cliReleaseHandle) { +TEST_F(TransEnv, srvReleaseHandle) { SRpcMsg resp = {0}; - for (int i = 0; i < 10; i++) { + tr->SetSrvContinueSend(processReleaseHandleCb); + // tr->Restart(processReleaseHandleCb); + void *handle = NULL; + for (int i = 0; i < 1; i++) { SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; - tr->cliSendAndRecvNoHandle(&req, &resp); + tr->cliSendAndRecv(&req, &resp); + // tr->cliSendAndRecvNoHandle(&req, &resp); EXPECT_TRUE(resp.code == 0); - //} } ////////////////// } TEST_F(TransEnv, cliReleaseHandleExcept) { SRpcMsg resp = {0}; - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 3; i++) { SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; - tr->cliSendAndRecvNoHandle(&req, &resp); - if (i == 5) { + tr->cliSendAndRecv(&req, &resp); + if (i == 1) { std::cout << "stop server" << std::endl; tr->StopSrv(); } - if (i >= 6) { + if (i > 1) { EXPECT_TRUE(resp.code != 0); } } @@ -383,7 +417,7 @@ TEST_F(TransEnv, srvPersistHandleExcept) { req.contLen = 10; tr->cliSendAndRecv(&req, &resp); if (i > 2) { - tr->cliStop(); + tr->StopCli(); break; } } @@ -413,7 +447,23 @@ TEST_F(TransEnv, cliPersistHandleExcept) { TEST_F(TransEnv, multiCliPersistHandleExcept) { // conn broken } -TEST_F(TransEnv, queryExcept) {} +TEST_F(TransEnv, queryExcept) { + tr->SetSrvContinueSend(processRegisterFailure); + SRpcMsg resp = {0}; + for (int i = 0; i < 5; i++) { + SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; + req.msgType = 1; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + tr->cliSendAndRecv(&req, &resp); + if (i == 2) { + rpcReleaseHandle(resp.handle, TAOS_CONN_CLIENT); + tr->StopCli(); + break; + } + } + taosMsleep(4 * 1000); +} TEST_F(TransEnv, noResp) { SRpcMsg resp = {0}; for (int i = 0; i < 5; i++) {