From 87deb36f8a3a6fe9c903e5be0f775478a0d15149 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 31 May 2023 19:36:02 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/executor/src/executor.c | 5 ++++- source/libs/executor/src/scanoperator.c | 10 ++++++---- source/libs/stream/src/streamExec.c | 4 ++-- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index fb35b211c9..ed8a4f3a3b 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -132,7 +132,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu pOperator->status = OP_NOT_OPENED; SStreamScanInfo* pInfo = pOperator->info; - qDebug("s-task:%s set source blocks:%d", id, (int32_t)numOfBlocks); + + qDebug("s-task:%s in this batch, all %d blocks need to be processed and dump results", id, (int32_t)numOfBlocks); ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0); if (type == STREAM_INPUT__MERGED_SUBMIT) { @@ -140,6 +141,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData)); taosArrayPush(pInfo->pBlockLists, pReq); } + pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; } else if (type == STREAM_INPUT__DATA_SUBMIT) { taosArrayPush(pInfo->pBlockLists, input); @@ -150,6 +152,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu SPackedData tmp = { .pDataBlock = pDataBlock }; taosArrayPush(pInfo->pBlockLists, &tmp); } + pInfo->blockType = STREAM_INPUT__DATA_BLOCK; } else { ASSERT(0); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2d2f377041..88f5642ef9 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1797,9 +1797,10 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { // NOTE: this operator does never check if current status is done or not - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pTaskInfo->storageAPI; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + const char* id = GET_TASKID(pTaskInfo); + SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStreamScanInfo* pInfo = pOperator->info; qDebug("stream scan started, %s", GET_TASKID(pTaskInfo)); @@ -1922,7 +1923,9 @@ FETCH_NEXT_BLOCK: return NULL; } - int32_t current = pInfo->validBlockIndex++; + int32_t current = pInfo->validBlockIndex++; + qDebug("process %d/%d input data blocks, %s", current, (int32_t) total, id); + SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current); SSDataBlock* pBlock = pPacked->pDataBlock; if (pBlock->info.parTbName[0]) { @@ -2057,7 +2060,6 @@ FETCH_NEXT_BLOCK: return pInfo->pUpdateRes; } - const char* id = GET_TASKID(pTaskInfo); SSDataBlock* pBlock = pInfo->pRes; SDataBlockInfo* pBlockInfo = &pBlock->info; int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index d6f5533db4..0970bbcf0c 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -16,8 +16,8 @@ #include "streamInc.h" // maximum allowed processed block batches. One block may include several submit blocks -#define MAX_STREAM_EXEC_BATCH_NUM 128 -#define MIN_STREAM_EXEC_BATCH_NUM 16 +#define MAX_STREAM_EXEC_BATCH_NUM 32 +#define MIN_STREAM_EXEC_BATCH_NUM 8 #define MAX_STREAM_RESULT_DUMP_THRESHOLD 1000 static int32_t updateCheckPointInfo (SStreamTask* pTask); -- GitLab