提交 a85a73e2 编写于 作者: D dapan1121

fix: fix memory leak issues

上级 9d148b42
......@@ -33,6 +33,7 @@ typedef int32_t (*localFetchFp)(void *, uint64_t, uint64_t, uint64_t, int64_t, i
typedef struct {
void *handle;
bool localExec;
localFetchFp fp;
SArray *explainRes;
} SLocalFetch;
......
......@@ -28,7 +28,7 @@ void qExplainFreeResNode(SExplainResNode *resNode) {
return;
}
taosMemoryFreeClear(resNode->pExecInfo);
taosArrayDestroy(resNode->pExecInfo);
SNode *node = NULL;
FOREACH(node, resNode->pChildren) { qExplainFreeResNode((SExplainResNode *)node); }
......@@ -58,6 +58,7 @@ void qExplainFreeCtx(SExplainCtx *pCtx) {
SExplainRsp *rsp = taosArrayGet(group->nodeExecInfo, i);
tFreeSExplainRsp(rsp);
}
taosArrayDestroy(group->nodeExecInfo);
}
pIter = taosHashIterate(pCtx->groupHash, pIter);
......@@ -66,6 +67,7 @@ void qExplainFreeCtx(SExplainCtx *pCtx) {
taosHashCleanup(pCtx->groupHash);
taosArrayDestroy(pCtx->rows);
taosMemoryFreeClear(pCtx->tbuf);
taosMemoryFree(pCtx);
}
......
......@@ -1778,12 +1778,6 @@ void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
SResFetchReq* pMsg = taosMemoryCalloc(1, sizeof(SResFetchReq));
if (NULL == pMsg) {
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return pTaskInfo->code;
}
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
......@@ -1797,7 +1791,14 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
SDataBuf pBuf = {0};
int32_t code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId, pSource->taskId, 0, pSource->execId, &pBuf.pData, pTaskInfo->localFetch.explainRes);
loadRemoteDataCallback(pWrapper, &pBuf, code);
taosMemoryFree(pWrapper);
} else {
SResFetchReq* pMsg = taosMemoryCalloc(1, sizeof(SResFetchReq));
if (NULL == pMsg) {
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return pTaskInfo->code;
}
qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId,
sourceIndex, totalSources);
......@@ -4051,7 +4052,7 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) {
destroyOperatorInfo(pTaskInfo->pRoot);
cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
if (!pTaskInfo->localFetch.fp) {
if (!pTaskInfo->localFetch.localExec) {
nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
}
......
......@@ -94,7 +94,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
int32_t execNum = 0;
qTaskInfo_t taskHandle = ctx->taskHandle;
DataSinkHandle sinkHandle = ctx->sinkHandle;
SLocalFetch localFetch = {(void*)mgmt, qWorkerProcessLocalFetch, ctx->explainRes};
SLocalFetch localFetch = {(void*)mgmt, ctx->localExec, qWorkerProcessLocalFetch, ctx->explainRes};
SArray *pResList = taosArrayInit(4, POINTER_BYTES);
while (true) {
......
......@@ -371,6 +371,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
}
SCH_ERR_JRET(schProcessExplainRsp(pJob, pTask, &rsp));
taosMemoryFreeClear(msg);
break;
}
case TDMT_SCH_FETCH_RSP:
......
......@@ -898,7 +898,8 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
pTask->msgLen);
SCH_ERR_RET(code);
} else {
SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg);
//binary msg
//SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册