From 9eb15e41efc8a3b936746151163018420bf17b48 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 29 May 2023 16:44:03 +0800 Subject: [PATCH] fix:put poll to push manager if wal not exist when offset is latest --- source/dnode/vnode/src/tq/tqUtil.c | 40 ++++++++++++++++++++++-------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index e73aed8966..d30ed638f1 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -87,12 +87,13 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) { return 0; } -static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest) { +static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, bool* pBlockReturned) { uint64_t consumerId = pRequest->consumerId; STqOffsetVal reqOffset = pRequest->reqOffset; STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey); int32_t vgId = TD_VID(pTq->pVnode); + *pBlockReturned = false; // In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value. if (pOffset != NULL) { *pOffsetVal = pOffset->val; @@ -120,12 +121,27 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand } } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { walRefLastVer(pTq->pVnode->pWal, pHandle->pRef); - tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer); - STqOffset offset = {0}; - strcpy(offset.subKey, pRequest->subKey); - if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) { - terrno = TSDB_CODE_PAR_INTERNAL_ERROR; - return -1; + if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + SMqDataRsp dataRsp = {0}; + tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType); + + tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer); + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId, + pHandle->subKey, vgId, dataRsp.rspOffset.version); + int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); + tDeleteSMqDataRsp(&dataRsp); + + *pBlockReturned = true; + return code; + } else { + STaosxRsp taosxRsp = {0}; + tqInitTaosxRsp(&taosxRsp, pRequest); + tqOffsetResetToLog(&taosxRsp.rspOffset, pHandle->pRef->refVer); + int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + tDeleteSTaosxRsp(&taosxRsp); + + *pBlockReturned = true; + return code; } } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 " in vg %d, subkey %s, reset none failed", @@ -154,8 +170,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, } // till now, all data has been transferred to consumer, new data needs to push client once arrived. - if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST || (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && - dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId)) { + if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.blockNum == 0) { // lock taosWLockLatch(&pTq->lock); code = tqRegisterPushHandle(pTq, pHandle, pMsg); @@ -296,11 +311,16 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ // 1. reset the offset if needed if (IS_OFFSET_RESET_TYPE(reqOffset.type)) { // handle the reset offset cases, according to the consumer's choice. - code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest); + bool blockReturned = false; + code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned); if (code != 0) { return code; } + // empty block returned, quit + if (blockReturned) { + return 0; + } } else { // use the consumer specified offset // the offset value can not be monotonious increase?? offset = reqOffset; -- GitLab