From af65f9703bf809091c3b6c7579f143c9f80b253f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 23 Apr 2023 17:36:59 +0800 Subject: [PATCH] fix:tmq error if consume callback is earlier than consume --- utils/test/c/tmqSim.c | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/utils/test/c/tmqSim.c b/utils/test/c/tmqSim.c index f2de219f4e..530b142173 100644 --- a/utils/test/c/tmqSim.c +++ b/utils/test/c/tmqSim.c @@ -690,11 +690,12 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) { } static int32_t g_once_commit_flag = 0; +static int32_t g_once_consume_flag = 0; static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code); - if (0 == g_once_commit_flag) { + if (g_once_consume_flag == 1 && 0 == g_once_commit_flag) { g_once_commit_flag = 1; notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT); } @@ -773,8 +774,6 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) { void loop_consume(SThreadInfo* pInfo) { int32_t code; - int32_t once_flag = 0; - int64_t totalMsgs = 0; int64_t totalRows = 0; @@ -834,8 +833,8 @@ void loop_consume(SThreadInfo* pInfo) { lastTotalMsgs = totalMsgs; } - if (0 == once_flag) { - once_flag = 1; + if (0 == g_once_consume_flag) { + g_once_consume_flag = 1; notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM); } -- GitLab