From 8cef033f92c2c2b42f0a55660636a6d94f402764 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 8 Dec 2022 11:18:43 +0800 Subject: [PATCH] fix mem leak --- source/dnode/vnode/src/tq/tqSink.c | 132 ++++++++++++----------------- 1 file changed, 56 insertions(+), 76 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 3dd802d605..428a1a965a 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -765,17 +765,12 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* int32_t len = 0; SSubmitReq2* pReq = NULL; SArray* tagArray = NULL; - SArray* createTbArray = NULL; SArray* pVals = NULL; if (!(tagArray = taosArrayInit(1, sizeof(STagVal)))) { goto _end; } - if (!(createTbArray = taosArrayInit(blockSz, POINTER_BYTES))) { - goto _end; - } - if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _end; @@ -824,91 +819,77 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* tqDebug("failed to put delete req into write-queue since %s", terrstr()); } } else { + SSubmitTbData* pTbData = (SSubmitTbData*)taosMemoryCalloc(1, sizeof(SSubmitTbData)); + if (!pTbData) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } // create table req if (createTb) { - for (int32_t i = 0; i < blockSz; ++i) { - SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); - SVCreateTbReq* pCreateTbReq = NULL; - if (pDataBlock->info.type == STREAM_DELETE_RESULT) { - taosArrayPush(createTbArray, &pCreateTbReq); - continue; - } + SVCreateTbReq* pCreateTbReq = NULL; - if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) { - goto _end; - }; - - // don't move to the end of loop as to destroy in the end of func when error occur - taosArrayPush(createTbArray, &pCreateTbReq); - - // set const - pCreateTbReq->flags = 0; - pCreateTbReq->type = TSDB_CHILD_TABLE; - pCreateTbReq->ctb.suid = suid; - - // set super table name - SName name = {0}; - tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - pCreateTbReq->ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName); - - // set tag content - taosArrayClear(tagArray); - STagVal tagVal = { - .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1, - .type = TSDB_DATA_TYPE_UBIGINT, - .i64 = (int64_t)pDataBlock->info.id.groupId, - }; - taosArrayPush(tagArray, &tagVal); - pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray); - - STag* pTag = NULL; - tTagNew(tagArray, 1, false, &pTag); - if (pTag == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _end; - } - pCreateTbReq->ctb.pTag = (uint8_t*)pTag; - - // set tag name - SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); - char tagNameStr[TSDB_COL_NAME_LEN] = {0}; - strcpy(tagNameStr, "group_id"); - taosArrayPush(tagName, tagNameStr); - pCreateTbReq->ctb.tagName = tagName; - - // set table name - if (pDataBlock->info.parTbName[0]) { - pCreateTbReq->name = strdup(pDataBlock->info.parTbName); - } else { - pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); - } + if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) { + goto _end; + }; + + // set const + pCreateTbReq->flags = 0; + pCreateTbReq->type = TSDB_CHILD_TABLE; + pCreateTbReq->ctb.suid = suid; + + // set super table name + SName name = {0}; + tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + pCreateTbReq->ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName); + + // set tag content + taosArrayClear(tagArray); + STagVal tagVal = { + .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1, + .type = TSDB_DATA_TYPE_UBIGINT, + .i64 = (int64_t)pDataBlock->info.id.groupId, + }; + taosArrayPush(tagArray, &tagVal); + pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray); + + STag* pTag = NULL; + tTagNew(tagArray, 1, false, &pTag); + if (pTag == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _end; } - } + pCreateTbReq->ctb.pTag = (uint8_t*)pTag; - // SSubmitTbData req - int32_t rows = pDataBlock->info.rows; + // set tag name + SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); + char tagNameStr[TSDB_COL_NAME_LEN] = {0}; + strcpy(tagNameStr, "group_id"); + taosArrayPush(tagName, tagNameStr); + pCreateTbReq->ctb.tagName = tagName; - SSubmitTbData* pTbData = (SSubmitTbData*)taosMemoryCalloc(1, sizeof(SSubmitTbData)); - if (!pTbData) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _end; + // set table name + if (pDataBlock->info.parTbName[0]) { + pCreateTbReq->name = strdup(pDataBlock->info.parTbName); + } else { + pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); + } + pTbData->pCreateTbReq = pCreateTbReq; + pTbData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE; } - if (!(pTbData->aRowP = taosArrayInit(rows, sizeof(SRow*)))) { - taosMemoryFree(pTbData); - goto _end; - } pTbData->suid = suid; pTbData->uid = 0; // uid is assigned by vnode pTbData->sver = pTSchema->version; - tqDebug("tq sink, convert block1 %d, rows: %d", i, rows); + int32_t rows = pDataBlock->info.rows; - if (createTb) { - pTbData->pCreateTbReq = taosArrayGetP(createTbArray, i); - if (pTbData->pCreateTbReq) pTbData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE; + if (!(pTbData->aRowP = taosArrayInit(rows, sizeof(SRow*)))) { + taosMemoryFreeClear(pTbData); + goto _end; } + tqDebug("tq sink, convert block1 %d, rows: %d", i, rows); + if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) { taosArrayDestroy(pTbData->aRowP); taosMemoryFree(pTbData); @@ -979,7 +960,6 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* } } _end: - taosArrayDestroy(createTbArray); taosArrayDestroy(tagArray); taosArrayDestroy(pVals); // TODO: change -- GitLab