diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 2646fe30b3b0dc37ac7493c649dfcc03138dee13..a17ad9775632e5a89232c15702c02234b7040906 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -795,19 +795,23 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { STaosQall* qall = taosAllocateQall(); taosReadAllQitems(pTmq->delayedTask, qall); - tscDebug("consumer:0x%"PRIx64" handle delayed %d tasks before poll data", pTmq->consumerId, qall->numOfItems); + if (qall->numOfItems == 0) { + taosFreeQall(qall); + return TSDB_CODE_SUCCESS; + } - while (1) { - int8_t* pTaskType = NULL; - taosGetQitem(qall, (void**)&pTaskType); - if (pTaskType == NULL) break; + tscDebug("consumer:0x%"PRIx64" handle delayed %d tasks before poll data", pTmq->consumerId, qall->numOfItems); + int8_t* pTaskType = NULL; + taosGetQitem(qall, (void**)&pTaskType); + while (pTaskType != NULL) { if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) { tmqAskEp(pTmq, true); int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); *pRefId = pTmq->refId; + tscDebug("consumer:0x%"PRIx64" retrieve ep from mnode in 1s", pTmq->consumerId); taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam); @@ -815,12 +819,16 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); *pRefId = pTmq->refId; + tscDebug("consumer:0x%"PRIx64" commit to mnode in %.2f s", pTmq->consumerId, pTmq->autoCommitInterval/1000.0); taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) { + // do nothing } else { ASSERT(0); } + taosFreeQitem(pTaskType); + taosGetQitem(qall, (void**)&pTaskType); } taosFreeQall(qall);