From 77d43ef5fdc6d1865ccadc79cb544abc18459781 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 27 Jul 2023 20:28:45 +0800 Subject: [PATCH] fix:tmq ci error --- source/dnode/vnode/src/tq/tqUtil.c | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 8ebdaa194b..9062084246 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -105,7 +105,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, pRequest); - tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer); + tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer + 1); tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId, pHandle->subKey, vgId, dataRsp.rspOffset.version); int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); @@ -161,6 +161,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, taosWUnLockLatch(&pTq->lock); } + setRequestVersion(&dataRsp.reqOffset, pOffset->version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); end : { @@ -235,6 +236,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); + setRequestVersion(&taosxRsp.reqOffset, offset->version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto end; } @@ -246,7 +248,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, // process meta if (pHead->msgType != TDMT_VND_SUBMIT) { if (totalRows > 0) { - tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1); + tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); + setRequestVersion(&taosxRsp.reqOffset, offset->version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto end; } @@ -275,7 +278,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } if (totalRows >= 4096 || taosxRsp.createTableNum > 0) { - tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); + tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1); + setRequestVersion(&taosxRsp.reqOffset, offset->version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto end; } else { -- GitLab