提交 f4864b87 编写于 作者: H Haojun Liao

fix(stream): fix memory leak.

上级 bfc4a072
......@@ -31,6 +31,12 @@ typedef struct {
void* timer;
} SStreamGlobalEnv;
typedef struct {
SEpSet epset;
int32_t taskId;
SRpcMsg msg;
} SStreamContinueExecInfo;
extern SStreamGlobalEnv streamEnv;
void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration);
......
......@@ -648,12 +648,6 @@ int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistory
return 0;
}
typedef struct {
SEpSet epset;
int32_t taskId;
SRpcMsg msg;
} SStreamContinueExecInfo;
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq) {
int32_t len = 0;
int32_t code = 0;
......
......@@ -13,6 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <libs/transport/trpc.h>
#include <streamInt.h>
#include "executor.h"
#include "tstream.h"
#include "wal.h"
......@@ -203,6 +205,11 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
return 0;
}
static void freeItem(void* p) {
SStreamContinueExecInfo* pInfo = p;
rpcFreeCont(pInfo->msg.pCont);
}
void tFreeStreamTask(SStreamTask* pTask) {
qDebug("free s-task:%s", pTask->id.idStr);
......@@ -252,7 +259,8 @@ void tFreeStreamTask(SStreamTask* pTask) {
}
if (pTask->pRspMsgList != NULL) {
pTask->pRspMsgList = taosArrayDestroy(pTask->pRspMsgList);
taosArrayDestroyEx(pTask->pRspMsgList, freeItem);
pTask->pRspMsgList = NULL;
}
taosThreadMutexDestroy(&pTask->lock);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册