From 7d850c1a5e8fc283baef118e8090ec8b7709a2ee Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Thu, 20 Jul 2023 15:07:55 +0800 Subject: [PATCH] feat: optimize partition by tbname slimit --- include/libs/nodes/plannodes.h | 4 + source/libs/executor/src/aggregateoperator.c | 91 +++++++++++-------- source/libs/executor/src/executil.c | 6 ++ source/libs/nodes/src/nodesCloneFuncs.c | 4 + source/libs/nodes/src/nodesCodeFuncs.c | 7 ++ source/libs/nodes/src/nodesMsgFuncs.c | 17 +++- source/libs/planner/inc/planInt.h | 3 + source/libs/planner/src/planOptimizer.c | 14 +++ source/libs/planner/src/planPhysiCreater.c | 2 + source/libs/planner/src/planSpliter.c | 58 +----------- source/libs/planner/src/planUtil.c | 54 +++++++++++ tests/system-test/2-query/columnLenUpdated.py | 2 +- .../5-taos-tools/taosbenchmark/insertMix.py | 5 + 13 files changed, 171 insertions(+), 96 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 453c5d4914..821defc797 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -55,6 +55,7 @@ typedef struct SLogicNode { EGroupAction groupAction; EOrder inputTsOrder; EOrder outputTsOrder; + bool forceCreateNonBlockingOptr; // true if the operator can use non-blocking(pipeline) mode } SLogicNode; typedef enum EScanType { @@ -105,6 +106,7 @@ typedef struct SScanLogicNode { bool hasNormalCols; // neither tag column nor primary key tag column bool sortPrimaryKey; bool igLastNull; + bool groupOrderScan; } SScanLogicNode; typedef struct SJoinLogicNode { @@ -316,6 +318,7 @@ typedef struct SPhysiNode { struct SPhysiNode* pParent; SNode* pLimit; SNode* pSlimit; + bool forceCreateNonBlockingOptr; } SPhysiNode; typedef struct SScanPhysiNode { @@ -326,6 +329,7 @@ typedef struct SScanPhysiNode { uint64_t suid; int8_t tableType; SName tableName; + bool groupOrderScan; } SScanPhysiNode; typedef SScanPhysiNode STagScanPhysiNode; diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index be0ad1c239..176c4b53be 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -45,6 +45,8 @@ typedef struct SAggOperatorInfo { SGroupResInfo groupResInfo; SExprSupp scalarExprSup; bool groupKeyOptimized; + bool hasValidBlock; + SSDataBlock* pNewGroupBlock; } SAggOperatorInfo; static void destroyAggOperatorInfo(void* param); @@ -53,7 +55,6 @@ static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, u static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock); static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock); -static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator); static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx); static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator); @@ -111,9 +112,9 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder; pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder; - setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo, - pTaskInfo); - pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, + setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, + !pAggNode->node.forceCreateNonBlockingOptr, OP_NOT_OPENED, pInfo, pTaskInfo); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getAggregateResult, NULL, destroyAggOperatorInfo, optrDefaultBufFn, NULL); if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { @@ -153,28 +154,42 @@ void destroyAggOperatorInfo(void* param) { taosMemoryFreeClear(param); } -// this is a blocking operator -int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { - if (OPTR_IS_OPENED(pOperator)) { - return TSDB_CODE_SUCCESS; - } - +/** + * @brief get blocks from downstream and fill results into groupedRes after aggragation + * @retval false if no more groups + * @retval true if there could have new groups coming + * @note if pOperator.blocking is true, scan all blocks from downstream, all groups are handled + * if false, fill results of ONE GROUP + * */ +static bool nextGroupedResult(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SAggOperatorInfo* pAggInfo = pOperator->info; + if (pOperator->blocking && pAggInfo->hasValidBlock) return false; + SExprSupp* pSup = &pOperator->exprSupp; SOperatorInfo* downstream = pOperator->pDownstream[0]; - int64_t st = taosGetTimestampUs(); - int32_t code = TSDB_CODE_SUCCESS; - int32_t order = pAggInfo->binfo.inputTsOrder; - bool hasValidBlock = false; + int64_t st = taosGetTimestampUs(); + int32_t code = TSDB_CODE_SUCCESS; + int32_t order = pAggInfo->binfo.inputTsOrder; + SSDataBlock* pBlock = pAggInfo->pNewGroupBlock; + if (pBlock) { + pAggInfo->pNewGroupBlock = NULL; + tSimpleHashClear(pAggInfo->aggSup.pResultRowHashTable); + setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); + setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true); + code = doAggregateImpl(pOperator, pSup->pCtx); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, code); + } + } while (1) { bool blockAllocated = false; - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { - if (!hasValidBlock) { + if (!pAggInfo->hasValidBlock) { createDataBlockForEmptyInput(pOperator, &pBlock); if (pBlock == NULL) { break; @@ -184,7 +199,7 @@ int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { break; } } - hasValidBlock = true; + pAggInfo->hasValidBlock = true; pAggInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag; // there is an scalar expression that needs to be calculated before apply the group aggregation. @@ -196,7 +211,11 @@ int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } } - + // if non-blocking mode and new group arrived, save the block and break + if (!pOperator->blocking && pAggInfo->groupId != UINT64_MAX && pBlock->info.id.groupId != pAggInfo->groupId) { + pAggInfo->pNewGroupBlock = pBlock; + break; + } // the pDataBlock are always the same one, no need to call this again setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true); @@ -215,10 +234,7 @@ int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0); - OPTR_SET_OPENED(pOperator); - - pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; - return pTaskInfo->code; + return pBlock != NULL; } SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { @@ -230,26 +246,25 @@ SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { } SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - pTaskInfo->code = pOperator->fpSet._openFn(pOperator); - if (pTaskInfo->code != TSDB_CODE_SUCCESS) { - setOperatorCompleted(pOperator); - return NULL; - } + bool hasNewGroups = false; + do { + hasNewGroups = nextGroupedResult(pOperator); + blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - while (1) { - doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf); - doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); + while (1) { + doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf); + doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); - if (!hasRemainResults(&pAggInfo->groupResInfo)) { - setOperatorCompleted(pOperator); - break; - } + if (!hasRemainResults(&pAggInfo->groupResInfo)) { + if (!hasNewGroups) setOperatorCompleted(pOperator); + break; + } - if (pInfo->pRes->info.rows > 0) { - break; + if (pInfo->pRes->info.rows > 0) { + break; + } } - } + } while (pInfo->pRes->info.rows == 0 && hasNewGroups); size_t rows = blockDataGetNumOfRows(pInfo->pRes); pOperator->resultInfo.totalRows += rows; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 832750e967..e1bf4e7cb0 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -127,6 +127,10 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in if (pGroupResInfo->pRows != NULL) { taosArrayDestroy(pGroupResInfo->pRows); } + if (pGroupResInfo->pBuf) { + taosMemoryFree(pGroupResInfo->pBuf); + pGroupResInfo->pBuf = NULL; + } // extract the result rows information from the hash map int32_t size = tSimpleHashGetSize(pHashmap); @@ -2104,6 +2108,8 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* if (groupSort && groupByTbname) { taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn); pTableListInfo->numOfOuputGroups = numOfTables; + } else if (groupByTbname && pScanNode->groupOrderScan){ + pTableListInfo->numOfOuputGroups = numOfTables; } else { pTableListInfo->numOfOuputGroups = 1; } diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 6e4dde4ec1..f5eacf0bd5 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -361,6 +361,7 @@ static int32_t logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) { COPY_SCALAR_FIELD(groupAction); COPY_SCALAR_FIELD(inputTsOrder); COPY_SCALAR_FIELD(outputTsOrder); + COPY_SCALAR_FIELD(forceCreateNonBlockingOptr); return TSDB_CODE_SUCCESS; } @@ -397,6 +398,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { CLONE_NODE_LIST_FIELD(pTags); CLONE_NODE_FIELD(pSubtable); COPY_SCALAR_FIELD(igLastNull); + COPY_SCALAR_FIELD(groupOrderScan); return TSDB_CODE_SUCCESS; } @@ -545,6 +547,7 @@ static int32_t physiNodeCopy(const SPhysiNode* pSrc, SPhysiNode* pDst) { CLONE_NODE_LIST_FIELD(pChildren); COPY_SCALAR_FIELD(inputTsOrder); COPY_SCALAR_FIELD(outputTsOrder); + COPY_SCALAR_FIELD(forceCreateNonBlockingOptr); return TSDB_CODE_SUCCESS; } @@ -556,6 +559,7 @@ static int32_t physiScanCopy(const SScanPhysiNode* pSrc, SScanPhysiNode* pDst) { COPY_SCALAR_FIELD(suid); COPY_SCALAR_FIELD(tableType); COPY_OBJECT_FIELD(tableName, sizeof(SName)); + COPY_SCALAR_FIELD(groupOrderScan); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 81116a60b0..f25616065e 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1559,6 +1559,7 @@ static const char* jkScanPhysiPlanTableId = "TableId"; static const char* jkScanPhysiPlanSTableId = "STableId"; static const char* jkScanPhysiPlanTableType = "TableType"; static const char* jkScanPhysiPlanTableName = "TableName"; +static const char* jkScanPhysiPlanGroupOrderScan = "GroupOrderScan"; static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) { const STagScanPhysiNode* pNode = (const STagScanPhysiNode*)pObj; @@ -1582,6 +1583,9 @@ static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkScanPhysiPlanTableName, nameToJson, &pNode->tableName); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkScanPhysiPlanGroupOrderScan, pNode->groupOrderScan); + } return code; } @@ -1608,6 +1612,9 @@ static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonToObject(pJson, jkScanPhysiPlanTableName, jsonToName, &pNode->tableName); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkScanPhysiPlanGroupOrderScan, &pNode->groupOrderScan); + } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 1ca37defa4..20e829766d 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -1853,7 +1853,8 @@ enum { PHY_NODE_CODE_LIMIT, PHY_NODE_CODE_SLIMIT, PHY_NODE_CODE_INPUT_TS_ORDER, - PHY_NODE_CODE_OUTPUT_TS_ORDER + PHY_NODE_CODE_OUTPUT_TS_ORDER, + PHY_NODE_CODE_FORCE_NONBLOCKING_OPTR }; static int32_t physiNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -1878,6 +1879,9 @@ static int32_t physiNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeEnum(pEncoder, PHY_NODE_CODE_OUTPUT_TS_ORDER, pNode->outputTsOrder); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_NODE_CODE_FORCE_NONBLOCKING_OPTR, pNode->forceCreateNonBlockingOptr); + } return code; } @@ -1910,6 +1914,8 @@ static int32_t msgToPhysiNode(STlvDecoder* pDecoder, void* pObj) { case PHY_NODE_CODE_OUTPUT_TS_ORDER: code = tlvDecodeEnum(pTlv, &pNode->outputTsOrder, sizeof(pNode->outputTsOrder)); break; + case PHY_NODE_CODE_FORCE_NONBLOCKING_OPTR: + code = tlvDecodeBool(pTlv, &pNode->forceCreateNonBlockingOptr); default: break; } @@ -1925,7 +1931,8 @@ enum { PHY_SCAN_CODE_BASE_UID, PHY_SCAN_CODE_BASE_SUID, PHY_SCAN_CODE_BASE_TABLE_TYPE, - PHY_SCAN_CODE_BASE_TABLE_NAME + PHY_SCAN_CODE_BASE_TABLE_NAME, + PHY_SCAN_CODE_BASE_GROUP_ORDER_SCAN }; static int32_t physiScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -1950,6 +1957,9 @@ static int32_t physiScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObj(pEncoder, PHY_SCAN_CODE_BASE_TABLE_NAME, nameToMsg, &pNode->tableName); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_SCAN_CODE_BASE_GROUP_ORDER_SCAN, pNode->groupOrderScan); + } return code; } @@ -1982,6 +1992,9 @@ static int32_t msgToPhysiScanNode(STlvDecoder* pDecoder, void* pObj) { case PHY_SCAN_CODE_BASE_TABLE_NAME: code = tlvDecodeObjFromTlv(pTlv, msgToName, &pNode->tableName); break; + case PHY_SCAN_CODE_BASE_GROUP_ORDER_SCAN: + code = tlvDecodeBool(pTlv, &pNode->groupOrderScan); + break; default: break; } diff --git a/source/libs/planner/inc/planInt.h b/source/libs/planner/inc/planInt.h index 82abc5d1a9..092fe17411 100644 --- a/source/libs/planner/inc/planInt.h +++ b/source/libs/planner/inc/planInt.h @@ -43,6 +43,9 @@ int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan); int32_t scaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan); int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList); +bool isPartTableAgg(SAggLogicNode* pAgg); +bool isPartTableWinodw(SWindowLogicNode* pWindow); + #ifdef __cplusplus } #endif diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 05f478b116..32721d8060 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -363,6 +363,18 @@ static void scanPathOptSetScanOrder(EScanOrder scanOrder, SScanLogicNode* pScan) } } +static void scanPathOptSetGroupOrderScan(SScanLogicNode* pScan) { + if (pScan->tableType != TSDB_SUPER_TABLE) return; + + if (pScan->node.pParent && nodeType(pScan->node.pParent) == QUERY_NODE_LOGIC_PLAN_AGG) { + SAggLogicNode* pAgg = (SAggLogicNode*)pScan->node.pParent; + bool withSlimit = pAgg->node.pSlimit != NULL || (pAgg->node.pParent && pAgg->node.pParent->pSlimit); + if (withSlimit && isPartTableAgg(pAgg)) { + pScan->groupOrderScan = pAgg->node.forceCreateNonBlockingOptr = true; + } + } +} + static int32_t scanPathOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { SOsdInfo info = {.scanOrder = SCAN_ORDER_ASC}; int32_t code = scanPathOptMatch(pCxt, pLogicSubplan->pNode, &info); @@ -371,6 +383,7 @@ static int32_t scanPathOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub if (!pCxt->pPlanCxt->streamQuery) { scanPathOptSetScanOrder(info.scanOrder, info.pScan); } + scanPathOptSetGroupOrderScan(info.pScan); } if (TSDB_CODE_SUCCESS == code && (NULL != info.pDsoFuncs || NULL != info.pSdrFuncs)) { info.pScan->dataRequired = scanPathOptGetDataRequired(info.pSdrFuncs); @@ -1675,6 +1688,7 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub if (TSDB_CODE_SUCCESS == code) { if (QUERY_NODE_LOGIC_PLAN_AGG == pNode->pParent->type) { SAggLogicNode* pParent = (SAggLogicNode*)(pNode->pParent); + scanPathOptSetGroupOrderScan(pScan); pParent->hasGroupKeyOptimized = true; } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index b3d94a5e47..1b92dcd2e7 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -447,6 +447,7 @@ static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SSubplan* pS pScanPhysiNode->uid = pScanLogicNode->tableId; pScanPhysiNode->suid = pScanLogicNode->stableId; pScanPhysiNode->tableType = pScanLogicNode->tableType; + pScanPhysiNode->groupOrderScan = pScanLogicNode->groupOrderScan; memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName)); if (NULL != pScanLogicNode->pTagCond) { pSubplan->pTagCond = nodesCloneNode(pScanLogicNode->pTagCond); @@ -880,6 +881,7 @@ static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, pAgg->mergeDataBlock = (GROUP_ACTION_KEEP == pAggLogicNode->node.groupAction ? false : true); pAgg->groupKeyOptimized = pAggLogicNode->hasGroupKeyOptimized; + pAgg->node.forceCreateNonBlockingOptr = pAggLogicNode->node.forceCreateNonBlockingOptr; SNodeList* pPrecalcExprs = NULL; SNodeList* pGroupKeys = NULL; diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 246ee13fb0..84a486649e 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -306,54 +306,6 @@ static bool stbSplIsTableCountQuery(SLogicNode* pNode) { return QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && SCAN_TYPE_TABLE_COUNT == ((SScanLogicNode*)pChild)->scanType; } -static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) { - if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { - return ((SScanLogicNode*)pNode)->pGroupTags; - } else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) { - return ((SPartitionLogicNode*)pNode)->pPartitionKeys; - } else { - return NULL; - } -} - -static bool stbSplHasPartTbname(SNodeList* pPartKeys) { - if (NULL == pPartKeys) { - return false; - } - SNode* pPartKey = NULL; - FOREACH(pPartKey, pPartKeys) { - if (QUERY_NODE_GROUPING_SET == nodeType(pPartKey)) { - pPartKey = nodesListGetNode(((SGroupingSetNode*)pPartKey)->pParameterList, 0); - } - if ((QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType) || - (QUERY_NODE_COLUMN == nodeType(pPartKey) && COLUMN_TYPE_TBNAME == ((SColumnNode*)pPartKey)->colType)) { - return true; - } - } - return false; -} - -static bool stbSplNotSystemScan(SLogicNode* pNode) { - if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { - return SCAN_TYPE_SYSTEM_TABLE != ((SScanLogicNode*)pNode)->scanType; - } else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) { - return stbSplNotSystemScan((SLogicNode*)nodesListGetNode(pNode->pChildren, 0)); - } else { - return true; - } -} - -static bool stbSplIsPartTableAgg(SAggLogicNode* pAgg) { - if (1 != LIST_LENGTH(pAgg->node.pChildren)) { - return false; - } - if (NULL != pAgg->pGroupKeys) { - return stbSplHasPartTbname(pAgg->pGroupKeys) && - stbSplNotSystemScan((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0)); - } - return stbSplHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0))); -} - static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { switch (nodeType(pNode)) { case QUERY_NODE_LOGIC_PLAN_SCAN: @@ -364,7 +316,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { return streamQuery ? false : stbSplIsMultiTbScanChild(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_AGG: return (!stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) || - stbSplIsPartTableAgg((SAggLogicNode*)pNode)) && + isPartTableAgg((SAggLogicNode*)pNode)) && stbSplHasMultiTbScan(streamQuery, pNode) && !stbSplIsTableCountQuery(pNode); case QUERY_NODE_LOGIC_PLAN_WINDOW: return stbSplNeedSplitWindow(streamQuery, pNode); @@ -778,10 +730,6 @@ static int32_t stbSplSplitEvent(SSplitContext* pCxt, SStableSplitInfo* pInfo) { } } -static bool stbSplIsPartTableWinodw(SWindowLogicNode* pWindow) { - return stbSplHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0))); -} - static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) { switch (((SWindowLogicNode*)pInfo->pSplitNode)->winType) { case WINDOW_TYPE_INTERVAL: @@ -834,7 +782,7 @@ static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitIn } static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { - if (stbSplIsPartTableWinodw((SWindowLogicNode*)pInfo->pSplitNode)) { + if (isPartTableWinodw((SWindowLogicNode*)pInfo->pSplitNode)) { return stbSplSplitWindowForPartTable(pCxt, pInfo); } else { return stbSplSplitWindowForCrossTable(pCxt, pInfo); @@ -920,7 +868,7 @@ static int32_t stbSplSplitAggNodeForCrossTable(SSplitContext* pCxt, SStableSplit } static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { - if (stbSplIsPartTableAgg((SAggLogicNode*)pInfo->pSplitNode)) { + if (isPartTableAgg((SAggLogicNode*)pInfo->pSplitNode)) { return stbSplSplitAggNodeForPartTable(pCxt, pInfo); } return stbSplSplitAggNodeForCrossTable(pCxt, pInfo); diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index 29e87b34ce..88086cde1d 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -321,3 +321,57 @@ int32_t adjustLogicNodeDataRequirement(SLogicNode* pNode, EDataOrderLevel requir } return code; } + +static bool stbNotSystemScan(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { + return SCAN_TYPE_SYSTEM_TABLE != ((SScanLogicNode*)pNode)->scanType; + } else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) { + return stbNotSystemScan((SLogicNode*)nodesListGetNode(pNode->pChildren, 0)); + } else { + return true; + } +} + +static bool stbHasPartTbname(SNodeList* pPartKeys) { + if (NULL == pPartKeys) { + return false; + } + SNode* pPartKey = NULL; + FOREACH(pPartKey, pPartKeys) { + if (QUERY_NODE_GROUPING_SET == nodeType(pPartKey)) { + pPartKey = nodesListGetNode(((SGroupingSetNode*)pPartKey)->pParameterList, 0); + } + if ((QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType) || + (QUERY_NODE_COLUMN == nodeType(pPartKey) && COLUMN_TYPE_TBNAME == ((SColumnNode*)pPartKey)->colType)) { + return true; + } + } + return false; +} + +static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { + return ((SScanLogicNode*)pNode)->pGroupTags; + } else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) { + return ((SPartitionLogicNode*)pNode)->pPartitionKeys; + } else { + return NULL; + } +} + +bool isPartTableAgg(SAggLogicNode* pAgg) { + if (1 != LIST_LENGTH(pAgg->node.pChildren)) { + return false; + } + if (NULL != pAgg->pGroupKeys) { + return stbHasPartTbname(pAgg->pGroupKeys) && + stbNotSystemScan((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0)); + } + return stbHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0))); +} + +bool isPartTableWinodw(SWindowLogicNode* pWindow) { + return stbHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0))); +} + + diff --git a/tests/system-test/2-query/columnLenUpdated.py b/tests/system-test/2-query/columnLenUpdated.py index e43b32a716..93d9a492f9 100644 --- a/tests/system-test/2-query/columnLenUpdated.py +++ b/tests/system-test/2-query/columnLenUpdated.py @@ -202,7 +202,7 @@ class TDTestCase: if retCode != "TAOS_OK": tdLog.exit("taos -s fail") - tdSql.query("select count(*) from stb group by tg1") + tdSql.query("select count(*) from stb group by tg1 order by count(*) desc") tdSql.checkData(0, 0, 2) tdSql.checkData(1, 0, 1) diff --git a/tests/system-test/5-taos-tools/taosbenchmark/insertMix.py b/tests/system-test/5-taos-tools/taosbenchmark/insertMix.py index 60daa8cdc2..b4046b8c98 100644 --- a/tests/system-test/5-taos-tools/taosbenchmark/insertMix.py +++ b/tests/system-test/5-taos-tools/taosbenchmark/insertMix.py @@ -79,6 +79,11 @@ class TDTestCase: tdSql.query("select count(*) from (select * from meters order by ts desc)") tdSql.checkData(0, 0, allCnt) + rowCnt = tdSql.query("select tbname, count(*) from meters partition by tbname slimit 11") + if rowCnt != 10: + tdLog.exit("partition by tbname should return 10 rows of table data which is " + str(rowCnt)) + return + def run(self): binPath = self.getPath() -- GitLab