diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 1f2d59749692d23deed0e115efc6e87c9c815e0b..ee1888ff4922f55572120789312ecbe0a7acf859 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -150,7 +150,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_TMQ_HB, "consumer-hb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp) - TD_DEF_MSG_TYPE(TDMT_MND_UNUSED2, "unused2", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_CREATE_VG, "create-vg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_TIMER, "tmq-tmr", SMTimerReq, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL) diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index ac078ce1a38d14ca883c61c239d69e07c7cf0b4e..625546aa552c77035b4b86dae30d2c0964b66118 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -91,6 +91,11 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans); void mndTransRefresh(SMnode *pMnode, STrans *pTrans); int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, const char *dbname); +SSdbRaw *mndTransEncode(STrans *pTrans); +SSdbRow *mndTransDecode(SSdbRaw *pRaw); +void mndTransDropData(STrans *pTrans); + +bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans); #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/inc/mndVgroup.h b/source/dnode/mnode/impl/inc/mndVgroup.h index 286a62438366919550c169b798a09eef0942bfdc..7c2f8b5b65386b7a2e27541cf8d34b0f11c5e1ec 100644 --- a/source/dnode/mnode/impl/inc/mndVgroup.h +++ b/source/dnode/mnode/impl/inc/mndVgroup.h @@ -27,6 +27,7 @@ void mndCleanupVgroup(SMnode *pMnode); SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId); void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup); SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup); +SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw); SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup); int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId); void mndSortVnodeGid(SVgObj *pVgroup); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 02925f9d8cf39247678bf7a380e913d8569a8792..5be60c00a966dacda53d198195d1e436724b821f 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -17,6 +17,7 @@ #include "mndSync.h" #include "mndCluster.h" #include "mndTrans.h" +#include "mndVgroup.h" static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { if (pMsg == NULL || pMsg->pCont == NULL) { @@ -73,76 +74,188 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return code; } -int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { - SMnode *pMnode = pFsm->data; +static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { + SSdbRow *pRow = NULL; + int32_t code = -1; + + if (pAction->msgType == TDMT_MND_CREATE_VG) { + pRow = mndVgroupActionDecode(pAction->pRaw); + if (pRow == NULL) goto _OUT; + + SVgObj *pVgroup = sdbGetRowObj(pRow); + if (pVgroup == NULL) goto _OUT; + + int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); + if (maxVgId > pVgroup->vgId) { + mError("trans:%d, failed to satisfy vgroup id %d of prepare action. maxVgId:%d", pTrans->id, pVgroup->vgId, + maxVgId); + goto _OUT; + } + } + + code = 0; +_OUT: + taosMemoryFreeClear(pRow); + return code; +} + +static int32_t mndTransValidatePrepareStage(SMnode *pMnode, STrans *pTrans) { + int32_t code = -1; + int32_t action = 0; + + int32_t numOfActions = taosArrayGetSize(pTrans->prepareActions); + if (numOfActions == 0) { + code = 0; + goto _OUT; + } + + mInfo("trans:%d, validate %d prepare actions.", pTrans->id, numOfActions); + + for (action = 0; action < numOfActions; ++action) { + STransAction *pAction = taosArrayGet(pTrans->prepareActions, action); + + if (pAction->actionType != TRANS_ACTION_RAW) { + mError("trans:%d, prepare action:%d of unexpected type:%d", pTrans->id, action, pAction->actionType); + goto _OUT; + } + + code = mndTransValidatePrepareAction(pMnode, pTrans, pAction); + if (code != 0) { + mError("trans:%d, failed to validate prepare action: %d, numOfActions:%d", pTrans->id, action, numOfActions); + goto _OUT; + } + } + + code = 0; +_OUT: + return code; +} + +static int32_t mndTransValidateImp(SMnode *pMnode, STrans *pTrans) { + if (pTrans->stage == TRN_STAGE_PREPARE) { + return mndTransValidatePrepareStage(pMnode, pTrans); + } + return 0; +} + +static int32_t mndTransValidate(SMnode *pMnode, SSdbRaw *pRaw) { + STrans *pTrans = NULL; + int32_t code = -1; + + SSdbRow *pRow = mndTransDecode(pRaw); + if (pRow == NULL) goto _OUT; + + pTrans = sdbGetRowObj(pRow); + if (pTrans == NULL) goto _OUT; + + code = mndTransValidateImp(pMnode, pTrans); + +_OUT: + if (pTrans) mndTransDropData(pTrans); + if (pRow) taosMemoryFreeClear(pRow); + return code; +} + +int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSdbRaw *pRaw = pMsg->pCont; - + STrans *pTrans = NULL; + int32_t code = -1; int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw); + + if (transId <= 0) { + mError("trans:%d, invalid commit msg, cache transId:%d seq:%" PRId64, transId, pMgmt->transId, pMgmt->transSeq); + terrno = TSDB_CODE_INVALID_MSG; + goto _OUT; + } + mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64 " role:%s raw:%p sec:%d seq:%" PRId64, transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state), pRaw, pMgmt->transSec, pMgmt->transSeq); - if (pMeta->code == 0) { - int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pRaw); - if (code != 0) { - mError("trans:%d, failed to write to sdb since %s", transId, terrstr()); - return 0; - } - sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex); + code = mndTransValidate(pMnode, pRaw); + if (code != 0) { + mError("trans:%d, failed to validate requested trans since %s", transId, terrstr()); + goto _OUT; + } + + code = sdbWriteWithoutFree(pMnode->pSdb, pRaw); + if (code != 0) { + mError("trans:%d, failed to write to sdb since %s", transId, terrstr()); + goto _OUT; + } + + pTrans = mndAcquireTrans(pMnode, transId); + if (pTrans == NULL) { + mError("trans:%d, not found while execute in mnode since %s", transId, terrstr()); + goto _OUT; + } + + if (pTrans->stage == TRN_STAGE_PREPARE) { + bool continueExec = mndTransPerformPrepareStage(pMnode, pTrans); + if (!continueExec) goto _OUT; + } + + if (pTrans->id != pMgmt->transId) { + mInfo("trans:%d, execute in mnode which not leader or sync timeout, createTime:%" PRId64 " saved trans:%d", + pTrans->id, pTrans->createdTime, pMgmt->transId); + mndTransRefresh(pMnode, pTrans); } + sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex); + sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta); + code = 0; + +_OUT: + if (pTrans) mndReleaseTrans(pMnode, pTrans); + return code; +} + +static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) { + SSyncMgmt *pMgmt = &pMnode->syncMgmt; taosThreadMutexLock(&pMgmt->lock); - pMgmt->errCode = pMeta->code; + if (pMgmt->transId == 0) { + goto _OUT; + } - if (transId <= 0) { - taosThreadMutexUnlock(&pMgmt->lock); - mError("trans:%d, invalid commit msg, cache transId:%d seq:%" PRId64, transId, pMgmt->transId, pMgmt->transSeq); - } else if (transId == pMgmt->transId) { - if (pMgmt->errCode != 0) { - mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode)); - } else { - mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, transId, pMgmt->transSeq); - } - pMgmt->transId = 0; - pMgmt->transSec = 0; - pMgmt->transSeq = 0; - tsem_post(&pMgmt->syncSem); - taosThreadMutexUnlock(&pMgmt->lock); + pMgmt->transId = 0; + pMgmt->transSec = 0; + pMgmt->transSeq = 0; + pMgmt->errCode = code; + tsem_post(&pMgmt->syncSem); + + if (pMgmt->errCode != 0) { + mError("trans:%d, failed to propose since %s, post sem", pMgmt->transId, tstrerror(pMgmt->errCode)); } else { - taosThreadMutexUnlock(&pMgmt->lock); - STrans *pTrans = mndAcquireTrans(pMnode, transId); - if (pTrans != NULL) { - mInfo("trans:%d, execute in mnode which not leader or sync timeout, createTime:%" PRId64 " saved trans:%d", - transId, pTrans->createdTime, pMgmt->transId); - mndTransRefresh(pMnode, pTrans); - mndReleaseTrans(pMnode, pTrans); - } else { - mError("trans:%d, not found while execute in mnode since %s", transId, terrstr()); - } + mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, pMgmt->transId, pMgmt->transSeq); } - sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta); +_OUT: + taosThreadMutexUnlock(&pMgmt->lock); return 0; } int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { - int32_t code = 0; + SMnode *pMnode = pFsm->data; + int32_t code = pMsg->code; + if (code != 0) { + goto _OUT; + } + pMsg->info.conn.applyIndex = pMeta->index; pMsg->info.conn.applyTerm = pMeta->term; - if (pMsg->code == 0) { - SMnode *pMnode = pFsm->data; - atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex); - } + atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex); if (!syncUtilUserCommit(pMsg->msgType)) { - goto _out; + goto _OUT; } - code = mndProcessWriteMsg(pFsm, pMsg, pMeta); -_out: + code = mndProcessWriteMsg(pMnode, pMsg, pMeta); + +_OUT: + mndPostMgmtCode(pMnode, code); rpcFreeCont(pMsg->pCont); pMsg->pCont = NULL; return code; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 44114bac0301e7204362a29418b2ed4f4b6f1667..14f12fab2282222c02247cfc5d65803d774dcf5b 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -28,8 +28,6 @@ #define TRANS_ARRAY_SIZE 8 #define TRANS_RESERVE_SIZE 48 -static SSdbRaw *mndTransEncode(STrans *pTrans); -static SSdbRow *mndTransDecode(SSdbRaw *pRaw); static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans); static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOld); static int32_t mndTransDelete(SSdb *pSdb, STrans *pTrans, bool callFunc); @@ -38,14 +36,12 @@ static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw); static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction); static void mndTransDropLogs(SArray *pArray); static void mndTransDropActions(SArray *pArray); -static void mndTransDropData(STrans *pTrans); static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray); static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans); @@ -142,7 +138,7 @@ _OVER: return ret; } -static SSdbRaw *mndTransEncode(STrans *pTrans) { +SSdbRaw *mndTransEncode(STrans *pTrans) { terrno = TSDB_CODE_INVALID_MSG; int8_t sver = taosArrayGetSize(pTrans->prepareActions) ? TRANS_VER2_NUMBER : TRANS_VER1_NUMBER; @@ -267,7 +263,7 @@ _OVER: return ret; } -static SSdbRow *mndTransDecode(SSdbRaw *pRaw) { +SSdbRow *mndTransDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_INVALID_MSG; SSdbRow *pRow = NULL; @@ -444,7 +440,7 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { return 0; } -static void mndTransDropData(STrans *pTrans) { +void mndTransDropData(STrans *pTrans) { if (pTrans->prepareActions != NULL) { mndTransDropActions(pTrans->prepareActions); pTrans->prepareActions = NULL; @@ -1330,7 +1326,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) return code; } -static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { +bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { bool continueExec = true; int32_t code = 0; @@ -1341,7 +1337,11 @@ static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pTrans->prepareActions, action); - mndTransExecSingleAction(pMnode, pTrans, pAction); + code = mndTransExecSingleAction(pMnode, pTrans, pAction); + if (code != 0) { + mError("trans:%d, failed to execute prepare action:%d, numOfActions:%d", pTrans->id, action, numOfActions); + return false; + } } _OVER: diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 58612efe48df5147917765eeb228842ed7ca002d..776c263eddbd5d0b05edb1779ec45d882f5d54c0 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -28,7 +28,6 @@ #define VGROUP_VER_NUMBER 1 #define VGROUP_RESERVE_SIZE 64 -static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw); static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew); @@ -1251,7 +1250,7 @@ int32_t mndAddPrepareNewVgAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) { SSdbRaw *pRaw = mndVgroupActionEncode(pVg); if (pRaw == NULL) goto _err; - STransAction action = {.pRaw = pRaw}; + STransAction action = {.pRaw = pRaw, .msgType = TDMT_MND_CREATE_VG}; if (mndTransAppendPrepareAction(pTrans, &action) != 0) goto _err; (void)sdbSetRawStatus(pRaw, SDB_STATUS_CREATING); pRaw = NULL;