diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 4e0b0630bc1f08969ebec2cf4a2ff674cb1824ff..f2b1db19e8eb3a15f9edf04d1bbbac3a6c750e0c 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -487,8 +487,6 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat taosMemoryFree(pReqs); } return code; - } else { - ASSERT(0); } return 0; } @@ -514,7 +512,6 @@ int32_t streamDispatch(SStreamTask* pTask) { int32_t code = 0; if (streamDispatchAllBlocks(pTask, pBlock) < 0) { - ASSERT(0); code = -1; streamQueueProcessFail(pTask->outputQueue); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 6889a870d19b2ec01f2f77ab57d8e70866599053..52777fd83401454483955eababaffe3123243383 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -143,6 +143,7 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* ASSERT(left >= 0); if (left == 0) { taosArrayDestroy(pTask->checkReqIds); + pTask->checkReqIds = NULL; streamTaskLaunchRecover(pTask, version); } } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index f7252ed8a036152b2ae9fb46861fdb8ac6754d2d..e9aba0bc3941d501ba25a5ea28aaf02e11d42cbd 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -175,6 +175,8 @@ void tFreeSStreamTask(SStreamTask* pTask) { } if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); + taosArrayDestroy(pTask->checkReqIds); + pTask->checkReqIds = NULL; } if (pTask->pState) streamStateClose(pTask->pState);