提交 190a7985 编写于 作者: S Shengliang Guan

TD-10431 save vgroup

上级 9dbefbe5
...@@ -188,8 +188,8 @@ typedef struct { ...@@ -188,8 +188,8 @@ typedef struct {
int32_t daysToKeep0; int32_t daysToKeep0;
int32_t daysToKeep1; int32_t daysToKeep1;
int32_t daysToKeep2; int32_t daysToKeep2;
int32_t minRowsPerFileBlock; int32_t minRows;
int32_t maxRowsPerFileBlock; int32_t maxRows;
int32_t commitTime; int32_t commitTime;
int32_t fsyncPeriod; int32_t fsyncPeriod;
int8_t walLevel; int8_t walLevel;
......
...@@ -22,11 +22,13 @@ ...@@ -22,11 +22,13 @@
extern "C" { extern "C" {
#endif #endif
int32_t mndInitVgroup(SMnode *pMnode); int32_t mndInitVgroup(SMnode *pMnode);
void mndCleanupVgroup(SMnode *pMnode); void mndCleanupVgroup(SMnode *pMnode);
SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId); SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId);
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup); void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup);
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups); int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup);
SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -85,8 +85,8 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) { ...@@ -85,8 +85,8 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep0) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep0)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep1) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep1)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep2) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep2)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.minRowsPerFileBlock) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.minRows)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.maxRowsPerFileBlock) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.maxRows)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.commitTime) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.commitTime)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.fsyncPeriod) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.fsyncPeriod)
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.walLevel) SDB_SET_INT8(pRaw, dataPos, pDb->cfg.walLevel)
...@@ -132,8 +132,8 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) { ...@@ -132,8 +132,8 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysToKeep0) SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysToKeep0)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysToKeep1) SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysToKeep1)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysToKeep2) SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysToKeep2)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.minRowsPerFileBlock) SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.minRows)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.maxRowsPerFileBlock) SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.maxRows)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.commitTime) SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.commitTime)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.fsyncPeriod) SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.fsyncPeriod)
SDB_GET_INT8(pRaw, pRow, dataPos, &pDb->cfg.walLevel) SDB_GET_INT8(pRaw, pRow, dataPos, &pDb->cfg.walLevel)
...@@ -161,6 +161,9 @@ static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb) { ...@@ -161,6 +161,9 @@ static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb) {
static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOldDb, SDbObj *pNewDb) { static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOldDb, SDbObj *pNewDb) {
mTrace("db:%s, perform update action", pOldDb->name); mTrace("db:%s, perform update action", pOldDb->name);
pOldDb->updateTime = pNewDb->createdTime; pOldDb->updateTime = pNewDb->createdTime;
pOldDb->cfgVersion = pNewDb->cfgVersion;
pOldDb->vgVersion = pNewDb->vgVersion;
pOldDb->numOfVgroups = pNewDb->numOfVgroups;
memcpy(&pOldDb->cfg, &pNewDb->cfg, sizeof(SDbCfg)); memcpy(&pOldDb->cfg, &pNewDb->cfg, sizeof(SDbCfg));
return 0; return 0;
} }
...@@ -191,133 +194,30 @@ static int32_t mndCheckDbName(char *dbName, SUserObj *pUser) { ...@@ -191,133 +194,30 @@ static int32_t mndCheckDbName(char *dbName, SUserObj *pUser) {
return 0; return 0;
} }
static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg, char *errMsg, int32_t len) { static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
if (pCfg->cacheBlockSize < TSDB_MIN_CACHE_BLOCK_SIZE || pCfg->cacheBlockSize > TSDB_MAX_CACHE_BLOCK_SIZE) { if (pCfg->cacheBlockSize < TSDB_MIN_CACHE_BLOCK_SIZE || pCfg->cacheBlockSize > TSDB_MAX_CACHE_BLOCK_SIZE) return -1;
terrno = TSDB_CODE_MND_INVALID_DB_OPTION; if (pCfg->totalBlocks < TSDB_MIN_TOTAL_BLOCKS || pCfg->totalBlocks > TSDB_MAX_TOTAL_BLOCKS) return -1;
tstrncpy(errMsg, "Invalid database cache block size option", len); if (pCfg->daysPerFile < TSDB_MIN_DAYS_PER_FILE || pCfg->daysPerFile > TSDB_MAX_DAYS_PER_FILE) return -1;
return -1; if (pCfg->daysToKeep0 < pCfg->daysPerFile) return -1;
} if (pCfg->daysToKeep0 < TSDB_MIN_KEEP || pCfg->daysToKeep0 > TSDB_MAX_KEEP) return -1;
if (pCfg->daysToKeep1 < TSDB_MIN_KEEP || pCfg->daysToKeep1 > TSDB_MAX_KEEP) return -1;
if (pCfg->totalBlocks < TSDB_MIN_TOTAL_BLOCKS || pCfg->totalBlocks > TSDB_MAX_TOTAL_BLOCKS) { if (pCfg->daysToKeep2 < TSDB_MIN_KEEP || pCfg->daysToKeep2 > TSDB_MAX_KEEP) return -1;
terrno = TSDB_CODE_MND_INVALID_DB_OPTION; if (pCfg->daysToKeep0 > pCfg->daysToKeep1) return -1;
tstrncpy(errMsg, "Invalid database total blocks option", len); if (pCfg->daysToKeep1 > pCfg->daysToKeep2) return -1;
return -1; if (pCfg->minRows < TSDB_MIN_MIN_ROW_FBLOCK || pCfg->minRows > TSDB_MAX_MIN_ROW_FBLOCK) return -1;
} if (pCfg->maxRows < TSDB_MIN_MAX_ROW_FBLOCK || pCfg->maxRows > TSDB_MAX_MAX_ROW_FBLOCK) return -1;
if (pCfg->minRows > pCfg->maxRows) return -1;
if (pCfg->daysPerFile < TSDB_MIN_DAYS_PER_FILE || pCfg->daysPerFile > TSDB_MAX_DAYS_PER_FILE) { if (pCfg->commitTime < TSDB_MIN_COMMIT_TIME || pCfg->commitTime > TSDB_MAX_COMMIT_TIME) return -1;
terrno = TSDB_CODE_MND_INVALID_DB_OPTION; if (pCfg->fsyncPeriod < TSDB_MIN_FSYNC_PERIOD || pCfg->fsyncPeriod > TSDB_MAX_FSYNC_PERIOD) return -1;
tstrncpy(errMsg, "Invalid database days option", len); if (pCfg->walLevel < TSDB_MIN_WAL_LEVEL || pCfg->walLevel > TSDB_MAX_WAL_LEVEL) return -1;
return -1; if (pCfg->precision < TSDB_MIN_PRECISION && pCfg->precision > TSDB_MAX_PRECISION) return -1;
} if (pCfg->compression < TSDB_MIN_COMP_LEVEL || pCfg->compression > TSDB_MAX_COMP_LEVEL) return -1;
if (pCfg->replications < TSDB_MIN_DB_REPLICA_OPTION || pCfg->replications > TSDB_MAX_DB_REPLICA_OPTION) return -1;
if (pCfg->daysToKeep0 < pCfg->daysPerFile) { if (pCfg->replications > mndGetDnodeSize(pMnode)) return -1;
terrno = TSDB_CODE_MND_INVALID_DB_OPTION; if (pCfg->quorum < TSDB_MIN_DB_QUORUM_OPTION || pCfg->quorum > TSDB_MAX_DB_QUORUM_OPTION) return -1;
tstrncpy(errMsg, "Invalid database days option", len); if (pCfg->quorum > pCfg->replications) return -1;
return -1; if (pCfg->update < TSDB_MIN_DB_UPDATE || pCfg->update > TSDB_MAX_DB_UPDATE) return -1;
} if (pCfg->cacheLastRow < TSDB_MIN_DB_CACHE_LAST_ROW || pCfg->cacheLastRow > TSDB_MAX_DB_CACHE_LAST_ROW) return -1;
if (pCfg->daysToKeep0 < TSDB_MIN_KEEP || pCfg->daysToKeep0 > TSDB_MAX_KEEP || pCfg->daysToKeep0 > pCfg->daysToKeep1) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database keep0 option", len);
return -1;
}
if (pCfg->daysToKeep1 < TSDB_MIN_KEEP || pCfg->daysToKeep1 > TSDB_MAX_KEEP || pCfg->daysToKeep1 > pCfg->daysToKeep2) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database keep1 option", len);
return -1;
}
if (pCfg->daysToKeep2 < TSDB_MIN_KEEP || pCfg->daysToKeep2 > TSDB_MAX_KEEP) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database keep2 option", len);
return -1;
}
if (pCfg->minRowsPerFileBlock < TSDB_MIN_MIN_ROW_FBLOCK || pCfg->minRowsPerFileBlock > TSDB_MAX_MIN_ROW_FBLOCK) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database minrows option", len);
return -1;
}
if (pCfg->maxRowsPerFileBlock < TSDB_MIN_MAX_ROW_FBLOCK || pCfg->maxRowsPerFileBlock > TSDB_MAX_MAX_ROW_FBLOCK) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database maxrows option", len);
return -1;
}
if (pCfg->minRowsPerFileBlock > pCfg->maxRowsPerFileBlock) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database minrows option", len);
return -1;
}
if (pCfg->commitTime < TSDB_MIN_COMMIT_TIME || pCfg->commitTime > TSDB_MAX_COMMIT_TIME) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database commit option", len);
return -1;
}
if (pCfg->fsyncPeriod < TSDB_MIN_FSYNC_PERIOD || pCfg->fsyncPeriod > TSDB_MAX_FSYNC_PERIOD) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database fsync option", len);
return -1;
}
if (pCfg->walLevel < TSDB_MIN_WAL_LEVEL || pCfg->walLevel > TSDB_MAX_WAL_LEVEL) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database wal level option", len);
return -1;
}
if (pCfg->precision < TSDB_MIN_PRECISION && pCfg->precision > TSDB_MAX_PRECISION) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid precision option", len);
return -1;
}
if (pCfg->compression < TSDB_MIN_COMP_LEVEL || pCfg->compression > TSDB_MAX_COMP_LEVEL) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database compression option", len);
return -1;
}
if (pCfg->replications < TSDB_MIN_DB_REPLICA_OPTION || pCfg->replications > TSDB_MAX_DB_REPLICA_OPTION) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database replication option", len);
return -1;
}
if (pCfg->replications > mndGetDnodeSize(pMnode)) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database replication option", len);
return -1;
}
if (pCfg->quorum < TSDB_MIN_DB_QUORUM_OPTION || pCfg->quorum > TSDB_MAX_DB_QUORUM_OPTION) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database quorum option", len);
return -1;
}
if (pCfg->quorum > pCfg->replications) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database quorum option", len);
return -1;
}
if (pCfg->update < TSDB_MIN_DB_UPDATE || pCfg->update > TSDB_MAX_DB_UPDATE) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database update option", len);
return -1;
}
if (pCfg->cacheLastRow < TSDB_MIN_DB_CACHE_LAST_ROW || pCfg->cacheLastRow > TSDB_MAX_DB_CACHE_LAST_ROW) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database cachelast option", len);
return -1;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -328,8 +228,8 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) { ...@@ -328,8 +228,8 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->daysToKeep0 < 0) pCfg->daysToKeep0 = TSDB_DEFAULT_KEEP; if (pCfg->daysToKeep0 < 0) pCfg->daysToKeep0 = TSDB_DEFAULT_KEEP;
if (pCfg->daysToKeep1 < 0) pCfg->daysToKeep1 = TSDB_DEFAULT_KEEP; if (pCfg->daysToKeep1 < 0) pCfg->daysToKeep1 = TSDB_DEFAULT_KEEP;
if (pCfg->daysToKeep2 < 0) pCfg->daysToKeep2 = TSDB_DEFAULT_KEEP; if (pCfg->daysToKeep2 < 0) pCfg->daysToKeep2 = TSDB_DEFAULT_KEEP;
if (pCfg->minRowsPerFileBlock < 0) pCfg->minRowsPerFileBlock = TSDB_DEFAULT_MIN_ROW_FBLOCK; if (pCfg->minRows < 0) pCfg->minRows = TSDB_DEFAULT_MIN_ROW_FBLOCK;
if (pCfg->maxRowsPerFileBlock < 0) pCfg->maxRowsPerFileBlock = TSDB_DEFAULT_MAX_ROW_FBLOCK; if (pCfg->maxRows < 0) pCfg->maxRows = TSDB_DEFAULT_MAX_ROW_FBLOCK;
if (pCfg->commitTime < 0) pCfg->commitTime = TSDB_DEFAULT_COMMIT_TIME; if (pCfg->commitTime < 0) pCfg->commitTime = TSDB_DEFAULT_COMMIT_TIME;
if (pCfg->fsyncPeriod < 0) pCfg->fsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD; if (pCfg->fsyncPeriod < 0) pCfg->fsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD;
if (pCfg->walLevel < 0) pCfg->walLevel = TSDB_DEFAULT_WAL_LEVEL; if (pCfg->walLevel < 0) pCfg->walLevel = TSDB_DEFAULT_WAL_LEVEL;
...@@ -341,6 +241,48 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) { ...@@ -341,6 +241,48 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW; if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW;
} }
static int32_t mndSetRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
if (pDbRaw == NULL || mndTransAppendRedolog(pTrans, pDbRaw) != 0) return -1;
sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING);
for (int v = 0; v < pDb->numOfVgroups; ++v) {
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
if (pVgRaw == NULL || mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1;
sdbSetRawStatus(pVgRaw, SDB_STATUS_CREATING);
}
return 0;
}
static int32_t mndSetUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
if (pDbRaw == NULL || mndTransAppendUndolog(pTrans, pDbRaw) != 0) return -1;
sdbSetRawStatus(pDbRaw, SDB_STATUS_DROPPED);
for (int v = 0; v < pDb->numOfVgroups; ++v) {
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
if (pVgRaw == NULL || mndTransAppendUndolog(pTrans, pVgRaw) != 0) return -1;
sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPED);
}
return 0;
}
static int32_t mndSetCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
if (pDbRaw == NULL || mndTransAppendCommitlog(pTrans, pDbRaw) != 0) return -1;
sdbSetRawStatus(pDbRaw, SDB_STATUS_READY);
for (int v = 0; v < pDb->numOfVgroups; ++v) {
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
if (pVgRaw == NULL || mndTransAppendCommitlog(pTrans, pVgRaw) != 0) return -1;
sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
}
return 0;
}
static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreate, SUserObj *pUser) { static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreate, SUserObj *pUser) {
SDbObj dbObj = {0}; SDbObj dbObj = {0};
tstrncpy(dbObj.name, pCreate->db, TSDB_FULL_DB_NAME_LEN); tstrncpy(dbObj.name, pCreate->db, TSDB_FULL_DB_NAME_LEN);
...@@ -358,8 +300,8 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat ...@@ -358,8 +300,8 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat
.daysToKeep0 = pCreate->daysToKeep0, .daysToKeep0 = pCreate->daysToKeep0,
.daysToKeep1 = pCreate->daysToKeep1, .daysToKeep1 = pCreate->daysToKeep1,
.daysToKeep2 = pCreate->daysToKeep2, .daysToKeep2 = pCreate->daysToKeep2,
.minRowsPerFileBlock = pCreate->minRowsPerFileBlock, .minRows = pCreate->minRowsPerFileBlock,
.maxRowsPerFileBlock = pCreate->maxRowsPerFileBlock, .maxRows = pCreate->maxRowsPerFileBlock,
.fsyncPeriod = pCreate->fsyncPeriod, .fsyncPeriod = pCreate->fsyncPeriod,
.commitTime = pCreate->commitTime, .commitTime = pCreate->commitTime,
.precision = pCreate->precision, .precision = pCreate->precision,
...@@ -377,8 +319,8 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat ...@@ -377,8 +319,8 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat
return -1; return -1;
} }
char errMsg[TSDB_ERROR_MSG_LEN] = {0}; if (mndCheckDbCfg(pMnode, &dbObj.cfg) != 0) {
if (mndCheckDbCfg(pMnode, &dbObj.cfg, errMsg, TSDB_ERROR_MSG_LEN) != 0) { terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
mError("db:%s, failed to create since %s", pCreate->db, terrstr()); mError("db:%s, failed to create since %s", pCreate->db, terrstr());
return -1; return -1;
} }
...@@ -390,7 +332,6 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat ...@@ -390,7 +332,6 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat
} }
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("db:%s, failed to create since %s", pCreate->db, terrstr()); mError("db:%s, failed to create since %s", pCreate->db, terrstr());
...@@ -398,26 +339,20 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat ...@@ -398,26 +339,20 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat
} }
mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db); mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db);
SSdbRaw *pRedoRaw = mndDbActionEncode(&dbObj); if (mndSetRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) {
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
goto CREATE_DB_OVER; goto CREATE_DB_OVER;
} }
sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING);
SSdbRaw *pUndoRaw = mndDbActionEncode(&dbObj); if (mndSetUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) {
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr());
mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr());
goto CREATE_DB_OVER; goto CREATE_DB_OVER;
} }
sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED);
SSdbRaw *pCommitRaw = mndDbActionEncode(&dbObj); if (mndSetCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) {
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
goto CREATE_DB_OVER; goto CREATE_DB_OVER;
} }
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
...@@ -907,11 +842,11 @@ static int32_t mndRetrieveDbs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3 ...@@ -907,11 +842,11 @@ static int32_t mndRetrieveDbs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDb->cfg.minRowsPerFileBlock; *(int32_t *)pWrite = pDb->cfg.minRows;
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDb->cfg.maxRowsPerFileBlock; *(int32_t *)pWrite = pDb->cfg.maxRows;
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......
...@@ -24,8 +24,6 @@ ...@@ -24,8 +24,6 @@
#define TSDB_VGROUP_VER_NUM 1 #define TSDB_VGROUP_VER_NUM 1
#define TSDB_VGROUP_RESERVE_SIZE 64 #define TSDB_VGROUP_RESERVE_SIZE 64
static SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup);
static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw);
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup); static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup);
...@@ -70,8 +68,8 @@ int32_t mndInitVgroup(SMnode *pMnode) { ...@@ -70,8 +68,8 @@ int32_t mndInitVgroup(SMnode *pMnode) {
void mndCleanupVgroup(SMnode *pMnode) {} void mndCleanupVgroup(SMnode *pMnode) {}
static SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) { SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
SSdbRaw *pRaw = sdbAllocRaw(SDB_DB, TSDB_VGROUP_VER_NUM, sizeof(SVgObj) + TSDB_VGROUP_RESERVE_SIZE); SSdbRaw *pRaw = sdbAllocRaw(SDB_VGROUP, TSDB_VGROUP_VER_NUM, sizeof(SVgObj) + TSDB_VGROUP_RESERVE_SIZE);
if (pRaw == NULL) return NULL; if (pRaw == NULL) return NULL;
int32_t dataPos = 0; int32_t dataPos = 0;
...@@ -94,7 +92,7 @@ static SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) { ...@@ -94,7 +92,7 @@ static SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
return pRaw; return pRaw;
} }
static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) { SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0; int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
...@@ -217,7 +215,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { ...@@ -217,7 +215,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
for (int32_t v = 0; v < pDb->numOfVgroups; v++) { for (int32_t v = 0; v < pDb->numOfVgroups; v++) {
SVgObj *pVgroup = &pVgroups[v]; SVgObj *pVgroup = &pVgroups[v];
pVgroup->vgId == maxVgId++; pVgroup->vgId = maxVgId++;
pVgroup->createdTime = taosGetTimestampMs(); pVgroup->createdTime = taosGetTimestampMs();
pVgroup->updateTime = pVgroups->createdTime; pVgroup->updateTime = pVgroups->createdTime;
pVgroup->version = 0; pVgroup->version = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册