From 587f750c94d8d7784a0d964b54c29440d9e1e5ed Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 9 May 2023 17:42:11 +0800 Subject: [PATCH] fix:[TD-24058]send poll result to client if no data 5 times to avoid lost data --- source/dnode/vnode/src/inc/tq.h | 1 + source/dnode/vnode/src/tq/tqUtil.c | 19 +++++++++++++------ source/libs/wal/src/walRead.c | 15 +++++++-------- 3 files changed, 21 insertions(+), 14 deletions(-) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 30b2fb74ca..e431ca4a01 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -101,6 +101,7 @@ typedef struct { STqPushHandle pushHandle; // push STqExecHandle execHandle; // exec SRpcMsg* msg; + int32_t noDataPollCnt; } STqHandle; typedef struct { diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index a66d63a910..94803ef438 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -16,6 +16,7 @@ #include "tq.h" #define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) +#define NO_POLL_CNT 5 static int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp); @@ -185,12 +186,18 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, // till now, all data has been transferred to consumer, new data needs to push client once arrived. if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { - // lock - taosWLockLatch(&pTq->lock); - code = tqRegisterPushHandle(pTq, pHandle, pMsg); - taosWUnLockLatch(&pTq->lock); - tDeleteSMqDataRsp(&dataRsp); - return code; + if(pHandle->noDataPollCnt >= NO_POLL_CNT){ // send poll result to client if no data 5 times to avoid lost data + pHandle->noDataPollCnt = 0; + // lock + taosWLockLatch(&pTq->lock); + code = tqRegisterPushHandle(pTq, pHandle, pMsg); + taosWUnLockLatch(&pTq->lock); + tDeleteSMqDataRsp(&dataRsp); + return code; + } + else{ + pHandle->noDataPollCnt++; + } } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index b29e36efdc..37d97b35a6 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -74,18 +74,17 @@ int32_t walNextValidMsg(SWalReader *pReader) { int64_t lastVer = walGetLastVer(pReader->pWal); int64_t committedVer = walGetCommittedVer(pReader->pWal); int64_t appliedVer = walGetAppliedVer(pReader->pWal); - while(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010] - wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64", so sleep 1ms", pReader->pWal->cfg.vgId, appliedVer, committedVer); - taosMsleep(1); - appliedVer = walGetAppliedVer(pReader->pWal); + if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010] + wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer); +// taosMsleep(10); } // int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer; -// endVer = TMIN(appliedVer, endVer); + int64_t endVer = TMIN(appliedVer, committedVer); wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 - ", applied index:%" PRId64, - pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer); - while (fetchVer <= committedVer) { + ", applied index:%" PRId64", end index:%" PRId64, + pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer); + while (fetchVer <= endVer) { if (walFetchHeadNew(pReader, fetchVer) < 0) { return -1; } -- GitLab