Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a380e0cd
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a380e0cd
编写于
7月 04, 2022
作者:
S
slzhou
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat: merge condition, on condition and other conditions
上级
dbbff4ea
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
93 addition
and
15 deletion
+93
-15
include/libs/nodes/querynodes.h
include/libs/nodes/querynodes.h
+1
-0
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+2
-1
source/libs/executor/src/joinoperator.c
source/libs/executor/src/joinoperator.c
+16
-6
source/libs/nodes/src/nodesCloneFuncs.c
source/libs/nodes/src/nodesCloneFuncs.c
+1
-0
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+11
-0
source/libs/nodes/src/nodesTraverseFuncs.c
source/libs/nodes/src/nodesTraverseFuncs.c
+3
-0
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+34
-0
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+8
-4
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+17
-4
未找到文件。
include/libs/nodes/querynodes.h
浏览文件 @
a380e0cd
...
...
@@ -375,6 +375,7 @@ void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewrit
typedef
enum
ECollectColType
{
COLLECT_COL_TYPE_COL
=
1
,
COLLECT_COL_TYPE_TAG
,
COLLECT_COL_TYPE_ALL
}
ECollectColType
;
int32_t
nodesCollectColumns
(
SSelectStmt
*
pSelect
,
ESqlClause
clause
,
const
char
*
pTableAlias
,
ECollectColType
type
,
SNodeList
**
pCols
);
int32_t
nodesCollectColumnsFromNode
(
SNode
*
node
,
const
char
*
pTableAlias
,
ECollectColType
type
,
SNodeList
**
pCols
);
typedef
bool
(
*
FFuncClassifier
)(
int32_t
funcId
);
int32_t
nodesCollectFuncs
(
SSelectStmt
*
pSelect
,
ESqlClause
clause
,
FFuncClassifier
classifier
,
SNodeList
**
pFuncs
);
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
a380e0cd
...
...
@@ -678,7 +678,8 @@ typedef struct SJoinOperatorInfo {
SSDataBlock
*
pRight
;
int32_t
rightPos
;
SColumnInfo
rightCol
;
SNode
*
pOnCondition
;
SNode
*
pOnConditions
;
SNode
*
pOtherConditions
;
}
SJoinOperatorInfo
;
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
...
...
source/libs/executor/src/joinoperator.c
浏览文件 @
a380e0cd
...
...
@@ -53,13 +53,20 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
SNode
*
p
OnCondition
=
pJoinNode
->
pOnConditions
;
if
(
nodeType
(
p
On
Condition
)
==
QUERY_NODE_OPERATOR
)
{
SOperatorNode
*
pNode
=
(
SOperatorNode
*
)
p
On
Condition
;
SNode
*
p
MergeCondition
=
pJoinNode
->
pMergeCondition
;
if
(
nodeType
(
p
Merge
Condition
)
==
QUERY_NODE_OPERATOR
)
{
SOperatorNode
*
pNode
=
(
SOperatorNode
*
)
p
Merge
Condition
;
setJoinColumnInfo
(
&
pInfo
->
leftCol
,
(
SColumnNode
*
)
pNode
->
pLeft
);
setJoinColumnInfo
(
&
pInfo
->
rightCol
,
(
SColumnNode
*
)
pNode
->
pRight
);
}
else
if
(
nodeType
(
pOnCondition
)
==
QUERY_NODE_LOGIC_CONDITION
)
{
extractTimeCondition
(
pInfo
,
(
SLogicConditionNode
*
)
pOnCondition
);
}
else
{
ASSERT
(
false
);
}
//TODO: merge these two conditions
ASSERT
(
pJoinNode
->
pOnConditions
);
pInfo
->
pOnConditions
=
nodesCloneNode
(
pJoinNode
->
pOnConditions
);
if
(
pJoinNode
->
node
.
pConditions
!=
NULL
)
{
pInfo
->
pOtherConditions
=
pJoinNode
->
node
.
pConditions
;
}
pOperator
->
fpSet
=
...
...
@@ -88,6 +95,8 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
void
destroyMergeJoinOperator
(
void
*
param
,
int32_t
numOfOutput
)
{
SJoinOperatorInfo
*
pJoinOperator
=
(
SJoinOperatorInfo
*
)
param
;
nodesDestroyNode
(
pJoinOperator
->
pOnConditions
);
nodesDestroyNode
(
pJoinOperator
->
pOtherConditions
);
}
static
void
doMergeJoinImpl
(
struct
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pRes
)
{
...
...
@@ -192,7 +201,8 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
if
(
numOfNewRows
==
0
)
{
break
;
}
doFilter
(
pJoinInfo
->
pOnCondition
,
pRes
);
doFilter
(
pJoinInfo
->
pOnConditions
,
pRes
);
doFilter
(
pJoinInfo
->
pOtherConditions
,
pRes
);
if
(
pRes
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
)
{
break
;
}
...
...
source/libs/nodes/src/nodesCloneFuncs.c
浏览文件 @
a380e0cd
...
...
@@ -359,6 +359,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
static
int32_t
logicJoinCopy
(
const
SJoinLogicNode
*
pSrc
,
SJoinLogicNode
*
pDst
)
{
COPY_BASE_OBJECT_FIELD
(
node
,
logicNodeCopy
);
COPY_SCALAR_FIELD
(
joinType
);
CLONE_NODE_FIELD
(
pMergeCondition
);
CLONE_NODE_FIELD
(
pOnConditions
);
COPY_SCALAR_FIELD
(
isSingleTableJoin
);
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
a380e0cd
...
...
@@ -1253,6 +1253,7 @@ static int32_t jsonToLogicPlan(const SJson* pJson, void* pObj) {
static
const
char
*
jkJoinLogicPlanJoinType
=
"JoinType"
;
static
const
char
*
jkJoinLogicPlanOnConditions
=
"OnConditions"
;
static
const
char
*
jkJoinLogicPlanMergeCondition
=
"MergeConditions"
;
static
int32_t
logicJoinNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SJoinLogicNode
*
pNode
=
(
const
SJoinLogicNode
*
)
pObj
;
...
...
@@ -1261,6 +1262,9 @@ static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkJoinLogicPlanJoinType
,
pNode
->
joinType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkJoinLogicPlanMergeCondition
,
nodeToJson
,
pNode
->
pMergeCondition
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkJoinLogicPlanOnConditions
,
nodeToJson
,
pNode
->
pOnConditions
);
}
...
...
@@ -1616,6 +1620,7 @@ static int32_t jsonToPhysiProjectNode(const SJson* pJson, void* pObj) {
}
static
const
char
*
jkJoinPhysiPlanJoinType
=
"JoinType"
;
static
const
char
*
jkJoinPhysiPlanMergeCondition
=
"MergeCondition"
;
static
const
char
*
jkJoinPhysiPlanOnConditions
=
"OnConditions"
;
static
const
char
*
jkJoinPhysiPlanTargets
=
"Targets"
;
...
...
@@ -1626,6 +1631,9 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkJoinPhysiPlanJoinType
,
pNode
->
joinType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkJoinPhysiPlanMergeCondition
,
nodeToJson
,
pNode
->
pMergeCondition
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkJoinPhysiPlanOnConditions
,
nodeToJson
,
pNode
->
pOnConditions
);
}
...
...
@@ -1647,6 +1655,9 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkJoinPhysiPlanOnConditions
,
&
pNode
->
pOnConditions
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkJoinPhysiPlanMergeCondition
,
&
pNode
->
pMergeCondition
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkJoinPhysiPlanTargets
,
&
pNode
->
pTargets
);
}
...
...
source/libs/nodes/src/nodesTraverseFuncs.c
浏览文件 @
a380e0cd
...
...
@@ -470,6 +470,9 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN
:
{
SJoinPhysiNode
*
pJoin
=
(
SJoinPhysiNode
*
)
pNode
;
res
=
walkPhysiNode
((
SPhysiNode
*
)
pNode
,
order
,
walker
,
pContext
);
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
)
{
res
=
walkPhysiPlan
(
pJoin
->
pMergeCondition
,
order
,
walker
,
pContext
);
}
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
)
{
res
=
walkPhysiPlan
(
pJoin
->
pOnConditions
,
order
,
walker
,
pContext
);
}
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
a380e0cd
...
...
@@ -717,6 +717,7 @@ void nodesDestroyNode(SNode* pNode) {
case
QUERY_NODE_LOGIC_PLAN_JOIN
:
{
SJoinLogicNode
*
pLogicNode
=
(
SJoinLogicNode
*
)
pNode
;
destroyLogicNode
((
SLogicNode
*
)
pLogicNode
);
nodesDestroyNode
(
pLogicNode
->
pMergeCondition
);
nodesDestroyNode
(
pLogicNode
->
pOnConditions
);
break
;
}
...
...
@@ -827,6 +828,7 @@ void nodesDestroyNode(SNode* pNode) {
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN
:
{
SJoinPhysiNode
*
pPhyNode
=
(
SJoinPhysiNode
*
)
pNode
;
destroyPhysiNode
((
SPhysiNode
*
)
pPhyNode
);
nodesDestroyNode
(
pPhyNode
->
pMergeCondition
);
nodesDestroyNode
(
pPhyNode
->
pOnConditions
);
nodesDestroyList
(
pPhyNode
->
pTargets
);
break
;
...
...
@@ -1492,6 +1494,38 @@ int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char*
return
TSDB_CODE_SUCCESS
;
}
int32_t
nodesCollectColumnsFromNode
(
SNode
*
node
,
const
char
*
pTableAlias
,
ECollectColType
type
,
SNodeList
**
pCols
)
{
if
(
NULL
==
pCols
)
{
return
TSDB_CODE_FAILED
;
}
SCollectColumnsCxt
cxt
=
{
.
errCode
=
TSDB_CODE_SUCCESS
,
.
pTableAlias
=
pTableAlias
,
.
collectType
=
type
,
.
pCols
=
(
NULL
==
*
pCols
?
nodesMakeList
()
:
*
pCols
),
.
pColHash
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
)};
if
(
NULL
==
cxt
.
pCols
||
NULL
==
cxt
.
pColHash
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
*
pCols
=
NULL
;
nodesWalkExpr
(
node
,
collectColumns
,
&
cxt
);
taosHashCleanup
(
cxt
.
pColHash
);
if
(
TSDB_CODE_SUCCESS
!=
cxt
.
errCode
)
{
nodesDestroyList
(
cxt
.
pCols
);
return
cxt
.
errCode
;
}
if
(
LIST_LENGTH
(
cxt
.
pCols
)
>
0
)
{
*
pCols
=
cxt
.
pCols
;
}
else
{
nodesDestroyList
(
cxt
.
pCols
);
}
return
TSDB_CODE_SUCCESS
;
}
typedef
struct
SCollectFuncsCxt
{
int32_t
errCode
;
FFuncClassifier
classifier
;
...
...
source/libs/planner/src/planOptimizer.c
浏览文件 @
a380e0cd
...
...
@@ -540,11 +540,15 @@ static bool pushDownCondOptContainPriKeyEqualCond(SJoinLogicNode* pJoin, SNode*
}
return
hasPrimaryKeyEqualCond
;
}
else
{
return
pushDownCondOptIsPriKeyEqualCond
(
pJoin
,
pCond
);
bool
isPriKeyEqualCond
=
pushDownCondOptIsPriKeyEqualCond
(
pJoin
,
pCond
);
if
(
isPriKeyEqualCond
)
{
pJoin
->
pMergeCondition
=
nodesCloneNode
(
pCond
);
}
return
isPriKeyEqualCond
;
}
}
static
int32_t
pushDownCondOpt
CheckJoinOn
Cond
(
SOptimizeContext
*
pCxt
,
SJoinLogicNode
*
pJoin
)
{
static
int32_t
pushDownCondOpt
ExtractJoinMerge
Cond
(
SOptimizeContext
*
pCxt
,
SJoinLogicNode
*
pJoin
)
{
if
(
NULL
==
pJoin
->
pOnConditions
)
{
return
generateUsageErrMsg
(
pCxt
->
pPlanCxt
->
pMsg
,
pCxt
->
pPlanCxt
->
msgLen
,
TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN
);
}
...
...
@@ -560,7 +564,7 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p
}
if
(
NULL
==
pJoin
->
node
.
pConditions
)
{
return
pushDownCondOpt
CheckJoinOn
Cond
(
pCxt
,
pJoin
);
return
pushDownCondOpt
ExtractJoinMerge
Cond
(
pCxt
,
pJoin
);
}
SNode
*
pOnCond
=
NULL
;
...
...
@@ -582,7 +586,7 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p
if
(
TSDB_CODE_SUCCESS
==
code
)
{
OPTIMIZE_FLAG_SET_MASK
(
pJoin
->
node
.
optimizedFlag
,
OPTIMIZE_FLAG_PUSH_DOWN_CONDE
);
pCxt
->
optimized
=
true
;
code
=
pushDownCondOpt
CheckJoinOn
Cond
(
pCxt
,
pJoin
);
code
=
pushDownCondOpt
ExtractJoinMerge
Cond
(
pCxt
,
pJoin
);
}
else
{
nodesDestroyNode
(
pOnCond
);
nodesDestroyNode
(
pLeftChildCond
);
...
...
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
a380e0cd
...
...
@@ -609,10 +609,8 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
int32_t
code
=
TSDB_CODE_SUCCESS
;
pJoin
->
joinType
=
pJoinLogicNode
->
joinType
;
if
(
NULL
!=
pJoinLogicNode
->
pOnConditions
)
{
code
=
setNodeSlotId
(
pCxt
,
pLeftDesc
->
dataBlockId
,
pRightDesc
->
dataBlockId
,
pJoinLogicNode
->
pOnConditions
,
&
pJoin
->
pOnConditions
);
}
setNodeSlotId
(
pCxt
,
pLeftDesc
->
dataBlockId
,
pRightDesc
->
dataBlockId
,
pJoinLogicNode
->
pMergeCondition
,
&
pJoin
->
pMergeCondition
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
setListSlotId
(
pCxt
,
pLeftDesc
->
dataBlockId
,
pRightDesc
->
dataBlockId
,
pJoinLogicNode
->
node
.
pTargets
,
&
pJoin
->
pTargets
);
...
...
@@ -620,6 +618,21 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
addDataBlockSlots
(
pCxt
,
pJoin
->
pTargets
,
pJoin
->
node
.
pOutputDataBlockDesc
);
}
SNodeList
*
condCols
=
nodesMakeList
();
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pJoinLogicNode
->
pOnConditions
)
{
code
=
nodesCollectColumnsFromNode
(
pJoinLogicNode
->
pOnConditions
,
NULL
,
COLLECT_COL_TYPE_ALL
,
&
condCols
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
addDataBlockSlots
(
pCxt
,
condCols
,
pJoin
->
node
.
pOutputDataBlockDesc
);
nodesDestroyList
(
condCols
);
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pJoinLogicNode
->
pOnConditions
)
{
code
=
setNodeSlotId
(
pCxt
,
((
SPhysiNode
*
)
pJoin
)
->
pOutputDataBlockDesc
->
dataBlockId
,
-
1
,
pJoinLogicNode
->
pOnConditions
,
&
pJoin
->
pOnConditions
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
setConditionsSlotId
(
pCxt
,
(
const
SLogicNode
*
)
pJoinLogicNode
,
(
SPhysiNode
*
)
pJoin
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录