未验证 提交 3fad3b68 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #11537 from taosdata/feature/3.0_liaohj

enh(query): add the file data block load optimization check.
......@@ -139,10 +139,10 @@ bool fmIsSpecialDataRequiredFunc(int32_t funcId);
bool fmIsDynamicScanOptimizedFunc(int32_t funcId);
typedef enum EFuncDataRequired {
FUNC_DATA_REQUIRED_ALL_NEEDED = 1,
FUNC_DATA_REQUIRED_STATIS_NEEDED,
FUNC_DATA_REQUIRED_NO_NEEDED,
FUNC_DATA_REQUIRED_DISCARD
FUNC_DATA_REQUIRED_DATA_LOAD = 1,
FUNC_DATA_REQUIRED_STATIS_LOAD,
FUNC_DATA_REQUIRED_NOT_LOAD,
FUNC_DATA_REQUIRED_FILTEROUT,
} EFuncDataRequired;
EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
......
......@@ -42,10 +42,6 @@
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
struct SColumnFilterElem;
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, const char* val1, const char* val2, int16_t type);
typedef struct SGroupResInfo {
int32_t totalGroup;
int32_t currentGroup;
......
......@@ -38,8 +38,6 @@ extern "C" {
#include "tpagedbuf.h"
#include "tmsg.h"
struct SColumnFilterElem;
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
......@@ -225,8 +223,6 @@ typedef struct STaskRuntimeEnv {
void* qinfo;
uint8_t scanFlag; // denotes reversed scan of data or not
void* pTsdbReadHandle;
int32_t prevGroupId; // previous executed group id
bool enableGroupData;
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
SHashObj* pResultRowHashTable; // quick locate the window object for each result
......@@ -241,8 +237,6 @@ typedef struct STaskRuntimeEnv {
char* tagVal; // tag value of current data block
struct SScalarFunctionSupport* scalarSup;
SSDataBlock* outputBuf;
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
struct SOperatorInfo* proot;
SGroupResInfo groupResInfo;
......@@ -250,7 +244,6 @@ typedef struct STaskRuntimeEnv {
STableQueryInfo* current;
SResultInfo resultInfo;
SHashObj* pTableRetrieveTsMap;
struct SUdfInfo* pUdfInfo;
} STaskRuntimeEnv;
......@@ -350,6 +343,7 @@ typedef struct STableScanInfo {
int64_t elapsedTime;
int32_t prevGroupId; // previous table group id
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
} STableScanInfo;
typedef struct STagScanInfo {
......@@ -616,7 +610,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime,
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t dataLoadFlag, int32_t repeatTime,
int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
......
......@@ -271,7 +271,7 @@ static int compareRowData(const void* a, const void* b, const void* userData) {
}
// setup the output buffer for each operator
SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) {
SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
int32_t numOfCols = LIST_LENGTH(pNode->pSlots);
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
......@@ -6518,8 +6518,6 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator, bool* newgroup) {
SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) {
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
// pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity);
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
assert(numOfGroup == 0 || numOfGroup == 1);
......@@ -6739,21 +6737,22 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t type = nodeType(pPhyNode);
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode;
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode* ) pPhyNode;
int32_t numOfCols = 0;
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
if (pDataReader == NULL) {
return NULL;
}
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc);
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count,
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pTableScanNode->dataRequired, pScanPhyNode->count,
pScanPhyNode->reverse, pColList, pResBlock, pScanPhyNode->node.pConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc);
SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc);
return createExchangeOperatorInfo(pExchange->pSrcEndPoints, pResBlock, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
......@@ -6761,7 +6760,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc);
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
int32_t numOfCols = 0;
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
......@@ -6770,7 +6769,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createOutputBuf_rv1(pSysScanPhyNode->scan.node.pOutputDataBlockDesc);
SSDataBlock* pResBlock = createResDataBlock(pSysScanPhyNode->scan.node.pOutputDataBlockDesc);
struct SScanPhysiNode* pScanNode = &pSysScanPhyNode->scan;
SArray* colList = extractScanColumnId(pScanNode->pScanCols);
......@@ -6799,14 +6798,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SProjectPhysiNode* pProjPhyNode = (SProjectPhysiNode*) pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset};
SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset};
pOptr = createProjectOperatorInfo(ops[0], pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) {
SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
SExprInfo* pScalarExprInfo = NULL;
int32_t numOfScalarExpr = 0;
......@@ -6824,7 +6823,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
SInterval interval = {
.interval = pIntervalPhyNode->interval,
......@@ -6840,7 +6839,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
SArray* info = createSortInfo(pSortPhyNode->pSortKeys, pSortPhyNode->pTargets);
SArray* slotMap = createIndexMap(pSortPhyNode->pTargets);
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, slotMap, pTaskInfo);
......@@ -6848,12 +6847,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*) pPhyNode;
SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num);
pOptr = createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL);
......@@ -6861,11 +6860,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*) pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) {
SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*) pPhyNode;
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &num);
pOptr = createJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo);
......
......@@ -64,16 +64,19 @@ static void setupQueryRangeForReverseScan(STableScanInfo* pTableScanInfo) {
#endif
}
int32_t loadDataBlock(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) {
int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableScanInfo* pInfo = pOperator->info;
STaskCostInfo* pCost = &pTaskInfo->cost;
pCost->totalBlocks += 1;
pCost->totalRows += pBlock->info.rows;
pCost->loadBlocks += 1;
pCost->totalRows += pBlock->info.rows;
pCost->totalCheckedRows += pBlock->info.rows;
pCost->loadBlocks += 1;
*status = BLK_DATA_DATA_LOAD;
*status = pInfo->dataBlockLoadFlag;
SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
if (pCols == NULL) {
......@@ -139,7 +142,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
// this function never returns error?
uint32_t status = BLK_DATA_DATA_LOAD;
int32_t code = loadDataBlock(pTaskInfo, pTableScanInfo, pBlock, &status);
int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pOperator->pTaskInfo->env, code);
......@@ -217,7 +220,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
return p;
}
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput,
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t dataLoadFlag,
int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock,
SNode* pCondition, SExecTaskInfo* pTaskInfo) {
assert(repeatTime > 0);
......@@ -232,6 +235,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
return NULL;
}
pInfo->dataBlockLoadFlag= dataLoadFlag;
pInfo->pResBlock = pResBlock;
pInfo->pFilterNode = pCondition;
pInfo->dataReader = pTsdbReadHandle;
......
......@@ -58,9 +58,9 @@ void functionFinalize(SqlFunctionCtx *pCtx) {
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
if (QUERY_NODE_COLUMN == nodeType(pParam) && PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pParam)->colId) {
return FUNC_DATA_REQUIRED_NO_NEEDED;
return FUNC_DATA_REQUIRED_NOT_LOAD;
}
return FUNC_DATA_REQUIRED_STATIS_NEEDED;
return FUNC_DATA_REQUIRED_STATIS_LOAD;
}
bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
......
......@@ -78,10 +78,10 @@ int32_t fmGetFuncResultType(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
if (pFunc->funcId < 0 || pFunc->funcId >= funcMgtBuiltinsNum) {
return FUNC_DATA_REQUIRED_ALL_NEEDED;
return FUNC_DATA_REQUIRED_DATA_LOAD;
}
if (NULL == funcMgtBuiltins[pFunc->funcId].dataRequiredFunc) {
return FUNC_DATA_REQUIRED_ALL_NEEDED;
return FUNC_DATA_REQUIRED_DATA_LOAD;
}
return funcMgtBuiltins[pFunc->funcId].dataRequiredFunc(pFunc, pTimeWindow);
}
......
......@@ -200,7 +200,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
strcpy(pScan->tableName.tname, pRealTable->table.tableName);
pScan->showRewrite = pCxt->pPlanCxt->showRewrite;
pScan->ratio = pRealTable->ratio;
pScan->dataRequired = FUNC_DATA_REQUIRED_ALL_NEEDED;
pScan->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD;
// set columns to scan
SNodeList* pCols = NULL;
......
......@@ -125,12 +125,12 @@ static int32_t osdMatch(SOptimizeContext* pCxt, SLogicNode* pLogicNode, SOsdInfo
static EFuncDataRequired osdPromoteDataRequired(EFuncDataRequired l , EFuncDataRequired r) {
switch (l) {
case FUNC_DATA_REQUIRED_ALL_NEEDED:
case FUNC_DATA_REQUIRED_DATA_LOAD:
return l;
case FUNC_DATA_REQUIRED_STATIS_NEEDED:
return FUNC_DATA_REQUIRED_ALL_NEEDED == r ? r : l;
case FUNC_DATA_REQUIRED_NO_NEEDED:
return FUNC_DATA_REQUIRED_DISCARD == r ? l : r;
case FUNC_DATA_REQUIRED_STATIS_LOAD:
return FUNC_DATA_REQUIRED_DATA_LOAD == r ? r : l;
case FUNC_DATA_REQUIRED_NOT_LOAD:
return FUNC_DATA_REQUIRED_FILTEROUT == r ? l : r;
default:
break;
}
......@@ -139,9 +139,9 @@ static EFuncDataRequired osdPromoteDataRequired(EFuncDataRequired l , EFuncDataR
static int32_t osdGetDataRequired(SNodeList* pFuncs) {
if (NULL == pFuncs) {
return FUNC_DATA_REQUIRED_ALL_NEEDED;
return FUNC_DATA_REQUIRED_DATA_LOAD;
}
EFuncDataRequired dataRequired = FUNC_DATA_REQUIRED_DISCARD;
EFuncDataRequired dataRequired = FUNC_DATA_REQUIRED_FILTEROUT;
SNode* pFunc = NULL;
FOREACH(pFunc, pFuncs) {
dataRequired = osdPromoteDataRequired(dataRequired, fmFuncDataRequired((SFunctionNode*)pFunc, NULL));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册