diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index e86a4f96905512dd08af6acc4b9563206b93f985..2a0a4b0f63ae70c95423a0fc59cf7200417ebd10 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -154,14 +154,14 @@ typedef struct SSnapshotMeta { typedef struct SSyncFSM { void* data; - int32_t (*FpCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); + int32_t (*FpCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta); SyncIndex (*FpAppliedIndexCb)(const struct SSyncFSM* pFsm); - int32_t (*FpPreCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); - void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); + int32_t (*FpPreCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta); + void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta); void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm, const SyncIndex commitIdx); - void (*FpReConfigCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SReConfigCbMeta* pMeta); - void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); + void (*FpReConfigCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SReConfigCbMeta* pMeta); + void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta); bool (*FpApplyQueueEmptyCb)(const struct SSyncFSM* pFsm); int32_t (*FpApplyQueueItems)(const struct SSyncFSM* pFsm); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 5be60c00a966dacda53d198195d1e436724b821f..3292d8fac26b5ba49d8e34d1a690293c12e6c548 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -133,6 +133,11 @@ _OUT: static int32_t mndTransValidateImp(SMnode *pMnode, STrans *pTrans) { if (pTrans->stage == TRN_STAGE_PREPARE) { + if (mndTransCheckConflict(pMnode, pTrans) < 0) { + mError("trans:%d, failed to validate trans conflicts.", pTrans->id); + return -1; + } + return mndTransValidatePrepareStage(pMnode, pTrans); } return 0; @@ -153,10 +158,12 @@ static int32_t mndTransValidate(SMnode *pMnode, SSdbRaw *pRaw) { _OUT: if (pTrans) mndTransDropData(pTrans); if (pRow) taosMemoryFreeClear(pRow); + if (code) terrno = (terrno ? terrno : TSDB_CODE_MND_TRANS_CONFLICT); return code; } -int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { +int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) { + terrno = TSDB_CODE_SUCCESS; SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSdbRaw *pRaw = pMsg->pCont; STrans *pTrans = NULL; @@ -177,6 +184,8 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, const SFsmCbMeta *pMet code = mndTransValidate(pMnode, pRaw); if (code != 0) { mError("trans:%d, failed to validate requested trans since %s", transId, terrstr()); + code = 0; + pMeta->code = terrno; goto _OUT; } @@ -236,7 +245,7 @@ _OUT: return 0; } -int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { +int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) { SMnode *pMnode = pFsm->data; int32_t code = pMsg->code; if (code != 0) { @@ -245,6 +254,7 @@ int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta * pMsg->info.conn.applyIndex = pMeta->index; pMsg->info.conn.applyTerm = pMeta->term; + pMeta->code = 0; atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex); @@ -255,7 +265,7 @@ int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta * code = mndProcessWriteMsg(pMnode, pMsg, pMeta); _OUT: - mndPostMgmtCode(pMnode, code); + mndPostMgmtCode(pMnode, code ? code : pMeta->code); rpcFreeCont(pMsg->pCont); pMsg->pCont = NULL; return code; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 29f1ddc50f5935b1f69ec01dee3c5dbd809882f0..ff551e6534d1c445c4465e8f2ad3328f03899196 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -431,7 +431,7 @@ static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsm return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg); } -static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { +static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) { if (pMsg->code == 0) { return vnodeSyncApplyMsg(pFsm, pMsg, pMeta); } @@ -451,7 +451,7 @@ static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFs return 0; } -static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { +static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) { if (pMeta->isWeak == 1) { return vnodeSyncApplyMsg(pFsm, pMsg, pMeta); } @@ -463,7 +463,7 @@ static SyncIndex vnodeSyncAppliedIndex(const SSyncFSM *pFSM) { return atomic_load_64(&pVnode->state.applied); } -static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { +static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) { SVnode *pVnode = pFsm->data; vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", pVnode->config.vgId, pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state),