From 1100ec3fb3f6c51c6230a03f9cd8b3d4be882e2d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 25 May 2022 17:44:43 +0800 Subject: [PATCH] refacor: alter mnode --- include/dnode/mnode/mnode.h | 9 ---- source/dnode/mgmt/mgmt_mnode/inc/mmInt.h | 1 - source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 16 ------- source/dnode/mgmt/mgmt_mnode/src/mmInt.c | 40 ---------------- source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 3 -- source/dnode/mnode/impl/src/mndMnode.c | 51 +++++++++++++++++++++ source/dnode/mnode/impl/src/mndSync.c | 30 +++--------- tests/script/tsim/mnode/basic2.sim | 2 +- 8 files changed, 59 insertions(+), 93 deletions(-) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index bc14dc3210..ddd6f1c05f 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -55,15 +55,6 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption); */ void mndClose(SMnode *pMnode); -/** - * @brief Close a mnode. - * - * @param pMnode The mnode object to close. - * @param pOption Options of the mnode. - * @return int32_t 0 for success, -1 for failure. - */ -int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption); - /** * @brief Start mnode * diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index 854c06a0c4..bd034fe7d6 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -46,7 +46,6 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed); int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed); // mmInt.c -int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg); int32_t mmAcquire(SMnodeMgmt *pMgmt); void mmRelease(SMnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index a894a4962d..90d7b88859 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -124,22 +124,6 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { return 0; } -int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - SDAlterMnodeReq alterReq = {0}; - if (tDeserializeSDCreateMnodeReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - if (pMgmt->pData->dnodeId != 0 && alterReq.dnodeId != pMgmt->pData->dnodeId) { - terrno = TSDB_CODE_INVALID_OPTION; - dError("failed to alter mnode since %s, input:%d cur:%d", terrstr(), alterReq.dnodeId, pMgmt->pData->dnodeId); - return -1; - } else { - return mmAlter(pMgmt, &alterReq); - } -} - SArray *mmGetMsgHandles() { int32_t code = -1; SArray *pArray = taosArrayInit(64, sizeof(SMgmtHandle)); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index 964c2d42b7..1b973f3045 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -73,46 +73,6 @@ static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { } } -static int32_t mmBuildOptionForAlter(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) { - pOption->standby = false; - pOption->deploy = false; - pOption->msgCb = pMgmt->msgCb; - pOption->dnodeId = pMgmt->pData->dnodeId; - - pOption->replica = pCreate->replica; - pOption->selfIndex = -1; - - for (int32_t i = 0; i < pCreate->replica; ++i) { - SReplica *pReplica = &pOption->replicas[i]; - pReplica->id = pCreate->replicas[i].id; - pReplica->port = pCreate->replicas[i].port; - memcpy(pReplica->fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN); - if (pReplica->id == pMgmt->pData->dnodeId) { - pOption->selfIndex = i; - } - } - - if (pOption->selfIndex == -1) { - dError("failed to build mnode options since %s", terrstr()); - return -1; - } - - return 0; -} - -int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg) { - SMnodeOpt option = {0}; - if (mmBuildOptionForAlter(pMgmt, &option, pMsg) != 0) { - return -1; - } - - if (mndAlter(pMgmt->pMnode, &option) != 0) { - return -1; - } - - return 0; -} - static void mmClose(SMnodeMgmt *pMgmt) { if (pMgmt->pMnode != NULL) { mmStopWorker(pMgmt); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 71cc2d2693..85120102bc 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -32,9 +32,6 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { dTrace("msg:%p, get from mnode queue", pMsg); switch (pMsg->msgType) { - case TDMT_DND_ALTER_MNODE: - code = mmProcessAlterReq(pMgmt, pMsg); - break; case TDMT_MON_MM_INFO: code = mmProcessGetMonitorInfoReq(pMgmt, pMsg); break; diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 93284a95c5..a47221ea39 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -31,6 +31,7 @@ static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj); static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj); static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew); static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq); +static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq); static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq); static int32_t mndProcessCreateMnodeRsp(SRpcMsg *pRsp); static int32_t mndProcessAlterMnodeRsp(SRpcMsg *pRsp); @@ -51,6 +52,7 @@ int32_t mndInitMnode(SMnode *pMnode) { }; mndSetMsgHandle(pMnode, TDMT_MND_CREATE_MNODE, mndProcessCreateMnodeReq); + mndSetMsgHandle(pMnode, TDMT_DND_ALTER_MNODE, mndProcessAlterMnodeReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq); mndSetMsgHandle(pMnode, TDMT_DND_CREATE_MNODE_RSP, mndProcessCreateMnodeRsp); mndSetMsgHandle(pMnode, TDMT_DND_ALTER_MNODE_RSP, mndProcessAlterMnodeRsp); @@ -658,3 +660,52 @@ static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); } + +static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + SDAlterMnodeReq alterReq = {0}; + + if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + if (alterReq.dnodeId != pMnode->selfDnodeId) { + terrno = TSDB_CODE_INVALID_OPTION; + mError("failed to alter mnode since %s, input:%d cur:%d", terrstr(), alterReq.dnodeId, pMnode->selfDnodeId); + return -1; + } + + SSyncCfg cfg = {.replicaNum = alterReq.replica, .myIndex = -1}; + for (int32_t i = 0; i < alterReq.replica; ++i) { + SNodeInfo *pNode = &cfg.nodeInfo[i]; + tstrncpy(pNode->nodeFqdn, alterReq.replicas[i].fqdn, sizeof(pNode->nodeFqdn)); + pNode->nodePort = alterReq.replicas[i].port; + if (alterReq.replicas[i].id == pMnode->selfDnodeId) cfg.myIndex = i; + } + + if (cfg.myIndex == -1) { + mError("failed to alter mnode since myindex is -1"); + return -1; + } else { + mInfo("start to alter mnode sync, replica:%d myindex:%d", cfg.replicaNum, cfg.myIndex); + for (int32_t i = 0; i < alterReq.replica; ++i) { + SNodeInfo *pNode = &cfg.nodeInfo[i]; + mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort); + } + } + + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + pMgmt->standby = 0; + int32_t code = syncReconfig(pMgmt->sync, &cfg); + if (code != 0) { + mError("failed to alter mnode sync since %s", terrstr()); + return code; + } else { + pMgmt->errCode = 0; + tsem_wait(&pMgmt->syncSem); + mInfo("alter mnode sync result:%s", tstrerror(pMgmt->errCode)); + terrno = pMgmt->errCode; + return pMgmt->errCode; + } +} diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 47f11b652e..ca25133c96 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -74,14 +74,13 @@ int32_t mndSnapshotApply(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, char sdbWrite(pMnode->pSdb, (SSdbRaw*)pBuf); return 0; } - -void mndReConfig(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { - mInfo("mndReConfig cbMeta.code:%d, cbMeta.currentTerm:%ld, cbMeta.term:%ld, cbMeta.index:%ld", cbMeta.code, cbMeta.currentTerm, cbMeta.term, cbMeta.index); - if (cbMeta.code == 0) { - // config change success - } else { - // config change failed - } + +void mndReConfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { + mInfo("mndReConfig cbMeta.code:%d, cbMeta.currentTerm:%" PRId64 ", cbMeta.term:%" PRId64 ", cbMeta.index:%" PRId64, + cbMeta.code, cbMeta.currentTerm, cbMeta.term, cbMeta.index); + SMnode *pMnode = pFsm->data; + pMnode->syncMgmt.errCode = cbMeta.code; + tsem_post(&pMnode->syncMgmt.syncSem); } SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { @@ -207,18 +206,3 @@ bool mndIsMaster(SMnode *pMnode) { ESyncState state = syncGetMyRole(pMgmt->sync); return (state == TAOS_SYNC_STATE_LEADER) && (pMnode->syncMgmt.restored); } - -int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) { - SSyncCfg cfg = {.replicaNum = pOption->replica, .myIndex = pOption->selfIndex}; - mInfo("start to alter mnode sync, replica:%d myindex:%d standby:%d", cfg.replicaNum, cfg.myIndex, pOption->standby); - for (int32_t i = 0; i < pOption->replica; ++i) { - SNodeInfo *pNode = &cfg.nodeInfo[i]; - tstrncpy(pNode->nodeFqdn, pOption->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); - pNode->nodePort = pOption->replicas[i].port; - mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort); - } - - SSyncMgmt *pMgmt = &pMnode->syncMgmt; - pMgmt->standby = pOption->standby; - return syncReconfig(pMgmt->sync, &cfg); -} \ No newline at end of file diff --git a/tests/script/tsim/mnode/basic2.sim b/tests/script/tsim/mnode/basic2.sim index f293718285..f1a3a8c251 100644 --- a/tests/script/tsim/mnode/basic2.sim +++ b/tests/script/tsim/mnode/basic2.sim @@ -60,7 +60,7 @@ endi if $data(2)[0] != 2 then return -1 endi -if $data(2)[2] != FOLLOWER then +if $data(2)[2] == LEADER then return -1 endi -- GitLab