Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
bacd7b60
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
bacd7b60
编写于
6月 23, 2022
作者:
S
shenglian-zhou
提交者:
GitHub
6月 23, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14136 from taosdata/szhou/feature/sort-group-2
feat: add group sort operator
上级
1047da04
faf5424b
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
20 addition
and
22 deletion
+20
-22
include/libs/nodes/nodes.h
include/libs/nodes/nodes.h
+1
-0
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+2
-0
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+2
-3
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+2
-0
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+1
-1
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+12
-18
未找到文件。
include/libs/nodes/nodes.h
浏览文件 @
bacd7b60
...
...
@@ -226,6 +226,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
,
QUERY_NODE_PHYSICAL_PLAN_MERGE
,
QUERY_NODE_PHYSICAL_PLAN_SORT
,
QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT
,
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL
,
...
...
include/libs/nodes/plannodes.h
浏览文件 @
bacd7b60
...
...
@@ -420,6 +420,8 @@ typedef struct SSortPhysiNode {
SNodeList
*
pTargets
;
}
SSortPhysiNode
;
typedef
SSortPhysiNode
SGroupSortPhysiNode
;
typedef
struct
SPartitionPhysiNode
{
SPhysiNode
node
;
SNodeList
*
pExprs
;
// these are expression list of partition_by_clause
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
bacd7b60
...
...
@@ -841,9 +841,8 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi
int32_t
createScanTableListInfo
(
STableScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
uint64_t
queryId
,
uint64_t
taskId
,
SNode
*
pTagCond
);
int32_t
doCreateMultipleDataReaders
(
STableScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
SArray
*
arrayReader
,
uint64_t
queryId
,
uint64_t
taskId
);
SOperatorInfo
*
createGroupSortOperatorInfo
(
SOperatorInfo
*
downstream
,
SGroupSortPhysiNode
*
pSortPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTableMergeScanOperatorInfo
(
STableScanPhysiNode
*
pTableScanNode
,
STableListInfo
*
pTableListInfo
,
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
,
uint64_t
queryId
,
uint64_t
taskId
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
bacd7b60
...
...
@@ -4225,6 +4225,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr
=
createStreamFinalIntervalOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
,
children
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_SORT
==
type
)
{
pOptr
=
createSortOperatorInfo
(
ops
[
0
],
(
SSortPhysiNode
*
)
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT
==
type
)
{
pOptr
=
createGroupSortOperatorInfo
(
ops
[
0
],
(
SGroupSortPhysiNode
*
)
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_MERGE
==
type
)
{
SMergePhysiNode
*
pMergePhyNode
=
(
SMergePhysiNode
*
)
pPhyNode
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
bacd7b60
...
...
@@ -2330,7 +2330,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
size_t
tableListSize
=
taosArrayGetSize
(
pInfo
->
tableListInfo
->
pTableList
);
if
(
!
pInfo
->
hasGroupId
)
{
pInfo
->
hasGroupId
=
true
;
if
(
tableListSize
==
0
)
{
doSetOperatorCompleted
(
pOperator
);
return
NULL
;
...
...
source/libs/executor/src/sortoperator.c
浏览文件 @
bacd7b60
...
...
@@ -424,10 +424,17 @@ int32_t getGroupSortExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, u
return
TSDB_CODE_SUCCESS
;
}
// TODO:
SOperatorInfo
*
createGroupSortOperatorInfo
(
SOperatorInfo
*
downstream
,
SSortPhysiNode
*
pSortPhyNode
,
void
destroyGroupSortOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SGroupSortOperatorInfo
*
pInfo
=
(
SGroupSortOperatorInfo
*
)
param
;
pInfo
->
binfo
.
pRes
=
blockDataDestroy
(
pInfo
->
binfo
.
pRes
);
taosArrayDestroy
(
pInfo
->
pSortInfo
);
taosArrayDestroy
(
pInfo
->
pColMatchInfo
);
}
SOperatorInfo
*
createGroupSortOperatorInfo
(
SOperatorInfo
*
downstream
,
SGroupSortPhysiNode
*
pSortPhyNode
,
SExecTaskInfo
*
pTaskInfo
)
{
S
SortOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
S
SortOperatorInfo
));
S
GroupSortOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SGroup
SortOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
/* || rowSize > 100 * 1024 * 1024*/
)
{
goto
_error
;
...
...
@@ -452,8 +459,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SSortPhysi
;
pInfo
->
pColMatchInfo
=
pColMatchColInfo
;
pOperator
->
name
=
"GroupSortOperator"
;
// TODO
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_SORT
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
...
...
@@ -461,7 +467,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SSortPhysi
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doGroupSort
,
NULL
,
NULL
,
destroy
Order
OperatorInfo
,
NULL
,
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doGroupSort
,
NULL
,
NULL
,
destroy
GroupSort
OperatorInfo
,
NULL
,
NULL
,
getGroupSortExplainExecInfo
);
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
...
...
@@ -478,18 +484,6 @@ _error:
return
NULL
;
}
void
destroyGroupSortOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SGroupSortOperatorInfo
*
pInfo
=
(
SGroupSortOperatorInfo
*
)
param
;
pInfo
->
binfo
.
pRes
=
blockDataDestroy
(
pInfo
->
binfo
.
pRes
);
taosArrayDestroy
(
pInfo
->
pSortInfo
);
taosArrayDestroy
(
pInfo
->
pColMatchInfo
);
}
// TODO: sort group
// TODO: msortCompare compare group id in multiway merge sort.
// TODO: table merge scan, group first, then for each group, multiple readers
//=====================================================================================
// Multiway Sort Merge operator
typedef
struct
SMultiwaySortMergeOperatorInfo
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录