diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 479c1a5af7587b70cb9e1f8943d1c0af6790c137..bf9a6c005103c76a6e8201d4833870527d107f36 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -151,8 +151,8 @@ typedef struct { int64_t retryNextInterval; bool retryInit; int32_t retryStep; - - int8_t epsetRetryCnt; + int8_t epsetRetryCnt; + int32_t retryCode; int hThrdIdx; } STransConnCtx; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index fbcc1fb525c5cc02410bd3f88a8530aa820bd01b..e92b44f8c229f01d9214b924928de4311b123d3e 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1020,7 +1020,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { char tbuf[256] = {0}; EPSET_DEBUG_STR(&pCtx->epSet, tbuf); - tDebug("current epset %s", tbuf); if (!EPSET_IS_VALID(&pCtx->epSet)) { tError("invalid epset"); @@ -1500,34 +1499,46 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { pCtx->retryNextInterval = pCtx->retryMinInterval; pCtx->retryStep = 0; pCtx->retryInit = true; + pCtx->retryCode = TSDB_CODE_SUCCESS; } + if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) { return false; } + // code, msgType + + // A: epset, leader, not self + // B: epset, not know leader + // C: no epset, leader but not serivce + bool noDelay = false; if (code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { - tDebug("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen); + tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen); noDelay = cliResetEpset(pCtx, pResp, false); transFreeMsg(pResp->pCont); transUnrefCliHandle(pConn); } else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR || code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_RPC_REDIRECT) { - tDebug("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen); + tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen); noDelay = cliResetEpset(pCtx, pResp, true); transFreeMsg(pResp->pCont); addConnToPool(pThrd->pool, pConn); } else if (code == TSDB_CODE_SYN_RESTORING) { - tDebug("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen); + tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen); noDelay = cliResetEpset(pCtx, pResp, false); addConnToPool(pThrd->pool, pConn); transFreeMsg(pResp->pCont); } else { - tDebug("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen); + tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen); noDelay = cliResetEpset(pCtx, pResp, false); addConnToPool(pThrd->pool, pConn); transFreeMsg(pResp->pCont); } + if (code != TSDB_CODE_RPC_BROKEN_LINK && code != TSDB_CODE_RPC_NETWORK_UNAVAIL && code != TSDB_CODE_SUCCESS) { + // save one internal code + pCtx->retryCode = code; + } if (noDelay == false) { pCtx->epsetRetryCnt = 1; @@ -1556,29 +1567,36 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { STrans* pTransInst = pThrd->pTransInst; if (pMsg == NULL || pMsg->ctx == NULL) { - tDebug("%s conn %p handle resp", pTransInst->label, pConn); + tTrace("%s conn %p handle resp", pTransInst->label, pConn); pTransInst->cfp(pTransInst->parent, pResp, NULL); return 0; } STransConnCtx* pCtx = pMsg->ctx; - int32_t code = pResp->code; bool retry = cliGenRetryRule(pConn, pResp, pMsg); if (retry == true) { return -1; } - STraceId* trace = &pResp->info.traceId; - bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet); + if (pCtx->retryCode != TSDB_CODE_SUCCESS) { + int32_t code = pResp->code; + // return internal code app + if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK) { + pResp->code = pCtx->retryCode; + } + } + + STraceId* trace = &pResp->info.traceId; + bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet); if (hasEpSet) { char tbuf[256] = {0}; EPSET_DEBUG_STR(&pCtx->epSet, tbuf); - tGDebug("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn); + tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn); } if (pCtx->pSem != NULL) { - tGDebug("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn); + tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn); if (pCtx->pRsp == NULL) { tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn); } else { @@ -1587,11 +1605,11 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { tsem_post(pCtx->pSem); pCtx->pRsp = NULL; } else { - tGDebug("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn); + tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn); if (retry == false && hasEpSet == true) { pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet); } else { - if (!cliIsEpsetUpdated(code, pCtx)) { + if (!cliIsEpsetUpdated(pResp->code, pCtx)) { pTransInst->cfp(pTransInst->parent, pResp, NULL); } else { pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet);