Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a5bf32a5
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
a5bf32a5
编写于
7月 06, 2022
作者:
S
shenglian-zhou
提交者:
GitHub
7月 06, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14590 from taosdata/szhou/feature/push-project-condition
feat: add filter to aggregate operator
上级
13597298
c040659a
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
18 addition
and
7 deletion
+18
-7
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+3
-1
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+15
-6
未找到文件。
source/libs/executor/inc/executorimpl.h
浏览文件 @
a5bf32a5
...
...
@@ -479,6 +479,8 @@ typedef struct SAggOperatorInfo {
uint64_t
groupId
;
SGroupResInfo
groupResInfo
;
SExprSupp
scalarExprSup
;
SNode
*
pCondition
;
}
SAggOperatorInfo
;
typedef
struct
SProjectOperatorInfo
{
...
...
@@ -758,7 +760,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
STableListInfo
*
pTableListInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSysTableScanOperatorInfo
(
void
*
readHandle
,
SSystemTableScanPhysiNode
*
pScanPhyNode
,
const
char
*
pUser
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createAggregateOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SExprInfo
*
pScalarExprInfo
,
SOperatorInfo
*
createAggregateOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
S
Node
*
pCondition
,
S
ExprInfo
*
pScalarExprInfo
,
int32_t
numOfScalarExpr
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createIndefinitOutputOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pNode
,
SExecTaskInfo
*
pTaskInfo
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
a5bf32a5
...
...
@@ -3012,11 +3012,19 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
}
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
doBuildResultDatablock
(
pOperator
,
pInfo
,
&
pAggInfo
->
groupResInfo
,
pAggInfo
->
aggSup
.
pResultBuf
);
if
(
pInfo
->
pRes
->
info
.
rows
==
0
||
!
hasDataInGroupInfo
(
&
pAggInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
}
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
pInfo
,
&
pAggInfo
->
groupResInfo
,
pAggInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pAggInfo
->
pCondition
,
pInfo
->
pRes
);
if
(
!
hasDataInGroupInfo
(
&
pAggInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
break
;
}
if
(
pInfo
->
pRes
->
info
.
rows
>
0
)
{
break
;
}
}
size_t
rows
=
blockDataGetNumOfRows
(
pInfo
->
pRes
);
pOperator
->
resultInfo
.
totalRows
+=
rows
;
...
...
@@ -3557,7 +3565,7 @@ int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
}
SOperatorInfo
*
createAggregateOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SExprInfo
*
pScalarExprInfo
,
SSDataBlock
*
pResultBlock
,
S
Node
*
pCondition
,
S
ExprInfo
*
pScalarExprInfo
,
int32_t
numOfScalarExpr
,
SExecTaskInfo
*
pTaskInfo
)
{
SAggOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SAggOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
...
...
@@ -3581,6 +3589,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
}
pInfo
->
groupId
=
INT32_MIN
;
pInfo
->
pCondition
=
pCondition
;
pOperator
->
name
=
"TableAggregate"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_HASH_AGG
;
pOperator
->
blocking
=
true
;
...
...
@@ -4328,7 +4337,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pScalarExprInfo
,
numOfScalarExpr
,
pTaskInfo
);
}
else
{
pOptr
=
createAggregateOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
pScalarExprInfo
,
numOfScalarExpr
,
pTaskInfo
);
createAggregateOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
p
AggNode
->
node
.
pConditions
,
p
ScalarExprInfo
,
numOfScalarExpr
,
pTaskInfo
);
}
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
==
type
||
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
==
type
)
{
SIntervalPhysiNode
*
pIntervalPhyNode
=
(
SIntervalPhysiNode
*
)
pPhyNode
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录