diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 3083e70574379d2316b4cdd72c42a9eb13369e36..d761813db1b5cf10058ada050e2647c184dd5e1c 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -88,7 +88,7 @@ typedef struct SRpcInit { int32_t retryMinInterval; // retry init interval int32_t retryStepFactor; // retry interval factor int32_t retryMaxInterval; // retry max interval - int32_t retryMaxTimouet; + int64_t retryMaxTimouet; int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size int8_t encryption; // encrypt or not diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 3f4e1bb513ef6ef4db76710aa388df6de23ec4d1..48ecb5caac3acf8be124702336a22ed8d405ada1 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -148,6 +148,10 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { rpcInit.dfp = destroyAhandle; rpcInit.retryLimit = tsRpcRetryLimit; rpcInit.retryInterval = tsRpcRetryInterval; + rpcInit.retryMinInterval = 100; + rpcInit.retryStepFactor = 5; + rpcInit.retryMaxInterval = 10240; + rpcInit.retryMaxTimouet = -1; void *pDnodeConn = rpcOpen(&rpcInit); if (pDnodeConn == NULL) { diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 1102ddd259f9bc7cf7ca645c614c079f2bb064ee..9add91cdebae1a3ab8719b84e6e4e5f23d50fdcb 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -137,9 +137,6 @@ typedef struct { tmsg_t msgType; // message type int8_t connType; // connection type cli/srv - int8_t retryCnt; - int8_t retryLimit; - STransCtx appCtx; // STransMsg* pRsp; // for synchronous API tsem_t* pSem; // for synchronous API @@ -149,12 +146,14 @@ typedef struct { int32_t retryMinInterval; int32_t retryMaxInterval; int32_t retryStepFactor; - int32_t retryMaxTimeout; + int64_t retryMaxTimeout; int64_t retryInitTimestamp; int64_t retryNextInterval; bool retryInit; int32_t retryStep; + int8_t epsetRetryCnt; + int hThrdIdx; } STransConnCtx; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 75816116549257947c74e1ab5071a019acfd7b23..7ffddda77771b2d6f27f34ccb6b5e4251378c580 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -959,7 +959,7 @@ FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) { FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) { if (code != 0) return false; - if (pCtx->retryCnt == 0) return false; + // if (pCtx->retryCnt == 0) return false; if (transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) return false; return true; } @@ -1365,8 +1365,8 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { STraceId* trace = &pMsg->msg.info.traceId; char tbuf[256] = {0}; EPSET_DEBUG_STR(&pCtx->epSet, tbuf); - tGDebug("%s retry on next node, use %s, retryCnt:%d, limit:%d", transLabel(pThrd->pTransInst), tbuf, - pCtx->retryCnt + 1, pCtx->retryLimit); + tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf, + pCtx->retryStep, pCtx->retryNextInterval); STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); arg->param1 = pMsg; @@ -1406,82 +1406,103 @@ FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) { *dst = epset; return true; } -int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { +bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp) { + bool noDelay = true; + if (pResp->contLen == 0) { + if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) { + noDelay = false; + } else { + EPSET_FORWARD_INUSE(&pCtx->epSet); + } + } else { + SEpSet epset; + if (tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epset) < 0) { + // invalid epset + EPSET_FORWARD_INUSE(&pCtx->epSet); + } else if (!transEpSetIsEqual(&pCtx->epSet, &epset)) { + noDelay = false; + } else { + EPSET_FORWARD_INUSE(&pCtx->epSet); + } + } + return noDelay; +} +bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { SCliThrd* pThrd = pConn->hostThrd; STrans* pTransInst = pThrd->pTransInst; - if (pMsg == NULL || pMsg->ctx == NULL) { - tDebug("%s conn %p handle resp", pTransInst->label, pConn); - pTransInst->cfp(pTransInst->parent, pResp, NULL); - return 0; - } - STransConnCtx* pCtx = pMsg->ctx; int32_t code = pResp->code; - bool retry = (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) ? true : false; - if (retry == true) { - if (!pCtx->retryInit) { - pCtx->retryMinInterval = pTransInst->retryMinInterval; - pCtx->retryMaxInterval = pTransInst->retryMaxInterval; - pCtx->retryStepFactor = pTransInst->retryStepFactor; - pCtx->retryMaxTimeout = pTransInst->retryMaxTimouet; - pCtx->retryInit = true; - pCtx->retryStep = 1; - pCtx->retryInitTimestamp = taosGetTimestampMs(); - pCtx->retryNextInterval = pCtx->retryMinInterval; - } else { + bool retry = pTransInst->retry(code, pResp->msgType - 1); + if (retry == false) { + return false; + } + + bool noDelay = false; + if (code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + noDelay = cliResetEpset(pCtx, pResp); + transFreeMsg(pResp->pCont); + transUnrefCliHandle(pConn); + } else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR) { + noDelay = cliResetEpset(pCtx, pResp); + transFreeMsg(pResp->pCont); + addConnToPool(pThrd->pool, pConn); + } else { + noDelay = cliResetEpset(pCtx, pResp); + addConnToPool(pThrd->pool, pConn); + transFreeMsg(pResp->pCont); + } + + if (!pCtx->retryInit) { + pCtx->retryMinInterval = pTransInst->retryMinInterval; + pCtx->retryMaxInterval = pTransInst->retryMaxInterval; + pCtx->retryStepFactor = pTransInst->retryStepFactor; + pCtx->retryMaxTimeout = pTransInst->retryMaxTimouet; + pCtx->retryInitTimestamp = taosGetTimestampMs(); + pCtx->retryNextInterval = pCtx->retryMinInterval; + pCtx->retryStep = 1; + pCtx->retryInit = true; + } else { + if (noDelay == false) { + pCtx->epsetRetryCnt = 0; pCtx->retryStep++; - int64_t factor = 1; - for (int i = 0; i < pCtx->retryStep - 1; i++) { - factor *= pCtx->retryStepFactor; - } + int64_t factor = pow(pCtx->retryStepFactor, pCtx->retryStep - 1); pCtx->retryNextInterval = factor * pCtx->retryMinInterval; if (pCtx->retryNextInterval >= pCtx->retryMaxInterval) { pCtx->retryNextInterval = pCtx->retryMaxInterval; } - } - if (taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) { - retry = false; + if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) { + return false; + } + } else { + pCtx->retryNextInterval = 0; + pCtx->epsetRetryCnt++; } } - if (retry) { - pMsg->sent = 0; - pCtx->retryCnt += 1; + cliSchedMsgToNextNode(pMsg, pThrd); + return false; +} +int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { + SCliThrd* pThrd = pConn->hostThrd; + STrans* pTransInst = pThrd->pTransInst; - if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK) { - cliCompareAndSwap(&pCtx->retryLimit, pTransInst->retryLimit, EPSET_GET_SIZE(&pCtx->epSet) * 3); - // if (pCtx->retryCnt < pCtx->retryLimit) { - transUnrefCliHandle(pConn); - EPSET_FORWARD_INUSE(&pCtx->epSet); - transFreeMsg(pResp->pCont); - cliSchedMsgToNextNode(pMsg, pThrd); - return -1; - //} - } else { - cliCompareAndSwap(&pCtx->retryLimit, pTransInst->retryLimit, pTransInst->retryLimit); - if (pCtx->retryCnt < pCtx->retryLimit) { - if (pResp->contLen == 0) { - EPSET_FORWARD_INUSE(&pCtx->epSet); - } else { - if (tDeserializeSEpSet(pResp->pCont, pResp->contLen, &pCtx->epSet) < 0) { - tError("%s conn %p failed to deserialize epset", CONN_GET_INST_LABEL(pConn), pConn); - } - } - addConnToPool(pThrd->pool, pConn); - transFreeMsg(pResp->pCont); - cliSchedMsgToNextNode(pMsg, pThrd); - return -1; - } else { - // change error code for taos client driver if retryCnt exceeds limit - if (0 == strncmp(pTransInst->label, "TSC", strlen("TSC"))) pResp->code = TSDB_CODE_APP_NOT_READY; - } - } + if (pMsg == NULL || pMsg->ctx == NULL) { + tDebug("%s conn %p handle resp", pTransInst->label, pConn); + pTransInst->cfp(pTransInst->parent, pResp, NULL); + return 0; } + STransConnCtx* pCtx = pMsg->ctx; + int32_t code = pResp->code; + + bool retry = cliGenRetryRule(pConn, pResp, pMsg); + if (retry == true) { + return -1; + } STraceId* trace = &pResp->info.traceId; bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);