From d48be5c0cea8665a8b5cdc7f42629373101c930a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 3 Apr 2023 18:23:35 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tq.c | 5 ++++- source/libs/stream/src/stream.c | 1 + 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 606cfc294a..5893a4b941 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1415,7 +1415,10 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { tqDebug("data submit enqueue stream task:%d, ver: %" PRId64, pTask->taskId, submit.ver); if (succ) { - if (tAppendDataForStream(pTask, (SStreamQueueItem*)pSubmit) < 0) { + int32_t code = tAppendDataForStream(pTask, (SStreamQueueItem*)pSubmit); + if (code < 0) { + // let's handle the back pressure + tqError("stream task:%d failed to put into queue for, too many", pTask->taskId); continue; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index fd0a5233a1..df8847c26d 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -310,6 +310,7 @@ int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem) { // TODO: back pressure atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL); #endif + return 0; } -- GitLab