From 3de71e39b421ff126a47be7cffe685a490de4a20 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 17 Feb 2023 13:27:36 +0800 Subject: [PATCH] opt code --- source/libs/transport/src/transCli.c | 34 ++++++++++++++++++---------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 8309a41abd..473467d753 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -164,6 +164,8 @@ static void cliSend(SCliConn* pConn); static void cliSendBatch(SCliConn* pConn); static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); +static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* ip, uint16_t port); + // cli util func static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx); static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr); @@ -1000,7 +1002,14 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { SCliBatch* pNewBatch = cliDumpBatch(pBatch); - SCliConn* conn = getConnFromPool(pThrd->pool, pBatch->ip, pBatch->port); + SCliConn* conn = getConnFromPool(pThrd->pool, pNewBatch->ip, pNewBatch->port); + + if (conn == NULL && 0 != cliPreCheckSessionLimit(pThrd, pNewBatch->ip, pNewBatch->port)) { + tError("%s failed to send batch msg, batch size:%d, msgLen: %d", pTransInst->label, pNewBatch->wLen, + pNewBatch->batchSize); + cliDestroyBatch(pNewBatch); + return; + } if (conn == NULL) { conn = cliCreateConn(pThrd); conn->pBatch = pNewBatch; @@ -1064,16 +1073,17 @@ static void cliSendBatchCb(uv_write_t* req, int status) { SCliConn* conn = req->data; taosMemoryFree(req); - tDebug("%p conn %p send batch msg out, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, conn->pBatch->wLen, - conn->pBatch->batchSize); - SCliThrd* thrd = conn->hostThrd; cliDestroyBatch(conn->pBatch); conn->pBatch = NULL; if (status != 0) { + tDebug("%p conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, + conn->pBatch->wLen, conn->pBatch->batchSize, uv_err_name(status)); cliHandleExcept(conn); } else { + tDebug("%p conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, + conn->pBatch->wLen, conn->pBatch->batchSize); addConnToPool(thrd->pool, conn); } } @@ -1282,12 +1292,12 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { return; } -static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, SCliMsg* pMsg) { +static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* ip, uint16_t port) { STrans* pTransInst = pThrd->pTransInst; - STransConnCtx* pCtx = pMsg->ctx; - char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); - int32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); + // STransConnCtx* pCtx = pMsg->ctx; + // char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); + // int32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); char key[TSDB_FQDN_LEN + 64] = {0}; CONN_CONSTRUCT_HASH_KEY(key, ip, port); @@ -1306,6 +1316,8 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr); STraceId* trace = &pMsg->msg.info.traceId; + char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); + uint16_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); if (!EPSET_IS_VALID(&pCtx->epSet)) { tGError("%s, msg %s sent with invalid epset", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); @@ -1314,9 +1326,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { } if (REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { - char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); - uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); - char key[TSDB_FQDN_LEN + 64] = {0}; + char key[TSDB_FQDN_LEN + 64] = {0}; CONN_CONSTRUCT_HASH_KEY(key, ip, port); SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key)); @@ -1344,7 +1354,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { return; } - if (conn == NULL && REQUEST_NO_RESP(&pMsg->msg) && 0 != cliPreCheckSessionLimit(pThrd, pMsg)) { + if (conn == NULL && REQUEST_NO_RESP(&pMsg->msg) && 0 != cliPreCheckSessionLimit(pThrd, ip, port)) { tGTrace("%s, msg %s cancel to send, reason: %s", pTransInst->label, TMSG_INFO(pMsg->msg.msgType), tstrerror(TSDB_CODE_RPC_MAX_SESSIONS)); destroyCmsg(pMsg); -- GitLab