diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 30b2fb74ca3251184b3d116db81046a3a9d81919..e431ca4a012ef132dd7203e21a2ab77175a1caf7 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -101,6 +101,7 @@ typedef struct { STqPushHandle pushHandle; // push STqExecHandle execHandle; // exec SRpcMsg* msg; + int32_t noDataPollCnt; } STqHandle; typedef struct { diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index a66d63a9103e1d94846aaa4a32e5422d132bf9eb..94803ef438cf3071a91fd045f9674e5427098adb 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -16,6 +16,7 @@ #include "tq.h" #define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) +#define NO_POLL_CNT 5 static int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp); @@ -185,12 +186,18 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, // till now, all data has been transferred to consumer, new data needs to push client once arrived. if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { - // lock - taosWLockLatch(&pTq->lock); - code = tqRegisterPushHandle(pTq, pHandle, pMsg); - taosWUnLockLatch(&pTq->lock); - tDeleteSMqDataRsp(&dataRsp); - return code; + if(pHandle->noDataPollCnt >= NO_POLL_CNT){ // send poll result to client if no data 5 times to avoid lost data + pHandle->noDataPollCnt = 0; + // lock + taosWLockLatch(&pTq->lock); + code = tqRegisterPushHandle(pTq, pHandle, pMsg); + taosWUnLockLatch(&pTq->lock); + tDeleteSMqDataRsp(&dataRsp); + return code; + } + else{ + pHandle->noDataPollCnt++; + } } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index b29e36efdce9f6496da99d212bfd08c552a96bc7..37d97b35a6f9ffc84bd02be50fdfd605f954b64d 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -74,18 +74,17 @@ int32_t walNextValidMsg(SWalReader *pReader) { int64_t lastVer = walGetLastVer(pReader->pWal); int64_t committedVer = walGetCommittedVer(pReader->pWal); int64_t appliedVer = walGetAppliedVer(pReader->pWal); - while(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010] - wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64", so sleep 1ms", pReader->pWal->cfg.vgId, appliedVer, committedVer); - taosMsleep(1); - appliedVer = walGetAppliedVer(pReader->pWal); + if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010] + wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer); +// taosMsleep(10); } // int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer; -// endVer = TMIN(appliedVer, endVer); + int64_t endVer = TMIN(appliedVer, committedVer); wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 - ", applied index:%" PRId64, - pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer); - while (fetchVer <= committedVer) { + ", applied index:%" PRId64", end index:%" PRId64, + pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer); + while (fetchVer <= endVer) { if (walFetchHeadNew(pReader, fetchVer) < 0) { return -1; }