提交 2b092f65 编写于 作者: L Liu Jicong

fix(query): memory leak

上级 1a5ae2bf
...@@ -489,7 +489,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea ...@@ -489,7 +489,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
ASSERT(smaObj.uid != 0); ASSERT(smaObj.uid != 0);
char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0}; char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb",pCreate->name); snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb", pCreate->name);
memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN); memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
smaObj.dstTbUid = mndGenerateUid(smaObj.dstTbName, TSDB_TABLE_FNAME_LEN); smaObj.dstTbUid = mndGenerateUid(smaObj.dstTbName, TSDB_TABLE_FNAME_LEN);
smaObj.stbUid = pStb->uid; smaObj.stbUid = pStb->uid;
......
...@@ -341,7 +341,7 @@ FAIL: ...@@ -341,7 +341,7 @@ FAIL:
return -1; return -1;
} }
void tqReaderSetColIdList(STqReader* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; } void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) { pReader->pColIdList = pColIdList; }
int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) { int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) {
if (pReader->tbIdHash) { if (pReader->tbIdHash) {
......
...@@ -3217,8 +3217,8 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa ...@@ -3217,8 +3217,8 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa
} }
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag); static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo *pOperator, SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
SExecTaskInfo* pTaskInfo) { SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
SSDataBlock* pResBlock = pInfo->pFinalRes; SSDataBlock* pResBlock = pInfo->pFinalRes;
...@@ -3242,8 +3242,8 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo *pOperator, SFillOp ...@@ -3242,8 +3242,8 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo *pOperator, SFillOp
pInfo->existNewGroupBlock = NULL; pInfo->existNewGroupBlock = NULL;
} }
static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
SExecTaskInfo* pTaskInfo) { SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
if (taosFillHasMoreResults(pInfo->pFillInfo)) { if (taosFillHasMoreResults(pInfo->pFillInfo)) {
int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows; int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows;
taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pFinalRes, numOfResultRows); taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pFinalRes, numOfResultRows);
...@@ -3259,8 +3259,8 @@ static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOpera ...@@ -3259,8 +3259,8 @@ static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOpera
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) { static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
SFillOperatorInfo* pInfo = pOperator->info; SFillOperatorInfo* pInfo = pOperator->info;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
SSDataBlock* pResBlock = pInfo->pFinalRes; SSDataBlock* pResBlock = pInfo->pFinalRes;
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false); setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL); projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL);
...@@ -3270,13 +3270,13 @@ static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlo ...@@ -3270,13 +3270,13 @@ static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlo
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, pInfo->primarySrcSlotId); SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, pInfo->primarySrcSlotId);
colDataAssign(pDst, pSrc, pInfo->pRes->info.rows, &pResBlock->info); colDataAssign(pDst, pSrc, pInfo->pRes->info.rows, &pResBlock->info);
for(int32_t i = 0; i < pInfo->numOfNotFillExpr; ++i) { for (int32_t i = 0; i < pInfo->numOfNotFillExpr; ++i) {
SFillColInfo* pCol = &pInfo->pFillInfo->pFillCol[i + pInfo->numOfExpr]; SFillColInfo* pCol = &pInfo->pFillInfo->pFillCol[i + pInfo->numOfExpr];
ASSERT(pCol->notFillCol); ASSERT(pCol->notFillCol);
SExprInfo* pExpr = pCol->pExpr; SExprInfo* pExpr = pCol->pExpr;
int32_t srcSlotId = pExpr->base.pParam[0].pCol->slotId; int32_t srcSlotId = pExpr->base.pParam[0].pCol->slotId;
int32_t dstSlotId = pExpr->base.resSchema.slotId; int32_t dstSlotId = pExpr->base.resSchema.slotId;
SColumnInfoData* pDst1 = taosArrayGet(pInfo->pRes->pDataBlock, dstSlotId); SColumnInfoData* pDst1 = taosArrayGet(pInfo->pRes->pDataBlock, dstSlotId);
SColumnInfoData* pSrc1 = taosArrayGet(pBlock->pDataBlock, srcSlotId); SColumnInfoData* pSrc1 = taosArrayGet(pBlock->pDataBlock, srcSlotId);
...@@ -3664,7 +3664,7 @@ void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -3664,7 +3664,7 @@ void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
taosRemoveRef(exchangeObjRefPool, pExInfo->self); taosRemoveRef(exchangeObjRefPool, pExInfo->self);
} }
void freeSourceDataInfo(void *p) { void freeSourceDataInfo(void* p) {
SSourceDataInfo* pInfo = (SSourceDataInfo*)p; SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
taosMemoryFreeClear(pInfo->pRsp); taosMemoryFreeClear(pInfo->pRsp);
} }
...@@ -3694,8 +3694,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t ...@@ -3694,8 +3694,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey); STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey);
w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC); w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC);
pInfo->pFillInfo = pInfo->pFillInfo = taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo,
taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo, pInfo->primaryTsCol, order, id); pInfo->primaryTsCol, order, id);
pInfo->win = win; pInfo->win = win;
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES); pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
...@@ -3721,10 +3721,10 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ...@@ -3721,10 +3721,10 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr); SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr);
pInfo->pNotFillExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pInfo->numOfNotFillExpr); pInfo->pNotFillExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pInfo->numOfNotFillExpr);
SInterval* pInterval = SInterval* pInterval =
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
: &((SIntervalAggOperatorInfo*)downstream->info)->interval; : &((SIntervalAggOperatorInfo*)downstream->info)->interval;
int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
int32_t type = convertFillType(pPhyFillNode->mode); int32_t type = convertFillType(pPhyFillNode->mode);
...@@ -3741,9 +3741,9 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ...@@ -3741,9 +3741,9 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc,
&numOfOutputCols, COL_MATCH_FROM_SLOT_ID); &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
int32_t code = int32_t code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr,
initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr, (SNodeListNode*)pPhyFillNode->pValues, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity,
pPhyFillNode->timeRange, pResultInfo->capacity, pTaskInfo->id.str, pInterval, type, order); pTaskInfo->id.str, pInterval, type, order);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
...@@ -1649,6 +1649,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -1649,6 +1649,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
} }
taosArrayDestroy(tableIdList); taosArrayDestroy(tableIdList);
memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->cond, sizeof(SQueryTableDataCond)); memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->cond, sizeof(SQueryTableDataCond));
} else {
taosArrayDestroy(pColIds);
} }
// create the pseduo columns info // create the pseduo columns info
......
...@@ -1706,14 +1706,19 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -1706,14 +1706,19 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
blockDataDestroy(pInfo->pPullDataRes); blockDataDestroy(pInfo->pPullDataRes);
taosArrayDestroy(pInfo->pRecycledPages); taosArrayDestroy(pInfo->pRecycledPages);
blockDataDestroy(pInfo->pUpdateRes); blockDataDestroy(pInfo->pUpdateRes);
taosArrayDestroy(pInfo->pDelWins);
blockDataDestroy(pInfo->pDelRes);
if (pInfo->pChildren) { if (pInfo->pChildren) {
int32_t size = taosArrayGetSize(pInfo->pChildren); int32_t size = taosArrayGetSize(pInfo->pChildren);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i);
destroyStreamFinalIntervalOperatorInfo(pChildOp->info, numOfOutput); destroyStreamFinalIntervalOperatorInfo(pChildOp->info, numOfOutput);
taosMemoryFree(pChildOp->pDownstream);
cleanupExprSupp(&pChildOp->exprSupp);
taosMemoryFreeClear(pChildOp); taosMemoryFreeClear(pChildOp);
} }
taosArrayDestroy(pInfo->pChildren);
} }
nodesDestroyNode((SNode*)pInfo->pPhyNode); nodesDestroyNode((SNode*)pInfo->pPhyNode);
colDataDestroy(&pInfo->twAggSup.timeWindowData); colDataDestroy(&pInfo->twAggSup.timeWindowData);
...@@ -2644,7 +2649,6 @@ void destroyTimeSliceOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -2644,7 +2649,6 @@ void destroyTimeSliceOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) {
STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo)); STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
......
...@@ -152,11 +152,12 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { ...@@ -152,11 +152,12 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
} }
void tFreeSStreamTask(SStreamTask* pTask) { void tFreeSStreamTask(SStreamTask* pTask) {
qDebug("free stream task %d", pTask->taskId);
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue); if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
if (pTask->outputQueue) streamQueueClose(pTask->outputQueue); if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg); if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg);
if (pTask->exec.executor) qDestroyTask(pTask->exec.executor); if (pTask->exec.executor) qDestroyTask(pTask->exec.executor);
taosArrayDestroy(pTask->childEpInfo); taosArrayDestroyP(pTask->childEpInfo, taosMemoryFree);
if (pTask->outputType == TASK_OUTPUT__TABLE) { if (pTask->outputType == TASK_OUTPUT__TABLE) {
tDeleteSSchemaWrapper(pTask->tbSink.pSchemaWrapper); tDeleteSSchemaWrapper(pTask->tbSink.pSchemaWrapper);
taosMemoryFree(pTask->tbSink.pTSchema); taosMemoryFree(pTask->tbSink.pTSchema);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册