未验证 提交 d09011de 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #19190 from taosdata/feature/stream_main

fix memory leak
...@@ -487,8 +487,6 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat ...@@ -487,8 +487,6 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
taosMemoryFree(pReqs); taosMemoryFree(pReqs);
} }
return code; return code;
} else {
ASSERT(0);
} }
return 0; return 0;
} }
...@@ -514,7 +512,6 @@ int32_t streamDispatch(SStreamTask* pTask) { ...@@ -514,7 +512,6 @@ int32_t streamDispatch(SStreamTask* pTask) {
int32_t code = 0; int32_t code = 0;
if (streamDispatchAllBlocks(pTask, pBlock) < 0) { if (streamDispatchAllBlocks(pTask, pBlock) < 0) {
ASSERT(0);
code = -1; code = -1;
streamQueueProcessFail(pTask->outputQueue); streamQueueProcessFail(pTask->outputQueue);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
......
...@@ -143,6 +143,7 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* ...@@ -143,6 +143,7 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
ASSERT(left >= 0); ASSERT(left >= 0);
if (left == 0) { if (left == 0) {
taosArrayDestroy(pTask->checkReqIds); taosArrayDestroy(pTask->checkReqIds);
pTask->checkReqIds = NULL;
streamTaskLaunchRecover(pTask, version); streamTaskLaunchRecover(pTask, version);
} }
} else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
......
...@@ -175,6 +175,8 @@ void tFreeSStreamTask(SStreamTask* pTask) { ...@@ -175,6 +175,8 @@ void tFreeSStreamTask(SStreamTask* pTask) {
} }
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
taosArrayDestroy(pTask->checkReqIds);
pTask->checkReqIds = NULL;
} }
if (pTask->pState) streamStateClose(pTask->pState); if (pTask->pState) streamStateClose(pTask->pState);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册