未验证 提交 77053a31 编写于 作者: M Minglei Jin 提交者: GitHub

Merge pull request #14775 from taosdata/fix/TD-17231

metaReader/decoder: clear to release tDecoderMalloc
...@@ -576,14 +576,15 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc ...@@ -576,14 +576,15 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
int32_t numOfRows = 0; int32_t numOfRows = 0;
for (int32_t k = 0; k < numOfOutput; ++k) { for (int32_t k = 0; k < numOfOutput; ++k) {
int32_t outputSlotId = pExpr[k].base.resSchema.slotId; int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
SqlFunctionCtx* pfCtx = &pCtx[k]; SqlFunctionCtx* pfCtx = &pCtx[k];
SInputColumnInfoData* pInputData = &pfCtx->input; SInputColumnInfoData* pInputData = &pfCtx->input;
if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
if (pResult->info.rows > 0 && !createNewColModel) { if (pResult->info.rows > 0 && !createNewColModel) {
colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pInputData->pData[0], pInputData->numOfRows); colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pInputData->pData[0],
pInputData->numOfRows);
} else { } else {
colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info); colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
} }
...@@ -641,11 +642,11 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc ...@@ -641,11 +642,11 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
} else if (fmIsAggFunc(pfCtx->functionId)) { } else if (fmIsAggFunc(pfCtx->functionId)) {
// _group_key function for "partition by tbname" + csum(col_name) query // _group_key function for "partition by tbname" + csum(col_name) query
SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId); SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
int32_t slotId = pfCtx->param[0].pCol->slotId; int32_t slotId = pfCtx->param[0].pCol->slotId;
// todo handle the json tag // todo handle the json tag
SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId); SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
for(int32_t f = 0; f < pSrcBlock->info.rows; ++f) { for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
bool isNull = colDataIsNull_s(pInput, f); bool isNull = colDataIsNull_s(pInput, f);
if (isNull) { if (isNull) {
colDataAppendNULL(pOutput, pResult->info.rows + f); colDataAppendNULL(pOutput, pResult->info.rows + f);
...@@ -3820,7 +3821,8 @@ _error: ...@@ -3820,7 +3821,8 @@ _error:
return NULL; return NULL;
} }
static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream, SExecTaskInfo* pTaskInfo) { static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream,
SExecTaskInfo* pTaskInfo) {
int32_t order = 0; int32_t order = 0;
int32_t scanFlag = 0; int32_t scanFlag = 0;
...@@ -3875,9 +3877,9 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) { ...@@ -3875,9 +3877,9 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
while(1) { while (1) {
// here we need to handle the existsed group results // here we need to handle the existsed group results
if (pIndefInfo->pNextGroupRes != NULL) { // todo extract method if (pIndefInfo->pNextGroupRes != NULL) { // todo extract method
for (int32_t k = 0; k < pSup->numOfExprs; ++k) { for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
SqlFunctionCtx* pCtx = &pSup->pCtx[k]; SqlFunctionCtx* pCtx = &pSup->pCtx[k];
...@@ -3975,15 +3977,15 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy ...@@ -3975,15 +3977,15 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr); setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->pCondition = pPhyNode->node.pConditions; pInfo->pCondition = pPhyNode->node.pConditions;
pInfo->pPseudoColInfo= setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr); pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
pOperator->name = "IndefinitOperator"; pOperator->name = "IndefinitOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
...@@ -4048,8 +4050,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ...@@ -4048,8 +4050,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId; pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId;
int32_t numOfOutputCols = 0; int32_t numOfOutputCols = 0;
SArray* pColMatchColInfo = SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pTargets, pPhyFillNode->node.pOutputDataBlockDesc,
extractColMatchInfo(pPhyFillNode->pTargets, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
int32_t code = initFillInfo(pInfo, pExprInfo, num, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, int32_t code = initFillInfo(pInfo, pExprInfo, num, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange,
pResultInfo->capacity, pTaskInfo->id.str, pInterval, type); pResultInfo->capacity, pTaskInfo->id.str, pInterval, type);
...@@ -4057,18 +4059,18 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ...@@ -4057,18 +4059,18 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
goto _error; goto _error;
} }
pInfo->pRes = pResBlock; pInfo->pRes = pResBlock;
pInfo->multigroupResult = multigroupResult; pInfo->multigroupResult = multigroupResult;
pInfo->pCondition = pPhyFillNode->node.pConditions; pInfo->pCondition = pPhyFillNode->node.pConditions;
pInfo->pColMatchColInfo = pColMatchColInfo; pInfo->pColMatchColInfo = pColMatchColInfo;
pOperator->name = "FillOperator"; pOperator->name = "FillOperator";
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = num; pOperator->exprSupp.numOfExprs = num;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL); createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL);
...@@ -4118,6 +4120,8 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo ...@@ -4118,6 +4120,8 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo
pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version; pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version;
} else if (mr.me.type == TSDB_CHILD_TABLE) { } else if (mr.me.type == TSDB_CHILD_TABLE) {
tDecoderClear(&mr.coder);
tb_uid_t suid = mr.me.ctbEntry.suid; tb_uid_t suid = mr.me.ctbEntry.suid;
metaGetTableEntryByUid(&mr, suid); metaGetTableEntryByUid(&mr, suid);
pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册