diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index a471de3147a999afbec8346afb08ebc833699b12..992b99df60608c04337f8351987361ebfb89e7ce 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -139,10 +139,10 @@ bool fmIsSpecialDataRequiredFunc(int32_t funcId); bool fmIsDynamicScanOptimizedFunc(int32_t funcId); typedef enum EFuncDataRequired { - FUNC_DATA_REQUIRED_ALL_NEEDED = 1, - FUNC_DATA_REQUIRED_STATIS_NEEDED, - FUNC_DATA_REQUIRED_NO_NEEDED, - FUNC_DATA_REQUIRED_DISCARD + FUNC_DATA_REQUIRED_DATA_LOAD = 1, + FUNC_DATA_REQUIRED_STATIS_LOAD, + FUNC_DATA_REQUIRED_NOT_LOAD, + FUNC_DATA_REQUIRED_FILTEROUT, } EFuncDataRequired; EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow); diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 8d88b94d6c717e783263eae0d8d617fa17cb9a53..1f3c003dfb1fe03ac54f8320f02eedf557b515b6 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -42,10 +42,6 @@ #define curTimeWindowIndex(_winres) ((_winres)->curIndex) -struct SColumnFilterElem; - -typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, const char* val1, const char* val2, int16_t type); - typedef struct SGroupResInfo { int32_t totalGroup; int32_t currentGroup; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 78bc6947cd9cedbcd44800b1445756e92ca83aea..ea2939055edcfe427389b55428929b5b6d8222a8 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -38,8 +38,6 @@ extern "C" { #include "tpagedbuf.h" #include "tmsg.h" -struct SColumnFilterElem; - typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); #define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED) @@ -225,8 +223,6 @@ typedef struct STaskRuntimeEnv { void* qinfo; uint8_t scanFlag; // denotes reversed scan of data or not void* pTsdbReadHandle; - - int32_t prevGroupId; // previous executed group id bool enableGroupData; SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file SHashObj* pResultRowHashTable; // quick locate the window object for each result @@ -241,8 +237,6 @@ typedef struct STaskRuntimeEnv { char* tagVal; // tag value of current data block struct SScalarFunctionSupport* scalarSup; - - SSDataBlock* outputBuf; STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure struct SOperatorInfo* proot; SGroupResInfo groupResInfo; @@ -250,7 +244,6 @@ typedef struct STaskRuntimeEnv { STableQueryInfo* current; SResultInfo resultInfo; - SHashObj* pTableRetrieveTsMap; struct SUdfInfo* pUdfInfo; } STaskRuntimeEnv; @@ -350,6 +343,7 @@ typedef struct STableScanInfo { int64_t elapsedTime; int32_t prevGroupId; // previous table group id int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan + int32_t dataBlockLoadFlag; } STableScanInfo; typedef struct STagScanInfo { @@ -616,7 +610,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock); SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset); SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime, +SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t dataLoadFlag, int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index c911aef45f3fc9deeae30d4432ffdbbfd114b776..cfa17ecd3c2d0366233eeb313a6ede6149be4927 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -271,7 +271,7 @@ static int compareRowData(const void* a, const void* b, const void* userData) { } // setup the output buffer for each operator -SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) { +SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) { int32_t numOfCols = LIST_LENGTH(pNode->pSlots); SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); @@ -6518,8 +6518,6 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator, bool* newgroup) { SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) { STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); - // pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity); - size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); assert(numOfGroup == 0 || numOfGroup == 1); @@ -6747,13 +6745,13 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); - SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc); - return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, + return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, dataLoadFlag, pScanPhyNode->count, pScanPhyNode->reverse, pColList, pResBlock, pScanPhyNode->node.pConditions, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; - SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc); return createExchangeOperatorInfo(pExchange->pSrcEndPoints, pResBlock, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. @@ -6761,7 +6759,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); SArray* tableIdList = extractTableIdList(pTableGroupInfo); - SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc); int32_t numOfCols = 0; SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); @@ -6770,7 +6768,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return pOperator; } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode; - SSDataBlock* pResBlock = createOutputBuf_rv1(pSysScanPhyNode->scan.node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createResDataBlock(pSysScanPhyNode->scan.node.pOutputDataBlockDesc); struct SScanPhysiNode* pScanNode = &pSysScanPhyNode->scan; SArray* colList = extractScanColumnId(pScanNode->pScanCols); @@ -6799,14 +6797,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SProjectPhysiNode* pProjPhyNode = (SProjectPhysiNode*) pPhyNode; SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &num); - SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset}; SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset}; pOptr = createProjectOperatorInfo(ops[0], pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) { SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode; SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); - SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SExprInfo* pScalarExprInfo = NULL; int32_t numOfScalarExpr = 0; @@ -6824,7 +6822,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); - SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SInterval interval = { .interval = pIntervalPhyNode->interval, @@ -6840,7 +6838,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; - SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SArray* info = createSortInfo(pSortPhyNode->pSortKeys, pSortPhyNode->pTargets); SArray* slotMap = createIndexMap(pSortPhyNode->pTargets); pOptr = createSortOperatorInfo(ops[0], pResBlock, info, slotMap, pTaskInfo); @@ -6848,12 +6846,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num); - SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*) pPhyNode; SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys); - SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num); pOptr = createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL); @@ -6861,11 +6859,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*) pPhyNode; SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); - SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) { SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*) pPhyNode; - SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &num); pOptr = createJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 901bfde6a4ae679bf04c365d9cb68fcdb0872c19..a2e68e34f1ee0dd3e95626b2e42d9b961fef408a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -64,16 +64,19 @@ static void setupQueryRangeForReverseScan(STableScanInfo* pTableScanInfo) { #endif } -int32_t loadDataBlock(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { +int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + STableScanInfo* pInfo = pOperator->info; + STaskCostInfo* pCost = &pTaskInfo->cost; pCost->totalBlocks += 1; - pCost->totalRows += pBlock->info.rows; + pCost->loadBlocks += 1; + pCost->totalRows += pBlock->info.rows; pCost->totalCheckedRows += pBlock->info.rows; - pCost->loadBlocks += 1; - *status = BLK_DATA_DATA_LOAD; + *status = pInfo->dataBlockLoadFlag; SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL); if (pCols == NULL) { @@ -217,7 +220,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) { return p; } -SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, +SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t dataLoadFlag, int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SExecTaskInfo* pTaskInfo) { assert(repeatTime > 0); @@ -232,6 +235,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, return NULL; } + pInfo->dataBlockLoadFlag= dataLoadFlag; pInfo->pResBlock = pResBlock; pInfo->pFilterNode = pCondition; pInfo->dataReader = pTsdbReadHandle; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 50c65439c0d1c29d6018dc691f74641381f20935..71e8bcc84221e5b0dba155f859951c2718a197ad 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -58,9 +58,9 @@ void functionFinalize(SqlFunctionCtx *pCtx) { EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) { SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0); if (QUERY_NODE_COLUMN == nodeType(pParam) && PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pParam)->colId) { - return FUNC_DATA_REQUIRED_NO_NEEDED; + return FUNC_DATA_REQUIRED_NOT_LOAD; } - return FUNC_DATA_REQUIRED_STATIS_NEEDED; + return FUNC_DATA_REQUIRED_STATIS_LOAD; } bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index aec75a0365ef42c8a23b021af4050d18fd6246c6..d0af3487ce39f948386586d386b6ba124d1935d7 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -78,10 +78,10 @@ int32_t fmGetFuncResultType(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) { if (pFunc->funcId < 0 || pFunc->funcId >= funcMgtBuiltinsNum) { - return FUNC_DATA_REQUIRED_ALL_NEEDED; + return FUNC_DATA_REQUIRED_DATA_LOAD; } if (NULL == funcMgtBuiltins[pFunc->funcId].dataRequiredFunc) { - return FUNC_DATA_REQUIRED_ALL_NEEDED; + return FUNC_DATA_REQUIRED_DATA_LOAD; } return funcMgtBuiltins[pFunc->funcId].dataRequiredFunc(pFunc, pTimeWindow); } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 3c60b62434cc0e202771883cc8dc0d52249410be..fd0d2758bf9436700b1ac19933b592d3f9bf93e7 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -200,7 +200,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect strcpy(pScan->tableName.tname, pRealTable->table.tableName); pScan->showRewrite = pCxt->pPlanCxt->showRewrite; pScan->ratio = pRealTable->ratio; - pScan->dataRequired = FUNC_DATA_REQUIRED_ALL_NEEDED; + pScan->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD; // set columns to scan SNodeList* pCols = NULL; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 19e6718fe8236149d3717f0e0f1344d83160c800..a8513bf3764573b6b6a9916c74ef9a687629a14c 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -125,12 +125,12 @@ static int32_t osdMatch(SOptimizeContext* pCxt, SLogicNode* pLogicNode, SOsdInfo static EFuncDataRequired osdPromoteDataRequired(EFuncDataRequired l , EFuncDataRequired r) { switch (l) { - case FUNC_DATA_REQUIRED_ALL_NEEDED: + case FUNC_DATA_REQUIRED_DATA_LOAD: return l; - case FUNC_DATA_REQUIRED_STATIS_NEEDED: - return FUNC_DATA_REQUIRED_ALL_NEEDED == r ? r : l; - case FUNC_DATA_REQUIRED_NO_NEEDED: - return FUNC_DATA_REQUIRED_DISCARD == r ? l : r; + case FUNC_DATA_REQUIRED_STATIS_LOAD: + return FUNC_DATA_REQUIRED_DATA_LOAD == r ? r : l; + case FUNC_DATA_REQUIRED_NOT_LOAD: + return FUNC_DATA_REQUIRED_FILTEROUT == r ? l : r; default: break; } @@ -139,9 +139,9 @@ static EFuncDataRequired osdPromoteDataRequired(EFuncDataRequired l , EFuncDataR static int32_t osdGetDataRequired(SNodeList* pFuncs) { if (NULL == pFuncs) { - return FUNC_DATA_REQUIRED_ALL_NEEDED; + return FUNC_DATA_REQUIRED_DATA_LOAD; } - EFuncDataRequired dataRequired = FUNC_DATA_REQUIRED_DISCARD; + EFuncDataRequired dataRequired = FUNC_DATA_REQUIRED_FILTEROUT; SNode* pFunc = NULL; FOREACH(pFunc, pFuncs) { dataRequired = osdPromoteDataRequired(dataRequired, fmFuncDataRequired((SFunctionNode*)pFunc, NULL));