提交 28efd7dc 编写于 作者: B Benguang Zhao

enh: check trans conflicts in mndTransValidate

上级 4a6d8185
...@@ -154,14 +154,14 @@ typedef struct SSnapshotMeta { ...@@ -154,14 +154,14 @@ typedef struct SSnapshotMeta {
typedef struct SSyncFSM { typedef struct SSyncFSM {
void* data; void* data;
int32_t (*FpCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); int32_t (*FpCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta);
SyncIndex (*FpAppliedIndexCb)(const struct SSyncFSM* pFsm); SyncIndex (*FpAppliedIndexCb)(const struct SSyncFSM* pFsm);
int32_t (*FpPreCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); int32_t (*FpPreCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta);
void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta);
void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm, const SyncIndex commitIdx); void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm, const SyncIndex commitIdx);
void (*FpReConfigCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SReConfigCbMeta* pMeta); void (*FpReConfigCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SReConfigCbMeta* pMeta);
void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta);
bool (*FpApplyQueueEmptyCb)(const struct SSyncFSM* pFsm); bool (*FpApplyQueueEmptyCb)(const struct SSyncFSM* pFsm);
int32_t (*FpApplyQueueItems)(const struct SSyncFSM* pFsm); int32_t (*FpApplyQueueItems)(const struct SSyncFSM* pFsm);
......
...@@ -133,6 +133,11 @@ _OUT: ...@@ -133,6 +133,11 @@ _OUT:
static int32_t mndTransValidateImp(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransValidateImp(SMnode *pMnode, STrans *pTrans) {
if (pTrans->stage == TRN_STAGE_PREPARE) { if (pTrans->stage == TRN_STAGE_PREPARE) {
if (mndTransCheckConflict(pMnode, pTrans) < 0) {
mError("trans:%d, failed to validate trans conflicts.", pTrans->id);
return -1;
}
return mndTransValidatePrepareStage(pMnode, pTrans); return mndTransValidatePrepareStage(pMnode, pTrans);
} }
return 0; return 0;
...@@ -153,10 +158,12 @@ static int32_t mndTransValidate(SMnode *pMnode, SSdbRaw *pRaw) { ...@@ -153,10 +158,12 @@ static int32_t mndTransValidate(SMnode *pMnode, SSdbRaw *pRaw) {
_OUT: _OUT:
if (pTrans) mndTransDropData(pTrans); if (pTrans) mndTransDropData(pTrans);
if (pRow) taosMemoryFreeClear(pRow); if (pRow) taosMemoryFreeClear(pRow);
if (code) terrno = (terrno ? terrno : TSDB_CODE_MND_TRANS_CONFLICT);
return code; return code;
} }
int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
terrno = TSDB_CODE_SUCCESS;
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
SSdbRaw *pRaw = pMsg->pCont; SSdbRaw *pRaw = pMsg->pCont;
STrans *pTrans = NULL; STrans *pTrans = NULL;
...@@ -177,6 +184,8 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, const SFsmCbMeta *pMet ...@@ -177,6 +184,8 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, const SFsmCbMeta *pMet
code = mndTransValidate(pMnode, pRaw); code = mndTransValidate(pMnode, pRaw);
if (code != 0) { if (code != 0) {
mError("trans:%d, failed to validate requested trans since %s", transId, terrstr()); mError("trans:%d, failed to validate requested trans since %s", transId, terrstr());
code = 0;
pMeta->code = terrno;
goto _OUT; goto _OUT;
} }
...@@ -236,7 +245,7 @@ _OUT: ...@@ -236,7 +245,7 @@ _OUT:
return 0; return 0;
} }
int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
int32_t code = pMsg->code; int32_t code = pMsg->code;
if (code != 0) { if (code != 0) {
...@@ -245,6 +254,7 @@ int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta * ...@@ -245,6 +254,7 @@ int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *
pMsg->info.conn.applyIndex = pMeta->index; pMsg->info.conn.applyIndex = pMeta->index;
pMsg->info.conn.applyTerm = pMeta->term; pMsg->info.conn.applyTerm = pMeta->term;
pMeta->code = 0;
atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex); atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex);
...@@ -255,7 +265,7 @@ int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta * ...@@ -255,7 +265,7 @@ int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *
code = mndProcessWriteMsg(pMnode, pMsg, pMeta); code = mndProcessWriteMsg(pMnode, pMsg, pMeta);
_OUT: _OUT:
mndPostMgmtCode(pMnode, code); mndPostMgmtCode(pMnode, code ? code : pMeta->code);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL; pMsg->pCont = NULL;
return code; return code;
......
...@@ -431,7 +431,7 @@ static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsm ...@@ -431,7 +431,7 @@ static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsm
return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg); return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
} }
static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
if (pMsg->code == 0) { if (pMsg->code == 0) {
return vnodeSyncApplyMsg(pFsm, pMsg, pMeta); return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
} }
...@@ -451,7 +451,7 @@ static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFs ...@@ -451,7 +451,7 @@ static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFs
return 0; return 0;
} }
static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
if (pMeta->isWeak == 1) { if (pMeta->isWeak == 1) {
return vnodeSyncApplyMsg(pFsm, pMsg, pMeta); return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
} }
...@@ -463,7 +463,7 @@ static SyncIndex vnodeSyncAppliedIndex(const SSyncFSM *pFSM) { ...@@ -463,7 +463,7 @@ static SyncIndex vnodeSyncAppliedIndex(const SSyncFSM *pFSM) {
return atomic_load_64(&pVnode->state.applied); return atomic_load_64(&pVnode->state.applied);
} }
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
pVnode->config.vgId, pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state), pVnode->config.vgId, pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册