未验证 提交 7a3920bb 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #21492 from taosdata/refact/fillhistory

fix(tmq): set the correct offset rsp when no poll occuring.
......@@ -1100,7 +1100,7 @@ TEST(clientCase, sub_tb_test) {
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topic_t1");
tmq_list_append(topicList, "t1");
// 启动订阅
tmq_subscribe(tmq, topicList);
......@@ -1118,7 +1118,7 @@ TEST(clientCase, sub_tb_test) {
tmq_topic_assignment* pAssign = NULL;
int32_t numOfAssign = 0;
int32_t code = tmq_get_topic_assignment(tmq, "topic_t1", &pAssign, &numOfAssign);
int32_t code = tmq_get_topic_assignment(tmq, "t1", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_consumer_close(tmq);
......@@ -1127,7 +1127,16 @@ TEST(clientCase, sub_tb_test) {
return;
}
tmq_offset_seek(tmq, "topic_t1", pAssign[0].vgId, 0);
tmq_offset_seek(tmq, "t1", pAssign[0].vgId, 4);
code = tmq_get_topic_assignment(tmq, "t1", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
return;
}
while (1) {
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
......
......@@ -510,8 +510,6 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
int64_t sver = 0, ever = 0;
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader);
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, &req);
......@@ -537,7 +535,12 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
if (reqOffset.type == TMQ_OFFSET__LOG) {
dataRsp.rspOffset.version = currentVer; // return current consume offset value
int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader);
if (currentVer == -1) { // not start to read data from wal yet, return req offset directly
dataRsp.rspOffset.version = reqOffset.version;
} else {
dataRsp.rspOffset.version = currentVer; // return current consume offset value
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册