diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 6f978b0143b8f30de1d83f2db3e0d404c6aec44d..69c688f61cc33727db68d3e2781eedaec07e577e 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -1442,4 +1442,178 @@ TEST(clientCase, sub_tb_mt_test) { } } +TEST(clientCase, ts_3756) { +// taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg"); + + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + tmq_conf_t* conf = tmq_conf_new(); + + tmq_conf_set(conf, "enable.auto.commit", "false"); + tmq_conf_set(conf, "auto.commit.interval.ms", "2000"); + tmq_conf_set(conf, "group.id", "group_id_2"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "auto.offset.reset", "latest"); + tmq_conf_set(conf, "msg.with.table.name", "false"); + + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + tmq_conf_destroy(conf); + + // 创建订阅 topics 列表 + tmq_list_t* topicList = tmq_list_new(); + tmq_list_append(topicList, "tp"); + + // 启动订阅 + tmq_subscribe(tmq, topicList); + tmq_list_destroy(topicList); + + TAOS_FIELD* fields = NULL; + int32_t numOfFields = 0; + int32_t precision = 0; + int32_t totalRows = 0; + int32_t msgCnt = 0; + int32_t timeout = 200; + + int32_t count = 0; + + tmq_topic_assignment* pAssign = NULL; + int32_t numOfAssign = 0; + + int32_t code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); + if (code != 0) { + printf("error occurs:%s\n", tmq_err2str(code)); + tmq_free_assignment(pAssign); + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); + return; + } + + for(int i = 0; i < numOfAssign; i++){ + printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); + } + +// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, 4); + tmq_free_assignment(pAssign); + + code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); + if (code != 0) { + printf("error occurs:%s\n", tmq_err2str(code)); + tmq_free_assignment(pAssign); + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); + return; + } + + for(int i = 0; i < numOfAssign; i++){ + printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); + } + + tmq_free_assignment(pAssign); + + code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); + if (code != 0) { + printf("error occurs:%s\n", tmq_err2str(code)); + tmq_free_assignment(pAssign); + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); + return; + } + + for(int i = 0; i < numOfAssign; i++){ + printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); + } + + while (1) { + printf("start to poll\n"); + TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout); + if (pRes) { + char buf[128]; + + const char* topicName = tmq_get_topic_name(pRes); +// const char* dbName = tmq_get_db_name(pRes); +// int32_t vgroupId = tmq_get_vgroup_id(pRes); +// +// printf("topic: %s\n", topicName); +// printf("db: %s\n", dbName); +// printf("vgroup id: %d\n", vgroupId); + + printSubResults(pRes, &totalRows); + + tmq_topic_assignment* pAssignTmp = NULL; + int32_t numOfAssignTmp = 0; + + code = tmq_get_topic_assignment(tmq, "tp", &pAssignTmp, &numOfAssignTmp); + if (code != 0) { + printf("error occurs:%s\n", tmq_err2str(code)); + tmq_free_assignment(pAssign); + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); + return; + } + + for(int i = 0; i < numOfAssign; i++){ + printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssignTmp[i].vgId, pAssignTmp[i].currentOffset, pAssignTmp[i].begin, pAssignTmp[i].end); + } + if(numOfAssign != 0){ + int i = 0; + for(; i < numOfAssign; i++){ + if(pAssign[i].currentOffset != pAssignTmp[i].currentOffset){ + break; + } + } + if(i == numOfAssign){ + printf("all position is same\n"); + break; + } + tmq_free_assignment(pAssign); + } + numOfAssign = numOfAssignTmp; + pAssign = pAssignTmp; + + } else { +// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset); +// tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset); +// tmq_commit_sync(tmq, pRes); + continue; + } + +// tmq_commit_sync(tmq, pRes); + if (pRes != NULL) { + taos_free_result(pRes); + // if ((++count) > 1) { + // break; + // } + } else { +// break; + } + +// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].begin); + } + + tmq_free_assignment(pAssign); + + code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); + if (code != 0) { + printf("error occurs:%s\n", tmq_err2str(code)); + tmq_free_assignment(pAssign); + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); + return; + } + + for(int i = 0; i < numOfAssign; i++){ + printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); + } + + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); +} #pragma GCC diagnostic pop diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 247c1729a34b32243d01ac7b80c96ad3b606f1a7..44a27fb791a288f2c151eccea53430908d1409d7 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -114,9 +114,10 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO const STraceId *trace = &pMsg->info.traceId; dGTrace("vgId:%d, msg:%p get from vnode-fetch queue", pVnode->vgId, pMsg); + terrno = 0; int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); if (code != 0) { - if (terrno != 0) { + if (code == -1 && terrno != 0) { code = terrno; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4d433042ad9be3f741cecd9edb994aa870e5c7e4..0371e979cce74c036307409674305b52061b3bbb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -646,7 +646,8 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) { SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)data, len); if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) { - return TSDB_CODE_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; } tDecoderClear(&decoder); @@ -654,19 +655,22 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) { STqOffset* pOffset = &vgOffset.offset; STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey); if (pSavedOffset == NULL) { - return TSDB_CODE_TMQ_NO_COMMITTED; + terrno = TSDB_CODE_TMQ_NO_COMMITTED; + return terrno; } vgOffset.offset = *pSavedOffset; int32_t code = 0; tEncodeSize(tEncodeMqVgOffset, &vgOffset, len, code); if (code < 0) { - return TSDB_CODE_INVALID_PARA; + terrno = TSDB_CODE_INVALID_PARA; + return terrno; } void* buf = rpcMallocCont(len); if (buf == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; } SEncoder encoder; tEncoderInit(&encoder, buf, len);