From 13f3ca42267f42f5bda1952c2462d20ec063fea5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 6 May 2023 18:47:29 +0800 Subject: [PATCH] fix(stream): add input queue size limitation. --- source/libs/stream/src/stream.c | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 047000054f..9a30bc7727 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -17,6 +17,7 @@ #include "ttimer.h" #define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480 +#define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (200) int32_t streamInit() { int8_t old; @@ -295,13 +296,17 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { return -1; } - int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; - qDebug("s-task:%s submit enqueue %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->id.idStr, + int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; + double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0; + + qDebug("s-task:%s submit enqueue %p %p msgLen:%dB ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, pItem, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen, - pSubmitBlock->submit.ver, total); + pSubmitBlock->submit.ver, numOfBlocks, size); - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && total > STREAM_TASK_INPUT_QUEUEU_CAPACITY) { - qError("s-task:%s input queue is full, capacity:%d, abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY); + if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && + (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { + qError("s-task:%s input queue is full, capacity:%d size:%d MiB, abort", pTask->id.idStr, + STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE); streamDataSubmitDestroy(pSubmitBlock); return -1; } @@ -309,13 +314,15 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { - int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && total > STREAM_TASK_INPUT_QUEUEU_CAPACITY) { - qError("s-task:%s input queue is full, capacity:%d, abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY); + int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; + if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || + (numOfBlocks >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { + qError("s-task:%s input queue is full, capacity:%d size:%d MiB, abort", pTask->id.idStr, + STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE); return -1; } - qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, total); + qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, numOfBlocks); taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__CHECKPOINT) { taosWriteQitem(pTask->inputQueue->queue, pItem); -- GitLab