From 78fd70202c98f7133c8901e75260f0d4dd030a6f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 10 Aug 2023 16:18:53 +0800 Subject: [PATCH] fix:offset error in tmq & add test cases --- source/client/src/clientTmq.c | 10 ++--- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/tq/tq.c | 5 +-- source/dnode/vnode/src/tq/tqUtil.c | 62 +++++++++++++-------------- source/libs/wal/src/walRead.c | 6 ++- tests/system-test/7-tmq/tmq_offset.py | 6 +-- utils/test/c/tmq_offset_test.c | 10 +++++ 7 files changed, 54 insertions(+), 47 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index ae82be2470..b4168046f4 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1863,10 +1863,10 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p return 0; } -static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId){ +static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId, bool hasData){ if (!pVg->seekUpdated) { tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId); - pVg->offsetInfo.beginOffset = *reqOffset; + if(hasData) pVg->offsetInfo.beginOffset = *reqOffset; pVg->offsetInfo.endOffset = *rspOffset; } else { tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId); @@ -1929,7 +1929,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { pVg->epSet = *pollRspWrapper->pEpset; } - updateVgInfo(pVg, &pDataRsp->reqOffset, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId); + updateVgInfo(pVg, &pDataRsp->reqOffset, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId, pDataRsp->blockNum != 0); char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset); @@ -1979,7 +1979,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { return NULL; } - updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId); + updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId, true); // build rsp SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); taosFreeQitem(pollRspWrapper); @@ -2007,7 +2007,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { return NULL; } - updateVgInfo(pVg, &pollRspWrapper->taosxRsp.reqOffset, &pollRspWrapper->taosxRsp.rspOffset, pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId); + updateVgInfo(pVg, &pollRspWrapper->taosxRsp.reqOffset, &pollRspWrapper->taosxRsp.rspOffset, pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId, pollRspWrapper->taosxRsp.blockNum != 0); if (pollRspWrapper->taosxRsp.blockNum == 0) { tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64, diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 13b991e038..a6a84075b5 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -175,7 +175,7 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, int32_t type, int64_t sver, int64_t ever); -int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq); +int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset); #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 98695a9e63..65ff1539aa 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -289,9 +289,8 @@ int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { } SMqDataRsp dataRsp = {0}; - tqInitDataRsp(&dataRsp, &req); + tqInitDataRsp(&dataRsp, req.reqOffset); dataRsp.blockNum = 0; - dataRsp.rspOffset = dataRsp.reqOffset; char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.reqOffset); tqInfo("tqPushEmptyDataRsp to consumer:0x%"PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf, req.reqId); @@ -714,7 +713,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); SMqDataRsp dataRsp = {0}; - tqInitDataRsp(&dataRsp, &req); + tqInitDataRsp(&dataRsp, req.reqOffset); if (req.useSnapshot == true) { tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 1a0663665d..5cbca6e0f2 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -20,8 +20,9 @@ static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp, int32_t vgId); -int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) { - pRsp->reqOffset = pReq->reqOffset; +int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) { + pRsp->reqOffset = pOffset; + pRsp->rspOffset = pOffset; pRsp->blockData = taosArrayInit(0, sizeof(void*)); pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t)); @@ -35,8 +36,9 @@ int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) { return 0; } -static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) { - pRsp->reqOffset = pReq->reqOffset; +static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) { + pRsp->reqOffset = pOffset; + pRsp->rspOffset = pOffset; pRsp->withTbName = 1; pRsp->withSchema = 1; @@ -69,7 +71,6 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) { static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, bool* pBlockReturned) { uint64_t consumerId = pRequest->consumerId; - STqOffsetVal reqOffset = pRequest->reqOffset; STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey); int32_t vgId = TD_VID(pTq->pVnode); @@ -86,7 +87,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand return 0; } else { // no poll occurs in this vnode for this topic, let's seek to the right offset value. - if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) { + if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) { if (pRequest->useSnapshot) { tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot", consumerId, pHandle->subKey, vgId); @@ -100,12 +101,12 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef); tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer); } - } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { + } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_LATEST) { walRefLastVer(pTq->pVnode->pWal, pHandle->pRef); SMqDataRsp dataRsp = {0}; - tqInitDataRsp(&dataRsp, pRequest); + tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer + 1); - tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer + 1); + tqInitDataRsp(&dataRsp, *pOffsetVal); tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId, pHandle->subKey, vgId, dataRsp.rspOffset.version); int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); @@ -113,7 +114,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand *pBlockReturned = true; return code; - } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { + } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_NONE) { tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 " in vg %d, subkey %s, reset none failed", pHandle->subKey, consumerId, vgId, pRequest->subKey); @@ -125,11 +126,11 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand return 0; } -static void setRequestVersion(STqOffsetVal* offset, int64_t ver){ - if(offset->type == TMQ_OFFSET__LOG){ - offset->version = ver; - } -} +//static void setRequestVersion(STqOffsetVal* offset, int64_t ver){ +// if(offset->type == TMQ_OFFSET__LOG){ +// offset->version = ver; +// } +//} static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal* pOffset) { @@ -138,8 +139,8 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, terrno = 0; SMqDataRsp dataRsp = {0}; - tqInitDataRsp(&dataRsp, pRequest); - dataRsp.reqOffset.type = pOffset->type; // stroe origin type for getting offset in tmq_get_vgroup_offset + tqInitDataRsp(&dataRsp, *pOffset); +// dataRsp.reqOffset.type = pOffset->type; // store origin type for getting offset in tmq_get_vgroup_offset qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); int code = tqScanData(pTq, pHandle, &dataRsp, pOffset); @@ -160,7 +161,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, taosWUnLockLatch(&pTq->lock); } - setRequestVersion(&dataRsp.reqOffset, pOffset->version); +// setRequestVersion(&dataRsp.reqOffset, pOffset->version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); end : { @@ -181,8 +182,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, SWalCkHead* pCkHead = NULL; SMqMetaRsp metaRsp = {0}; STaosxRsp taosxRsp = {0}; - tqInitTaosxRsp(&taosxRsp, pRequest); - taosxRsp.reqOffset.type = offset->type; // store origin type for getting offset in tmq_get_vgroup_offset + tqInitTaosxRsp(&taosxRsp, *offset); +// taosxRsp.reqOffset.type = offset->type; // store origin type for getting offset in tmq_get_vgroup_offset if (offset->type != TMQ_OFFSET__LOG) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) { @@ -235,7 +236,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - setRequestVersion(&taosxRsp.reqOffset, offset->version); +// setRequestVersion(&taosxRsp.reqOffset, offset->version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto end; } @@ -248,7 +249,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (pHead->msgType != TDMT_VND_SUBMIT) { if (totalRows > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - setRequestVersion(&taosxRsp.reqOffset, offset->version); +// setRequestVersion(&taosxRsp.reqOffset, offset->version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto end; } @@ -278,7 +279,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (totalRows >= 4096 || taosxRsp.createTableNum > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1); - setRequestVersion(&taosxRsp.reqOffset, offset->version); +// setRequestVersion(&taosxRsp.reqOffset, offset->version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto end; } else { @@ -295,15 +296,13 @@ end: } int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) { - int32_t code = -1; - STqOffsetVal offset = {0}; STqOffsetVal reqOffset = pRequest->reqOffset; // 1. reset the offset if needed - if (IS_OFFSET_RESET_TYPE(reqOffset.type)) { + if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) { // handle the reset offset cases, according to the consumer's choice. bool blockReturned = false; - code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned); + int32_t code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &blockReturned); if (code != 0) { return code; } @@ -312,20 +311,17 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ if (blockReturned) { return 0; } - } else if(reqOffset.type != 0){ // use the consumer specified offset - // the offset value can not be monotonious increase?? - offset = reqOffset; - } else { + } else if(reqOffset.type == 0){ // use the consumer specified offset uError("req offset type is 0"); return TSDB_CODE_TMQ_INVALID_MSG; } // this is a normal subscribe requirement if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset); + return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset); } else { // todo handle the case where re-balance occurs. // for taosx - return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &offset); + return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset); } } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 04a76146c6..54b9576eb1 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -75,7 +75,10 @@ int32_t walNextValidMsg(SWalReader *pReader) { wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 ", applied index:%" PRId64, pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer); - + if (fetchVer > appliedVer){ + terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; + return -1; + } while (fetchVer <= appliedVer) { if (walFetchHeadNew(pReader, fetchVer) < 0) { return -1; @@ -97,7 +100,6 @@ int32_t walNextValidMsg(SWalReader *pReader) { } } - terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; return -1; } diff --git a/tests/system-test/7-tmq/tmq_offset.py b/tests/system-test/7-tmq/tmq_offset.py index a39365c13b..33d36eda71 100644 --- a/tests/system-test/7-tmq/tmq_offset.py +++ b/tests/system-test/7-tmq/tmq_offset.py @@ -23,16 +23,16 @@ class TDTestCase: def run(self): tdSql.prepare() buildPath = tdCom.getBuildPath() - cmdStr1 = '%s/build/bin/taosBenchmark -i 10 -B 1 -t 100000 -n 100000 -y &'%(buildPath) + cmdStr1 = '%s/build/bin/taosBenchmark -i 50 -B 1 -t 1000 -n 100000 -y &'%(buildPath) tdLog.info(cmdStr1) os.system(cmdStr1) - time.sleep(20) + time.sleep(10) cmdStr2 = '%s/build/bin/tmq_offset_test &'%(buildPath) tdLog.info(cmdStr2) os.system(cmdStr2) - time.sleep(30) + time.sleep(20) os.system("kill -9 `pgrep taosBenchmark`") result = os.system("kill -9 `pgrep tmq_offset_test`") diff --git a/utils/test/c/tmq_offset_test.c b/utils/test/c/tmq_offset_test.c index 03f710ae16..18931e2548 100644 --- a/utils/test/c/tmq_offset_test.c +++ b/utils/test/c/tmq_offset_test.c @@ -190,6 +190,16 @@ void test_offset(TAOS* pConn){ ASSERT(0); } + for(int i = 0; i < numOfAssign; i++){ + int64_t position = tmq_position(tmq, "tp", pAssign[i].vgId); + if(position == 0) continue; + + printf("position = %lld\n", position); + tmq_commit_offset_sync(tmq, "tp", pAssign[i].vgId, position); + int64_t committed = tmq_committed(tmq, "tp", pAssign[i].vgId); + ASSERT(position == committed); + } + tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset); tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset); -- GitLab