diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 047000054f73933150a0b345929954f9a80f25bd..9a30bc7727eb9722d4cce9b5048a5af6d9788297 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -17,6 +17,7 @@ #include "ttimer.h" #define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480 +#define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (200) int32_t streamInit() { int8_t old; @@ -295,13 +296,17 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { return -1; } - int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; - qDebug("s-task:%s submit enqueue %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->id.idStr, + int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; + double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0; + + qDebug("s-task:%s submit enqueue %p %p msgLen:%dB ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, pItem, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen, - pSubmitBlock->submit.ver, total); + pSubmitBlock->submit.ver, numOfBlocks, size); - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && total > STREAM_TASK_INPUT_QUEUEU_CAPACITY) { - qError("s-task:%s input queue is full, capacity:%d, abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY); + if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && + (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { + qError("s-task:%s input queue is full, capacity:%d size:%d MiB, abort", pTask->id.idStr, + STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE); streamDataSubmitDestroy(pSubmitBlock); return -1; } @@ -309,13 +314,15 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { - int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && total > STREAM_TASK_INPUT_QUEUEU_CAPACITY) { - qError("s-task:%s input queue is full, capacity:%d, abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY); + int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; + if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || + (numOfBlocks >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { + qError("s-task:%s input queue is full, capacity:%d size:%d MiB, abort", pTask->id.idStr, + STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE); return -1; } - qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, total); + qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, numOfBlocks); taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__CHECKPOINT) { taosWriteQitem(pTask->inputQueue->queue, pItem);