diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 606cfc294a26b29498148296216b9d5b4ae961d2..5893a4b941c54932347ed76b23be13bd37733144 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1415,7 +1415,10 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { tqDebug("data submit enqueue stream task:%d, ver: %" PRId64, pTask->taskId, submit.ver); if (succ) { - if (tAppendDataForStream(pTask, (SStreamQueueItem*)pSubmit) < 0) { + int32_t code = tAppendDataForStream(pTask, (SStreamQueueItem*)pSubmit); + if (code < 0) { + // let's handle the back pressure + tqError("stream task:%d failed to put into queue for, too many", pTask->taskId); continue; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index fd0a5233a12e3c757b8f24c6916bf3f40000f9c3..df8847c26de923ad98b53b3f21bc69af7f715c6f 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -310,6 +310,7 @@ int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem) { // TODO: back pressure atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL); #endif + return 0; }