diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index acf878632d30f4ea235dd5379d9998866fd5ec8e..b9d4e4edab06a5bed1d5ff128e60156e9bd50676 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2517,6 +2517,31 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a *numOfAssignment = num; } + for (int32_t j = 0; j < (*numOfAssignment); ++j) { + tmq_topic_assignment* p = &(*assignment)[j]; + + for(int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) { + SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); + if (pClientVg->vgId != p->vgId) { + continue; + } + + SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo; + + pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; + + char offsetBuf[80] = {0}; + tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); + + tscDebug("vgId:%d offset is update to:%s", p->vgId, offsetBuf); + + pOffsetInfo->walVerBegin = p->begin; + pOffsetInfo->walVerEnd = p->end; + pOffsetInfo->currentOffset.version = p->currentOffset; + pOffsetInfo->committedOffset.version = p->currentOffset; + } + } + destroyCommonInfo(pCommon); return code; } else { diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index f15a93cb2c300587bfffe880b8a45bf0aef60175..f1d1ad0865e9b25791a130bfd047ecead064b1b2 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -1122,6 +1122,8 @@ TEST(clientCase, sub_tb_test) { return; } + tmq_offset_seek(tmq, "topic_t1", pAssign[0].vgId, 0); + while (1) { TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout); if (pRes != NULL) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4a9bf07429a641ceedf1c131e261f72f8cee7716..1000f82d8f2a5a744f7810999ce8c1c6a6408ddf 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -301,8 +301,12 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) } // save the new offset value - tqDebug("vgId:%d sub:%s seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version, - pSavedOffset->val.version); + if (pSavedOffset != NULL) { + tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version, + pSavedOffset->val.version); + } else { + tqDebug("vgId:%d sub:%s seek to:%"PRId64" not saved yet", vgId, pOffset->subKey, pOffset->val.version); + } if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) { tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);