From 336102b8c52905d47d455cea4a3352f1fe47d14a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 20 May 2023 13:34:49 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/stream/inc/streamInc.h | 3 +- source/libs/stream/src/stream.c | 14 +++------ source/libs/stream/src/streamDispatch.c | 19 ++++++------ source/libs/stream/src/streamExec.c | 41 ++++++++++++++++--------- 4 files changed, 43 insertions(+), 34 deletions(-) diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 7e1b4dcf2d..bd13d9ec37 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -33,7 +33,8 @@ typedef struct { static SStreamGlobalEnv streamEnv; -int32_t streamDispatch(SStreamTask* pTask); +void destroyStreamDataBlock(SStreamDataBlock* pBlock); +int32_t streamDispatch(SStreamTask* pTask, SStreamDataBlock** pBlock); int32_t streamConvertDispatchMsgToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 7f19529d11..80511cd491 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -200,22 +200,16 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock int32_t code = 0; if (pTask->outputType == TASK_OUTPUT__TABLE) { pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); - taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); - taosFreeQitem(pBlock); } else if (pTask->outputType == TASK_OUTPUT__SMA) { pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks); - taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); - taosFreeQitem(pBlock); } else { ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); code = taosWriteQitem(pTask->outputQueue->queue, pBlock); if (code != 0) { - taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); - taosFreeQitem(pBlock); return code; } - streamDispatch(pTask); + streamDispatch(pTask, NULL); } return 0; @@ -260,8 +254,10 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i return 0; } - // continue dispatch - streamDispatch(pTask); + SStreamDataBlock* pBlock = NULL; + // continue dispatch one block to down stream in pipeline + streamDispatch(pTask, &pBlock); + destroyStreamDataBlock(pBlock); return 0; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 7e0fbd30b1..82b3f9a2f2 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -501,7 +501,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat return code; } -int32_t streamDispatch(SStreamTask* pTask) { +int32_t streamDispatch(SStreamTask* pTask, SStreamDataBlock** pBlock) { ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue); @@ -517,23 +517,24 @@ int32_t streamDispatch(SStreamTask* pTask) { return 0; } - SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue); - if (pBlock == NULL) { + SStreamDataBlock* pDispatchedBlock = streamQueueNextItem(pTask->outputQueue); + if (pDispatchedBlock == NULL) { qDebug("s-task:%s stop dispatching since no output in output queue", pTask->id.idStr); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); return 0; } - ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); + ASSERT(pDispatchedBlock->type == STREAM_INPUT__DATA_BLOCK); - int32_t code = 0; - if (streamDispatchAllBlocks(pTask, pBlock) < 0) { - code = -1; + int32_t code = streamDispatchAllBlocks(pTask, *pBlock); + if (code != TSDB_CODE_SUCCESS) { streamQueueProcessFail(pTask->outputQueue); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); } - taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); - taosFreeQitem(pBlock); + if (pBlock != NULL) { + *pBlock = pDispatchedBlock; + } + return code; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 038bbbd35b..18afc367bd 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -23,7 +23,7 @@ static int32_t updateCheckPointInfo (SStreamTask* pTask); static SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes); - bool streamTaskShouldStop(const SStreamStatus* pStatus) { +bool streamTaskShouldStop(const SStreamStatus* pStatus) { int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus); return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING); } @@ -33,10 +33,11 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus) { return (status == TASK_STATUS__PAUSE); } -static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, - int32_t size, int64_t* totalSize, int32_t* totalBlocks) { +static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize, + int32_t* totalBlocks) { int32_t code = updateCheckPointInfo(pTask); if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return code; } @@ -44,6 +45,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* if (numOfBlocks > 0) { SStreamDataBlock* pStreamBlocks = createStreamDataBlockFromResults(pItem, pTask, size, pRes); if (pStreamBlocks == NULL) { + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return -1; } @@ -51,18 +53,19 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* code = streamTaskOutputResultBlock(pTask, pStreamBlocks); if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position - taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); - taosFreeQitem(pStreamBlocks); + destroyStreamDataBlock(pStreamBlocks); return -1; } + + *totalSize += size; + *totalBlocks += numOfBlocks; + + ASSERT(taosArrayGetSize(pRes) == 0); + destroyStreamDataBlock(pStreamBlocks); } else { - taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); } - *totalSize += size; - *totalBlocks += numOfBlocks; - - ASSERT(taosArrayGetSize(pRes) == 0); return TSDB_CODE_SUCCESS; } @@ -73,12 +76,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i *totalBlocks = 0; *totalSize = 0; - SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); int32_t size = 0; int32_t numOfBlocks = 0; + SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); while (1) { if (streamTaskShouldStop(&pTask->status)) { + taosArrayDestroy(pRes); // memory leak return 0; } @@ -141,7 +145,6 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i size = 0; numOfBlocks = 0; - ASSERT(taosArrayGetSize(pRes) == 0); } } @@ -151,8 +154,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i if (code != TSDB_CODE_SUCCESS) { return code; } - - ASSERT(taosArrayGetSize(pRes) == 0); + } else { + taosArrayDestroy(pRes); } return 0; @@ -238,7 +241,10 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt); - streamDispatch(pTask); + + SStreamDataBlock* pBlock = NULL; + streamDispatch(pTask, &pBlock); + destroyStreamDataBlock(pBlock); } if (finished) { @@ -337,6 +343,11 @@ SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStr return pStreamBlocks; } +void destroyStreamDataBlock(SStreamDataBlock* pBlock) { + taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); + taosFreeQitem(pBlock); +} + int32_t streamExecForAll(SStreamTask* pTask) { int32_t code = 0; while (1) { -- GitLab