提交 abe37a16 编写于 作者: H Haojun Liao

refactor: do some internal refactor and add some logs.

上级 6836dc1c
...@@ -795,19 +795,23 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { ...@@ -795,19 +795,23 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
STaosQall* qall = taosAllocateQall(); STaosQall* qall = taosAllocateQall();
taosReadAllQitems(pTmq->delayedTask, qall); taosReadAllQitems(pTmq->delayedTask, qall);
tscDebug("consumer:0x%"PRIx64" handle delayed %d tasks before poll data", pTmq->consumerId, qall->numOfItems); if (qall->numOfItems == 0) {
taosFreeQall(qall);
return TSDB_CODE_SUCCESS;
}
while (1) { tscDebug("consumer:0x%"PRIx64" handle delayed %d tasks before poll data", pTmq->consumerId, qall->numOfItems);
int8_t* pTaskType = NULL; int8_t* pTaskType = NULL;
taosGetQitem(qall, (void**)&pTaskType); taosGetQitem(qall, (void**)&pTaskType);
if (pTaskType == NULL) break;
while (pTaskType != NULL) {
if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) { if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
tmqAskEp(pTmq, true); tmqAskEp(pTmq, true);
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = pTmq->refId; *pRefId = pTmq->refId;
tscDebug("consumer:0x%"PRIx64" retrieve ep from mnode in 1s", pTmq->consumerId);
taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer); taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam); tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam);
...@@ -815,12 +819,16 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { ...@@ -815,12 +819,16 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = pTmq->refId; *pRefId = pTmq->refId;
tscDebug("consumer:0x%"PRIx64" commit to mnode in %.2f s", pTmq->consumerId, pTmq->autoCommitInterval/1000.0);
taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer); taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) { } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
// do nothing
} else { } else {
ASSERT(0); ASSERT(0);
} }
taosFreeQitem(pTaskType); taosFreeQitem(pTaskType);
taosGetQitem(qall, (void**)&pTaskType);
} }
taosFreeQall(qall); taosFreeQall(qall);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册