未验证 提交 c5c5b5aa 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #17662 from taosdata/feature/3_liaohj

enh(query): enable the limit clause to be push down
...@@ -319,27 +319,25 @@ typedef struct { ...@@ -319,27 +319,25 @@ typedef struct {
} SAggOptrPushDownInfo; } SAggOptrPushDownInfo;
typedef struct STableScanInfo { typedef struct STableScanInfo {
STsdbReader* dataReader; STsdbReader* dataReader;
SReadHandle readHandle; SReadHandle readHandle;
SLimitInfo limitInfo;
SFileBlockLoadRecorder readRecorder; SFileBlockLoadRecorder readRecorder;
SScanInfo scanInfo; SScanInfo scanInfo;
int32_t scanTimes; int32_t scanTimes;
SNode* pFilterNode; // filter info, which is push down by optimizer SNode* pFilterNode; // filter info, which is push down by optimizer
SSDataBlock* pResBlock;
SSDataBlock* pResBlock; SColMatchInfo matchInfo;
SColMatchInfo matchInfo; SExprSupp pseudoSup;
SExprSupp pseudoSup; SQueryTableDataCond cond;
SQueryTableDataCond cond; int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t dataBlockLoadFlag;
int32_t dataBlockLoadFlag; SSampleExecInfo sample; // sample execution info
SSampleExecInfo sample; // sample execution info int32_t currentGroupId;
int32_t currentGroupId; int32_t currentTable;
int32_t currentTable; int8_t scanMode;
int8_t scanMode; SAggOptrPushDownInfo pdInfo;
int8_t noTable; int8_t assignBlockUid;
SAggOptrPushDownInfo pdInfo;
int8_t assignBlockUid;
} STableScanInfo; } STableScanInfo;
typedef struct STableMergeScanInfo { typedef struct STableMergeScanInfo {
...@@ -357,7 +355,7 @@ typedef struct STableMergeScanInfo { ...@@ -357,7 +355,7 @@ typedef struct STableMergeScanInfo {
SSDataBlock* pSortInputBlock; SSDataBlock* pSortInputBlock;
int64_t startTs; // sort start time int64_t startTs; // sort start time
SArray* sortSourceParams; SArray* sortSourceParams;
SLimitInfo limitInfo;
SFileBlockLoadRecorder readRecorder; SFileBlockLoadRecorder readRecorder;
int64_t numOfRows; int64_t numOfRows;
SScanInfo scanInfo; SScanInfo scanInfo;
...@@ -374,6 +372,7 @@ typedef struct STableMergeScanInfo { ...@@ -374,6 +372,7 @@ typedef struct STableMergeScanInfo {
SQueryTableDataCond cond; SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag; int32_t dataBlockLoadFlag;
// if the upstream is an interval operator, the interval info is also kept here to get the time // if the upstream is an interval operator, the interval info is also kept here to get the time
// window to check if current data block needs to be loaded. // window to check if current data block needs to be loaded.
SInterval interval; SInterval interval;
...@@ -903,6 +902,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG ...@@ -903,6 +902,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf); int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf);
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo); void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator);
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset, void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput); int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
......
...@@ -210,8 +210,6 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -210,8 +210,6 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
pOperator->status = OP_OPENED; pOperator->status = OP_OPENED;
} }
qDebug("enter project");
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
pOperator->status = OP_OPENED; pOperator->status = OP_OPENED;
......
...@@ -355,6 +355,34 @@ static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlo ...@@ -355,6 +355,34 @@ static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlo
} }
} }
// todo handle the slimit info
void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
SLimit* pLimit = &pLimitInfo->limit;
if (pLimit->offset > 0 && pLimitInfo->remainOffset > 0) {
if (pLimitInfo->remainOffset >= pBlock->info.rows) {
pLimitInfo->remainOffset -= pBlock->info.rows;
pBlock->info.rows = 0;
qDebug("current block ignore due to offset, current:%"PRId64", %s", pLimitInfo->remainOffset, GET_TASKID(pTaskInfo));
} else {
blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
pLimitInfo->remainOffset = 0;
}
}
if (pLimit->limit != -1 && pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows)) {
// limit the output rows
int32_t overflowRows = pLimitInfo->numOfOutputRows + pBlock->info.rows - pLimit->limit;
int32_t keep = pBlock->info.rows - overflowRows;
blockDataKeepFirstNRows(pBlock, keep);
qDebug("output limit %"PRId64" has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo));
// setTaskStatus(pTaskInfo, TASK_COMPLETED);
pOperator->status = OP_EXEC_DONE;
}
}
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
uint32_t* status) { uint32_t* status) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
...@@ -364,6 +392,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca ...@@ -364,6 +392,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
pCost->totalBlocks += 1; pCost->totalBlocks += 1;
pCost->totalRows += pBlock->info.rows; pCost->totalRows += pBlock->info.rows;
bool loadSMA = false; bool loadSMA = false;
*status = pInfo->dataBlockLoadFlag; *status = pInfo->dataBlockLoadFlag;
...@@ -379,6 +408,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca ...@@ -379,6 +408,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->filterOutBlocks += 1; pCost->filterOutBlocks += 1;
pCost->totalRows += pBlock->info.rows;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) { } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
...@@ -446,6 +476,9 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca ...@@ -446,6 +476,9 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo); doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
// restore the previous value
pCost->totalRows -= pBlock->info.rows;
if (pTableScanInfo->pFilterNode != NULL) { if (pTableScanInfo->pFilterNode != NULL) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, pOperator->exprSupp.pFilterInfo); doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, pOperator->exprSupp.pFilterInfo);
...@@ -462,6 +495,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca ...@@ -462,6 +495,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
} }
} }
applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo, pOperator);
pCost->totalRows += pBlock->info.rows;
pInfo->limitInfo.numOfOutputRows = pCost->totalRows;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -691,10 +728,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -691,10 +728,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
// if scan table by table // if scan table by table
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) { if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
if (pInfo->noTable) {
return NULL;
}
int32_t numOfTables = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); int32_t numOfTables = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
while (1) { while (1) {
...@@ -727,7 +760,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -727,7 +760,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
} }
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId); SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
tsdbReaderClose(pInfo->dataReader); tsdbReaderClose(pInfo->dataReader);
int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, (STsdbReader**)&pInfo->dataReader, int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, (STsdbReader**)&pInfo->dataReader,
...@@ -749,9 +781,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -749,9 +781,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
// tsdbSetTableList(pInfo->dataReader, tableList);
tsdbReaderReset(pInfo->dataReader, &pInfo->cond); tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
pInfo->scanTimes = 0; pInfo->scanTimes = 0;
...@@ -798,9 +827,15 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -798,9 +827,15 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
} }
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
int32_t numOfCols = 0;
int32_t numOfCols = 0;
int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
&pInfo->matchInfo); &pInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -825,6 +860,9 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -825,6 +860,9 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
if (pInfo->pFilterNode != NULL) { if (pInfo->pFilterNode != NULL) {
code = filterInitFromNode((SNode*)pInfo->pFilterNode, &pOperator->exprSupp.pFilterInfo, 0); code = filterInitFromNode((SNode*)pInfo->pFilterNode, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
} }
pInfo->scanFlag = MAIN_SCAN; pInfo->scanFlag = MAIN_SCAN;
...@@ -847,10 +885,12 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -847,10 +885,12 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
return pOperator; return pOperator;
_error: _error:
taosMemoryFreeClear(pInfo); if (pInfo != NULL) {
taosMemoryFreeClear(pOperator); destroyTableScanOperatorInfo(pInfo);
}
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL; return NULL;
} }
...@@ -4439,6 +4479,9 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* ...@@ -4439,6 +4479,9 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
} }
qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pResBlock->info.rows); qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pResBlock->info.rows);
applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo, pOperator);
pInfo->limitInfo.numOfOutputRows += pResBlock->info.rows;
return (pResBlock->info.rows > 0) ? pResBlock : NULL; return (pResBlock->info.rows > 0) ? pResBlock : NULL;
} }
...@@ -4454,6 +4497,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { ...@@ -4454,6 +4497,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList); size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);
if (!pInfo->hasGroupId) { if (!pInfo->hasGroupId) {
pInfo->hasGroupId = true; pInfo->hasGroupId = true;
...@@ -4466,6 +4510,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { ...@@ -4466,6 +4510,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
pInfo->groupId = ((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId; pInfo->groupId = ((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId;
startGroupTableMergeScan(pOperator); startGroupTableMergeScan(pOperator);
} }
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
while (pInfo->tableStartIndex < tableListSize) { while (pInfo->tableStartIndex < tableListSize) {
pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity, pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity,
...@@ -4553,6 +4598,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -4553,6 +4598,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
goto _error; goto _error;
} }
if (pTableScanNode->pGroupTags) { if (pTableScanNode->pGroupTags) {
taosArraySort(pTableListInfo->pTableList, compareTableKeyInfoByGid); taosArraySort(pTableListInfo->pTableList, compareTableKeyInfoByGid);
} }
...@@ -4591,6 +4637,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -4591,6 +4637,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->pSortInfo = generateSortByTsInfo(pInfo->matchInfo.pList, pInfo->cond.order); pInfo->pSortInfo = generateSortByTsInfo(pInfo->matchInfo.pList, pInfo->cond.order);
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
int32_t rowSize = pInfo->pResBlock->info.rowSize; int32_t rowSize = pInfo->pResBlock->info.rowSize;
pInfo->bufPageSize = getProperSortPageSize(rowSize); pInfo->bufPageSize = getProperSortPageSize(rowSize);
......
...@@ -531,7 +531,7 @@ typedef struct SMultiwayMergeOperatorInfo { ...@@ -531,7 +531,7 @@ typedef struct SMultiwayMergeOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
int32_t bufPageSize; int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort uint32_t sortBufSize; // max buffer size for in-memory sort
SLimitInfo limitInfo;
SArray* pSortInfo; SArray* pSortInfo;
SSortHandle* pSortHandle; SSortHandle* pSortHandle;
SColMatchInfo matchInfo; SColMatchInfo matchInfo;
...@@ -592,6 +592,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData ...@@ -592,6 +592,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
blockDataEnsureCapacity(p, capacity); blockDataEnsureCapacity(p, capacity);
_retry:
while (1) { while (1) {
STupleHandle* pTupleHandle = NULL; STupleHandle* pTupleHandle = NULL;
if (pInfo->groupSort) { if (pInfo->groupSort) {
...@@ -626,14 +627,22 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData ...@@ -626,14 +627,22 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
} else { } else {
appendOneRowToDataBlock(p, pTupleHandle); appendOneRowToDataBlock(p, pTupleHandle);
} }
if (p->info.rows >= capacity) { if (p->info.rows >= capacity) {
break; break;
} }
} }
if (pInfo->groupSort) { if (pInfo->groupSort) {
pInfo->hasGroupId = false; pInfo->hasGroupId = false;
} }
if (p->info.rows > 0) { // todo extract method if (p->info.rows > 0) { // todo extract method
applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo, pOperator);
if (p->info.rows == 0) {
goto _retry;
}
blockDataEnsureCapacity(pDataBlock, p->info.rows); blockDataEnsureCapacity(pDataBlock, p->info.rows);
int32_t numOfCols = taosArrayGetSize(pColMatchInfo); int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
...@@ -650,9 +659,9 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData ...@@ -650,9 +659,9 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
} }
blockDataDestroy(p); blockDataDestroy(p);
qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%d", GET_TASKID(pTaskInfo), pDataBlock->info.groupId, qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%d", GET_TASKID(pTaskInfo), pDataBlock->info.groupId,
pDataBlock->info.rows); pDataBlock->info.rows);
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
} }
...@@ -717,6 +726,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size ...@@ -717,6 +726,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
goto _error; goto _error;
} }
initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo);
pInfo->binfo.pRes = createResDataBlock(pDescNode); pInfo->binfo.pRes = createResDataBlock(pDescNode);
int32_t rowSize = pInfo->binfo.pRes->info.rowSize; int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
ASSERT(rowSize < 100 * 1024 * 1024); ASSERT(rowSize < 100 * 1024 * 1024);
...@@ -725,6 +735,10 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size ...@@ -725,6 +735,10 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
int32_t numOfOutputCols = 0; int32_t numOfOutputCols = 0;
code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc); SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc);
initResultSizeInfo(&pOperator->resultInfo, 1024); initResultSizeInfo(&pOperator->resultInfo, 1024);
......
...@@ -2498,7 +2498,7 @@ static const SOptimizeRule optimizeRuleSet[] = { ...@@ -2498,7 +2498,7 @@ static const SOptimizeRule optimizeRuleSet[] = {
{.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize}, {.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize},
{.pName = "LastRowScan", .optimizeFunc = lastRowScanOptimize}, {.pName = "LastRowScan", .optimizeFunc = lastRowScanOptimize},
{.pName = "TagScan", .optimizeFunc = tagScanOptimize}, {.pName = "TagScan", .optimizeFunc = tagScanOptimize},
// {.pName = "PushDownLimit", .optimizeFunc = pushDownLimitOptimize} {.pName = "PushDownLimit", .optimizeFunc = pushDownLimitOptimize}
}; };
// clang-format on // clang-format on
......
...@@ -97,6 +97,7 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SE ...@@ -97,6 +97,7 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SE
if (NULL == pExchange->node.pLimit) { if (NULL == pExchange->node.pLimit) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
((SLimitNode*)pChild->pLimit)->limit += ((SLimitNode*)pChild->pLimit)->offset;
((SLimitNode*)pChild->pLimit)->offset = 0; ((SLimitNode*)pChild->pLimit)->offset = 0;
} }
...@@ -470,6 +471,12 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla ...@@ -470,6 +471,12 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla
if (NULL == pMerge->node.pTargets || NULL == pMerge->pInputs) { if (NULL == pMerge->node.pTargets || NULL == pMerge->pInputs) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
} }
if (TSDB_CODE_SUCCESS == code && NULL != pSplitNode->pLimit) {
pMerge->node.pLimit = nodesCloneNode(pSplitNode->pLimit);
if (NULL == pMerge->node.pLimit) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
if (NULL == pSubplan) { if (NULL == pSubplan) {
code = nodesListMakeAppend(&pSplitNode->pChildren, (SNode*)pMerge); code = nodesListMakeAppend(&pSplitNode->pChildren, (SNode*)pMerge);
...@@ -934,6 +941,7 @@ static int32_t stbSplSplitScanNodeWithoutPartTags(SSplitContext* pCxt, SStableSp ...@@ -934,6 +941,7 @@ static int32_t stbSplSplitScanNodeWithoutPartTags(SSplitContext* pCxt, SStableSp
if (NULL == pSplitNode->pLimit) { if (NULL == pSplitNode->pLimit) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
((SLimitNode*)pInfo->pSplitNode->pLimit)->limit += ((SLimitNode*)pInfo->pSplitNode->pLimit)->offset;
((SLimitNode*)pInfo->pSplitNode->pLimit)->offset = 0; ((SLimitNode*)pInfo->pSplitNode->pLimit)->offset = 0;
} }
} }
...@@ -1021,6 +1029,10 @@ static int32_t stbSplSplitMergeScanNode(SSplitContext* pCxt, SLogicSubplan* pSub ...@@ -1021,6 +1029,10 @@ static int32_t stbSplSplitMergeScanNode(SSplitContext* pCxt, SLogicSubplan* pSub
SNodeList* pMergeKeys = NULL; SNodeList* pMergeKeys = NULL;
int32_t code = stbSplCreateMergeScanNode(pScan, &pMergeScan, &pMergeKeys); int32_t code = stbSplCreateMergeScanNode(pScan, &pMergeScan, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
if (NULL != pMergeScan->pLimit) {
((SLimitNode*)pMergeScan->pLimit)->limit += ((SLimitNode*)pMergeScan->pLimit)->offset;
((SLimitNode*)pMergeScan->pLimit)->offset = 0;
}
code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, pMergeScan, groupSort); code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, pMergeScan, groupSort);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
......
...@@ -39,9 +39,9 @@ endi ...@@ -39,9 +39,9 @@ endi
if $data01 != 1 then if $data01 != 1 then
return -1 return -1
endi endi
if $data41 != 5 then #if $data41 != 5 then
return -1 # return -1
endi #endi
sql select * from $stb order by ts desc limit 5 sql select * from $stb order by ts desc limit 5
if $rows != 5 then if $rows != 5 then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册