提交 2383b316 编写于 作者: S Shengliang Guan

refactor: adjust db codes

上级 ea2247e2
...@@ -257,6 +257,7 @@ typedef struct { ...@@ -257,6 +257,7 @@ typedef struct {
int32_t acctId; int32_t acctId;
SHashObj* readDbs; SHashObj* readDbs;
SHashObj* writeDbs; SHashObj* writeDbs;
SRWLatch lock;
} SUserObj; } SUserObj;
typedef struct { typedef struct {
......
...@@ -33,6 +33,7 @@ int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId); ...@@ -33,6 +33,7 @@ int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
void *mndBuildAlterVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -47,13 +47,15 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq); ...@@ -47,13 +47,15 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq);
static int32_t mndProcessGetIndexReq(SNodeMsg *pReq); static int32_t mndProcessGetIndexReq(SNodeMsg *pReq);
int32_t mndInitDb(SMnode *pMnode) { int32_t mndInitDb(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_DB, SSdbTable table = {
.keyType = SDB_KEY_BINARY, .sdbType = SDB_DB,
.encodeFp = (SdbEncodeFp)mndDbActionEncode, .keyType = SDB_KEY_BINARY,
.decodeFp = (SdbDecodeFp)mndDbActionDecode, .encodeFp = (SdbEncodeFp)mndDbActionEncode,
.insertFp = (SdbInsertFp)mndDbActionInsert, .decodeFp = (SdbDecodeFp)mndDbActionDecode,
.updateFp = (SdbUpdateFp)mndDbActionUpdate, .insertFp = (SdbInsertFp)mndDbActionInsert,
.deleteFp = (SdbDeleteFp)mndDbActionDelete}; .updateFp = (SdbUpdateFp)mndDbActionUpdate,
.deleteFp = (SdbDeleteFp)mndDbActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DB, mndProcessCreateDbReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DB, mndProcessCreateDbReq);
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_DB, mndProcessAlterDbReq); mndSetMsgHandle(pMnode, TDMT_MND_ALTER_DB, mndProcessAlterDbReq);
...@@ -194,6 +196,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) { ...@@ -194,6 +196,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
} }
SDB_GET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER) SDB_GET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
taosInitRWLatch(&pDb->lock);
terrno = 0; terrno = 0;
...@@ -222,17 +225,29 @@ static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb) { ...@@ -222,17 +225,29 @@ static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb) {
static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) { static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) {
mTrace("db:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew); mTrace("db:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
taosWLockLatch(&pOld->lock); taosWLockLatch(&pOld->lock);
SArray *pOldRetensions = pOld->cfg.pRetensions;
pOld->updateTime = pNew->updateTime; pOld->updateTime = pNew->updateTime;
pOld->cfgVersion = pNew->cfgVersion; pOld->cfgVersion = pNew->cfgVersion;
pOld->vgVersion = pNew->vgVersion; pOld->vgVersion = pNew->vgVersion;
memcpy(&pOld->cfg, &pNew->cfg, sizeof(SDbCfg)); pOld->cfg.buffer = pNew->cfg.buffer;
pNew->cfg.pRetensions = pOldRetensions; pOld->cfg.pages = pNew->cfg.pages;
pOld->cfg.pageSize = pNew->cfg.pageSize;
pOld->cfg.daysPerFile = pNew->cfg.daysPerFile;
pOld->cfg.daysToKeep0 = pNew->cfg.daysToKeep0;
pOld->cfg.daysToKeep1 = pNew->cfg.daysToKeep1;
pOld->cfg.daysToKeep2 = pNew->cfg.daysToKeep2;
pOld->cfg.fsyncPeriod = pNew->cfg.fsyncPeriod;
pOld->cfg.walLevel = pNew->cfg.walLevel;
pOld->cfg.strict = pNew->cfg.strict;
pOld->cfg.cacheLastRow = pNew->cfg.cacheLastRow;
pOld->cfg.replications = pNew->cfg.replications;
taosWUnLockLatch(&pOld->lock); taosWUnLockLatch(&pOld->lock);
return 0; return 0;
} }
static int32_t mndGetGlobalVgroupVersion(SMnode *pMnode) { return sdbGetTableVer(pMnode->pSdb, SDB_VGROUP); } static inline int32_t mndGetGlobalVgroupVersion(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
return sdbGetTableVer(pSdb, SDB_VGROUP);
}
SDbObj *mndAcquireDb(SMnode *pMnode, const char *db) { SDbObj *mndAcquireDb(SMnode *pMnode, const char *db) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
...@@ -638,69 +653,7 @@ static int32_t mndSetAlterDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p ...@@ -638,69 +653,7 @@ static int32_t mndSetAlterDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p
return 0; return 0;
} }
void *mndBuildAlterVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) { static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
SAlterVnodeReq alterReq = {0};
alterReq.vgVersion = pVgroup->version;
alterReq.buffer = pDb->cfg.buffer;
alterReq.pages = pDb->cfg.pages;
alterReq.pageSize = pDb->cfg.pageSize;
alterReq.daysPerFile = pDb->cfg.daysPerFile;
alterReq.daysToKeep0 = pDb->cfg.daysToKeep0;
alterReq.daysToKeep1 = pDb->cfg.daysToKeep1;
alterReq.daysToKeep2 = pDb->cfg.daysToKeep2;
alterReq.fsyncPeriod = pDb->cfg.fsyncPeriod;
alterReq.walLevel = pDb->cfg.walLevel;
alterReq.strict = pDb->cfg.strict;
alterReq.cacheLastRow = pDb->cfg.cacheLastRow;
alterReq.replica = pVgroup->replica;
alterReq.selfIndex = -1;
for (int32_t v = 0; v < pVgroup->replica; ++v) {
SReplica *pReplica = &alterReq.replicas[v];
SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pVgidDnode == NULL) {
return NULL;
}
pReplica->id = pVgidDnode->id;
pReplica->port = pVgidDnode->port;
memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
mndReleaseDnode(pMnode, pVgidDnode);
if (pDnode->id == pVgid->dnodeId) {
alterReq.selfIndex = v;
}
}
if (alterReq.selfIndex == -1) {
terrno = TSDB_CODE_MND_APP_ERROR;
return NULL;
}
int32_t contLen = tSerializeSAlterVnodeReq(NULL, 0, &alterReq);
if (contLen < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
contLen += +sizeof(SMsgHead);
void *pReq = taosMemoryMalloc(contLen);
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
SMsgHead *pHead = pReq;
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgroup->vgId);
tSerializeSAlterVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &alterReq);
*pContLen = contLen;
return pReq;
}
static int32_t mndBuilAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
STransAction action = {0}; STransAction action = {0};
SVnodeGid *pVgid = pVgroup->vnodeGid + vn; SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
...@@ -736,7 +689,7 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * ...@@ -736,7 +689,7 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid == pNew->uid) { if (pVgroup->dbUid == pNew->uid) {
if (mndBuilAlterVgroupAction(pMnode, pTrans, pNew, pVgroup) != 0) { if (mndBuildAlterVgroupAction(pMnode, pTrans, pNew, pVgroup) != 0) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
return -1; return -1;
...@@ -752,19 +705,19 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * ...@@ -752,19 +705,19 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
static int32_t mndAlterDb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pOld, SDbObj *pNew) { static int32_t mndAlterDb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pOld, SDbObj *pNew) {
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_ALTER_DB, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_ALTER_DB, &pReq->rpcMsg);
if (pTrans == NULL) goto UPDATE_DB_OVER; if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to alter db:%s", pTrans->id, pOld->name); mDebug("trans:%d, used to alter db:%s", pTrans->id, pOld->name);
mndTransSetDbInfo(pTrans, pOld); mndTransSetDbInfo(pTrans, pOld);
if (mndSetAlterDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER; if (mndSetAlterDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER; if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
if (mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER; if (mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto UPDATE_DB_OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0; code = 0;
UPDATE_DB_OVER: _OVER:
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; return code;
} }
...@@ -778,7 +731,7 @@ static int32_t mndProcessAlterDbReq(SNodeMsg *pReq) { ...@@ -778,7 +731,7 @@ static int32_t mndProcessAlterDbReq(SNodeMsg *pReq) {
if (tDeserializeSAlterDbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &alterReq) != 0) { if (tDeserializeSAlterDbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
goto ALTER_DB_OVER; goto _OVER;
} }
mDebug("db:%s, start to alter", alterReq.db); mDebug("db:%s, start to alter", alterReq.db);
...@@ -786,24 +739,26 @@ static int32_t mndProcessAlterDbReq(SNodeMsg *pReq) { ...@@ -786,24 +739,26 @@ static int32_t mndProcessAlterDbReq(SNodeMsg *pReq) {
pDb = mndAcquireDb(pMnode, alterReq.db); pDb = mndAcquireDb(pMnode, alterReq.db);
if (pDb == NULL) { if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST; terrno = TSDB_CODE_MND_DB_NOT_EXIST;
goto ALTER_DB_OVER; goto _OVER;
} }
pUser = mndAcquireUser(pMnode, pReq->user); pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) { if (pUser == NULL) {
goto ALTER_DB_OVER; goto _OVER;
} }
if (mndCheckAlterDropCompactDbAuth(pUser, pDb) != 0) { if (mndCheckAlterDropCompactDbAuth(pUser, pDb) != 0) {
goto ALTER_DB_OVER; goto _OVER;
} }
SDbObj dbObj = {0}; SDbObj dbObj = {0};
memcpy(&dbObj, pDb, sizeof(SDbObj)); memcpy(&dbObj, pDb, sizeof(SDbObj));
dbObj.cfg.numOfRetensions = 0;
dbObj.cfg.pRetensions = NULL;
code = mndSetDbCfgFromAlterDbReq(&dbObj, &alterReq); code = mndSetDbCfgFromAlterDbReq(&dbObj, &alterReq);
if (code != 0) { if (code != 0) {
goto ALTER_DB_OVER; goto _OVER;
} }
dbObj.cfgVersion++; dbObj.cfgVersion++;
...@@ -811,7 +766,7 @@ static int32_t mndProcessAlterDbReq(SNodeMsg *pReq) { ...@@ -811,7 +766,7 @@ static int32_t mndProcessAlterDbReq(SNodeMsg *pReq) {
code = mndAlterDb(pMnode, pReq, pDb, &dbObj); code = mndAlterDb(pMnode, pReq, pDb, &dbObj);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
ALTER_DB_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("db:%s, failed to alter since %s", alterReq.db, terrstr()); mError("db:%s, failed to alter since %s", alterReq.db, terrstr());
} }
...@@ -831,13 +786,13 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) { ...@@ -831,13 +786,13 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) {
if (tDeserializeSDbCfgReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &cfgReq) != 0) { if (tDeserializeSDbCfgReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &cfgReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
goto GET_DB_CFG_OVER; goto _OVER;
} }
pDb = mndAcquireDb(pMnode, cfgReq.db); pDb = mndAcquireDb(pMnode, cfgReq.db);
if (pDb == NULL) { if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST; terrno = TSDB_CODE_MND_DB_NOT_EXIST;
goto GET_DB_CFG_OVER; goto _OVER;
} }
cfgRsp.numOfVgroups = pDb->cfg.numOfVgroups; cfgRsp.numOfVgroups = pDb->cfg.numOfVgroups;
...@@ -866,7 +821,7 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) { ...@@ -866,7 +821,7 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) {
if (pRsp == NULL) { if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1; code = -1;
goto GET_DB_CFG_OVER; goto _OVER;
} }
tSerializeSDbCfgRsp(pRsp, contLen, &cfgRsp); tSerializeSDbCfgRsp(pRsp, contLen, &cfgRsp);
...@@ -876,9 +831,9 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) { ...@@ -876,9 +831,9 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) {
code = 0; code = 0;
GET_DB_CFG_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0) {
mError("db:%s, failed to get cfg since %s", cfgReq.db, terrstr()); mError("db:%s, failed to get cfg since %s", cfgReq.db, terrstr());
} }
...@@ -1097,7 +1052,8 @@ _OVER: ...@@ -1097,7 +1052,8 @@ _OVER:
return code; return code;
} }
void mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode, int32_t *num) { static int32_t mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode) {
int32_t numOfTables = 0;
int32_t vindex = 0; int32_t vindex = 0;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
...@@ -1108,8 +1064,7 @@ void mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode, int32_t *num) { ...@@ -1108,8 +1064,7 @@ void mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode, int32_t *num) {
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid == pDb->uid) { if (pVgroup->dbUid == pDb->uid) {
*num += pVgroup->numOfTables / TSDB_TABLE_NUM_UNIT; numOfTables += pVgroup->numOfTables / TSDB_TABLE_NUM_UNIT;
vindex++; vindex++;
} }
...@@ -1117,6 +1072,7 @@ void mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode, int32_t *num) { ...@@ -1117,6 +1072,7 @@ void mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode, int32_t *num) {
} }
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
return numOfTables;
} }
static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) { static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
...@@ -1170,8 +1126,7 @@ int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUs ...@@ -1170,8 +1126,7 @@ int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUs
return -1; return -1;
} }
int32_t numOfTable = 0; int32_t numOfTable = mndGetDBTableNum(pDb, pMnode);
mndGetDBTableNum(pDb, pMnode, &numOfTable);
if (pReq == NULL || pReq->vgVersion < pDb->vgVersion || pReq->dbId != pDb->uid || numOfTable != pReq->numOfTable) { if (pReq == NULL || pReq->vgVersion < pDb->vgVersion || pReq->dbId != pDb->uid || numOfTable != pReq->numOfTable) {
mndBuildDBVgroupInfo(pDb, pMnode, pRsp->pVgroupInfos); mndBuildDBVgroupInfo(pDb, pMnode, pRsp->pVgroupInfos);
...@@ -1195,7 +1150,7 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) { ...@@ -1195,7 +1150,7 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
if (tDeserializeSUseDbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &usedbReq) != 0) { if (tDeserializeSUseDbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &usedbReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
goto USE_DB_OVER; goto _OVER;
} }
char *p = strchr(usedbReq.db, '.'); char *p = strchr(usedbReq.db, '.');
...@@ -1206,12 +1161,11 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) { ...@@ -1206,12 +1161,11 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
usedbRsp.pVgroupInfos = taosArrayInit(10, sizeof(SVgroupInfo)); usedbRsp.pVgroupInfos = taosArrayInit(10, sizeof(SVgroupInfo));
if (usedbRsp.pVgroupInfos == NULL) { if (usedbRsp.pVgroupInfos == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto USE_DB_OVER; goto _OVER;
} }
mndBuildDBVgroupInfo(NULL, pMnode, usedbRsp.pVgroupInfos); mndBuildDBVgroupInfo(NULL, pMnode, usedbRsp.pVgroupInfos);
usedbRsp.vgVersion = vgVersion++; usedbRsp.vgVersion = vgVersion++;
} else { } else {
usedbRsp.vgVersion = usedbReq.vgVersion; usedbRsp.vgVersion = usedbReq.vgVersion;
} }
...@@ -1232,15 +1186,15 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) { ...@@ -1232,15 +1186,15 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
} else { } else {
pUser = mndAcquireUser(pMnode, pReq->user); pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) { if (pUser == NULL) {
goto USE_DB_OVER; goto _OVER;
} }
if (mndCheckUseDbAuth(pUser, pDb) != 0) { if (mndCheckUseDbAuth(pUser, pDb) != 0) {
goto USE_DB_OVER; goto _OVER;
} }
if (mndExtractDbInfo(pMnode, pDb, &usedbRsp, &usedbReq) < 0) { if (mndExtractDbInfo(pMnode, pDb, &usedbRsp, &usedbReq) < 0) {
goto USE_DB_OVER; goto _OVER;
} }
code = 0; code = 0;
...@@ -1252,7 +1206,7 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) { ...@@ -1252,7 +1206,7 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
if (pRsp == NULL) { if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1; code = -1;
goto USE_DB_OVER; goto _OVER;
} }
tSerializeSUseDbRsp(pRsp, contLen, &usedbRsp); tSerializeSUseDbRsp(pRsp, contLen, &usedbRsp);
...@@ -1260,7 +1214,7 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) { ...@@ -1260,7 +1214,7 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
pReq->pRsp = pRsp; pReq->pRsp = pRsp;
pReq->rspLen = contLen; pReq->rspLen = contLen;
USE_DB_OVER: _OVER:
if (code != 0) { if (code != 0) {
mError("db:%s, failed to process use db req since %s", usedbReq.db, terrstr()); mError("db:%s, failed to process use db req since %s", usedbReq.db, terrstr());
} }
...@@ -1298,8 +1252,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, ...@@ -1298,8 +1252,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
continue; continue;
} }
int32_t numOfTable = 0; int32_t numOfTable = mndGetDBTableNum(pDb, pMnode);
mndGetDBTableNum(pDb, pMnode, &numOfTable);
if (pDbVgVersion->vgVersion >= pDb->vgVersion && numOfTable == pDbVgVersion->numOfTable) { if (pDbVgVersion->vgVersion >= pDb->vgVersion && numOfTable == pDbVgVersion->numOfTable) {
mDebug("db:%s, version & numOfTable not changed", pDbVgVersion->dbFName); mDebug("db:%s, version & numOfTable not changed", pDbVgVersion->dbFName);
...@@ -1514,9 +1467,6 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in ...@@ -1514,9 +1467,6 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
pColInfo = taosArrayGet(pBlock->pDataBlock, cols); pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
colDataAppend(pColInfo, rows, (const char *)b, false); colDataAppend(pColInfo, rows, (const char *)b, false);
} }
// pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
// *(int8_t *)pWrite = pDb->cfg.update;
} }
static void setInformationSchemaDbCfg(SDbObj *pDbObj) { static void setInformationSchemaDbCfg(SDbObj *pDbObj) {
...@@ -1544,7 +1494,6 @@ static void setPerfSchemaDbCfg(SDbObj *pDbObj) { ...@@ -1544,7 +1494,6 @@ static void setPerfSchemaDbCfg(SDbObj *pDbObj) {
static bool mndGetTablesOfDbFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) { static bool mndGetTablesOfDbFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
SVgObj *pVgroup = pObj; SVgObj *pVgroup = pObj;
int32_t *numOfTables = p1; int32_t *numOfTables = p1;
*numOfTables += pVgroup->numOfTables; *numOfTables += pVgroup->numOfTables;
return true; return true;
} }
......
...@@ -186,6 +186,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { ...@@ -186,6 +186,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
} }
SDB_GET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER) SDB_GET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER)
taosInitRWLatch(&pUser->lock);
terrno = 0; terrno = 0;
...@@ -228,11 +229,12 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) { ...@@ -228,11 +229,12 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) {
static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) { static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) {
mTrace("user:%s, perform update action, old row:%p new row:%p", pOld->user, pOld, pNew); mTrace("user:%s, perform update action, old row:%p new row:%p", pOld->user, pOld, pNew);
memcpy(pOld->pass, pNew->pass, TSDB_PASSWORD_LEN); taosWLockLatch(&pOld->lock);
pOld->updateTime = pNew->updateTime; pOld->updateTime = pNew->updateTime;
memcpy(pOld->pass, pNew->pass, TSDB_PASSWORD_LEN);
TSWAP(pOld->readDbs, pNew->readDbs); TSWAP(pOld->readDbs, pNew->readDbs);
TSWAP(pOld->writeDbs, pNew->writeDbs); TSWAP(pOld->writeDbs, pNew->writeDbs);
taosWUnLockLatch(&pOld->lock);
return 0; return 0;
} }
...@@ -426,8 +428,12 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) { ...@@ -426,8 +428,12 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) {
} }
memcpy(&newUser, pUser, sizeof(SUserObj)); memcpy(&newUser, pUser, sizeof(SUserObj));
taosRLockLatch(&pUser->lock);
newUser.readDbs = mndDupDbHash(pUser->readDbs); newUser.readDbs = mndDupDbHash(pUser->readDbs);
newUser.writeDbs = mndDupDbHash(pUser->writeDbs); newUser.writeDbs = mndDupDbHash(pUser->writeDbs);
taosRUnLockLatch(&pUser->lock);
if (newUser.readDbs == NULL || newUser.writeDbs == NULL) { if (newUser.readDbs == NULL || newUser.writeDbs == NULL) {
goto _OVER; goto _OVER;
} }
...@@ -598,8 +604,11 @@ static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq) { ...@@ -598,8 +604,11 @@ static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq) {
memcpy(authRsp.user, pUser->user, TSDB_USER_LEN); memcpy(authRsp.user, pUser->user, TSDB_USER_LEN);
authRsp.superAuth = pUser->superUser; authRsp.superAuth = pUser->superUser;
taosRLockLatch(&pUser->lock);
authRsp.readDbs = mndDupDbHash(pUser->readDbs); authRsp.readDbs = mndDupDbHash(pUser->readDbs);
authRsp.writeDbs = mndDupDbHash(pUser->writeDbs); authRsp.writeDbs = mndDupDbHash(pUser->writeDbs);
taosRUnLockLatch(&pUser->lock);
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL; void *pIter = NULL;
......
...@@ -34,9 +34,9 @@ static int32_t mndProcessAlterVnodeRsp(SNodeMsg *pRsp); ...@@ -34,9 +34,9 @@ static int32_t mndProcessAlterVnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessDropVnodeRsp(SNodeMsg *pRsp); static int32_t mndProcessDropVnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessCompactVnodeRsp(SNodeMsg *pRsp); static int32_t mndProcessCompactVnodeRsp(SNodeMsg *pRsp);
static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows); static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter); static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows); static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter); static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
int32_t mndInitVgroup(SMnode *pMnode) { int32_t mndInitVgroup(SMnode *pMnode) {
...@@ -46,7 +46,7 @@ int32_t mndInitVgroup(SMnode *pMnode) { ...@@ -46,7 +46,7 @@ int32_t mndInitVgroup(SMnode *pMnode) {
.decodeFp = (SdbDecodeFp)mndVgroupActionDecode, .decodeFp = (SdbDecodeFp)mndVgroupActionDecode,
.insertFp = (SdbInsertFp)mndVgroupActionInsert, .insertFp = (SdbInsertFp)mndVgroupActionInsert,
.updateFp = (SdbUpdateFp)mndVgroupActionUpdate, .updateFp = (SdbUpdateFp)mndVgroupActionUpdate,
.deleteFp = (SdbDeleteFp)mndVgroupActionDelete}; .deleteFp = (SdbDeleteFp)mndVgroupActionDelete,};
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndProcessCreateVnodeRsp); mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndProcessCreateVnodeRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_VNODE_RSP, mndProcessAlterVnodeRsp); mndSetMsgHandle(pMnode, TDMT_VND_ALTER_VNODE_RSP, mndProcessAlterVnodeRsp);
...@@ -67,28 +67,28 @@ SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) { ...@@ -67,28 +67,28 @@ SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRaw *pRaw = sdbAllocRaw(SDB_VGROUP, TSDB_VGROUP_VER_NUMBER, sizeof(SVgObj) + TSDB_VGROUP_RESERVE_SIZE); SSdbRaw *pRaw = sdbAllocRaw(SDB_VGROUP, TSDB_VGROUP_VER_NUMBER, sizeof(SVgObj) + TSDB_VGROUP_RESERVE_SIZE);
if (pRaw == NULL) goto VG_ENCODE_OVER; if (pRaw == NULL) goto _OVER;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pVgroup->vgId, VG_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pVgroup->vgId, _OVER)
SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime, VG_ENCODE_OVER) SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime, _OVER)
SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime, VG_ENCODE_OVER) SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime, _OVER)
SDB_SET_INT32(pRaw, dataPos, pVgroup->version, VG_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pVgroup->version, _OVER)
SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin, VG_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin, _OVER)
SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd, VG_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, VG_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid, VG_ENCODE_OVER) SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid, _OVER)
SDB_SET_INT8(pRaw, dataPos, pVgroup->replica, VG_ENCODE_OVER) SDB_SET_INT8(pRaw, dataPos, pVgroup->replica, _OVER)
for (int8_t i = 0; i < pVgroup->replica; ++i) { for (int8_t i = 0; i < pVgroup->replica; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
SDB_SET_INT32(pRaw, dataPos, pVgid->dnodeId, VG_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pVgid->dnodeId, _OVER)
} }
SDB_SET_RESERVE(pRaw, dataPos, TSDB_VGROUP_RESERVE_SIZE, VG_ENCODE_OVER) SDB_SET_RESERVE(pRaw, dataPos, TSDB_VGROUP_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, VG_ENCODE_OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER)
terrno = 0; terrno = 0;
VG_ENCODE_OVER: _OVER:
if (terrno != 0) { if (terrno != 0) {
mError("vgId:%d, failed to encode to raw:%p since %s", pVgroup->vgId, pRaw, terrstr()); mError("vgId:%d, failed to encode to raw:%p since %s", pVgroup->vgId, pRaw, terrstr());
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
...@@ -103,41 +103,41 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) { ...@@ -103,41 +103,41 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0; int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto VG_DECODE_OVER; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver != TSDB_VGROUP_VER_NUMBER) { if (sver != TSDB_VGROUP_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER; terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto VG_DECODE_OVER; goto _OVER;
} }
SSdbRow *pRow = sdbAllocRow(sizeof(SVgObj)); SSdbRow *pRow = sdbAllocRow(sizeof(SVgObj));
if (pRow == NULL) goto VG_DECODE_OVER; if (pRow == NULL) goto _OVER;
SVgObj *pVgroup = sdbGetRowObj(pRow); SVgObj *pVgroup = sdbGetRowObj(pRow);
if (pVgroup == NULL) goto VG_DECODE_OVER; if (pVgroup == NULL) goto _OVER;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_GET_INT32(pRaw, dataPos, &pVgroup->vgId, VG_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pVgroup->vgId, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pVgroup->createdTime, VG_DECODE_OVER) SDB_GET_INT64(pRaw, dataPos, &pVgroup->createdTime, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pVgroup->updateTime, VG_DECODE_OVER) SDB_GET_INT64(pRaw, dataPos, &pVgroup->updateTime, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pVgroup->version, VG_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pVgroup->version, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashBegin, VG_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashBegin, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashEnd, VG_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashEnd, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, VG_DECODE_OVER) SDB_GET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pVgroup->dbUid, VG_DECODE_OVER) SDB_GET_INT64(pRaw, dataPos, &pVgroup->dbUid, _OVER)
SDB_GET_INT8(pRaw, dataPos, &pVgroup->replica, VG_DECODE_OVER) SDB_GET_INT8(pRaw, dataPos, &pVgroup->replica, _OVER)
for (int8_t i = 0; i < pVgroup->replica; ++i) { for (int8_t i = 0; i < pVgroup->replica; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
SDB_GET_INT32(pRaw, dataPos, &pVgid->dnodeId, VG_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pVgid->dnodeId, _OVER)
if (pVgroup->replica == 1) { if (pVgroup->replica == 1) {
pVgid->role = TAOS_SYNC_STATE_LEADER; pVgid->role = TAOS_SYNC_STATE_LEADER;
} }
} }
SDB_GET_RESERVE(pRaw, dataPos, TSDB_VGROUP_RESERVE_SIZE, VG_DECODE_OVER) SDB_GET_RESERVE(pRaw, dataPos, TSDB_VGROUP_RESERVE_SIZE, _OVER)
terrno = 0; terrno = 0;
VG_DECODE_OVER: _OVER:
if (terrno != 0) { if (terrno != 0) {
mError("vgId:%d, failed to decode from raw:%p since %s", pVgroup->vgId, pRaw, terrstr()); mError("vgId:%d, failed to decode from raw:%p since %s", pVgroup->vgId, pRaw, terrstr());
taosMemoryFreeClear(pRow); taosMemoryFreeClear(pRow);
...@@ -254,6 +254,68 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg ...@@ -254,6 +254,68 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
return pReq; return pReq;
} }
void *mndBuildAlterVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
SAlterVnodeReq alterReq = {0};
alterReq.vgVersion = pVgroup->version;
alterReq.buffer = pDb->cfg.buffer;
alterReq.pages = pDb->cfg.pages;
alterReq.pageSize = pDb->cfg.pageSize;
alterReq.daysPerFile = pDb->cfg.daysPerFile;
alterReq.daysToKeep0 = pDb->cfg.daysToKeep0;
alterReq.daysToKeep1 = pDb->cfg.daysToKeep1;
alterReq.daysToKeep2 = pDb->cfg.daysToKeep2;
alterReq.fsyncPeriod = pDb->cfg.fsyncPeriod;
alterReq.walLevel = pDb->cfg.walLevel;
alterReq.strict = pDb->cfg.strict;
alterReq.cacheLastRow = pDb->cfg.cacheLastRow;
alterReq.replica = pVgroup->replica;
alterReq.selfIndex = -1;
for (int32_t v = 0; v < pVgroup->replica; ++v) {
SReplica *pReplica = &alterReq.replicas[v];
SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pVgidDnode == NULL) {
return NULL;
}
pReplica->id = pVgidDnode->id;
pReplica->port = pVgidDnode->port;
memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
mndReleaseDnode(pMnode, pVgidDnode);
if (pDnode->id == pVgid->dnodeId) {
alterReq.selfIndex = v;
}
}
if (alterReq.selfIndex == -1) {
terrno = TSDB_CODE_MND_APP_ERROR;
return NULL;
}
int32_t contLen = tSerializeSAlterVnodeReq(NULL, 0, &alterReq);
if (contLen < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
contLen += +sizeof(SMsgHead);
void *pReq = taosMemoryMalloc(contLen);
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
SMsgHead *pHead = pReq;
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgroup->vgId);
tSerializeSAlterVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &alterReq);
*pContLen = contLen;
return pReq;
}
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) { void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
SDropVnodeReq dropReq = {0}; SDropVnodeReq dropReq = {0};
dropReq.dnodeId = pDnode->id; dropReq.dnodeId = pDnode->id;
...@@ -372,12 +434,12 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { ...@@ -372,12 +434,12 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
pVgroups = taosMemoryCalloc(pDb->cfg.numOfVgroups, sizeof(SVgObj)); pVgroups = taosMemoryCalloc(pDb->cfg.numOfVgroups, sizeof(SVgObj));
if (pVgroups == NULL) { if (pVgroups == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto ALLOC_VGROUP_OVER; goto _OVER;
} }
pArray = mndBuildDnodesArray(pMnode); pArray = mndBuildDnodesArray(pMnode);
if (pArray == NULL) { if (pArray == NULL) {
goto ALLOC_VGROUP_OVER; goto _OVER;
} }
mDebug("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray), mDebug("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray),
...@@ -410,7 +472,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { ...@@ -410,7 +472,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
if (mndGetAvailableDnode(pMnode, pVgroup, pArray) != 0) { if (mndGetAvailableDnode(pMnode, pVgroup, pArray) != 0) {
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
goto ALLOC_VGROUP_OVER; goto _OVER;
} }
allocedVgroups++; allocedVgroups++;
...@@ -421,7 +483,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { ...@@ -421,7 +483,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
mDebug("db:%s, %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications); mDebug("db:%s, %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications);
ALLOC_VGROUP_OVER: _OVER:
if (code != 0) taosMemoryFree(pVgroups); if (code != 0) taosMemoryFree(pVgroups);
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
return code; return code;
...@@ -492,7 +554,7 @@ static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pRep ...@@ -492,7 +554,7 @@ static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pRep
return 0; return 0;
} }
static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) { static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->pNode; SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0; int32_t numOfRows = 0;
...@@ -533,14 +595,13 @@ static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* ...@@ -533,14 +595,13 @@ static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock*
// default 3 replica // default 3 replica
for (int32_t i = 0; i < 3; ++i) { for (int32_t i = 0; i < 3; ++i) {
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
if (i < pVgroup->replica) { if (i < pVgroup->replica) {
colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->vnodeGid[i].dnodeId, false); colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->vnodeGid[i].dnodeId, false);
char buf1[20] = {0}; char buf1[20] = {0};
const char *role = syncStr(pVgroup->vnodeGid[i].role); const char *role = syncStr(pVgroup->vnodeGid[i].role);
STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes); STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)buf1, false); colDataAppend(pColInfo, numOfRows, (const char *)buf1, false);
...@@ -597,13 +658,13 @@ int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) { ...@@ -597,13 +658,13 @@ int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
return numOfVnodes; return numOfVnodes;
} }
static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) { static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->pNode; SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0; int32_t numOfRows = 0;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
int32_t cols = 0; int32_t cols = 0;
// int32_t dnodeId = pShow->replica; // int32_t dnodeId = pShow->replica;
while (numOfRows < rows) { while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup); pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册