提交 bd819f5c 编写于 作者: S slzhou

feat: add filter to sort operator

上级 c040659a
......@@ -682,6 +682,8 @@ typedef struct SSortOperatorInfo {
int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
SNode* pCondition;
} SSortOperatorInfo;
typedef struct STagFilterOperatorInfo {
......
......@@ -46,7 +46,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
initResultSizeInfo(pOperator, 1024);
pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);
;
pInfo->pCondition = pSortPhyNode->node.pConditions;
pInfo->pColMatchInfo = pColMatchColInfo;
pOperator->name = "SortOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
......@@ -205,14 +205,27 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
longjmp(pTaskInfo->env, code);
}
SSDataBlock* pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
pInfo->pColMatchInfo, pInfo);
SSDataBlock* pBlock = NULL;
while (1) {
pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
pInfo->pColMatchInfo, pInfo);
if (pBlock != NULL) {
doFilter(pInfo->pCondition, pBlock);
}
if (pBlock == NULL) {
doSetOperatorCompleted(pOperator);
break;
}
if (blockDataGetNumOfRows(pBlock) > 0) {
break;
}
}
if (pBlock != NULL) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
} else {
doSetOperatorCompleted(pOperator);
}
return pBlock;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册