From d3dedbe676eccc207cab66e36128a865a662f6ba Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 25 Jun 2023 14:52:45 +0800 Subject: [PATCH] fix:report current offset in tmq --- source/client/src/clientTmq.c | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index e7927cd0ae..31c15158f9 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -810,7 +810,7 @@ void tmqSendHbReq(void* param, void* tmrId) { OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1); offRows->vgId = pVg->vgId; offRows->rows = pVg->numOfRows; - offRows->offset = pVg->offsetInfo.committedOffset; + offRows->offset = pVg->offsetInfo.currentOffset; char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset); tscInfo("report offset: vgId:%d, offset:%s, rows:%"PRId64, offRows->vgId, buf, offRows->rows); @@ -1463,7 +1463,8 @@ CREATE_MSG_FAIL: } typedef struct SVgroupSaveInfo { - STqOffsetVal offset; + STqOffsetVal currentOffset; + STqOffsetVal commitOffset; int64_t numOfRows; } SVgroupSaveInfo; @@ -1488,12 +1489,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic makeTopicVgroupKey(vgKey, pTopic->topicName, pVgEp->vgId); SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey)); - int64_t numOfRows = 0; STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg}; - if (pInfo != NULL) { - offsetNew = pInfo->offset; - numOfRows = pInfo->numOfRows; - } SMqClientVg clientVg = { .pollCnt = 0, @@ -1502,11 +1498,11 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic .vgStatus = TMQ_VG_STATUS__IDLE, .vgSkipCnt = 0, .emptyBlockReceiveTs = 0, - .numOfRows = numOfRows, + .numOfRows = pInfo ? pInfo->numOfRows : 0, }; - clientVg.offsetInfo.currentOffset = offsetNew; - clientVg.offsetInfo.committedOffset = offsetNew; + clientVg.offsetInfo.currentOffset = pInfo ? pInfo->currentOffset : offsetNew; + clientVg.offsetInfo.committedOffset = pInfo ? pInfo->commitOffset : offsetNew; clientVg.offsetInfo.walVerBegin = -1; clientVg.offsetInfo.walVerEnd = -1; clientVg.seekUpdated = false; @@ -1565,7 +1561,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, vgKey, buf); - SVgroupSaveInfo info = {.offset = pVgCur->offsetInfo.currentOffset, .numOfRows = pVgCur->numOfRows}; + SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.currentOffset, .commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows}; taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo)); } } -- GitLab