From 86580cec090f96839e57f06935f90ef5fc6dd330 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 19 May 2022 10:53:24 +0800 Subject: [PATCH] enh(tmq): remove old tmq_commit api --- example/src/tmq.c | 2 +- include/client/taos.h | 6 ++-- tests/test/c/tmqDemo.c | 2 +- tests/test/c/tmqSim.c | 77 ++++++++++++++++++++---------------------- 4 files changed, 41 insertions(+), 46 deletions(-) diff --git a/example/src/tmq.c b/example/src/tmq.c index b79d21d051..1abce3f188 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -239,7 +239,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { msg_process(tmqmessage); taos_free_result(tmqmessage); - tmq_commit(tmq, NULL, 1); + tmq_commit_async(tmq, NULL, tmq_commit_cb_print, NULL); /*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/ } } diff --git a/include/client/taos.h b/include/client/taos.h index 0194357841..0b8c67aa79 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -232,11 +232,11 @@ DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq); DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t wait_time); DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq); -DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async); -DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param); DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets); +DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param); + #if 0 -DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async); +DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async); DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset); #endif diff --git a/tests/test/c/tmqDemo.c b/tests/test/c/tmqDemo.c index dc1a77c231..96d7741897 100644 --- a/tests/test/c/tmqDemo.c +++ b/tests/test/c/tmqDemo.c @@ -368,7 +368,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { /*msg_process(tmqmessage);*/ taos_free_result(tmqmessage); - if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0); + if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit_sync(tmq, NULL); } } diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index cf113369bc..8322c1a60c 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -37,10 +37,10 @@ typedef struct { TdThread thread; int32_t consumerId; - int32_t ifManualCommit; - //int32_t autoCommitIntervalMs; // 1000 ms - //char autoCommit[8]; // true, false - //char autoOffsetRest[16]; // none, earliest, latest + int32_t ifManualCommit; + // int32_t autoCommitIntervalMs; // 1000 ms + // char autoCommit[8]; // true, false + // char autoOffsetRest[16]; // none, earliest, latest int32_t ifCheckData; int64_t expectMsgCnt; @@ -99,21 +99,15 @@ static void printHelp() { } void initLogFile() { - time_t now; - struct tm curTime; - char filename[256]; + time_t now; + struct tm curTime; + char filename[256]; - now = taosTime(NULL); + now = taosTime(NULL); taosLocalTime(&now, &curTime); - sprintf(filename,"%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt", - configDir, - curTime.tm_year+1900, - curTime.tm_mon+1, - curTime.tm_mday, - curTime.tm_hour, - curTime.tm_min, - curTime.tm_sec); - //sprintf(filename, "%s/../log/tmqlog.txt", configDir); + sprintf(filename, "%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt", configDir, curTime.tm_year + 1900, + curTime.tm_mon + 1, curTime.tm_mday, curTime.tm_hour, curTime.tm_min, curTime.tm_sec); + // sprintf(filename, "%s/../log/tmqlog.txt", configDir); TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); if (NULL == pFile) { fprintf(stderr, "Failed to open %s for save result\n", filename); @@ -137,9 +131,9 @@ void saveConfigToLogFile() { for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId); - //taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit); - //taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs); - //taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest); + // taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit); + // taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs); + // taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest); taosFprintfFile(g_fp, " Topics: "); for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) { taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]); @@ -234,17 +228,17 @@ static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable) while (1) { TAOS_ROW row = taos_fetch_row(msg); - if (row == NULL) break; + if (row == NULL) break; - TAOS_FIELD* fields = taos_fetch_fields(msg); + TAOS_FIELD* fields = taos_fetch_fields(msg); int32_t numOfFields = taos_field_count(msg); - + taos_print_row(buf, row, fields, numOfFields); - - if (0 != g_stConfInfo.showRowFlag) { + + if (0 != g_stConfInfo.showRowFlag) { taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf); } - + totalRows++; } @@ -276,7 +270,7 @@ void build_consumer(SThreadInfo* pInfo) { tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); - //tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName); + // tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); @@ -299,7 +293,7 @@ void build_consumer(SThreadInfo* pInfo) { pInfo->tmq = tmq_consumer_new(conf, NULL, 0); tmq_conf_destroy(conf); - + return; } @@ -322,10 +316,10 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) { sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)", g_stConfInfo.cdbName, pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult); - time_t tTime = taosGetTimestampSec(); + time_t tTime = taosGetTimestampSec(); struct tm tm = *taosLocalTime(&tTime, NULL); taosFprintfFile(g_fp, "# save result: %d-%02d-%02d %02d:%02d:%02d, sql: %s\n", tm.tm_year + 1900, tm.tm_mon + 1, - tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, sqlStr); + tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, sqlStr); TAOS_RES* pRes = taos_query(pConn, sqlStr); if (taos_errno(pRes) != 0) { @@ -357,11 +351,11 @@ void loop_consume(SThreadInfo* pInfo) { totalMsgs++; if (totalRows >= pInfo->expectMsgCnt) { - taosFprintfFile(g_fp, "==== totalRows >= pInfo->expectMsgCnt, so break\n"); + taosFprintfFile(g_fp, "==== totalRows >= pInfo->expectMsgCnt, so break\n"); break; } - } else { - taosFprintfFile(g_fp, "==== delay over time, so break\n"); + } else { + taosFprintfFile(g_fp, "==== delay over time, so break\n"); break; } } @@ -389,7 +383,7 @@ void* consumeThreadFunc(void* param) { pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); exit(-1); } - + tmq_list_destroy(pInfo->topicList); pInfo->topicList = NULL; @@ -397,17 +391,18 @@ void* consumeThreadFunc(void* param) { if (pInfo->ifManualCommit) { taosFprintfFile(g_fp, "tmq_commit() manual commit when consume end.\n"); - pPrint("tmq_commit() manual commit when consume end.\n"); - tmq_commit(pInfo->tmq, NULL, 0); + pPrint("tmq_commit() manual commit when consume end.\n"); + /*tmq_commit(pInfo->tmq, NULL, 0);*/ + tmq_commit_sync(pInfo->tmq, NULL); } - + err = tmq_unsubscribe(pInfo->tmq); if (err) { pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); pInfo->consumeMsgCnt = -1; return NULL; } - + err = tmq_consumer_close(pInfo->tmq); if (err) { pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); @@ -485,9 +480,9 @@ int32_t getConsumeInfo() { int32_t* lengths = taos_fetch_lengths(pRes); // set default value - //g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000; - //memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true")); - //memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast")); + // g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000; + // memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true")); + // memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast")); for (int i = 0; i < num_fields; ++i) { if (row[i] == NULL || 0 == i) { -- GitLab