提交 9f1c8cc7 编写于 作者: B Benguang Zhao

enh: unify validation of prepare actions with cb validateFp

上级 e7411183
...@@ -37,6 +37,8 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw); ...@@ -37,6 +37,8 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw);
static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb); static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb);
static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb); static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb);
static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew); static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew);
static int32_t mndNewDbActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj);
static int32_t mndProcessCreateDbReq(SRpcMsg *pReq); static int32_t mndProcessCreateDbReq(SRpcMsg *pReq);
static int32_t mndProcessAlterDbReq(SRpcMsg *pReq); static int32_t mndProcessAlterDbReq(SRpcMsg *pReq);
static int32_t mndProcessDropDbReq(SRpcMsg *pReq); static int32_t mndProcessDropDbReq(SRpcMsg *pReq);
...@@ -59,6 +61,7 @@ int32_t mndInitDb(SMnode *pMnode) { ...@@ -59,6 +61,7 @@ int32_t mndInitDb(SMnode *pMnode) {
.insertFp = (SdbInsertFp)mndDbActionInsert, .insertFp = (SdbInsertFp)mndDbActionInsert,
.updateFp = (SdbUpdateFp)mndDbActionUpdate, .updateFp = (SdbUpdateFp)mndDbActionUpdate,
.deleteFp = (SdbDeleteFp)mndDbActionDelete, .deleteFp = (SdbDeleteFp)mndDbActionDelete,
.validateFp = (SdbValidateFp)mndNewDbActionValidate,
}; };
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DB, mndProcessCreateDbReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DB, mndProcessCreateDbReq);
...@@ -247,6 +250,19 @@ _OVER: ...@@ -247,6 +250,19 @@ _OVER:
return pRow; return pRow;
} }
static int32_t mndNewDbActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj) {
SDbObj *pNewDb = pObj;
SDbObj *pOldDb = sdbAcquire(pMnode->pSdb, SDB_DB, pNewDb->name);
if (pOldDb != NULL) {
mError("trans:%d, db name already in use. name: %s", pTrans->id, pNewDb->name);
sdbRelease(pMnode->pSdb, pOldDb);
return -1;
}
return 0;
}
static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb) { static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb) {
mTrace("db:%s, perform insert action, row:%p", pDb->name, pDb); mTrace("db:%s, perform insert action, row:%p", pDb->name, pDb);
return 0; return 0;
......
...@@ -16,9 +16,7 @@ ...@@ -16,9 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndSync.h" #include "mndSync.h"
#include "mndCluster.h" #include "mndCluster.h"
#include "mndDb.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "mndVgroup.h"
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
if (pMsg == NULL || pMsg->pCont == NULL) { if (pMsg == NULL || pMsg->pCont == NULL) {
...@@ -75,61 +73,31 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { ...@@ -75,61 +73,31 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
return code; return code;
} }
static int32_t mndValidateNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
SSdbRaw *pRaw = pAction->pRaw;
SSdb *pSdb = pMnode->pSdb;
SSdbRow *pRow = NULL;
void *pObj = NULL;
int code = -1; int code = -1;
SSdbRow *pRow = (pMnode->pSdb->decodeFps[SDB_VGROUP])(pAction->pRaw);
if (pRow == NULL) goto _OUT;
SVgObj *pVgroup = sdbGetRowObj(pRow);
if (pVgroup == NULL) goto _OUT;
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); if (pRaw->status != SDB_STATUS_CREATING) goto _OUT;
if (maxVgId > pVgroup->vgId) {
mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId);
goto _OUT;
}
code = 0; pRow = (pSdb->decodeFps[pRaw->type])(pRaw);
_OUT:
taosMemoryFreeClear(pRow);
return code;
}
static int32_t mndValidateCreateDbPrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
int code = -1;
SSdbRow *pRow = (pMnode->pSdb->decodeFps[SDB_DB])(pAction->pRaw);
if (pRow == NULL) goto _OUT; if (pRow == NULL) goto _OUT;
SDbObj *pNewDb = sdbGetRowObj(pRow); pObj = sdbGetRowObj(pRow);
if (pNewDb == NULL) goto _OUT; if (pObj == NULL) goto _OUT;
SDbObj *pOldDb = sdbAcquire(pMnode->pSdb, SDB_DB, pNewDb->name); SdbValidateFp validateFp = pSdb->validateFps[pRaw->type];
if (pOldDb != NULL) { code = 0;
mError("trans:%d, db name already in use. name: %s", pTrans->id, pNewDb->name); if (validateFp) {
sdbRelease(pMnode->pSdb, pOldDb); code = validateFp(pMnode, pTrans, pObj);
goto _OUT;
} }
code = 0;
_OUT: _OUT:
taosMemoryFreeClear(pRow); taosMemoryFreeClear(pRow);
return code; return code;
} }
static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
int32_t code = 0;
switch (pAction->pRaw->type) {
case SDB_VGROUP:
code = mndValidateNewVgPrepareAction(pMnode, pTrans, pAction);
break;
case SDB_DB:
code = mndValidateCreateDbPrepareAction(pMnode, pTrans, pAction);
break;
default:
}
return code;
}
static int32_t mndTransValidatePrepareStage(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransValidatePrepareStage(SMnode *pMnode, STrans *pTrans) {
int32_t code = -1; int32_t code = -1;
int32_t action = 0; int32_t action = 0;
......
...@@ -33,6 +33,7 @@ ...@@ -33,6 +33,7 @@
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew); static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj);
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter); static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
...@@ -53,6 +54,7 @@ int32_t mndInitVgroup(SMnode *pMnode) { ...@@ -53,6 +54,7 @@ int32_t mndInitVgroup(SMnode *pMnode) {
.insertFp = (SdbInsertFp)mndVgroupActionInsert, .insertFp = (SdbInsertFp)mndVgroupActionInsert,
.updateFp = (SdbUpdateFp)mndVgroupActionUpdate, .updateFp = (SdbUpdateFp)mndVgroupActionUpdate,
.deleteFp = (SdbDeleteFp)mndVgroupActionDelete, .deleteFp = (SdbDeleteFp)mndVgroupActionDelete,
.validateFp = (SdbValidateFp)mndNewVgActionValidate,
}; };
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndTransProcessRsp);
...@@ -171,6 +173,17 @@ _OVER: ...@@ -171,6 +173,17 @@ _OVER:
return pRow; return pRow;
} }
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj) {
SVgObj *pVgroup = pObj;
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
if (maxVgId > pVgroup->vgId) {
mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId);
return -1;
}
return 0;
}
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) { static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) {
mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup); mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup);
return 0; return 0;
......
...@@ -106,6 +106,7 @@ typedef int32_t (*SdbInsertFp)(SSdb *pSdb, void *pObj); ...@@ -106,6 +106,7 @@ typedef int32_t (*SdbInsertFp)(SSdb *pSdb, void *pObj);
typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj); typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj);
typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj, bool callFunc); typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj, bool callFunc);
typedef int32_t (*SdbDeployFp)(SMnode *pMnode); typedef int32_t (*SdbDeployFp)(SMnode *pMnode);
typedef int32_t (*SdbValidateFp)(SMnode *pMnode, void *pTrans, void *pObj);
typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw); typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw);
typedef SSdbRaw *(*SdbEncodeFp)(void *pObj); typedef SSdbRaw *(*SdbEncodeFp)(void *pObj);
typedef bool (*sdbTraverseFp)(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3); typedef bool (*sdbTraverseFp)(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3);
...@@ -189,6 +190,7 @@ typedef struct SSdb { ...@@ -189,6 +190,7 @@ typedef struct SSdb {
SdbDeployFp deployFps[SDB_MAX]; SdbDeployFp deployFps[SDB_MAX];
SdbEncodeFp encodeFps[SDB_MAX]; SdbEncodeFp encodeFps[SDB_MAX];
SdbDecodeFp decodeFps[SDB_MAX]; SdbDecodeFp decodeFps[SDB_MAX];
SdbValidateFp validateFps[SDB_MAX];
TdThreadMutex filelock; TdThreadMutex filelock;
} SSdb; } SSdb;
...@@ -207,6 +209,7 @@ typedef struct { ...@@ -207,6 +209,7 @@ typedef struct {
SdbInsertFp insertFp; SdbInsertFp insertFp;
SdbUpdateFp updateFp; SdbUpdateFp updateFp;
SdbDeleteFp deleteFp; SdbDeleteFp deleteFp;
SdbValidateFp validateFp;
} SSdbTable; } SSdbTable;
typedef struct SSdbOpt { typedef struct SSdbOpt {
......
...@@ -121,6 +121,7 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) { ...@@ -121,6 +121,7 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) {
pSdb->deployFps[sdbType] = table.deployFp; pSdb->deployFps[sdbType] = table.deployFp;
pSdb->encodeFps[sdbType] = table.encodeFp; pSdb->encodeFps[sdbType] = table.encodeFp;
pSdb->decodeFps[sdbType] = table.decodeFp; pSdb->decodeFps[sdbType] = table.decodeFp;
pSdb->validateFps[sdbType] = table.validateFp;
int32_t hashType = 0; int32_t hashType = 0;
if (keyType == SDB_KEY_INT32) { if (keyType == SDB_KEY_INT32) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册