Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
226ee062
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
226ee062
编写于
6月 29, 2023
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh: add physical plan processing
上级
9f0da17c
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
600 addition
and
21 deletion
+600
-21
include/libs/nodes/nodes.h
include/libs/nodes/nodes.h
+2
-0
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+10
-0
source/libs/nodes/src/nodesCloneFuncs.c
source/libs/nodes/src/nodesCloneFuncs.c
+19
-0
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+144
-5
source/libs/nodes/src/nodesMsgFuncs.c
source/libs/nodes/src/nodesMsgFuncs.c
+191
-4
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+26
-0
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+4
-1
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+204
-11
未找到文件。
include/libs/nodes/nodes.h
浏览文件 @
226ee062
...
...
@@ -276,6 +276,8 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_INSERT
,
QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT
,
QUERY_NODE_PHYSICAL_PLAN_DELETE
,
QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE
,
QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL
,
QUERY_NODE_PHYSICAL_SUBPLAN
,
QUERY_NODE_PHYSICAL_PLAN
,
QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN
,
...
...
include/libs/nodes/plannodes.h
浏览文件 @
226ee062
...
...
@@ -435,6 +435,16 @@ typedef struct SHashJoinPhysiNode {
SQueryStat
inputStat
[
2
];
}
SHashJoinPhysiNode
;
typedef
struct
SGroupCachePhysiNode
{
SPhysiNode
node
;
SNodeList
*
pGroupCols
;
}
SGroupCachePhysiNode
;
typedef
struct
SDynQueryCtrlPhysiNode
{
SPhysiNode
node
;
EDynQueryType
qType
;
}
SDynQueryCtrlPhysiNode
;
typedef
struct
SAggPhysiNode
{
SPhysiNode
node
;
SNodeList
*
pExprs
;
// these are expression list of group_by_clause and parameter expression of aggregate function
...
...
source/libs/nodes/src/nodesCloneFuncs.c
浏览文件 @
226ee062
...
...
@@ -407,6 +407,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) {
CLONE_NODE_FIELD
(
pPrimKeyEqCond
);
CLONE_NODE_FIELD
(
pColEqCond
);
CLONE_NODE_FIELD
(
pTagEqCond
);
CLONE_NODE_FIELD
(
pTagOnCond
);
CLONE_NODE_FIELD
(
pOtherOnCond
);
COPY_SCALAR_FIELD
(
isSingleTableJoin
);
COPY_SCALAR_FIELD
(
hasSubQuery
);
...
...
@@ -534,6 +535,18 @@ static int32_t logicInterpFuncCopy(const SInterpFuncLogicNode* pSrc, SInterpFunc
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
logicGroupCacheCopy
(
const
SGroupCacheLogicNode
*
pSrc
,
SGroupCacheLogicNode
*
pDst
)
{
COPY_BASE_OBJECT_FIELD
(
node
,
logicNodeCopy
);
CLONE_NODE_LIST_FIELD
(
pGroupCols
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
logicDynQueryCtrlCopy
(
const
SDynQueryCtrlLogicNode
*
pSrc
,
SDynQueryCtrlLogicNode
*
pDst
)
{
COPY_BASE_OBJECT_FIELD
(
node
,
logicNodeCopy
);
COPY_SCALAR_FIELD
(
qType
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
logicSubplanCopy
(
const
SLogicSubplan
*
pSrc
,
SLogicSubplan
*
pDst
)
{
COPY_OBJECT_FIELD
(
id
,
sizeof
(
SSubplanId
));
CLONE_NODE_FIELD_EX
(
pNode
,
SLogicNode
*
);
...
...
@@ -807,6 +820,12 @@ SNode* nodesCloneNode(const SNode* pNode) {
case
QUERY_NODE_LOGIC_PLAN_INTERP_FUNC
:
code
=
logicInterpFuncCopy
((
const
SInterpFuncLogicNode
*
)
pNode
,
(
SInterpFuncLogicNode
*
)
pDst
);
break
;
case
QUERY_NODE_LOGIC_PLAN_GROUP_CACHE
:
code
=
logicGroupCacheCopy
((
const
SGroupCacheLogicNode
*
)
pNode
,
(
SGroupCacheLogicNode
*
)
pDst
);
break
;
case
QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL
:
code
=
logicDynQueryCtrlCopy
((
const
SDynQueryCtrlLogicNode
*
)
pNode
,
(
SDynQueryCtrlLogicNode
*
)
pDst
);
break
;
case
QUERY_NODE_LOGIC_SUBPLAN
:
code
=
logicSubplanCopy
((
const
SLogicSubplan
*
)
pNode
,
(
SLogicSubplan
*
)
pDst
);
break
;
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
226ee062
...
...
@@ -328,7 +328,9 @@ const char* nodesNodeName(ENodeType type) {
case
QUERY_NODE_PHYSICAL_PLAN_PROJECT
:
return
"PhysiProject"
;
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN
:
return
"PhysiJoin"
;
return
"PhysiMergeJoin"
;
case
QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN
:
return
"PhysiHashJoin"
;
case
QUERY_NODE_PHYSICAL_PLAN_HASH_AGG
:
return
"PhysiAgg"
;
case
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
:
...
...
@@ -380,6 +382,10 @@ const char* nodesNodeName(ENodeType type) {
return
"PhysiQueryInsert"
;
case
QUERY_NODE_PHYSICAL_PLAN_DELETE
:
return
"PhysiDelete"
;
case
QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE
:
return
"PhysiGroupCache"
;
case
QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL
:
return
"PhysiDynamicQueryCtrl"
;
case
QUERY_NODE_PHYSICAL_SUBPLAN
:
return
"PhysiSubplan"
;
case
QUERY_NODE_PHYSICAL_PLAN
:
...
...
@@ -1960,12 +1966,16 @@ static int32_t jsonToPhysiProjectNode(const SJson* pJson, void* pObj) {
static
const
char
*
jkJoinPhysiPlanJoinType
=
"JoinType"
;
static
const
char
*
jkJoinPhysiPlanInputTsOrder
=
"InputTsOrder"
;
static
const
char
*
jkJoinPhysiPlanOnLeftCols
=
"OnLeftColumns"
;
static
const
char
*
jkJoinPhysiPlanOnRightCols
=
"OnRightColumns"
;
static
const
char
*
jkJoinPhysiPlanPrimKeyCondition
=
"PrimKeyCondition"
;
static
const
char
*
jkJoinPhysiPlanOnConditions
=
"OnConditions"
;
static
const
char
*
jkJoinPhysiPlanTargets
=
"Targets"
;
static
const
char
*
jkJoinPhysiPlanColEqualOnConditions
=
"ColumnEqualOnConditions"
;
static
const
char
*
jkJoinPhysiPlanInputRowNum
=
"InputRowNum"
;
static
const
char
*
jkJoinPhysiPlanInputRowSize
=
"InputRowSize"
;
static
int32_t
physiJoinNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
static
int32_t
physi
Merge
JoinNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SSortMergeJoinPhysiNode
*
pNode
=
(
const
SSortMergeJoinPhysiNode
*
)
pObj
;
int32_t
code
=
physicPlanNodeToJson
(
pObj
,
pJson
);
...
...
@@ -1987,7 +1997,7 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) {
return
code
;
}
static
int32_t
jsonToPhysiJoinNode
(
const
SJson
*
pJson
,
void
*
pObj
)
{
static
int32_t
jsonToPhysi
Merge
JoinNode
(
const
SJson
*
pJson
,
void
*
pObj
)
{
SSortMergeJoinPhysiNode
*
pNode
=
(
SSortMergeJoinPhysiNode
*
)
pObj
;
int32_t
code
=
jsonToPhysicPlanNode
(
pJson
,
pObj
);
...
...
@@ -2009,6 +2019,76 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) {
return
code
;
}
static
int32_t
physiHashJoinNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SHashJoinPhysiNode
*
pNode
=
(
const
SHashJoinPhysiNode
*
)
pObj
;
int32_t
code
=
physicPlanNodeToJson
(
pObj
,
pJson
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkJoinPhysiPlanJoinType
,
pNode
->
joinType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodeListToJson
(
pJson
,
jkJoinPhysiPlanOnLeftCols
,
pNode
->
pOnLeft
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodeListToJson
(
pJson
,
jkJoinPhysiPlanOnRightCols
,
pNode
->
pOnRight
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkJoinPhysiPlanOnConditions
,
nodeToJson
,
pNode
->
pFilterConditions
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodeListToJson
(
pJson
,
jkJoinPhysiPlanTargets
,
pNode
->
pTargets
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkJoinPhysiPlanInputRowNum
,
pNode
->
inputStat
[
0
].
inputRowNum
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkJoinPhysiPlanInputRowSize
,
pNode
->
inputStat
[
0
].
inputRowSize
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkJoinPhysiPlanInputRowNum
,
pNode
->
inputStat
[
1
].
inputRowNum
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkJoinPhysiPlanInputRowSize
,
pNode
->
inputStat
[
1
].
inputRowSize
);
}
return
code
;
}
static
int32_t
jsonToPhysiHashJoinNode
(
const
SJson
*
pJson
,
void
*
pObj
)
{
SHashJoinPhysiNode
*
pNode
=
(
SHashJoinPhysiNode
*
)
pObj
;
int32_t
code
=
jsonToPhysicPlanNode
(
pJson
,
pObj
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkJoinPhysiPlanJoinType
,
pNode
->
joinType
,
code
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkJoinPhysiPlanOnLeftCols
,
&
pNode
->
pOnLeft
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkJoinPhysiPlanOnRightCols
,
&
pNode
->
pOnRight
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkJoinPhysiPlanOnConditions
,
&
pNode
->
pFilterConditions
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkJoinPhysiPlanTargets
,
&
pNode
->
pTargets
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkJoinPhysiPlanInputRowNum
,
pNode
->
inputStat
[
0
].
inputRowNum
,
code
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkJoinPhysiPlanInputRowSize
,
pNode
->
inputStat
[
0
].
inputRowSize
,
code
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkJoinPhysiPlanInputRowNum
,
pNode
->
inputStat
[
1
].
inputRowNum
,
code
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkJoinPhysiPlanInputRowSize
,
pNode
->
inputStat
[
1
].
inputRowSize
,
code
);
}
return
code
;
}
static
const
char
*
jkAggPhysiPlanExprs
=
"Exprs"
;
static
const
char
*
jkAggPhysiPlanGroupKeys
=
"GroupKeys"
;
static
const
char
*
jkAggPhysiPlanAggFuncs
=
"AggFuncs"
;
...
...
@@ -2830,6 +2910,53 @@ static int32_t jsonToPhysiDeleteNode(const SJson* pJson, void* pObj) {
return
code
;
}
static
const
char
*
jkGroupCachePhysiPlanGroupCols
=
"GroupColumns"
;
static
int32_t
physiGroupCacheNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SGroupCachePhysiNode
*
pNode
=
(
const
SGroupCachePhysiNode
*
)
pObj
;
int32_t
code
=
physicPlanNodeToJson
(
pObj
,
pJson
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodeListToJson
(
pJson
,
jkGroupCachePhysiPlanGroupCols
,
pNode
->
pGroupCols
);
}
return
code
;
}
static
int32_t
jsonToPhysiGroupCacheNode
(
const
SJson
*
pJson
,
void
*
pObj
)
{
SGroupCachePhysiNode
*
pNode
=
(
SGroupCachePhysiNode
*
)
pObj
;
int32_t
code
=
jsonToPhysicPlanNode
(
pJson
,
pObj
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkGroupCachePhysiPlanGroupCols
,
&
pNode
->
pGroupCols
);
}
return
code
;
}
static
const
char
*
jkDynQueryCtrlPhysiPlanQueryType
=
"QueryType"
;
static
int32_t
physiDynQueryCtrlNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SDynQueryCtrlPhysiNode
*
pNode
=
(
const
SDynQueryCtrlPhysiNode
*
)
pObj
;
int32_t
code
=
physicPlanNodeToJson
(
pObj
,
pJson
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkDynQueryCtrlPhysiPlanQueryType
,
pNode
->
qType
);
}
return
code
;
}
static
int32_t
jsonToPhysiDynQueryCtrlNode
(
const
SJson
*
pJson
,
void
*
pObj
)
{
SDynQueryCtrlPhysiNode
*
pNode
=
(
SDynQueryCtrlPhysiNode
*
)
pObj
;
int32_t
code
=
jsonToPhysicPlanNode
(
pJson
,
pObj
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkDynQueryCtrlPhysiPlanQueryType
,
pNode
->
qType
,
code
);
}
return
code
;
}
static
const
char
*
jkQueryNodeAddrId
=
"Id"
;
static
const
char
*
jkQueryNodeAddrInUse
=
"InUse"
;
static
const
char
*
jkQueryNodeAddrNumOfEps
=
"NumOfEps"
;
...
...
@@ -6675,7 +6802,9 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case
QUERY_NODE_PHYSICAL_PLAN_PROJECT
:
return
physiProjectNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN
:
return
physiJoinNodeToJson
(
pObj
,
pJson
);
return
physiMergeJoinNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN
:
return
physiHashJoinNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_HASH_AGG
:
return
physiAggNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
:
...
...
@@ -6721,6 +6850,10 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return
physiQueryInsertNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_DELETE
:
return
physiDeleteNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE
:
return
physiGroupCacheNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL
:
return
physiDynQueryCtrlNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_SUBPLAN
:
return
subplanToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN
:
...
...
@@ -6997,7 +7130,9 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
case
QUERY_NODE_PHYSICAL_PLAN_PROJECT
:
return
jsonToPhysiProjectNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN
:
return
jsonToPhysiJoinNode
(
pJson
,
pObj
);
return
jsonToPhysiMergeJoinNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN
:
return
jsonToPhysiHashJoinNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_HASH_AGG
:
return
jsonToPhysiAggNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
:
...
...
@@ -7041,6 +7176,10 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return
jsonToPhysiQueryInsertNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_DELETE
:
return
jsonToPhysiDeleteNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE
:
return
jsonToPhysiGroupCacheNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL
:
return
jsonToPhysiDynQueryCtrlNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_SUBPLAN
:
return
jsonToSubplan
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN
:
...
...
source/libs/nodes/src/nodesMsgFuncs.c
浏览文件 @
226ee062
...
...
@@ -2337,7 +2337,7 @@ enum {
PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS
};
static
int32_t
physiJoinNodeToMsg
(
const
void
*
pObj
,
STlvEncoder
*
pEncoder
)
{
static
int32_t
physi
Merge
JoinNodeToMsg
(
const
void
*
pObj
,
STlvEncoder
*
pEncoder
)
{
const
SSortMergeJoinPhysiNode
*
pNode
=
(
const
SSortMergeJoinPhysiNode
*
)
pObj
;
int32_t
code
=
tlvEncodeObj
(
pEncoder
,
PHY_SORT_MERGE_JOIN_CODE_BASE_NODE
,
physiNodeToMsg
,
&
pNode
->
node
);
...
...
@@ -2359,7 +2359,7 @@ static int32_t physiJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
return
code
;
}
static
int32_t
msgToPhysiJoinNode
(
STlvDecoder
*
pDecoder
,
void
*
pObj
)
{
static
int32_t
msgToPhysi
Merge
JoinNode
(
STlvDecoder
*
pDecoder
,
void
*
pObj
)
{
SSortMergeJoinPhysiNode
*
pNode
=
(
SSortMergeJoinPhysiNode
*
)
pObj
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -2392,6 +2392,100 @@ static int32_t msgToPhysiJoinNode(STlvDecoder* pDecoder, void* pObj) {
return
code
;
}
enum
{
PHY_HASH_JOIN_CODE_BASE_NODE
=
1
,
PHY_HASH_JOIN_CODE_JOIN_TYPE
,
PHY_HASH_JOIN_CODE_ON_LEFT_COLUMN
,
PHY_HASH_JOIN_CODE_ON_RIGHT_COLUMN
,
PHY_HASH_JOIN_CODE_ON_CONDITIONS
,
PHY_HASH_JOIN_CODE_TARGETS
,
PHY_HASH_JOIN_CODE_INPUT_ROW_NUM0
,
PHY_HASH_JOIN_CODE_INPUT_ROW_SIZE0
,
PHY_HASH_JOIN_CODE_INPUT_ROW_NUM1
,
PHY_HASH_JOIN_CODE_INPUT_ROW_SIZE1
};
static
int32_t
physiHashJoinNodeToMsg
(
const
void
*
pObj
,
STlvEncoder
*
pEncoder
)
{
const
SHashJoinPhysiNode
*
pNode
=
(
const
SHashJoinPhysiNode
*
)
pObj
;
int32_t
code
=
tlvEncodeObj
(
pEncoder
,
PHY_HASH_JOIN_CODE_BASE_NODE
,
physiNodeToMsg
,
&
pNode
->
node
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tlvEncodeEnum
(
pEncoder
,
PHY_HASH_JOIN_CODE_JOIN_TYPE
,
pNode
->
joinType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tlvEncodeObj
(
pEncoder
,
PHY_HASH_JOIN_CODE_ON_LEFT_COLUMN
,
nodeListToMsg
,
pNode
->
pOnLeft
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tlvEncodeObj
(
pEncoder
,
PHY_HASH_JOIN_CODE_ON_RIGHT_COLUMN
,
nodeListToMsg
,
pNode
->
pOnRight
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tlvEncodeObj
(
pEncoder
,
PHY_HASH_JOIN_CODE_ON_CONDITIONS
,
nodeToMsg
,
pNode
->
pFilterConditions
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tlvEncodeObj
(
pEncoder
,
PHY_HASH_JOIN_CODE_TARGETS
,
nodeListToMsg
,
pNode
->
pTargets
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tlvEncodeI64
(
pEncoder
,
PHY_HASH_JOIN_CODE_INPUT_ROW_NUM0
,
pNode
->
inputStat
[
0
].
inputRowNum
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tlvEncodeI64
(
pEncoder
,
PHY_HASH_JOIN_CODE_INPUT_ROW_NUM1
,
pNode
->
inputStat
[
1
].
inputRowNum
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tlvEncodeI32
(
pEncoder
,
PHY_HASH_JOIN_CODE_INPUT_ROW_SIZE0
,
pNode
->
inputStat
[
0
].
inputRowSize
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tlvEncodeI32
(
pEncoder
,
PHY_HASH_JOIN_CODE_INPUT_ROW_SIZE1
,
pNode
->
inputStat
[
1
].
inputRowSize
);
}
return
code
;
}
static
int32_t
msgToPhysiHashJoinNode
(
STlvDecoder
*
pDecoder
,
void
*
pObj
)
{
SHashJoinPhysiNode
*
pNode
=
(
SHashJoinPhysiNode
*
)
pObj
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
STlv
*
pTlv
=
NULL
;
tlvForEach
(
pDecoder
,
pTlv
,
code
)
{
switch
(
pTlv
->
type
)
{
case
PHY_HASH_JOIN_CODE_BASE_NODE
:
code
=
tlvDecodeObjFromTlv
(
pTlv
,
msgToPhysiNode
,
&
pNode
->
node
);
break
;
case
PHY_HASH_JOIN_CODE_JOIN_TYPE
:
code
=
tlvDecodeEnum
(
pTlv
,
&
pNode
->
joinType
,
sizeof
(
pNode
->
joinType
));
break
;
case
PHY_HASH_JOIN_CODE_ON_LEFT_COLUMN
:
code
=
msgToNodeListFromTlv
(
pTlv
,
(
void
**
)
&
pNode
->
pOnLeft
);
break
;
case
PHY_HASH_JOIN_CODE_ON_RIGHT_COLUMN
:
code
=
msgToNodeListFromTlv
(
pTlv
,
(
void
**
)
&
pNode
->
pOnRight
);
break
;
case
PHY_HASH_JOIN_CODE_ON_CONDITIONS
:
code
=
msgToNodeFromTlv
(
pTlv
,
(
void
**
)
&
pNode
->
pFilterConditions
);
break
;
case
PHY_HASH_JOIN_CODE_TARGETS
:
code
=
msgToNodeListFromTlv
(
pTlv
,
(
void
**
)
&
pNode
->
pTargets
);
break
;
case
PHY_HASH_JOIN_CODE_INPUT_ROW_NUM0
:
code
=
tlvDecodeI64
(
pTlv
,
&
pNode
->
inputStat
[
0
].
inputRowNum
);
break
;
case
PHY_HASH_JOIN_CODE_INPUT_ROW_NUM1
:
code
=
tlvDecodeI64
(
pTlv
,
&
pNode
->
inputStat
[
1
].
inputRowNum
);
break
;
case
PHY_HASH_JOIN_CODE_INPUT_ROW_SIZE0
:
code
=
tlvDecodeI32
(
pTlv
,
&
pNode
->
inputStat
[
0
].
inputRowSize
);
break
;
case
PHY_HASH_JOIN_CODE_INPUT_ROW_SIZE1
:
code
=
tlvDecodeI32
(
pTlv
,
&
pNode
->
inputStat
[
1
].
inputRowSize
);
break
;
default:
break
;
}
}
return
code
;
}
enum
{
PHY_AGG_CODE_BASE_NODE
=
1
,
PHY_AGG_CODE_EXPR
,
...
...
@@ -3430,6 +3524,81 @@ static int32_t msgToPhysiDeleteNode(STlvDecoder* pDecoder, void* pObj) {
return
code
;
}
enum
{
PHY_GROUP_CACHE_CODE_BASE_NODE
=
1
,
PHY_GROUP_CACHE_CODE_GROUP_COLUMNS
};
static
int32_t
physiGroupCacheNodeToMsg
(
const
void
*
pObj
,
STlvEncoder
*
pEncoder
)
{
const
SGroupCachePhysiNode
*
pNode
=
(
const
SGroupCachePhysiNode
*
)
pObj
;
int32_t
code
=
tlvEncodeObj
(
pEncoder
,
PHY_GROUP_CACHE_CODE_BASE_NODE
,
physiNodeToMsg
,
&
pNode
->
node
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tlvEncodeObj
(
pEncoder
,
PHY_GROUP_CACHE_CODE_GROUP_COLUMNS
,
nodeListToMsg
,
pNode
->
pGroupCols
);
}
return
code
;
}
static
int32_t
msgToPhysiGroupCacheNode
(
STlvDecoder
*
pDecoder
,
void
*
pObj
)
{
SGroupCachePhysiNode
*
pNode
=
(
SGroupCachePhysiNode
*
)
pObj
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
STlv
*
pTlv
=
NULL
;
tlvForEach
(
pDecoder
,
pTlv
,
code
)
{
switch
(
pTlv
->
type
)
{
case
PHY_GROUP_CACHE_CODE_BASE_NODE
:
code
=
tlvDecodeObjFromTlv
(
pTlv
,
msgToPhysiNode
,
&
pNode
->
node
);
break
;
case
PHY_GROUP_CACHE_CODE_GROUP_COLUMNS
:
code
=
msgToNodeListFromTlv
(
pTlv
,
(
void
**
)
&
pNode
->
pGroupCols
);
break
;
default:
break
;
}
}
return
code
;
}
enum
{
PHY_DYN_QUERY_CTRL_CODE_BASE_NODE
=
1
,
PHY_DYN_QUERY_CTRL_CODE_QUERY_TYPE
};
static
int32_t
physiDynQueryCtrlNodeToMsg
(
const
void
*
pObj
,
STlvEncoder
*
pEncoder
)
{
const
SDynQueryCtrlPhysiNode
*
pNode
=
(
const
SDynQueryCtrlPhysiNode
*
)
pObj
;
int32_t
code
=
tlvEncodeObj
(
pEncoder
,
PHY_DYN_QUERY_CTRL_CODE_BASE_NODE
,
physiNodeToMsg
,
&
pNode
->
node
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tlvEncodeEnum
(
pEncoder
,
PHY_DYN_QUERY_CTRL_CODE_QUERY_TYPE
,
pNode
->
qType
);
}
return
code
;
}
static
int32_t
msgToPhysiDynQueryCtrlNode
(
STlvDecoder
*
pDecoder
,
void
*
pObj
)
{
SDynQueryCtrlPhysiNode
*
pNode
=
(
SDynQueryCtrlPhysiNode
*
)
pObj
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
STlv
*
pTlv
=
NULL
;
tlvForEach
(
pDecoder
,
pTlv
,
code
)
{
switch
(
pTlv
->
type
)
{
case
PHY_DYN_QUERY_CTRL_CODE_BASE_NODE
:
code
=
tlvDecodeObjFromTlv
(
pTlv
,
msgToPhysiNode
,
&
pNode
->
node
);
break
;
case
PHY_DYN_QUERY_CTRL_CODE_QUERY_TYPE
:
code
=
tlvDecodeEnum
(
pTlv
,
&
pNode
->
qType
,
sizeof
(
pNode
->
qType
));
break
;
default:
break
;
}
}
return
code
;
}
enum
{
SUBPLAN_ID_CODE_QUERY_ID
=
1
,
SUBPLAN_ID_CODE_GROUP_ID
,
SUBPLAN_ID_CODE_SUBPLAN_ID
};
static
int32_t
subplanIdInlineToMsg
(
const
void
*
pObj
,
STlvEncoder
*
pEncoder
)
{
...
...
@@ -3738,7 +3907,10 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
code
=
physiProjectNodeToMsg
(
pObj
,
pEncoder
);
break
;
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN
:
code
=
physiJoinNodeToMsg
(
pObj
,
pEncoder
);
code
=
physiMergeJoinNodeToMsg
(
pObj
,
pEncoder
);
break
;
case
QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN
:
code
=
physiHashJoinNodeToMsg
(
pObj
,
pEncoder
);
break
;
case
QUERY_NODE_PHYSICAL_PLAN_HASH_AGG
:
code
=
physiAggNodeToMsg
(
pObj
,
pEncoder
);
...
...
@@ -3799,6 +3971,12 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
case
QUERY_NODE_PHYSICAL_PLAN_DELETE
:
code
=
physiDeleteNodeToMsg
(
pObj
,
pEncoder
);
break
;
case
QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE
:
code
=
physiGroupCacheNodeToMsg
(
pObj
,
pEncoder
);
break
;
case
QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL
:
code
=
physiDynQueryCtrlNodeToMsg
(
pObj
,
pEncoder
);
break
;
case
QUERY_NODE_PHYSICAL_SUBPLAN
:
code
=
subplanToMsg
(
pObj
,
pEncoder
);
break
;
...
...
@@ -3881,7 +4059,10 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) {
code
=
msgToPhysiProjectNode
(
pDecoder
,
pObj
);
break
;
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN
:
code
=
msgToPhysiJoinNode
(
pDecoder
,
pObj
);
code
=
msgToPhysiMergeJoinNode
(
pDecoder
,
pObj
);
break
;
case
QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN
:
code
=
msgToPhysiHashJoinNode
(
pDecoder
,
pObj
);
break
;
case
QUERY_NODE_PHYSICAL_PLAN_HASH_AGG
:
code
=
msgToPhysiAggNode
(
pDecoder
,
pObj
);
...
...
@@ -3942,6 +4123,12 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) {
case
QUERY_NODE_PHYSICAL_PLAN_DELETE
:
code
=
msgToPhysiDeleteNode
(
pDecoder
,
pObj
);
break
;
case
QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE
:
code
=
msgToPhysiGroupCacheNode
(
pDecoder
,
pObj
);
break
;
case
QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL
:
code
=
msgToPhysiDynQueryCtrlNode
(
pDecoder
,
pObj
);
break
;
case
QUERY_NODE_PHYSICAL_SUBPLAN
:
code
=
msgToSubplan
(
pDecoder
,
pObj
);
break
;
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
226ee062
...
...
@@ -520,6 +520,8 @@ SNode* nodesMakeNode(ENodeType type) {
return
makeNode
(
type
,
sizeof
(
SProjectPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN
:
return
makeNode
(
type
,
sizeof
(
SSortMergeJoinPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN
:
return
makeNode
(
type
,
sizeof
(
SHashJoinPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_HASH_AGG
:
return
makeNode
(
type
,
sizeof
(
SAggPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
:
...
...
@@ -575,6 +577,10 @@ SNode* nodesMakeNode(ENodeType type) {
return
makeNode
(
type
,
sizeof
(
SQueryInserterNode
));
case
QUERY_NODE_PHYSICAL_PLAN_DELETE
:
return
makeNode
(
type
,
sizeof
(
SDataDeleterNode
));
case
QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE
:
return
makeNode
(
type
,
sizeof
(
SGroupCachePhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL
:
return
makeNode
(
type
,
sizeof
(
SDynQueryCtrlPhysiNode
));
case
QUERY_NODE_PHYSICAL_SUBPLAN
:
return
makeNode
(
type
,
sizeof
(
SSubplan
));
case
QUERY_NODE_PHYSICAL_PLAN
:
...
...
@@ -1243,6 +1249,15 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode
(
pPhyNode
->
pColEqCond
);
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN
:
{
SHashJoinPhysiNode
*
pPhyNode
=
(
SHashJoinPhysiNode
*
)
pNode
;
destroyPhysiNode
((
SPhysiNode
*
)
pPhyNode
);
nodesDestroyList
(
pPhyNode
->
pOnLeft
);
nodesDestroyList
(
pPhyNode
->
pOnRight
);
nodesDestroyNode
(
pPhyNode
->
pFilterConditions
);
nodesDestroyList
(
pPhyNode
->
pTargets
);
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_HASH_AGG
:
{
SAggPhysiNode
*
pPhyNode
=
(
SAggPhysiNode
*
)
pNode
;
destroyPhysiNode
((
SPhysiNode
*
)
pPhyNode
);
...
...
@@ -1361,6 +1376,17 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode
(
pSink
->
pEndTs
);
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE
:
{
SGroupCachePhysiNode
*
pPhyNode
=
(
SGroupCachePhysiNode
*
)
pNode
;
destroyPhysiNode
((
SPhysiNode
*
)
pPhyNode
);
nodesDestroyList
(
pPhyNode
->
pGroupCols
);
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL
:
{
SDynQueryCtrlPhysiNode
*
pPhyNode
=
(
SDynQueryCtrlPhysiNode
*
)
pNode
;
destroyPhysiNode
((
SPhysiNode
*
)
pPhyNode
);
break
;
}
case
QUERY_NODE_PHYSICAL_SUBPLAN
:
{
SSubplan
*
pSubplan
=
(
SSubplan
*
)
pNode
;
nodesClearList
(
pSubplan
->
pChildren
);
...
...
source/libs/planner/src/planOptimizer.c
浏览文件 @
226ee062
...
...
@@ -1882,6 +1882,9 @@ static bool eliminateProjOptCanChildConditionUseChildTargets(SLogicNode* pChild,
nodesWalkExpr
(
pChild
->
pConditions
,
eliminateProjOptCanUseNewChildTargetsImpl
,
&
cxt
);
if
(
!
cxt
.
canUse
)
return
false
;
}
if
(
QUERY_NODE_LOGIC_PLAN_JOIN
==
nodeType
(
pChild
)
&&
((
SJoinLogicNode
*
)
pChild
)
->
joinAlgo
!=
JOIN_ALGO_UNKNOWN
)
{
return
false
;
}
if
(
QUERY_NODE_LOGIC_PLAN_JOIN
==
nodeType
(
pChild
)
&&
((
SJoinLogicNode
*
)
pChild
)
->
pOtherOnCond
)
{
SJoinLogicNode
*
pJoinLogicNode
=
(
SJoinLogicNode
*
)
pChild
;
CheckNewChildTargetsCxt
cxt
=
{.
pNewChildTargets
=
pNewChildTargets
,
.
canUse
=
false
};
...
...
@@ -3160,7 +3163,7 @@ static int32_t stbJoinOptCreateTagHashJoinNode(SLogicNode* pOrig, SNodeList* pCh
pJoin
->
node
.
requireDataOrder
=
DATA_ORDER_LEVEL_NONE
;
pJoin
->
node
.
resultDataOrder
=
DATA_ORDER_LEVEL_NONE
;
pJoin
->
pTagEqCond
=
nodesCloneNode
(
pOrigJoin
->
pTagEqCond
);
pJoin
->
p
Tag
OnCond
=
nodesCloneNode
(
pOrigJoin
->
pTagOnCond
);
pJoin
->
p
Other
OnCond
=
nodesCloneNode
(
pOrigJoin
->
pTagOnCond
);
int32_t
code
=
TSDB_CODE_SUCCESS
;
pJoin
->
node
.
pChildren
=
pChildren
;
...
...
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
226ee062
...
...
@@ -708,6 +708,19 @@ static int32_t mergeEqCond(SNode** ppDst, SNode** ppSrc) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
getJoinDataBlockDescNode
(
SNodeList
*
pChildren
,
int32_t
idx
,
SDataBlockDescNode
**
ppDesc
)
{
if
(
2
==
pChildren
->
length
)
{
*
ppDesc
=
((
SPhysiNode
*
)
nodesListGetNode
(
pChildren
,
idx
))
->
pOutputDataBlockDesc
;
}
else
if
(
1
==
pChildren
->
length
&&
nodeType
(
nodesListGetNode
(
pChildren
,
0
))
==
QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE
)
{
SGroupCachePhysiNode
*
pGrpCache
=
(
SGroupCachePhysiNode
*
)
nodesListGetNode
(
pChildren
,
0
);
*
ppDesc
=
((
SPhysiNode
*
)
nodesListGetNode
(
pGrpCache
->
node
.
pChildren
,
idx
))
->
pOutputDataBlockDesc
;
}
else
{
planError
(
"Invalid join children num:%d or child type:%d"
,
pChildren
->
length
,
nodeType
(
nodesListGetNode
(
pChildren
,
0
)));
return
TSDB_CODE_PLAN_INTERNAL_ERROR
;
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
createMergeJoinPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SJoinLogicNode
*
pJoinLogicNode
,
SPhysiNode
**
pPhyNode
)
{
...
...
@@ -717,14 +730,20 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi
return
TSDB_CODE_OUT_OF_MEMORY
;
}
SDataBlockDescNode
*
pLeftDesc
=
((
SPhysiNode
*
)
nodesListGetNode
(
pChildren
,
0
))
->
pOutputDataBlockDesc
;
SDataBlockDescNode
*
pRightDesc
=
((
SPhysiNode
*
)
nodesListGetNode
(
pChildren
,
1
))
->
pOutputDataBlockDesc
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
pJoin
->
joinType
=
pJoinLogicNode
->
joinType
;
pJoin
->
node
.
inputTsOrder
=
pJoinLogicNode
->
node
.
inputTsOrder
;
setNodeSlotId
(
pCxt
,
pLeftDesc
->
dataBlockId
,
pRightDesc
->
dataBlockId
,
pJoinLogicNode
->
pPrimKeyEqCond
,
&
pJoin
->
pPrimKeyCond
);
SDataBlockDescNode
*
pLeftDesc
=
NULL
;
SDataBlockDescNode
*
pRightDesc
=
NULL
;
int32_t
code
=
getJoinDataBlockDescNode
(
pChildren
,
0
,
&
pLeftDesc
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
getJoinDataBlockDescNode
(
pChildren
,
1
,
&
pRightDesc
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
setNodeSlotId
(
pCxt
,
pLeftDesc
->
dataBlockId
,
pRightDesc
->
dataBlockId
,
pJoinLogicNode
->
pPrimKeyEqCond
,
&
pJoin
->
pPrimKeyCond
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
setListSlotId
(
pCxt
,
pLeftDesc
->
dataBlockId
,
pRightDesc
->
dataBlockId
,
pJoinLogicNode
->
node
.
pTargets
,
...
...
@@ -759,6 +778,131 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi
return
code
;
}
static
int32_t
extractHashJoinOpCols
(
int16_t
lBlkId
,
int16_t
rBlkId
,
SNode
*
pEq
,
SHashJoinPhysiNode
*
pJoin
)
{
if
(
QUERY_NODE_OPERATOR
==
nodeType
(
pEq
))
{
SOperatorNode
*
pOp
=
(
SOperatorNode
*
)
pEq
;
SColumnNode
*
pLeft
=
(
SColumnNode
*
)
pOp
->
pLeft
;
SColumnNode
*
pRight
=
(
SColumnNode
*
)
pOp
->
pRight
;
if
(
lBlkId
==
pLeft
->
dataBlockId
&&
rBlkId
==
pRight
->
dataBlockId
)
{
nodesListStrictAppend
(
pJoin
->
pOnLeft
,
nodesCloneNode
(
pOp
->
pLeft
));
nodesListStrictAppend
(
pJoin
->
pOnRight
,
nodesCloneNode
(
pOp
->
pRight
));
}
else
if
(
rBlkId
==
pLeft
->
dataBlockId
&&
lBlkId
==
pRight
->
dataBlockId
)
{
nodesListStrictAppend
(
pJoin
->
pOnLeft
,
nodesCloneNode
(
pOp
->
pRight
));
nodesListStrictAppend
(
pJoin
->
pOnRight
,
nodesCloneNode
(
pOp
->
pLeft
));
}
else
{
planError
(
"Invalid join equal cond, lbid:%d, rbid:%d, oplid:%d, oprid:%d"
,
lBlkId
,
rBlkId
,
pLeft
->
dataBlockId
,
pRight
->
dataBlockId
);
return
TSDB_CODE_PLAN_INTERNAL_ERROR
;
}
return
TSDB_CODE_SUCCESS
;
}
planError
(
"Invalid join equal node type:%d"
,
nodeType
(
pEq
));
return
TSDB_CODE_PLAN_INTERNAL_ERROR
;
}
static
int32_t
extractHashJoinOnCols
(
int16_t
lBlkId
,
int16_t
rBlkId
,
SNode
*
pEq
,
SHashJoinPhysiNode
*
pJoin
)
{
if
(
NULL
==
pEq
)
{
return
TSDB_CODE_SUCCESS
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
QUERY_NODE_OPERATOR
==
nodeType
(
pEq
))
{
code
=
extractHashJoinOpCols
(
lBlkId
,
rBlkId
,
pEq
,
pJoin
);
}
else
if
(
QUERY_NODE_LOGIC_CONDITION
==
nodeType
(
pEq
))
{
SLogicConditionNode
*
pLogic
=
(
SLogicConditionNode
*
)
pEq
;
SNode
*
pNode
=
NULL
;
FOREACH
(
pNode
,
pLogic
->
pParameterList
)
{
code
=
extractHashJoinOpCols
(
lBlkId
,
rBlkId
,
pNode
,
pJoin
);
if
(
code
)
{
break
;
}
}
}
else
{
planError
(
"Invalid join equal node type:%d"
,
nodeType
(
pEq
));
return
TSDB_CODE_PLAN_INTERNAL_ERROR
;
}
return
code
;
}
static
int32_t
createHashJoinColList
(
int16_t
lBlkId
,
int16_t
rBlkId
,
SNode
*
pEq1
,
SNode
*
pEq2
,
SNode
*
pEq3
,
SHashJoinPhysiNode
*
pJoin
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
pJoin
->
pOnLeft
=
nodesMakeList
();
pJoin
->
pOnRight
=
nodesMakeList
();
if
(
NULL
==
pJoin
->
pOnLeft
||
NULL
==
pJoin
->
pOnRight
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
code
=
extractHashJoinOnCols
(
lBlkId
,
rBlkId
,
pEq1
,
pJoin
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
extractHashJoinOnCols
(
lBlkId
,
rBlkId
,
pEq2
,
pJoin
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
extractHashJoinOnCols
(
lBlkId
,
rBlkId
,
pEq3
,
pJoin
);
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
pJoin
->
pOnLeft
->
length
<=
0
)
{
planError
(
"Invalid join equal column num: %d"
,
pJoin
->
pOnLeft
->
length
);
code
=
TSDB_CODE_PLAN_INTERNAL_ERROR
;
}
return
code
;
}
static
int32_t
sortHashJoinTargets
(
int16_t
lBlkId
,
int16_t
rBlkId
,
SHashJoinPhysiNode
*
pJoin
)
{
SNode
*
pNode
=
NULL
;
char
name
[
TSDB_TABLE_NAME_LEN
+
TSDB_COL_NAME_LEN
];
SSHashObj
*
pHash
=
tSimpleHashInit
(
pJoin
->
pTargets
->
length
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
));
if
(
NULL
==
pHash
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
SNodeList
*
pNew
=
nodesMakeList
();
FOREACH
(
pNode
,
pJoin
->
pTargets
)
{
SColumnNode
*
pCol
=
(
SColumnNode
*
)
pNode
;
int32_t
len
=
getSlotKey
(
pNode
,
NULL
,
name
);
tSimpleHashPut
(
pHash
,
name
,
len
,
&
pCol
,
POINTER_BYTES
);
}
nodesClearList
(
pJoin
->
pTargets
);
pJoin
->
pTargets
=
pNew
;
FOREACH
(
pNode
,
pJoin
->
pOnLeft
)
{
SColumnNode
*
pCol
=
(
SColumnNode
*
)
pNode
;
int32_t
len
=
getSlotKey
(
pNode
,
NULL
,
name
);
SNode
**
p
=
tSimpleHashGet
(
pHash
,
name
,
len
);
if
(
p
)
{
nodesListStrictAppend
(
pJoin
->
pTargets
,
*
p
);
tSimpleHashRemove
(
pHash
,
name
,
len
);
}
}
FOREACH
(
pNode
,
pJoin
->
pOnRight
)
{
SColumnNode
*
pCol
=
(
SColumnNode
*
)
pNode
;
int32_t
len
=
getSlotKey
(
pNode
,
NULL
,
name
);
SNode
**
p
=
tSimpleHashGet
(
pHash
,
name
,
len
);
if
(
p
)
{
nodesListStrictAppend
(
pJoin
->
pTargets
,
*
p
);
tSimpleHashRemove
(
pHash
,
name
,
len
);
}
}
if
(
tSimpleHashGetSize
(
pHash
)
>
0
)
{
SNode
**
p
=
NULL
;
int32_t
iter
=
0
;
while
(
1
)
{
p
=
tSimpleHashIterate
(
pHash
,
p
,
&
iter
);
if
(
p
==
NULL
)
{
break
;
}
nodesListStrictAppend
(
pJoin
->
pTargets
,
*
p
);
}
}
tSimpleHashCleanup
(
pHash
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
createHashJoinPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SJoinLogicNode
*
pJoinLogicNode
,
SPhysiNode
**
pPhyNode
)
{
...
...
@@ -778,7 +922,6 @@ static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil
SNode
*
pPrimKeyCond
=
NULL
;
SNode
*
pColEqCond
=
NULL
;
SNode
*
pTagEqCond
=
NULL
;
SNode
*
pTagOnCond
=
NULL
;
code
=
setNodeSlotId
(
pCxt
,
pLeftDesc
->
dataBlockId
,
pRightDesc
->
dataBlockId
,
pJoinLogicNode
->
pPrimKeyEqCond
,
&
pPrimKeyCond
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
setNodeSlotId
(
pCxt
,
pLeftDesc
->
dataBlockId
,
pRightDesc
->
dataBlockId
,
pJoinLogicNode
->
pColEqCond
,
&
pColEqCond
);
...
...
@@ -786,21 +929,29 @@ static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
setNodeSlotId
(
pCxt
,
pLeftDesc
->
dataBlockId
,
pRightDesc
->
dataBlockId
,
pJoinLogicNode
->
pTagEqCond
,
&
pTagEqCond
);
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pJoinLogicNode
->
p
Tag
OnCond
)
{
code
=
setNodeSlotId
(
pCxt
,
((
SPhysiNode
*
)
pJoin
)
->
pOutputDataBlockDesc
->
dataBlockId
,
-
1
,
pJoinLogicNode
->
p
TagOnCond
,
&
pTagOnCond
);
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pJoinLogicNode
->
p
Other
OnCond
)
{
code
=
setNodeSlotId
(
pCxt
,
((
SPhysiNode
*
)
pJoin
)
->
pOutputDataBlockDesc
->
dataBlockId
,
-
1
,
pJoinLogicNode
->
p
OtherOnCond
,
&
pJoin
->
pFilterConditions
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
setListSlotId
(
pCxt
,
pLeftDesc
->
dataBlockId
,
pRightDesc
->
dataBlockId
,
pJoinLogicNode
->
node
.
pTargets
,
&
pJoin
->
pTargets
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
setConditionsSlotId
(
pCxt
,
(
const
SLogicNode
*
)
pJoinLogicNode
,
(
SPhysiNode
*
)
pJoin
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
createHashJoinColList
(
pLeftDesc
->
dataBlockId
,
pRightDesc
->
dataBlockId
,
pPrimKeyCond
,
pColEqCond
,
pTagEqCond
,
pJoin
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
sortHashJoinTargets
(
pLeftDesc
->
dataBlockId
,
pRightDesc
->
dataBlockId
,
pJoin
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
addDataBlockSlots
(
pCxt
,
pJoin
->
pTargets
,
pJoin
->
node
.
pOutputDataBlockDesc
);
}
nodesDestroyNode
(
pPrimKeyCond
);
nodesDestroyNode
(
pColEqCond
);
nodesDestroyNode
(
pTagEqCond
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pPhyNode
=
(
SPhysiNode
*
)
pJoin
;
}
else
{
...
...
@@ -825,6 +976,44 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
return
TSDB_CODE_FAILED
;
}
static
int32_t
createGroupCachePhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SGroupCacheLogicNode
*
pLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SGroupCachePhysiNode
*
pGrpCache
=
(
SGroupCachePhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE
);
if
(
NULL
==
pGrpCache
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
SDataBlockDescNode
*
pChildDesc
=
((
SPhysiNode
*
)
nodesListGetNode
(
pChildren
,
0
))
->
pOutputDataBlockDesc
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
setListSlotId
(
pCxt
,
pChildDesc
->
dataBlockId
,
-
1
,
pLogicNode
->
pGroupCols
,
&
pGrpCache
->
pGroupCols
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pPhyNode
=
(
SPhysiNode
*
)
pGrpCache
;
}
else
{
nodesDestroyNode
((
SNode
*
)
pGrpCache
);
}
return
code
;
}
static
int32_t
createDynQueryCtrlPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SDynQueryCtrlLogicNode
*
pLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SDynQueryCtrlPhysiNode
*
pDynCtrl
=
(
SDynQueryCtrlPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL
);
if
(
NULL
==
pDynCtrl
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pDynCtrl
->
qType
=
pLogicNode
->
qType
;
*
pPhyNode
=
(
SPhysiNode
*
)
pDynCtrl
;
return
TSDB_CODE_SUCCESS
;
}
typedef
struct
SRewritePrecalcExprsCxt
{
int32_t
errCode
;
int32_t
planNodeId
;
...
...
@@ -1737,6 +1926,10 @@ static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode
return
createInterpFuncPhysiNode
(
pCxt
,
pChildren
,
(
SInterpFuncLogicNode
*
)
pLogicNode
,
pPhyNode
);
case
QUERY_NODE_LOGIC_PLAN_MERGE
:
return
createMergePhysiNode
(
pCxt
,
(
SMergeLogicNode
*
)
pLogicNode
,
pPhyNode
);
case
QUERY_NODE_LOGIC_PLAN_GROUP_CACHE
:
return
createGroupCachePhysiNode
(
pCxt
,
pChildren
,
(
SGroupCacheLogicNode
*
)
pLogicNode
,
pPhyNode
);
case
QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL
:
return
createDynQueryCtrlPhysiNode
(
pCxt
,
pChildren
,
(
SDynQueryCtrlLogicNode
*
)
pLogicNode
,
pPhyNode
);
default:
break
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录