提交 de7231e5 编写于 作者: H Haojun Liao

fix(stream): fix memory leak in handling dispatch msg when output buffer is full.

上级 0d2fa6f1
......@@ -129,11 +129,11 @@ int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pR
qDebug("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
pTask->id.idStr);
} else {
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock) == 0) {
status = TASK_INPUT_STATUS__NORMAL;
} else { // input queue is full, upstream is blocked now
status = TASK_INPUT_STATUS__BLOCKED;
}
int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock);
// input queue is full, upstream is blocked now
status = (code == TSDB_CODE_SUCCESS)? TASK_INPUT_STATUS__NORMAL:TASK_INPUT_STATUS__BLOCKED;
}
// rsp by input status
......@@ -303,6 +303,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
taosFreeQitem(pItem);
return -1;
}
taosWriteQitem(pTask->inputQueue->queue, pItem);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) {
......@@ -310,6 +311,8 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total,
size);
destroyStreamDataBlock((SStreamDataBlock*) pItem);
taosFreeQitem(pItem);
return -1;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册