提交 0e2e336e 编写于 作者: C cadem

fix/cluster-isolation

上级 c4d6be13
...@@ -223,6 +223,7 @@ int32_t* taosGetErrno(); ...@@ -223,6 +223,7 @@ int32_t* taosGetErrno();
// #define TSDB_CODE_MND_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x033C) // 2.x // #define TSDB_CODE_MND_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x033C) // 2.x
// #define TSDB_CODE_MND_DNODE_ID_NOT_CONFIGUREDTAOS_DEF_ERROR_CODE(0, 0x033D) // 2.x // #define TSDB_CODE_MND_DNODE_ID_NOT_CONFIGUREDTAOS_DEF_ERROR_CODE(0, 0x033D) // 2.x
// #define TSDB_CODE_MND_DNODE_EP_NOT_CONFIGUREDTAOS_DEF_ERROR_CODE(0, 0x033E) // 2.x // #define TSDB_CODE_MND_DNODE_EP_NOT_CONFIGUREDTAOS_DEF_ERROR_CODE(0, 0x033E) // 2.x
#define TSDB_CODE_MND_DNODE_DIFF_CLUSTER TAOS_DEF_ERROR_CODE(0, 0x033F) // internal
// mnode-acct // mnode-acct
#define TSDB_CODE_MND_ACCT_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0340) #define TSDB_CODE_MND_ACCT_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0340)
......
...@@ -127,6 +127,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { ...@@ -127,6 +127,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
SEpSet epSet = {0}; SEpSet epSet = {0};
dmGetMnodeEpSet(pMgmt->pData, &epSet); dmGetMnodeEpSet(pMgmt->pData, &epSet);
/*
rpcSendRecv(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp); rpcSendRecv(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp);
if (rpcRsp.code != 0) { if (rpcRsp.code != 0) {
dmRotateMnodeEpSet(pMgmt->pData); dmRotateMnodeEpSet(pMgmt->pData);
...@@ -135,6 +136,23 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { ...@@ -135,6 +136,23 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
dError("failed to send status req since %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code), tbuf, epSet.inUse); dError("failed to send status req since %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code), tbuf, epSet.inUse);
} }
dmProcessStatusRsp(pMgmt, &rpcRsp); dmProcessStatusRsp(pMgmt, &rpcRsp);
*/
for(int i =0; i < epSet.numOfEps; i++){
SEpSet epSingleSet = {0};
epSingleSet.numOfEps = 1;
epSingleSet.inUse = 1;
epSingleSet.eps[0] = epSet.eps[i];
rpcSendRecv(pMgmt->msgCb.clientRpc, &epSingleSet, &rpcMsg, &rpcRsp);
if (rpcRsp.code != 0 && rpcRsp.code != TSDB_CODE_MND_DNODE_DIFF_CLUSTER) {
dError("failed to send status req since %s, numOfEps:%d inUse:%d", tstrerror(rpcRsp.code), epSingleSet.numOfEps,
epSingleSet.inUse);
}
if(rpcRsp.code != TSDB_CODE_MND_DNODE_DIFF_CLUSTER){
dmProcessStatusRsp(pMgmt, &rpcRsp);
}
}
} }
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
......
...@@ -53,7 +53,7 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -53,7 +53,7 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
int32_t code = mndProcessRpcMsg(pMsg); int32_t code = mndProcessRpcMsg(pMsg);
if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS /*&& code != TSDB_CODE_MND_DNODE_DIFF_CLUSTER*/) {
if (code != 0 && terrno != 0) code = terrno; if (code != 0 && terrno != 0) code = terrno;
mmSendRsp(pMsg, code); mmSendRsp(pMsg, code);
} else { } else {
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h" #include "mndVgroup.h"
#include "tmisce.h" #include "tmisce.h"
#include "mndCluster.h"
#define TSDB_DNODE_VER_NUMBER 1 #define TSDB_DNODE_VER_NUMBER 1
#define TSDB_DNODE_RESERVE_SIZE 64 #define TSDB_DNODE_RESERVE_SIZE 64
...@@ -366,6 +367,15 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { ...@@ -366,6 +367,15 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
int64_t clusterid = mndGetClusterId(pMnode);
if(statusReq.clusterId != clusterid)
{
int32_t err = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
mWarn("dnode:%d, %s, its clusterid:%ld differ from current cluster:%ld, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, err);
code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
goto _OVER;
}
if (statusReq.dnodeId == 0) { if (statusReq.dnodeId == 0) {
pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp); pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
if (pDnode == NULL) { if (pDnode == NULL) {
......
...@@ -37,6 +37,7 @@ ...@@ -37,6 +37,7 @@
#include "syncVoteMgr.h" #include "syncVoteMgr.h"
#include "tglobal.h" #include "tglobal.h"
#include "tref.h" #include "tref.h"
#include "syncUtil.h"
static void syncNodeEqPingTimer(void* param, void* tmrId); static void syncNodeEqPingTimer(void* param, void* tmrId);
static void syncNodeEqElectTimer(void* param, void* tmrId); static void syncNodeEqElectTimer(void* param, void* tmrId);
...@@ -2303,6 +2304,12 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2303,6 +2304,12 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
pMsgReply->startTime = ths->startTime; pMsgReply->startTime = ths->startTime;
pMsgReply->timeStamp = tsMs; pMsgReply->timeStamp = tsMs;
if(CID(&(pMsg->srcId)) != CID(&(ths->myRaftId)))
{
sWarn("vgId:%d, drop heartbeat msg from dnode:%d, because it come from another cluster:%d, differ from current cluster:%d", ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)) , CID(&(ths->myRaftId)));
return 0;
}
if (pMsg->term == currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) { if (pMsg->term == currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs); syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "syncRaftStore.h" #include "syncRaftStore.h"
#include "syncUtil.h" #include "syncUtil.h"
#include "syncVoteMgr.h" #include "syncVoteMgr.h"
#include "syncUtil.h"
// TLA+ Spec // TLA+ Spec
// HandleRequestVoteRequest(i, j, m) == // HandleRequestVoteRequest(i, j, m) ==
...@@ -90,6 +91,12 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -90,6 +91,12 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
int32_t ret = 0; int32_t ret = 0;
SyncRequestVote* pMsg = pRpcMsg->pCont; SyncRequestVote* pMsg = pRpcMsg->pCont;
if(CID(&(pMsg->srcId)) != CID(&(ths->myRaftId)))
{
sWarn("vgId:%d, drop RequestVote msg from dnode:%d, because it come from another cluster:%d, differ from current cluster:%d", ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)) , CID(&(ths->myRaftId)));
return -1;
}
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) { if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
syncLogRecvRequestVote(ths, pMsg, -1, "not in my config"); syncLogRecvRequestVote(ths, pMsg, -1, "not in my config");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册