提交 1b263602 编写于 作者: H Haojun Liao

fix(stream): fix memory leak.

上级 16d7707b
...@@ -337,6 +337,7 @@ struct SStreamTask { ...@@ -337,6 +337,7 @@ struct SStreamTask {
SMsgCb* pMsgCb; // msg handle SMsgCb* pMsgCb; // msg handle
SStreamState* pState; // state backend SStreamState* pState; // state backend
SArray* pRspMsgList; SArray* pRspMsgList;
TdThreadMutex lock;
// the followings attributes don't be serialized // the followings attributes don't be serialized
int32_t notReadyTasks; int32_t notReadyTasks;
......
...@@ -91,6 +91,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { ...@@ -91,6 +91,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0); pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0);
ASSERT(pTask->exec.pExecutor); ASSERT(pTask->exec.pExecutor);
taosThreadMutexInit(&pTask->lock, NULL);
streamSetupScheduleTrigger(pTask); streamSetupScheduleTrigger(pTask);
qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE, qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE,
......
...@@ -921,6 +921,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -921,6 +921,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->status.taskStatus = TASK_STATUS__NORMAL; pTask->status.taskStatus = TASK_STATUS__NORMAL;
} }
taosThreadMutexInit(&pTask->lock, NULL);
streamSetupScheduleTrigger(pTask); streamSetupScheduleTrigger(pTask);
tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64
......
...@@ -691,10 +691,11 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, ...@@ -691,10 +691,11 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo,
initRpcMsg(&info.msg, 0, pBuf, sizeof(SMsgHead) + len); initRpcMsg(&info.msg, 0, pBuf, sizeof(SMsgHead) + len);
info.msg.info = *pRpcInfo; info.msg.info = *pRpcInfo;
// todo: fix race condition here taosThreadMutexLock(&pTask->lock);
if (pTask->pRspMsgList == NULL) { if (pTask->pRspMsgList == NULL) {
pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo)); pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo));
} }
taosThreadMutexUnlock(&pTask->lock);
taosArrayPush(pTask->pRspMsgList, &info); taosArrayPush(pTask->pRspMsgList, &info);
......
...@@ -251,5 +251,10 @@ void tFreeStreamTask(SStreamTask* pTask) { ...@@ -251,5 +251,10 @@ void tFreeStreamTask(SStreamTask* pTask) {
tSimpleHashCleanup(pTask->pNameMap); tSimpleHashCleanup(pTask->pNameMap);
} }
if (pTask->pRspMsgList != NULL) {
pTask->pRspMsgList = taosArrayDestroy(pTask->pRspMsgList);
}
taosThreadMutexDestroy(&pTask->lock);
taosMemoryFree(pTask); taosMemoryFree(pTask);
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册