diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 1e1ed3bddcc0fc5ce374c7d3e73524c545d1fc3c..a13d203889fbd36c383f622d6e0d6d29d91d39a6 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -240,7 +240,7 @@ int32_t syncStepDown(int64_t rid, SyncTerm newTerm); bool syncIsReadyForRead(int64_t rid); bool syncSnapshotSending(int64_t rid); bool syncSnapshotRecving(int64_t rid); -void syncSendTimeoutRsp(int64_t rid, int64_t seq); +int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq); SSyncState syncGetState(int64_t rid); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index e025626ecf993d2a09772d0a5107671b20d795ce..958200da4c612bc0454e2ca7af4fd0b8739f4fa6 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -322,31 +322,32 @@ struct STsdbKeepCfg { }; struct SVnode { - char* path; - SVnodeCfg config; - SVState state; - SVStatis statis; - STfs* pTfs; - SMsgCb msgCb; - TdThreadMutex mutex; - TdThreadCond poolNotEmpty; - SVBufPool* pPool; - SVBufPool* inUse; - SMeta* pMeta; - SSma* pSma; - STsdb* pTsdb; - SWal* pWal; - STQ* pTq; - SSink* pSink; - tsem_t canCommit; - int64_t sync; - TdThreadMutex lock; - bool blocked; - bool restored; - tsem_t syncSem; - int32_t blockSec; - int64_t blockSeq; - SQHandle* pQuery; + char* path; + SVnodeCfg config; + SVState state; + SVStatis statis; + STfs* pTfs; + SMsgCb msgCb; + TdThreadMutex mutex; + TdThreadCond poolNotEmpty; + SVBufPool* pPool; + SVBufPool* inUse; + SMeta* pMeta; + SSma* pSma; + STsdb* pTsdb; + SWal* pWal; + STQ* pTq; + SSink* pSink; + tsem_t canCommit; + int64_t sync; + TdThreadMutex lock; + bool blocked; + bool restored; + tsem_t syncSem; + int32_t blockSec; + int64_t blockSeq; + SRpcHandleInfo blockInfo; + SQHandle* pQuery; }; #define TD_VID(PVNODE) ((PVNODE)->config.vgId) diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 36eb6b293ad5f4d09ff21f83b3865a1a54db2c14..8e907261ffbef0d1aa3a14eee59e5c7fe7136c4a 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -32,8 +32,11 @@ static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) { const STraceId *trace = &pMsg->info.traceId; taosThreadMutexLock(&pVnode->lock); if (pVnode->blocked) { - vGTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); + vGTrace("vgId:%d, msg:%p post block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg, + TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq); pVnode->blocked = false; + pVnode->blockSec = 0; + pVnode->blockSeq = 0; tsem_post(&pVnode->syncSem); } taosThreadMutexUnlock(&pVnode->lock); @@ -213,6 +216,7 @@ static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak pVnode->blocked = true; pVnode->blockSec = taosGetTimestampSec(); pVnode->blockSeq = seq; + pVnode->blockInfo = pMsg->info; } taosThreadMutexUnlock(&pVnode->lock); @@ -279,12 +283,15 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { for (int32_t i = 0; i < numOfMsgs; ++i) { if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; const STraceId *trace = &pMsg->info.traceId; - vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg, - TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex); if (vnodeIsMsgBlock(pMsg->msgType)) { - vTrace("vgId:%d, blocking msg obtained from apply-queue. index:%" PRId64 ", term: %" PRId64 ", type: %s", vgId, - pMsg->info.conn.applyIndex, pMsg->info.conn.applyTerm, TMSG_INFO(pMsg->msgType)); + vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64 + ", blocking msg obtained sec:%d seq:%" PRId64, + vgId, pMsg, TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex, pVnode->blockSec, + pVnode->blockSeq); + } else { + vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg, + TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex); } SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; @@ -618,9 +625,14 @@ void vnodeSyncCheckTimeout(SVnode *pVnode) { int32_t curSec = taosGetTimestampSec(); int32_t delta = curSec - pVnode->blockSec; if (delta > VNODE_TIMEOUT_SEC) { - syncSendTimeoutRsp(pVnode->sync, pVnode->blockSeq); vError("vgId:%d, failed to propose since timeout and post block, start:%d cur:%d delta:%d seq:%" PRId64, pVnode->config.vgId, pVnode->blockSec, curSec, delta, pVnode->blockSeq); + if (syncSendTimeoutRsp(pVnode->sync, pVnode->blockSeq) != 0) { + SRpcMsg rpcMsg = {.code = TSDB_CODE_SYN_TIMEOUT, .info = pVnode->blockInfo}; + vInfo("send timeout response since its applyed, seq:%" PRId64 " handle:%p ahandle:%p", pVnode->blockSeq, + rpcMsg.info.handle, rpcMsg.info.ahandle); + rpcSendResponse(&rpcMsg); + } pVnode->blocked = false; pVnode->blockSec = 0; pVnode->blockSeq = 0; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index af456f42f8e64deeb776d3345f7e9ad1faeed718..d6ce77193a7c38a670cd704634f91f14c7f60d6a 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -218,9 +218,9 @@ int32_t syncLeaderTransfer(int64_t rid) { return ret; } -void syncSendTimeoutRsp(int64_t rid, int64_t seq) { +int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) { SSyncNode* pNode = syncNodeAcquire(rid); - if (pNode == NULL) return; + if (pNode == NULL) return -1; SRpcMsg rpcMsg = {0}; int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info); @@ -228,9 +228,13 @@ void syncSendTimeoutRsp(int64_t rid, int64_t seq) { syncNodeRelease(pNode); if (ret == 1) { - sInfo("send response since sync timeout, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, + sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle); rpcSendResponse(&rpcMsg); + return 0; + } else { + sInfo("no rpcinfo to send timeout response, seq:%" PRId64, seq); + return -1; } }