From 5b75ee43966a51c89bbfc8febc9af16a024bf56e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 11 Mar 2023 19:13:50 +0800 Subject: [PATCH] opti:modify tmq logic --- source/client/src/clientTmq.c | 3 +-- source/libs/executor/src/executor.c | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 9f24deff94..d32b5a1beb 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1243,7 +1243,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { /*pRspWrapper->vgHandle = pVg;*/ /*pRspWrapper->topicHandle = pTopic;*/ taosWriteQitem(tmq->mqueue, pRspWrapper); - tsem_post(&tmq->rspSem); } goto CREATE_MSG_FAIL; @@ -1923,7 +1922,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { return NULL; } - if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) { + while (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) { int32_t retryCnt = 0; while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) { if (retryCnt++ > 40) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 8b52a401f1..af625be0b1 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1078,7 +1078,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT pTSInfo->base.dataReader = NULL; // let's seek to the next version in wal file if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, pTaskInfo->id.str) < 0) { - qError("tqSeekVer failed ver:" PRId64, pOffset->version + 1); + qError("tqSeekVer failed ver:%" PRId64, pOffset->version + 1); return -1; } } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { -- GitLab