diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 177d09be764b046ae7683bbba4232e3603d67058..2db94064182bd2e8af5380870ae598473962269c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5569,7 +5569,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) { blockDataCleanup(pInfo->pRes); - int32_t tableNameSlotId = 1; + int32_t tableNameSlotId = 0; SColumnInfoData* pTableNameCol = taosArrayGet(pInfo->pRes->pDataBlock, tableNameSlotId); char * name = NULL; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index fb9a2fc5325f4ccf866723a945142261f516575d..c3ad61dd3751f5c09d323b72f9d32af55674a74a 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -41,61 +41,145 @@ static int32_t getSlotKey(SNode* pNode, char* pKey) { return sprintf(pKey, "%s", ((SExprNode*)pNode)->aliasName); } -static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const SNode* pNode, int16_t slotId) { +static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const SNode* pNode, int16_t slotId, bool output) { SSlotDescNode* pSlot = (SSlotDescNode*)nodesMakeNode(QUERY_NODE_SLOT_DESC); - CHECK_ALLOC(pSlot, NULL); + if (NULL == pSlot) { + return NULL; + } pSlot->slotId = slotId; pSlot->dataType = ((SExprNode*)pNode)->resType; pSlot->reserve = false; - pSlot->output = true; + pSlot->output = output; return (SNode*)pSlot; } -static SNode* createTarget(SNode* pNode, int16_t dataBlockId, int16_t slotId) { +static int32_t createTarget(SNode* pNode, int16_t dataBlockId, int16_t slotId, SNode** pOutput) { STargetNode* pTarget = (STargetNode*)nodesMakeNode(QUERY_NODE_TARGET); if (NULL == pTarget) { - return NULL; + return TSDB_CODE_OUT_OF_MEMORY; } + pTarget->dataBlockId = dataBlockId; pTarget->slotId = slotId; pTarget->pExpr = pNode; - return (SNode*)pTarget; + + *pOutput = (SNode*)pTarget; + return TSDB_CODE_SUCCESS; } -static int32_t addDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) { - SHashObj* pHash = NULL; +static int32_t putSlotToHashImpl(int16_t dataBlockId, int16_t slotId, const char* pName, int32_t len, SHashObj* pHash) { + SSlotIndex index = { .dataBlockId = dataBlockId, .slotId = slotId }; + return taosHashPut(pHash, pName, len, &index, sizeof(SSlotIndex)); +} + +static int32_t putSlotToHash(int16_t dataBlockId, int16_t slotId, SNode* pNode, SHashObj* pHash) { + char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN]; + int32_t len = getSlotKey(pNode, name); + return putSlotToHashImpl(dataBlockId, slotId, name, len, pHash); +} + +static int32_t createDataBlockDescHash(SPhysiPlanContext* pCxt, int32_t capacity, int16_t dataBlockId, SHashObj** pDescHash) { + SHashObj* pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + if (NULL == pHash) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if (NULL == taosArrayInsert(pCxt->pLocationHelper, dataBlockId, &pHash)) { + taosHashCleanup(pHash); + return TSDB_CODE_OUT_OF_MEMORY; + } + + *pDescHash = pHash; + return TSDB_CODE_SUCCESS; +} + +static int32_t buildDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc, SHashObj* pHash) { + pDataBlockDesc->pSlots = nodesMakeList(); if (NULL == pDataBlockDesc->pSlots) { - pDataBlockDesc->pSlots = nodesMakeList(); - CHECK_ALLOC(pDataBlockDesc->pSlots, TSDB_CODE_OUT_OF_MEMORY); + return TSDB_CODE_OUT_OF_MEMORY; + } - pHash = taosHashInit(LIST_LENGTH(pList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - CHECK_ALLOC(pHash, TSDB_CODE_OUT_OF_MEMORY); - if (NULL == taosArrayInsert(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId, &pHash)) { - taosHashCleanup(pHash); - return TSDB_CODE_OUT_OF_MEMORY; + int32_t code = TSDB_CODE_SUCCESS; + int16_t slotId = 0; + SNode* pNode = NULL; + FOREACH(pNode, pList) { + code = nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, pNode, slotId, true)); + if (TSDB_CODE_SUCCESS == code) { + code = putSlotToHash(pDataBlockDesc->dataBlockId, slotId, pNode, pHash); + } + if (TSDB_CODE_SUCCESS == code) { + pDataBlockDesc->resultRowSize += ((SExprNode*)pNode)->resType.bytes; + ++slotId; + } else { + break; } + } + return code; +} + +static int32_t createDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode** pDataBlockDesc) { + SDataBlockDescNode* pDesc = nodesMakeNode(QUERY_NODE_DATABLOCK_DESC); + if (NULL == pDesc) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pDesc->dataBlockId = pCxt->nextDataBlockId++; + + SHashObj* pHash = NULL; + int32_t code = createDataBlockDescHash(pCxt, LIST_LENGTH(pList), pDesc->dataBlockId, &pHash); + if (TSDB_CODE_SUCCESS == code) { + code = buildDataBlockSlots(pCxt, pList, pDesc, pHash); + } + + if (TSDB_CODE_SUCCESS == code) { + *pDataBlockDesc = pDesc; } else { - pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId); + nodesDestroyNode(pDesc); } - + + return code; +} + +static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc, bool output) { + int32_t code = TSDB_CODE_SUCCESS; + SHashObj* pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId); + int16_t nextSlotId = taosHashGetSize(pHash), slotId = 0; SNode* pNode = NULL; - int16_t slotId = taosHashGetSize(pHash); FOREACH(pNode, pList) { - CHECK_CODE_EXT(nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, pNode, slotId))); - - SSlotIndex index = { .dataBlockId = pDataBlockDesc->dataBlockId, .slotId = slotId }; - char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN]; + char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN] = {0}; int32_t len = getSlotKey(pNode, name); - CHECK_CODE(taosHashPut(pHash, name, len, &index, sizeof(SSlotIndex)), TSDB_CODE_OUT_OF_MEMORY); - - SNode* pTarget = createTarget(pNode, pDataBlockDesc->dataBlockId, slotId); - CHECK_ALLOC(pTarget, TSDB_CODE_OUT_OF_MEMORY); - REPLACE_NODE(pTarget); + SSlotIndex* pIndex = taosHashGet(pHash, name, len); + if (NULL == pIndex) { + code = nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, pNode, nextSlotId, output)); + if (TSDB_CODE_SUCCESS == code) { + code = putSlotToHashImpl(pDataBlockDesc->dataBlockId, nextSlotId, name, len, pHash); + } + pDataBlockDesc->resultRowSize += ((SExprNode*)pNode)->resType.bytes; + slotId = nextSlotId; + ++nextSlotId; + } else { + slotId = pIndex->slotId; + } - pDataBlockDesc->resultRowSize += ((SExprNode*)pNode)->resType.bytes; - ++slotId; + if (TSDB_CODE_SUCCESS == code) { + SNode* pTarget = NULL; + code = createTarget(pNode, pDataBlockDesc->dataBlockId, slotId, &pTarget); + if (TSDB_CODE_SUCCESS == code) { + REPLACE_NODE(pTarget); + } + } + + if (TSDB_CODE_SUCCESS != code) { + break; + } } - return TSDB_CODE_SUCCESS; + return code; +} + +static int32_t addDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) { + return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, false); +} + +static int32_t pushdownDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) { + return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, false); } typedef struct SSetSlotIdCxt { @@ -114,10 +198,11 @@ static EDealRes doSetSlotId(SNode* pNode, void* pContext) { pIndex = taosHashGet(pCxt->pRightHash, name, len); } // pIndex is definitely not NULL, otherwise it is a bug - CHECK_ALLOC(pIndex, DEAL_RES_ERROR); + if (NULL == pIndex) { + return DEAL_RES_ERROR; + } ((SColumnNode*)pNode)->dataBlockId = pIndex->dataBlockId; ((SColumnNode*)pNode)->slotId = pIndex->slotId; - CHECK_ALLOC(pNode, DEAL_RES_ERROR); return DEAL_RES_IGNORE_CHILD; } return DEAL_RES_CONTINUE; @@ -144,7 +229,7 @@ static int32_t setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, i return TSDB_CODE_SUCCESS; } -static int32_t setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, SNodeList* pList, SNodeList** pOutput) { +static int32_t setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, const SNodeList* pList, SNodeList** pOutput) { SNodeList* pRes = nodesCloneList(pList); if (NULL == pRes) { return TSDB_CODE_OUT_OF_MEMORY; @@ -164,18 +249,17 @@ static int32_t setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, i return TSDB_CODE_SUCCESS; } -static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, ENodeType type) { +static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, ENodeType type) { SPhysiNode* pPhysiNode = (SPhysiNode*)nodesMakeNode(type); if (NULL == pPhysiNode) { return NULL; } - pPhysiNode->pOutputDataBlockDesc = nodesMakeNode(QUERY_NODE_DATABLOCK_DESC); - if (NULL == pPhysiNode->pOutputDataBlockDesc) { + + int32_t code = createDataBlockDesc(pCxt, pLogicNode->pTargets, &pPhysiNode->pOutputDataBlockDesc); + if (TSDB_CODE_SUCCESS != code) { nodesDestroyNode(pPhysiNode); return NULL; } - pPhysiNode->pOutputDataBlockDesc->dataBlockId = pCxt->nextDataBlockId++; - pPhysiNode->pOutputDataBlockDesc->type = QUERY_NODE_DATABLOCK_DESC; return pPhysiNode; } @@ -186,24 +270,11 @@ static int32_t setConditionsSlotId(SPhysiPlanContext* pCxt, const SLogicNode* pL return TSDB_CODE_SUCCESS; } -static int32_t setSlotOutput(SPhysiPlanContext* pCxt, SNodeList* pTargets, SDataBlockDescNode* pDataBlockDesc) { - SHashObj* pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId); - char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN]; - SNode* pNode; - FOREACH(pNode, pTargets) { - int32_t len = getSlotKey(pNode, name); - SSlotIndex* pIndex = taosHashGet(pHash, name, len); - // pIndex is definitely not NULL, otherwise it is a bug - CHECK_ALLOC(pIndex, TSDB_CODE_FAILED); - ((SSlotDescNode*)nodesListGetNode(pDataBlockDesc->pSlots, pIndex->slotId))->output = true; - } - - return TSDB_CODE_SUCCESS; -} - static SNodeptr createPrimaryKeyCol(SPhysiPlanContext* pCxt, uint64_t tableId) { SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN); - CHECK_ALLOC(pCol, NULL); + if (NULL == pCol) { + return NULL; + } pCol->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP; pCol->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes; pCol->tableId = tableId; @@ -244,8 +315,12 @@ static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhys if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanPhysiNode) || QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN == nodeType(pScanPhysiNode)) { pScanPhysiNode->pScanCols = nodesMakeList(); - CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY); - CHECK_CODE_EXT(nodesListStrictAppend(pScanPhysiNode->pScanCols, createPrimaryKeyCol(pCxt, pScanPhysiNode->uid))); + if (NULL == pScanPhysiNode->pScanCols) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pScanPhysiNode->pScanCols, createPrimaryKeyCol(pCxt, pScanPhysiNode->uid))) { + return TSDB_CODE_OUT_OF_MEMORY; + } SNode* pNode; FOREACH(pNode, pScanCols) { @@ -255,29 +330,29 @@ static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhys strcpy(pCol->colName, ((SColumnNode*)pNode)->colName); continue; } - CHECK_CODE_EXT(nodesListStrictAppend(pScanPhysiNode->pScanCols, nodesCloneNode(pNode))); + if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pScanPhysiNode->pScanCols, nodesCloneNode(pNode))) { + return TSDB_CODE_OUT_OF_MEMORY; + } } } else { pScanPhysiNode->pScanCols = nodesCloneList(pScanCols); - CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY); + if (NULL == pScanPhysiNode->pScanCols) { + return TSDB_CODE_OUT_OF_MEMORY; + } } - // return sortScanCols(pScanPhysiNode->pScanCols); - return TSDB_CODE_SUCCESS; + return sortScanCols(pScanPhysiNode->pScanCols); } static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode, SScanPhysiNode* pScanPhysiNode, SPhysiNode** pPhyNode) { int32_t code = createScanCols(pCxt, pScanPhysiNode, pScanLogicNode->pScanCols); if (TSDB_CODE_SUCCESS == code) { // Data block describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t - code = addDataBlockDesc(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc); + code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc); } if (TSDB_CODE_SUCCESS == code) { code = setConditionsSlotId(pCxt, (const SLogicNode*)pScanLogicNode, (SPhysiNode*)pScanPhysiNode); } - if (TSDB_CODE_SUCCESS == code) { - code = setSlotOutput(pCxt, pScanLogicNode->node.pTargets, pScanPhysiNode->node.pOutputDataBlockDesc); - } if (TSDB_CODE_SUCCESS == code) { pScanPhysiNode->uid = pScanLogicNode->pMeta->uid; pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType; @@ -302,7 +377,7 @@ static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAdd } static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) { - STagScanPhysiNode* pTagScan = (STagScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN); + STagScanPhysiNode* pTagScan = (STagScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN); if (NULL == pTagScan) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -310,7 +385,7 @@ static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* p } static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) { - STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN); + STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN); if (NULL == pTableScan) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -326,7 +401,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp } static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) { - SSystemTableScanPhysiNode* pScan = (SSystemTableScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN); + SSystemTableScanPhysiNode* pScan = (SSystemTableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN); if (NULL == pScan) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -348,7 +423,7 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* } static int32_t createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) { - SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN); + SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN); if (NULL == pScan) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -412,7 +487,7 @@ static int32_t createJoinOutputCols(SPhysiPlanContext* pCxt, SDataBlockDescNode* } static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode, SPhysiNode** pPhyNode) { - SJoinPhysiNode* pJoin = (SJoinPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_JOIN); + SJoinPhysiNode* pJoin = (SJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_JOIN); if (NULL == pJoin) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -425,14 +500,11 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren code = createJoinOutputCols(pCxt, pLeftDesc, pRightDesc, &pJoin->pTargets); } if (TSDB_CODE_SUCCESS == code) { - code = addDataBlockDesc(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc); + code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc); } if (TSDB_CODE_SUCCESS == code) { code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin); } - if (TSDB_CODE_SUCCESS == code) { - code = setSlotOutput(pCxt, pJoinLogicNode->node.pTargets, pJoin->node.pOutputDataBlockDesc); - } if (TSDB_CODE_SUCCESS == code) { *pPhyNode = (SPhysiNode*)pJoin; @@ -452,7 +524,9 @@ typedef struct SRewritePrecalcExprsCxt { static EDealRes collectAndRewrite(SRewritePrecalcExprsCxt* pCxt, SNode** pNode) { SNode* pExpr = nodesCloneNode(*pNode); - CHECK_ALLOC(pExpr, DEAL_RES_ERROR); + if (NULL == pExpr) { + return DEAL_RES_ERROR; + } if (nodesListAppend(pCxt->pPrecalcExprs, pExpr)) { nodesDestroyNode(pExpr); return DEAL_RES_ERROR; @@ -500,11 +574,15 @@ static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SN if (NULL == *pPrecalcExprs) { *pPrecalcExprs = nodesMakeList(); - CHECK_ALLOC(*pPrecalcExprs, TSDB_CODE_OUT_OF_MEMORY); + if (NULL == *pPrecalcExprs) { + return TSDB_CODE_OUT_OF_MEMORY; + } } if (NULL == *pRewrittenList) { *pRewrittenList = nodesMakeList(); - CHECK_ALLOC(*pRewrittenList, TSDB_CODE_OUT_OF_MEMORY); + if (NULL == *pRewrittenList) { + return TSDB_CODE_OUT_OF_MEMORY; + } } SNode* pNode = NULL; FOREACH(pNode, pList) { @@ -514,8 +592,12 @@ static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SN } else { pNew = nodesCloneNode(pNode); } - CHECK_ALLOC(pNew, TSDB_CODE_OUT_OF_MEMORY); - CHECK_CODE(nodesListAppend(*pRewrittenList, pNew), TSDB_CODE_OUT_OF_MEMORY); + if (NULL == pNew) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if (TSDB_CODE_SUCCESS != nodesListAppend(*pRewrittenList, pNew)) { + return TSDB_CODE_OUT_OF_MEMORY; + } } SRewritePrecalcExprsCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pPrecalcExprs = *pPrecalcExprs }; nodesRewriteList(*pRewrittenList, doRewritePrecalcExprs, &cxt); @@ -527,7 +609,7 @@ static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SN } static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SAggLogicNode* pAggLogicNode, SPhysiNode** pPhyNode) { - SAggPhysiNode* pAgg = (SAggPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_AGG); + SAggPhysiNode* pAgg = (SAggPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pAggLogicNode, QUERY_NODE_PHYSICAL_PLAN_AGG); if (NULL == pAgg) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -545,30 +627,27 @@ static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) { code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pAgg->pExprs); if (TSDB_CODE_SUCCESS == code) { - code = addDataBlockDesc(pCxt, pAgg->pExprs, pChildTupe); + code = pushdownDataBlockSlots(pCxt, pAgg->pExprs, pChildTupe); } } if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) { code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pGroupKeys, &pAgg->pGroupKeys); if (TSDB_CODE_SUCCESS == code) { - code = addDataBlockDesc(pCxt, pAgg->pGroupKeys, pAgg->node.pOutputDataBlockDesc); + code = addDataBlockSlots(pCxt, pAgg->pGroupKeys, pAgg->node.pOutputDataBlockDesc); } } if (TSDB_CODE_SUCCESS == code && NULL != pAggFuncs) { code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pAggFuncs, &pAgg->pAggFuncs); if (TSDB_CODE_SUCCESS == code) { - code = addDataBlockDesc(pCxt, pAgg->pAggFuncs, pAgg->node.pOutputDataBlockDesc); + code = addDataBlockSlots(pCxt, pAgg->pAggFuncs, pAgg->node.pOutputDataBlockDesc); } } if (TSDB_CODE_SUCCESS == code) { code = setConditionsSlotId(pCxt, (const SLogicNode*)pAggLogicNode, (SPhysiNode*)pAgg); } - if (TSDB_CODE_SUCCESS == code) { - code = setSlotOutput(pCxt, pAggLogicNode->node.pTargets, pAgg->node.pOutputDataBlockDesc); - } if (TSDB_CODE_SUCCESS == code) { *pPhyNode = (SPhysiNode*)pAgg; @@ -576,18 +655,22 @@ static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, nodesDestroyNode(pAgg); } + nodesDestroyList(pPrecalcExprs); + nodesDestroyList(pGroupKeys); + nodesDestroyList(pAggFuncs); + return code; } static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SProjectLogicNode* pProjectLogicNode, SPhysiNode** pPhyNode) { - SProjectPhysiNode* pProject = (SProjectPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_PROJECT); + SProjectPhysiNode* pProject = (SProjectPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pProjectLogicNode, QUERY_NODE_PHYSICAL_PLAN_PROJECT); if (NULL == pProject) { return TSDB_CODE_OUT_OF_MEMORY; } int32_t code = setListSlotId(pCxt, ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc->dataBlockId, -1, pProjectLogicNode->pProjections, &pProject->pProjections); if (TSDB_CODE_SUCCESS == code) { - code = addDataBlockDesc(pCxt, pProject->pProjections, pProject->node.pOutputDataBlockDesc); + code = addDataBlockSlots(pCxt, pProject->pProjections, pProject->node.pOutputDataBlockDesc); } if (TSDB_CODE_SUCCESS == code) { code = setConditionsSlotId(pCxt, (const SLogicNode*)pProjectLogicNode, (SPhysiNode*)pProject); @@ -603,34 +686,30 @@ static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChild } static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode, SPhysiNode** pPhyNode) { - SExchangePhysiNode* pExchange = (SExchangePhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE); + SExchangePhysiNode* pExchange = (SExchangePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE); if (NULL == pExchange) { return TSDB_CODE_OUT_OF_MEMORY; } pExchange->srcGroupId = pExchangeLogicNode->srcGroupId; - int32_t code = addDataBlockDesc(pCxt, pExchangeLogicNode->node.pTargets, pExchange->node.pOutputDataBlockDesc); - - if (TSDB_CODE_SUCCESS == code) { - *pPhyNode = (SPhysiNode*)pExchange; - } else { - nodesDestroyNode(pExchange); - } + *pPhyNode = (SPhysiNode*)pExchange; - return code; + return TSDB_CODE_SUCCESS; } static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode, SPhysiNode** pPhyNode) { - SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN); + SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN); if (NULL == pScan) { return TSDB_CODE_OUT_OF_MEMORY; } - int32_t code = addDataBlockDesc(pCxt, pExchangeLogicNode->node.pTargets, pScan->node.pOutputDataBlockDesc); + int32_t code = TSDB_CODE_SUCCESS; + + pScan->pScanCols = nodesCloneList(pExchangeLogicNode->node.pTargets); + if (NULL == pScan->pScanCols) { + code = TSDB_CODE_OUT_OF_MEMORY; + } if (TSDB_CODE_SUCCESS == code) { - pScan->pScanCols = nodesCloneList(pExchangeLogicNode->node.pTargets); - if (NULL == pScan->pScanCols) { - code = TSDB_CODE_OUT_OF_MEMORY; - } + code = addDataBlockSlots(pCxt, pScan->pScanCols, pScan->node.pOutputDataBlockDesc); } if (TSDB_CODE_SUCCESS == code) { @@ -660,21 +739,17 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) { code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pWindow->pExprs); if (TSDB_CODE_SUCCESS == code) { - code = addDataBlockDesc(pCxt, pWindow->pExprs, pChildTupe); + code = addDataBlockSlots(pCxt, pWindow->pExprs, pChildTupe); } } if (TSDB_CODE_SUCCESS == code && NULL != pFuncs) { code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pWindow->pFuncs); if (TSDB_CODE_SUCCESS == code) { - code = addDataBlockDesc(pCxt, pWindow->pFuncs, pWindow->node.pOutputDataBlockDesc); + code = addDataBlockSlots(pCxt, pWindow->pFuncs, pWindow->node.pOutputDataBlockDesc); } } - if (TSDB_CODE_SUCCESS == code) { - code = setSlotOutput(pCxt, pWindowLogicNode->node.pTargets, pWindow->node.pOutputDataBlockDesc); - } - if (TSDB_CODE_SUCCESS == code) { *pPhyNode = (SPhysiNode*)pWindow; } else { @@ -685,7 +760,7 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* } static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) { - SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_INTERVAL); + SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_INTERVAL); if (NULL == pInterval) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -706,7 +781,7 @@ static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil } static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) { - SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW); + SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW); if (NULL == pSession) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -875,17 +950,22 @@ static int32_t pushSubplan(SPhysiPlanContext* pCxt, SNodeptr pSubplan, int32_t l SNodeListNode* pGroup; if (level >= LIST_LENGTH(pSubplans)) { pGroup = nodesMakeNode(QUERY_NODE_NODE_LIST); - CHECK_ALLOC(pGroup, TSDB_CODE_OUT_OF_MEMORY); - CHECK_CODE(nodesListStrictAppend(pSubplans, pGroup), TSDB_CODE_OUT_OF_MEMORY); + if (NULL == pGroup) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pSubplans, pGroup)) { + return TSDB_CODE_OUT_OF_MEMORY; + } } else { pGroup = nodesListGetNode(pSubplans, level); } if (NULL == pGroup->pNodeList) { pGroup->pNodeList = nodesMakeList(); - CHECK_ALLOC(pGroup->pNodeList, TSDB_CODE_OUT_OF_MEMORY); + if (NULL == pGroup->pNodeList) { + return TSDB_CODE_OUT_OF_MEMORY; + } } - CHECK_CODE(nodesListStrictAppend(pGroup->pNodeList, pSubplan), TSDB_CODE_OUT_OF_MEMORY); - return TSDB_CODE_SUCCESS; + return nodesListStrictAppend(pGroup->pNodeList, pSubplan); } static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan* pParent, SQueryPlan* pQueryPlan) {