diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index b04727bfc030ed9d2181462725778a75c6202d99..ccc17289b01414f41bc9959fd2ba8fd7cd7f8061 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -1100,7 +1100,7 @@ TEST(clientCase, sub_tb_test) { // 创建订阅 topics 列表 tmq_list_t* topicList = tmq_list_new(); - tmq_list_append(topicList, "topic_t1"); + tmq_list_append(topicList, "t1"); // 启动订阅 tmq_subscribe(tmq, topicList); @@ -1118,7 +1118,7 @@ TEST(clientCase, sub_tb_test) { tmq_topic_assignment* pAssign = NULL; int32_t numOfAssign = 0; - int32_t code = tmq_get_topic_assignment(tmq, "topic_t1", &pAssign, &numOfAssign); + int32_t code = tmq_get_topic_assignment(tmq, "t1", &pAssign, &numOfAssign); if (code != 0) { printf("error occurs:%s\n", tmq_err2str(code)); tmq_consumer_close(tmq); @@ -1127,7 +1127,16 @@ TEST(clientCase, sub_tb_test) { return; } - tmq_offset_seek(tmq, "topic_t1", pAssign[0].vgId, 0); + tmq_offset_seek(tmq, "t1", pAssign[0].vgId, 4); + + code = tmq_get_topic_assignment(tmq, "t1", &pAssign, &numOfAssign); + if (code != 0) { + printf("error occurs:%s\n", tmq_err2str(code)); + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); + return; + } while (1) { TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 14deff75020ebfc2ebe53dc7f44d28b7885c1d60..c6077a3f8f013d03b5f24bf997922fb878cfc1ba 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -510,8 +510,6 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { int64_t sver = 0, ever = 0; walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); - int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader); - SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, &req); @@ -537,7 +535,12 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { dataRsp.rspOffset.type = TMQ_OFFSET__LOG; if (reqOffset.type == TMQ_OFFSET__LOG) { - dataRsp.rspOffset.version = currentVer; // return current consume offset value + int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader); + if (currentVer == -1) { // not start to read data from wal yet, return req offset directly + dataRsp.rspOffset.version = reqOffset.version; + } else { + dataRsp.rspOffset.version = currentVer; // return current consume offset value + } } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {