提交 62bf1c02 编写于 作者: dengyihao

refactor retry

上级 ba23ed23
......@@ -88,7 +88,7 @@ typedef struct SRpcInit {
int32_t retryMinInterval; // retry init interval
int32_t retryStepFactor; // retry interval factor
int32_t retryMaxInterval; // retry max interval
int32_t retryMaxTimouet;
int64_t retryMaxTimouet;
int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size
int8_t encryption; // encrypt or not
......
......@@ -148,6 +148,10 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.dfp = destroyAhandle;
rpcInit.retryLimit = tsRpcRetryLimit;
rpcInit.retryInterval = tsRpcRetryInterval;
rpcInit.retryMinInterval = 100;
rpcInit.retryStepFactor = 5;
rpcInit.retryMaxInterval = 10240;
rpcInit.retryMaxTimouet = -1;
void *pDnodeConn = rpcOpen(&rpcInit);
if (pDnodeConn == NULL) {
......
......@@ -137,9 +137,6 @@ typedef struct {
tmsg_t msgType; // message type
int8_t connType; // connection type cli/srv
int8_t retryCnt;
int8_t retryLimit;
STransCtx appCtx; //
STransMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API
......@@ -149,12 +146,14 @@ typedef struct {
int32_t retryMinInterval;
int32_t retryMaxInterval;
int32_t retryStepFactor;
int32_t retryMaxTimeout;
int64_t retryMaxTimeout;
int64_t retryInitTimestamp;
int64_t retryNextInterval;
bool retryInit;
int32_t retryStep;
int8_t epsetRetryCnt;
int hThrdIdx;
} STransConnCtx;
......
......@@ -959,7 +959,7 @@ FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
// Reports whether the endpoint set held in the context differs from the
// original one.
//
// As written, returns true only when all of the following hold:
//   - code is 0,
//   - pCtx->retryCnt is non-zero,
//   - pCtx->epSet is not equal to pCtx->origEpSet (per transEpSetIsEqual).
// Returns false otherwise.
FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) {
if (code != 0) return false;
if (pCtx->retryCnt == 0) return false;
// NOTE(review): the commented-out line below duplicates the active check
// directly above it. This looks like a leftover from disabling the retryCnt
// guard -- confirm whether the active check above is still intended.
// if (pCtx->retryCnt == 0) return false;
if (transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) return false;
return true;
}
......@@ -1365,8 +1365,8 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
STraceId* trace = &pMsg->msg.info.traceId;
char tbuf[256] = {0};
EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
tGDebug("%s retry on next node, use %s, retryCnt:%d, limit:%d", transLabel(pThrd->pTransInst), tbuf,
pCtx->retryCnt + 1, pCtx->retryLimit);
tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf,
pCtx->retryStep, pCtx->retryNextInterval);
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = pMsg;
......@@ -1406,82 +1406,103 @@ FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
*dst = epset;
return true;
}
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
/**
 * Update the endpoint set of a request context from a failed response and
 * report whether the request can be retried without a back-off delay.
 *
 * Cases:
 *  - The response has no payload (contLen == 0): if epsetRetryCnt has already
 *    reached the number of endpoints in the set, nothing is changed and a
 *    delay is requested; otherwise EPSET_FORWARD_INUSE is applied to the
 *    context's set (presumably advancing the in-use endpoint) and no delay is
 *    requested.
 *  - The payload cannot be deserialized as an SEpSet: treated as "no usable
 *    endpoint set"; EPSET_FORWARD_INUSE is applied and no delay is requested.
 *  - The payload is a valid SEpSet that differs from the context's set: the
 *    received set is stored into the context so the next attempt uses it, and
 *    a delay is requested.
 *  - The payload is a valid SEpSet equal to the context's set:
 *    EPSET_FORWARD_INUSE is applied and no delay is requested.
 *
 * @param pCtx  request context whose epSet / epsetRetryCnt are consulted and
 *              whose epSet may be modified.
 * @param pResp response whose pCont / contLen may carry a serialized SEpSet.
 * @return true when the caller may retry immediately, false when the caller
 *         should go through its delayed retry path.
 */
bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp) {
  if (pResp->contLen == 0) {
    // No endpoint set supplied by the peer.
    if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
      return false;
    }
    EPSET_FORWARD_INUSE(&pCtx->epSet);
    return true;
  }

  SEpSet epset;
  if (tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epset) < 0) {
    // invalid epset
    EPSET_FORWARD_INUSE(&pCtx->epSet);
    return true;
  }

  if (!transEpSetIsEqual(&pCtx->epSet, &epset)) {
    // Keep the endpoint set supplied by the peer; without this assignment the
    // parsed set would be discarded and the retry would reuse the stale one.
    pCtx->epSet = epset;
    return false;
  }

  EPSET_FORWARD_INUSE(&pCtx->epSet);
  return true;
}
bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
if (pMsg == NULL || pMsg->ctx == NULL) {
tDebug("%s conn %p handle resp", pTransInst->label, pConn);
pTransInst->cfp(pTransInst->parent, pResp, NULL);
return 0;
}
STransConnCtx* pCtx = pMsg->ctx;
int32_t code = pResp->code;
bool retry = (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) ? true : false;
if (retry == true) {
if (!pCtx->retryInit) {
pCtx->retryMinInterval = pTransInst->retryMinInterval;
pCtx->retryMaxInterval = pTransInst->retryMaxInterval;
pCtx->retryStepFactor = pTransInst->retryStepFactor;
pCtx->retryMaxTimeout = pTransInst->retryMaxTimouet;
pCtx->retryInit = true;
pCtx->retryStep = 1;
pCtx->retryInitTimestamp = taosGetTimestampMs();
pCtx->retryNextInterval = pCtx->retryMinInterval;
} else {
bool retry = pTransInst->retry(code, pResp->msgType - 1);
if (retry == false) {
return false;
}
bool noDelay = false;
if (code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
noDelay = cliResetEpset(pCtx, pResp);
transFreeMsg(pResp->pCont);
transUnrefCliHandle(pConn);
} else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR) {
noDelay = cliResetEpset(pCtx, pResp);
transFreeMsg(pResp->pCont);
addConnToPool(pThrd->pool, pConn);
} else {
noDelay = cliResetEpset(pCtx, pResp);
addConnToPool(pThrd->pool, pConn);
transFreeMsg(pResp->pCont);
}
if (!pCtx->retryInit) {
pCtx->retryMinInterval = pTransInst->retryMinInterval;
pCtx->retryMaxInterval = pTransInst->retryMaxInterval;
pCtx->retryStepFactor = pTransInst->retryStepFactor;
pCtx->retryMaxTimeout = pTransInst->retryMaxTimouet;
pCtx->retryInitTimestamp = taosGetTimestampMs();
pCtx->retryNextInterval = pCtx->retryMinInterval;
pCtx->retryStep = 1;
pCtx->retryInit = true;
} else {
if (noDelay == false) {
pCtx->epsetRetryCnt = 0;
pCtx->retryStep++;
int64_t factor = 1;
for (int i = 0; i < pCtx->retryStep - 1; i++) {
factor *= pCtx->retryStepFactor;
}
int64_t factor = pow(pCtx->retryStepFactor, pCtx->retryStep - 1);
pCtx->retryNextInterval = factor * pCtx->retryMinInterval;
if (pCtx->retryNextInterval >= pCtx->retryMaxInterval) {
pCtx->retryNextInterval = pCtx->retryMaxInterval;
}
}
if (taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) {
retry = false;
if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) {
return false;
}
} else {
pCtx->retryNextInterval = 0;
pCtx->epsetRetryCnt++;
}
}
if (retry) {
pMsg->sent = 0;
pCtx->retryCnt += 1;
cliSchedMsgToNextNode(pMsg, pThrd);
return false;
}
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK) {
cliCompareAndSwap(&pCtx->retryLimit, pTransInst->retryLimit, EPSET_GET_SIZE(&pCtx->epSet) * 3);
// if (pCtx->retryCnt < pCtx->retryLimit) {
transUnrefCliHandle(pConn);
EPSET_FORWARD_INUSE(&pCtx->epSet);
transFreeMsg(pResp->pCont);
cliSchedMsgToNextNode(pMsg, pThrd);
return -1;
//}
} else {
cliCompareAndSwap(&pCtx->retryLimit, pTransInst->retryLimit, pTransInst->retryLimit);
if (pCtx->retryCnt < pCtx->retryLimit) {
if (pResp->contLen == 0) {
EPSET_FORWARD_INUSE(&pCtx->epSet);
} else {
if (tDeserializeSEpSet(pResp->pCont, pResp->contLen, &pCtx->epSet) < 0) {
tError("%s conn %p failed to deserialize epset", CONN_GET_INST_LABEL(pConn), pConn);
}
}
addConnToPool(pThrd->pool, pConn);
transFreeMsg(pResp->pCont);
cliSchedMsgToNextNode(pMsg, pThrd);
return -1;
} else {
// change error code for taos client driver if retryCnt exceeds limit
if (0 == strncmp(pTransInst->label, "TSC", strlen("TSC"))) pResp->code = TSDB_CODE_APP_NOT_READY;
}
}
if (pMsg == NULL || pMsg->ctx == NULL) {
tDebug("%s conn %p handle resp", pTransInst->label, pConn);
pTransInst->cfp(pTransInst->parent, pResp, NULL);
return 0;
}
STransConnCtx* pCtx = pMsg->ctx;
int32_t code = pResp->code;
bool retry = cliGenRetryRule(pConn, pResp, pMsg);
if (retry == true) {
return -1;
}
STraceId* trace = &pResp->info.traceId;
bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册