未验证 提交 557531da 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #12202 from taosdata/feature/3.0_liaohj

fix(query): fix memory leak.
...@@ -115,8 +115,6 @@ void* tDecodeDataBlocks(const void* buf, SArray** blocks); ...@@ -115,8 +115,6 @@ void* tDecodeDataBlocks(const void* buf, SArray** blocks);
void colDataDestroy(SColumnInfoData* pColData); void colDataDestroy(SColumnInfoData* pColData);
static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) { static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) {
// WARNING: do not use info.numOfCols,
// sometimes info.numOfCols != array size
int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock); int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
......
...@@ -165,15 +165,6 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t ...@@ -165,15 +165,6 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
*/ */
int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type); int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type);
/**
* release the query handle and decrease the reference count in cache
* @param pMgmt
* @param pQInfo
* @param freeHandle
* @return
*/
void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle);
void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes); int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes);
......
...@@ -225,6 +225,7 @@ typedef struct SExecTaskInfo { ...@@ -225,6 +225,7 @@ typedef struct SExecTaskInfo {
char* sql; // query sql string char* sql; // query sql string
jmp_buf env; // jump to this position when error happens. jmp_buf env; // jump to this position when error happens.
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
struct SSubplan *plan;
struct SOperatorInfo* pRoot; struct SOperatorInfo* pRoot;
} SExecTaskInfo; } SExecTaskInfo;
......
...@@ -36,21 +36,6 @@ typedef struct STaskMgmt { ...@@ -36,21 +36,6 @@ typedef struct STaskMgmt {
bool closed; bool closed;
} STaskMgmt; } STaskMgmt;
static void taskMgmtKillTaskFn(void* handle, void* param1) {
void** fp = (void**)handle;
qKillTask(*fp);
}
static void freeqinfoFn(void *qhandle) {
void** handle = qhandle;
if (handle == NULL || *handle == NULL) {
return;
}
qKillTask(*handle);
qDestroyTask(*handle);
}
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) { qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) {
assert(readHandle != NULL && pSubplan != NULL); assert(readHandle != NULL && pSubplan != NULL);
......
...@@ -1225,6 +1225,8 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) { ...@@ -1225,6 +1225,8 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
taosVariantDestroy(&pCtx[i].tag); taosVariantDestroy(&pCtx[i].tag);
taosMemoryFreeClear(pCtx[i].subsidiaries.pCtx); taosMemoryFreeClear(pCtx[i].subsidiaries.pCtx);
taosMemoryFree(pCtx[i].input.pData);
taosMemoryFree(pCtx[i].input.pColumnDataAgg);
} }
taosMemoryFreeClear(pCtx); taosMemoryFreeClear(pCtx);
...@@ -2840,9 +2842,9 @@ void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray ...@@ -2840,9 +2842,9 @@ void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray
int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
SArray* pColList) { SArray* pColList) {
blockDataEnsureCapacity(pRes, numOfRows);
if (pColList == NULL) { // data from other sources if (pColList == NULL) { // data from other sources
blockDataEnsureCapacity(pRes, numOfRows);
int32_t dataLen = *(int32_t*)pData; int32_t dataLen = *(int32_t*)pData;
pData += sizeof(int32_t); pData += sizeof(int32_t);
...@@ -2898,20 +2900,23 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI ...@@ -2898,20 +2900,23 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
pStart += sizeof(SSysTableSchema); pStart += sizeof(SSysTableSchema);
} }
SSDataBlock block = {.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)), .info.numOfCols = numOfCols}; SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
pBlock->info.numOfCols = numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData idata = {0}; SColumnInfoData idata = {0};
idata.info.type = pSchema[i].type; idata.info.type = pSchema[i].type;
idata.info.bytes = pSchema[i].bytes; idata.info.bytes = pSchema[i].bytes;
idata.info.colId = pSchema[i].colId; idata.info.colId = pSchema[i].colId;
taosArrayPush(block.pDataBlock, &idata); taosArrayPush(pBlock->pDataBlock, &idata);
if (IS_VAR_DATA_TYPE(idata.info.type)) { if (IS_VAR_DATA_TYPE(idata.info.type)) {
block.info.hasVarCol = true; pBlock->info.hasVarCol = true;
} }
} }
blockDataEnsureCapacity(&block, numOfRows); blockDataEnsureCapacity(pBlock, numOfRows);
int32_t dataLen = *(int32_t*)pStart; int32_t dataLen = *(int32_t*)pStart;
uint64_t groupId = *(uint64_t*)(pStart + sizeof(int32_t)); uint64_t groupId = *(uint64_t*)(pStart + sizeof(int32_t));
...@@ -2924,7 +2929,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI ...@@ -2924,7 +2929,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
colLen[i] = htonl(colLen[i]); colLen[i] = htonl(colLen[i]);
ASSERT(colLen[i] >= 0); ASSERT(colLen[i] >= 0);
SColumnInfoData* pColInfoData = taosArrayGet(block.pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
pColInfoData->varmeta.length = colLen[i]; pColInfoData->varmeta.length = colLen[i];
pColInfoData->varmeta.allocLen = colLen[i]; pColInfoData->varmeta.allocLen = colLen[i];
...@@ -2943,7 +2948,10 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI ...@@ -2943,7 +2948,10 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
} }
// data from mnode // data from mnode
relocateColumnData(pRes, pColList, block.pDataBlock); relocateColumnData(pRes, pColList, pBlock->pDataBlock);
taosArrayDestroy(pBlock->pDataBlock);
taosMemoryFree(pBlock);
// blockDataDestroy(pBlock);
} }
pRes->info.rows = numOfRows; pRes->info.rows = numOfRows;
...@@ -4184,6 +4192,18 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { ...@@ -4184,6 +4192,18 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
pOperator->numOfDownstream = 0; pOperator->numOfDownstream = 0;
} }
if (pOperator->pExpr != NULL) {
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
SExprInfo* pExprInfo = &pOperator->pExpr[i];
if (pExprInfo->pExpr->nodeType == QUERY_NODE_COLUMN) {
taosMemoryFree(pExprInfo->base.pParam[0].pCol);
}
taosMemoryFree(pExprInfo->base.pParam);
taosMemoryFree(pExprInfo->pExpr);
}
}
taosMemoryFree(pOperator->pExpr);
taosMemoryFreeClear(pOperator->info); taosMemoryFreeClear(pOperator->info);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
} }
...@@ -4195,8 +4215,6 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n ...@@ -4195,8 +4215,6 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t)); pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK); pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);
// pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
// pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell));
if (pAggSup->keyBuf == NULL /*|| pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL*/ || if (pAggSup->keyBuf == NULL /*|| pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL*/ ||
pAggSup->pResultRowHashTable == NULL) { pAggSup->pResultRowHashTable == NULL) {
...@@ -4358,6 +4376,8 @@ void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -4358,6 +4376,8 @@ void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param; SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
cleanupAggSup(&pInfo->aggSup);
taosArrayDestroy(pInfo->pPseudoColInfo);
} }
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
...@@ -5189,6 +5209,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead ...@@ -5189,6 +5209,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
goto _complete; goto _complete;
} }
(*pTaskInfo)->plan = pPlan;
return code; return code;
_complete: _complete:
...@@ -5287,10 +5308,10 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { ...@@ -5287,10 +5308,10 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) {
qDebug("%s execTask is freed", GET_TASKID(pTaskInfo)); qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));
doDestroyTableQueryInfo(&pTaskInfo->tableqinfoGroupInfo); doDestroyTableQueryInfo(&pTaskInfo->tableqinfoGroupInfo);
destroyOperatorInfo(pTaskInfo->pRoot);
// taosArrayDestroy(pTaskInfo->summary.queryProfEvents); // taosArrayDestroy(pTaskInfo->summary.queryProfEvents);
// taosHashCleanup(pTaskInfo->summary.operatorProfResults); // taosHashCleanup(pTaskInfo->summary.operatorProfResults);
destroyOperatorInfo(pTaskInfo->pRoot);
taosMemoryFreeClear(pTaskInfo->sql); taosMemoryFreeClear(pTaskInfo->sql);
taosMemoryFreeClear(pTaskInfo->id.str); taosMemoryFreeClear(pTaskInfo->id.str);
taosMemoryFreeClear(pTaskInfo); taosMemoryFreeClear(pTaskInfo);
......
...@@ -670,6 +670,8 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) { ...@@ -670,6 +670,8 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) {
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) { if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
metaCloseTbCursor(pInfo->pCur); metaCloseTbCursor(pInfo->pCur);
} }
taosArrayDestroy(pInfo->scanCols);
} }
EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) { EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) {
...@@ -1154,7 +1156,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe ...@@ -1154,7 +1156,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfExprs = pResBlock->info.numOfCols; pOperator->numOfExprs = pResBlock->info.numOfCols;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator,
NULL, NULL, NULL); NULL, NULL, NULL);
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
...@@ -1248,6 +1250,11 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { ...@@ -1248,6 +1250,11 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
SExprInfo* pExprInfo = &pOperator->pExpr[0]; SExprInfo* pExprInfo = &pOperator->pExpr[0];
SSDataBlock* pRes = pInfo->pRes; SSDataBlock* pRes = pInfo->pRes;
if (taosArrayGetSize(pInfo->pTableGroups->pGroupList) == 0) {
setTaskStatus(pTaskInfo, TASK_COMPLETED);
return NULL;
}
SArray* pa = taosArrayGetP(pInfo->pTableGroups->pGroupList, 0); SArray* pa = taosArrayGetP(pInfo->pTableGroups->pGroupList, 0);
char str[512] = {0}; char str[512] = {0};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册