未验证 提交 82903443 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #22484 from taosdata/fix/liaohj

fix(stream): fix memory leak and do some internal refactor.
...@@ -189,7 +189,7 @@ int32_t streamInit(); ...@@ -189,7 +189,7 @@ int32_t streamInit();
void streamCleanUp(); void streamCleanUp();
SStreamQueue* streamQueueOpen(int64_t cap); SStreamQueue* streamQueueOpen(int64_t cap);
void streamQueueClose(SStreamQueue* queue); void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) { static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) {
ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING); ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
......
...@@ -766,6 +766,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i ...@@ -766,6 +766,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
} }
streamFreeQitem(pTask->msgInfo.pData); streamFreeQitem(pTask->msgInfo.pData);
pTask->msgInfo.pData = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -35,18 +35,17 @@ FAIL: ...@@ -35,18 +35,17 @@ FAIL:
return NULL; return NULL;
} }
void streamQueueClose(SStreamQueue* queue) { void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
while (1) { qDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->queue, taosQueueItemSize(pQueue->queue));
void* qItem = streamQueueNextItem(queue);
if (qItem) { void* qItem = NULL;
streamFreeQitem(qItem); while ((qItem = streamQueueNextItem(pQueue)) != NULL) {
} else { streamFreeQitem(qItem);
break;
}
} }
taosFreeQall(queue->qall);
taosCloseQueue(queue->queue); taosFreeQall(pQueue->qall);
taosMemoryFree(queue); taosCloseQueue(pQueue->queue);
taosMemoryFree(pQueue);
} }
#if 0 #if 0
......
...@@ -220,11 +220,11 @@ void tFreeStreamTask(SStreamTask* pTask) { ...@@ -220,11 +220,11 @@ void tFreeStreamTask(SStreamTask* pTask) {
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
if (pTask->inputQueue) { if (pTask->inputQueue) {
streamQueueClose(pTask->inputQueue); streamQueueClose(pTask->inputQueue, pTask->id.taskId);
} }
if (pTask->outputInfo.queue) { if (pTask->outputInfo.queue) {
streamQueueClose(pTask->outputInfo.queue); streamQueueClose(pTask->outputInfo.queue, pTask->id.taskId);
} }
if (pTask->exec.qmsg) { if (pTask->exec.qmsg) {
...@@ -255,6 +255,11 @@ void tFreeStreamTask(SStreamTask* pTask) { ...@@ -255,6 +255,11 @@ void tFreeStreamTask(SStreamTask* pTask) {
streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING); streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING);
} }
if (pTask->msgInfo.pData != NULL) {
destroyStreamDataBlock(pTask->msgInfo.pData);
pTask->msgInfo.pData = NULL;
}
if (pTask->id.idStr != NULL) { if (pTask->id.idStr != NULL) {
taosMemoryFree((void*)pTask->id.idStr); taosMemoryFree((void*)pTask->id.idStr);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册