提交 16b6ce82 编写于 作者: S slzhou@taodata.com

feat: add filter to operator merge aligned interval agg

上级 7cb886e7
...@@ -806,7 +806,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI ...@@ -806,7 +806,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
SExecTaskInfo* pTaskInfo); SNode* pCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild); SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
......
...@@ -4449,7 +4449,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4449,7 +4449,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision}; .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pTaskInfo); pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pPhyNode->pConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode; SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
......
...@@ -4363,6 +4363,8 @@ typedef struct SMergeAlignedIntervalAggOperatorInfo { ...@@ -4363,6 +4363,8 @@ typedef struct SMergeAlignedIntervalAggOperatorInfo {
uint64_t groupId; uint64_t groupId;
SSDataBlock* prefetchedBlock; SSDataBlock* prefetchedBlock;
bool inputBlocksFinished; bool inputBlocksFinished;
SNode* pCondition;
} SMergeAlignedIntervalAggOperatorInfo; } SMergeAlignedIntervalAggOperatorInfo;
void destroyMergeAlignedIntervalOperatorInfo(void* param, int32_t numOfOutput) { void destroyMergeAlignedIntervalOperatorInfo(void* param, int32_t numOfOutput) {
...@@ -4498,8 +4500,8 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4498,8 +4500,8 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
getTableScanInfo(pOperator, &iaInfo->order, &scanFlag); getTableScanInfo(pOperator, &iaInfo->order, &scanFlag);
setInputDataBlock(pOperator, pSup->pCtx, pBlock, iaInfo->order, scanFlag, true); setInputDataBlock(pOperator, pSup->pCtx, pBlock, iaInfo->order, scanFlag, true);
doMergeAlignedIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); doMergeAlignedIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes);
doFilter(miaInfo->pCondition, pRes);
if (pRes->info.rows >= pOperator->resultInfo.threshold) { if (pRes->info.rows > 0) {
break; break;
} }
} }
...@@ -4507,7 +4509,7 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4507,7 +4509,7 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
pRes->info.groupId = miaInfo->groupId; pRes->info.groupId = miaInfo->groupId;
} }
if (pRes->info.rows == 0) { if (miaInfo->inputBlocksFinished) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
...@@ -4518,7 +4520,7 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4518,7 +4520,7 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo,
int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval,
int32_t primaryTsSlotId, SExecTaskInfo* pTaskInfo) { int32_t primaryTsSlotId, SNode* pCondition, SExecTaskInfo* pTaskInfo) {
SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo)); SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (miaInfo == NULL || pOperator == NULL) { if (miaInfo == NULL || pOperator == NULL) {
...@@ -4528,6 +4530,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -4528,6 +4530,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
miaInfo->pCondition = pCondition;
iaInfo->win = pTaskInfo->window; iaInfo->win = pTaskInfo->window;
iaInfo->order = TSDB_ORDER_ASC; iaInfo->order = TSDB_ORDER_ASC;
iaInfo->interval = *pInterval; iaInfo->interval = *pInterval;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册