From 55f2efc6d3e5ddc4e02913464d7250097a51fa22 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 20 Dec 2021 12:43:16 +0800 Subject: [PATCH] refact the code for db msg --- source/dnode/mnode/impl/src/mndDb.c | 226 ++++++++++++++++++---------- 1 file changed, 148 insertions(+), 78 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 651d77c0d4..5ffd98f490 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -160,7 +160,7 @@ static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb) { static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOldDb, SDbObj *pNewDb) { mTrace("db:%s, perform update action", pOldDb->name); - pOldDb->updateTime = pNewDb->createdTime; + pOldDb->updateTime = pNewDb->updateTime; pOldDb->cfgVersion = pNewDb->cfgVersion; pOldDb->vgVersion = pNewDb->vgVersion; memcpy(&pOldDb->cfg, &pNewDb->cfg, sizeof(SDbCfg)); @@ -168,8 +168,12 @@ static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOldDb, SDbObj *pNewDb) { } SDbObj *mndAcquireDb(SMnode *pMnode, char *db) { - SSdb *pSdb = pMnode->pSdb; - return sdbAcquire(pSdb, SDB_DB, db); + SSdb *pSdb = pMnode->pSdb; + SDbObj *pDb = sdbAcquire(pSdb, SDB_DB, db); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_EXIST; + } + return pDb; } void mndReleaseDb(SMnode *pMnode, SDbObj *pDb) { @@ -242,13 +246,13 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) { if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW; } -static int32_t mndSetRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { +static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { SSdbRaw *pDbRaw = mndDbActionEncode(pDb); if (pDbRaw == NULL) return -1; if (mndTransAppendRedolog(pTrans, pDbRaw) != 0) return -1; if (sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING) != 0) return -1; - for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) { + for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) { SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v); if (pVgRaw == NULL) return -1; if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1; @@ -258,13 +262,13 @@ static int32_t mndSetRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgOb return 0; } -static int32_t mndSetUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { +static int32_t mndSetCreateDbUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { SSdbRaw *pDbRaw = mndDbActionEncode(pDb); if (pDbRaw == NULL) return -1; if (mndTransAppendUndolog(pTrans, pDbRaw) != 0) return -1; if (sdbSetRawStatus(pDbRaw, SDB_STATUS_DROPPED) != 0) return -1; - for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) { + for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) { SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v); if (pVgRaw == NULL) return -1; if (mndTransAppendUndolog(pTrans, pVgRaw) != 0) return -1; @@ -274,13 +278,13 @@ static int32_t mndSetUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgOb return 0; } -static int32_t mndSetCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { +static int32_t mndSetCreateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { SSdbRaw *pDbRaw = mndDbActionEncode(pDb); if (pDbRaw == NULL) return -1; if (mndTransAppendCommitlog(pTrans, pDbRaw) != 0) return -1; if (sdbSetRawStatus(pDbRaw, SDB_STATUS_READY) != 0) return -1; - for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) { + for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) { SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v); if (pVgRaw == NULL) return -1; if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) return -1; @@ -290,8 +294,8 @@ static int32_t mndSetCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVg return 0; } -static int32_t mndSetRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { - for (int vg = 0; vg < pDb->cfg.numOfVgroups; ++vg) { +static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { + for (int32_t vg = 0; vg < pDb->cfg.numOfVgroups; ++vg) { SVgObj *pVgroup = pVgroups + vg; for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { @@ -319,8 +323,8 @@ static int32_t mndSetRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SV return 0; } -static int32_t mndSetUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { - for (int vg = 0; vg < pDb->cfg.numOfVgroups; ++vg) { +static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { + for (int32_t vg = 0; vg < pDb->cfg.numOfVgroups; ++vg) { SVgObj *pVgroup = pVgroups + vg; for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { @@ -402,29 +406,30 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat mError("db:%s, failed to create since %s", pCreate->db, terrstr()); goto CREATE_DB_OVER; } + mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db); - if (mndSetRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) { + if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) { mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); goto CREATE_DB_OVER; } - if (mndSetUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) { + if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) { mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr()); goto CREATE_DB_OVER; } - if (mndSetCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) { + if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) { mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); goto CREATE_DB_OVER; } - if (mndSetRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) { + if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) { mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); goto CREATE_DB_OVER; } - if (mndSetUndoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) { + if (mndSetCreateDbUndoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) { mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); goto CREATE_DB_OVER; } @@ -462,7 +467,7 @@ static int32_t mndProcessCreateDbMsg(SMnodeMsg *pMsg) { SDbObj *pDb = mndAcquireDb(pMnode, pCreate->db); if (pDb != NULL) { - sdbRelease(pMnode->pSdb, pDb); + mndReleaseDb(pMnode, pDb); if (pCreate->ignoreExist) { mDebug("db:%s, already exist, ignore exist is set", pCreate->db); return 0; @@ -491,57 +496,77 @@ static int32_t mndProcessCreateDbMsg(SMnodeMsg *pMsg) { } static int32_t mndSetDbCfgFromAlterDbMsg(SDbObj *pDb, SAlterDbMsg *pAlter) { - bool changed = false; + terrno = TSDB_CODE_MND_DB_OPTION_UNCHANGED; if (pAlter->totalBlocks >= 0 && pAlter->totalBlocks != pDb->cfg.totalBlocks) { pDb->cfg.totalBlocks = pAlter->totalBlocks; - changed = true; + terrno = 0; } if (pAlter->daysToKeep0 >= 0 && pAlter->daysToKeep0 != pDb->cfg.daysToKeep0) { pDb->cfg.daysToKeep0 = pAlter->daysToKeep0; - changed = true; + terrno = 0; } if (pAlter->daysToKeep1 >= 0 && pAlter->daysToKeep1 != pDb->cfg.daysToKeep1) { pDb->cfg.daysToKeep1 = pAlter->daysToKeep1; - changed = true; + terrno = 0; } if (pAlter->daysToKeep2 >= 0 && pAlter->daysToKeep2 != pDb->cfg.daysToKeep2) { pDb->cfg.daysToKeep2 = pAlter->daysToKeep2; - changed = true; + terrno = 0; } if (pAlter->fsyncPeriod >= 0 && pAlter->fsyncPeriod != pDb->cfg.fsyncPeriod) { pDb->cfg.fsyncPeriod = pAlter->fsyncPeriod; - changed = true; + terrno = 0; } if (pAlter->walLevel >= 0 && pAlter->walLevel != pDb->cfg.walLevel) { pDb->cfg.walLevel = pAlter->walLevel; - changed = true; + terrno = 0; } if (pAlter->quorum >= 0 && pAlter->quorum != pDb->cfg.quorum) { pDb->cfg.quorum = pAlter->quorum; - changed = true; + terrno = 0; } if (pAlter->cacheLastRow >= 0 && pAlter->cacheLastRow != pDb->cfg.cacheLastRow) { pDb->cfg.cacheLastRow = pAlter->cacheLastRow; - changed = true; + terrno = 0; } - if (!changed) { - terrno = TSDB_CODE_MND_DB_OPTION_UNCHANGED; - return -1; - } + return terrno; +} + +static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { + SSdbRaw *pRedoRaw = mndDbActionEncode(pNewDb); + if (pRedoRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1; return 0; } +static int32_t mndSetUpdateDbUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { + SSdbRaw *pUndoRaw = mndDbActionEncode(pOldDb); + if (pUndoRaw == NULL) return -1; + if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; + if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY) != 0) return -1; + + return 0; +} + +static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { return 0; } + +static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { return 0; } + +static int32_t mndSetUpdateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { return 0; } + static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbObj *pNewDb) { + int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); if (pTrans == NULL) { mError("db:%s, failed to update since %s", pOldDb->name, terrstr()); @@ -550,30 +575,41 @@ static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbO mDebug("trans:%d, used to update db:%s", pTrans->id, pOldDb->name); - SSdbRaw *pRedoRaw = mndDbActionEncode(pNewDb); - if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { - mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetUpdateDbRedoLogs(pMnode, pTrans, pOldDb, pNewDb) != 0) { + mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); + goto UPDATE_DB_OVER; } - sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY); - SSdbRaw *pUndoRaw = mndDbActionEncode(pOldDb); - if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { - mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetUpdateDbUndoLogs(pMnode, pTrans, pOldDb, pNewDb) != 0) { + mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr()); + goto UPDATE_DB_OVER; + } + + if (mndSetUpdateDbCommitLogs(pMnode, pTrans, pOldDb, pNewDb) != 0) { + mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); + goto UPDATE_DB_OVER; + } + + if (mndSetUpdateDbRedoActions(pMnode, pTrans, pOldDb, pNewDb) != 0) { + mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); + goto UPDATE_DB_OVER; + } + + if (mndSetUpdateDbUndoActions(pMnode, pTrans, pOldDb, pNewDb) != 0) { + mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); + goto UPDATE_DB_OVER; } - sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY); if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + goto UPDATE_DB_OVER; } + code = 0; + +UPDATE_DB_OVER: mndTransDrop(pTrans); - return 0; + return code; } static int32_t mndProcessAlterDbMsg(SMnodeMsg *pMsg) { @@ -616,46 +652,82 @@ static int32_t mndProcessAlterDbMsg(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } +static int32_t mndSetDropDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { + SSdbRaw *pRedoRaw = mndDbActionEncode(pDb); + if (pRedoRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1; + + return 0; +} + +static int32_t mndSetDropDbUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { + SSdbRaw *pUndoRaw = mndDbActionEncode(pDb); + if (pUndoRaw == NULL) return -1; + if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; + if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY) != 0) return -1; + + return 0; +} + +static int32_t mndSetDropDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { + SSdbRaw *pCommitRaw = mndDbActionEncode(pDb); + if (pCommitRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1; + + return 0; +} + +static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { return 0; } + +static int32_t mndSetDropDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { return 0; } + static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) { + int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); if (pTrans == NULL) { mError("db:%s, failed to drop since %s", pDb->name, terrstr()); return -1; } + mDebug("trans:%d, used to drop db:%s", pTrans->id, pDb->name); - SSdbRaw *pRedoRaw = mndDbActionEncode(pDb); - if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { - mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) { + mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); + goto DROP_DB_OVER; } - sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING); - SSdbRaw *pUndoRaw = mndDbActionEncode(pDb); - if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { - mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetDropDbUndoLogs(pMnode, pTrans, pDb) != 0) { + mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr()); + goto DROP_DB_OVER; } - sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY); - SSdbRaw *pCommitRaw = mndDbActionEncode(pDb); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) { + mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); + goto DROP_DB_OVER; + } + + if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) { + mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); + goto DROP_DB_OVER; + } + + if (mndSetDropDbUndoActions(pMnode, pTrans, pDb) != 0) { + mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); + goto DROP_DB_OVER; } - sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + goto DROP_DB_OVER; } + code = 0; + +DROP_DB_OVER: mndTransDrop(pTrans); - return 0; + return code; } static int32_t mndProcessDropDbMsg(SMnodeMsg *pMsg) { @@ -757,29 +829,27 @@ static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) { static int32_t mndProcessSyncDbMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SSyncDbMsg *pSync = pMsg->rpcMsg.pCont; - - SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db); + SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db); if (pDb == NULL) { mError("db:%s, failed to process sync db msg since %s", pMsg->db, terrstr()); return -1; - } else { - mndReleaseDb(pMnode, pDb); - return 0; } + + mndReleaseDb(pMnode, pDb); + return 0; } static int32_t mndProcessCompactDbMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SCompactDbMsg *pCompact = pMsg->rpcMsg.pCont; - - SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db); + SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db); if (pDb == NULL) { mError("db:%s, failed to process compact db msg since %s", pMsg->db, terrstr()); return -1; - } else { - mndReleaseDb(pMnode, pDb); - return 0; } + + mndReleaseDb(pMnode, pDb); + return 0; } static int32_t mndGetDbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { -- GitLab