diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 1473345730347e826fcdada8b7e38271234baf14..722013115f072a8b4724dddb11735d706ba8f00b 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -421,12 +421,10 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat qDebug("s-task:%s (child taskId:%d) fix-dispatch blocks:%d to down stream s-task:%d in vgId:%d", pTask->id.idStr, pTask->selfChildId, numOfBlocks, downstreamTaskId, vgId); - if (doSendDispatchMsg(pTask, &req, vgId, pEpSet) < 0) { - taosArrayDestroyP(req.data, taosMemoryFree); - taosArrayDestroy(req.dataLen); - return code; - } - + code = doSendDispatchMsg(pTask, &req, vgId, pEpSet); + taosArrayDestroyP(req.data, taosMemoryFree); + taosArrayDestroy(req.dataLen); + return code; } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt); ASSERT(rspCnt == 0);