From 58389568319199119b461292107367e79ea88743 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 6 Jun 2022 11:49:30 +0800 Subject: [PATCH] refactor: dnode online --- source/dnode/mnode/impl/inc/mndDnode.h | 2 +- source/dnode/mnode/impl/inc/mndMnode.h | 1 + source/dnode/mnode/impl/inc/mndVgroup.h | 24 +- source/dnode/mnode/impl/src/mndDb.c | 108 +------- source/dnode/mnode/impl/src/mndDnode.c | 80 +++--- source/dnode/mnode/impl/src/mndMain.c | 2 +- source/dnode/mnode/impl/src/mndMnode.c | 22 +- source/dnode/mnode/impl/src/mndVgroup.c | 345 ++++++++++++++++++------ source/dnode/mnode/sdb/src/sdbRaw.c | 6 +- 9 files changed, 345 insertions(+), 245 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDnode.h b/source/dnode/mnode/impl/inc/mndDnode.h index c76186c0a2..cf1e7422be 100644 --- a/source/dnode/mnode/impl/inc/mndDnode.h +++ b/source/dnode/mnode/impl/inc/mndDnode.h @@ -28,7 +28,7 @@ SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId); void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode); SEpSet mndGetDnodeEpset(SDnodeObj *pDnode); int32_t mndGetDnodeSize(SMnode *pMnode); -bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs); +bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndMnode.h b/source/dnode/mnode/impl/inc/mndMnode.h index fd62b3ce75..a433af9947 100644 --- a/source/dnode/mnode/impl/inc/mndMnode.h +++ b/source/dnode/mnode/impl/inc/mndMnode.h @@ -28,6 +28,7 @@ SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId); void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj); bool mndIsMnode(SMnode *pMnode, int32_t dnodeId); void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet); +int32_t mndSetDropMnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndVgroup.h b/source/dnode/mnode/impl/inc/mndVgroup.h index 2119320de5..89f93d30b5 100644 --- a/source/dnode/mnode/impl/inc/mndVgroup.h +++ b/source/dnode/mnode/impl/inc/mndVgroup.h @@ -30,15 +30,21 @@ SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup); SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup); int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId); -int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup); -int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups); -SArray *mndBuildDnodesArray(SMnode *pMnode); -int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray); -int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *pVgId); - -void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, bool standby); -void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); -void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); +SArray *mndBuildDnodesArray(SMnode *, int32_t exceptDnodeId); +int32_t mndAllocSmaVgroup(SMnode *, SDbObj *pDb, SVgObj *pVgroup); +int32_t mndAllocVgroup(SMnode *, SDbObj *pDb, SVgObj **ppVgroups); +int32_t mndAddVnodeToVgroup(SMnode *, SVgObj *pVgroup, SArray *pArray); +int32_t mndRemoveVnodeFromVgroup(SMnode *, SVgObj *pVgroup, SArray *pArray, SVnodeGid *pDelVgid); +int32_t mndAddCreateVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, bool standby); +int32_t mndAddAlterVnodeConfirmAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup); +int32_t mndAddAlterVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType); +int32_t mndAddDropVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, bool isRedo); +int32_t mndSetMoveVgroupInfoToTrans(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t vn, SArray *pArray); +int32_t mndSetMoveVgroupsInfoToTrans(SMnode *, STrans *pTrans, int32_t dropDnodeId); + +void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *cntlen, bool standby); +void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); +void *mndBuildAlterVnodeReq(SMnode *, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 1ce1a2590a..d2b468d588 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -263,111 +263,6 @@ void mndReleaseDb(SMnode *pMnode, SDbObj *pDb) { sdbRelease(pSdb, pDb); } -static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, - bool standby) { - STransAction action = {0}; - - SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); - if (pDnode == NULL) return -1; - action.epSet = mndGetDnodeEpset(pDnode); - mndReleaseDnode(pMnode, pDnode); - - int32_t contLen = 0; - void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen, standby); - if (pReq == NULL) return -1; - - action.pCont = pReq; - action.contLen = contLen; - action.msgType = TDMT_DND_CREATE_VNODE; - action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED; - - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pReq); - return -1; - } - - return 0; -} - -static int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { - STransAction action = {0}; - action.epSet = mndGetVgroupEpset(pMnode, pVgroup); - - int32_t contLen = sizeof(SMsgHead); - SMsgHead *pHead = taosMemoryMalloc(contLen); - if (pHead == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - pHead->contLen = htonl(contLen); - pHead->vgId = htonl(pVgroup->vgId); - - action.pCont = pHead; - action.contLen = contLen; - action.msgType = TDMT_VND_ALTER_CONFIRM; - - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pHead); - return -1; - } - - return 0; -} - -static int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType) { - STransAction action = {0}; - action.epSet = mndGetVgroupEpset(pMnode, pVgroup); - - int32_t contLen = 0; - void *pReq = mndBuildAlterVnodeReq(pMnode, pDb, pVgroup, &contLen); - if (pReq == NULL) return -1; - - action.pCont = pReq; - action.contLen = contLen; - action.msgType = msgType; - - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pReq); - return -1; - } - - return 0; -} - -static int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, - bool isRedo) { - STransAction action = {0}; - - SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); - if (pDnode == NULL) return -1; - action.epSet = mndGetDnodeEpset(pDnode); - mndReleaseDnode(pMnode, pDnode); - - int32_t contLen = 0; - void *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen); - if (pReq == NULL) return -1; - - action.pCont = pReq; - action.contLen = contLen; - action.msgType = TDMT_DND_DROP_VNODE; - action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED; - - if (isRedo) { - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pReq); - return -1; - } - } else { - if (mndTransAppendUndoAction(pTrans, &action) != 0) { - taosMemoryFree(pReq); - return -1; - } - } - - return 0; -} - static int32_t mndCheckDbName(const char *dbName, SUserObj *pUser) { char *pos = strstr(dbName, TS_PATH_DELIMITER); if (pos == NULL) { @@ -795,7 +690,7 @@ static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; - SArray *pArray = mndBuildDnodesArray(pMnode); + SArray *pArray = mndBuildDnodesArray(pMnode, 0); while (1) { SVgObj *pVgroup = NULL; @@ -1742,3 +1637,4 @@ static void mndCancelGetNextDb(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); } + diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index aeff018aa8..9bdee92113 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -254,7 +254,7 @@ int32_t mndGetDnodeSize(SMnode *pMnode) { return sdbGetSize(pSdb, SDB_DNODE); } -bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs) { +bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) { int64_t interval = TABS(pDnode->lastAccessTime - curMs); if (interval > 5000 * tsStatusInterval) { if (pDnode->rebootTime > 0) { @@ -393,7 +393,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE); int64_t curMs = taosGetTimestampMs(); - bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); + bool online = mndIsDnodeOnline(pDnode, curMs); bool dnodeChanged = (statusReq.dnodeVer != dnodeVer); bool reboot = (pDnode->rebootTime != statusReq.rebootTime); bool needCheck = !online || dnodeChanged || reboot; @@ -559,30 +559,36 @@ CREATE_DNODE_OVER: return code; } -static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq); - if (pTrans == NULL) { - mError("dnode:%d, failed to drop since %s", pDnode->id, terrstr()); - return -1; - } +static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj) { + int32_t code = -1; + SSdbRaw *pRaw = NULL; + STrans *pTrans = NULL; + + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq); + if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to drop dnode:%d", pTrans->id, pDnode->id); - SSdbRaw *pCommitRaw = mndDnodeActionEncode(pDnode); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } - sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); + pRaw = mndDnodeActionEncode(pDnode); + if (pRaw == NULL || mndTransAppendRedolog(pTrans, pRaw) != 0) goto _OVER; + sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING); + pRaw = NULL; - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } + pRaw = mndDnodeActionEncode(pDnode); + if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER; + sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED); + pRaw = NULL; + + if (mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj) != 0) goto _OVER; + if (mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id) != 0) goto _OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; + mndTransSetSerial(pTrans); + code = 0; + +_OVER: mndTransDrop(pTrans); - return 0; + sdbFreeRaw(pRaw); + return code; } static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) { @@ -595,42 +601,53 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) { if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; - goto DROP_DNODE_OVER; + goto _OVER; } mDebug("dnode:%d, start to drop", dropReq.dnodeId); if (dropReq.dnodeId <= 0) { terrno = TSDB_CODE_MND_INVALID_DNODE_ID; - goto DROP_DNODE_OVER; + goto _OVER; } pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId); if (pDnode == NULL) { terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; - goto DROP_DNODE_OVER; + goto _OVER; + } + + if (!mndIsDnodeOnline(pDnode, taosGetTimestampMs())) { + terrno = TSDB_CODE_NODE_OFFLINE; + goto _OVER; } pMObj = mndAcquireMnode(pMnode, dropReq.dnodeId); if (pMObj != NULL) { - terrno = TSDB_CODE_MND_MNODE_NOT_EXIST; - goto DROP_DNODE_OVER; + if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) { + terrno = TSDB_CODE_MND_TOO_FEW_MNODES; + goto _OVER; + } + if (pMnode->selfDnodeId == dropReq.dnodeId) { + terrno = TSDB_CODE_MND_CANT_DROP_MASTER; + goto _OVER; + } } pUser = mndAcquireUser(pMnode, pReq->conn.user); if (pUser == NULL) { terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; - goto DROP_DNODE_OVER; + goto _OVER; } if (mndCheckNodeAuth(pUser)) { - goto DROP_DNODE_OVER; + goto _OVER; } - code = mndDropDnode(pMnode, pReq, pDnode); + code = mndDropDnode(pMnode, pReq, pDnode, pMObj); if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; -DROP_DNODE_OVER: +_OVER: if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr()); } @@ -638,7 +655,6 @@ DROP_DNODE_OVER: mndReleaseDnode(pMnode, pDnode); mndReleaseUser(pMnode, pUser); mndReleaseMnode(pMnode, pMObj); - return code; } @@ -736,7 +752,7 @@ static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB while (numOfRows < rows) { pShow->pIter = sdbFetch(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode); if (pShow->pIter == NULL) break; - bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); + bool online = mndIsDnodeOnline(pDnode, curMs); cols = 0; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index d989ba6ec3..69300c0889 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -529,7 +529,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr SMonDnodeDesc desc = {0}; desc.dnode_id = pObj->id; tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep)); - if (mndIsDnodeOnline(pMnode, pObj, ms)) { + if (mndIsDnodeOnline(pObj, ms)) { tstrncpy(desc.status, "ready", sizeof(desc.status)); } else { tstrncpy(desc.status, "offline", sizeof(desc.status)); diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 1cc832b3af..abd65c1724 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -408,7 +408,7 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) { goto _OVER; } - if (!mndIsDnodeOnline(pMnode, pDnode, taosGetTimestampMs())) { + if (!mndIsDnodeOnline(pDnode, taosGetTimestampMs())) { terrno = TSDB_CODE_NODE_OFFLINE; goto _OVER; } @@ -535,20 +535,26 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode return 0; } +int32_t mndSetDropMnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { + if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) return -1; + if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) return -1; + if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) return -1; + if (mndTransAppendNullLog(pTrans) != 0) return -1; + return 0; +} + static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) { int32_t code = -1; + STrans *pTrans = NULL; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq); + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq); if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id); - mndTransSetSerial(pTrans); - if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto _OVER; - if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto _OVER; - if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto _OVER; - if (mndTransAppendNullLog(pTrans) != 0) goto _OVER; + if (mndSetDropMnodeInfoToTrans(pMnode, pTrans, pObj) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; + mndTransSetSerial(pTrans); code = 0; _OVER: @@ -642,7 +648,7 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB if (pObj->id == pMnode->selfDnodeId) { roles = syncStr(TAOS_SYNC_STATE_LEADER); } - if (pObj->pDnode && mndIsDnodeOnline(pMnode, pObj->pDnode, curMs)) { + if (pObj->pDnode && mndIsDnodeOnline(pObj->pDnode, curMs)) { roles = syncStr(pObj->state); } char b2[12 + VARSTR_HEADER_SIZE] = {0}; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 4f09eda733..afca8b14f0 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -344,9 +344,14 @@ static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2 static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) { SDnodeObj *pDnode = pObj; SArray *pArray = p1; + int32_t exceptDnodeId = *(int32_t *)p2; + + if (exceptDnodeId == pDnode->id) { + return true; + } int64_t curMs = taosGetTimestampMs(); - bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); + bool online = mndIsDnodeOnline(pDnode, curMs); bool isMnode = mndIsMnode(pMnode, pDnode->id); pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id); @@ -363,7 +368,7 @@ static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2 return true; } -SArray *mndBuildDnodesArray(SMnode *pMnode) { +SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId) { SSdb *pSdb = pMnode->pSdb; int32_t numOfDnodes = mndGetDnodeSize(pMnode); @@ -374,7 +379,7 @@ SArray *mndBuildDnodesArray(SMnode *pMnode) { } sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL); - sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pArray, NULL, NULL); + sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pArray, &exceptDnodeId, NULL); return pArray; } @@ -422,7 +427,7 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pAr } int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) { - SArray *pArray = mndBuildDnodesArray(pMnode); + SArray *pArray = mndBuildDnodesArray(pMnode, 0); if (pArray == NULL) return -1; pVgroup->vgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); @@ -451,7 +456,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { goto _OVER; } - pArray = mndBuildDnodesArray(pMnode); + pArray = mndBuildDnodesArray(pMnode, 0); if (pArray == NULL) goto _OVER; mInfo("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray), @@ -501,86 +506,6 @@ _OVER: return code; } -int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) { - taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes); - for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { - SDnodeObj *pDnode = taosArrayGet(pArray, i); - mDebug("dnode:%d, equivalent vnodes:%d", pDnode->id, pDnode->numOfVnodes); - } - - SVnodeGid *pVgid = &pVgroup->vnodeGid[pVgroup->replica]; - for (int32_t d = 0; d < taosArrayGetSize(pArray); ++d) { - SDnodeObj *pDnode = taosArrayGet(pArray, d); - - bool used = false; - for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { - if (pDnode->id == pVgroup->vnodeGid[vn].dnodeId) { - used = true; - break; - } - } - if (used) continue; - - if (pDnode == NULL || pDnode->numOfVnodes > pDnode->numOfSupportVnodes) { - terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; - return -1; - } - - pVgid->dnodeId = pDnode->id; - pVgid->role = TAOS_SYNC_STATE_ERROR; - mInfo("db:%s, vgId:%d, vn:%d dnode:%d, is added", pVgroup->dbName, pVgroup->vgId, pVgroup->replica, - pVgid->dnodeId); - - pVgroup->replica++; - pDnode->numOfVnodes++; - return 0; - } - - terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; - mError("db:%s, failed to add vnode to vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, terrstr()); - return -1; -} - -int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *pDelVgid) { - taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes); - for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { - SDnodeObj *pDnode = taosArrayGet(pArray, i); - mDebug("dnode:%d, equivalent vnodes:%d", pDnode->id, pDnode->numOfVnodes); - } - - int32_t code = -1; - for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) { - SDnodeObj *pDnode = taosArrayGet(pArray, d); - - for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { - SVnodeGid *pVgid = &pVgroup->vnodeGid[vn]; - if (pVgid->dnodeId == pDnode->id) { - mInfo("db:%s, vgId:%d, vn:%d dnode:%d, is removed", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId); - pDnode->numOfVnodes--; - pVgroup->replica--; - *pDelVgid = *pVgid; - *pVgid = pVgroup->vnodeGid[pVgroup->replica]; - memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid)); - code = 0; - goto _OVER; - } - } - } - -_OVER: - if (code != 0) { - terrno = TSDB_CODE_APP_ERROR; - mError("db:%s, failed to remove vnode from vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, terrstr()); - return -1; - } - - for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { - SVnodeGid *pVgid = &pVgroup->vnodeGid[vn]; - mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId); - } - return 0; -} - SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) { SEpSet epset = {0}; @@ -678,7 +603,7 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p bool online = false; SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId); if (pDnode != NULL) { - online = mndIsDnodeOnline(pMnode, pDnode, curMs); + online = mndIsDnodeOnline(pDnode, curMs); mndReleaseDnode(pMnode, pDnode); } @@ -797,3 +722,251 @@ static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); } + +int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) { + taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes); + for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { + SDnodeObj *pDnode = taosArrayGet(pArray, i); + mDebug("dnode:%d, equivalent vnodes:%d", pDnode->id, pDnode->numOfVnodes); + } + + SVnodeGid *pVgid = &pVgroup->vnodeGid[pVgroup->replica]; + for (int32_t d = 0; d < taosArrayGetSize(pArray); ++d) { + SDnodeObj *pDnode = taosArrayGet(pArray, d); + + bool used = false; + for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { + if (pDnode->id == pVgroup->vnodeGid[vn].dnodeId) { + used = true; + break; + } + } + if (used) continue; + + if (pDnode == NULL || pDnode->numOfVnodes > pDnode->numOfSupportVnodes) { + terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; + return -1; + } + + pVgid->dnodeId = pDnode->id; + pVgid->role = TAOS_SYNC_STATE_ERROR; + mInfo("db:%s, vgId:%d, vn:%d dnode:%d, is added", pVgroup->dbName, pVgroup->vgId, pVgroup->replica, pVgid->dnodeId); + + pVgroup->replica++; + pDnode->numOfVnodes++; + return 0; + } + + terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; + mError("db:%s, failed to add vnode to vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, terrstr()); + return -1; +} + +int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *pDelVgid) { + taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes); + for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { + SDnodeObj *pDnode = taosArrayGet(pArray, i); + mDebug("dnode:%d, equivalent vnodes:%d", pDnode->id, pDnode->numOfVnodes); + } + + int32_t code = -1; + for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) { + SDnodeObj *pDnode = taosArrayGet(pArray, d); + + for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { + SVnodeGid *pVgid = &pVgroup->vnodeGid[vn]; + if (pVgid->dnodeId == pDnode->id) { + mInfo("db:%s, vgId:%d, vn:%d dnode:%d, is removed", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId); + pDnode->numOfVnodes--; + pVgroup->replica--; + *pDelVgid = *pVgid; + *pVgid = pVgroup->vnodeGid[pVgroup->replica]; + memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid)); + code = 0; + goto _OVER; + } + } + } + +_OVER: + if (code != 0) { + terrno = TSDB_CODE_APP_ERROR; + mError("db:%s, failed to remove vnode from vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, terrstr()); + return -1; + } + + for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { + SVnodeGid *pVgid = &pVgroup->vnodeGid[vn]; + mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId); + } + return 0; +} + +int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, + bool standby) { + STransAction action = {0}; + + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pDnode == NULL) return -1; + action.epSet = mndGetDnodeEpset(pDnode); + mndReleaseDnode(pMnode, pDnode); + + int32_t contLen = 0; + void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen, standby); + if (pReq == NULL) return -1; + + action.pCont = pReq; + action.contLen = contLen; + action.msgType = TDMT_DND_CREATE_VNODE; + action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + + return 0; +} + +int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgroup); + + int32_t contLen = sizeof(SMsgHead); + SMsgHead *pHead = taosMemoryMalloc(contLen); + if (pHead == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + pHead->contLen = htonl(contLen); + pHead->vgId = htonl(pVgroup->vgId); + + action.pCont = pHead; + action.contLen = contLen; + action.msgType = TDMT_VND_ALTER_CONFIRM; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pHead); + return -1; + } + + return 0; +} + +int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType) { + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgroup); + + int32_t contLen = 0; + void *pReq = mndBuildAlterVnodeReq(pMnode, pDb, pVgroup, &contLen); + if (pReq == NULL) return -1; + + action.pCont = pReq; + action.contLen = contLen; + action.msgType = msgType; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + + return 0; +} + +int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, + bool isRedo) { + STransAction action = {0}; + + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pDnode == NULL) return -1; + action.epSet = mndGetDnodeEpset(pDnode); + mndReleaseDnode(pMnode, pDnode); + + int32_t contLen = 0; + void *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen); + if (pReq == NULL) return -1; + + action.pCont = pReq; + action.contLen = contLen; + action.msgType = TDMT_DND_DROP_VNODE; + action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED; + + if (isRedo) { + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + } else { + if (mndTransAppendUndoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + } + + return 0; +} + +int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t vnIndex, + SArray *pArray) { + SVgObj newVgroup = {0}; + memcpy(&newVgroup, pVgroup, sizeof(SVgObj)); + + mInfo("vgId:%d, vgroup info before move, replica:%d", newVgroup.vgId, newVgroup.replica); + for (int32_t i = 0; i < newVgroup.replica; ++i) { + mInfo("vgId:%d, vnode:%d dnode:%d", newVgroup.vgId, i, newVgroup.vnodeGid[i].dnodeId); + } + + mInfo("vgId:%d, will add 1 vnodes", pVgroup->vgId); + if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1; + if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[1], true) != 0) return -1; + if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1; + if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1; + + mInfo("vgId:%d, will remove 1 vnodes", pVgroup->vgId); + newVgroup.replica--; + SVnodeGid del = newVgroup.vnodeGid[vnIndex]; + newVgroup.vnodeGid[vnIndex] = newVgroup.vnodeGid[newVgroup.replica]; + memset(&newVgroup.vnodeGid[newVgroup.replica], 0, sizeof(SVnodeGid)); + if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1; + if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del, true) != 0) return -1; + if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1; + + mInfo("vgId:%d, vgroup info after move, replica:%d", newVgroup.vgId, newVgroup.replica); + for (int32_t i = 0; i < newVgroup.replica; ++i) { + mInfo("vgId:%d, vnode:%d dnode:%d", newVgroup.vgId, i, newVgroup.vnodeGid[i].dnodeId); + } + return 0; +} + +int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t dropDnodeId) { + SArray *pArray = mndBuildDnodesArray(pMnode, dropDnodeId); + if (pArray == NULL) return -1; + + void *pIter = NULL; + while (1) { + SVgObj *pVgroup = NULL; + pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + + int32_t vnIndex = -1; + for (int32_t i = 0; i < pVgroup->replica; ++i) { + if (pVgroup->vnodeGid[i].dnodeId == dropDnodeId) { + vnIndex = i; + break; + } + } + + if (vnIndex != -1) { + mInfo("vgId:%d, vnode:%d will be removed from dnode:%d", pVgroup->vgId, vnIndex, dropDnodeId); + SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName); + mndSetMoveVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, vnIndex, pArray); + mndReleaseDb(pMnode, pDb); + } + + sdbRelease(pMnode->pSdb, pVgroup); + } + + taosArrayDestroy(pArray); + return 0; +} \ No newline at end of file diff --git a/source/dnode/mnode/sdb/src/sdbRaw.c b/source/dnode/mnode/sdb/src/sdbRaw.c index 90643a54a9..7720a8e88a 100644 --- a/source/dnode/mnode/sdb/src/sdbRaw.c +++ b/source/dnode/mnode/sdb/src/sdbRaw.c @@ -42,8 +42,10 @@ SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) { } void sdbFreeRaw(SSdbRaw *pRaw) { - mTrace("raw:%p, is freed", pRaw); - taosMemoryFree(pRaw); + if (pRaw != NULL) { + mTrace("raw:%p, is freed", pRaw); + taosMemoryFree(pRaw); + } } int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val) { -- GitLab