提交 336102b8 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 5cbad1da
...@@ -33,7 +33,8 @@ typedef struct { ...@@ -33,7 +33,8 @@ typedef struct {
static SStreamGlobalEnv streamEnv; static SStreamGlobalEnv streamEnv;
int32_t streamDispatch(SStreamTask* pTask); void destroyStreamDataBlock(SStreamDataBlock* pBlock);
int32_t streamDispatch(SStreamTask* pTask, SStreamDataBlock** pBlock);
int32_t streamConvertDispatchMsgToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); int32_t streamConvertDispatchMsgToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data); int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data);
......
...@@ -200,22 +200,16 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock ...@@ -200,22 +200,16 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
int32_t code = 0; int32_t code = 0;
if (pTask->outputType == TASK_OUTPUT__TABLE) { if (pTask->outputType == TASK_OUTPUT__TABLE) {
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(pBlock);
} else if (pTask->outputType == TASK_OUTPUT__SMA) { } else if (pTask->outputType == TASK_OUTPUT__SMA) {
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks); pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(pBlock);
} else { } else {
ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);
code = taosWriteQitem(pTask->outputQueue->queue, pBlock); code = taosWriteQitem(pTask->outputQueue->queue, pBlock);
if (code != 0) { if (code != 0) {
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(pBlock);
return code; return code;
} }
streamDispatch(pTask); streamDispatch(pTask, NULL);
} }
return 0; return 0;
...@@ -260,8 +254,10 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i ...@@ -260,8 +254,10 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
return 0; return 0;
} }
// continue dispatch SStreamDataBlock* pBlock = NULL;
streamDispatch(pTask); // continue dispatch one block to down stream in pipeline
streamDispatch(pTask, &pBlock);
destroyStreamDataBlock(pBlock);
return 0; return 0;
} }
......
...@@ -501,7 +501,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat ...@@ -501,7 +501,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
return code; return code;
} }
int32_t streamDispatch(SStreamTask* pTask) { int32_t streamDispatch(SStreamTask* pTask, SStreamDataBlock** pBlock) {
ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);
int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue); int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue);
...@@ -517,23 +517,24 @@ int32_t streamDispatch(SStreamTask* pTask) { ...@@ -517,23 +517,24 @@ int32_t streamDispatch(SStreamTask* pTask) {
return 0; return 0;
} }
SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue); SStreamDataBlock* pDispatchedBlock = streamQueueNextItem(pTask->outputQueue);
if (pBlock == NULL) { if (pDispatchedBlock == NULL) {
qDebug("s-task:%s stop dispatching since no output in output queue", pTask->id.idStr); qDebug("s-task:%s stop dispatching since no output in output queue", pTask->id.idStr);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
return 0; return 0;
} }
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); ASSERT(pDispatchedBlock->type == STREAM_INPUT__DATA_BLOCK);
int32_t code = 0; int32_t code = streamDispatchAllBlocks(pTask, *pBlock);
if (streamDispatchAllBlocks(pTask, pBlock) < 0) { if (code != TSDB_CODE_SUCCESS) {
code = -1;
streamQueueProcessFail(pTask->outputQueue); streamQueueProcessFail(pTask->outputQueue);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
} }
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); if (pBlock != NULL) {
taosFreeQitem(pBlock); *pBlock = pDispatchedBlock;
}
return code; return code;
} }
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
static int32_t updateCheckPointInfo (SStreamTask* pTask); static int32_t updateCheckPointInfo (SStreamTask* pTask);
static SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes); static SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes);
bool streamTaskShouldStop(const SStreamStatus* pStatus) { bool streamTaskShouldStop(const SStreamStatus* pStatus) {
int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus); int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING); return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
} }
...@@ -33,10 +33,11 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus) { ...@@ -33,10 +33,11 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus) {
return (status == TASK_STATUS__PAUSE); return (status == TASK_STATUS__PAUSE);
} }
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
int32_t size, int64_t* totalSize, int32_t* totalBlocks) { int32_t* totalBlocks) {
int32_t code = updateCheckPointInfo(pTask); int32_t code = updateCheckPointInfo(pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return code; return code;
} }
...@@ -44,6 +45,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* ...@@ -44,6 +45,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
if (numOfBlocks > 0) { if (numOfBlocks > 0) {
SStreamDataBlock* pStreamBlocks = createStreamDataBlockFromResults(pItem, pTask, size, pRes); SStreamDataBlock* pStreamBlocks = createStreamDataBlockFromResults(pItem, pTask, size, pRes);
if (pStreamBlocks == NULL) { if (pStreamBlocks == NULL) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return -1; return -1;
} }
...@@ -51,18 +53,19 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* ...@@ -51,18 +53,19 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
code = streamTaskOutputResultBlock(pTask, pStreamBlocks); code = streamTaskOutputResultBlock(pTask, pStreamBlocks);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position
taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); destroyStreamDataBlock(pStreamBlocks);
taosFreeQitem(pStreamBlocks);
return -1; return -1;
} }
*totalSize += size;
*totalBlocks += numOfBlocks;
ASSERT(taosArrayGetSize(pRes) == 0);
destroyStreamDataBlock(pStreamBlocks);
} else { } else {
taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
} }
*totalSize += size;
*totalBlocks += numOfBlocks;
ASSERT(taosArrayGetSize(pRes) == 0);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -73,12 +76,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i ...@@ -73,12 +76,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
*totalBlocks = 0; *totalBlocks = 0;
*totalSize = 0; *totalSize = 0;
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
int32_t size = 0; int32_t size = 0;
int32_t numOfBlocks = 0; int32_t numOfBlocks = 0;
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) { while (1) {
if (streamTaskShouldStop(&pTask->status)) { if (streamTaskShouldStop(&pTask->status)) {
taosArrayDestroy(pRes); // memory leak
return 0; return 0;
} }
...@@ -141,7 +145,6 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i ...@@ -141,7 +145,6 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
size = 0; size = 0;
numOfBlocks = 0; numOfBlocks = 0;
ASSERT(taosArrayGetSize(pRes) == 0);
} }
} }
...@@ -151,8 +154,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i ...@@ -151,8 +154,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
} else {
ASSERT(taosArrayGetSize(pRes) == 0); taosArrayDestroy(pRes);
} }
return 0; return 0;
...@@ -238,7 +241,10 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -238,7 +241,10 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt); qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt);
streamDispatch(pTask);
SStreamDataBlock* pBlock = NULL;
streamDispatch(pTask, &pBlock);
destroyStreamDataBlock(pBlock);
} }
if (finished) { if (finished) {
...@@ -337,6 +343,11 @@ SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStr ...@@ -337,6 +343,11 @@ SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStr
return pStreamBlocks; return pStreamBlocks;
} }
void destroyStreamDataBlock(SStreamDataBlock* pBlock) {
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(pBlock);
}
int32_t streamExecForAll(SStreamTask* pTask) { int32_t streamExecForAll(SStreamTask* pTask) {
int32_t code = 0; int32_t code = 0;
while (1) { while (1) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册