提交 f137e839 编写于 作者: S Shengliang Guan

refact: change SRWLatch to TdThreadMutex in mnode

上级 998b45ef
......@@ -84,14 +84,14 @@ typedef struct {
} STelemMgmt;
typedef struct {
tsem_t syncSem;
int64_t sync;
int32_t errCode;
int32_t transId;
SRWLatch lock;
int8_t selfIndex;
int8_t numOfReplicas;
SReplica replicas[TSDB_MAX_REPLICA];
tsem_t syncSem;
int64_t sync;
int32_t errCode;
int32_t transId;
TdThreadMutex lock;
int8_t selfIndex;
int8_t numOfReplicas;
SReplica replicas[TSDB_MAX_REPLICA];
} SSyncMgmt;
typedef struct {
......
......@@ -89,9 +89,9 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta
sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
}
taosWLockLatch(&pMgmt->lock);
taosThreadMutexLock(&pMgmt->lock);
if (transId <= 0) {
taosWUnLockLatch(&pMgmt->lock);
taosThreadMutexUnlock(&pMgmt->lock);
mError("trans:%d, invalid commit msg", transId);
} else if (transId == pMgmt->transId) {
if (pMgmt->errCode != 0) {
......@@ -101,9 +101,9 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta
}
pMgmt->transId = 0;
tsem_post(&pMgmt->syncSem);
taosWUnLockLatch(&pMgmt->lock);
taosThreadMutexUnlock(&pMgmt->lock);
} else {
taosWUnLockLatch(&pMgmt->lock);
taosThreadMutexUnlock(&pMgmt->lock);
STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) {
mInfo("trans:%d, execute in mnode which not leader", transId);
......@@ -198,18 +198,18 @@ int32_t mndSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int3
}
static void mndBecomeFollower(const SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data;
SMnode *pMnode = pFsm->data;
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
mInfo("vgId:1, become follower");
taosWLockLatch(&pMnode->syncMgmt.lock);
if (pMnode->syncMgmt.transId != 0) {
mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader",
pMnode->syncMgmt.transId);
pMnode->syncMgmt.transId = 0;
pMnode->syncMgmt.errCode = TSDB_CODE_SYN_NOT_LEADER;
tsem_post(&pMnode->syncMgmt.syncSem);
taosThreadMutexLock(&pMgmt->lock);
if (pMgmt->transId != 0) {
mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", pMgmt->transId);
pMgmt->transId = 0;
pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER;
tsem_post(&pMgmt->syncSem);
}
taosWUnLockLatch(&pMnode->syncMgmt.lock);
taosThreadMutexUnlock(&pMgmt->lock);
}
static void mndBecomeLeader(const SSyncFSM *pFsm) {
......@@ -265,7 +265,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
int32_t mndInitSync(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
taosInitRWLatch(&pMgmt->lock);
taosThreadMutexInit(&pMgmt->lock, NULL);
pMgmt->transId = 0;
SSyncInfo syncInfo = {
......@@ -313,6 +313,7 @@ void mndCleanupSync(SMnode *pMnode) {
mInfo("mnode-sync is stopped, id:%" PRId64, pMgmt->sync);
tsem_destroy(&pMgmt->syncSem);
taosThreadMutexDestroy(&pMgmt->lock);
memset(pMgmt, 0, sizeof(SSyncMgmt));
}
......@@ -327,35 +328,35 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
if (req.pCont == NULL) return -1;
memcpy(req.pCont, pRaw, req.contLen);
taosWLockLatch(&pMgmt->lock);
taosThreadMutexLock(&pMgmt->lock);
if (pMgmt->transId != 0) {
mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId);
taosWUnLockLatch(&pMgmt->lock);
taosThreadMutexUnlock(&pMgmt->lock);
terrno = TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED;
return -1;
}
mInfo("trans:%d, will be proposed", transId);
pMgmt->transId = transId;
taosWUnLockLatch(&pMgmt->lock);
taosThreadMutexUnlock(&pMgmt->lock);
int32_t code = syncPropose(pMgmt->sync, &req, false);
if (code == 0) {
mInfo("trans:%d, is proposing and wait sem", pMgmt->transId);
mInfo("trans:%d, is proposing and wait sem", transId);
tsem_wait(&pMgmt->syncSem);
} else if (code > 0) {
mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId);
taosWLockLatch(&pMgmt->lock);
taosThreadMutexLock(&pMgmt->lock);
pMgmt->transId = 0;
taosWUnLockLatch(&pMgmt->lock);
taosThreadMutexUnlock(&pMgmt->lock);
sdbWriteWithoutFree(pMnode->pSdb, pRaw);
sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
code = 0;
} else {
mError("trans:%d, failed to proposed since %s", transId, terrstr());
taosWLockLatch(&pMgmt->lock);
taosThreadMutexLock(&pMgmt->lock);
pMgmt->transId = 0;
taosWUnLockLatch(&pMgmt->lock);
taosThreadMutexUnlock(&pMgmt->lock);
if (terrno == 0) {
terrno = TSDB_CODE_APP_ERROR;
}
......@@ -382,13 +383,15 @@ void mndSyncStart(SMnode *pMnode) {
}
void mndSyncStop(SMnode *pMnode) {
taosWLockLatch(&pMnode->syncMgmt.lock);
if (pMnode->syncMgmt.transId != 0) {
mInfo("vgId:1, is stopped and post sem, trans:%d", pMnode->syncMgmt.transId);
pMnode->syncMgmt.transId = 0;
tsem_post(&pMnode->syncMgmt.syncSem);
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
taosThreadMutexLock(&pMgmt->lock);
if (pMgmt->transId != 0) {
mInfo("vgId:1, is stopped and post sem, trans:%d", pMgmt->transId);
pMgmt->transId = 0;
tsem_post(&pMgmt->syncSem);
}
taosWUnLockLatch(&pMnode->syncMgmt.lock);
taosThreadMutexUnlock(&pMgmt->lock);
}
bool mndIsLeader(SMnode *pMnode) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册