提交 1d9331e2 编写于 作者: S Shengliang Guan

fix: deadlock of mnode if its state changed

上级 c47e387d
...@@ -87,10 +87,11 @@ typedef struct { ...@@ -87,10 +87,11 @@ typedef struct {
typedef struct { typedef struct {
tsem_t syncSem; tsem_t syncSem;
int64_t sync; int64_t sync;
bool standby;
SReplica replica; SReplica replica;
int32_t errCode; int32_t errCode;
int32_t transId; int32_t transId;
SRWLatch lock;
int8_t standby;
int8_t leaderTransferFinish; int8_t leaderTransferFinish;
} SSyncMgmt; } SSyncMgmt;
......
...@@ -60,15 +60,19 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM ...@@ -60,15 +60,19 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
sdbSetApplyInfo(pMnode->pSdb, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex); sdbSetApplyInfo(pMnode->pSdb, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex);
} }
taosRLockLatch(&pMgmt->lock);
if (transId <= 0) { if (transId <= 0) {
taosRUnLockLatch(&pMgmt->lock);
mError("trans:%d, invalid commit msg", transId); mError("trans:%d, invalid commit msg", transId);
} else if (transId == pMgmt->transId) { } else if (transId == pMgmt->transId) {
taosRUnLockLatch(&pMgmt->lock);
if (pMgmt->errCode != 0) { if (pMgmt->errCode != 0) {
mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode)); mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode));
} }
pMgmt->transId = 0; pMgmt->transId = 0;
tsem_post(&pMgmt->syncSem); tsem_post(&pMgmt->syncSem);
} else { } else {
taosRUnLockLatch(&pMgmt->lock);
STrans *pTrans = mndAcquireTrans(pMnode, transId); STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) { if (pTrans != NULL) {
mDebug("trans:%d, execute in mnode which not leader", transId); mDebug("trans:%d, execute in mnode which not leader", transId);
...@@ -115,6 +119,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM ...@@ -115,6 +119,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId, mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId,
cbMeta.code, cbMeta.index, cbMeta.term); cbMeta.code, cbMeta.index, cbMeta.term);
taosWLockLatch(&pMgmt->lock);
if (pMgmt->transId == -1) { if (pMgmt->transId == -1) {
if (pMgmt->errCode != 0) { if (pMgmt->errCode != 0) {
mError("trans:-1, failed to propose sync reconfig since %s", tstrerror(pMgmt->errCode)); mError("trans:-1, failed to propose sync reconfig since %s", tstrerror(pMgmt->errCode));
...@@ -122,6 +127,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM ...@@ -122,6 +127,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
pMgmt->transId = 0; pMgmt->transId = 0;
tsem_post(&pMgmt->syncSem); tsem_post(&pMgmt->syncSem);
} }
taosWUnLockLatch(&pMgmt->lock);
} }
int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) { int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
...@@ -170,20 +176,24 @@ static void mndBecomeFollower(struct SSyncFSM *pFsm) { ...@@ -170,20 +176,24 @@ static void mndBecomeFollower(struct SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
mDebug("vgId:1, become follower"); mDebug("vgId:1, become follower");
taosWLockLatch(&pMnode->syncMgmt.lock);
if (pMnode->syncMgmt.transId != 0) { if (pMnode->syncMgmt.transId != 0) {
pMnode->syncMgmt.transId = 0; pMnode->syncMgmt.transId = 0;
tsem_post(&pMnode->syncMgmt.syncSem); tsem_post(&pMnode->syncMgmt.syncSem);
} }
taosWUnLockLatch(&pMnode->syncMgmt.lock);
} }
static void mndBecomeLeader(struct SSyncFSM *pFsm) { static void mndBecomeLeader(struct SSyncFSM *pFsm) {
mDebug("vgId:1, become leader"); mDebug("vgId:1, become leader");
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
taosWLockLatch(&pMnode->syncMgmt.lock);
if (pMnode->syncMgmt.transId != 0) { if (pMnode->syncMgmt.transId != 0) {
pMnode->syncMgmt.transId = 0; pMnode->syncMgmt.transId = 0;
tsem_post(&pMnode->syncMgmt.syncSem); tsem_post(&pMnode->syncMgmt.syncSem);
} }
taosWUnLockLatch(&pMnode->syncMgmt.lock);
} }
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
...@@ -210,6 +220,8 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { ...@@ -210,6 +220,8 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
int32_t mndInitSync(SMnode *pMnode) { int32_t mndInitSync(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
taosInitRWLatch(&pMgmt->lock);
pMgmt->transId = 0;
SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg}; SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg};
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
...@@ -262,11 +274,14 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { ...@@ -262,11 +274,14 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
memcpy(req.pCont, pRaw, req.contLen); memcpy(req.pCont, pRaw, req.contLen);
pMgmt->errCode = 0; pMgmt->errCode = 0;
taosWLockLatch(&pMgmt->lock);
pMgmt->transId = transId; pMgmt->transId = transId;
taosWUnLockLatch(&pMgmt->lock);
mTrace("trans:%d, will be proposed", pMgmt->transId); mTrace("trans:%d, will be proposed", pMgmt->transId);
const bool isWeak = false; const bool isWeak = false;
int32_t code = syncPropose(pMgmt->sync, &req, isWeak); int32_t code = syncPropose(pMgmt->sync, &req, isWeak);
if (code == 0) { if (code == 0) {
tsem_wait(&pMgmt->syncSem); tsem_wait(&pMgmt->syncSem);
} else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { } else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
...@@ -294,10 +309,12 @@ void mndSyncStart(SMnode *pMnode) { ...@@ -294,10 +309,12 @@ void mndSyncStart(SMnode *pMnode) {
} }
void mndSyncStop(SMnode *pMnode) { void mndSyncStop(SMnode *pMnode) {
taosWLockLatch(&pMnode->syncMgmt.lock);
if (pMnode->syncMgmt.transId != 0) { if (pMnode->syncMgmt.transId != 0) {
pMnode->syncMgmt.transId = 0; pMnode->syncMgmt.transId = 0;
tsem_post(&pMnode->syncMgmt.syncSem); tsem_post(&pMnode->syncMgmt.syncSem);
} }
taosWUnLockLatch(&pMnode->syncMgmt.lock);
} }
bool mndIsMaster(SMnode *pMnode) { bool mndIsMaster(SMnode *pMnode) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册