diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 5106196ccdc0b96903eeb10fc93e4d2d002fc9a5..0d2d557623f6ad47acf54448fc1f2743f6a2de05 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -223,6 +223,7 @@ int32_t* taosGetErrno(); // #define TSDB_CODE_MND_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x033C) // 2.x // #define TSDB_CODE_MND_DNODE_ID_NOT_CONFIGUREDTAOS_DEF_ERROR_CODE(0, 0x033D) // 2.x // #define TSDB_CODE_MND_DNODE_EP_NOT_CONFIGUREDTAOS_DEF_ERROR_CODE(0, 0x033E) // 2.x +#define TSDB_CODE_MND_DNODE_DIFF_CLUSTER TAOS_DEF_ERROR_CODE(0, 0x033F) // internal // mnode-acct #define TSDB_CODE_MND_ACCT_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0340) diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 228f301aec7712fce516ddad2d22f4b2823a36b0..8348980e72fbe7f0a653dc25aba65a5195256f07 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -127,6 +127,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { SEpSet epSet = {0}; dmGetMnodeEpSet(pMgmt->pData, &epSet); +/* rpcSendRecv(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp); if (rpcRsp.code != 0) { dmRotateMnodeEpSet(pMgmt->pData); @@ -135,6 +136,23 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { dError("failed to send status req since %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code), tbuf, epSet.inUse); } dmProcessStatusRsp(pMgmt, &rpcRsp); +*/ + + for(int i =0; i < epSet.numOfEps; i++){ /* send the status request to each mnode endpoint separately, through a one-entry ep set */ + SEpSet epSingleSet = {0}; + epSingleSet.numOfEps = 1; + epSingleSet.inUse = 0; /* inUse presumably indexes eps[]; only eps[0] is populated, so it must be 0 rather than 1 */ + epSingleSet.eps[0] = epSet.eps[i]; + + rpcSendRecv(pMgmt->msgCb.clientRpc, &epSingleSet, &rpcMsg, &rpcRsp); /* NOTE(review): rpcMsg is re-sent for every endpoint - confirm rpcSendRecv leaves rpcMsg.pCont usable after a send */ + if (rpcRsp.code != 0 && rpcRsp.code != TSDB_CODE_MND_DNODE_DIFF_CLUSTER) { + dError("failed to send status req since %s, numOfEps:%d inUse:%d", tstrerror(rpcRsp.code), epSingleSet.numOfEps, + epSingleSet.inUse); + } + if(rpcRsp.code != TSDB_CODE_MND_DNODE_DIFF_CLUSTER){ /* a TSDB_CODE_MND_DNODE_DIFF_CLUSTER answer is not handed to dmProcessStatusRsp */ + dmProcessStatusRsp(pMgmt, &rpcRsp); + } + } } 
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index b0810d528f761d6362b27a818636d52e8421429c..d73e76ea12fc7efa5451f70d10e2f903cc925ae4 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -53,7 +53,7 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { int32_t code = mndProcessRpcMsg(pMsg); - if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS) { + if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS /*&& code != TSDB_CODE_MND_DNODE_DIFF_CLUSTER*/) { if (code != 0 && terrno != 0) code = terrno; mmSendRsp(pMsg, code); } else { diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 1d9db37a7d68403781b72b1d8c9d8b43951d0261..1b199c6c38a8c0ce3e66a9bacfed99a07471930d 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -25,6 +25,7 @@ #include "mndUser.h" #include "mndVgroup.h" #include "tmisce.h" +#include "mndCluster.h" #define TSDB_DNODE_VER_NUMBER 1 #define TSDB_DNODE_RESERVE_SIZE 64 @@ -366,6 +367,15 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { goto _OVER; } + int64_t clusterid = mndGetClusterId(pMnode); /* refuse status reports that carry a cluster id other than the one mndGetClusterId returns */ + if(statusReq.clusterId != clusterid) /* NOTE(review): a dnode that has not joined yet may report clusterId 0 - confirm it should be refused here too */ + { + int32_t err = TSDB_CODE_MND_DNODE_DIFF_CLUSTER; + mWarn("dnode:%d, %s, its clusterid:%lld differ from current cluster:%lld, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, (long long)statusReq.clusterId, (long long)clusterid, err); /* %lld plus casts: %ld does not match int64_t where long is 32 bits */ + code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER; + goto _OVER; + } + if (statusReq.dnodeId == 0) { pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp); if (pDnode == NULL) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 04e4859c5e3578453431dffa635367117c372f06..ab3d58b7504a85b3557a579fae0544b38f3deb08 100644 --- 
a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -37,6 +37,7 @@ #include "syncVoteMgr.h" #include "tglobal.h" #include "tref.h" +#include "syncUtil.h" static void syncNodeEqPingTimer(void* param, void* tmrId); static void syncNodeEqElectTimer(void* param, void* tmrId); @@ -2303,6 +2304,12 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { pMsgReply->startTime = ths->startTime; pMsgReply->timeStamp = tsMs; + if(CID(&(pMsg->srcId)) != CID(&(ths->myRaftId))) /* ignore heartbeats whose sender CID differs from this node's CID */ + { + sWarn("vgId:%d, drop heartbeat msg from dnode:%d, because it come from another cluster:%d, differ from current cluster:%d", ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)) , CID(&(ths->myRaftId))); + return 0; /* NOTE(review): pMsgReply was filled in just above - confirm nothing built for the reply needs releasing on this early exit */ + } + if (pMsg->term == currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) { syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs); diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 2fda2a19b8dad08a49d35e8d0492f7e3259fbffe..0c3cac21ad126dc65525c63001bc182dc19bbf62 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -90,6 +90,12 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t ret = 0; SyncRequestVote* pMsg = pRpcMsg->pCont; + if(CID(&(pMsg->srcId)) != CID(&(ths->myRaftId))) /* reject vote requests whose sender CID differs from this node's CID before any other check */ + { + sWarn("vgId:%d, drop RequestVote msg from dnode:%d, because it come from another cluster:%d, differ from current cluster:%d", ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)) , CID(&(ths->myRaftId))); + return -1; + } + // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) { syncLogRecvRequestVote(ths, pMsg, -1, "not in my config");