提交 9da39891 编写于 作者: S Shengliang Guan

refact: post sem in mnode while sync timeout

上级 f137e839
......@@ -88,6 +88,7 @@ typedef struct {
int64_t sync;
int32_t errCode;
int32_t transId;
int32_t transSec;
TdThreadMutex lock;
int8_t selfIndex;
int8_t numOfReplicas;
......
......@@ -26,6 +26,7 @@ int32_t mndInitSync(SMnode *pMnode);
void mndCleanupSync(SMnode *pMnode);
bool mndIsLeader(SMnode *pMnode);
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId);
void mndSyncCheckTimeout(SMnode *pMnode);
void mndSyncStart(SMnode *pMnode);
void mndSyncStop(SMnode *pMnode);
......
......@@ -280,6 +280,7 @@ static void *mndThreadFp(void *param) {
if (sec % (tsStatusInterval * 5) == 0) {
mndCheckDnodeOffline(pMnode);
mndSyncCheckTimeout(pMnode);
}
}
......
......@@ -17,6 +17,8 @@
#include "mndSync.h"
#include "mndTrans.h"
#define TRANS_TIMEOUT_SEC 10
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
if (pMsg == NULL || pMsg->pCont == NULL) {
return -1;
......@@ -100,6 +102,7 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta
mInfo("trans:%d, is proposed and post sem", transId);
}
pMgmt->transId = 0;
pMgmt->transSec = 0;
tsem_post(&pMgmt->syncSem);
taosThreadMutexUnlock(&pMgmt->lock);
} else {
......@@ -206,6 +209,7 @@ static void mndBecomeFollower(const SSyncFSM *pFsm) {
if (pMgmt->transId != 0) {
mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", pMgmt->transId);
pMgmt->transId = 0;
pMgmt->transSec = 0;
pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER;
tsem_post(&pMgmt->syncSem);
}
......@@ -267,6 +271,7 @@ int32_t mndInitSync(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
taosThreadMutexInit(&pMgmt->lock, NULL);
pMgmt->transId = 0;
pMgmt->transSec = 0;
SSyncInfo syncInfo = {
.snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT,
......@@ -317,6 +322,30 @@ void mndCleanupSync(SMnode *pMnode) {
memset(pMgmt, 0, sizeof(SSyncMgmt));
}
void mndSyncCheckTimeout(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
taosThreadMutexLock(&pMgmt->lock);
if (pMgmt->transId != 0) {
int32_t curSec = taosGetTimestampSec();
int32_t delta = curSec - pMgmt->transSec;
if (delta > TRANS_TIMEOUT_SEC) {
mError("trans:%d, failed to propose since timeout, start:%d cur:%d delta:%d", pMgmt->transId, pMgmt->transSec,
curSec, delta);
pMgmt->transId = 0;
pMgmt->transSec = 0;
terrno = TSDB_CODE_SYN_TIMEOUT;
pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT;
tsem_post(&pMgmt->syncSem);
} else {
mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d", pMgmt->transId, pMgmt->transSec, curSec,
curSec - pMgmt->transSec);
}
} else {
mTrace("check sync timeout msg, no trans waiting for confirm");
}
taosThreadMutexUnlock(&pMgmt->lock);
}
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
pMgmt->errCode = 0;
......@@ -333,11 +362,12 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId);
taosThreadMutexUnlock(&pMgmt->lock);
terrno = TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED;
return -1;
return terrno;
}
mInfo("trans:%d, will be proposed", transId);
pMgmt->transId = transId;
pMgmt->transSec = taosGetTimestampSec();
taosThreadMutexUnlock(&pMgmt->lock);
int32_t code = syncPropose(pMgmt->sync, &req, false);
......@@ -348,6 +378,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId);
taosThreadMutexLock(&pMgmt->lock);
pMgmt->transId = 0;
pMgmt->transSec = 0;
taosThreadMutexUnlock(&pMgmt->lock);
sdbWriteWithoutFree(pMnode->pSdb, pRaw);
sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
......@@ -356,6 +387,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
mError("trans:%d, failed to proposed since %s", transId, terrstr());
taosThreadMutexLock(&pMgmt->lock);
pMgmt->transId = 0;
pMgmt->transSec = 0;
taosThreadMutexUnlock(&pMgmt->lock);
if (terrno == 0) {
terrno = TSDB_CODE_APP_ERROR;
......@@ -389,6 +421,7 @@ void mndSyncStop(SMnode *pMnode) {
if (pMgmt->transId != 0) {
mInfo("vgId:1, is stopped and post sem, trans:%d", pMgmt->transId);
pMgmt->transId = 0;
pMgmt->transSec = 0;
tsem_post(&pMgmt->syncSem);
}
taosThreadMutexUnlock(&pMgmt->lock);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册