From ddb39a8dc236284ae9d26a46a26aac34c3825369 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Thu, 14 Jul 2022 16:06:28 +0800 Subject: [PATCH] fix: some problems of planner --- include/libs/nodes/plannodes.h | 7 ++- source/libs/command/src/explain.c | 20 +++---- source/libs/executor/src/cachescanoperator.c | 8 +-- source/libs/executor/src/executorimpl.c | 8 +-- source/libs/function/src/builtinsimpl.c | 53 +++++++++---------- source/libs/nodes/src/nodesCodeFuncs.c | 38 +++++++++++-- source/libs/planner/src/planPhysiCreater.c | 23 ++++++-- source/libs/planner/src/planSpliter.c | 3 ++ source/libs/planner/test/planBasicTest.cpp | 2 + source/libs/planner/test/planSubqueryTest.cpp | 2 + 10 files changed, 109 insertions(+), 55 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 6a865b4e2a..b807aeca22 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -275,7 +275,12 @@ typedef struct SScanPhysiNode { typedef SScanPhysiNode STagScanPhysiNode; typedef SScanPhysiNode SBlockDistScanPhysiNode; -typedef SScanPhysiNode SLastRowScanPhysiNode; + +typedef struct SLastRowScanPhysiNode { + SScanPhysiNode scan; + SNodeList* pGroupTags; + bool groupSort; +} SLastRowScanPhysiNode; typedef struct SSystemTableScanPhysiNode { SScanPhysiNode scan; diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index fde53b7064..3fa419e220 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -206,7 +206,7 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo } case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: { SLastRowScanPhysiNode *lastRowPhysiNode = (SLastRowScanPhysiNode *)pNode; - pPhysiChildren = lastRowPhysiNode->node.pChildren; + pPhysiChildren = lastRowPhysiNode->scan.node.pChildren; break; } case QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT: { @@ -1209,19 +1209,19 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i } case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: { SLastRowScanPhysiNode *pLastRowNode = (SLastRowScanPhysiNode *)pNode; - EXPLAIN_ROW_NEW(level, EXPLAIN_LASTROW_SCAN_FORMAT, pLastRowNode->tableName.tname); + EXPLAIN_ROW_NEW(level, EXPLAIN_LASTROW_SCAN_FORMAT, pLastRowNode->scan.tableName.tname); EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); if (pResNode->pExecInfo) { QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); } - EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pLastRowNode->pScanCols->length); + EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pLastRowNode->scan.pScanCols->length); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); - if (pLastRowNode->pScanPseudoCols) { - EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pLastRowNode->pScanPseudoCols->length); + if (pLastRowNode->scan.pScanPseudoCols) { + EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pLastRowNode->scan.pScanPseudoCols->length); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); } - EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pLastRowNode->node.pOutputDataBlockDesc->totalRowSize); + EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pLastRowNode->scan.node.pOutputDataBlockDesc->totalRowSize); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_END(); @@ -1230,15 +1230,15 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i if (verbose) { EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, - nodesGetOutputNumFromSlotList(pLastRowNode->node.pOutputDataBlockDesc->pSlots)); + nodesGetOutputNumFromSlotList(pLastRowNode->scan.node.pOutputDataBlockDesc->pSlots)); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); - EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pLastRowNode->node.pOutputDataBlockDesc->outputRowSize); + EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pLastRowNode->scan.node.pOutputDataBlockDesc->outputRowSize); EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); - if (pLastRowNode->node.pConditions) { + if (pLastRowNode->scan.node.pConditions) { EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); - QRY_ERR_RET(nodesNodeToSQL(pLastRowNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE, + QRY_ERR_RET(nodesNodeToSQL(pLastRowNode->scan.node.pConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 0f6817cd6b..9034397d0f 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -40,10 +40,10 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead pInfo->pTableList = pTableList; pInfo->readHandle = *readHandle; - pInfo->pRes = createResDataBlock(pScanNode->node.pOutputDataBlockDesc); + pInfo->pRes = createResDataBlock(pScanNode->scan.node.pOutputDataBlockDesc); int32_t numOfCols = 0; - pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->pScanCols, pScanNode->node.pOutputDataBlockDesc, &numOfCols, + pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->scan.pScanCols, pScanNode->scan.node.pOutputDataBlockDesc, &numOfCols, COL_MATCH_FROM_COL_ID); int32_t code = extractTargetSlotId(pInfo->pColMatchInfo, pTaskInfo, &pInfo->pSlotIds); if (code != TSDB_CODE_SUCCESS) { @@ -53,10 +53,10 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead tsdbLastRowReaderOpen(readHandle->vnode, LASTROW_RETRIEVE_TYPE_SINGLE, pTableList, taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader); - if (pScanNode->pScanPseudoCols != NULL) { + if (pScanNode->scan.pScanPseudoCols != NULL) { SExprSupp* pPseudoExpr = &pInfo->pseudoExprSup; - pPseudoExpr->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pPseudoExpr->numOfExprs); + pPseudoExpr->pExprInfo = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &pPseudoExpr->numOfExprs); pPseudoExpr->pCtx = createSqlFunctionCtx(pPseudoExpr->pExprInfo, pPseudoExpr->numOfExprs, &pPseudoExpr->rowEntryInfoOffset); } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 674bbfef0b..851e3bc5bf 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4444,21 +4444,21 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo // return NULL; // } - int32_t code = extractTableSchemaInfo(pHandle, pScanNode->uid, pTaskInfo); + int32_t code = extractTableSchemaInfo(pHandle, pScanNode->scan.uid, pTaskInfo); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; return NULL; } pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); - if (pScanNode->tableType == TSDB_SUPER_TABLE) { - code = vnodeGetAllTableList(pHandle->vnode, pScanNode->uid, pTableListInfo->pTableList); + if (pScanNode->scan.tableType == TSDB_SUPER_TABLE) { + code = vnodeGetAllTableList(pHandle->vnode, pScanNode->scan.uid, pTableListInfo->pTableList); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = terrno; return NULL; } } else { // Create one table group. - STableKeyInfo info = {.lastKey = 0, .uid = pScanNode->uid, .groupId = 0}; + STableKeyInfo info = {.lastKey = 0, .uid = pScanNode->scan.uid, .groupId = 0}; taosArrayPush(pTableListInfo->pTableList, &info); } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index e622c0c1af..320474ffb3 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -80,7 +80,7 @@ typedef struct STopBotRes { } STopBotRes; typedef struct SFirstLastRes { - bool hasResult; + bool hasResult; // used for last_row function only, isNullRes in SResultRowEntry can not be passed to downstream.So, // this attribute is required bool isNull; @@ -402,7 +402,6 @@ typedef struct SGroupKeyInfo { (x) += step; \ } while (0) - #define STATE_COMP(_op, _lval, _param) STATE_COMP_IMPL(_op, _lval, GET_STATE_VAL(_param)) #define GET_STATE_VAL(param) ((param.nType == TSDB_DATA_TYPE_BIGINT) ? (param.i) : (param.d)) @@ -986,8 +985,8 @@ int32_t avgFunctionMerge(SqlFunctionCtx* pCtx) { int32_t start = pInput->startRowIndex; - for(int32_t i = start; i < start + pInput->numOfRows; ++i) { - char* data = colDataGetData(pCol, i); + for (int32_t i = start; i < start + pInput->numOfRows; ++i) { + char* data = colDataGetData(pCol, i); SAvgRes* pInputInfo = (SAvgRes*)varDataVal(data); avgTransferInfo(pInputInfo, pInfo); } @@ -2559,8 +2558,8 @@ int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx) { int32_t start = pInput->startRowIndex; - for(int32_t i = start; i < start + pInput->numOfRows; ++i) { - char* data = colDataGetData(pCol, i); + for (int32_t i = start; i < start + pInput->numOfRows; ++i) { + char* data = colDataGetData(pCol, i); SAPercentileInfo* pInputInfo = (SAPercentileInfo*)varDataVal(data); apercentileTransferInfo(pInputInfo, pInfo); } @@ -2925,8 +2924,8 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuer int32_t start = pInput->startRowIndex; int32_t numOfElems = 0; - for(int32_t i = start; i < start + pInput->numOfRows; ++i) { - char* data = colDataGetData(pCol, i); + for (int32_t i = start; i < start + pInput->numOfRows; ++i) { + char* data = colDataGetData(pCol, i); SFirstLastRes* pInputInfo = (SFirstLastRes*)varDataVal(data); firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery); if (!numOfElems) { @@ -2951,7 +2950,7 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0; SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo); - colDataAppend(pCol, pBlock->info.rows, pRes->buf, pRes->isNull||pResInfo->isNullRes); + colDataAppend(pCol, pBlock->info.rows, pRes->buf, pRes->isNull || pResInfo->isNullRes); // handle selectivity STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY)); @@ -3754,8 +3753,8 @@ int32_t spreadFunctionMerge(SqlFunctionCtx* pCtx) { int32_t start = pInput->startRowIndex; - for(int32_t i = start; i < start + pInput->numOfRows; ++i) { - char* data = colDataGetData(pCol, i); + for (int32_t i = start; i < start + pInput->numOfRows; ++i) { + char* data = colDataGetData(pCol, i); SSpreadInfo* pInputInfo = (SSpreadInfo*)varDataVal(data); spreadTransferInfo(pInputInfo, pInfo); } @@ -3926,8 +3925,8 @@ int32_t elapsedFunctionMerge(SqlFunctionCtx* pCtx) { int32_t start = pInput->startRowIndex; - for(int32_t i = start; i < start + pInput->numOfRows; ++i) { - char* data = colDataGetData(pCol, i); + for (int32_t i = start; i < start + pInput->numOfRows; ++i) { + char* data = colDataGetData(pCol, i); SElapsedInfo* pInputInfo = (SElapsedInfo*)varDataVal(data); elapsedTransferInfo(pInputInfo, pInfo); } @@ -4191,13 +4190,9 @@ static int32_t histogramFunctionImpl(SqlFunctionCtx* pCtx, bool isPartial) { return TSDB_CODE_SUCCESS; } -int32_t histogramFunction(SqlFunctionCtx* pCtx) { - return histogramFunctionImpl(pCtx, false); -} +int32_t histogramFunction(SqlFunctionCtx* pCtx) { return histogramFunctionImpl(pCtx, false); } -int32_t histogramFunctionPartial(SqlFunctionCtx* pCtx) { - return histogramFunctionImpl(pCtx, true); -} +int32_t histogramFunctionPartial(SqlFunctionCtx* pCtx) { return histogramFunctionImpl(pCtx, true); } static void histogramTransferInfo(SHistoFuncInfo* pInput, SHistoFuncInfo* pOutput) { pOutput->normalized = pInput->normalized; @@ -4219,8 +4214,8 @@ int32_t histogramFunctionMerge(SqlFunctionCtx* pCtx) { int32_t start = pInput->startRowIndex; - for(int32_t i = start; i < start + pInput->numOfRows; ++i) { - char* data = colDataGetData(pCol, i); + for (int32_t i = start; i < start + pInput->numOfRows; ++i) { + char* data = colDataGetData(pCol, i); SHistoFuncInfo* pInputInfo = (SHistoFuncInfo*)varDataVal(data); histogramTransferInfo(pInputInfo, pInfo); } @@ -4267,9 +4262,9 @@ int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t histogramPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - int32_t resultBytes = getHistogramInfoSize(); - char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t resultBytes = getHistogramInfoSize(); + char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); memcpy(varDataVal(res), pInfo, resultBytes); varDataSetLen(res, resultBytes); @@ -4440,8 +4435,8 @@ int32_t hllFunctionMerge(SqlFunctionCtx* pCtx) { int32_t start = pInput->startRowIndex; - for(int32_t i = start; i < start + pInput->numOfRows; ++i) { - char* data = colDataGetData(pCol, i); + for (int32_t i = start; i < start + pInput->numOfRows; ++i) { + char* data = colDataGetData(pCol, i); SHLLInfo* pInputInfo = (SHLLInfo*)varDataVal(data); hllTransferInfo(pInputInfo, pInfo); } @@ -5998,7 +5993,7 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; - int32_t type = pInputCol->info.type; + int32_t type = pInputCol->info.type; int32_t bytes = pInputCol->info.bytes; pInfo->bytes = bytes; @@ -6010,7 +6005,6 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { - if (colDataIsNull_s(pInputCol, i)) { pInfo->isNull = true; } else { @@ -6023,7 +6017,6 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) { } pInfo->ts = cts; - pInfo->hasResult = true; pResInfo->numOfRes = 1; if (pCtx->subsidiaries.num > 0) { @@ -6034,6 +6027,8 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) { copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); } } + + pInfo->hasResult = true; } } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 75f9cd7b48..2d24b01ca1 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1468,9 +1468,36 @@ static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) { return code; } -static int32_t physiTagScanNodeToJson(const void* pObj, SJson* pJson) { return physiScanNodeToJson(pObj, pJson); } +static const char* jkLastRowScanPhysiPlanGroupTags = "GroupTags"; +static const char* jkLastRowScanPhysiPlanGroupSort = "GroupSort"; -static int32_t jsonToPhysiTagScanNode(const SJson* pJson, void* pObj) { return jsonToPhysiScanNode(pJson, pObj); } +static int32_t physiLastRowScanNodeToJson(const void* pObj, SJson* pJson) { + const SLastRowScanPhysiNode* pNode = (const SLastRowScanPhysiNode*)pObj; + + int32_t code = physiScanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkLastRowScanPhysiPlanGroupTags, pNode->pGroupTags); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkLastRowScanPhysiPlanGroupSort, pNode->groupSort); + } + + return code; +} + +static int32_t jsonToPhysiLastRowScanNode(const SJson* pJson, void* pObj) { + SLastRowScanPhysiNode* pNode = (SLastRowScanPhysiNode*)pObj; + + int32_t code = jsonToPhysiScanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkLastRowScanPhysiPlanGroupTags, &pNode->pGroupTags); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkLastRowScanPhysiPlanGroupSort, &pNode->groupSort); + } + + return code; +} static const char* jkTableScanPhysiPlanScanCount = "ScanCount"; static const char* jkTableScanPhysiPlanReverseScanCount = "ReverseScanCount"; @@ -4315,8 +4342,9 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { return logicPlanToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN: + return physiScanNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: - return physiTagScanNodeToJson(pObj, pJson); + return physiLastRowScanNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: @@ -4461,8 +4489,10 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToLogicPlan(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN: + return jsonToPhysiScanNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: - return jsonToPhysiTagScanNode(pJson, pObj); + + return jsonToPhysiLastRowScanNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 8c9c06be35..c4cc31c535 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -481,8 +481,6 @@ static ENodeType getScanOperatorType(EScanType scanType) { return QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN; case SCAN_TYPE_BLOCK_INFO: return QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN; - case SCAN_TYPE_LAST_ROW: - return QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN; default: break; } @@ -502,6 +500,24 @@ static int32_t createSimpleScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSub return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, pScan, pPhyNode); } +static int32_t createLastRowScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, + SPhysiNode** pPhyNode) { + SLastRowScanPhysiNode* pScan = + (SLastRowScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN); + if (NULL == pScan) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pScan->pGroupTags = nodesCloneList(pScanLogicNode->pGroupTags); + if (NULL != pScanLogicNode->pGroupTags && NULL == pScan->pGroupTags) { + nodesDestroyNode((SNode*)pScan); + return TSDB_CODE_OUT_OF_MEMORY; + } + pScan->groupSort = pScanLogicNode->groupSort; + + return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode); +} + static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) { STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, @@ -583,8 +599,9 @@ static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, switch (pScanLogicNode->scanType) { case SCAN_TYPE_TAG: case SCAN_TYPE_BLOCK_INFO: - case SCAN_TYPE_LAST_ROW: return createSimpleScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode); + case SCAN_TYPE_LAST_ROW: + return createLastRowScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode); case SCAN_TYPE_TABLE: return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode); case SCAN_TYPE_SYSTEM_TABLE: diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 7b9d553501..871bcf015b 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -364,6 +364,8 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic pMergeWindow->node.pTargets = NULL; SNodeList* pChildren = pMergeWindow->node.pChildren; pMergeWindow->node.pChildren = NULL; + SNode* pConditions = pMergeWindow->node.pConditions; + pMergeWindow->node.pConditions = NULL; int32_t code = TSDB_CODE_SUCCESS; SWindowLogicNode* pPartWin = (SWindowLogicNode*)nodesCloneNode((SNode*)pMergeWindow); @@ -373,6 +375,7 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic if (TSDB_CODE_SUCCESS == code) { pMergeWindow->node.pTargets = pTargets; + pMergeWindow->node.pConditions = pConditions; pPartWin->node.pChildren = pChildren; splSetParent((SLogicNode*)pPartWin); code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, &pMergeWindow->pFuncs); diff --git a/source/libs/planner/test/planBasicTest.cpp b/source/libs/planner/test/planBasicTest.cpp index 0d6bab145a..8482698cd3 100644 --- a/source/libs/planner/test/planBasicTest.cpp +++ b/source/libs/planner/test/planBasicTest.cpp @@ -108,6 +108,8 @@ TEST_F(PlanBasicTest, lastRowFunc) { run("SELECT LAST_ROW(c1) FROM st1"); + run("SELECT LAST_ROW(c1) FROM st1 PARTITION BY TBNAME"); + run("SELECT LAST_ROW(c1), SUM(c3) FROM t1"); } diff --git a/source/libs/planner/test/planSubqueryTest.cpp b/source/libs/planner/test/planSubqueryTest.cpp index 4d4f780473..2ba3794d5c 100644 --- a/source/libs/planner/test/planSubqueryTest.cpp +++ b/source/libs/planner/test/planSubqueryTest.cpp @@ -36,6 +36,8 @@ TEST_F(PlanSubqeuryTest, basic) { run("SELECT * FROM (SELECT NOW() FROM t1)"); run("SELECT NOW() FROM (SELECT * FROM t1) ORDER BY ts"); + + run("SELECT * FROM (SELECT AVG(c1) a FROM st1 INTERVAL(10s)) WHERE a > 1"); } TEST_F(PlanSubqeuryTest, doubleGroupBy) { -- GitLab