Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d5541cf1
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
d5541cf1
编写于
6月 14, 2022
作者:
X
Xiaoyu Wang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat: super table join
上级
b472ed11
变更
3
显示空白变更内容
内联
并排
Showing
3 changed file
with
109 addition
and
100 deletion
+109
-100
source/libs/planner/src/planSpliter.c
source/libs/planner/src/planSpliter.c
+98
-100
source/libs/planner/test/planOtherTest.cpp
source/libs/planner/test/planOtherTest.cpp
+7
-0
source/libs/planner/test/planTestUtil.h
source/libs/planner/test/planTestUtil.h
+4
-0
未找到文件。
source/libs/planner/src/planSpliter.c
浏览文件 @
d5541cf1
...
...
@@ -15,6 +15,7 @@
#include "functionMgt.h"
#include "planInt.h"
#include "tglobal.h"
#define SPLIT_FLAG_MASK(n) (1 << n)
...
...
@@ -37,7 +38,8 @@ typedef struct SSplitRule {
FSplit
splitFunc
;
}
SSplitRule
;
typedef
bool
(
*
FSplFindSplitNode
)(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
void
*
pInfo
);
// typedef bool (*FSplFindSplitNode)(SSplitContext* pCxt, SLogicSubplan* pSubplan, void* pInfo);
typedef
bool
(
*
FSplFindSplitNode
)(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SLogicNode
*
pNode
,
void
*
pInfo
);
static
void
splSetSubplanVgroups
(
SLogicSubplan
*
pSubplan
,
SLogicNode
*
pNode
)
{
if
(
QUERY_NODE_LOGIC_PLAN_SCAN
==
nodeType
(
pNode
))
{
...
...
@@ -95,9 +97,23 @@ static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubpla
return
code
;
}
static
bool
splMatchByNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SLogicNode
*
pNode
,
FSplFindSplitNode
func
,
void
*
pInfo
)
{
if
(
func
(
pCxt
,
pSubplan
,
pNode
,
pInfo
))
{
return
true
;
}
SNode
*
pChild
;
FOREACH
(
pChild
,
pNode
->
pChildren
)
{
if
(
splMatchByNode
(
pCxt
,
pSubplan
,
(
SLogicNode
*
)
pChild
,
func
,
pInfo
))
{
return
true
;
}
}
return
NULL
;
}
static
bool
splMatch
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
int32_t
flag
,
FSplFindSplitNode
func
,
void
*
pInfo
)
{
if
(
!
SPLIT_FLAG_TEST_MASK
(
pSubplan
->
splitFlag
,
flag
))
{
if
(
func
(
pCxt
,
pSubplan
,
pInfo
))
{
if
(
splMatchByNode
(
pCxt
,
pSubplan
,
pSubplan
->
pNode
,
func
,
pInfo
))
{
return
true
;
}
}
...
...
@@ -110,6 +126,11 @@ static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag,
return
false
;
}
static
void
splSetParent
(
SLogicNode
*
pNode
)
{
SNode
*
pChild
=
NULL
;
FOREACH
(
pChild
,
pNode
->
pChildren
)
{
((
SLogicNode
*
)
pChild
)
->
pParent
=
pNode
;
}
}
typedef
struct
SStableSplitInfo
{
SLogicNode
*
pSplitNode
;
SLogicSubplan
*
pSubplan
;
...
...
@@ -149,8 +170,8 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
switch
(
nodeType
(
pNode
))
{
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
return
stbSplIsMultiTbScan
(
streamQuery
,
(
SScanLogicNode
*
)
pNode
);
case
QUERY_NODE_LOGIC_PLAN_JOIN
:
return
!
(((
SJoinLogicNode
*
)
pNode
)
->
isSingleTableJoin
);
//
case QUERY_NODE_LOGIC_PLAN_JOIN:
//
return !(((SJoinLogicNode*)pNode)->isSingleTableJoin);
case
QUERY_NODE_LOGIC_PLAN_AGG
:
return
!
stbSplHasGatherExecFunc
(((
SAggLogicNode
*
)
pNode
)
->
pAggFuncs
)
&&
stbSplHasMultiTbScan
(
streamQuery
,
pNode
);
case
QUERY_NODE_LOGIC_PLAN_WINDOW
:
{
...
...
@@ -168,27 +189,14 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
return
false
;
}
static
SLogicNode
*
stbSplMatchByNode
(
bool
streamQuery
,
SLogicNode
*
pNode
)
{
if
(
stbSplNeedSplit
(
streamQuery
,
pNode
))
{
return
pNode
;
}
SNode
*
pChild
;
FOREACH
(
pChild
,
pNode
->
pChildren
)
{
SLogicNode
*
pSplitNode
=
stbSplMatchByNode
(
streamQuery
,
(
SLogicNode
*
)
pChild
);
if
(
NULL
!=
pSplitNode
)
{
return
pSplitNode
;
}
}
return
NULL
;
}
static
bool
stbSplFindSplitNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SStableSplitInfo
*
pInfo
)
{
SLogicNode
*
pSplitNode
=
stbSplMatchByNode
(
pCxt
->
pPlanCxt
->
streamQuery
,
pSubplan
->
pNode
);
if
(
NULL
!=
pSplitNode
)
{
pInfo
->
pSplitNode
=
pSplitNode
;
static
bool
stbSplFindSplitNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SLogicNode
*
pNode
,
SStableSplitInfo
*
pInfo
)
{
if
(
stbSplNeedSplit
(
pCxt
->
pPlanCxt
->
streamQuery
,
pNode
))
{
pInfo
->
pSplitNode
=
pNode
;
pInfo
->
pSubplan
=
pSubplan
;
return
true
;
}
return
NULL
!=
pSplitNod
e
;
return
fals
e
;
}
static
int32_t
stbSplRewriteFuns
(
const
SNodeList
*
pFuncs
,
SNodeList
**
pPartialFuncs
,
SNodeList
**
pMergeFuncs
)
{
...
...
@@ -266,6 +274,7 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pMergeWindow
->
node
.
pTargets
=
pTargets
;
pPartWin
->
node
.
pChildren
=
pChildren
;
splSetParent
((
SLogicNode
*
)
pPartWin
);
code
=
stbSplRewriteFuns
(
pFunc
,
&
pPartWin
->
pFuncs
,
&
pMergeWindow
->
pFuncs
);
}
int32_t
index
=
0
;
...
...
@@ -458,6 +467,7 @@ static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pO
pMergeAgg
->
node
.
pConditions
=
pConditions
;
pMergeAgg
->
node
.
pTargets
=
pTargets
;
pPartAgg
->
node
.
pChildren
=
pChildren
;
splSetParent
((
SLogicNode
*
)
pPartAgg
);
code
=
stbSplRewriteFuns
(
pFunc
,
&
pPartAgg
->
pAggFuncs
,
&
pMergeAgg
->
pAggFuncs
);
}
...
...
@@ -572,6 +582,7 @@ static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOut
SNodeList
*
pMergeKeys
=
NULL
;
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pPartSort
->
node
.
pChildren
=
pChildren
;
splSetParent
((
SLogicNode
*
)
pPartSort
);
pPartSort
->
pSortKeys
=
pSortKeys
;
code
=
stbSplCreateMergeKeys
(
pPartSort
->
pSortKeys
,
pPartSort
->
node
.
pTargets
,
&
pMergeKeys
);
}
...
...
@@ -703,7 +714,12 @@ typedef struct SSigTbJoinSplitInfo {
SLogicSubplan
*
pSubplan
;
}
SSigTbJoinSplitInfo
;
static
bool
sigTbJoinSplNeedSplit
(
SJoinLogicNode
*
pJoin
)
{
static
bool
sigTbJoinSplNeedSplit
(
SLogicNode
*
pNode
)
{
if
(
QUERY_NODE_LOGIC_PLAN_JOIN
!=
nodeType
(
pNode
))
{
return
false
;
}
SJoinLogicNode
*
pJoin
=
(
SJoinLogicNode
*
)
pNode
;
if
(
!
pJoin
->
isSingleTableJoin
)
{
return
false
;
}
...
...
@@ -711,28 +727,15 @@ static bool sigTbJoinSplNeedSplit(SJoinLogicNode* pJoin) {
QUERY_NODE_LOGIC_PLAN_EXCHANGE
!=
nodeType
(
nodesListGetNode
(
pJoin
->
node
.
pChildren
,
1
));
}
static
SJoinLogicNode
*
sigTbJoinSplMatchByNode
(
SLogicNode
*
pNode
)
{
if
(
QUERY_NODE_LOGIC_PLAN_JOIN
==
nodeType
(
pNode
)
&&
sigTbJoinSplNeedSplit
((
SJoinLogicNode
*
)
pNode
))
{
return
(
SJoinLogicNode
*
)
pNode
;
}
SNode
*
pChild
;
FOREACH
(
pChild
,
pNode
->
pChildren
)
{
SJoinLogicNode
*
pSplitNode
=
sigTbJoinSplMatchByNode
((
SLogicNode
*
)
pChild
);
if
(
NULL
!=
pSplitNode
)
{
return
pSplitNode
;
}
}
return
NULL
;
}
static
bool
sigTbJoinSplFindSplitNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SSigTbJoinSplitInfo
*
pInfo
)
{
SJoinLogicNode
*
pJoin
=
sigTbJoinSplMatchByNode
(
pSubplan
->
pNode
);
if
(
NULL
!=
pJoin
)
{
pInfo
->
pJoin
=
pJoin
;
pInfo
->
pSplitNode
=
(
SLogicNode
*
)
nodesListGetNode
(
pJoin
->
node
.
pChildren
,
1
);
static
bool
sigTbJoinSplFindSplitNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SLogicNode
*
pNode
,
SSigTbJoinSplitInfo
*
pInfo
)
{
if
(
sigTbJoinSplNeedSplit
(
pNode
))
{
pInfo
->
pJoin
=
(
SJoinLogicNode
*
)
pNode
;
pInfo
->
pSplitNode
=
(
SLogicNode
*
)
nodesListGetNode
(
pNode
->
pChildren
,
1
);
pInfo
->
pSubplan
=
pSubplan
;
return
true
;
}
return
NULL
!=
pJoin
;
return
false
;
}
static
int32_t
singleTableJoinSplit
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
)
{
...
...
@@ -825,27 +828,14 @@ typedef struct SUnionAllSplitInfo {
SLogicSubplan
*
pSubplan
;
}
SUnionAllSplitInfo
;
static
SLogicNode
*
unAllSplMatchByNode
(
SLogicNode
*
pNode
)
{
static
bool
unAllSplFindSplitNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SLogicNode
*
pNode
,
SUnionAllSplitInfo
*
pInfo
)
{
if
(
QUERY_NODE_LOGIC_PLAN_PROJECT
==
nodeType
(
pNode
)
&&
LIST_LENGTH
(
pNode
->
pChildren
)
>
1
)
{
return
pNode
;
}
SNode
*
pChild
;
FOREACH
(
pChild
,
pNode
->
pChildren
)
{
SLogicNode
*
pSplitNode
=
unAllSplMatchByNode
((
SLogicNode
*
)
pChild
);
if
(
NULL
!=
pSplitNode
)
{
return
pSplitNode
;
}
}
return
NULL
;
}
static
bool
unAllSplFindSplitNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SUnionAllSplitInfo
*
pInfo
)
{
SLogicNode
*
pSplitNode
=
unAllSplMatchByNode
(
pSubplan
->
pNode
);
if
(
NULL
!=
pSplitNode
)
{
pInfo
->
pProject
=
(
SProjectLogicNode
*
)
pSplitNode
;
pInfo
->
pProject
=
(
SProjectLogicNode
*
)
pNode
;
pInfo
->
pSubplan
=
pSubplan
;
return
true
;
}
return
NULL
!=
pSplitNod
e
;
return
fals
e
;
}
static
int32_t
unAllSplCreateExchangeNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SProjectLogicNode
*
pProject
)
{
...
...
@@ -900,20 +890,6 @@ typedef struct SUnionDistinctSplitInfo {
SLogicSubplan
*
pSubplan
;
}
SUnionDistinctSplitInfo
;
static
SLogicNode
*
unDistSplMatchByNode
(
SLogicNode
*
pNode
)
{
if
(
QUERY_NODE_LOGIC_PLAN_AGG
==
nodeType
(
pNode
)
&&
LIST_LENGTH
(
pNode
->
pChildren
)
>
1
)
{
return
pNode
;
}
SNode
*
pChild
;
FOREACH
(
pChild
,
pNode
->
pChildren
)
{
SLogicNode
*
pSplitNode
=
unDistSplMatchByNode
((
SLogicNode
*
)
pChild
);
if
(
NULL
!=
pSplitNode
)
{
return
pSplitNode
;
}
}
return
NULL
;
}
static
int32_t
unDistSplCreateExchangeNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SAggLogicNode
*
pAgg
)
{
SExchangeLogicNode
*
pExchange
=
(
SExchangeLogicNode
*
)
nodesMakeNode
(
QUERY_NODE_LOGIC_PLAN_EXCHANGE
);
if
(
NULL
==
pExchange
)
{
...
...
@@ -931,13 +907,14 @@ static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* p
return
nodesListMakeAppend
(
&
pAgg
->
node
.
pChildren
,
(
SNode
*
)
pExchange
);
}
static
bool
unDistSplFindSplitNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
S
UnionDistinctSplitInfo
*
pInfo
)
{
SLogicNode
*
pSplitNode
=
unDistSplMatchByNode
(
pSubplan
->
pNode
);
if
(
NULL
!=
pSplitNode
)
{
pInfo
->
pAgg
=
(
SAggLogicNode
*
)
p
Split
Node
;
static
bool
unDistSplFindSplitNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
S
LogicNode
*
pNode
,
SUnionDistinctSplitInfo
*
pInfo
)
{
if
(
QUERY_NODE_LOGIC_PLAN_AGG
==
nodeType
(
pNode
)
&&
LIST_LENGTH
(
pNode
->
pChildren
)
>
1
)
{
pInfo
->
pAgg
=
(
SAggLogicNode
*
)
pNode
;
pInfo
->
pSubplan
=
pSubplan
;
return
true
;
}
return
NULL
!=
pSplitNod
e
;
return
fals
e
;
}
static
int32_t
unionDistinctSplit
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
)
{
...
...
@@ -960,27 +937,14 @@ typedef struct SSmaIndexSplitInfo {
SLogicSubplan
*
pSubplan
;
}
SSmaIndexSplitInfo
;
static
SLogicNode
*
smaIdxSplMatchByNode
(
SLogicNode
*
pNode
)
{
static
bool
smaIdxSplFindSplitNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SLogicNode
*
pNode
,
SSmaIndexSplitInfo
*
pInfo
)
{
if
(
QUERY_NODE_LOGIC_PLAN_MERGE
==
nodeType
(
pNode
)
&&
LIST_LENGTH
(
pNode
->
pChildren
)
>
1
)
{
return
pNode
;
}
SNode
*
pChild
;
FOREACH
(
pChild
,
pNode
->
pChildren
)
{
SLogicNode
*
pSplitNode
=
smaIdxSplMatchByNode
((
SLogicNode
*
)
pChild
);
if
(
NULL
!=
pSplitNode
)
{
return
pSplitNode
;
}
}
return
NULL
;
}
static
bool
smaIdxSplFindSplitNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SSmaIndexSplitInfo
*
pInfo
)
{
SLogicNode
*
pSplitNode
=
smaIdxSplMatchByNode
(
pSubplan
->
pNode
);
if
(
NULL
!=
pSplitNode
)
{
pInfo
->
pMerge
=
(
SMergeLogicNode
*
)
pSplitNode
;
pInfo
->
pMerge
=
(
SMergeLogicNode
*
)
pNode
;
pInfo
->
pSubplan
=
pSubplan
;
return
true
;
}
return
NULL
!=
pSplitNod
e
;
return
fals
e
;
}
static
int32_t
smaIndexSplit
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
)
{
...
...
@@ -998,13 +962,47 @@ static int32_t smaIndexSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
return
code
;
}
typedef
struct
SQnodeSplitInfo
{
SLogicNode
*
pSplitNode
;
SLogicSubplan
*
pSubplan
;
}
SQnodeSplitInfo
;
static
bool
qndSplFindSplitNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SLogicNode
*
pNode
,
SQnodeSplitInfo
*
pInfo
)
{
if
(
QUERY_NODE_LOGIC_PLAN_SCAN
==
nodeType
(
pNode
)
&&
NULL
!=
pNode
->
pParent
)
{
pInfo
->
pSplitNode
=
pNode
;
pInfo
->
pSubplan
=
pSubplan
;
return
true
;
}
return
false
;
}
static
int32_t
qnodeSplit
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
)
{
if
(
QUERY_POLICY_QNODE
!=
tsQueryPolicy
)
{
return
TSDB_CODE_SUCCESS
;
}
SQnodeSplitInfo
info
=
{
0
};
if
(
!
splMatch
(
pCxt
,
pSubplan
,
0
,
(
FSplFindSplitNode
)
qndSplFindSplitNode
,
&
info
))
{
return
TSDB_CODE_SUCCESS
;
}
int32_t
code
=
splCreateExchangeNodeForSubplan
(
pCxt
,
info
.
pSubplan
,
info
.
pSplitNode
,
info
.
pSubplan
->
subplanType
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListMakeStrictAppend
(
&
info
.
pSubplan
->
pChildren
,
(
SNode
*
)
splCreateScanSubplan
(
pCxt
,
info
.
pSplitNode
,
0
));
}
++
(
pCxt
->
groupId
);
pCxt
->
split
=
true
;
return
code
;
}
// clang-format off
static
const
SSplitRule
splitRuleSet
[]
=
{
{.
pName
=
"SuperTableSplit"
,
.
splitFunc
=
stableSplit
},
{.
pName
=
"SingleTableJoinSplit"
,
.
splitFunc
=
singleTableJoinSplit
},
{.
pName
=
"UnionAllSplit"
,
.
splitFunc
=
unionAllSplit
},
{.
pName
=
"UnionDistinctSplit"
,
.
splitFunc
=
unionDistinctSplit
},
{.
pName
=
"SmaIndexSplit"
,
.
splitFunc
=
smaIndexSplit
}
{.
pName
=
"SmaIndexSplit"
,
.
splitFunc
=
smaIndexSplit
},
{.
pName
=
"QnodeSplit"
,
.
splitFunc
=
qnodeSplit
}
};
// clang-format on
...
...
source/libs/planner/test/planOtherTest.cpp
浏览文件 @
d5541cf1
...
...
@@ -83,3 +83,10 @@ TEST_F(PlanOtherTest, delete) {
run
(
"DELETE FROM st1 WHERE ts > now - 2d and ts < now - 1d AND tag1 = 10"
);
}
TEST_F
(
PlanOtherTest
,
queryPolicy
)
{
useDb
(
"root"
,
"test"
);
tsQueryPolicy
=
QUERY_POLICY_QNODE
;
run
(
"SELECT COUNT(*) FROM st1"
);
}
source/libs/planner/test/planTestUtil.h
浏览文件 @
d5541cf1
...
...
@@ -18,6 +18,10 @@
#include <gtest/gtest.h>
#define ALLOW_FORBID_FUNC
#include "planInt.h"
class
PlannerTestBaseImpl
;
struct
TAOS_MULTI_BIND
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录