提交 ae23dd23 编写于 作者: S Shengliang Guan

fix: alter db replications

上级 666b0b67
......@@ -27,10 +27,13 @@ void mndCleanupVgroup(SMnode *pMnode);
SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId);
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup);
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup);
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup);
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SVnodeGid *new1, SVnodeGid *new2, SVnodeGid *exist);
int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SVnodeGid *del1, SVnodeGid *del2, SVnodeGid *exist);
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
void *mndBuildAlterVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
......
......@@ -261,6 +261,104 @@ void mndReleaseDb(SMnode *pMnode, SDbObj *pDb) {
sdbRelease(pSdb, pDb);
}
static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
bool isRedo) {
STransAction action = {0};
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode == NULL) return -1;
action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
int32_t contLen = 0;
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
if (pReq == NULL) return -1;
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_CREATE_VNODE;
action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED;
if (isRedo) {
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
} else {
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
}
return 0;
}
static int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
bool isRedo) {
STransAction action = {0};
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode == NULL) return -1;
action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
int32_t contLen = 0;
void *pReq = mndBuildAlterVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
if (pReq == NULL) return -1;
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_VND_ALTER_VNODE;
if (isRedo) {
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
} else {
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
}
return 0;
}
static int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
bool isRedo) {
STransAction action = {0};
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode == NULL) return -1;
action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
int32_t contLen = 0;
void *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
if (pReq == NULL) return -1;
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_DROP_VNODE;
action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED;
if (isRedo) {
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
} else {
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
}
return 0;
}
static int32_t mndCheckDbName(const char *dbName, SUserObj *pUser) {
char *pos = strstr(dbName, TS_PATH_DELIMITER);
if (pos == NULL) {
......@@ -278,6 +376,8 @@ static int32_t mndCheckDbName(const char *dbName, SUserObj *pUser) {
}
static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
if (pCfg->numOfVgroups < TSDB_MIN_VNODES_PER_DB || pCfg->numOfVgroups > TSDB_MAX_VNODES_PER_DB) return -1;
if (pCfg->numOfStables < TSDB_DB_STREAM_MODE_OFF || pCfg->numOfStables > TSDB_DB_STREAM_MODE_ON) return -1;
if (pCfg->buffer < TSDB_MIN_BUFFER_PER_VNODE || pCfg->buffer > TSDB_MAX_BUFFER_PER_VNODE) return -1;
......@@ -300,9 +400,10 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
if (pCfg->replications < TSDB_MIN_DB_REPLICA || pCfg->replications > TSDB_MAX_DB_REPLICA) return -1;
if (pCfg->replications > mndGetDnodeSize(pMnode)) return -1;
if (pCfg->strict < TSDB_DB_STRICT_OFF || pCfg->strict > TSDB_DB_STRICT_ON) return -1;
if (pCfg->strict > pCfg->replications) return -1;
if (pCfg->cacheLastRow < TSDB_MIN_DB_CACHE_LAST_ROW || pCfg->cacheLastRow > TSDB_MAX_DB_CACHE_LAST_ROW) return -1;
if (pCfg->hashMethod != 1) return -1;
terrno = 0;
return TSDB_CODE_SUCCESS;
}
......@@ -381,24 +482,8 @@ static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
SVgObj *pVgroup = pVgroups + vg;
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
STransAction action = {0};
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode == NULL) return -1;
action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
int32_t contLen = 0;
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
if (pReq == NULL) return -1;
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_CREATE_VNODE;
action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pVgid, true) != 0) {
return -1;
}
}
......@@ -412,24 +497,8 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
SVgObj *pVgroup = pVgroups + vg;
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
STransAction action = {0};
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode == NULL) return -1;
action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
int32_t contLen = 0;
void *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
if (pReq == NULL) return -1;
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_DROP_VNODE;
action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED;
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, pVgid, false) != 0) {
return -1;
}
}
......@@ -482,7 +551,6 @@ static int32_t mndCreateDb(SMnode *pMnode, SNodeMsg *pReq, SCreateDbReq *pCreate
}
if (mndCheckDbCfg(pMnode, &dbObj.cfg) != 0) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
mError("db:%s, failed to create since %s", pCreate->db, terrstr());
return -1;
}
......@@ -570,37 +638,37 @@ _OVER:
static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
terrno = TSDB_CODE_MND_DB_OPTION_UNCHANGED;
if (pAlter->buffer >= 0 && pAlter->buffer != pDb->cfg.buffer) {
if (pAlter->buffer > 0 && pAlter->buffer != pDb->cfg.buffer) {
pDb->cfg.buffer = pAlter->buffer;
terrno = 0;
}
if (pAlter->pages >= 0 && pAlter->pages != pDb->cfg.pages) {
if (pAlter->pages > 0 && pAlter->pages != pDb->cfg.pages) {
pDb->cfg.pages = pAlter->pages;
terrno = 0;
}
if (pAlter->pageSize >= 0 && pAlter->pageSize != pDb->cfg.pageSize) {
if (pAlter->pageSize > 0 && pAlter->pageSize != pDb->cfg.pageSize) {
pDb->cfg.pageSize = pAlter->pageSize;
terrno = 0;
}
if (pAlter->daysPerFile >= 0 && pAlter->daysPerFile != pDb->cfg.daysPerFile) {
if (pAlter->daysPerFile > 0 && pAlter->daysPerFile != pDb->cfg.daysPerFile) {
pDb->cfg.daysPerFile = pAlter->daysPerFile;
terrno = 0;
}
if (pAlter->daysToKeep0 >= 0 && pAlter->daysToKeep0 != pDb->cfg.daysToKeep0) {
if (pAlter->daysToKeep0 > 0 && pAlter->daysToKeep0 != pDb->cfg.daysToKeep0) {
pDb->cfg.daysToKeep0 = pAlter->daysToKeep0;
terrno = 0;
}
if (pAlter->daysToKeep1 >= 0 && pAlter->daysToKeep1 != pDb->cfg.daysToKeep1) {
if (pAlter->daysToKeep1 > 0 && pAlter->daysToKeep1 != pDb->cfg.daysToKeep1) {
pDb->cfg.daysToKeep1 = pAlter->daysToKeep1;
terrno = 0;
}
if (pAlter->daysToKeep2 >= 0 && pAlter->daysToKeep2 != pDb->cfg.daysToKeep2) {
if (pAlter->daysToKeep2 > 0 && pAlter->daysToKeep2 != pDb->cfg.daysToKeep2) {
pDb->cfg.daysToKeep2 = pAlter->daysToKeep2;
terrno = 0;
}
......@@ -625,7 +693,7 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
terrno = 0;
}
if (pAlter->replications >= 0 && pAlter->replications != pDb->cfg.replications) {
if (pAlter->replications > 0 && pAlter->replications != pDb->cfg.replications) {
pDb->cfg.replications = pAlter->replications;
pDb->vgVersion++;
terrno = 0;
......@@ -653,30 +721,44 @@ static int32_t mndSetAlterDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p
}
static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
if (pVgroup->replica == pDb->cfg.replications) {
if (pVgroup->replica <= 0 || pVgroup->replica == pDb->cfg.replications) {
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
STransAction action = {0};
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode == NULL) return -1;
action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
int32_t contLen = 0;
void *pReq = mndBuildAlterVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
if (pReq == NULL) return -1;
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_VND_ALTER_VNODE;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, pVgid, true) != 0) {
return -1;
}
}
} else if (pVgroup->replica < pDb->cfg.replications) {
} else {
SVgObj newVgroup = {0};
memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
if (newVgroup.replica < pDb->cfg.replications) {
SVnodeGid new1 = {0};
SVnodeGid new2 = {0};
SVnodeGid exist = {0};
if (mndAddVnodeToVgroup(pMnode, &newVgroup, &new1, &new2, &exist) != 0) {
mError("db:%s, failed to add vnode to vgId:%d since %s", pDb->name, newVgroup.vgId, terrstr());
return -1;
}
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &new1, true) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &new2, true) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, &exist, true) != 0) return -1;
} else {
SVnodeGid del1 = {0};
SVnodeGid del2 = {0};
SVnodeGid exist = {0};
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, &del1, &del2, &exist) != 0) {
mError("db:%s, failed to remove vnode from vgId:%d since %s", pDb->name, newVgroup.vgId, terrstr());
return -1;
}
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del1, true) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del2, true) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, &exist, true) != 0) return -1;
}
SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
if (pVgRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) return -1;
if (sdbSetRawStatus(pVgRaw, SDB_STATUS_READY) != 0) return -1;
}
return 0;
......@@ -764,6 +846,9 @@ static int32_t mndProcessAlterDbReq(SNodeMsg *pReq) {
code = mndSetDbCfgFromAlterDbReq(&dbObj, &alterReq);
if (code != 0) goto _OVER;
code = mndCheckDbCfg(pMnode, &dbObj.cfg);
if (code != 0) goto _OVER;
dbObj.cfgVersion++;
dbObj.updateTime = taosGetTimestampMs();
code = mndAlterDb(pMnode, pReq, pDb, &dbObj);
......@@ -905,24 +990,8 @@ static int32_t mndSetDropDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD
static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
STransAction action = {0};
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode == NULL) return -1;
action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
int32_t contLen = 0;
void *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
if (pReq == NULL) return -1;
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_DROP_VNODE;
action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, pVgid, true) != 0) {
return -1;
}
}
......
......@@ -491,6 +491,14 @@ _OVER:
return code;
}
int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SVnodeGid *new1, SVnodeGid *new2, SVnodeGid *exist) {
return 0;
}
int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SVnodeGid *del1, SVnodeGid *del2, SVnodeGid *exist) {
return 0;
}
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) {
SEpSet epset = {0};
......
......@@ -73,14 +73,19 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) {
{
SAlterDbReq alterdbReq = {0};
strcpy(alterdbReq.db, "1.d1");
alterdbReq.buffer = 12;
alterdbReq.daysToKeep0 = 300;
alterdbReq.daysToKeep1 = 400;
alterdbReq.daysToKeep2 = 500;
alterdbReq.pageSize = -1;
alterdbReq.pages = -1;
alterdbReq.daysPerFile = -1;
alterdbReq.daysToKeep0 = -1;
alterdbReq.daysToKeep1 = -1;
alterdbReq.daysToKeep2 = -1;
alterdbReq.fsyncPeriod = 4000;
alterdbReq.walLevel = 2;
alterdbReq.strict = 2;
alterdbReq.strict = 1;
alterdbReq.cacheLastRow = 1;
alterdbReq.replications = 1;
int32_t contLen = tSerializeSAlterDbReq(NULL, 0, &alterdbReq);
void* pReq = rpcMallocCont(contLen);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册