diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 2bf49fa006e4530088f411ff817e105010888aee..52d9b6f810d8e01f750160a6e212faa0828a38de 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -200,6 +200,7 @@ const char* syncGetMyRoleStr(int64_t rid); SyncTerm syncGetMyTerm(int64_t rid); SyncGroupId syncGetVgId(int64_t rid); void syncGetEpSet(int64_t rid, SEpSet* pEpSet); +void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak); bool syncEnvIsStart(); const char* syncStr(ESyncState state); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 32090b774ecced3b0cb3b6e4f98d45ff72e93c69..4c561809799492ef2ae047073007b927d40f8c12 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -144,11 +144,15 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { vnodeAccumBlockMsg(pVnode, pMsg->msgType); } else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { SEpSet newEpSet = {0}; + syncGetRetryEpSet(pVnode->sync, &newEpSet); + + /* syncGetEpSet(pVnode->sync, &newEpSet); SEp *pEp = &newEpSet.eps[newEpSet.inUse]; if (pEp->port == tsServerPort && strcmp(pEp->fqdn, tsLocalFqdn) == 0) { newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps; } + */ vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, newEpSet.numOfEps, newEpSet.inUse); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 041afdb3371ebe73b0064687ece190485e741a30..c5af72c971d6310077bb091512b6b324b0878c98 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -489,12 +489,30 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet) { snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn); pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort; (pEpSet->numOfEps)++; - - sInfo("syncGetEpSet index:%d %s:%d", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); + sInfo("vgId:%d sync get epset: index:%d %s:%d", pSyncNode->vgId, i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); } pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex; + sInfo("vgId:%d sync get epset in-use:%d", pSyncNode->vgId, pEpSet->inUse); + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); +} - sInfo("syncGetEpSet pEpSet->inUse:%d ", pEpSet->inUse); +void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + memset(pEpSet, 0, sizeof(*pEpSet)); + return; + } + ASSERT(rid == pSyncNode->rid); + pEpSet->numOfEps = 0; + for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { + snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn); + pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort; + (pEpSet->numOfEps)++; + sInfo("vgId:%d sync get retry epset: index:%d %s:%d", pSyncNode->vgId, i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); + } + pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps; + sInfo("vgId:%d sync get retry epset in-use:%d", pSyncNode->vgId, pEpSet->inUse); taosReleaseRef(tsNodeRefId, pSyncNode->rid); }