diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 5795cdd91921fd4ad8c21dfda492b65282999df4..3e420365676f7b0dd4bd00f75c1782cc473ca3f8 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -43,6 +43,7 @@ typedef struct SRpcMsg { int32_t code; void * handle; // rpc handle returned to app void * ahandle; // app handle set by client + int noResp; // has response or not(default 0 indicate resp); } SRpcMsg; typedef struct SRpcInit { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2c5c4810af746e4569df9eec14769359d4515d18..cf1fad083566eb21e5ec62c8a78918adf5db9405 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -158,9 +158,11 @@ static void destroyThrdObj(SCliThrdObj* pThrd); } while (0) #define CONN_NO_PERSIST_BY_APP(conn) ((conn)->persist == false) +#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1) + static void* cliWorkThread(void* arg); -bool cliMayContinueSendMsg(SCliConn* conn) { +bool cliMaySendCachedMsg(SCliConn* conn) { if (taosArrayGetSize(conn->cliMsgs) > 0) { cliSend(conn); return true; @@ -226,7 +228,7 @@ void cliHandleResp(SCliConn* conn) { } destroyCmsg(pMsg); - if (cliMayContinueSendMsg(conn) == true) { + if (cliMaySendCachedMsg(conn) == true) { return; } @@ -441,7 +443,24 @@ static void cliDestroy(uv_handle_t* handle) { tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); free(conn); } - +static bool cliHandleNoResp(SCliConn* conn) { + bool res = false; + SArray* msgs = conn->cliMsgs; + if (taosArrayGetSize(msgs) > 0) { + SCliMsg* pMsg = taosArrayGetP(msgs, 0); + if (REQUEST_NO_RESP(&pMsg->msg)) { + taosArrayRemove(msgs, 0); + destroyCmsg(pMsg); + res = true; + } + if (res == true) { + if (cliMaySendCachedMsg(conn) == false) { + addConnToPool(conn->hostThrd, conn); + } + } + } + return res; +} static void cliSendCb(uv_write_t* req, int status) { SCliConn* pConn = req->data; @@ -452,6 +471,10 @@ static void cliSendCb(uv_write_t* req, int status) { cliHandleExcept(pConn); return; } + if (cliHandleNoResp(pConn) == true) { + tTrace("%s cli conn %p no resp required", CONN_GET_INST_LABEL(pConn), pConn); + return; + } uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb); } @@ -489,6 +512,7 @@ void cliSend(SCliConn* pConn) { msgLen += sizeof(STransUserMsg); } + pHead->resflag = REQUEST_NO_RESP(pMsg) ? 1 : 0; pHead->msgType = pMsg->msgType; pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index ec42ab6402e45d4c0381a249b34cfd4a374be830..2efdb109aae693ddd074de8a02de4d8477e1ca6a 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -226,15 +226,22 @@ static void uvHandleReq(SSrvConn* pConn) { transMsg.msgType = pHead->msgType; transMsg.code = pHead->code; transMsg.ahandle = NULL; - transMsg.handle = pConn; + transMsg.handle = NULL; transClearBuffer(&pConn->readBuf); pConn->inType = pHead->msgType; - transRefSrvHandle(pConn); - tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), - inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), - ntohs(pConn->locaddr.sin_port), transMsg.contLen); + if (pHead->resflag == 0) { + transRefSrvHandle(pConn); + transMsg.handle = pConn; + tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), + inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), + ntohs(pConn->locaddr.sin_port), transMsg.contLen); + } else { + tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, no resp ", pConn, + TMSG_INFO(transMsg.msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), + inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen); + } STrans* pTransInst = (STrans*)p->shandle; (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc index bdbfb4a0ae9209c80c2896766fc8b0d3765d2088..ec89d695a2bac264b4ee1f844c3baa7752c046f5 100644 --- a/source/libs/transport/test/transUT.cc +++ b/source/libs/transport/test/transUT.cc @@ -361,7 +361,7 @@ TEST_F(TransEnv, cliPersistHandle) { tr->SetCliPersistFp(cliPersistHandle); SRpcMsg resp = {0}; for (int i = 0; i < 10; i++) { - SRpcMsg req = {.handle = resp.handle}; + SRpcMsg req = {.handle = resp.handle, .noResp = 0}; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -448,6 +448,25 @@ TEST_F(TransEnv, srvPersistHandleExcept) { // conn broken // } +TEST_F(TransEnv, cliPersistHandleExcept) { + tr->SetSrvContinueSend(processContinueSend); + tr->SetCliPersistFp(cliPersistHandle); + SRpcMsg resp = {0}; + for (int i = 0; i < 5; i++) { + SRpcMsg req = {.handle = resp.handle}; + req.msgType = 1; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + tr->cliSendAndRecv(&req, &resp); + if (i > 2) { + tr->StopSrv(); + break; + } + } + taosMsleep(2000); + // conn broken + // +} TEST_F(TransEnv, multiCliPersistHandleExcept) { // conn broken @@ -458,5 +477,15 @@ TEST_F(TransEnv, queryExcept) { // query and conn is broken } TEST_F(TransEnv, noResp) { + SRpcMsg resp = {0}; + for (int i = 0; i < 5; i++) { + SRpcMsg req = {.noResp = 1}; + req.msgType = 1; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + tr->cliSendAndRecv(&req, &resp); + } + taosMsleep(2000); + // no resp }