未验证 提交 7688f8ed 编写于 作者: H Hui Li 提交者: GitHub

Merge pull request #16204 from taosdata/feature/stream

fix(mnode): memory leak
...@@ -636,6 +636,7 @@ typedef struct { ...@@ -636,6 +636,7 @@ typedef struct {
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj); int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj); int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj);
void tFreeStreamObj(SStreamObj* pObj);
typedef struct { typedef struct {
char streamName[TSDB_STREAM_FNAME_LEN]; char streamName[TSDB_STREAM_FNAME_LEN];
......
...@@ -34,6 +34,7 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate); ...@@ -34,6 +34,7 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate);
SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName); SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName);
int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb); int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb);
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb); int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb);
void mndFreeStb(SStbObj *pStb);
void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst); void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst);
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize); void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize);
......
...@@ -116,6 +116,25 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { ...@@ -116,6 +116,25 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
return 0; return 0;
} }
void tFreeStreamObj(SStreamObj *pStream) {
taosMemoryFree(pStream->sql);
taosMemoryFree(pStream->ast);
taosMemoryFree(pStream->physicalPlan);
if (pStream->outputSchema.nCols) taosMemoryFree(pStream->outputSchema.pSchema);
int32_t sz = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < sz; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t taskSz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < taskSz; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
tFreeSStreamTask(pTask);
}
taosArrayDestroy(pLevel);
}
taosArrayDestroy(pStream->tasks);
}
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) { SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp)); SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
if (pVgEpNew == NULL) return NULL; if (pVgEpNew == NULL) return NULL;
......
...@@ -266,6 +266,15 @@ _OVER: ...@@ -266,6 +266,15 @@ _OVER:
return pRow; return pRow;
} }
void mndFreeStb(SStbObj *pStb) {
taosArrayDestroy(pStb->pFuncs);
taosMemoryFreeClear(pStb->pColumns);
taosMemoryFreeClear(pStb->pTags);
taosMemoryFreeClear(pStb->comment);
taosMemoryFreeClear(pStb->pAst1);
taosMemoryFreeClear(pStb->pAst2);
}
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) { static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) {
mTrace("stb:%s, perform insert action, row:%p", pStb->name, pStb); mTrace("stb:%s, perform insert action, row:%p", pStb->name, pStb);
return 0; return 0;
...@@ -273,12 +282,7 @@ static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) { ...@@ -273,12 +282,7 @@ static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) {
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) { static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
mTrace("stb:%s, perform delete action, row:%p", pStb->name, pStb); mTrace("stb:%s, perform delete action, row:%p", pStb->name, pStb);
taosArrayDestroy(pStb->pFuncs); mndFreeStb(pStb);
taosMemoryFreeClear(pStb->pColumns);
taosMemoryFreeClear(pStb->pTags);
taosMemoryFreeClear(pStb->comment);
taosMemoryFreeClear(pStb->pAst1);
taosMemoryFreeClear(pStb->pAst2);
return 0; return 0;
} }
......
...@@ -167,6 +167,9 @@ static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) { ...@@ -167,6 +167,9 @@ static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) { static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
mTrace("stream:%s, perform delete action", pStream->name); mTrace("stream:%s, perform delete action", pStream->name);
taosWLockLatch(&pStream->lock);
tFreeStreamObj(pStream);
taosWUnLockLatch(&pStream->lock);
return 0; return 0;
} }
...@@ -493,10 +496,17 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre ...@@ -493,10 +496,17 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
stbObj.uid = pStream->targetStbUid; stbObj.uid = pStream->targetStbUid;
if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) goto _OVER; if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) {
mndFreeStb(&stbObj);
goto _OVER;
}
tFreeSMCreateStbReq(&createReq);
mndFreeStb(&stbObj);
return 0; return 0;
_OVER: _OVER:
tFreeSMCreateStbReq(&createReq);
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
return -1; return -1;
...@@ -715,6 +725,7 @@ _OVER: ...@@ -715,6 +725,7 @@ _OVER:
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
tFreeSCMCreateStreamReq(&createStreamReq); tFreeSCMCreateStreamReq(&createStreamReq);
tFreeStreamObj(&streamObj);
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册