未验证 提交 91430320 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9708 from taosdata/feature/dnode3

Feature/dnode3
......@@ -165,6 +165,8 @@ static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
vnodeClose(pVnode->pImpl);
pVnode->pImpl = NULL;
dDebug("vgId:%d, vnode is closed", pVnode->vgId);
free(pVnode->path);
free(pVnode->db);
free(pVnode);
......@@ -238,11 +240,7 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_
}
int32_t vnodesNum = cJSON_GetArraySize(vnodes);
if (vnodesNum <= 0) {
dError("failed to read %s since vnodes size:%d invalid", file, vnodesNum);
goto PRASE_VNODE_OVER;
}
if (vnodesNum > 0) {
pCfgs = calloc(vnodesNum, sizeof(SWrapperCfg));
if (pCfgs == NULL) {
dError("failed to read %s since out of memory", file);
......@@ -291,6 +289,8 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_
}
*ppCfgs = pCfgs;
}
*numOfVnodes = vnodesNum;
code = 0;
dInfo("succcessed to read file %s", file);
......@@ -548,13 +548,13 @@ static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeReq *pCreate, SWra
pCfg->vgVersion = pCreate->vgVersion;
}
static SDropVnodeReq *vnodeParseDropVnodeReq(SRpcMsg *pReq) {
static SDropVnodeReq *dndParseDropVnodeReq(SRpcMsg *pReq) {
SDropVnodeReq *pDrop = pReq->pCont;
pDrop->vgId = htonl(pDrop->vgId);
return pDrop;
}
static SAuthVnodeReq *vnodeParseAuthVnodeReq(SRpcMsg *pReq) {
static SAuthVnodeReq *dndParseAuthVnodeReq(SRpcMsg *pReq) {
SAuthVnodeReq *pAuth = pReq->pCont;
pAuth->vgId = htonl(pAuth->vgId);
return pAuth;
......@@ -572,10 +572,10 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SVnodeObj *pVnode = dndAcquireVnode(pDnode, pCreate->vgId);
if (pVnode != NULL) {
dDebug("vgId:%d, already exist, return success", pCreate->vgId);
dDebug("vgId:%d, already exist", pCreate->vgId);
dndReleaseVnode(pDnode, pVnode);
terrno = TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED;
return 0;
return -1;
}
SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg);
......@@ -641,7 +641,7 @@ int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
}
int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SDropVnodeReq *pDrop = vnodeParseDropVnodeReq(pReq);
SDropVnodeReq *pDrop = dndParseDropVnodeReq(pReq);
int32_t vgId = pDrop->vgId;
dDebug("vgId:%d, drop vnode req is received", vgId);
......@@ -649,7 +649,8 @@ int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
if (pVnode == NULL) {
dDebug("vgId:%d, failed to drop since %s", vgId, terrstr());
return 0;
terrno = TSDB_CODE_DND_VNODE_NOT_DEPLOYED;
return -1;
}
pVnode->dropped = 1;
......@@ -668,7 +669,7 @@ int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
}
int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SAuthVnodeReq *pAuth = (SAuthVnodeReq *)vnodeParseAuthVnodeReq(pReq);
SAuthVnodeReq *pAuth = (SAuthVnodeReq *)dndParseAuthVnodeReq(pReq);
int32_t vgId = pAuth->vgId;
dDebug("vgId:%d, auth vnode req is received", vgId);
......@@ -685,7 +686,7 @@ int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
}
int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SSyncVnodeReq *pSync = (SSyncVnodeReq *)vnodeParseDropVnodeReq(pReq);
SSyncVnodeReq *pSync = (SSyncVnodeReq *)dndParseDropVnodeReq(pReq);
int32_t vgId = pSync->vgId;
dDebug("vgId:%d, sync vnode req is received", vgId);
......@@ -707,7 +708,7 @@ int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
}
int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SCompactVnodeReq *pCompact = (SCompactVnodeReq *)vnodeParseDropVnodeReq(pReq);
SCompactVnodeReq *pCompact = (SCompactVnodeReq *)dndParseDropVnodeReq(pReq);
int32_t vgId = pCompact->vgId;
dDebug("vgId:%d, compact vnode req is received", vgId);
......
......@@ -13,7 +13,7 @@
class DndTestMnode : public ::testing::Test {
protected:
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_mnode", 9113); }
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_mnode", 9114); }
static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test;
......
......@@ -13,7 +13,7 @@
class DndTestSnode : public ::testing::Test {
protected:
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_snode", 9112); }
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_snode", 9113); }
static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test;
......
/**
* @file db.cpp
* @author slguan (slguan@taosdata.com)
* @brief DNODE module vgroup-msg tests
* @brief DNODE module vnode tests
* @version 0.1
* @date 2021-12-20
*
......@@ -13,7 +13,7 @@
class DndTestVnode : public ::testing::Test {
protected:
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_vnode", 9150); }
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_vnode", 9115); }
static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test;
......@@ -57,12 +57,17 @@ TEST_F(DndTestVnode, 01_Create_Restart_Drop_Vnode) {
for (int r = 0; r < pReq->replica; ++r) {
SReplica* pReplica = &pReq->replicas[r];
pReplica->id = htonl(1);
pReplica->port = htons(9150);
pReplica->port = htons(9527);
}
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_VNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
if (i == 0) {
ASSERT_EQ(pRsp->code, 0);
test.Restart();
} else {
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED);
}
}
}
......@@ -97,7 +102,7 @@ TEST_F(DndTestVnode, 01_Create_Restart_Drop_Vnode) {
for (int r = 0; r < pReq->replica; ++r) {
SReplica* pReplica = &pReq->replicas[r];
pReplica->id = htonl(1);
pReplica->port = htons(9150);
pReplica->port = htons(9527);
}
SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_VNODE, pReq, contLen);
......@@ -123,7 +128,12 @@ TEST_F(DndTestVnode, 01_Create_Restart_Drop_Vnode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_VNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
if (i == 0) {
ASSERT_EQ(pRsp->code, 0);
test.Restart();
} else {
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_VNODE_NOT_DEPLOYED);
}
}
}
}
......@@ -31,8 +31,8 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup);
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
SCreateVnodeReq *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
SDropVnodeReq *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
SCreateVnodeReq *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
SDropVnodeReq *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
#ifdef __cplusplus
}
......
......@@ -177,7 +177,7 @@ static int32_t mndAcctActionDelete(SSdb *pSdb, SAcctObj *pAcct) {
}
static int32_t mndAcctActionUpdate(SSdb *pSdb, SAcctObj *pOld, SAcctObj *pNew) {
mTrace("acct:%s, perform update action, old_row:%p new_row:%p", pOld->acct, pOld, pNew);
mTrace("acct:%s, perform update action, old row:%p new row:%p", pOld->acct, pOld, pNew);
pOld->updateTime = pNew->updateTime;
pOld->status = pNew->status;
......
......@@ -155,7 +155,7 @@ static int32_t mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj) {
}
static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOld, SBnodeObj *pNew) {
mTrace("bnode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew);
mTrace("bnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
pOld->updateTime = pNew->updateTime;
return 0;
}
......
......@@ -135,7 +135,7 @@ static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster) {
}
static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOld, SClusterObj *pNew) {
mTrace("cluster:%" PRId64 ", perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew);
mTrace("cluster:%" PRId64 ", perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
return 0;
}
......
......@@ -28,15 +28,15 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb);
static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw);
static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb);
static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb);
static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOldDb, SDbObj *pNewDb);
static int32_t mndProcessCreateDbMsg(SMnodeMsg *pMsg);
static int32_t mndProcessAlterDbMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropDbMsg(SMnodeMsg *pMsg);
static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg);
static int32_t mndProcessSyncDbMsg(SMnodeMsg *pMsg);
static int32_t mndProcessCompactDbMsg(SMnodeMsg *pMsg);
static int32_t mndGetDbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndRetrieveDbs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew);
static int32_t mndProcessCreateDbReq(SMnodeMsg *pReq);
static int32_t mndProcessAlterDbReq(SMnodeMsg *pReq);
static int32_t mndProcessDropDbReq(SMnodeMsg *pReq);
static int32_t mndProcessUseDbReq(SMnodeMsg *pReq);
static int32_t mndProcessSyncDbReq(SMnodeMsg *pReq);
static int32_t mndProcessCompactDbReq(SMnodeMsg *pReq);
static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextDb(SMnode *pMnode, void *pIter);
int32_t mndInitDb(SMnode *pMnode) {
......@@ -48,12 +48,12 @@ int32_t mndInitDb(SMnode *pMnode) {
.updateFp = (SdbUpdateFp)mndDbActionUpdate,
.deleteFp = (SdbDeleteFp)mndDbActionDelete};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DB, mndProcessCreateDbMsg);
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_DB, mndProcessAlterDbMsg);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_DB, mndProcessDropDbMsg);
mndSetMsgHandle(pMnode, TDMT_MND_USE_DB, mndProcessUseDbMsg);
mndSetMsgHandle(pMnode, TDMT_MND_SYNC_DB, mndProcessSyncDbMsg);
mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_DB, mndProcessCompactDbMsg);
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DB, mndProcessCreateDbReq);
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_DB, mndProcessAlterDbReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_DB, mndProcessDropDbReq);
mndSetMsgHandle(pMnode, TDMT_MND_USE_DB, mndProcessUseDbReq);
mndSetMsgHandle(pMnode, TDMT_MND_SYNC_DB, mndProcessSyncDbReq);
mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_DB, mndProcessCompactDbReq);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_DB, mndGetDbMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DB, mndRetrieveDbs);
......@@ -182,12 +182,12 @@ static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb) {
return 0;
}
static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOldDb, SDbObj *pNewDb) {
mTrace("db:%s, perform update action, old_row:%p new_row:%p", pOldDb->name, pOldDb, pNewDb);
pOldDb->updateTime = pNewDb->updateTime;
pOldDb->cfgVersion = pNewDb->cfgVersion;
pOldDb->vgVersion = pNewDb->vgVersion;
memcpy(&pOldDb->cfg, &pNewDb->cfg, sizeof(SDbCfg));
static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) {
mTrace("db:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
pOld->updateTime = pNew->updateTime;
pOld->cfgVersion = pNew->cfgVersion;
pOld->vgVersion = pNew->vgVersion;
memcpy(&pOld->cfg, &pNew->cfg, sizeof(SDbCfg));
return 0;
}
......@@ -331,14 +331,15 @@ static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
SCreateVnodeReq *pMsg = mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup);
if (pMsg == NULL) return -1;
SCreateVnodeReq *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup);
if (pReq == NULL) return -1;
action.pCont = pMsg;
action.pCont = pReq;
action.contLen = sizeof(SCreateVnodeReq);
action.msgType = TDMT_DND_CREATE_VNODE;
action.acceptableCode = TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg);
free(pReq);
return -1;
}
}
......@@ -360,14 +361,15 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
SDropVnodeReq *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup);
if (pMsg == NULL) return -1;
SDropVnodeReq *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup);
if (pReq == NULL) return -1;
action.pCont = pMsg;
action.pCont = pReq;
action.contLen = sizeof(SDropVnodeReq);
action.msgType = TDMT_DND_DROP_VNODE;
action.acceptableCode = TSDB_CODE_DND_VNODE_NOT_DEPLOYED;
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
free(pMsg);
free(pReq);
return -1;
}
}
......@@ -376,7 +378,7 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
return 0;
}
static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreate, SUserObj *pUser) {
static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pReq, SCreateDbMsg *pCreate, SUserObj *pUser) {
SDbObj dbObj = {0};
memcpy(dbObj.name, pCreate->db, TSDB_DB_FNAME_LEN);
memcpy(dbObj.acct, pUser->acct, TSDB_USER_LEN);
......@@ -425,43 +427,17 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat
}
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
if (pTrans == NULL) {
mError("db:%s, failed to create since %s", pCreate->db, terrstr());
goto CREATE_DB_OVER;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
if (pTrans == NULL) goto CREATE_DB_OVER;
mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db);
if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) {
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
goto CREATE_DB_OVER;
}
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) {
mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr());
goto CREATE_DB_OVER;
}
if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) {
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
goto CREATE_DB_OVER;
}
if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto CREATE_DB_OVER;
}
if (mndSetCreateDbUndoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto CREATE_DB_OVER;
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
goto CREATE_DB_OVER;
}
if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER;
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER;
if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER;
if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER;
if (mndSetCreateDbUndoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_DB_OVER;
code = 0;
......@@ -471,9 +447,9 @@ CREATE_DB_OVER:
return code;
}
static int32_t mndProcessCreateDbMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SCreateDbMsg *pCreate = pMsg->rpcMsg.pCont;
static int32_t mndProcessCreateDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SCreateDbMsg *pCreate = pReq->rpcMsg.pCont;
pCreate->numOfVgroups = htonl(pCreate->numOfVgroups);
pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize);
......@@ -502,13 +478,13 @@ static int32_t mndProcessCreateDbMsg(SMnodeMsg *pMsg) {
}
}
SUserObj *pOperUser = mndAcquireUser(pMnode, pMsg->user);
SUserObj *pOperUser = mndAcquireUser(pMnode, pReq->user);
if (pOperUser == NULL) {
mError("db:%s, failed to create since %s", pCreate->db, terrstr());
return -1;
}
int32_t code = mndCreateDb(pMnode, pMsg, pCreate, pOperUser);
int32_t code = mndCreateDb(pMnode, pReq, pCreate, pOperUser);
mndReleaseUser(pMnode, pOperUser);
if (code != 0) {
......@@ -565,8 +541,8 @@ static int32_t mndSetDbCfgFromAlterDbMsg(SDbObj *pDb, SAlterDbMsg *pAlter) {
return terrno;
}
static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) {
SSdbRaw *pRedoRaw = mndDbActionEncode(pOldDb);
static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
SSdbRaw *pRedoRaw = mndDbActionEncode(pOld);
if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_UPDATING) != 0) return -1;
......@@ -574,8 +550,8 @@ static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pO
return 0;
}
static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) {
SSdbRaw *pCommitRaw = mndDbActionEncode(pNewDb);
static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
SSdbRaw *pCommitRaw = mndDbActionEncode(pNew);
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
......@@ -593,14 +569,14 @@ static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
SAlterVnodeReq *pMsg = (SAlterVnodeReq *)mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup);
if (pMsg == NULL) return -1;
SAlterVnodeReq *pReq = (SAlterVnodeReq *)mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup);
if (pReq == NULL) return -1;
action.pCont = pMsg;
action.pCont = pReq;
action.contLen = sizeof(SAlterVnodeReq);
action.msgType = TDMT_DND_ALTER_VNODE;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg);
free(pReq);
return -1;
}
}
......@@ -608,7 +584,7 @@ static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
return 0;
}
static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) {
static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
......@@ -617,8 +593,8 @@ static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid == pNewDb->uid) {
if (mndBuildUpdateVgroupAction(pMnode, pTrans, pNewDb, pVgroup) != 0) {
if (pVgroup->dbUid == pNew->uid) {
if (mndBuildUpdateVgroupAction(pMnode, pTrans, pNew, pVgroup) != 0) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
return -1;
......@@ -631,27 +607,27 @@ static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
return 0;
}
static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbObj *pNewDb) {
static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pOld, SDbObj *pNew) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
if (pTrans == NULL) {
mError("db:%s, failed to update since %s", pOldDb->name, terrstr());
mError("db:%s, failed to update since %s", pOld->name, terrstr());
return terrno;
}
mDebug("trans:%d, used to update db:%s", pTrans->id, pOldDb->name);
mDebug("trans:%d, used to update db:%s", pTrans->id, pOld->name);
if (mndSetUpdateDbRedoLogs(pMnode, pTrans, pOldDb, pNewDb) != 0) {
if (mndSetUpdateDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) {
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
goto UPDATE_DB_OVER;
}
if (mndSetUpdateDbCommitLogs(pMnode, pTrans, pOldDb, pNewDb) != 0) {
if (mndSetUpdateDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) {
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
goto UPDATE_DB_OVER;
}
if (mndSetUpdateDbRedoActions(pMnode, pTrans, pOldDb, pNewDb) != 0) {
if (mndSetUpdateDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto UPDATE_DB_OVER;
}
......@@ -668,9 +644,9 @@ UPDATE_DB_OVER:
return code;
}
static int32_t mndProcessAlterDbMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SAlterDbMsg *pAlter = pMsg->rpcMsg.pCont;
static int32_t mndProcessAlterDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SAlterDbMsg *pAlter = pReq->rpcMsg.pCont;
pAlter->totalBlocks = htonl(pAlter->totalBlocks);
pAlter->daysToKeep0 = htonl(pAlter->daysToKeep0);
pAlter->daysToKeep1 = htonl(pAlter->daysToKeep1);
......@@ -697,7 +673,7 @@ static int32_t mndProcessAlterDbMsg(SMnodeMsg *pMsg) {
dbObj.cfgVersion++;
dbObj.updateTime = taosGetTimestampMs();
code = mndUpdateDb(pMnode, pMsg, pDb, &dbObj);
code = mndUpdateDb(pMnode, pReq, pDb, &dbObj);
mndReleaseDb(pMnode, pDb);
if (code != 0) {
......@@ -757,14 +733,15 @@ static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *
action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
SDropVnodeReq *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup);
if (pMsg == NULL) return -1;
SDropVnodeReq *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup);
if (pReq == NULL) return -1;
action.pCont = pMsg;
action.pCont = pReq;
action.contLen = sizeof(SCreateVnodeReq);
action.msgType = TDMT_DND_DROP_VNODE;
action.acceptableCode = TSDB_CODE_DND_VNODE_NOT_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg);
free(pReq);
return -1;
}
}
......@@ -795,35 +772,17 @@ static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *p
return 0;
}
static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) {
static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
if (pTrans == NULL) {
mError("db:%s, failed to drop since %s", pDb->name, terrstr());
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
if (pTrans == NULL) goto DROP_DB_OVER;
mDebug("trans:%d, used to drop db:%s", pTrans->id, pDb->name);
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) {
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
goto DROP_DB_OVER;
}
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) {
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
goto DROP_DB_OVER;
}
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto DROP_DB_OVER;
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
goto DROP_DB_OVER;
}
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_DB_OVER;
code = 0;
......@@ -832,9 +791,9 @@ DROP_DB_OVER:
return code;
}
static int32_t mndProcessDropDbMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SDropDbMsg *pDrop = pMsg->rpcMsg.pCont;
static int32_t mndProcessDropDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SDropDbMsg *pDrop = pReq->rpcMsg.pCont;
mDebug("db:%s, start to drop", pDrop->db);
......@@ -850,7 +809,7 @@ static int32_t mndProcessDropDbMsg(SMnodeMsg *pMsg) {
}
}
int32_t code = mndDropDb(pMnode, pMsg, pDb);
int32_t code = mndDropDb(pMnode, pReq, pDb);
mndReleaseDb(pMnode, pDb);
if (code != 0) {
......@@ -861,16 +820,16 @@ static int32_t mndProcessDropDbMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
SUseDbMsg *pUse = pMsg->rpcMsg.pCont;
SUseDbMsg *pUse = pReq->rpcMsg.pCont;
pUse->vgVersion = htonl(pUse->vgVersion);
SDbObj *pDb = mndAcquireDb(pMnode, pUse->db);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
mError("db:%s, failed to process use db msg since %s", pUse->db, terrstr());
mError("db:%s, failed to process use db req since %s", pUse->db, terrstr());
return -1;
}
......@@ -922,19 +881,19 @@ static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) {
pRsp->vgNum = htonl(vindex);
pRsp->hashMethod = pDb->hashMethod;
pMsg->pCont = pRsp;
pMsg->contLen = contLen;
pReq->pCont = pRsp;
pReq->contLen = contLen;
mndReleaseDb(pMnode, pDb);
return 0;
}
static int32_t mndProcessSyncDbMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SSyncDbMsg *pSync = pMsg->rpcMsg.pCont;
static int32_t mndProcessSyncDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SSyncDbMsg *pSync = pReq->rpcMsg.pCont;
SDbObj *pDb = mndAcquireDb(pMnode, pSync->db);
if (pDb == NULL) {
mError("db:%s, failed to process sync db msg since %s", pSync->db, terrstr());
mError("db:%s, failed to process sync db req since %s", pSync->db, terrstr());
return -1;
}
......@@ -942,12 +901,12 @@ static int32_t mndProcessSyncDbMsg(SMnodeMsg *pMsg) {
return 0;
}
static int32_t mndProcessCompactDbMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SCompactDbMsg *pCompact = pMsg->rpcMsg.pCont;
static int32_t mndProcessCompactDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SCompactDbMsg *pCompact = pReq->rpcMsg.pCont;
SDbObj *pDb = mndAcquireDb(pMnode, pCompact->db);
if (pDb == NULL) {
mError("db:%s, failed to process compact db msg since %s", pCompact->db, terrstr());
mError("db:%s, failed to process compact db req since %s", pCompact->db, terrstr());
return -1;
}
......@@ -955,8 +914,8 @@ static int32_t mndProcessCompactDbMsg(SMnodeMsg *pMsg) {
return 0;
}
static int32_t mndGetDbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
SMnode *pMnode = pMsg->pMnode;
static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
......@@ -1096,8 +1055,8 @@ char *mnGetDbStr(char *src) {
return pos;
}
static int32_t mndRetrieveDbs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode;
static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SDbObj *pDb = NULL;
......
......@@ -183,7 +183,7 @@ static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
}
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
mTrace("dnode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew);
mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
pOld->updateTime = pNew->updateTime;
return 0;
}
......
......@@ -152,7 +152,7 @@ static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc) {
}
static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOldFunc, SFuncObj *pNewFunc) {
mTrace("func:%s, perform update action, old_row:%p new_row:%p", pOldFunc->name, pOldFunc, pNewFunc);
mTrace("func:%s, perform update action, old row:%p new row:%p", pOldFunc->name, pOldFunc, pNewFunc);
return 0;
}
......
......@@ -208,7 +208,7 @@ static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) {
}
static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew) {
mTrace("mnode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew);
mTrace("mnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
pOld->updateTime = pNew->updateTime;
return 0;
}
......
......@@ -155,7 +155,7 @@ static int32_t mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj) {
}
static int32_t mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOld, SQnodeObj *pNew) {
mTrace("qnode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew);
mTrace("qnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
pOld->updateTime = pNew->updateTime;
return 0;
}
......
......@@ -155,7 +155,7 @@ static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj) {
}
static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOld, SSnodeObj *pNew) {
mTrace("snode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew);
mTrace("snode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
pOld->updateTime = pNew->updateTime;
return 0;
}
......
......@@ -178,7 +178,7 @@ static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
}
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb) {
mTrace("stb:%s, perform update action, old_row:%p new_row:%p", pOldStb->name, pOldStb, pNewStb);
mTrace("stb:%s, perform update action, old row:%p new row:%p", pOldStb->name, pOldStb, pNewStb);
atomic_exchange_32(&pOldStb->updateTime, pNewStb->updateTime);
atomic_exchange_32(&pOldStb->version, pNewStb->version);
......
......@@ -192,7 +192,7 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) {
}
static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) {
mTrace("user:%s, perform update action, old_row:%p new_row:%p", pOld->user, pOld, pNew);
mTrace("user:%s, perform update action, old row:%p new row:%p", pOld->user, pOld, pNew);
memcpy(pOld->pass, pNew->pass, TSDB_PASSWORD_LEN);
pOld->updateTime = pNew->updateTime;
return 0;
......
......@@ -165,7 +165,7 @@ static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) {
}
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup) {
mTrace("vgId:%d, perform update action, old_row:%p new_row:%p", pOldVgroup->vgId, pOldVgroup, pNewVgroup);
mTrace("vgId:%d, perform update action, old row:%p new row:%p", pOldVgroup->vgId, pOldVgroup, pNewVgroup);
pOldVgroup->updateTime = pNewVgroup->updateTime;
pOldVgroup->version = pNewVgroup->version;
pOldVgroup->hashBegin = pNewVgroup->hashBegin;
......@@ -189,7 +189,7 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
sdbRelease(pSdb, pVgroup);
}
SCreateVnodeReq *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) {
SCreateVnodeReq *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) {
SCreateVnodeReq *pCreate = calloc(1, sizeof(SCreateVnodeReq));
if (pCreate == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -248,7 +248,7 @@ SCreateVnodeReq *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb
return pCreate;
}
SDropVnodeReq *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) {
SDropVnodeReq *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) {
SDropVnodeReq *pDrop = calloc(1, sizeof(SDropVnodeReq));
if (pDrop == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -13,10 +13,20 @@
class MndTestDb : public ::testing::Test {
protected:
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_db", 9030); }
static void TearDownTestSuite() { test.Cleanup(); }
static void SetUpTestSuite() {
test.Init("/tmp/mnode_test_db", 9030);
const char* fqdn = "localhost";
const char* firstEp = "localhost:9030";
server2.Start("/tmp/mnode_test_db2", fqdn, 9031, firstEp);
}
static void TearDownTestSuite() {
server2.Stop();
test.Cleanup();
}
static Testbase test;
static TestServer server2;
public:
void SetUp() override {}
......@@ -24,6 +34,7 @@ class MndTestDb : public ::testing::Test {
};
Testbase MndTestDb::test;
TestServer MndTestDb::server2;
TEST_F(MndTestDb, 01_ShowDb) {
test.SendShowMetaReq(TSDB_MGMT_TABLE_DB, "");
......@@ -149,7 +160,7 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) {
CheckBinary("d1", TSDB_DB_NAME_LEN - 1);
CheckTimestamp();
CheckInt16(2); // vgroups
CheckInt32(0);
CheckInt32(0); // tables
CheckInt16(1); // replica
CheckInt16(2); // quorum
CheckInt16(10); // days
......@@ -177,7 +188,7 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) {
CheckBinary("d1", TSDB_DB_NAME_LEN - 1);
CheckTimestamp();
CheckInt16(2); // vgroups
CheckInt32(0);
CheckInt32(0); // tables
CheckInt16(1); // replica
CheckInt16(2); // quorum
CheckInt16(10); // days
......@@ -301,4 +312,15 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
EXPECT_STREQ(pAddr->fqdn, "localhost");
}
}
{
int32_t contLen = sizeof(SDropDbMsg);
SDropDbMsg* pReq = (SDropDbMsg*)rpcMallocCont(contLen);
strcpy(pReq->db, "1.d2");
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DB, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册