提交 5ca810a9 编写于 作者: H Haojun Liao

fix(query): fix memory leak.

上级 918df89b
...@@ -224,12 +224,12 @@ typedef struct SRequestObj { ...@@ -224,12 +224,12 @@ typedef struct SRequestObj {
SArray* tableList; SArray* tableList;
SQueryExecMetric metric; SQueryExecMetric metric;
SRequestSendRecvBody body; SRequestSendRecvBody body;
bool stableQuery; // todo refactor bool syncQuery; // todo refactor: async query object
bool validateOnly; // todo refactor bool stableQuery; // todo refactor
bool validateOnly; // todo refactor
bool killed; bool killed;
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
uint32_t retry; uint32_t retry;
} SRequestObj; } SRequestObj;
typedef struct SSyncQueryParam { typedef struct SSyncQueryParam {
......
...@@ -320,6 +320,10 @@ void doDestroyRequest(void *p) { ...@@ -320,6 +320,10 @@ void doDestroyRequest(void *p) {
deregisterRequest(pRequest); deregisterRequest(pRequest);
} }
if (pRequest->syncQuery) {
taosMemoryFree(pRequest->body.param);
}
taosMemoryFree(pRequest); taosMemoryFree(pRequest);
tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest); tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest);
} }
......
...@@ -307,7 +307,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { ...@@ -307,7 +307,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
taosThreadMutexUnlock(&appInfo.mutex); taosThreadMutexUnlock(&appInfo.mutex);
tFreeClientHbBatchRsp(&pRsp); tFreeClientHbBatchRsp(&pRsp);
taosMemoryFree(pMsg->pData);
return code; return code;
} }
......
...@@ -2047,6 +2047,7 @@ void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) { ...@@ -2047,6 +2047,7 @@ void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
void syncQueryFn(void* param, void* res, int32_t code) { void syncQueryFn(void* param, void* res, int32_t code) {
SSyncQueryParam* pParam = param; SSyncQueryParam* pParam = param;
pParam->pRequest = res; pParam->pRequest = res;
if (pParam->pRequest) { if (pParam->pRequest) {
pParam->pRequest->code = code; pParam->pRequest->code = code;
} }
...@@ -2093,6 +2094,8 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) { ...@@ -2093,6 +2094,8 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly); taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly);
tsem_wait(&param->sem); tsem_wait(&param->sem);
param->pRequest->syncQuery = true;
return param->pRequest; return param->pRequest;
#else #else
size_t sqlLen = strlen(sql); size_t sqlLen = strlen(sql);
......
...@@ -858,8 +858,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re ...@@ -858,8 +858,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle,
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId, uint64_t taskId); STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId, uint64_t taskId);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, bool multigroupResult, SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo);
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId,
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <common/ttime.h> #include "ttime.h"
#include "function.h" #include "function.h"
#include "functionMgt.h" #include "functionMgt.h"
#include "index.h" #include "index.h"
...@@ -603,13 +603,15 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu ...@@ -603,13 +603,15 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
} }
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
if (strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_select_value") == 0 || const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_group_key") == 0) { if ((strcmp(pName, "_select_value") == 0) ||
(strcmp(pName, "_group_key") == 0)) {
pValCtx[num++] = &pCtx[i]; pValCtx[num++] = &pCtx[i];
} else if (fmIsSelectFunc(pCtx[i].functionId)) { } else if (fmIsSelectFunc(pCtx[i].functionId)) {
p = &pCtx[i]; p = &pCtx[i];
} }
} }
#ifdef BUF_PAGE_DEBUG #ifdef BUF_PAGE_DEBUG
qDebug("page_setSelect num:%d", num); qDebug("page_setSelect num:%d", num);
#endif #endif
......
...@@ -3330,7 +3330,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -3330,7 +3330,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
return (rows > 0) ? pInfo->pRes : NULL; return (rows > 0) ? pInfo->pRes : NULL;
} }
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup, static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
...@@ -3341,25 +3341,26 @@ static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResult ...@@ -3341,25 +3341,26 @@ static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResult
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pResultInfo->capacity); int32_t numOfResultRows = pResultInfo->capacity - pInfo->pRes->info.rows;
taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pRes, numOfResultRows);
pInfo->curGroupId = pInfo->existNewGroupBlock->info.groupId; pInfo->curGroupId = pInfo->existNewGroupBlock->info.groupId;
pInfo->existNewGroupBlock = NULL; pInfo->existNewGroupBlock = NULL;
// *newgroup = true;
} }
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup, static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
if (taosFillHasMoreResults(pInfo->pFillInfo)) { if (taosFillHasMoreResults(pInfo->pFillInfo)) {
// *newgroup = false; int32_t numOfResultRows = pResultInfo->capacity - pInfo->pRes->info.rows;
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pResultInfo->capacity); taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pRes, numOfResultRows);
if (pInfo->pRes->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult)) { if (pInfo->pRes->info.rows > pResultInfo->threshold) {
return; return;
} }
} }
// handle the cached new group data block // handle the cached new group data block
if (pInfo->existNewGroupBlock) { if (pInfo->existNewGroupBlock) {
doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, newgroup, pTaskInfo); doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, pTaskInfo);
} }
} }
...@@ -3372,8 +3373,8 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { ...@@ -3372,8 +3373,8 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
blockDataCleanup(pResBlock); blockDataCleanup(pResBlock);
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, NULL, pTaskInfo); doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo);
if (pResBlock->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult && pResBlock->info.rows > 0)) { if (pResBlock->info.rows > pResultInfo->threshold || pResBlock->info.rows > 0) {
return pResBlock; return pResBlock;
} }
...@@ -3407,7 +3408,9 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { ...@@ -3407,7 +3408,9 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
} }
blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pResBlock, pOperator->resultInfo.capacity);
int32_t numOfResultRows = pOperator->resultInfo.capacity - pBlock->info.rows;
taosFillResultDataBlock(pInfo->pFillInfo, pBlock, numOfResultRows);
// current group has no more result to return // current group has no more result to return
if (pResBlock->info.rows > 0) { if (pResBlock->info.rows > 0) {
...@@ -3417,13 +3420,13 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { ...@@ -3417,13 +3420,13 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
return pResBlock; return pResBlock;
} }
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, NULL, pTaskInfo); doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo);
if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) { if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) {
return pResBlock; return pResBlock;
} }
} else if (pInfo->existNewGroupBlock) { // try next group } else if (pInfo->existNewGroupBlock) { // try next group
assert(pBlock != NULL); assert(pBlock != NULL);
doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, NULL, pTaskInfo); doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, pTaskInfo);
if (pResBlock->info.rows > pResultInfo->threshold) { if (pResBlock->info.rows > pResultInfo->threshold) {
return pResBlock; return pResBlock;
} }
...@@ -4032,8 +4035,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t ...@@ -4032,8 +4035,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
} }
} }
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, bool multigroupResult, SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo) {
SExecTaskInfo* pTaskInfo) {
SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo)); SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -4065,7 +4067,6 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ...@@ -4065,7 +4067,6 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
} }
pInfo->pRes = pResBlock; pInfo->pRes = pResBlock;
pInfo->multigroupResult = multigroupResult;
pInfo->pCondition = pPhyFillNode->node.pConditions; pInfo->pCondition = pPhyFillNode->node.pConditions;
pInfo->pColMatchColInfo = pColMatchColInfo; pInfo->pColMatchColInfo = pColMatchColInfo;
pOperator->name = "FillOperator"; pOperator->name = "FillOperator";
...@@ -4438,21 +4439,21 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4438,21 +4439,21 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
// return NULL; // return NULL;
// } // }
int32_t code = extractTableSchemaInfo(pHandle, pScanNode->scan.uid, pTaskInfo); int32_t code = extractTableSchemaInfo(pHandle, pScanNode->uid, pTaskInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
} }
pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
if (pScanNode->scan.tableType == TSDB_SUPER_TABLE) { if (pScanNode->tableType == TSDB_SUPER_TABLE) {
code = vnodeGetAllTableList(pHandle->vnode, pScanNode->scan.uid, pTableListInfo->pTableList); code = vnodeGetAllTableList(pHandle->vnode, pScanNode->uid, pTableListInfo->pTableList);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = terrno; pTaskInfo->code = terrno;
return NULL; return NULL;
} }
} else { // Create one table group. } else { // Create one table group.
STableKeyInfo info = {.lastKey = 0, .uid = pScanNode->scan.uid, .groupId = 0}; STableKeyInfo info = {.lastKey = 0, .uid = pScanNode->uid, .groupId = 0};
taosArrayPush(pTableListInfo->pTableList, &info); taosArrayPush(pTableListInfo->pTableList, &info);
} }
...@@ -4605,7 +4606,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4605,7 +4606,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
pOptr = createMergeJoinOperatorInfo(ops, size, (SJoinPhysiNode*)pPhyNode, pTaskInfo); pOptr = createMergeJoinOperatorInfo(ops, size, (SJoinPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, false, pTaskInfo); pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo); pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
......
...@@ -444,14 +444,12 @@ int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) { ...@@ -444,14 +444,12 @@ int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) {
trans.pHandle = pMsg->handle; trans.pHandle = pMsg->handle;
SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans)); SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans));
SCH_ERR_JRET(schProcessOnTaskStatusRsp(&rsp.epId, rsp.taskStatus)); SCH_ERR_JRET(schProcessOnTaskStatusRsp(&rsp.epId, rsp.taskStatus));
_return: _return:
tFreeSSchedulerHbRsp(&rsp); tFreeSSchedulerHbRsp(&rsp);
taosMemoryFree(param); taosMemoryFree(param);
taosMemoryFree(pMsg->pData);
SCH_RET(code); SCH_RET(code);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册