From 92feb89a0925fc3b0c403ae9429cabdceb8b5154 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 17 May 2023 18:08:04 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tqRestore.c | 2 -- source/libs/stream/src/stream.c | 30 +++++++++++++-------------- 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 8b0f9f1e85..09e4ca4093 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -137,8 +137,6 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { } // append the data for the stream -// tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); - SStreamQueueItem* pItem = NULL; int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, pTask->id.idStr); if (code != TSDB_CODE_SUCCESS) { // failed, continue diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 236c1833ab..7874c50b8a 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -16,8 +16,10 @@ #include "streamInc.h" #include "ttimer.h" -#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480 -#define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (100) +#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480 +#define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (50) +#define ONE_MB_F (1048576.0) +#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q)/ONE_MB_F) int32_t streamInit() { int8_t old; @@ -281,27 +283,25 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S } bool tInputQueueIsFull(const SStreamTask* pTask) { - return taosQueueItemSize((pTask->inputQueue->queue)) >= STREAM_TASK_INPUT_QUEUEU_CAPACITY; + bool isFull = taosQueueItemSize((pTask->inputQueue->queue)) >= STREAM_TASK_INPUT_QUEUEU_CAPACITY; + double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); + return (isFull || size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE); } int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { - int8_t type = pItem->type; + int8_t type = pItem->type; + int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; + double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); if (type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit2* pSubmitBlock = (SStreamDataSubmit2*)pItem; - int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; - qDebug("s-task:%s submit enqueue %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->id.idStr, - pItem, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen, - pSubmitBlock->submit.ver, total); - qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, - pSubmitBlock->submit.msgLen, pSubmitBlock->submit.ver, numOfBlocks, size); + pSubmitBlock->submit.msgLen, pSubmitBlock->submit.ver, total, size); - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && - (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { + if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) { qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, - numOfBlocks, size); + total, size); streamDataSubmitDestroy(pSubmitBlock); return -1; } @@ -310,10 +310,8 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; - double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0; - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && - (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { + if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) { qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks, size); -- GitLab