提交 f623837b 编写于 作者: S Shengliang Guan

refactor: adjust mnode timer

上级 2b256542
...@@ -89,6 +89,7 @@ typedef struct { ...@@ -89,6 +89,7 @@ typedef struct {
int32_t errCode; int32_t errCode;
int32_t transId; int32_t transId;
int32_t transSec; int32_t transSec;
int64_t transSeq;
TdThreadMutex lock; TdThreadMutex lock;
int8_t selfIndex; int8_t selfIndex;
int8_t numOfReplicas; int8_t numOfReplicas;
......
...@@ -280,6 +280,9 @@ static void *mndThreadFp(void *param) { ...@@ -280,6 +280,9 @@ static void *mndThreadFp(void *param) {
if (sec % (tsStatusInterval * 5) == 0) { if (sec % (tsStatusInterval * 5) == 0) {
mndCheckDnodeOffline(pMnode); mndCheckDnodeOffline(pMnode);
}
if (sec % (MNODE_TIMEOUT_SEC / 2) == 0) {
mndSyncCheckTimeout(pMnode); mndSyncCheckTimeout(pMnode);
} }
} }
......
...@@ -17,8 +17,6 @@ ...@@ -17,8 +17,6 @@
#include "mndSync.h" #include "mndSync.h"
#include "mndTrans.h" #include "mndTrans.h"
#define TRANS_TIMEOUT_SEC 10
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
if (pMsg == NULL || pMsg->pCont == NULL) { if (pMsg == NULL || pMsg->pCont == NULL) {
return -1; return -1;
...@@ -103,6 +101,7 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta ...@@ -103,6 +101,7 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta
} }
pMgmt->transId = 0; pMgmt->transId = 0;
pMgmt->transSec = 0; pMgmt->transSec = 0;
pMgmt->transSeq = 0;
tsem_post(&pMgmt->syncSem); tsem_post(&pMgmt->syncSem);
taosThreadMutexUnlock(&pMgmt->lock); taosThreadMutexUnlock(&pMgmt->lock);
} else { } else {
...@@ -210,6 +209,7 @@ static void mndBecomeFollower(const SSyncFSM *pFsm) { ...@@ -210,6 +209,7 @@ static void mndBecomeFollower(const SSyncFSM *pFsm) {
mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", pMgmt->transId); mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", pMgmt->transId);
pMgmt->transId = 0; pMgmt->transId = 0;
pMgmt->transSec = 0; pMgmt->transSec = 0;
pMgmt->transSeq = 0;
pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER; pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER;
tsem_post(&pMgmt->syncSem); tsem_post(&pMgmt->syncSem);
} }
...@@ -272,6 +272,7 @@ int32_t mndInitSync(SMnode *pMnode) { ...@@ -272,6 +272,7 @@ int32_t mndInitSync(SMnode *pMnode) {
taosThreadMutexInit(&pMgmt->lock, NULL); taosThreadMutexInit(&pMgmt->lock, NULL);
pMgmt->transId = 0; pMgmt->transId = 0;
pMgmt->transSec = 0; pMgmt->transSec = 0;
pMgmt->transSeq = 0;
SSyncInfo syncInfo = { SSyncInfo syncInfo = {
.snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT, .snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT,
...@@ -328,17 +329,18 @@ void mndSyncCheckTimeout(SMnode *pMnode) { ...@@ -328,17 +329,18 @@ void mndSyncCheckTimeout(SMnode *pMnode) {
if (pMgmt->transId != 0) { if (pMgmt->transId != 0) {
int32_t curSec = taosGetTimestampSec(); int32_t curSec = taosGetTimestampSec();
int32_t delta = curSec - pMgmt->transSec; int32_t delta = curSec - pMgmt->transSec;
if (delta > TRANS_TIMEOUT_SEC) { if (delta > MNODE_TIMEOUT_SEC) {
mError("trans:%d, failed to propose since timeout, start:%d cur:%d delta:%d", pMgmt->transId, pMgmt->transSec, mError("trans:%d, failed to propose since timeout, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId,
curSec, delta); pMgmt->transSec, curSec, delta, pMgmt->transSeq);
pMgmt->transId = 0; pMgmt->transId = 0;
pMgmt->transSec = 0; pMgmt->transSec = 0;
pMgmt->transSeq = 0;
terrno = TSDB_CODE_SYN_TIMEOUT; terrno = TSDB_CODE_SYN_TIMEOUT;
pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT; pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT;
tsem_post(&pMgmt->syncSem); tsem_post(&pMgmt->syncSem);
} else { } else {
mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d", pMgmt->transId, pMgmt->transSec, curSec, mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId,
curSec - pMgmt->transSec); pMgmt->transSec, curSec, curSec - pMgmt->transSec, pMgmt->transSeq);
} }
} else { } else {
// mTrace("check sync timeout msg, no trans waiting for confirm"); // mTrace("check sync timeout msg, no trans waiting for confirm");
...@@ -368,26 +370,28 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { ...@@ -368,26 +370,28 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
mInfo("trans:%d, will be proposed", transId); mInfo("trans:%d, will be proposed", transId);
pMgmt->transId = transId; pMgmt->transId = transId;
pMgmt->transSec = taosGetTimestampSec(); pMgmt->transSec = taosGetTimestampSec();
taosThreadMutexUnlock(&pMgmt->lock);
int32_t code = syncPropose(pMgmt->sync, &req, false); int64_t seq = 0;
int32_t code = syncPropose(pMgmt->sync, &req, false, &seq);
if (code == 0) { if (code == 0) {
mInfo("trans:%d, is proposing and wait sem", transId); mInfo("trans:%d, is proposing and wait sem, seq:%" PRId64, transId, seq);
pMgmt->transSeq = seq;
taosThreadMutexUnlock(&pMgmt->lock);
tsem_wait(&pMgmt->syncSem); tsem_wait(&pMgmt->syncSem);
} else if (code > 0) { } else if (code > 0) {
mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId); mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId);
taosThreadMutexLock(&pMgmt->lock);
pMgmt->transId = 0; pMgmt->transId = 0;
pMgmt->transSec = 0; pMgmt->transSec = 0;
pMgmt->transSeq = 0;
taosThreadMutexUnlock(&pMgmt->lock); taosThreadMutexUnlock(&pMgmt->lock);
sdbWriteWithoutFree(pMnode->pSdb, pRaw); sdbWriteWithoutFree(pMnode->pSdb, pRaw);
sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID); sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
code = 0; code = 0;
} else { } else {
mError("trans:%d, failed to proposed since %s", transId, terrstr()); mError("trans:%d, failed to proposed since %s", transId, terrstr());
taosThreadMutexLock(&pMgmt->lock);
pMgmt->transId = 0; pMgmt->transId = 0;
pMgmt->transSec = 0; pMgmt->transSec = 0;
pMgmt->transSeq = 0;
taosThreadMutexUnlock(&pMgmt->lock); taosThreadMutexUnlock(&pMgmt->lock);
if (terrno == 0) { if (terrno == 0) {
terrno = TSDB_CODE_APP_ERROR; terrno = TSDB_CODE_APP_ERROR;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册