未验证 提交 00fa4299 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18921 from taosdata/enh/TD-21127

refact: post sem in mnode while sync timeout
...@@ -84,14 +84,15 @@ typedef struct { ...@@ -84,14 +84,15 @@ typedef struct {
} STelemMgmt; } STelemMgmt;
typedef struct { typedef struct {
tsem_t syncSem; tsem_t syncSem;
int64_t sync; int64_t sync;
int32_t errCode; int32_t errCode;
int32_t transId; int32_t transId;
SRWLatch lock; int32_t transSec;
int8_t selfIndex; TdThreadMutex lock;
int8_t numOfReplicas; int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA]; int8_t numOfReplicas;
SReplica replicas[TSDB_MAX_REPLICA];
} SSyncMgmt; } SSyncMgmt;
typedef struct { typedef struct {
......
...@@ -26,6 +26,7 @@ int32_t mndInitSync(SMnode *pMnode); ...@@ -26,6 +26,7 @@ int32_t mndInitSync(SMnode *pMnode);
void mndCleanupSync(SMnode *pMnode); void mndCleanupSync(SMnode *pMnode);
bool mndIsLeader(SMnode *pMnode); bool mndIsLeader(SMnode *pMnode);
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId); int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId);
void mndSyncCheckTimeout(SMnode *pMnode);
void mndSyncStart(SMnode *pMnode); void mndSyncStart(SMnode *pMnode);
void mndSyncStop(SMnode *pMnode); void mndSyncStop(SMnode *pMnode);
......
...@@ -280,6 +280,7 @@ static void *mndThreadFp(void *param) { ...@@ -280,6 +280,7 @@ static void *mndThreadFp(void *param) {
if (sec % (tsStatusInterval * 5) == 0) { if (sec % (tsStatusInterval * 5) == 0) {
mndCheckDnodeOffline(pMnode); mndCheckDnodeOffline(pMnode);
mndSyncCheckTimeout(pMnode);
} }
} }
......
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
#include "mndSync.h" #include "mndSync.h"
#include "mndTrans.h" #include "mndTrans.h"
#define TRANS_TIMEOUT_SEC 10
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
if (pMsg == NULL || pMsg->pCont == NULL) { if (pMsg == NULL || pMsg->pCont == NULL) {
return -1; return -1;
...@@ -89,9 +91,9 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta ...@@ -89,9 +91,9 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta
sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex); sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
} }
taosWLockLatch(&pMgmt->lock); taosThreadMutexLock(&pMgmt->lock);
if (transId <= 0) { if (transId <= 0) {
taosWUnLockLatch(&pMgmt->lock); taosThreadMutexUnlock(&pMgmt->lock);
mError("trans:%d, invalid commit msg", transId); mError("trans:%d, invalid commit msg", transId);
} else if (transId == pMgmt->transId) { } else if (transId == pMgmt->transId) {
if (pMgmt->errCode != 0) { if (pMgmt->errCode != 0) {
...@@ -100,10 +102,11 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta ...@@ -100,10 +102,11 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta
mInfo("trans:%d, is proposed and post sem", transId); mInfo("trans:%d, is proposed and post sem", transId);
} }
pMgmt->transId = 0; pMgmt->transId = 0;
pMgmt->transSec = 0;
tsem_post(&pMgmt->syncSem); tsem_post(&pMgmt->syncSem);
taosWUnLockLatch(&pMgmt->lock); taosThreadMutexUnlock(&pMgmt->lock);
} else { } else {
taosWUnLockLatch(&pMgmt->lock); taosThreadMutexUnlock(&pMgmt->lock);
STrans *pTrans = mndAcquireTrans(pMnode, transId); STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) { if (pTrans != NULL) {
mInfo("trans:%d, execute in mnode which not leader", transId); mInfo("trans:%d, execute in mnode which not leader", transId);
...@@ -198,18 +201,19 @@ int32_t mndSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int3 ...@@ -198,18 +201,19 @@ int32_t mndSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int3
} }
static void mndBecomeFollower(const SSyncFSM *pFsm) { static void mndBecomeFollower(const SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
mInfo("vgId:1, become follower"); mInfo("vgId:1, become follower");
taosWLockLatch(&pMnode->syncMgmt.lock); taosThreadMutexLock(&pMgmt->lock);
if (pMnode->syncMgmt.transId != 0) { if (pMgmt->transId != 0) {
mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", pMgmt->transId);
pMnode->syncMgmt.transId); pMgmt->transId = 0;
pMnode->syncMgmt.transId = 0; pMgmt->transSec = 0;
pMnode->syncMgmt.errCode = TSDB_CODE_SYN_NOT_LEADER; pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER;
tsem_post(&pMnode->syncMgmt.syncSem); tsem_post(&pMgmt->syncSem);
} }
taosWUnLockLatch(&pMnode->syncMgmt.lock); taosThreadMutexUnlock(&pMgmt->lock);
} }
static void mndBecomeLeader(const SSyncFSM *pFsm) { static void mndBecomeLeader(const SSyncFSM *pFsm) {
...@@ -265,8 +269,9 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { ...@@ -265,8 +269,9 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
int32_t mndInitSync(SMnode *pMnode) { int32_t mndInitSync(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
taosInitRWLatch(&pMgmt->lock); taosThreadMutexInit(&pMgmt->lock, NULL);
pMgmt->transId = 0; pMgmt->transId = 0;
pMgmt->transSec = 0;
SSyncInfo syncInfo = { SSyncInfo syncInfo = {
.snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT, .snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT,
...@@ -313,9 +318,34 @@ void mndCleanupSync(SMnode *pMnode) { ...@@ -313,9 +318,34 @@ void mndCleanupSync(SMnode *pMnode) {
mInfo("mnode-sync is stopped, id:%" PRId64, pMgmt->sync); mInfo("mnode-sync is stopped, id:%" PRId64, pMgmt->sync);
tsem_destroy(&pMgmt->syncSem); tsem_destroy(&pMgmt->syncSem);
taosThreadMutexDestroy(&pMgmt->lock);
memset(pMgmt, 0, sizeof(SSyncMgmt)); memset(pMgmt, 0, sizeof(SSyncMgmt));
} }
void mndSyncCheckTimeout(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
taosThreadMutexLock(&pMgmt->lock);
if (pMgmt->transId != 0) {
int32_t curSec = taosGetTimestampSec();
int32_t delta = curSec - pMgmt->transSec;
if (delta > TRANS_TIMEOUT_SEC) {
mError("trans:%d, failed to propose since timeout, start:%d cur:%d delta:%d", pMgmt->transId, pMgmt->transSec,
curSec, delta);
pMgmt->transId = 0;
pMgmt->transSec = 0;
terrno = TSDB_CODE_SYN_TIMEOUT;
pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT;
tsem_post(&pMgmt->syncSem);
} else {
mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d", pMgmt->transId, pMgmt->transSec, curSec,
curSec - pMgmt->transSec);
}
} else {
// mTrace("check sync timeout msg, no trans waiting for confirm");
}
taosThreadMutexUnlock(&pMgmt->lock);
}
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
pMgmt->errCode = 0; pMgmt->errCode = 0;
...@@ -327,35 +357,38 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { ...@@ -327,35 +357,38 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
if (req.pCont == NULL) return -1; if (req.pCont == NULL) return -1;
memcpy(req.pCont, pRaw, req.contLen); memcpy(req.pCont, pRaw, req.contLen);
taosWLockLatch(&pMgmt->lock); taosThreadMutexLock(&pMgmt->lock);
if (pMgmt->transId != 0) { if (pMgmt->transId != 0) {
mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId); mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId);
taosWUnLockLatch(&pMgmt->lock); taosThreadMutexUnlock(&pMgmt->lock);
terrno = TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED; terrno = TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED;
return -1; return terrno;
} }
mInfo("trans:%d, will be proposed", transId); mInfo("trans:%d, will be proposed", transId);
pMgmt->transId = transId; pMgmt->transId = transId;
taosWUnLockLatch(&pMgmt->lock); pMgmt->transSec = taosGetTimestampSec();
taosThreadMutexUnlock(&pMgmt->lock);
int32_t code = syncPropose(pMgmt->sync, &req, false); int32_t code = syncPropose(pMgmt->sync, &req, false);
if (code == 0) { if (code == 0) {
mInfo("trans:%d, is proposing and wait sem", pMgmt->transId); mInfo("trans:%d, is proposing and wait sem", transId);
tsem_wait(&pMgmt->syncSem); tsem_wait(&pMgmt->syncSem);
} else if (code > 0) { } else if (code > 0) {
mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId); mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId);
taosWLockLatch(&pMgmt->lock); taosThreadMutexLock(&pMgmt->lock);
pMgmt->transId = 0; pMgmt->transId = 0;
taosWUnLockLatch(&pMgmt->lock); pMgmt->transSec = 0;
taosThreadMutexUnlock(&pMgmt->lock);
sdbWriteWithoutFree(pMnode->pSdb, pRaw); sdbWriteWithoutFree(pMnode->pSdb, pRaw);
sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID); sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
code = 0; code = 0;
} else { } else {
mError("trans:%d, failed to proposed since %s", transId, terrstr()); mError("trans:%d, failed to proposed since %s", transId, terrstr());
taosWLockLatch(&pMgmt->lock); taosThreadMutexLock(&pMgmt->lock);
pMgmt->transId = 0; pMgmt->transId = 0;
taosWUnLockLatch(&pMgmt->lock); pMgmt->transSec = 0;
taosThreadMutexUnlock(&pMgmt->lock);
if (terrno == 0) { if (terrno == 0) {
terrno = TSDB_CODE_APP_ERROR; terrno = TSDB_CODE_APP_ERROR;
} }
...@@ -382,13 +415,17 @@ void mndSyncStart(SMnode *pMnode) { ...@@ -382,13 +415,17 @@ void mndSyncStart(SMnode *pMnode) {
} }
void mndSyncStop(SMnode *pMnode) { void mndSyncStop(SMnode *pMnode) {
taosWLockLatch(&pMnode->syncMgmt.lock); SSyncMgmt *pMgmt = &pMnode->syncMgmt;
if (pMnode->syncMgmt.transId != 0) {
mInfo("vgId:1, is stopped and post sem, trans:%d", pMnode->syncMgmt.transId); taosThreadMutexLock(&pMgmt->lock);
pMnode->syncMgmt.transId = 0; if (pMgmt->transId != 0) {
tsem_post(&pMnode->syncMgmt.syncSem); mInfo("vgId:1, is stopped and post sem, trans:%d", pMgmt->transId);
pMgmt->transId = 0;
pMgmt->transSec = 0;
pMgmt->errCode = TSDB_CODE_APP_IS_STOPPING;
tsem_post(&pMgmt->syncSem);
} }
taosWUnLockLatch(&pMnode->syncMgmt.lock); taosThreadMutexUnlock(&pMgmt->lock);
} }
bool mndIsLeader(SMnode *pMnode) { bool mndIsLeader(SMnode *pMnode) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册