From 2a38443640e4547db2c0358ff012c7bc5a16b180 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 11 Jul 2022 18:01:55 +0800 Subject: [PATCH] metaReader/decoder: clear to release tDecoderMalloc --- source/libs/executor/src/executorimpl.c | 60 +++++++++++++------------ 1 file changed, 32 insertions(+), 28 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 29818e56bb..a85c7180ee 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -576,14 +576,15 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc int32_t numOfRows = 0; for (int32_t k = 0; k < numOfOutput; ++k) { - int32_t outputSlotId = pExpr[k].base.resSchema.slotId; - SqlFunctionCtx* pfCtx = &pCtx[k]; + int32_t outputSlotId = pExpr[k].base.resSchema.slotId; + SqlFunctionCtx* pfCtx = &pCtx[k]; SInputColumnInfoData* pInputData = &pfCtx->input; if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); if (pResult->info.rows > 0 && !createNewColModel) { - colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pInputData->pData[0], pInputData->numOfRows); + colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pInputData->pData[0], + pInputData->numOfRows); } else { colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info); } @@ -641,11 +642,11 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc } else if (fmIsAggFunc(pfCtx->functionId)) { // _group_key function for "partition by tbname" + csum(col_name) query SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId); - int32_t slotId = pfCtx->param[0].pCol->slotId; + int32_t slotId = pfCtx->param[0].pCol->slotId; // todo handle the json tag SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId); - for(int32_t f = 0; f < pSrcBlock->info.rows; ++f) { + for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) { bool isNull = colDataIsNull_s(pInput, f); if (isNull) { colDataAppendNULL(pOutput, pResult->info.rows + f); @@ -3819,7 +3820,8 @@ _error: return NULL; } -static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream, SExecTaskInfo* pTaskInfo) { +static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream, + SExecTaskInfo* pTaskInfo) { int32_t order = 0; int32_t scanFlag = 0; @@ -3874,9 +3876,9 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; - while(1) { + while (1) { // here we need to handle the existsed group results - if (pIndefInfo->pNextGroupRes != NULL) { // todo extract method + if (pIndefInfo->pNextGroupRes != NULL) { // todo extract method for (int32_t k = 0; k < pSup->numOfExprs; ++k) { SqlFunctionCtx* pCtx = &pSup->pCtx[k]; @@ -3974,15 +3976,15 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr); - pInfo->binfo.pRes = pResBlock; - pInfo->pCondition = pPhyNode->node.pConditions; - pInfo->pPseudoColInfo= setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr); + pInfo->binfo.pRes = pResBlock; + pInfo->pCondition = pPhyNode->node.pConditions; + pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr); - pOperator->name = "IndefinitOperator"; + pOperator->name = "IndefinitOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL, @@ -4047,8 +4049,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId; int32_t numOfOutputCols = 0; - SArray* pColMatchColInfo = - extractColMatchInfo(pPhyFillNode->pTargets, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); + SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pTargets, pPhyFillNode->node.pOutputDataBlockDesc, + &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); int32_t code = initFillInfo(pInfo, pExprInfo, num, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity, pTaskInfo->id.str, pInterval, type); @@ -4056,18 +4058,18 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* goto _error; } - pInfo->pRes = pResBlock; - pInfo->multigroupResult = multigroupResult; - pInfo->pCondition = pPhyFillNode->node.pConditions; - pInfo->pColMatchColInfo = pColMatchColInfo; - pOperator->name = "FillOperator"; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL; - pOperator->exprSupp.pExprInfo = pExprInfo; + pInfo->pRes = pResBlock; + pInfo->multigroupResult = multigroupResult; + pInfo->pCondition = pPhyFillNode->node.pConditions; + pInfo->pColMatchColInfo = pColMatchColInfo; + pOperator->name = "FillOperator"; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL; + pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.numOfExprs = num; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL); @@ -4117,6 +4119,8 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version; } else if (mr.me.type == TSDB_CHILD_TABLE) { + tDecoderClear(&mr.coder); + tb_uid_t suid = mr.me.ctbEntry.suid; metaGetTableEntryByUid(&mr, suid); pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); -- GitLab