diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 4cf4800472b856fc7da2603677cd33060794d00c..a13d203889fbd36c383f622d6e0d6d29d91d39a6 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -230,7 +230,7 @@ int64_t syncOpen(SSyncInfo* pSyncInfo); int32_t syncStart(int64_t rid); void syncStop(int64_t rid); void syncPreStop(int64_t rid); -int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak); +int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq); int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg); int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg); int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex); @@ -240,6 +240,7 @@ int32_t syncStepDown(int64_t rid, SyncTerm newTerm); bool syncIsReadyForRead(int64_t rid); bool syncSnapshotSending(int64_t rid); bool syncSnapshotRecving(int64_t rid); +int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq); SSyncState syncGetState(int64_t rid); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); diff --git a/include/util/tdef.h b/include/util/tdef.h index 8d1fccfa1124d9bc1c37a3604f807380cb61760e..e1d421a39930368a5f66744c0f9fbf7a07475831 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -497,6 +497,9 @@ enum { // sort page size by default #define DEFAULT_PAGESIZE 4096 +#define VNODE_TIMEOUT_SEC 60 +#define MNODE_TIMEOUT_SEC 10 + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index b38dc19361a0bedf9cf1f22b34d3fc074e1172b4..b5c554e0caf83724e7e2bb8b157793765527a844 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -38,6 +38,8 @@ typedef struct SVnodeMgmt { TdThreadRwlock lock; SVnodesStat state; STfs *pTfs; + TdThread thread; + bool stop; } SVnodeMgmt; typedef struct { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 07ebd72379e29d26d2ebbe830f0df9c8458aa969..313a88fc5c60f3ac2ae84b1ca6cf811033f5779c 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -334,6 +334,62 @@ static void vmCleanup(SVnodeMgmt *pMgmt) { taosMemoryFree(pMgmt); } +static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) { + int32_t numOfVnodes = 0; + SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes); + + for (int32_t i = 0; i < numOfVnodes; ++i) { + SVnodeObj *pVnode = ppVnodes[i]; + vnodeSyncCheckTimeout(pVnode->pImpl); + vmReleaseVnode(pMgmt, pVnode); + } + + if (ppVnodes != NULL) { + taosMemoryFree(ppVnodes); + } +} + +static void *vmThreadFp(void *param) { + SVnodeMgmt *pMgmt = param; + int64_t lastTime = 0; + setThreadName("vnode-timer"); + + while (1) { + lastTime++; + taosMsleep(100); + if (pMgmt->stop) break; + if (lastTime % 10 != 0) continue; + + int64_t sec = lastTime / 10; + if (sec % (VNODE_TIMEOUT_SEC / 2) == 0) { + vmCheckSyncTimeout(pMgmt); + } + } + + return NULL; +} + +static int32_t vmInitTimer(SVnodeMgmt *pMgmt) { + TdThreadAttr thAttr; + taosThreadAttrInit(&thAttr); + taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) { + dError("failed to create vnode timer thread since %s", strerror(errno)); + return -1; + } + + taosThreadAttrDestroy(&thAttr); + return 0; +} + +static void vmCleanupTimer(SVnodeMgmt *pMgmt) { + pMgmt->stop = true; + if (taosCheckPthreadValid(pMgmt->thread)) { + taosThreadJoin(pMgmt->thread, NULL); + taosThreadClear(&pMgmt->thread); + } +} + static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { int32_t code = -1; @@ -510,12 +566,10 @@ static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) { taosMemoryFree(ppVnodes); } - return 0; + return vmInitTimer(pMgmt); } -static void vmStop(SVnodeMgmt *pMgmt) { - // process inside the vnode -} +static void vmStop(SVnodeMgmt *pMgmt) { vmCleanupTimer(pMgmt); } SMgmtFunc vmGetMgmtFunc() { SMgmtFunc mgmtFunc = {0}; diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 526fa142eb07b88c94f35cf5cd71c6d30769bddd..785ecc2bf502ca5f2a1e1203f1dc5fbe29b60a21 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -89,6 +89,7 @@ typedef struct { int32_t errCode; int32_t transId; int32_t transSec; + int64_t transSeq; TdThreadMutex lock; int8_t selfIndex; int8_t numOfReplicas; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index e12a908d881951e32ae72afdde0d78f082c45caf..117a9f5e67dd258b5f29a43b92f2ef6867766d2a 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -101,6 +101,7 @@ static void *mndBuildCheckpointTickMsg(int32_t *pContLen, int64_t sec) { } static void mndPullupTrans(SMnode *pMnode) { + mTrace("pullup trans msg"); int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { @@ -110,6 +111,7 @@ static void mndPullupTrans(SMnode *pMnode) { } static void mndPullupTtl(SMnode *pMnode) { + mTrace("pullup ttl"); int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen}; @@ -117,6 +119,7 @@ static void mndPullupTtl(SMnode *pMnode) { } static void mndCalMqRebalance(SMnode *pMnode) { + mTrace("calc mq rebalance"); int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { @@ -143,6 +146,7 @@ static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) { } static void mndPullupTelem(SMnode *pMnode) { + mTrace("pullup telem msg"); int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { @@ -152,6 +156,7 @@ static void mndPullupTelem(SMnode *pMnode) { } static void mndPullupGrant(SMnode *pMnode) { + mTrace("pullup grant msg"); int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { @@ -162,6 +167,7 @@ static void mndPullupGrant(SMnode *pMnode) { } static void mndIncreaseUpTime(SMnode *pMnode) { + mTrace("increate uptime"); int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { @@ -213,6 +219,7 @@ static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs) } static void mndCheckDnodeOffline(SMnode *pMnode) { + mTrace("check dnode offline"); if (mndAcquireRpc(pMnode) != 0) return; SSdb *pSdb = pMnode->pSdb; @@ -280,6 +287,9 @@ static void *mndThreadFp(void *param) { if (sec % (tsStatusInterval * 5) == 0) { mndCheckDnodeOffline(pMnode); + } + + if (sec % (MNODE_TIMEOUT_SEC / 2) == 0) { mndSyncCheckTimeout(pMnode); } } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index ba14f526a9e19346b247285ae0724c6d534249ad..c96faddc4c69c0360ee55c2bae78741ef4c025d8 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -17,8 +17,6 @@ #include "mndSync.h" #include "mndTrans.h" -#define TRANS_TIMEOUT_SEC 10 - static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { if (pMsg == NULL || pMsg->pCont == NULL) { return -1; @@ -80,36 +78,38 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta SSdbRaw *pRaw = pMsg->pCont; int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw); - pMgmt->errCode = pMeta->code; mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64 - " role:%s raw:%p", + " role:%s raw:%p sec:%d seq:%" PRId64, transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state), - pRaw); + pRaw, pMgmt->transSec, pMgmt->transSeq); - if (pMgmt->errCode == 0) { + if (pMeta->code == 0) { sdbWriteWithoutFree(pMnode->pSdb, pRaw); sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex); } taosThreadMutexLock(&pMgmt->lock); + pMgmt->errCode = pMeta->code; + if (transId <= 0) { taosThreadMutexUnlock(&pMgmt->lock); - mError("trans:%d, invalid commit msg", transId); + mError("trans:%d, invalid commit msg, cache transId:%d seq:%" PRId64, transId, pMgmt->transId, pMgmt->transSeq); } else if (transId == pMgmt->transId) { if (pMgmt->errCode != 0) { mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode)); } else { - mInfo("trans:%d, is proposed and post sem", transId); + mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, transId, pMgmt->transSeq); } pMgmt->transId = 0; pMgmt->transSec = 0; + pMgmt->transSeq = 0; tsem_post(&pMgmt->syncSem); taosThreadMutexUnlock(&pMgmt->lock); } else { taosThreadMutexUnlock(&pMgmt->lock); STrans *pTrans = mndAcquireTrans(pMnode, transId); if (pTrans != NULL) { - mInfo("trans:%d, execute in mnode which not leader", transId); + mInfo("trans:%d, execute in mnode which not leader or sync timeout", transId); mndTransExecute(pMnode, pTrans); mndReleaseTrans(pMnode, pTrans); // sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA); @@ -210,6 +210,7 @@ static void mndBecomeFollower(const SSyncFSM *pFsm) { mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", pMgmt->transId); pMgmt->transId = 0; pMgmt->transSec = 0; + pMgmt->transSeq = 0; pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER; tsem_post(&pMgmt->syncSem); } @@ -272,6 +273,7 @@ int32_t mndInitSync(SMnode *pMnode) { taosThreadMutexInit(&pMgmt->lock, NULL); pMgmt->transId = 0; pMgmt->transSec = 0; + pMgmt->transSeq = 0; SSyncInfo syncInfo = { .snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT, @@ -323,22 +325,24 @@ void mndCleanupSync(SMnode *pMnode) { } void mndSyncCheckTimeout(SMnode *pMnode) { + mTrace("check sync timeout"); SSyncMgmt *pMgmt = &pMnode->syncMgmt; taosThreadMutexLock(&pMgmt->lock); if (pMgmt->transId != 0) { int32_t curSec = taosGetTimestampSec(); int32_t delta = curSec - pMgmt->transSec; - if (delta > TRANS_TIMEOUT_SEC) { - mError("trans:%d, failed to propose since timeout, start:%d cur:%d delta:%d", pMgmt->transId, pMgmt->transSec, - curSec, delta); + if (delta > MNODE_TIMEOUT_SEC) { + mError("trans:%d, failed to propose since timeout, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId, + pMgmt->transSec, curSec, delta, pMgmt->transSeq); pMgmt->transId = 0; pMgmt->transSec = 0; + pMgmt->transSeq = 0; terrno = TSDB_CODE_SYN_TIMEOUT; pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT; tsem_post(&pMgmt->syncSem); } else { - mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d", pMgmt->transId, pMgmt->transSec, curSec, - curSec - pMgmt->transSec); + mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId, + pMgmt->transSec, curSec, curSec - pMgmt->transSec, pMgmt->transSeq); } } else { // mTrace("check sync timeout msg, no trans waiting for confirm"); @@ -348,7 +352,6 @@ void mndSyncCheckTimeout(SMnode *pMnode) { int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - pMgmt->errCode = 0; SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; if (req.contLen <= 0) return -1; @@ -358,6 +361,8 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { memcpy(req.pCont, pRaw, req.contLen); taosThreadMutexLock(&pMgmt->lock); + pMgmt->errCode = 0; + if (pMgmt->transId != 0) { mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId); taosThreadMutexUnlock(&pMgmt->lock); @@ -368,26 +373,28 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { mInfo("trans:%d, will be proposed", transId); pMgmt->transId = transId; pMgmt->transSec = taosGetTimestampSec(); - taosThreadMutexUnlock(&pMgmt->lock); - int32_t code = syncPropose(pMgmt->sync, &req, false); + int64_t seq = 0; + int32_t code = syncPropose(pMgmt->sync, &req, false, &seq); if (code == 0) { - mInfo("trans:%d, is proposing and wait sem", transId); + mInfo("trans:%d, is proposing and wait sem, seq:%" PRId64, transId, seq); + pMgmt->transSeq = seq; + taosThreadMutexUnlock(&pMgmt->lock); tsem_wait(&pMgmt->syncSem); } else if (code > 0) { mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId); - taosThreadMutexLock(&pMgmt->lock); pMgmt->transId = 0; pMgmt->transSec = 0; + pMgmt->transSeq = 0; taosThreadMutexUnlock(&pMgmt->lock); sdbWriteWithoutFree(pMnode->pSdb, pRaw); sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID); code = 0; } else { mError("trans:%d, failed to proposed since %s", transId, terrstr()); - taosThreadMutexLock(&pMgmt->lock); pMgmt->transId = 0; pMgmt->transSec = 0; + pMgmt->transSeq = 0; taosThreadMutexUnlock(&pMgmt->lock); if (terrno == 0) { terrno = TSDB_CODE_APP_ERROR; @@ -402,7 +409,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { } terrno = pMgmt->errCode; - return pMgmt->errCode; + return terrno; } void mndSyncStart(SMnode *pMnode) { diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 99260ffefdd2a30c857c7e8675849246039936fc..540f0c3127bd26d12064bfca9eb0b135dae8622f 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -54,6 +54,7 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); void vnodePreClose(SVnode *pVnode); +void vnodeSyncCheckTimeout(SVnode* pVnode); void vnodeClose(SVnode *pVnode); int32_t vnodeStart(SVnode *pVnode); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 8cf212cb1d084c20870ccfd13d6d2168f4cc9250..958200da4c612bc0454e2ca7af4fd0b8739f4fa6 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -322,29 +322,32 @@ struct STsdbKeepCfg { }; struct SVnode { - char* path; - SVnodeCfg config; - SVState state; - SVStatis statis; - STfs* pTfs; - SMsgCb msgCb; - TdThreadMutex mutex; - TdThreadCond poolNotEmpty; - SVBufPool* pPool; - SVBufPool* inUse; - SMeta* pMeta; - SSma* pSma; - STsdb* pTsdb; - SWal* pWal; - STQ* pTq; - SSink* pSink; - tsem_t canCommit; - int64_t sync; - TdThreadMutex lock; - bool blocked; - bool restored; - tsem_t syncSem; - SQHandle* pQuery; + char* path; + SVnodeCfg config; + SVState state; + SVStatis statis; + STfs* pTfs; + SMsgCb msgCb; + TdThreadMutex mutex; + TdThreadCond poolNotEmpty; + SVBufPool* pPool; + SVBufPool* inUse; + SMeta* pMeta; + SSma* pSma; + STsdb* pTsdb; + SWal* pWal; + STQ* pTq; + SSink* pSink; + tsem_t canCommit; + int64_t sync; + TdThreadMutex lock; + bool blocked; + bool restored; + tsem_t syncSem; + int32_t blockSec; + int64_t blockSeq; + SRpcHandleInfo blockInfo; + SQHandle* pQuery; }; #define TD_VID(PVNODE) ((PVNODE)->config.vgId) diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index aa215a852fa5169a4a4aefc09a228be8ee7d1891..8e907261ffbef0d1aa3a14eee59e5c7fe7136c4a 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -22,32 +22,21 @@ static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; } static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) { const STraceId *trace = &pMsg->info.traceId; - vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); + vGTrace("vgId:%d, msg:%p wait block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg, + TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq); tsem_wait(&pVnode->syncSem); } -static inline void vnodeWaitBlockMsgOld(SVnode *pVnode, const SRpcMsg *pMsg) { - if (vnodeIsMsgBlock(pMsg->msgType)) { - const STraceId *trace = &pMsg->info.traceId; - taosThreadMutexLock(&pVnode->lock); - if (!pVnode->blocked) { - vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); - pVnode->blocked = true; - taosThreadMutexUnlock(&pVnode->lock); - tsem_wait(&pVnode->syncSem); - } else { - taosThreadMutexUnlock(&pVnode->lock); - } - } -} - static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) { if (vnodeIsMsgBlock(pMsg->msgType)) { const STraceId *trace = &pMsg->info.traceId; taosThreadMutexLock(&pVnode->lock); if (pVnode->blocked) { - vGTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); + vGTrace("vgId:%d, msg:%p post block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg, + TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq); pVnode->blocked = false; + pVnode->blockSec = 0; + pVnode->blockSeq = 0; tsem_post(&pVnode->syncSem); } taosThreadMutexUnlock(&pVnode->lock); @@ -217,12 +206,17 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) #else static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) { + int64_t seq = 0; + taosThreadMutexLock(&pVnode->lock); - int32_t code = syncPropose(pVnode->sync, pMsg, isWeak); + int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq); bool wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType)); if (wait) { ASSERT(!pVnode->blocked); pVnode->blocked = true; + pVnode->blockSec = taosGetTimestampSec(); + pVnode->blockSeq = seq; + pVnode->blockInfo = pMsg->info; } taosThreadMutexUnlock(&pVnode->lock); @@ -289,12 +283,15 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { for (int32_t i = 0; i < numOfMsgs; ++i) { if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; const STraceId *trace = &pMsg->info.traceId; - vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg, - TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex); if (vnodeIsMsgBlock(pMsg->msgType)) { - vTrace("vgId:%d, blocking msg obtained from apply-queue. index:%" PRId64 ", term: %" PRId64 ", type: %s", vgId, - pMsg->info.conn.applyIndex, pMsg->info.conn.applyTerm, TMSG_INFO(pMsg->msgType)); + vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64 + ", blocking msg obtained sec:%d seq:%" PRId64, + vgId, pMsg, TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex, pVnode->blockSec, + pVnode->blockSeq); + } else { + vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg, + TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex); } SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; @@ -621,6 +618,30 @@ void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); } +void vnodeSyncCheckTimeout(SVnode *pVnode) { + vTrace("vgId:%d, check sync timeout msg", pVnode->config.vgId); + taosThreadMutexLock(&pVnode->lock); + if (pVnode->blocked) { + int32_t curSec = taosGetTimestampSec(); + int32_t delta = curSec - pVnode->blockSec; + if (delta > VNODE_TIMEOUT_SEC) { + vError("vgId:%d, failed to propose since timeout and post block, start:%d cur:%d delta:%d seq:%" PRId64, + pVnode->config.vgId, pVnode->blockSec, curSec, delta, pVnode->blockSeq); + if (syncSendTimeoutRsp(pVnode->sync, pVnode->blockSeq) != 0) { + SRpcMsg rpcMsg = {.code = TSDB_CODE_SYN_TIMEOUT, .info = pVnode->blockInfo}; + vInfo("send timeout response since its applyed, seq:%" PRId64 " handle:%p ahandle:%p", pVnode->blockSeq, + rpcMsg.info.handle, rpcMsg.info.ahandle); + rpcSendResponse(&rpcMsg); + } + pVnode->blocked = false; + pVnode->blockSec = 0; + pVnode->blockSeq = 0; + tsem_post(&pVnode->syncSem); + } + } + taosThreadMutexUnlock(&pVnode->lock); +} + bool vnodeIsRoleLeader(SVnode *pVnode) { SSyncState state = syncGetState(pVnode->sync); return state.state == TAOS_SYNC_STATE_LEADER; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index dee0bf95c7074e1bcbd06cfeb86e9b194a740375..a5524ffbdefd1040c086108ac6a1593c499d6fce 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -215,7 +215,7 @@ int32_t syncNodeStart(SSyncNode* pSyncNode); int32_t syncNodeStartStandBy(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode); void syncNodePreClose(SSyncNode* pSyncNode); -int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak); +int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t *seq); int32_t syncNodeRestore(SSyncNode* pSyncNode); void syncHbTimerDataFree(SSyncHbTimerData* pData); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 6fabab18cb6cc3ee0b68043b7cf6d6e73a5b2cf3..d6ce77193a7c38a670cd704634f91f14c7f60d6a 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -151,7 +151,7 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) { } syncNodeStartHeartbeatTimer(pSyncNode); - //syncNodeReplicate(pSyncNode); + // syncNodeReplicate(pSyncNode); } syncNodeRelease(pSyncNode); @@ -218,6 +218,26 @@ int32_t syncLeaderTransfer(int64_t rid) { return ret; } +int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) { + SSyncNode* pNode = syncNodeAcquire(rid); + if (pNode == NULL) return -1; + + SRpcMsg rpcMsg = {0}; + int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info); + rpcMsg.code = TSDB_CODE_SYN_TIMEOUT; + + syncNodeRelease(pNode); + if (ret == 1) { + sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, + rpcMsg.info.ahandle); + rpcSendResponse(&rpcMsg); + return 0; + } else { + sInfo("no rpcinfo to send timeout response, seq:%" PRId64, seq); + return -1; + } +} + SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) { SyncIndex minMatchIndex = SYNC_INDEX_INVALID; @@ -538,7 +558,7 @@ int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) { pMsg->newLeaderId.vgId = pSyncNode->vgId; pMsg->newNodeInfo = newLeader; - int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false); + int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false, NULL); rpcFreeCont(rpcMsg.pCont); return ret; } @@ -670,19 +690,19 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { syncNodeRelease(pSyncNode); } -int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { +int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) { SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { sError("sync propose error"); return -1; } - int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak); + int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak, seq); syncNodeRelease(pSyncNode); return ret; } -int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { +int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) { if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { terrno = TSDB_CODE_SYN_NOT_LEADER; sNError(pSyncNode, "sync propose not leader, %s, type:%s", syncStr(pSyncNode->state), TMSG_INFO(pMsg->msgType)); @@ -739,6 +759,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum); } + if (seq != NULL) *seq = seqNum; return code; } } diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index 057f2ea6dd7f38241fcc0759113e65e513d020d3..7a5d0777bb0ba00f3d5f4507088138d8c776fe15 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -337,7 +337,7 @@ int main(int argc, char** argv) { if (alreadySend < writeRecordNum) { SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); - int32_t ret = syncPropose(rid, pRpcMsg, false); + int32_t ret = syncPropose(rid, pRpcMsg, false, NULL); if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { sTrace("%s value%d write not leader", s, alreadySend); } else { diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index bab3d2236f3a33cd40bf356cb60227a6731ef022..f35a23b15b3d49ecbbaad922f3bb005ec7f22865 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -249,7 +249,7 @@ int main(int argc, char** argv) { if (alreadySend < writeRecordNum) { SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); - int32_t ret = syncPropose(rid, pRpcMsg, false); + int32_t ret = syncPropose(rid, pRpcMsg, false, NULL); if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { sTrace("%s value%d write not leader", s, alreadySend); } else { diff --git a/source/libs/sync/test/syncReplicateTest.cpp b/source/libs/sync/test/syncReplicateTest.cpp index 4a82bba15d959f545a333d84a4af8ed7b62e0225..22132eb01f68d92449bcd5d9b808a7e480fbb39c 100644 --- a/source/libs/sync/test/syncReplicateTest.cpp +++ b/source/libs/sync/test/syncReplicateTest.cpp @@ -189,7 +189,7 @@ int main(int argc, char** argv) { if (alreadySend < writeRecordNum) { SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); - int32_t ret = syncPropose(rid, pRpcMsg, false); + int32_t ret = syncPropose(rid, pRpcMsg, false, NULL); if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { sTrace("%s value%d write not leader", s, alreadySend); } else { diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index 8c486df118f26b53264d8830b9d3bfc8292adb14..4bc2e92d0c7a55ac6dd5d3c6aa697acad3e57d53 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -396,7 +396,7 @@ int main(int argc, char** argv) { if (alreadySend < writeRecordNum) { SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); - int32_t ret = syncPropose(rid, pRpcMsg, false); + int32_t ret = syncPropose(rid, pRpcMsg, false, NULL); if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { sTrace("%s value%d write not leader, leaderTransferWait:%d", simpleStr, alreadySend, leaderTransferWait); } else {