提交 1100ec3f 编写于 作者: S Shengliang Guan

refacor: alter mnode

上级 077ea1b6
...@@ -55,15 +55,6 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption); ...@@ -55,15 +55,6 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption);
*/ */
void mndClose(SMnode *pMnode); void mndClose(SMnode *pMnode);
/**
* @brief Close a mnode.
*
* @param pMnode The mnode object to close.
* @param pOption Options of the mnode.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption);
/** /**
* @brief Start mnode * @brief Start mnode
* *
......
...@@ -46,7 +46,6 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed); ...@@ -46,7 +46,6 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed);
int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed); int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed);
// mmInt.c // mmInt.c
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg);
int32_t mmAcquire(SMnodeMgmt *pMgmt); int32_t mmAcquire(SMnodeMgmt *pMgmt);
void mmRelease(SMnodeMgmt *pMgmt); void mmRelease(SMnodeMgmt *pMgmt);
......
...@@ -124,22 +124,6 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { ...@@ -124,22 +124,6 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
return 0; return 0;
} }
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SDAlterMnodeReq alterReq = {0};
if (tDeserializeSDCreateMnodeReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (pMgmt->pData->dnodeId != 0 && alterReq.dnodeId != pMgmt->pData->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to alter mnode since %s, input:%d cur:%d", terrstr(), alterReq.dnodeId, pMgmt->pData->dnodeId);
return -1;
} else {
return mmAlter(pMgmt, &alterReq);
}
}
SArray *mmGetMsgHandles() { SArray *mmGetMsgHandles() {
int32_t code = -1; int32_t code = -1;
SArray *pArray = taosArrayInit(64, sizeof(SMgmtHandle)); SArray *pArray = taosArrayInit(64, sizeof(SMgmtHandle));
......
...@@ -73,46 +73,6 @@ static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { ...@@ -73,46 +73,6 @@ static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
} }
} }
static int32_t mmBuildOptionForAlter(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) {
pOption->standby = false;
pOption->deploy = false;
pOption->msgCb = pMgmt->msgCb;
pOption->dnodeId = pMgmt->pData->dnodeId;
pOption->replica = pCreate->replica;
pOption->selfIndex = -1;
for (int32_t i = 0; i < pCreate->replica; ++i) {
SReplica *pReplica = &pOption->replicas[i];
pReplica->id = pCreate->replicas[i].id;
pReplica->port = pCreate->replicas[i].port;
memcpy(pReplica->fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN);
if (pReplica->id == pMgmt->pData->dnodeId) {
pOption->selfIndex = i;
}
}
if (pOption->selfIndex == -1) {
dError("failed to build mnode options since %s", terrstr());
return -1;
}
return 0;
}
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg) {
SMnodeOpt option = {0};
if (mmBuildOptionForAlter(pMgmt, &option, pMsg) != 0) {
return -1;
}
if (mndAlter(pMgmt->pMnode, &option) != 0) {
return -1;
}
return 0;
}
static void mmClose(SMnodeMgmt *pMgmt) { static void mmClose(SMnodeMgmt *pMgmt) {
if (pMgmt->pMnode != NULL) { if (pMgmt->pMnode != NULL) {
mmStopWorker(pMgmt); mmStopWorker(pMgmt);
......
...@@ -32,9 +32,6 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -32,9 +32,6 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
dTrace("msg:%p, get from mnode queue", pMsg); dTrace("msg:%p, get from mnode queue", pMsg);
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_DND_ALTER_MNODE:
code = mmProcessAlterReq(pMgmt, pMsg);
break;
case TDMT_MON_MM_INFO: case TDMT_MON_MM_INFO:
code = mmProcessGetMonitorInfoReq(pMgmt, pMsg); code = mmProcessGetMonitorInfoReq(pMgmt, pMsg);
break; break;
......
...@@ -31,6 +31,7 @@ static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj); ...@@ -31,6 +31,7 @@ static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj);
static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj); static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj);
static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew); static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew);
static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq); static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq);
static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq);
static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq); static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq);
static int32_t mndProcessCreateMnodeRsp(SRpcMsg *pRsp); static int32_t mndProcessCreateMnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessAlterMnodeRsp(SRpcMsg *pRsp); static int32_t mndProcessAlterMnodeRsp(SRpcMsg *pRsp);
...@@ -51,6 +52,7 @@ int32_t mndInitMnode(SMnode *pMnode) { ...@@ -51,6 +52,7 @@ int32_t mndInitMnode(SMnode *pMnode) {
}; };
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_MNODE, mndProcessCreateMnodeReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_MNODE, mndProcessCreateMnodeReq);
mndSetMsgHandle(pMnode, TDMT_DND_ALTER_MNODE, mndProcessAlterMnodeReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq);
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_MNODE_RSP, mndProcessCreateMnodeRsp); mndSetMsgHandle(pMnode, TDMT_DND_CREATE_MNODE_RSP, mndProcessCreateMnodeRsp);
mndSetMsgHandle(pMnode, TDMT_DND_ALTER_MNODE_RSP, mndProcessAlterMnodeRsp); mndSetMsgHandle(pMnode, TDMT_DND_ALTER_MNODE_RSP, mndProcessAlterMnodeRsp);
...@@ -658,3 +660,52 @@ static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter) { ...@@ -658,3 +660,52 @@ static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
} }
static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SDAlterMnodeReq alterReq = {0};
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (alterReq.dnodeId != pMnode->selfDnodeId) {
terrno = TSDB_CODE_INVALID_OPTION;
mError("failed to alter mnode since %s, input:%d cur:%d", terrstr(), alterReq.dnodeId, pMnode->selfDnodeId);
return -1;
}
SSyncCfg cfg = {.replicaNum = alterReq.replica, .myIndex = -1};
for (int32_t i = 0; i < alterReq.replica; ++i) {
SNodeInfo *pNode = &cfg.nodeInfo[i];
tstrncpy(pNode->nodeFqdn, alterReq.replicas[i].fqdn, sizeof(pNode->nodeFqdn));
pNode->nodePort = alterReq.replicas[i].port;
if (alterReq.replicas[i].id == pMnode->selfDnodeId) cfg.myIndex = i;
}
if (cfg.myIndex == -1) {
mError("failed to alter mnode since myindex is -1");
return -1;
} else {
mInfo("start to alter mnode sync, replica:%d myindex:%d", cfg.replicaNum, cfg.myIndex);
for (int32_t i = 0; i < alterReq.replica; ++i) {
SNodeInfo *pNode = &cfg.nodeInfo[i];
mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort);
}
}
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
pMgmt->standby = 0;
int32_t code = syncReconfig(pMgmt->sync, &cfg);
if (code != 0) {
mError("failed to alter mnode sync since %s", terrstr());
return code;
} else {
pMgmt->errCode = 0;
tsem_wait(&pMgmt->syncSem);
mInfo("alter mnode sync result:%s", tstrerror(pMgmt->errCode));
terrno = pMgmt->errCode;
return pMgmt->errCode;
}
}
...@@ -74,14 +74,13 @@ int32_t mndSnapshotApply(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, char ...@@ -74,14 +74,13 @@ int32_t mndSnapshotApply(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, char
sdbWrite(pMnode->pSdb, (SSdbRaw*)pBuf); sdbWrite(pMnode->pSdb, (SSdbRaw*)pBuf);
return 0; return 0;
} }
void mndReConfig(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { void mndReConfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
mInfo("mndReConfig cbMeta.code:%d, cbMeta.currentTerm:%ld, cbMeta.term:%ld, cbMeta.index:%ld", cbMeta.code, cbMeta.currentTerm, cbMeta.term, cbMeta.index); mInfo("mndReConfig cbMeta.code:%d, cbMeta.currentTerm:%" PRId64 ", cbMeta.term:%" PRId64 ", cbMeta.index:%" PRId64,
if (cbMeta.code == 0) { cbMeta.code, cbMeta.currentTerm, cbMeta.term, cbMeta.index);
// config change success SMnode *pMnode = pFsm->data;
} else { pMnode->syncMgmt.errCode = cbMeta.code;
// config change failed tsem_post(&pMnode->syncMgmt.syncSem);
}
} }
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
...@@ -207,18 +206,3 @@ bool mndIsMaster(SMnode *pMnode) { ...@@ -207,18 +206,3 @@ bool mndIsMaster(SMnode *pMnode) {
ESyncState state = syncGetMyRole(pMgmt->sync); ESyncState state = syncGetMyRole(pMgmt->sync);
return (state == TAOS_SYNC_STATE_LEADER) && (pMnode->syncMgmt.restored); return (state == TAOS_SYNC_STATE_LEADER) && (pMnode->syncMgmt.restored);
} }
int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
SSyncCfg cfg = {.replicaNum = pOption->replica, .myIndex = pOption->selfIndex};
mInfo("start to alter mnode sync, replica:%d myindex:%d standby:%d", cfg.replicaNum, cfg.myIndex, pOption->standby);
for (int32_t i = 0; i < pOption->replica; ++i) {
SNodeInfo *pNode = &cfg.nodeInfo[i];
tstrncpy(pNode->nodeFqdn, pOption->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
pNode->nodePort = pOption->replicas[i].port;
mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort);
}
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
pMgmt->standby = pOption->standby;
return syncReconfig(pMgmt->sync, &cfg);
}
\ No newline at end of file
...@@ -60,7 +60,7 @@ endi ...@@ -60,7 +60,7 @@ endi
if $data(2)[0] != 2 then if $data(2)[0] != 2 then
return -1 return -1
endi endi
if $data(2)[2] != FOLLOWER then if $data(2)[2] == LEADER then
return -1 return -1
endi endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册