提交 3b24f882 编写于 作者: L Liu Jicong

fix mem leak

上级 fbc9e770
...@@ -756,24 +756,14 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* ...@@ -756,24 +756,14 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, blockSz); tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, blockSz);
void* pBuf = NULL; void* pBuf = NULL;
SSubmitReq2* pReq = NULL; SArray* tagArray = NULL;
SArray* tagArray = NULL; SArray* pVals = NULL;
SArray* pVals = NULL;
if (!(tagArray = taosArrayInit(1, sizeof(STagVal)))) { if (!(tagArray = taosArrayInit(1, sizeof(STagVal)))) {
goto _end; goto _end;
} }
if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
goto _end;
}
for (int32_t i = 0; i < blockSz; i++) { for (int32_t i = 0; i < blockSz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock->info.type == STREAM_DELETE_RESULT) { if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
...@@ -948,13 +938,17 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* ...@@ -948,13 +938,17 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
taosArrayPush(tbData.aRowP, &pRow); taosArrayPush(tbData.aRowP, &pRow);
} }
taosArrayClear(pReq->aSubmitTbData); SSubmitReq2 submitReq = {0};
taosArrayPush(pReq->aSubmitTbData, &tbData); if (!(submitReq.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
goto _end;
}
taosArrayPush(submitReq.aSubmitTbData, &tbData);
// encode // encode
int32_t len; int32_t len;
int32_t code; int32_t code;
tEncodeSize(tEncodeSSubmitReq2, pReq, len, code); tEncodeSize(tEncodeSSubmitReq2, &submitReq, len, code);
SEncoder encoder; SEncoder encoder;
len += sizeof(SMsgHead); len += sizeof(SMsgHead);
pBuf = rpcMallocCont(len); pBuf = rpcMallocCont(len);
...@@ -964,7 +958,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* ...@@ -964,7 +958,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
((SMsgHead*)pBuf)->vgId = TD_VID(pVnode); ((SMsgHead*)pBuf)->vgId = TD_VID(pVnode);
((SMsgHead*)pBuf)->contLen = htonl(len); ((SMsgHead*)pBuf)->contLen = htonl(len);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead)); tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead));
if (tEncodeSSubmitReq2(&encoder, pReq) < 0) { if (tEncodeSSubmitReq2(&encoder, &submitReq) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to encode submit req since %s", terrstr()); tqError("failed to encode submit req since %s", terrstr());
tEncoderClear(&encoder); tEncoderClear(&encoder);
...@@ -972,6 +966,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* ...@@ -972,6 +966,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
continue; continue;
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);
tDestroySSubmitReq2(&submitReq, TSDB_MSG_FLG_ENCODE);
SRpcMsg msg = { SRpcMsg msg = {
.msgType = TDMT_VND_SUBMIT, .msgType = TDMT_VND_SUBMIT,
...@@ -988,8 +983,6 @@ _end: ...@@ -988,8 +983,6 @@ _end:
taosArrayDestroy(tagArray); taosArrayDestroy(tagArray);
taosArrayDestroy(pVals); taosArrayDestroy(pVals);
// TODO: change // TODO: change
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq);
} }
#if 0 #if 0
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册