diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index befcb00ac793778694ba9713098ad657dfd317be..0ee9a7bd7d9e5528e003d87f83122e477b2d132b 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1151,6 +1151,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { }; if (tsem_init(¶m.rspSem, 0, 0) != 0) { + code = TSDB_CODE_TSC_INTERNAL_ERROR; goto FAIL; } @@ -1186,6 +1187,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { int32_t retryCnt = 0; while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) { if (retryCnt++ > MAX_RETRY_COUNT) { + tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt); + code = TSDB_CODE_TSC_INTERNAL_ERROR; goto FAIL; } diff --git a/utils/test/c/CMakeLists.txt b/utils/test/c/CMakeLists.txt index eda9c70f159b9c440d530a8036ef01eeb409930c..87b0d11d1c00f631c7c09081952af82174e2432f 100644 --- a/utils/test/c/CMakeLists.txt +++ b/utils/test/c/CMakeLists.txt @@ -6,6 +6,14 @@ add_executable(tmq_taosx_ci tmq_taosx_ci.c) add_executable(write_raw_block_test write_raw_block_test.c) add_executable(sml_test sml_test.c) add_executable(get_db_name_test get_db_name_test.c) +add_executable(tmq_offset tmqOffset.c) +target_link_libraries( + tmq_offset + PUBLIC taos_static + PUBLIC util + PUBLIC common + PUBLIC os +) target_link_libraries( create_table PUBLIC taos_static diff --git a/utils/test/c/tmqOffset.c b/utils/test/c/tmqOffset.c new file mode 100644 index 0000000000000000000000000000000000000000..7225cb87bdcbf77807c1861d22cef72f955d49bd --- /dev/null +++ b/utils/test/c/tmqOffset.c @@ -0,0 +1,64 @@ +// +// Created by mingming wanng on 2023/4/7. +// +#include +#include +#include "taoserror.h" +#include "tlog.h" +#include "tmsg.h" + +typedef struct { + int32_t size; +} STqOffsetHead; + +int32_t tqOffsetRestoreFromFile(const char* fname) { + TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ); + if (pFile != NULL) { + STqOffsetHead head = {0}; + int32_t code; + + while (1) { + if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) { + if (code == 0) { + break; + } else { + printf("code:%d != 0\n", code); + return -1; + } + } + int32_t size = htonl(head.size); + void* memBuf = taosMemoryCalloc(1, size); + if (memBuf == NULL) { + printf("memBuf == NULL\n"); + return -1; + } + if ((code = taosReadFile(pFile, memBuf, size)) != size) { + taosMemoryFree(memBuf); + printf("code:%d != size:%d\n", code, size); + return -1; + } + STqOffset offset; + SDecoder decoder; + tDecoderInit(&decoder, memBuf, size); + if (tDecodeSTqOffset(&decoder, &offset) < 0) { + taosMemoryFree(memBuf); + tDecoderClear(&decoder); + printf("tDecodeSTqOffset error\n"); + return -1; + } + + tDecoderClear(&decoder); + printf("subkey:%s, type:%d, uid/version:%"PRId64", ts:%"PRId64"\n", + offset.subKey, offset.val.type, offset.val.uid, offset.val.ts); + taosMemoryFree(memBuf); + } + + taosCloseFile(&pFile); + } + return 0; +} + +int main(int argc, char *argv[]) { + tqOffsetRestoreFromFile("offset-ver0"); + return 0; +}