From 2584b034df5ae8338c8cc4df88802900e3ac84dd Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sat, 30 Apr 2022 10:29:29 +0800 Subject: [PATCH] add syncGetEpSet --- source/dnode/mgmt/implement/src/dmTransport.c | 22 +++++++++++++++++++ source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 1 + source/libs/sync/src/syncMain.c | 9 ++++++-- 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/source/dnode/mgmt/implement/src/dmTransport.c b/source/dnode/mgmt/implement/src/dmTransport.c index 1dabc83f57..446894556e 100644 --- a/source/dnode/mgmt/implement/src/dmTransport.c +++ b/source/dnode/mgmt/implement/src/dmTransport.c @@ -320,6 +320,27 @@ static inline void dmSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { } } +static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp, const SEpSet *pNewEpSet) { + ASSERT(pRsp->code == TSDB_CODE_RPC_REDIRECT); + ASSERT(pRsp->pCont == NULL); + if (pWrapper->procType != DND_PROC_CHILD) { + SRpcMsg resp = {0}; + SMEpSet msg = {.epSet = *pNewEpSet}; + int32_t len = tSerializeSMEpSet(NULL, 0, &msg); + resp.pCont = rpcMallocCont(len); + resp.contLen = len; + tSerializeSMEpSet(resp.pCont, len, &msg); + + resp.code = TSDB_CODE_RPC_REDIRECT; + resp.handle = pRsp->handle; + resp.refId = pRsp->refId; + rpcSendResponse(&resp); + } else { + taosProcPutToParentQ(pWrapper->procObj, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_FUNC_RSP); + } +} + +#if 0 static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp, const SEpSet *pNewEpSet) { ASSERT(pRsp->code == TSDB_CODE_RPC_REDIRECT); if (pWrapper->procType != DND_PROC_CHILD) { @@ -328,6 +349,7 @@ static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp taosProcPutToParentQ(pWrapper->procObj, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_FUNC_RSP); } } +#endif static inline void dmRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { if (pWrapper->procType != DND_PROC_CHILD) { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 407158283b..f3d7253e71 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -155,6 +155,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO rsp.code = TSDB_CODE_RPC_REDIRECT; SEpSet newEpSet; syncGetEpSet(vnodeGetSyncHandle(pVnode->pImpl), &newEpSet); + newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps; tmsgSendRedirectRsp(&rsp, &newEpSet); } else if (ret == TAOS_SYNC_PROPOSE_OTHER_ERROR) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 857b572cab..9a8dcc57d9 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -174,11 +174,16 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet) { assert(rid == pSyncNode->rid); pEpSet->numOfEps = 0; for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { - snprintf(pEpSet->eps->fqdn, sizeof(pEpSet->eps->fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn); - pEpSet->eps->port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort; + snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn); + pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort; (pEpSet->numOfEps)++; + + sInfo("syncGetEpSet index:%d %s:%d", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); + } pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex; + + sInfo("syncGetEpSet pEpSet->inUse:%d ", pEpSet->inUse); taosReleaseRef(tsNodeRefId, pSyncNode->rid); } -- GitLab