未验证 提交 61ca9527 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #12098 from taosdata/feature/3.0_liaohj

refactor: do some internal refactor.
......@@ -119,6 +119,17 @@ typedef struct SLimit {
int64_t offset;
} SLimit;
typedef struct SFileBlockLoadRecorder {
uint64_t totalRows;
uint64_t totalCheckedRows;
uint32_t totalBlocks;
uint32_t loadBlocks;
uint32_t loadBlockStatis;
uint32_t skipBlocks;
uint32_t filterOutBlocks;
uint64_t elapsedTime;
} SFileBlockLoadRecorder;
typedef struct STaskCostInfo {
int64_t created;
int64_t start;
......@@ -132,14 +143,10 @@ typedef struct STaskCostInfo {
uint64_t loadDataInCacheSize;
uint64_t loadDataTime;
uint64_t totalRows;
uint64_t totalCheckedRows;
uint32_t totalBlocks;
uint32_t loadBlocks;
uint32_t loadBlockStatis;
uint32_t skipBlocks;
uint32_t filterOutBlocks;
SFileBlockLoadRecorder* pRecoder;
uint64_t elapsedTime;
uint64_t firstStageMergeTime;
uint64_t winInfoSize;
uint64_t tableInfoSize;
......@@ -268,7 +275,7 @@ typedef struct SOperatorFpSet {
typedef struct SOperatorInfo {
uint8_t operatorType;
bool blockingOptr; // block operator or not
bool blocking; // block operator or not
uint8_t status; // denote if current operator is completed
int32_t numOfOutput; // number of columns of the current operator results
char* name; // name, used to show the query execution plan
......@@ -333,17 +340,14 @@ typedef struct SScanInfo {
typedef struct STableScanInfo {
void* dataReader;
int32_t numOfBlocks; // extract basic running information.
int32_t numOfSkipped;
int32_t numOfBlockStatis;
SFileBlockLoadRecorder readRecorder;
int64_t numOfRows;
int64_t elapsedTime;
int32_t prevGroupId; // previous table group id
// int32_t prevGroupId; // previous table group id
SScanInfo scanInfo;
int32_t current;
SNode* pFilterNode; // filter operator info
SqlFunctionCtx* pCtx; // next operator query context
int32_t scanTimes;
SNode* pFilterNode; // filter info, which is push down by optimizer
SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context
SResultRowInfo* pResultRowInfo;
int32_t* rowCellInfoOffset;
SExprInfo* pExpr;
......@@ -397,7 +401,6 @@ typedef struct SSysTableScanInfo {
SArray* scanCols; // SArray<int16_t> scan column id list
SName name;
SSDataBlock* pRes;
int32_t capacity;
int64_t numOfBlocks; // extract basic running information.
SLoadRemoteDataInfo loadInfo;
} SSysTableScanInfo;
......
......@@ -132,6 +132,7 @@ void doSetOperatorCompleted(SOperatorInfo* pOperator) {
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
OPTR_SET_OPENED(pOperator);
pOperator->cost.openCost = 0;
return TSDB_CODE_SUCCESS;
}
......@@ -1592,8 +1593,8 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
STaskCostInfo* pCost = &pTaskInfo->cost;
pCost->totalBlocks += 1;
pCost->totalRows += pBlock->info.rows;
// pCost->totalBlocks += 1;
// pCost->totalRows += pBlock->info.rows;
#if 0
// Calculate all time windows that are overlapping or contain current data block.
// If current data block is contained by all possible time window, do not load current data block.
......@@ -2411,12 +2412,13 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) {
//
// calculateOperatorProfResults(pQInfo);
qDebug("%s :cost summary: elapsed time:%" PRId64 " us, first merge:%" PRId64
" us, total blocks:%d, "
"load block statis:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
GET_TASKID(pTaskInfo), pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks,
pSummary->loadBlockStatis, pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows);
//
SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder;
if (pSummary->pRecoder != NULL) {
qDebug("%s :cost summary: elapsed time:%" PRId64 " us, first merge:%" PRId64 " us, total blocks:%d, "
"load block statis:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
GET_TASKID(pTaskInfo), pSummary->elapsedTime, pSummary->firstStageMergeTime, pRecorder->totalBlocks,
pRecorder->loadBlockStatis, pRecorder->loadBlocks, pRecorder->totalRows, pRecorder->totalCheckedRows);
}
// qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb,
// hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0,
// pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0);
......@@ -3282,7 +3284,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
pOperator->name = "ExchangeOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
pOperator->blockingOptr = false;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = pBlock->info.numOfCols;
......@@ -3673,7 +3675,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
pOperator->name = "SortedMerge";
// pOperator->operatorType = OP_SortedMerge;
pOperator->blockingOptr = true;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = num;
......@@ -3756,7 +3758,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
pOperator->name = "SortOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
pOperator->blockingOptr = true;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
......@@ -4419,7 +4421,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->name = "TableAggregate";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_AGG;
pOperator->blockingOptr = true;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pExpr = pExprInfo;
......@@ -4532,7 +4534,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
pOperator->name = "ProjectOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
pOperator->blockingOptr = false;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pExpr = pExprInfo;
......@@ -4615,7 +4617,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
}
pOperator->name = "FillOperator";
pOperator->blockingOptr = false;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
pOperator->pExpr = pExpr;
......@@ -4861,15 +4863,19 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
SQueryTableDataCond cond = {0};
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
SInterval interval = extractIntervalInfo(pTableScanNode);
return createTableScanOperatorInfo(pDataReader, &cond, numOfCols, pTableScanNode->dataRequired,
SOperatorInfo* pOperator = createTableScanOperatorInfo(pDataReader, &cond, numOfCols, pTableScanNode->dataRequired,
pTableScanNode->scanSeq, pColList, pResBlock, pScanPhyNode->node.pConditions,
&interval, pTableScanNode->ratio, pTaskInfo);
STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc);
......@@ -4877,8 +4883,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo,
queryId, taskId);
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
......@@ -5628,7 +5633,7 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf
pOperator->name = "JoinOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN;
pOperator->blockingOptr = false;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols;
......
......@@ -353,7 +353,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
pOperator->name = "GroupbyAggOperator";
pOperator->blockingOptr = true;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = OP_Groupby;
pOperator->pExpr = pExprInfo;
......@@ -612,7 +612,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
}
pOperator->name = "PartitionOperator";
pOperator->blockingOptr = true;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
pInfo->binfo.pRes = pResultBlock;
......
......@@ -1078,7 +1078,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->name = "TimeIntervalAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL;
pOperator->blockingOptr = true;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
pOperator->pTaskInfo = pTaskInfo;
......@@ -1137,7 +1137,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
pOperator->name = "StreamTimeIntervalAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL;
pOperator->blockingOptr = true;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
pOperator->pTaskInfo = pTaskInfo;
......@@ -1343,7 +1343,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->name = "TimeSliceOperator";
// pOperator->operatorType = OP_AllTimeWindow;
pOperator->blockingOptr = true;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols;
......@@ -1385,7 +1385,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
pInfo->tsSlotId = tsSlotId;
pOperator->name = "StateWindowOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW;
pOperator->blockingOptr = true;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfCols;
......@@ -1437,7 +1437,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
pInfo->reptScan = false;
pOperator->name = "SessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW;
pOperator->blockingOptr = true;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册