未验证 提交 d8f72ec0 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18611 from taosdata/enh/refatorRetry

enh: refactor retry code
......@@ -151,8 +151,8 @@ typedef struct {
int64_t retryNextInterval;
bool retryInit;
int32_t retryStep;
int8_t epsetRetryCnt;
int8_t epsetRetryCnt;
int32_t retryCode;
int hThrdIdx;
} STransConnCtx;
......
......@@ -1020,7 +1020,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
char tbuf[256] = {0};
EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
tDebug("current epset %s", tbuf);
if (!EPSET_IS_VALID(&pCtx->epSet)) {
tError("invalid epset");
......@@ -1500,34 +1499,46 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
pCtx->retryNextInterval = pCtx->retryMinInterval;
pCtx->retryStep = 0;
pCtx->retryInit = true;
pCtx->retryCode = TSDB_CODE_SUCCESS;
}
if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) {
return false;
}
// code, msgType
// A: epset, leader, not self
// B: epset, not know leader
// C: no epset, leader but not serivce
bool noDelay = false;
if (code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
tDebug("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
noDelay = cliResetEpset(pCtx, pResp, false);
transFreeMsg(pResp->pCont);
transUnrefCliHandle(pConn);
} else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR ||
code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_RPC_REDIRECT) {
tDebug("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen);
tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen);
noDelay = cliResetEpset(pCtx, pResp, true);
transFreeMsg(pResp->pCont);
addConnToPool(pThrd->pool, pConn);
} else if (code == TSDB_CODE_SYN_RESTORING) {
tDebug("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
noDelay = cliResetEpset(pCtx, pResp, false);
addConnToPool(pThrd->pool, pConn);
transFreeMsg(pResp->pCont);
} else {
tDebug("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
noDelay = cliResetEpset(pCtx, pResp, false);
addConnToPool(pThrd->pool, pConn);
transFreeMsg(pResp->pCont);
}
if (code != TSDB_CODE_RPC_BROKEN_LINK && code != TSDB_CODE_RPC_NETWORK_UNAVAIL && code != TSDB_CODE_SUCCESS) {
// save one internal code
pCtx->retryCode = code;
}
if (noDelay == false) {
pCtx->epsetRetryCnt = 1;
......@@ -1556,29 +1567,36 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
STrans* pTransInst = pThrd->pTransInst;
if (pMsg == NULL || pMsg->ctx == NULL) {
tDebug("%s conn %p handle resp", pTransInst->label, pConn);
tTrace("%s conn %p handle resp", pTransInst->label, pConn);
pTransInst->cfp(pTransInst->parent, pResp, NULL);
return 0;
}
STransConnCtx* pCtx = pMsg->ctx;
int32_t code = pResp->code;
bool retry = cliGenRetryRule(pConn, pResp, pMsg);
if (retry == true) {
return -1;
}
STraceId* trace = &pResp->info.traceId;
bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);
if (pCtx->retryCode != TSDB_CODE_SUCCESS) {
int32_t code = pResp->code;
// return internal code app
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK) {
pResp->code = pCtx->retryCode;
}
}
STraceId* trace = &pResp->info.traceId;
bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);
if (hasEpSet) {
char tbuf[256] = {0};
EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
tGDebug("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
}
if (pCtx->pSem != NULL) {
tGDebug("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
if (pCtx->pRsp == NULL) {
tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn);
} else {
......@@ -1587,11 +1605,11 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
tsem_post(pCtx->pSem);
pCtx->pRsp = NULL;
} else {
tGDebug("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn);
tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn);
if (retry == false && hasEpSet == true) {
pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet);
} else {
if (!cliIsEpsetUpdated(code, pCtx)) {
if (!cliIsEpsetUpdated(pResp->code, pCtx)) {
pTransInst->cfp(pTransInst->parent, pResp, NULL);
} else {
pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册