提交 aa7ea60b 编写于 作者: H Haojun Liao

fix(stream): update the free strategy.

上级 84146e6a
...@@ -34,7 +34,7 @@ typedef struct { ...@@ -34,7 +34,7 @@ typedef struct {
static SStreamGlobalEnv streamEnv; static SStreamGlobalEnv streamEnv;
void destroyStreamDataBlock(SStreamDataBlock* pBlock); void destroyStreamDataBlock(SStreamDataBlock* pBlock);
int32_t streamDispatch(SStreamTask* pTask, SStreamDataBlock** pBlock); int32_t streamDispatch(SStreamTask* pTask);
int32_t streamConvertDispatchMsgToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); int32_t streamConvertDispatchMsgToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data); int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data);
......
...@@ -200,8 +200,10 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock ...@@ -200,8 +200,10 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
int32_t code = 0; int32_t code = 0;
if (pTask->outputType == TASK_OUTPUT__TABLE) { if (pTask->outputType == TASK_OUTPUT__TABLE) {
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
destroyStreamDataBlock(pBlock);
} else if (pTask->outputType == TASK_OUTPUT__SMA) { } else if (pTask->outputType == TASK_OUTPUT__SMA) {
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks); pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
destroyStreamDataBlock(pBlock);
} else { } else {
ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);
code = taosWriteQitem(pTask->outputQueue->queue, pBlock); code = taosWriteQitem(pTask->outputQueue->queue, pBlock);
...@@ -209,7 +211,7 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock ...@@ -209,7 +211,7 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
return code; return code;
} }
streamDispatch(pTask, NULL); streamDispatch(pTask);
} }
return 0; return 0;
...@@ -254,10 +256,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i ...@@ -254,10 +256,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
return 0; return 0;
} }
SStreamDataBlock* pBlock = NULL;
// continue dispatch one block to down stream in pipeline // continue dispatch one block to down stream in pipeline
streamDispatch(pTask, &pBlock); streamDispatch(pTask);
destroyStreamDataBlock(pBlock);
return 0; return 0;
} }
......
...@@ -501,12 +501,8 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat ...@@ -501,12 +501,8 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
return code; return code;
} }
int32_t streamDispatch(SStreamTask* pTask, SStreamDataBlock** pBlock) { int32_t streamDispatch(SStreamTask* pTask) {
ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);
if (pBlock != NULL) {
*pBlock = NULL;
}
int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue); int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue);
if (numOfElems > 0) { if (numOfElems > 0) {
qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr, qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr,
...@@ -535,9 +531,7 @@ int32_t streamDispatch(SStreamTask* pTask, SStreamDataBlock** pBlock) { ...@@ -535,9 +531,7 @@ int32_t streamDispatch(SStreamTask* pTask, SStreamDataBlock** pBlock) {
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
} }
if (pBlock != NULL) { // this block can be freed only when it has been pushed to down stream.
*pBlock = pDispatchedBlock; destroyStreamDataBlock(pDispatchedBlock);
}
return code; return code;
} }
...@@ -60,7 +60,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* ...@@ -60,7 +60,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
*totalSize += size; *totalSize += size;
*totalBlocks += numOfBlocks; *totalBlocks += numOfBlocks;
destroyStreamDataBlock(pStreamBlocks); // destroyStreamDataBlock(pStreamBlocks);
} else { } else {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
} }
...@@ -245,10 +245,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -245,10 +245,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt); qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt);
streamDispatch(pTask);
SStreamDataBlock* pBlock = NULL;
streamDispatch(pTask, &pBlock);
destroyStreamDataBlock(pBlock);
} }
if (finished) { if (finished) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册