提交 376e7ea5 编写于 作者: H Haojun Liao

fix(query): support limit/offset in merge sort operator.

上级 d6eb9af3
...@@ -899,6 +899,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, ...@@ -899,6 +899,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo,
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf); int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf);
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo); void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator);
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset, void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput); int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
......
...@@ -355,8 +355,8 @@ static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlo ...@@ -355,8 +355,8 @@ static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlo
} }
} }
static void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, // todo handle the slimit info
SOperatorInfo* pOperator) { void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
SLimit* pLimit = &pLimitInfo->limit; SLimit* pLimit = &pLimitInfo->limit;
if (pLimit->offset > 0 && pLimitInfo->remainOffset > 0) { if (pLimit->offset > 0 && pLimitInfo->remainOffset > 0) {
...@@ -377,7 +377,8 @@ static void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecT ...@@ -377,7 +377,8 @@ static void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecT
blockDataKeepFirstNRows(pBlock, keep); blockDataKeepFirstNRows(pBlock, keep);
qDebug("output limit %"PRId64" has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo)); qDebug("output limit %"PRId64" has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo));
setTaskStatus(pTaskInfo, TASK_COMPLETED);
// setTaskStatus(pTaskInfo, TASK_COMPLETED);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
} }
......
...@@ -531,7 +531,7 @@ typedef struct SMultiwayMergeOperatorInfo { ...@@ -531,7 +531,7 @@ typedef struct SMultiwayMergeOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
int32_t bufPageSize; int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort uint32_t sortBufSize; // max buffer size for in-memory sort
SLimitInfo limitInfo;
SArray* pSortInfo; SArray* pSortInfo;
SSortHandle* pSortHandle; SSortHandle* pSortHandle;
SColMatchInfo matchInfo; SColMatchInfo matchInfo;
...@@ -592,6 +592,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData ...@@ -592,6 +592,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
blockDataEnsureCapacity(p, capacity); blockDataEnsureCapacity(p, capacity);
_retry:
while (1) { while (1) {
STupleHandle* pTupleHandle = NULL; STupleHandle* pTupleHandle = NULL;
if (pInfo->groupSort) { if (pInfo->groupSort) {
...@@ -626,14 +627,22 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData ...@@ -626,14 +627,22 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
} else { } else {
appendOneRowToDataBlock(p, pTupleHandle); appendOneRowToDataBlock(p, pTupleHandle);
} }
if (p->info.rows >= capacity) { if (p->info.rows >= capacity) {
break; break;
} }
} }
if (pInfo->groupSort) { if (pInfo->groupSort) {
pInfo->hasGroupId = false; pInfo->hasGroupId = false;
} }
if (p->info.rows > 0) { // todo extract method if (p->info.rows > 0) { // todo extract method
applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo, pOperator);
if (p->info.rows == 0) {
goto _retry;
}
blockDataEnsureCapacity(pDataBlock, p->info.rows); blockDataEnsureCapacity(pDataBlock, p->info.rows);
int32_t numOfCols = taosArrayGetSize(pColMatchInfo); int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
...@@ -650,9 +659,9 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData ...@@ -650,9 +659,9 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
} }
blockDataDestroy(p); blockDataDestroy(p);
qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%d", GET_TASKID(pTaskInfo), pDataBlock->info.groupId, qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%d", GET_TASKID(pTaskInfo), pDataBlock->info.groupId,
pDataBlock->info.rows); pDataBlock->info.rows);
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
} }
...@@ -717,6 +726,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size ...@@ -717,6 +726,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
goto _error; goto _error;
} }
initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo);
pInfo->binfo.pRes = createResDataBlock(pDescNode); pInfo->binfo.pRes = createResDataBlock(pDescNode);
int32_t rowSize = pInfo->binfo.pRes->info.rowSize; int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
ASSERT(rowSize < 100 * 1024 * 1024); ASSERT(rowSize < 100 * 1024 * 1024);
...@@ -725,6 +735,10 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size ...@@ -725,6 +735,10 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
int32_t numOfOutputCols = 0; int32_t numOfOutputCols = 0;
code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc); SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc);
initResultSizeInfo(&pOperator->resultInfo, 1024); initResultSizeInfo(&pOperator->resultInfo, 1024);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册