diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 122dbc47f54907b291b78ca4921ea9b915712000..038bbbd35bcb05af040baeb1b1455b8c155a7d1d 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -18,6 +18,7 @@ // maximum allowed processed block batches. One block may include several submit blocks #define MAX_STREAM_EXEC_BATCH_NUM 128 #define MIN_STREAM_EXEC_BATCH_NUM 16 +#define MAX_STREAM_RESULT_DUMP_THRESHOLD 1000 static int32_t updateCheckPointInfo (SStreamTask* pTask); static SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes); @@ -32,6 +33,39 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus) { return (status == TASK_STATUS__PAUSE); } +static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, + int32_t size, int64_t* totalSize, int32_t* totalBlocks) { + int32_t code = updateCheckPointInfo(pTask); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + int32_t numOfBlocks = taosArrayGetSize(pRes); + if (numOfBlocks > 0) { + SStreamDataBlock* pStreamBlocks = createStreamDataBlockFromResults(pItem, pTask, size, pRes); + if (pStreamBlocks == NULL) { + return -1; + } + + qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, size/1048576.0); + + code = streamTaskOutputResultBlock(pTask, pStreamBlocks); + if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position + taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); + taosFreeQitem(pStreamBlocks); + return -1; + } + } else { + taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); + } + + *totalSize += size; + *totalBlocks += numOfBlocks; + + ASSERT(taosArrayGetSize(pRes) == 0); + return TSDB_CODE_SUCCESS; +} + static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) { int32_t code = TSDB_CODE_SUCCESS; void* pExecutor = pTask->exec.pExecutor; @@ -98,38 +132,25 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i pTask->selfChildId, numOfBlocks, size / 1048576.0); // current output should be dispatched to down stream nodes - if (numOfBlocks > 1000) { - code = updateCheckPointInfo(pTask); + if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) { + ASSERT(numOfBlocks == taosArrayGetSize(pRes)); + code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks); if (code != TSDB_CODE_SUCCESS) { return code; } - ASSERT(numOfBlocks == taosArrayGetSize(pRes)); - - if (numOfBlocks > 0) { - SStreamDataBlock* pStreamBlocks = createStreamDataBlockFromResults(pItem, pTask, size, pRes); - if (pStreamBlocks == NULL) { - return -1; - } - - qDebug("s-task:%s output exec stream data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, size/1048576.0); - - code = streamTaskOutputResultBlock(pTask, pStreamBlocks); - if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position - taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); - taosFreeQitem(pStreamBlocks); - return -1; - } - } else { - taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); - } + size = 0; + numOfBlocks = 0; + ASSERT(taosArrayGetSize(pRes) == 0); } + } - *totalSize += size; - *totalBlocks += numOfBlocks; - - size = 0; - numOfBlocks = 0; + if (numOfBlocks > 0) { + ASSERT(numOfBlocks == taosArrayGetSize(pRes)); + code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks); + if (code != TSDB_CODE_SUCCESS) { + return code; + } ASSERT(taosArrayGetSize(pRes) == 0); }