diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 02bb65b7622fdd49be72de93a2d0856be159261f..b47288bf4579fc9a4e5204f1840588a79cf2b564 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -189,7 +189,7 @@ int32_t streamInit(); void streamCleanUp(); SStreamQueue* streamQueueOpen(int64_t cap); -void streamQueueClose(SStreamQueue* queue); +void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) { ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index bb321734045e8cbc44906a7ecc57175047542b75..3d21db2d7985568e9d0b0a14b434bac48d351030 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -766,6 +766,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } streamFreeQitem(pTask->msgInfo.pData); + pTask->msgInfo.pData = NULL; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index aaf9fdec724cf6765232cb8a77adfb36754ce6d9..22e09693c806af70b49764fb9dab3b82a26bcf06 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -35,18 +35,17 @@ FAIL: return NULL; } -void streamQueueClose(SStreamQueue* queue) { - while (1) { - void* qItem = streamQueueNextItem(queue); - if (qItem) { - streamFreeQitem(qItem); - } else { - break; - } +void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) { + qDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->queue, taosQueueItemSize(pQueue->queue)); + + void* qItem = NULL; + while ((qItem = streamQueueNextItem(pQueue)) != NULL) { + streamFreeQitem(qItem); } - taosFreeQall(queue->qall); - taosCloseQueue(queue->queue); - taosMemoryFree(queue); + + taosFreeQall(pQueue->qall); + taosCloseQueue(pQueue->queue); + taosMemoryFree(pQueue); } #if 0 diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 9056fa8d93397bea55b216c202d5090259ab5419..eda9c1f2bbe4f8f077a1e681e11d0f0b48082bf4 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -220,11 +220,11 @@ void tFreeStreamTask(SStreamTask* pTask) { int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); if (pTask->inputQueue) { - streamQueueClose(pTask->inputQueue); + streamQueueClose(pTask->inputQueue, pTask->id.taskId); } if (pTask->outputInfo.queue) { - streamQueueClose(pTask->outputInfo.queue); + streamQueueClose(pTask->outputInfo.queue, pTask->id.taskId); } if (pTask->exec.qmsg) { @@ -255,6 +255,11 @@ void tFreeStreamTask(SStreamTask* pTask) { streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING); } + if (pTask->msgInfo.pData != NULL) { + destroyStreamDataBlock(pTask->msgInfo.pData); + pTask->msgInfo.pData = NULL; + } + if (pTask->id.idStr != NULL) { taosMemoryFree((void*)pTask->id.idStr); }