提交 81d795c2 编写于 作者: H Haojun Liao

fix(query): enable limit in grouped scan.

上级 8d2de8d7
......@@ -352,7 +352,7 @@ typedef struct STableMergeScanInfo {
SSDataBlock* pSortInputBlock;
int64_t startTs; // sort start time
SArray* sortSourceParams;
SLimitInfo limitInfo;
SFileBlockLoadRecorder readRecorder;
int64_t numOfRows;
SScanInfo scanInfo;
......@@ -369,6 +369,7 @@ typedef struct STableMergeScanInfo {
SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
// if the upstream is an interval operator, the interval info is also kept here to get the time
// window to check if current data block needs to be loaded.
SInterval interval;
......
......@@ -355,6 +355,33 @@ static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlo
}
}
static void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo,
SOperatorInfo* pOperator) {
SLimit* pLimit = &pLimitInfo->limit;
if (pLimit->offset > 0 && pLimitInfo->remainOffset > 0) {
if (pLimitInfo->remainOffset >= pBlock->info.rows) {
pLimitInfo->remainOffset -= pBlock->info.rows;
pBlock->info.rows = 0;
qDebug("current block ignore due to offset, current:%"PRId64", %s", pLimitInfo->remainOffset, GET_TASKID(pTaskInfo));
} else {
blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
pLimitInfo->remainOffset = 0;
}
}
if (pLimit->limit != -1 && pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows)) {
// limit the output rows
int32_t overflowRows = pLimitInfo->numOfOutputRows + pBlock->info.rows - pLimit->limit;
int32_t keep = pBlock->info.rows - overflowRows;
blockDataKeepFirstNRows(pBlock, keep);
qDebug("output limit %"PRId64" has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo));
setTaskStatus(pTaskInfo, TASK_COMPLETED);
pOperator->status = OP_EXEC_DONE;
}
}
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
uint32_t* status) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......@@ -467,29 +494,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
}
}
SLimit* pLimit = &pInfo->limitInfo.limit;
if (pLimit->offset > 0 && pInfo->limitInfo.remainOffset > 0) {
if (pInfo->limitInfo.remainOffset >= pBlock->info.rows) {
pInfo->limitInfo.remainOffset -= pBlock->info.rows;
pBlock->info.rows = 0;
qDebug("current block ignore due to offset, current:%"PRId64", %s", pInfo->limitInfo.remainOffset, GET_TASKID(pTaskInfo));
} else {
blockDataTrimFirstNRows(pBlock, pInfo->limitInfo.remainOffset);
pInfo->limitInfo.remainOffset = 0;
}
}
if (pLimit->limit != -1 && pLimit->limit <= (pInfo->limitInfo.numOfOutputRows + pBlock->info.rows)) {
// limit the output rows
int32_t overflowRows = pInfo->limitInfo.numOfOutputRows + pBlock->info.rows - pLimit->limit;
int32_t keep = pBlock->info.rows - overflowRows;
blockDataKeepFirstNRows(pBlock, keep);
qDebug("output limit %"PRId64" has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo));
setTaskStatus(pTaskInfo, TASK_COMPLETED);
pOperator->status = OP_EXEC_DONE;
}
applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo, pOperator);
pCost->totalRows += pBlock->info.rows;
pInfo->limitInfo.numOfOutputRows = pCost->totalRows;
......@@ -4468,6 +4473,9 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
}
qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pResBlock->info.rows);
applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo, pOperator);
pInfo->limitInfo.numOfOutputRows += pResBlock->info.rows;
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
}
......@@ -4483,6 +4491,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);
if (!pInfo->hasGroupId) {
pInfo->hasGroupId = true;
......@@ -4495,6 +4504,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
pInfo->groupId = ((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId;
startGroupTableMergeScan(pOperator);
}
SSDataBlock* pBlock = NULL;
while (pInfo->tableStartIndex < tableListSize) {
pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity,
......@@ -4582,6 +4592,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
if (pTableScanNode->pGroupTags) {
taosArraySort(pTableListInfo->pTableList, compareTableKeyInfoByGid);
}
......@@ -4619,6 +4630,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->pSortInfo = generateSortByTsInfo(pInfo->matchInfo.pList, pInfo->cond.order);
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
int32_t rowSize = pInfo->pResBlock->info.rowSize;
pInfo->bufPageSize = getProperSortPageSize(rowSize);
......
......@@ -39,9 +39,9 @@ endi
if $data01 != 1 then
return -1
endi
if $data41 != 5 then
return -1
endi
#if $data41 != 5 then
# return -1
#endi
sql select * from $stb order by ts desc limit 5
if $rows != 5 then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册