提交 13f3ca42 编写于 作者: H Haojun Liao

fix(stream): add input queue size limitation.

上级 6ceda683
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "ttimer.h" #include "ttimer.h"
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480 #define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (200)
int32_t streamInit() { int32_t streamInit() {
int8_t old; int8_t old;
...@@ -295,13 +296,17 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { ...@@ -295,13 +296,17 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
return -1; return -1;
} }
int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
qDebug("s-task:%s submit enqueue %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->id.idStr, double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0;
qDebug("s-task:%s submit enqueue %p %p msgLen:%dB ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
pItem, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen, pItem, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen,
pSubmitBlock->submit.ver, total); pSubmitBlock->submit.ver, numOfBlocks, size);
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && total > STREAM_TASK_INPUT_QUEUEU_CAPACITY) { if ((pTask->taskLevel == TASK_LEVEL__SOURCE) &&
qError("s-task:%s input queue is full, capacity:%d, abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY); (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) {
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, abort", pTask->id.idStr,
STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE);
streamDataSubmitDestroy(pSubmitBlock); streamDataSubmitDestroy(pSubmitBlock);
return -1; return -1;
} }
...@@ -309,13 +314,15 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { ...@@ -309,13 +314,15 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock); taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) { type == STREAM_INPUT__REF_DATA_BLOCK) {
int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && total > STREAM_TASK_INPUT_QUEUEU_CAPACITY) { if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY ||
qError("s-task:%s input queue is full, capacity:%d, abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY); (numOfBlocks >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) {
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, abort", pTask->id.idStr,
STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE);
return -1; return -1;
} }
qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, total); qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, numOfBlocks);
taosWriteQitem(pTask->inputQueue->queue, pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);
} else if (type == STREAM_INPUT__CHECKPOINT) { } else if (type == STREAM_INPUT__CHECKPOINT) {
taosWriteQitem(pTask->inputQueue->queue, pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册