提交 e35d1457 编写于 作者: H Haojun Liao

fix(stream): fix memory leak.

上级 30e7cb58
...@@ -135,7 +135,6 @@ typedef struct { ...@@ -135,7 +135,6 @@ typedef struct {
typedef struct { typedef struct {
int8_t type; int8_t type;
int64_t ver; int64_t ver;
int32_t* dataRef;
SSDataBlock* pBlock; SSDataBlock* pBlock;
} SStreamRefDataBlock; } SStreamRefDataBlock;
......
...@@ -947,6 +947,8 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream ...@@ -947,6 +947,8 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream
SDecoder* pCoder = &(SDecoder){0}; SDecoder* pCoder = &(SDecoder){0};
SDeleteRes* pRes = &(SDeleteRes){0}; SDeleteRes* pRes = &(SDeleteRes){0};
*pRefBlock = NULL;
pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t)); pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
if (pRes->uidList == NULL) { if (pRes->uidList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
...@@ -984,21 +986,13 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream ...@@ -984,21 +986,13 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream
} }
taosArrayDestroy(pRes->uidList); taosArrayDestroy(pRes->uidList);
int32_t* pRef = taosMemoryMalloc(sizeof(int32_t));
*pRef = 1;
*pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
if (pRefBlock == NULL) { if (pRefBlock == NULL) {
taosMemoryFree(pRef);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
(*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK; (*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK;
(*pRefBlock)->pBlock = pDelBlock; (*pRefBlock)->pBlock = pDelBlock;
(*pRefBlock)->dataRef = pRef;
atomic_add_fetch_32((*pRefBlock)->dataRef, 1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1069,8 +1063,6 @@ int32_t tqProcessDeleteDataReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { ...@@ -1069,8 +1063,6 @@ int32_t tqProcessDeleteDataReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK; pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK;
pRefBlock->pBlock = pDelBlock; pRefBlock->pBlock = pDelBlock;
pRefBlock->dataRef = pRef;
atomic_add_fetch_32(pRefBlock->dataRef, 1);
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pRefBlock) < 0) { if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
atomic_sub_fetch_32(pRef, 1); atomic_sub_fetch_32(pRef, 1);
......
...@@ -29,7 +29,7 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { ...@@ -29,7 +29,7 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem) { int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem) {
int32_t code = tAppendDataToInputQueue(pTask, pQueueItem); int32_t code = tAppendDataToInputQueue(pTask, pQueueItem);
if (code < 0) { if (code < 0) {
tqError("s-task:%s failed to put into queue, too many, next ver:%" PRId64, pTask->id.idStr, /*pPackedData->ver*/ 0L); tqError("s-task:%s failed to put into queue, too many", pTask->id.idStr);
return -1; return -1;
} }
......
...@@ -1912,7 +1912,6 @@ FETCH_NEXT_BLOCK: ...@@ -1912,7 +1912,6 @@ FETCH_NEXT_BLOCK:
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
if (pInfo->validBlockIndex >= total) { if (pInfo->validBlockIndex >= total) {
doClearBufferedBlocks(pInfo); doClearBufferedBlocks(pInfo);
/*pOperator->status = OP_EXEC_DONE;*/
return NULL; return NULL;
} }
......
...@@ -194,13 +194,7 @@ void streamFreeQitem(SStreamQueueItem* data) { ...@@ -194,13 +194,7 @@ void streamFreeQitem(SStreamQueueItem* data) {
taosFreeQitem(pMerge); taosFreeQitem(pMerge);
} else if (type == STREAM_INPUT__REF_DATA_BLOCK) { } else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data; SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data;
blockDataDestroy(pRefBlock->pBlock);
int32_t ref = atomic_sub_fetch_32(pRefBlock->dataRef, 1);
ASSERT(ref >= 0);
if (ref == 0) {
blockDataDestroy(pRefBlock->pBlock);
taosMemoryFree(pRefBlock->dataRef);
}
taosFreeQitem(pRefBlock); taosFreeQitem(pRefBlock);
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册