未验证 提交 6a7c1d6b 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18940 from taosdata/enh/TD-21127

refact: post sem in vnode while sync timeout
...@@ -230,7 +230,7 @@ int64_t syncOpen(SSyncInfo* pSyncInfo); ...@@ -230,7 +230,7 @@ int64_t syncOpen(SSyncInfo* pSyncInfo);
int32_t syncStart(int64_t rid); int32_t syncStart(int64_t rid);
void syncStop(int64_t rid); void syncStop(int64_t rid);
void syncPreStop(int64_t rid); void syncPreStop(int64_t rid);
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg); int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg); int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg);
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex); int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex);
...@@ -240,6 +240,7 @@ int32_t syncStepDown(int64_t rid, SyncTerm newTerm); ...@@ -240,6 +240,7 @@ int32_t syncStepDown(int64_t rid, SyncTerm newTerm);
bool syncIsReadyForRead(int64_t rid); bool syncIsReadyForRead(int64_t rid);
bool syncSnapshotSending(int64_t rid); bool syncSnapshotSending(int64_t rid);
bool syncSnapshotRecving(int64_t rid); bool syncSnapshotRecving(int64_t rid);
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq);
SSyncState syncGetState(int64_t rid); SSyncState syncGetState(int64_t rid);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
......
...@@ -497,6 +497,9 @@ enum { ...@@ -497,6 +497,9 @@ enum {
// sort page size by default // sort page size by default
#define DEFAULT_PAGESIZE 4096 #define DEFAULT_PAGESIZE 4096
#define VNODE_TIMEOUT_SEC 60
#define MNODE_TIMEOUT_SEC 10
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -38,6 +38,8 @@ typedef struct SVnodeMgmt { ...@@ -38,6 +38,8 @@ typedef struct SVnodeMgmt {
TdThreadRwlock lock; TdThreadRwlock lock;
SVnodesStat state; SVnodesStat state;
STfs *pTfs; STfs *pTfs;
TdThread thread;
bool stop;
} SVnodeMgmt; } SVnodeMgmt;
typedef struct { typedef struct {
......
...@@ -334,6 +334,62 @@ static void vmCleanup(SVnodeMgmt *pMgmt) { ...@@ -334,6 +334,62 @@ static void vmCleanup(SVnodeMgmt *pMgmt) {
taosMemoryFree(pMgmt); taosMemoryFree(pMgmt);
} }
static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {
int32_t numOfVnodes = 0;
SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
for (int32_t i = 0; i < numOfVnodes; ++i) {
SVnodeObj *pVnode = ppVnodes[i];
vnodeSyncCheckTimeout(pVnode->pImpl);
vmReleaseVnode(pMgmt, pVnode);
}
if (ppVnodes != NULL) {
taosMemoryFree(ppVnodes);
}
}
static void *vmThreadFp(void *param) {
SVnodeMgmt *pMgmt = param;
int64_t lastTime = 0;
setThreadName("vnode-timer");
while (1) {
lastTime++;
taosMsleep(100);
if (pMgmt->stop) break;
if (lastTime % 10 != 0) continue;
int64_t sec = lastTime / 10;
if (sec % (VNODE_TIMEOUT_SEC / 2) == 0) {
vmCheckSyncTimeout(pMgmt);
}
}
return NULL;
}
static int32_t vmInitTimer(SVnodeMgmt *pMgmt) {
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) {
dError("failed to create vnode timer thread since %s", strerror(errno));
return -1;
}
taosThreadAttrDestroy(&thAttr);
return 0;
}
static void vmCleanupTimer(SVnodeMgmt *pMgmt) {
pMgmt->stop = true;
if (taosCheckPthreadValid(pMgmt->thread)) {
taosThreadJoin(pMgmt->thread, NULL);
taosThreadClear(&pMgmt->thread);
}
}
static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
int32_t code = -1; int32_t code = -1;
...@@ -510,12 +566,10 @@ static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) { ...@@ -510,12 +566,10 @@ static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
taosMemoryFree(ppVnodes); taosMemoryFree(ppVnodes);
} }
return 0; return vmInitTimer(pMgmt);
} }
static void vmStop(SVnodeMgmt *pMgmt) { static void vmStop(SVnodeMgmt *pMgmt) { vmCleanupTimer(pMgmt); }
// process inside the vnode
}
SMgmtFunc vmGetMgmtFunc() { SMgmtFunc vmGetMgmtFunc() {
SMgmtFunc mgmtFunc = {0}; SMgmtFunc mgmtFunc = {0};
......
...@@ -89,6 +89,7 @@ typedef struct { ...@@ -89,6 +89,7 @@ typedef struct {
int32_t errCode; int32_t errCode;
int32_t transId; int32_t transId;
int32_t transSec; int32_t transSec;
int64_t transSeq;
TdThreadMutex lock; TdThreadMutex lock;
int8_t selfIndex; int8_t selfIndex;
int8_t numOfReplicas; int8_t numOfReplicas;
......
...@@ -101,6 +101,7 @@ static void *mndBuildCheckpointTickMsg(int32_t *pContLen, int64_t sec) { ...@@ -101,6 +101,7 @@ static void *mndBuildCheckpointTickMsg(int32_t *pContLen, int64_t sec) {
} }
static void mndPullupTrans(SMnode *pMnode) { static void mndPullupTrans(SMnode *pMnode) {
mTrace("pullup trans msg");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
...@@ -110,6 +111,7 @@ static void mndPullupTrans(SMnode *pMnode) { ...@@ -110,6 +111,7 @@ static void mndPullupTrans(SMnode *pMnode) {
} }
static void mndPullupTtl(SMnode *pMnode) { static void mndPullupTtl(SMnode *pMnode) {
mTrace("pullup ttl");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
...@@ -117,6 +119,7 @@ static void mndPullupTtl(SMnode *pMnode) { ...@@ -117,6 +119,7 @@ static void mndPullupTtl(SMnode *pMnode) {
} }
static void mndCalMqRebalance(SMnode *pMnode) { static void mndCalMqRebalance(SMnode *pMnode) {
mTrace("calc mq rebalance");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
...@@ -143,6 +146,7 @@ static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) { ...@@ -143,6 +146,7 @@ static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) {
} }
static void mndPullupTelem(SMnode *pMnode) { static void mndPullupTelem(SMnode *pMnode) {
mTrace("pullup telem msg");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
...@@ -152,6 +156,7 @@ static void mndPullupTelem(SMnode *pMnode) { ...@@ -152,6 +156,7 @@ static void mndPullupTelem(SMnode *pMnode) {
} }
static void mndPullupGrant(SMnode *pMnode) { static void mndPullupGrant(SMnode *pMnode) {
mTrace("pullup grant msg");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
...@@ -162,6 +167,7 @@ static void mndPullupGrant(SMnode *pMnode) { ...@@ -162,6 +167,7 @@ static void mndPullupGrant(SMnode *pMnode) {
} }
static void mndIncreaseUpTime(SMnode *pMnode) { static void mndIncreaseUpTime(SMnode *pMnode) {
mTrace("increate uptime");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
...@@ -213,6 +219,7 @@ static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs) ...@@ -213,6 +219,7 @@ static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs)
} }
static void mndCheckDnodeOffline(SMnode *pMnode) { static void mndCheckDnodeOffline(SMnode *pMnode) {
mTrace("check dnode offline");
if (mndAcquireRpc(pMnode) != 0) return; if (mndAcquireRpc(pMnode) != 0) return;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
...@@ -280,6 +287,9 @@ static void *mndThreadFp(void *param) { ...@@ -280,6 +287,9 @@ static void *mndThreadFp(void *param) {
if (sec % (tsStatusInterval * 5) == 0) { if (sec % (tsStatusInterval * 5) == 0) {
mndCheckDnodeOffline(pMnode); mndCheckDnodeOffline(pMnode);
}
if (sec % (MNODE_TIMEOUT_SEC / 2) == 0) {
mndSyncCheckTimeout(pMnode); mndSyncCheckTimeout(pMnode);
} }
} }
......
...@@ -17,8 +17,6 @@ ...@@ -17,8 +17,6 @@
#include "mndSync.h" #include "mndSync.h"
#include "mndTrans.h" #include "mndTrans.h"
#define TRANS_TIMEOUT_SEC 10
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
if (pMsg == NULL || pMsg->pCont == NULL) { if (pMsg == NULL || pMsg->pCont == NULL) {
return -1; return -1;
...@@ -80,36 +78,38 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta ...@@ -80,36 +78,38 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta
SSdbRaw *pRaw = pMsg->pCont; SSdbRaw *pRaw = pMsg->pCont;
int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw); int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw);
pMgmt->errCode = pMeta->code;
mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64 mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
" role:%s raw:%p", " role:%s raw:%p sec:%d seq:%" PRId64,
transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state), transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state),
pRaw); pRaw, pMgmt->transSec, pMgmt->transSeq);
if (pMgmt->errCode == 0) { if (pMeta->code == 0) {
sdbWriteWithoutFree(pMnode->pSdb, pRaw); sdbWriteWithoutFree(pMnode->pSdb, pRaw);
sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex); sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
} }
taosThreadMutexLock(&pMgmt->lock); taosThreadMutexLock(&pMgmt->lock);
pMgmt->errCode = pMeta->code;
if (transId <= 0) { if (transId <= 0) {
taosThreadMutexUnlock(&pMgmt->lock); taosThreadMutexUnlock(&pMgmt->lock);
mError("trans:%d, invalid commit msg", transId); mError("trans:%d, invalid commit msg, cache transId:%d seq:%" PRId64, transId, pMgmt->transId, pMgmt->transSeq);
} else if (transId == pMgmt->transId) { } else if (transId == pMgmt->transId) {
if (pMgmt->errCode != 0) { if (pMgmt->errCode != 0) {
mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode)); mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode));
} else { } else {
mInfo("trans:%d, is proposed and post sem", transId); mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, transId, pMgmt->transSeq);
} }
pMgmt->transId = 0; pMgmt->transId = 0;
pMgmt->transSec = 0; pMgmt->transSec = 0;
pMgmt->transSeq = 0;
tsem_post(&pMgmt->syncSem); tsem_post(&pMgmt->syncSem);
taosThreadMutexUnlock(&pMgmt->lock); taosThreadMutexUnlock(&pMgmt->lock);
} else { } else {
taosThreadMutexUnlock(&pMgmt->lock); taosThreadMutexUnlock(&pMgmt->lock);
STrans *pTrans = mndAcquireTrans(pMnode, transId); STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) { if (pTrans != NULL) {
mInfo("trans:%d, execute in mnode which not leader", transId); mInfo("trans:%d, execute in mnode which not leader or sync timeout", transId);
mndTransExecute(pMnode, pTrans); mndTransExecute(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans); mndReleaseTrans(pMnode, pTrans);
// sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA); // sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA);
...@@ -210,6 +210,7 @@ static void mndBecomeFollower(const SSyncFSM *pFsm) { ...@@ -210,6 +210,7 @@ static void mndBecomeFollower(const SSyncFSM *pFsm) {
mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", pMgmt->transId); mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", pMgmt->transId);
pMgmt->transId = 0; pMgmt->transId = 0;
pMgmt->transSec = 0; pMgmt->transSec = 0;
pMgmt->transSeq = 0;
pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER; pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER;
tsem_post(&pMgmt->syncSem); tsem_post(&pMgmt->syncSem);
} }
...@@ -272,6 +273,7 @@ int32_t mndInitSync(SMnode *pMnode) { ...@@ -272,6 +273,7 @@ int32_t mndInitSync(SMnode *pMnode) {
taosThreadMutexInit(&pMgmt->lock, NULL); taosThreadMutexInit(&pMgmt->lock, NULL);
pMgmt->transId = 0; pMgmt->transId = 0;
pMgmt->transSec = 0; pMgmt->transSec = 0;
pMgmt->transSeq = 0;
SSyncInfo syncInfo = { SSyncInfo syncInfo = {
.snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT, .snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT,
...@@ -323,22 +325,24 @@ void mndCleanupSync(SMnode *pMnode) { ...@@ -323,22 +325,24 @@ void mndCleanupSync(SMnode *pMnode) {
} }
void mndSyncCheckTimeout(SMnode *pMnode) { void mndSyncCheckTimeout(SMnode *pMnode) {
mTrace("check sync timeout");
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
taosThreadMutexLock(&pMgmt->lock); taosThreadMutexLock(&pMgmt->lock);
if (pMgmt->transId != 0) { if (pMgmt->transId != 0) {
int32_t curSec = taosGetTimestampSec(); int32_t curSec = taosGetTimestampSec();
int32_t delta = curSec - pMgmt->transSec; int32_t delta = curSec - pMgmt->transSec;
if (delta > TRANS_TIMEOUT_SEC) { if (delta > MNODE_TIMEOUT_SEC) {
mError("trans:%d, failed to propose since timeout, start:%d cur:%d delta:%d", pMgmt->transId, pMgmt->transSec, mError("trans:%d, failed to propose since timeout, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId,
curSec, delta); pMgmt->transSec, curSec, delta, pMgmt->transSeq);
pMgmt->transId = 0; pMgmt->transId = 0;
pMgmt->transSec = 0; pMgmt->transSec = 0;
pMgmt->transSeq = 0;
terrno = TSDB_CODE_SYN_TIMEOUT; terrno = TSDB_CODE_SYN_TIMEOUT;
pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT; pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT;
tsem_post(&pMgmt->syncSem); tsem_post(&pMgmt->syncSem);
} else { } else {
mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d", pMgmt->transId, pMgmt->transSec, curSec, mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId,
curSec - pMgmt->transSec); pMgmt->transSec, curSec, curSec - pMgmt->transSec, pMgmt->transSeq);
} }
} else { } else {
// mTrace("check sync timeout msg, no trans waiting for confirm"); // mTrace("check sync timeout msg, no trans waiting for confirm");
...@@ -348,7 +352,6 @@ void mndSyncCheckTimeout(SMnode *pMnode) { ...@@ -348,7 +352,6 @@ void mndSyncCheckTimeout(SMnode *pMnode) {
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
pMgmt->errCode = 0;
SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
if (req.contLen <= 0) return -1; if (req.contLen <= 0) return -1;
...@@ -358,6 +361,8 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { ...@@ -358,6 +361,8 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
memcpy(req.pCont, pRaw, req.contLen); memcpy(req.pCont, pRaw, req.contLen);
taosThreadMutexLock(&pMgmt->lock); taosThreadMutexLock(&pMgmt->lock);
pMgmt->errCode = 0;
if (pMgmt->transId != 0) { if (pMgmt->transId != 0) {
mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId); mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId);
taosThreadMutexUnlock(&pMgmt->lock); taosThreadMutexUnlock(&pMgmt->lock);
...@@ -368,26 +373,28 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { ...@@ -368,26 +373,28 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
mInfo("trans:%d, will be proposed", transId); mInfo("trans:%d, will be proposed", transId);
pMgmt->transId = transId; pMgmt->transId = transId;
pMgmt->transSec = taosGetTimestampSec(); pMgmt->transSec = taosGetTimestampSec();
taosThreadMutexUnlock(&pMgmt->lock);
int32_t code = syncPropose(pMgmt->sync, &req, false); int64_t seq = 0;
int32_t code = syncPropose(pMgmt->sync, &req, false, &seq);
if (code == 0) { if (code == 0) {
mInfo("trans:%d, is proposing and wait sem", transId); mInfo("trans:%d, is proposing and wait sem, seq:%" PRId64, transId, seq);
pMgmt->transSeq = seq;
taosThreadMutexUnlock(&pMgmt->lock);
tsem_wait(&pMgmt->syncSem); tsem_wait(&pMgmt->syncSem);
} else if (code > 0) { } else if (code > 0) {
mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId); mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId);
taosThreadMutexLock(&pMgmt->lock);
pMgmt->transId = 0; pMgmt->transId = 0;
pMgmt->transSec = 0; pMgmt->transSec = 0;
pMgmt->transSeq = 0;
taosThreadMutexUnlock(&pMgmt->lock); taosThreadMutexUnlock(&pMgmt->lock);
sdbWriteWithoutFree(pMnode->pSdb, pRaw); sdbWriteWithoutFree(pMnode->pSdb, pRaw);
sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID); sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
code = 0; code = 0;
} else { } else {
mError("trans:%d, failed to proposed since %s", transId, terrstr()); mError("trans:%d, failed to proposed since %s", transId, terrstr());
taosThreadMutexLock(&pMgmt->lock);
pMgmt->transId = 0; pMgmt->transId = 0;
pMgmt->transSec = 0; pMgmt->transSec = 0;
pMgmt->transSeq = 0;
taosThreadMutexUnlock(&pMgmt->lock); taosThreadMutexUnlock(&pMgmt->lock);
if (terrno == 0) { if (terrno == 0) {
terrno = TSDB_CODE_APP_ERROR; terrno = TSDB_CODE_APP_ERROR;
...@@ -402,7 +409,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { ...@@ -402,7 +409,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
} }
terrno = pMgmt->errCode; terrno = pMgmt->errCode;
return pMgmt->errCode; return terrno;
} }
void mndSyncStart(SMnode *pMnode) { void mndSyncStart(SMnode *pMnode) {
......
...@@ -54,6 +54,7 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs); ...@@ -54,6 +54,7 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs);
void vnodeDestroy(const char *path, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
void vnodePreClose(SVnode *pVnode); void vnodePreClose(SVnode *pVnode);
void vnodeSyncCheckTimeout(SVnode* pVnode);
void vnodeClose(SVnode *pVnode); void vnodeClose(SVnode *pVnode);
int32_t vnodeStart(SVnode *pVnode); int32_t vnodeStart(SVnode *pVnode);
......
...@@ -344,6 +344,9 @@ struct SVnode { ...@@ -344,6 +344,9 @@ struct SVnode {
bool blocked; bool blocked;
bool restored; bool restored;
tsem_t syncSem; tsem_t syncSem;
int32_t blockSec;
int64_t blockSeq;
SRpcHandleInfo blockInfo;
SQHandle* pQuery; SQHandle* pQuery;
}; };
......
...@@ -22,32 +22,21 @@ static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; } ...@@ -22,32 +22,21 @@ static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) { static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); vGTrace("vgId:%d, msg:%p wait block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
tsem_wait(&pVnode->syncSem); tsem_wait(&pVnode->syncSem);
} }
static inline void vnodeWaitBlockMsgOld(SVnode *pVnode, const SRpcMsg *pMsg) {
if (vnodeIsMsgBlock(pMsg->msgType)) {
const STraceId *trace = &pMsg->info.traceId;
taosThreadMutexLock(&pVnode->lock);
if (!pVnode->blocked) {
vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
pVnode->blocked = true;
taosThreadMutexUnlock(&pVnode->lock);
tsem_wait(&pVnode->syncSem);
} else {
taosThreadMutexUnlock(&pVnode->lock);
}
}
}
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) { static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
if (vnodeIsMsgBlock(pMsg->msgType)) { if (vnodeIsMsgBlock(pMsg->msgType)) {
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
taosThreadMutexLock(&pVnode->lock); taosThreadMutexLock(&pVnode->lock);
if (pVnode->blocked) { if (pVnode->blocked) {
vGTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); vGTrace("vgId:%d, msg:%p post block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
pVnode->blocked = false; pVnode->blocked = false;
pVnode->blockSec = 0;
pVnode->blockSeq = 0;
tsem_post(&pVnode->syncSem); tsem_post(&pVnode->syncSem);
} }
taosThreadMutexUnlock(&pVnode->lock); taosThreadMutexUnlock(&pVnode->lock);
...@@ -217,12 +206,17 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) ...@@ -217,12 +206,17 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
#else #else
static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) { static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
int64_t seq = 0;
taosThreadMutexLock(&pVnode->lock); taosThreadMutexLock(&pVnode->lock);
int32_t code = syncPropose(pVnode->sync, pMsg, isWeak); int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq);
bool wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType)); bool wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType));
if (wait) { if (wait) {
ASSERT(!pVnode->blocked); ASSERT(!pVnode->blocked);
pVnode->blocked = true; pVnode->blocked = true;
pVnode->blockSec = taosGetTimestampSec();
pVnode->blockSeq = seq;
pVnode->blockInfo = pMsg->info;
} }
taosThreadMutexUnlock(&pVnode->lock); taosThreadMutexUnlock(&pVnode->lock);
...@@ -289,12 +283,15 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { ...@@ -289,12 +283,15 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);
if (vnodeIsMsgBlock(pMsg->msgType)) { if (vnodeIsMsgBlock(pMsg->msgType)) {
vTrace("vgId:%d, blocking msg obtained from apply-queue. index:%" PRId64 ", term: %" PRId64 ", type: %s", vgId, vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64
pMsg->info.conn.applyIndex, pMsg->info.conn.applyTerm, TMSG_INFO(pMsg->msgType)); ", blocking msg obtained sec:%d seq:%" PRId64,
vgId, pMsg, TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex, pVnode->blockSec,
pVnode->blockSeq);
} else {
vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);
} }
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
...@@ -621,6 +618,30 @@ void vnodeSyncClose(SVnode *pVnode) { ...@@ -621,6 +618,30 @@ void vnodeSyncClose(SVnode *pVnode) {
syncStop(pVnode->sync); syncStop(pVnode->sync);
} }
void vnodeSyncCheckTimeout(SVnode *pVnode) {
vTrace("vgId:%d, check sync timeout msg", pVnode->config.vgId);
taosThreadMutexLock(&pVnode->lock);
if (pVnode->blocked) {
int32_t curSec = taosGetTimestampSec();
int32_t delta = curSec - pVnode->blockSec;
if (delta > VNODE_TIMEOUT_SEC) {
vError("vgId:%d, failed to propose since timeout and post block, start:%d cur:%d delta:%d seq:%" PRId64,
pVnode->config.vgId, pVnode->blockSec, curSec, delta, pVnode->blockSeq);
if (syncSendTimeoutRsp(pVnode->sync, pVnode->blockSeq) != 0) {
SRpcMsg rpcMsg = {.code = TSDB_CODE_SYN_TIMEOUT, .info = pVnode->blockInfo};
vInfo("send timeout response since its applyed, seq:%" PRId64 " handle:%p ahandle:%p", pVnode->blockSeq,
rpcMsg.info.handle, rpcMsg.info.ahandle);
rpcSendResponse(&rpcMsg);
}
pVnode->blocked = false;
pVnode->blockSec = 0;
pVnode->blockSeq = 0;
tsem_post(&pVnode->syncSem);
}
}
taosThreadMutexUnlock(&pVnode->lock);
}
bool vnodeIsRoleLeader(SVnode *pVnode) { bool vnodeIsRoleLeader(SVnode *pVnode) {
SSyncState state = syncGetState(pVnode->sync); SSyncState state = syncGetState(pVnode->sync);
return state.state == TAOS_SYNC_STATE_LEADER; return state.state == TAOS_SYNC_STATE_LEADER;
......
...@@ -215,7 +215,7 @@ int32_t syncNodeStart(SSyncNode* pSyncNode); ...@@ -215,7 +215,7 @@ int32_t syncNodeStart(SSyncNode* pSyncNode);
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode); int32_t syncNodeStartStandBy(SSyncNode* pSyncNode);
void syncNodeClose(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode);
void syncNodePreClose(SSyncNode* pSyncNode); void syncNodePreClose(SSyncNode* pSyncNode);
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak); int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t *seq);
int32_t syncNodeRestore(SSyncNode* pSyncNode); int32_t syncNodeRestore(SSyncNode* pSyncNode);
void syncHbTimerDataFree(SSyncHbTimerData* pData); void syncHbTimerDataFree(SSyncHbTimerData* pData);
......
...@@ -151,7 +151,7 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) { ...@@ -151,7 +151,7 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
} }
syncNodeStartHeartbeatTimer(pSyncNode); syncNodeStartHeartbeatTimer(pSyncNode);
//syncNodeReplicate(pSyncNode); // syncNodeReplicate(pSyncNode);
} }
syncNodeRelease(pSyncNode); syncNodeRelease(pSyncNode);
...@@ -218,6 +218,26 @@ int32_t syncLeaderTransfer(int64_t rid) { ...@@ -218,6 +218,26 @@ int32_t syncLeaderTransfer(int64_t rid) {
return ret; return ret;
} }
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
SSyncNode* pNode = syncNodeAcquire(rid);
if (pNode == NULL) return -1;
SRpcMsg rpcMsg = {0};
int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;
syncNodeRelease(pNode);
if (ret == 1) {
sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle,
rpcMsg.info.ahandle);
rpcSendResponse(&rpcMsg);
return 0;
} else {
sInfo("no rpcinfo to send timeout response, seq:%" PRId64, seq);
return -1;
}
}
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) { SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
SyncIndex minMatchIndex = SYNC_INDEX_INVALID; SyncIndex minMatchIndex = SYNC_INDEX_INVALID;
...@@ -538,7 +558,7 @@ int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) { ...@@ -538,7 +558,7 @@ int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
pMsg->newLeaderId.vgId = pSyncNode->vgId; pMsg->newLeaderId.vgId = pSyncNode->vgId;
pMsg->newNodeInfo = newLeader; pMsg->newNodeInfo = newLeader;
int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false); int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false, NULL);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
return ret; return ret;
} }
...@@ -670,19 +690,19 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { ...@@ -670,19 +690,19 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
syncNodeRelease(pSyncNode); syncNodeRelease(pSyncNode);
} }
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
SSyncNode* pSyncNode = syncNodeAcquire(rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
sError("sync propose error"); sError("sync propose error");
return -1; return -1;
} }
int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak); int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak, seq);
syncNodeRelease(pSyncNode); syncNodeRelease(pSyncNode);
return ret; return ret;
} }
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
terrno = TSDB_CODE_SYN_NOT_LEADER; terrno = TSDB_CODE_SYN_NOT_LEADER;
sNError(pSyncNode, "sync propose not leader, %s, type:%s", syncStr(pSyncNode->state), TMSG_INFO(pMsg->msgType)); sNError(pSyncNode, "sync propose not leader, %s, type:%s", syncStr(pSyncNode->state), TMSG_INFO(pMsg->msgType));
...@@ -739,6 +759,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { ...@@ -739,6 +759,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
(void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum); (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
} }
if (seq != NULL) *seq = seqNum;
return code; return code;
} }
} }
......
...@@ -337,7 +337,7 @@ int main(int argc, char** argv) { ...@@ -337,7 +337,7 @@ int main(int argc, char** argv) {
if (alreadySend < writeRecordNum) { if (alreadySend < writeRecordNum) {
SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex);
int32_t ret = syncPropose(rid, pRpcMsg, false); int32_t ret = syncPropose(rid, pRpcMsg, false, NULL);
if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
sTrace("%s value%d write not leader", s, alreadySend); sTrace("%s value%d write not leader", s, alreadySend);
} else { } else {
......
...@@ -249,7 +249,7 @@ int main(int argc, char** argv) { ...@@ -249,7 +249,7 @@ int main(int argc, char** argv) {
if (alreadySend < writeRecordNum) { if (alreadySend < writeRecordNum) {
SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex);
int32_t ret = syncPropose(rid, pRpcMsg, false); int32_t ret = syncPropose(rid, pRpcMsg, false, NULL);
if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
sTrace("%s value%d write not leader", s, alreadySend); sTrace("%s value%d write not leader", s, alreadySend);
} else { } else {
......
...@@ -189,7 +189,7 @@ int main(int argc, char** argv) { ...@@ -189,7 +189,7 @@ int main(int argc, char** argv) {
if (alreadySend < writeRecordNum) { if (alreadySend < writeRecordNum) {
SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex);
int32_t ret = syncPropose(rid, pRpcMsg, false); int32_t ret = syncPropose(rid, pRpcMsg, false, NULL);
if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
sTrace("%s value%d write not leader", s, alreadySend); sTrace("%s value%d write not leader", s, alreadySend);
} else { } else {
......
...@@ -396,7 +396,7 @@ int main(int argc, char** argv) { ...@@ -396,7 +396,7 @@ int main(int argc, char** argv) {
if (alreadySend < writeRecordNum) { if (alreadySend < writeRecordNum) {
SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex);
int32_t ret = syncPropose(rid, pRpcMsg, false); int32_t ret = syncPropose(rid, pRpcMsg, false, NULL);
if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
sTrace("%s value%d write not leader, leaderTransferWait:%d", simpleStr, alreadySend, leaderTransferWait); sTrace("%s value%d write not leader, leaderTransferWait:%d", simpleStr, alreadySend, leaderTransferWait);
} else { } else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册