diff --git a/source/dnode/mgmt/implement/src/dmTransport.c b/source/dnode/mgmt/implement/src/dmTransport.c index 1dabc83f57738cb7b32c86ac70fdd5657061eec6..446894556e5a938564e9fea73e395bd7736e5060 100644 --- a/source/dnode/mgmt/implement/src/dmTransport.c +++ b/source/dnode/mgmt/implement/src/dmTransport.c @@ -320,6 +320,27 @@ static inline void dmSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { } } +static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp, const SEpSet *pNewEpSet) { + ASSERT(pRsp->code == TSDB_CODE_RPC_REDIRECT); + ASSERT(pRsp->pCont == NULL); + if (pWrapper->procType != DND_PROC_CHILD) { + SRpcMsg resp = {0}; + SMEpSet msg = {.epSet = *pNewEpSet}; + int32_t len = tSerializeSMEpSet(NULL, 0, &msg); + resp.pCont = rpcMallocCont(len); + resp.contLen = len; + tSerializeSMEpSet(resp.pCont, len, &msg); + + resp.code = TSDB_CODE_RPC_REDIRECT; + resp.handle = pRsp->handle; + resp.refId = pRsp->refId; + rpcSendResponse(&resp); + } else { + taosProcPutToParentQ(pWrapper->procObj, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_FUNC_RSP); + } +} + +#if 0 static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp, const SEpSet *pNewEpSet) { ASSERT(pRsp->code == TSDB_CODE_RPC_REDIRECT); if (pWrapper->procType != DND_PROC_CHILD) { @@ -328,6 +349,7 @@ static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp taosProcPutToParentQ(pWrapper->procObj, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_FUNC_RSP); } } +#endif static inline void dmRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { if (pWrapper->procType != DND_PROC_CHILD) { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 407158283b1998d155c6fa9d4f69249053ec508d..f3d7253e716de5037194298fb2970f007474e48b 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -155,6 +155,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO rsp.code = TSDB_CODE_RPC_REDIRECT; SEpSet newEpSet; syncGetEpSet(vnodeGetSyncHandle(pVnode->pImpl), &newEpSet); + newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps; tmsgSendRedirectRsp(&rsp, &newEpSet); } else if (ret == TAOS_SYNC_PROPOSE_OTHER_ERROR) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 857b572cab108c70f86d7ac05da1cd5413bb0d6e..9a8dcc57d904aceb581b159aae755ba2170fbdfe 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -174,11 +174,16 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet) { assert(rid == pSyncNode->rid); pEpSet->numOfEps = 0; for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { - snprintf(pEpSet->eps->fqdn, sizeof(pEpSet->eps->fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn); - pEpSet->eps->port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort; + snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn); + pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort; (pEpSet->numOfEps)++; + + sInfo("syncGetEpSet index:%d %s:%d", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); + } pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex; + + sInfo("syncGetEpSet pEpSet->inUse:%d ", pEpSet->inUse); taosReleaseRef(tsNodeRefId, pSyncNode->rid); }