提交 4a6d8185 编写于 作者: B Benguang Zhao

enh: add mndTransValidate for prepare actions

上级 6679b8af
...@@ -150,7 +150,7 @@ enum { ...@@ -150,7 +150,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_HB, "consumer-hb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_HB, "consumer-hb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp)
TD_DEF_MSG_TYPE(TDMT_MND_UNUSED2, "unused2", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_VG, "create-vg", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_TIMER, "tmq-tmr", SMTimerReq, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_TIMER, "tmq-tmr", SMTimerReq, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL)
......
...@@ -91,6 +91,11 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans); ...@@ -91,6 +91,11 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans);
void mndTransRefresh(SMnode *pMnode, STrans *pTrans); void mndTransRefresh(SMnode *pMnode, STrans *pTrans);
int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, const char *dbname); int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, const char *dbname);
SSdbRaw *mndTransEncode(STrans *pTrans);
SSdbRow *mndTransDecode(SSdbRaw *pRaw);
void mndTransDropData(STrans *pTrans);
bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -27,6 +27,7 @@ void mndCleanupVgroup(SMnode *pMnode); ...@@ -27,6 +27,7 @@ void mndCleanupVgroup(SMnode *pMnode);
SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId); SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId);
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup); void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup);
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup); SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup);
SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw);
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup); SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup);
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId); int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
void mndSortVnodeGid(SVgObj *pVgroup); void mndSortVnodeGid(SVgObj *pVgroup);
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "mndSync.h" #include "mndSync.h"
#include "mndCluster.h" #include "mndCluster.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "mndVgroup.h"
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
if (pMsg == NULL || pMsg->pCont == NULL) { if (pMsg == NULL || pMsg->pCont == NULL) {
...@@ -73,76 +74,188 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { ...@@ -73,76 +74,188 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
return code; return code;
} }
int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
SMnode *pMnode = pFsm->data; SSdbRow *pRow = NULL;
int32_t code = -1;
if (pAction->msgType == TDMT_MND_CREATE_VG) {
pRow = mndVgroupActionDecode(pAction->pRaw);
if (pRow == NULL) goto _OUT;
SVgObj *pVgroup = sdbGetRowObj(pRow);
if (pVgroup == NULL) goto _OUT;
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
if (maxVgId > pVgroup->vgId) {
mError("trans:%d, failed to satisfy vgroup id %d of prepare action. maxVgId:%d", pTrans->id, pVgroup->vgId,
maxVgId);
goto _OUT;
}
}
code = 0;
_OUT:
taosMemoryFreeClear(pRow);
return code;
}
static int32_t mndTransValidatePrepareStage(SMnode *pMnode, STrans *pTrans) {
int32_t code = -1;
int32_t action = 0;
int32_t numOfActions = taosArrayGetSize(pTrans->prepareActions);
if (numOfActions == 0) {
code = 0;
goto _OUT;
}
mInfo("trans:%d, validate %d prepare actions.", pTrans->id, numOfActions);
for (action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pTrans->prepareActions, action);
if (pAction->actionType != TRANS_ACTION_RAW) {
mError("trans:%d, prepare action:%d of unexpected type:%d", pTrans->id, action, pAction->actionType);
goto _OUT;
}
code = mndTransValidatePrepareAction(pMnode, pTrans, pAction);
if (code != 0) {
mError("trans:%d, failed to validate prepare action: %d, numOfActions:%d", pTrans->id, action, numOfActions);
goto _OUT;
}
}
code = 0;
_OUT:
return code;
}
static int32_t mndTransValidateImp(SMnode *pMnode, STrans *pTrans) {
if (pTrans->stage == TRN_STAGE_PREPARE) {
return mndTransValidatePrepareStage(pMnode, pTrans);
}
return 0;
}
static int32_t mndTransValidate(SMnode *pMnode, SSdbRaw *pRaw) {
STrans *pTrans = NULL;
int32_t code = -1;
SSdbRow *pRow = mndTransDecode(pRaw);
if (pRow == NULL) goto _OUT;
pTrans = sdbGetRowObj(pRow);
if (pTrans == NULL) goto _OUT;
code = mndTransValidateImp(pMnode, pTrans);
_OUT:
if (pTrans) mndTransDropData(pTrans);
if (pRow) taosMemoryFreeClear(pRow);
return code;
}
int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
SSdbRaw *pRaw = pMsg->pCont; SSdbRaw *pRaw = pMsg->pCont;
STrans *pTrans = NULL;
int32_t code = -1;
int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw); int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw);
if (transId <= 0) {
mError("trans:%d, invalid commit msg, cache transId:%d seq:%" PRId64, transId, pMgmt->transId, pMgmt->transSeq);
terrno = TSDB_CODE_INVALID_MSG;
goto _OUT;
}
mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64 mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
" role:%s raw:%p sec:%d seq:%" PRId64, " role:%s raw:%p sec:%d seq:%" PRId64,
transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state), transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state),
pRaw, pMgmt->transSec, pMgmt->transSeq); pRaw, pMgmt->transSec, pMgmt->transSeq);
if (pMeta->code == 0) { code = mndTransValidate(pMnode, pRaw);
int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pRaw); if (code != 0) {
if (code != 0) { mError("trans:%d, failed to validate requested trans since %s", transId, terrstr());
mError("trans:%d, failed to write to sdb since %s", transId, terrstr()); goto _OUT;
return 0; }
}
sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex); code = sdbWriteWithoutFree(pMnode->pSdb, pRaw);
if (code != 0) {
mError("trans:%d, failed to write to sdb since %s", transId, terrstr());
goto _OUT;
}
pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans == NULL) {
mError("trans:%d, not found while execute in mnode since %s", transId, terrstr());
goto _OUT;
}
if (pTrans->stage == TRN_STAGE_PREPARE) {
bool continueExec = mndTransPerformPrepareStage(pMnode, pTrans);
if (!continueExec) goto _OUT;
}
if (pTrans->id != pMgmt->transId) {
mInfo("trans:%d, execute in mnode which not leader or sync timeout, createTime:%" PRId64 " saved trans:%d",
pTrans->id, pTrans->createdTime, pMgmt->transId);
mndTransRefresh(pMnode, pTrans);
} }
sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta);
code = 0;
_OUT:
if (pTrans) mndReleaseTrans(pMnode, pTrans);
return code;
}
static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
taosThreadMutexLock(&pMgmt->lock); taosThreadMutexLock(&pMgmt->lock);
pMgmt->errCode = pMeta->code; if (pMgmt->transId == 0) {
goto _OUT;
}
if (transId <= 0) { pMgmt->transId = 0;
taosThreadMutexUnlock(&pMgmt->lock); pMgmt->transSec = 0;
mError("trans:%d, invalid commit msg, cache transId:%d seq:%" PRId64, transId, pMgmt->transId, pMgmt->transSeq); pMgmt->transSeq = 0;
} else if (transId == pMgmt->transId) { pMgmt->errCode = code;
if (pMgmt->errCode != 0) { tsem_post(&pMgmt->syncSem);
mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode));
} else { if (pMgmt->errCode != 0) {
mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, transId, pMgmt->transSeq); mError("trans:%d, failed to propose since %s, post sem", pMgmt->transId, tstrerror(pMgmt->errCode));
}
pMgmt->transId = 0;
pMgmt->transSec = 0;
pMgmt->transSeq = 0;
tsem_post(&pMgmt->syncSem);
taosThreadMutexUnlock(&pMgmt->lock);
} else { } else {
taosThreadMutexUnlock(&pMgmt->lock); mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, pMgmt->transId, pMgmt->transSeq);
STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) {
mInfo("trans:%d, execute in mnode which not leader or sync timeout, createTime:%" PRId64 " saved trans:%d",
transId, pTrans->createdTime, pMgmt->transId);
mndTransRefresh(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans);
} else {
mError("trans:%d, not found while execute in mnode since %s", transId, terrstr());
}
} }
sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta); _OUT:
taosThreadMutexUnlock(&pMgmt->lock);
return 0; return 0;
} }
int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
int32_t code = 0; SMnode *pMnode = pFsm->data;
int32_t code = pMsg->code;
if (code != 0) {
goto _OUT;
}
pMsg->info.conn.applyIndex = pMeta->index; pMsg->info.conn.applyIndex = pMeta->index;
pMsg->info.conn.applyTerm = pMeta->term; pMsg->info.conn.applyTerm = pMeta->term;
if (pMsg->code == 0) { atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex);
SMnode *pMnode = pFsm->data;
atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex);
}
if (!syncUtilUserCommit(pMsg->msgType)) { if (!syncUtilUserCommit(pMsg->msgType)) {
goto _out; goto _OUT;
} }
code = mndProcessWriteMsg(pFsm, pMsg, pMeta);
_out: code = mndProcessWriteMsg(pMnode, pMsg, pMeta);
_OUT:
mndPostMgmtCode(pMnode, code);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL; pMsg->pCont = NULL;
return code; return code;
......
...@@ -28,8 +28,6 @@ ...@@ -28,8 +28,6 @@
#define TRANS_ARRAY_SIZE 8 #define TRANS_ARRAY_SIZE 8
#define TRANS_RESERVE_SIZE 48 #define TRANS_RESERVE_SIZE 48
static SSdbRaw *mndTransEncode(STrans *pTrans);
static SSdbRow *mndTransDecode(SSdbRaw *pRaw);
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans); static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOld); static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOld);
static int32_t mndTransDelete(SSdb *pSdb, STrans *pTrans, bool callFunc); static int32_t mndTransDelete(SSdb *pSdb, STrans *pTrans, bool callFunc);
...@@ -38,14 +36,12 @@ static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw); ...@@ -38,14 +36,12 @@ static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw);
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction); static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction);
static void mndTransDropLogs(SArray *pArray); static void mndTransDropLogs(SArray *pArray);
static void mndTransDropActions(SArray *pArray); static void mndTransDropActions(SArray *pArray);
static void mndTransDropData(STrans *pTrans);
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray); static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray);
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans);
...@@ -142,7 +138,7 @@ _OVER: ...@@ -142,7 +138,7 @@ _OVER:
return ret; return ret;
} }
static SSdbRaw *mndTransEncode(STrans *pTrans) { SSdbRaw *mndTransEncode(STrans *pTrans) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
int8_t sver = taosArrayGetSize(pTrans->prepareActions) ? TRANS_VER2_NUMBER : TRANS_VER1_NUMBER; int8_t sver = taosArrayGetSize(pTrans->prepareActions) ? TRANS_VER2_NUMBER : TRANS_VER1_NUMBER;
...@@ -267,7 +263,7 @@ _OVER: ...@@ -267,7 +263,7 @@ _OVER:
return ret; return ret;
} }
static SSdbRow *mndTransDecode(SSdbRaw *pRaw) { SSdbRow *mndTransDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
SSdbRow *pRow = NULL; SSdbRow *pRow = NULL;
...@@ -444,7 +440,7 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { ...@@ -444,7 +440,7 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
return 0; return 0;
} }
static void mndTransDropData(STrans *pTrans) { void mndTransDropData(STrans *pTrans) {
if (pTrans->prepareActions != NULL) { if (pTrans->prepareActions != NULL) {
mndTransDropActions(pTrans->prepareActions); mndTransDropActions(pTrans->prepareActions);
pTrans->prepareActions = NULL; pTrans->prepareActions = NULL;
...@@ -1330,7 +1326,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) ...@@ -1330,7 +1326,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
return code; return code;
} }
static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true; bool continueExec = true;
int32_t code = 0; int32_t code = 0;
...@@ -1341,7 +1337,11 @@ static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { ...@@ -1341,7 +1337,11 @@ static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
for (int32_t action = 0; action < numOfActions; ++action) { for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pTrans->prepareActions, action); STransAction *pAction = taosArrayGet(pTrans->prepareActions, action);
mndTransExecSingleAction(pMnode, pTrans, pAction); code = mndTransExecSingleAction(pMnode, pTrans, pAction);
if (code != 0) {
mError("trans:%d, failed to execute prepare action:%d, numOfActions:%d", pTrans->id, action, numOfActions);
return false;
}
} }
_OVER: _OVER:
......
...@@ -28,7 +28,6 @@ ...@@ -28,7 +28,6 @@
#define VGROUP_VER_NUMBER 1 #define VGROUP_VER_NUMBER 1
#define VGROUP_RESERVE_SIZE 64 #define VGROUP_RESERVE_SIZE 64
static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw);
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew); static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
...@@ -1251,7 +1250,7 @@ int32_t mndAddPrepareNewVgAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) { ...@@ -1251,7 +1250,7 @@ int32_t mndAddPrepareNewVgAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) {
SSdbRaw *pRaw = mndVgroupActionEncode(pVg); SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
if (pRaw == NULL) goto _err; if (pRaw == NULL) goto _err;
STransAction action = {.pRaw = pRaw}; STransAction action = {.pRaw = pRaw, .msgType = TDMT_MND_CREATE_VG};
if (mndTransAppendPrepareAction(pTrans, &action) != 0) goto _err; if (mndTransAppendPrepareAction(pTrans, &action) != 0) goto _err;
(void)sdbSetRawStatus(pRaw, SDB_STATUS_CREATING); (void)sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
pRaw = NULL; pRaw = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册