From 5335c084498078d5e41f13db2bd504780507a4e6 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 23 Dec 2021 14:09:05 +0800 Subject: [PATCH] TD-10431 refact drop mnode msg --- source/dnode/mnode/impl/src/mndMnode.c | 240 ++++++++++++++++--------- 1 file changed, 154 insertions(+), 86 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 9120b7fe27..68f47cf392 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -202,52 +202,6 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) { } } -static SCreateMnodeInMsg *mndBuildCreateMnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SMnodeObj *pObj) { - SCreateMnodeInMsg *pCreate = calloc(1, sizeof(SCreateMnodeInMsg)); - if (pCreate == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - pCreate->dnodeId = htonl(pObj->id); - - int32_t numOfReplicas = 0; - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; - - while (numOfReplicas < TSDB_MAX_REPLICA - 1) { - SMnodeObj *pObj = NULL; - pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj); - if (pIter == NULL) break; - if (pObj->pDnode == NULL) break; - - SReplica *pReplica = &pCreate->replicas[numOfReplicas]; - pReplica->id = htonl(pObj->id); - pReplica->port = htons(pObj->pDnode->port); - memcpy(pReplica->fqdn, pObj->pDnode->fqdn, TSDB_FQDN_LEN); - numOfReplicas++; - } - - numOfReplicas++; - SReplica *pReplica = &pCreate->replicas[numOfReplicas]; - pReplica->id = htonl(pObj->id); - pReplica->port = htons(pDnode->port); - memcpy(pReplica->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); - - return pCreate; -} - -static SDropMnodeInMsg *mndBuildDropMnodeMsg(SMnode *pMnode, SMnodeObj *pObj) { - SDropMnodeInMsg *pDrop = calloc(1, sizeof(SDropMnodeInMsg)); - if (pDrop == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - pDrop->dnodeId = htonl(pObj->id); - return pDrop; -} - static int32_t mndSetCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj); if (pRedoRaw == NULL) return -1; @@ -272,29 +226,86 @@ static int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnod return 0; } -static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { - STransAction action = {0}; +static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + int32_t numOfReplicas = 0; - SDnodeObj *pDnode = mndAcquireDnode(pMnode, pObj->id); - if (pDnode == NULL) return -1; - action.epSet = mndGetDnodeEpset(pDnode); - mndReleaseDnode(pMnode, pDnode); + SCreateMnodeInMsg createMsg = {0}; + while (1) { + SMnodeObj *pMObj = NULL; + pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); + if (pIter == NULL) break; - SCreateMnodeInMsg *pMsg = mndBuildCreateMnodeMsg(pMnode, pDnode, pObj); - if (pMsg == NULL) return -1; + SReplica *pReplica = &createMsg.replicas[numOfReplicas]; + pReplica->id = htonl(pMObj->id); + pReplica->port = htons(pMObj->pDnode->port); + memcpy(pReplica->fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); + numOfReplicas++; - action.pCont = pMsg; - action.contLen = sizeof(SCreateMnodeInMsg); - action.msgType = TSDB_MSG_TYPE_CREATE_MNODE_IN; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - free(pMsg); - return -1; + sdbRelease(pSdb, pMObj); + } + + SReplica *pReplica = &createMsg.replicas[numOfReplicas]; + pReplica->id = htonl(pDnode->id); + pReplica->port = htons(pDnode->port); + memcpy(pReplica->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); + numOfReplicas++; + + while (1) { + SMnodeObj *pMObj = NULL; + pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); + if (pIter == NULL) break; + + STransAction action = {0}; + + SAlterMnodeInMsg *pMsg = malloc(sizeof(SAlterMnodeInMsg)); + if (pMsg == NULL) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pMObj); + return -1; + } + memcpy(pMsg, &createMsg, sizeof(SAlterMnodeInMsg)); + + pMsg->dnodeId = htonl(pMObj->id); + action.epSet = mndGetDnodeEpset(pMObj->pDnode); + action.pCont = pMsg; + action.contLen = sizeof(SAlterMnodeInMsg); + action.msgType = TSDB_MSG_TYPE_ALTER_MNODE_IN; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(pMsg); + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pMObj); + return -1; + } + + sdbRelease(pSdb, pMObj); + } + + { + STransAction action = {0}; + action.epSet = mndGetDnodeEpset(pDnode); + + SCreateMnodeInMsg *pMsg = malloc(sizeof(SCreateMnodeInMsg)); + if (pMsg == NULL) return -1; + memcpy(pMsg, &createMsg, sizeof(SAlterMnodeInMsg)); + pMsg->dnodeId = htonl(pObj->id); + + action.epSet = mndGetDnodeEpset(pDnode); + action.pCont = pMsg; + action.contLen = sizeof(SCreateMnodeInMsg); + action.msgType = TSDB_MSG_TYPE_CREATE_MNODE_IN; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(pMsg); + return -1; + } } return 0; } -static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateMnodeMsg *pCreate) { +static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SCreateMnodeMsg *pCreate) { SMnodeObj mnodeObj = {0}; mnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_MNODE); mnodeObj.createdTime = taosGetTimestampMs(); @@ -318,7 +329,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateMnodeMsg * goto CREATE_MNODE_OVER; } - if (mndSetCreateMnodeRedoActions(pMnode, pTrans, &mnodeObj) != 0) { + if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) { mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); goto CREATE_MNODE_OVER; } @@ -343,22 +354,23 @@ static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) { mDebug("mnode:%d, start to create", pCreate->dnodeId); - SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId); - if (pDnode == NULL) { - mError("mnode:%d, dnode not exist", pDnode->id); - terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; - return -1; - } - mndReleaseDnode(pMnode, pDnode); - SMnodeObj *pObj = mndAcquireMnode(pMnode, pCreate->dnodeId); if (pObj != NULL) { + mndReleaseMnode(pMnode, pObj); mError("mnode:%d, mnode already exist", pObj->id); terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST; return -1; } - int32_t code = mndCreateMnode(pMnode, pMsg, pCreate); + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId); + if (pDnode == NULL) { + mError("mnode:%d, dnode not exist", pDnode->id); + terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; + return -1; + } + + int32_t code = mndCreateMnode(pMnode, pMsg, pDnode, pCreate); + mndReleaseDnode(pMnode, pDnode); if (code != 0) { mError("mnode:%d, failed to create since %s", pCreate->dnodeId, terrstr()); @@ -384,23 +396,79 @@ static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeO return 0; } -static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { - STransAction action = {0}; +static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + int32_t numOfReplicas = 0; - SDnodeObj *pDnode = mndAcquireDnode(pMnode, pObj->id); - if (pDnode == NULL) return -1; - action.epSet = mndGetDnodeEpset(pDnode); - mndReleaseDnode(pMnode, pDnode); + SAlterMnodeInMsg alterMsg = {0}; + while (1) { + SMnodeObj *pMObj = NULL; + pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); + if (pIter == NULL) break; - SDropMnodeInMsg *pMsg = mndBuildDropMnodeMsg(pMnode, pObj); - if (pMsg == NULL) return -1; + if (pMObj->id != pObj->id) { + SReplica *pReplica = &alterMsg.replicas[numOfReplicas]; + pReplica->id = htonl(pMObj->id); + pReplica->port = htons(pMObj->pDnode->port); + memcpy(pReplica->fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); + numOfReplicas++; + } - action.pCont = pMsg; - action.contLen = sizeof(SDropMnodeInMsg); - action.msgType = TSDB_MSG_TYPE_CREATE_MNODE_IN; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - free(pMsg); - return -1; + sdbRelease(pSdb, pMObj); + } + + while (1) { + SMnodeObj *pMObj = NULL; + pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); + if (pIter == NULL) break; + if (pMObj->id != pObj->id) { + STransAction action = {0}; + + SAlterMnodeInMsg *pMsg = malloc(sizeof(SAlterMnodeInMsg)); + if (pMsg == NULL) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pMObj); + return -1; + } + memcpy(pMsg, &alterMsg, sizeof(SAlterMnodeInMsg)); + + pMsg->dnodeId = htonl(pMObj->id); + action.epSet = mndGetDnodeEpset(pMObj->pDnode); + action.pCont = pMsg; + action.contLen = sizeof(SAlterMnodeInMsg); + action.msgType = TSDB_MSG_TYPE_ALTER_MNODE_IN; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(pMsg); + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pMObj); + return -1; + } + } + + sdbRelease(pSdb, pMObj); + } + + { + STransAction action = {0}; + action.epSet = mndGetDnodeEpset(pDnode); + + SDropMnodeInMsg *pMsg = malloc(sizeof(SDropMnodeInMsg)); + if (pMsg == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pMsg->dnodeId = htonl(pObj->id); + + action.epSet = mndGetDnodeEpset(pDnode); + action.pCont = pMsg; + action.contLen = sizeof(SDropMnodeInMsg); + action.msgType = TSDB_MSG_TYPE_DROP_MNODE_IN; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(pMsg); + return -1; + } } return 0; @@ -426,7 +494,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pMsg, SMnodeObj *pObj) { goto DROP_MNODE_OVER; } - if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pObj) != 0) { + if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) { mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); goto DROP_MNODE_OVER; } -- GitLab