diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index f72b69a7de243d27d202a08fbb21561a3e1b4224..505a1384efdd7296751c6d29700b0888aa04543e 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -87,10 +87,11 @@ typedef struct { typedef struct { tsem_t syncSem; int64_t sync; - bool standby; SReplica replica; int32_t errCode; int32_t transId; + SRWLatch lock; + int8_t standby; int8_t leaderTransferFinish; } SSyncMgmt; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 6a78d2f018c7fbb92facc9b13e3b09895e800509..03e5c2b3a2be91c259230813d564c05f035cd107 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -60,15 +60,19 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM sdbSetApplyInfo(pMnode->pSdb, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex); } + taosRLockLatch(&pMgmt->lock); if (transId <= 0) { + taosRUnLockLatch(&pMgmt->lock); mError("trans:%d, invalid commit msg", transId); } else if (transId == pMgmt->transId) { + taosRUnLockLatch(&pMgmt->lock); if (pMgmt->errCode != 0) { mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode)); } pMgmt->transId = 0; tsem_post(&pMgmt->syncSem); } else { + taosRUnLockLatch(&pMgmt->lock); STrans *pTrans = mndAcquireTrans(pMnode, transId); if (pTrans != NULL) { mDebug("trans:%d, execute in mnode which not leader", transId); @@ -115,6 +119,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term); + taosWLockLatch(&pMgmt->lock); if (pMgmt->transId == -1) { if (pMgmt->errCode != 0) { mError("trans:-1, failed to propose sync reconfig since %s", tstrerror(pMgmt->errCode)); @@ -122,6 +127,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM pMgmt->transId = 0; tsem_post(&pMgmt->syncSem); } + taosWUnLockLatch(&pMgmt->lock); } int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) { @@ -170,20 +176,24 @@ static void mndBecomeFollower(struct SSyncFSM *pFsm) { SMnode *pMnode = pFsm->data; mDebug("vgId:1, become follower"); + taosWLockLatch(&pMnode->syncMgmt.lock); if (pMnode->syncMgmt.transId != 0) { pMnode->syncMgmt.transId = 0; tsem_post(&pMnode->syncMgmt.syncSem); } + taosWUnLockLatch(&pMnode->syncMgmt.lock); } static void mndBecomeLeader(struct SSyncFSM *pFsm) { mDebug("vgId:1, become leader"); SMnode *pMnode = pFsm->data; + taosWLockLatch(&pMnode->syncMgmt.lock); if (pMnode->syncMgmt.transId != 0) { pMnode->syncMgmt.transId = 0; tsem_post(&pMnode->syncMgmt.syncSem); } + taosWUnLockLatch(&pMnode->syncMgmt.lock); } SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { @@ -210,6 +220,8 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { int32_t mndInitSync(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; + taosInitRWLatch(&pMgmt->lock); + pMgmt->transId = 0; SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg}; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP); @@ -262,11 +274,14 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { memcpy(req.pCont, pRaw, req.contLen); pMgmt->errCode = 0; + taosWLockLatch(&pMgmt->lock); pMgmt->transId = transId; + taosWUnLockLatch(&pMgmt->lock); mTrace("trans:%d, will be proposed", pMgmt->transId); const bool isWeak = false; int32_t code = syncPropose(pMgmt->sync, &req, isWeak); + if (code == 0) { tsem_wait(&pMgmt->syncSem); } else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { @@ -294,10 +309,12 @@ void mndSyncStart(SMnode *pMnode) { } void mndSyncStop(SMnode *pMnode) { + taosWLockLatch(&pMnode->syncMgmt.lock); if (pMnode->syncMgmt.transId != 0) { pMnode->syncMgmt.transId = 0; tsem_post(&pMnode->syncMgmt.syncSem); } + taosWUnLockLatch(&pMnode->syncMgmt.lock); } bool mndIsMaster(SMnode *pMnode) {