diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 86533fddbe8f6ce19f68c25cf455450a3cd193fe..126b6cb91f5caa9229ce826132c2c24b81b3e596 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -173,6 +173,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd); #define REQUEST_NO_RESP(msg) ((msg)->noResp == 1) #define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1) +#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release) static void* cliWorkThread(void* arg); @@ -509,7 +510,10 @@ void cliSend(SCliConn* pConn) { STrans* pTransInst = pThrd->pTransInst; STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg); - + if (pMsg->pCont == 0) { + pMsg->pCont = (void*)rpcMallocCont(0); + pMsg->contLen = 0; + } STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); int msgLen = transMsgLenFromCont(pMsg->contLen); @@ -537,6 +541,7 @@ void cliSend(SCliConn* pConn) { pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0; pHead->msgType = pMsg->msgType; pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); + pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, @@ -546,6 +551,7 @@ void cliSend(SCliConn* pConn) { if (pHead->persist == 1) { CONN_SET_PERSIST_BY_APP(pConn); } + pConn->writeReq.data = pConn; uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); @@ -586,22 +592,13 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { } static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = pMsg->msg.handle; - tDebug("%s cli conn %p release to inst", CONN_GET_INST_LABEL(conn), conn); - - while (taosArrayGetSize(conn->cliMsgs) > 0) { - SCliMsg* pMsg = taosArrayGetP(conn->cliMsgs, 0); - destroyCmsg(pMsg); - taosArrayRemove(conn->cliMsgs, 0); - } + tDebug("%s cli conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn); - transDestroyBuffer(&conn->readBuf); - conn->status = ConnRelease; - int ref = T_REF_VAL_GET(conn); - if (ref == 2) { - transUnrefCliHandle(conn); - } else if (ref == 1) { - addConnToPool(pThrd->pool, conn); + taosArrayPush(conn->cliMsgs, &pMsg); + if (taosArrayGetSize(conn->cliMsgs) >= 2) { + return; // send one by one } + cliSend(conn); } SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 79d1ab85a828ef1d64611aa0e57851ef851a1228..108b12542c1b89f5672f60f5242d8f98cd6baf70 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -93,6 +93,15 @@ typedef struct SServerObj { static const char* notify = "a"; +#define CONN_SHOULD_RELEASE(conn, head) \ + do { \ + if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ + conn->status = ConnRelease; \ + transClearBuffer(&conn->readBuf); \ + transFreeMsg(transContFromHead((char*)head)); \ + goto _RETURE; \ + } \ + } while (0) // refactor later static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen); @@ -233,6 +242,7 @@ static void uvHandleReq(SSrvConn* pConn) { pHead->msgLen -= sizeof(STransUserMsg); } } + CONN_SHOULD_RELEASE(pConn, pHead); STransMsg transMsg; transMsg.contLen = transContLenFromMsg(pHead->msgLen); @@ -257,8 +267,8 @@ static void uvHandleReq(SSrvConn* pConn) { ntohs(pConn->locaddr.sin_port), transMsg.contLen); } else { tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn, - TMSG_INFO(transMsg.msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), - inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp); + TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), + taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp); // no ref here } @@ -270,6 +280,8 @@ static void uvHandleReq(SSrvConn* pConn) { (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // auth +_RETURE: + return; } void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { @@ -350,7 +362,7 @@ void uvOnSendCb(uv_write_t* req, int status) { } } else { tError("server conn %p failed to write data, %s", conn, uv_err_name(status)); - conn->broken = false; + conn->broken = true; transUnrefSrvHandle(conn); } } @@ -407,6 +419,7 @@ static void uvStartSendResp(SSrvMsg* smsg) { SSrvConn* pConn = smsg->pConn; if (pConn->broken == true) { + // persist by transUnrefSrvHandle(pConn); return; } @@ -415,8 +428,8 @@ static void uvStartSendResp(SSrvMsg* smsg) { } if (taosArrayGetSize(pConn->srvMsgs) > 0) { - tDebug("server conn %p send data to client %s:%d, local info: %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr), - ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); + tDebug("server conn %p send data to client %s:%d, local info: %s:%d", pConn, taosInetNtoa(pConn->addr.sin_addr), + ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); taosArrayPush(pConn->srvMsgs, &smsg); return; }