From e96ed81ec0df8c6eacdaa0c38c44e8099c9c3346 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 May 2023 17:29:01 +0800 Subject: [PATCH] fix(stream): add input queue capacity check. --- source/libs/stream/src/stream.c | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index d3e4b23ad1..0d772247b4 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -132,8 +132,6 @@ int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pR int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock); // input queue is full, upstream is blocked now status = (code == TSDB_CODE_SUCCESS)? TASK_INPUT_STATUS__NORMAL:TASK_INPUT_STATUS__BLOCKED; - - } // rsp by input status @@ -304,10 +302,15 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { return -1; } - taosWriteQitem(pTask->inputQueue->queue, pItem); + int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem); + if (code != TSDB_CODE_SUCCESS) { + streamDataSubmitDestroy(px); + taosFreeQitem(pItem); + return code; + } } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) { + if (/*(pTask->taskLevel == TASK_LEVEL__SOURCE) && */(tInputQueueIsFull(pTask))) { qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total, size); @@ -317,10 +320,16 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { } qDebug("s-task:%s data block enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); - taosWriteQitem(pTask->inputQueue->queue, pItem); + int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem); + if (code != TSDB_CODE_SUCCESS) { + destroyStreamDataBlock((SStreamDataBlock*) pItem); + taosFreeQitem(pItem); + return code; + } } else if (type == STREAM_INPUT__CHECKPOINT) { taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__GET_RES) { + // use the default memory limit, refactor later. taosWriteQitem(pTask->inputQueue->queue, pItem); qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); } -- GitLab