提交 8cf3339b 编写于 作者: H Haojun Liao

enh(query): enable the limit clause to be push down to the table scan operator.

上级 4b15cd5e
......@@ -316,27 +316,25 @@ typedef struct {
} SAggOptrPushDownInfo;
typedef struct STableScanInfo {
STsdbReader* dataReader;
SReadHandle readHandle;
STsdbReader* dataReader;
SReadHandle readHandle;
SLimitInfo limitInfo;
SFileBlockLoadRecorder readRecorder;
SScanInfo scanInfo;
int32_t scanTimes;
SNode* pFilterNode; // filter info, which is push down by optimizer
SSDataBlock* pResBlock;
SColMatchInfo matchInfo;
SExprSupp pseudoSup;
SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
SSampleExecInfo sample; // sample execution info
int32_t currentGroupId;
int32_t currentTable;
int8_t scanMode;
int8_t noTable;
SAggOptrPushDownInfo pdInfo;
int8_t assignBlockUid;
SSDataBlock* pResBlock;
SColMatchInfo matchInfo;
SExprSupp pseudoSup;
SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
SSampleExecInfo sample; // sample execution info
int32_t currentGroupId;
int32_t currentTable;
int8_t scanMode;
SAggOptrPushDownInfo pdInfo;
int8_t assignBlockUid;
} STableScanInfo;
typedef struct STableMergeScanInfo {
......
......@@ -210,8 +210,6 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
pOperator->status = OP_OPENED;
}
qDebug("enter project");
if (pOperator->status == OP_EXEC_DONE) {
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
pOperator->status = OP_OPENED;
......
......@@ -364,6 +364,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
pCost->totalBlocks += 1;
pCost->totalRows += pBlock->info.rows;
bool loadSMA = false;
*status = pInfo->dataBlockLoadFlag;
......@@ -379,6 +380,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->filterOutBlocks += 1;
pCost->totalRows += pBlock->info.rows;
return TSDB_CODE_SUCCESS;
} else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
......@@ -447,6 +449,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
if (pTableScanInfo->pFilterNode != NULL) {
// restore the previous value
pCost->totalRows -= pBlock->info.rows;
int64_t st = taosGetTimestampUs();
doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, pOperator->exprSupp.pFilterInfo);
......@@ -460,6 +466,22 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
} else {
qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
}
pCost->totalRows += pBlock->info.rows;
}
pInfo->limitInfo.numOfOutputRows = pCost->totalRows;
SLimit* pLimit = &pInfo->limitInfo.limit;
if (pLimit->limit != -1 && pLimit->limit < pInfo->limitInfo.numOfOutputRows && pBlock->info.rows > 0) {
// limit the output rows
int32_t overflowRows = pInfo->limitInfo.numOfOutputRows - pLimit->limit;
int32_t keep = pBlock->info.rows - overflowRows;
blockDataKeepFirstNRows(pBlock, keep);
qDebug("output limit %"PRId64" has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo));
setTaskStatus(pTaskInfo, TASK_COMPLETED);
pOperator->status = OP_EXEC_DONE;
}
return TSDB_CODE_SUCCESS;
......@@ -691,10 +713,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
// if scan table by table
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
if (pInfo->noTable) {
return NULL;
}
int32_t numOfTables = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
while (1) {
......@@ -727,7 +745,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
}
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
tsdbReaderClose(pInfo->dataReader);
int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, (STsdbReader**)&pInfo->dataReader,
......@@ -749,9 +766,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return NULL;
}
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
// tsdbSetTableList(pInfo->dataReader, tableList);
tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
pInfo->scanTimes = 0;
......@@ -798,9 +812,15 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
}
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
int32_t numOfCols = 0;
int32_t numOfCols = 0;
int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
&pInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) {
......@@ -825,6 +845,9 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
if (pInfo->pFilterNode != NULL) {
code = filterInitFromNode((SNode*)pInfo->pFilterNode, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_OUT_OF_MEMORY) {
goto _error;
}
}
pInfo->scanFlag = MAIN_SCAN;
......
......@@ -2498,7 +2498,7 @@ static const SOptimizeRule optimizeRuleSet[] = {
{.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize},
{.pName = "LastRowScan", .optimizeFunc = lastRowScanOptimize},
{.pName = "TagScan", .optimizeFunc = tagScanOptimize},
// {.pName = "PushDownLimit", .optimizeFunc = pushDownLimitOptimize}
{.pName = "PushDownLimit", .optimizeFunc = pushDownLimitOptimize}
};
// clang-format on
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册