diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index a41cc0068c3e906ffba65f4bad0feb8847665a55..5ff67c87ca92e85869b885f65c69ab9bc43ea1c2 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -22,6 +22,7 @@ extern "C" { #include "os.h" #include "taoserror.h" #include "theap.h" +#include "tmisce.h" #include "transLog.h" #include "transportInt.h" #include "trpc.h" diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 5d6751a260ba5ed0fe1882e4e1a0f0c3c6f8cd22..38189f90dbb3e3da63a5ea25ed6c2546390de6f7 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -11,7 +11,6 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ - #include "transComm.h" typedef struct SConnList { @@ -224,9 +223,13 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); } while (0); // snprintf may cause performance problem -#define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \ - do { \ - snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \ +#define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \ + do { \ + char* p = key; \ + int32_t len = strlen(ip); \ + if (p != NULL) memcpy(p, ip, len); \ + p[len] = ':'; \ + titoa(port, 10, &p[len + 1]); \ } while (0) #define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) @@ -664,7 +667,7 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) { static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SCliConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; - tDebug("%s conn %p alloc read buf", CONN_GET_INST_LABEL(conn), conn); + tTrace("%s conn %p alloc read buf", CONN_GET_INST_LABEL(conn), conn); transAllocBuffer(pBuf, buf); } static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { @@ -677,7 +680,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { if (nread > 0) { pBuf->len += nread; while (transReadComplete(pBuf)) { - tDebug("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn); + tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn); if (pBuf->invalid) { cliHandleExcept(conn); break; @@ -1949,11 +1952,13 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { STrans* pTransInst = pThrd->pTransInst; STransConnCtx* pCtx = pMsg->ctx; - STraceId* trace = &pMsg->msg.info.traceId; - char tbuf[256] = {0}; - EPSET_DEBUG_STR(&pCtx->epSet, tbuf); - tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf, - pCtx->retryStep, pCtx->retryNextInterval); + if (rpcDebugFlag & DEBUG_DEBUG) { + STraceId* trace = &pMsg->msg.info.traceId; + char tbuf[256] = {0}; + EPSET_DEBUG_STR(&pCtx->epSet, tbuf); + tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf, + pCtx->retryStep, pCtx->retryNextInterval); + } STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); arg->param1 = pMsg; @@ -1990,7 +1995,7 @@ FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) { pResp->pCont = buf; pResp->contLen = len; - *dst = epset; + epsetAssign(dst, &epset); return true; } bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) { @@ -2015,7 +2020,7 @@ bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) { } else { if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) { tDebug("epset not equal, retry new epset"); - pCtx->epSet = epSet; + epsetAssign(&pCtx->epSet, &epSet); noDelay = false; } else { if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) { @@ -2040,7 +2045,7 @@ bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) { } else { if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) { tDebug("epset not equal, retry new epset"); - pCtx->epSet = epSet; + epsetAssign(&pCtx->epSet, &epSet); noDelay = false; } else { if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) { @@ -2130,10 +2135,6 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { if (pCtx->retryNextInterval >= pCtx->retryMaxInterval) { pCtx->retryNextInterval = pCtx->retryMaxInterval; } - - // if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) { - // return false; - // } } else { pCtx->retryNextInterval = 0; pCtx->epsetRetryCnt++; @@ -2181,9 +2182,11 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { STraceId* trace = &pResp->info.traceId; bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet); if (hasEpSet) { - char tbuf[256] = {0}; - EPSET_DEBUG_STR(&pCtx->epSet, tbuf); - tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn); + if (rpcDebugFlag & DEBUG_TRACE) { + char tbuf[256] = {0}; + EPSET_DEBUG_STR(&pCtx->epSet, tbuf); + tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn); + } } if (pCtx->pSem != NULL) { @@ -2310,8 +2313,9 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); - pCtx->epSet = *pEpSet; - pCtx->origEpSet = *pEpSet; + epsetAssign(&pCtx->epSet, pEpSet); + epsetAssign(&pCtx->origEpSet, pEpSet); + pCtx->ahandle = pReq->info.ahandle; pCtx->msgType = pReq->msgType; @@ -2356,8 +2360,8 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); - pCtx->epSet = *pEpSet; - pCtx->origEpSet = *pEpSet; + epsetAssign(&pCtx->epSet, pEpSet); + epsetAssign(&pCtx->origEpSet, pEpSet); pCtx->ahandle = pReq->info.ahandle; pCtx->msgType = pReq->msgType; pCtx->pSem = sem;