提交 87deb36f 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 6d04c4e2
...@@ -132,7 +132,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -132,7 +132,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
qDebug("s-task:%s set source blocks:%d", id, (int32_t)numOfBlocks);
qDebug("s-task:%s in this batch, all %d blocks need to be processed and dump results", id, (int32_t)numOfBlocks);
ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0); ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0);
if (type == STREAM_INPUT__MERGED_SUBMIT) { if (type == STREAM_INPUT__MERGED_SUBMIT) {
...@@ -140,6 +141,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -140,6 +141,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData)); SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
taosArrayPush(pInfo->pBlockLists, pReq); taosArrayPush(pInfo->pBlockLists, pReq);
} }
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
} else if (type == STREAM_INPUT__DATA_SUBMIT) { } else if (type == STREAM_INPUT__DATA_SUBMIT) {
taosArrayPush(pInfo->pBlockLists, input); taosArrayPush(pInfo->pBlockLists, input);
...@@ -150,6 +152,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -150,6 +152,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
SPackedData tmp = { .pDataBlock = pDataBlock }; SPackedData tmp = { .pDataBlock = pDataBlock };
taosArrayPush(pInfo->pBlockLists, &tmp); taosArrayPush(pInfo->pBlockLists, &tmp);
} }
pInfo->blockType = STREAM_INPUT__DATA_BLOCK; pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
} else { } else {
ASSERT(0); ASSERT(0);
......
...@@ -1798,8 +1798,9 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) ...@@ -1798,8 +1798,9 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not // NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI; const char* id = GET_TASKID(pTaskInfo);
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
qDebug("stream scan started, %s", GET_TASKID(pTaskInfo)); qDebug("stream scan started, %s", GET_TASKID(pTaskInfo));
...@@ -1923,6 +1924,8 @@ FETCH_NEXT_BLOCK: ...@@ -1923,6 +1924,8 @@ FETCH_NEXT_BLOCK:
} }
int32_t current = pInfo->validBlockIndex++; int32_t current = pInfo->validBlockIndex++;
qDebug("process %d/%d input data blocks, %s", current, (int32_t) total, id);
SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current); SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
SSDataBlock* pBlock = pPacked->pDataBlock; SSDataBlock* pBlock = pPacked->pDataBlock;
if (pBlock->info.parTbName[0]) { if (pBlock->info.parTbName[0]) {
...@@ -2057,7 +2060,6 @@ FETCH_NEXT_BLOCK: ...@@ -2057,7 +2060,6 @@ FETCH_NEXT_BLOCK:
return pInfo->pUpdateRes; return pInfo->pUpdateRes;
} }
const char* id = GET_TASKID(pTaskInfo);
SSDataBlock* pBlock = pInfo->pRes; SSDataBlock* pBlock = pInfo->pRes;
SDataBlockInfo* pBlockInfo = &pBlock->info; SDataBlockInfo* pBlockInfo = &pBlock->info;
int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists); int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists);
......
...@@ -16,8 +16,8 @@ ...@@ -16,8 +16,8 @@
#include "streamInc.h" #include "streamInc.h"
// maximum allowed processed block batches. One block may include several submit blocks // maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_EXEC_BATCH_NUM 128 #define MAX_STREAM_EXEC_BATCH_NUM 32
#define MIN_STREAM_EXEC_BATCH_NUM 16 #define MIN_STREAM_EXEC_BATCH_NUM 8
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 1000 #define MAX_STREAM_RESULT_DUMP_THRESHOLD 1000
static int32_t updateCheckPointInfo (SStreamTask* pTask); static int32_t updateCheckPointInfo (SStreamTask* pTask);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册