提交 9dbefbe5 编写于 作者: S Shengliang Guan

TD-10431 alloc vgroup

上级 a2a03a4d
......@@ -207,8 +207,10 @@ typedef struct {
int64_t createdTime;
int64_t updateTime;
int64_t uid;
int32_t version;
int32_t cfgVersion;
int32_t vgVersion;
int32_t numOfVgroups;
int8_t hashMethod; // default is 1
SDbCfg cfg;
} SDbObj;
......@@ -222,6 +224,8 @@ typedef struct {
int64_t createdTime;
int64_t updateTime;
int32_t version;
int32_t hashBegin;
int32_t hashEnd;
char dbName[TSDB_FULL_DB_NAME_LEN];
int32_t numOfTables;
int32_t numOfTimeSeries;
......
......@@ -26,7 +26,7 @@ int32_t mndInitVgroup(SMnode *pMnode);
void mndCleanupVgroup(SMnode *pMnode);
SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId);
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup);
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb);
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
#ifdef __cplusplus
}
......
......@@ -75,8 +75,10 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
SDB_SET_INT64(pRaw, dataPos, pDb->createdTime)
SDB_SET_INT64(pRaw, dataPos, pDb->updateTime)
SDB_SET_INT64(pRaw, dataPos, pDb->uid)
SDB_SET_INT32(pRaw, dataPos, pDb->version)
SDB_SET_INT32(pRaw, dataPos, pDb->cfgVersion)
SDB_SET_INT32(pRaw, dataPos, pDb->vgVersion)
SDB_SET_INT32(pRaw, dataPos, pDb->numOfVgroups)
SDB_SET_INT8(pRaw, dataPos, pDb->hashMethod)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.cacheBlockSize)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.totalBlocks)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysPerFile)
......@@ -120,8 +122,10 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->updateTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->uid)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->version)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfgVersion)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->vgVersion)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->numOfVgroups)
SDB_GET_INT8(pRaw, pRow, dataPos, &pDb->hashMethod)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.cacheBlockSize)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.totalBlocks)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysPerFile)
......@@ -345,6 +349,9 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat
dbObj.updateTime = dbObj.createdTime;
dbObj.uid = mndGenerateUid(dbObj.name, TSDB_FULL_DB_NAME_LEN);
dbObj.numOfVgroups = pCreate->numOfVgroups;
dbObj.hashMethod = 1;
dbObj.cfgVersion = 0;
dbObj.vgVersion = 0;
dbObj.cfg = (SDbCfg){.cacheBlockSize = pCreate->cacheBlockSize,
.totalBlocks = pCreate->totalBlocks,
.daysPerFile = pCreate->daysPerFile,
......@@ -376,50 +383,53 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat
return -1;
}
if (mndAllocVgroup(pMnode, &dbObj) != 0) {
SVgObj *pVgroups = NULL;
if (mndAllocVgroup(pMnode, &dbObj, &pVgroups) != 0) {
mError("db:%s, failed to create since %s", pCreate->db, terrstr());
return -1;
}
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
if (pTrans == NULL) {
mError("db:%s, failed to create since %s", pCreate->db, terrstr());
return -1;
goto CREATE_DB_OVER;
}
mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db);
SSdbRaw *pRedoRaw = mndDbActionEncode(&dbObj);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
goto CREATE_DB_OVER;
}
sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING);
SSdbRaw *pUndoRaw = mndDbActionEncode(&dbObj);
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) {
mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
goto CREATE_DB_OVER;
}
sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED);
SSdbRaw *pCommitRaw = mndDbActionEncode(&dbObj);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
goto CREATE_DB_OVER;
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
goto CREATE_DB_OVER;
}
code = 0;
CREATE_DB_OVER:
free(pVgroups);
mndTransDrop(pTrans);
return 0;
return code;
}
static int32_t mndProcessCreateDbMsg(SMnodeMsg *pMsg) {
......@@ -583,7 +593,8 @@ static int32_t mndProcessAlterDbMsg(SMnodeMsg *pMsg) {
return code;
}
dbObj.version++;
dbObj.cfgVersion++;
dbObj.updateTime = taosGetTimestampMs();
code = mndUpdateDb(pMnode, pMsg, pDb, &dbObj);
mndReleaseDb(pMnode, pDb);
......
......@@ -79,6 +79,8 @@ static SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime)
SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime)
SDB_SET_INT32(pRaw, dataPos, pVgroup->version)
SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin)
SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd)
SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_FULL_DB_NAME_LEN)
SDB_SET_INT8(pRaw, dataPos, pVgroup->replica)
for (int8_t i = 0; i < pVgroup->replica; ++i) {
......@@ -111,6 +113,8 @@ static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, pRow, dataPos, &pVgroup->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pVgroup->updateTime)
SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->version)
SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->hashBegin)
SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->hashEnd)
SDB_GET_BINARY(pRaw, pRow, dataPos, pVgroup->dbName, TSDB_FULL_DB_NAME_LEN)
SDB_GET_INT8(pRaw, pRow, dataPos, &pVgroup->replica)
for (int8_t i = 0; i < pVgroup->replica; ++i) {
......@@ -184,7 +188,7 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup) {
return 0;
}
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb) {
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
if (pDb->numOfVgroups != -1 &&
(pDb->numOfVgroups < TSDB_MIN_VNODES_PER_DB || pDb->numOfVgroups > TSDB_MAX_VNODES_PER_DB)) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
......@@ -199,26 +203,44 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb) {
}
}
SVgObj *pVgroups = calloc(pDb->numOfVgroups, sizeof(SVgObj));
if (pVgroups == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
int32_t alloceVgroups = 0;
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
int32_t hashMin = 0;
int32_t hashMax = INT32_MAX;
int32_t hashInterval = (hashMax - hashMin) / pDb->numOfVgroups;
for (int32_t v = 0; v < pDb->numOfVgroups; v++) {
SVgObj *pVgroup = &pVgroups[v];
pVgroup->vgId == maxVgId++;
pVgroup->createdTime = taosGetTimestampMs();
pVgroup->updateTime = pVgroups->createdTime;
pVgroup->version = 0;
pVgroup->hashBegin = hashMin + hashInterval * v;
if (v == pDb->numOfVgroups - 1) {
pVgroup->hashEnd = hashMax;
} else {
pVgroup->hashEnd = hashMin + hashInterval * (v + 1);
}
while (alloceVgroups < pDb->numOfVgroups) {
SVgObj vgObj = {0};
vgObj.vgId == maxVgId++;
vgObj.createdTime = taosGetTimestampMs();
vgObj.updateTime = vgObj.createdTime;
vgObj.version = 0;
memcpy(vgObj.dbName, pDb->name, TSDB_FULL_DB_NAME_LEN);
vgObj.replica = pDb->cfg.replications;
memcpy(pVgroup->dbName, pDb->name, TSDB_FULL_DB_NAME_LEN);
pVgroup->replica = pDb->cfg.replications;
if (mndGetAvailableDnode(pMnode, &vgObj) != 0) {
if (mndGetAvailableDnode(pMnode, pVgroup) != 0) {
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
free(pVgroups);
return -1;
}
alloceVgroups++;
}
*ppVgroups = pVgroups;
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册