Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a08a6e57
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a08a6e57
编写于
7月 05, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into fix/mnode
上级
25c5a248
6c853fc5
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
203 addition
and
22 deletion
+203
-22
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+2
-0
include/libs/nodes/querynodes.h
include/libs/nodes/querynodes.h
+1
-0
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+2
-3
source/dnode/mnode/sdb/src/sdb.c
source/dnode/mnode/sdb/src/sdb.c
+1
-1
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-1
source/libs/executor/src/joinoperator.c
source/libs/executor/src/joinoperator.c
+43
-10
source/libs/function/src/tudf.c
source/libs/function/src/tudf.c
+4
-0
source/libs/nodes/src/nodesCloneFuncs.c
source/libs/nodes/src/nodesCloneFuncs.c
+1
-0
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+11
-0
source/libs/nodes/src/nodesTraverseFuncs.c
source/libs/nodes/src/nodesTraverseFuncs.c
+3
-0
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+34
-0
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+83
-3
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+17
-4
未找到文件。
include/libs/nodes/plannodes.h
浏览文件 @
a08a6e57
...
...
@@ -82,6 +82,7 @@ typedef struct SScanLogicNode {
typedef
struct
SJoinLogicNode
{
SLogicNode
node
;
EJoinType
joinType
;
SNode
*
pMergeCondition
;
SNode
*
pOnConditions
;
bool
isSingleTableJoin
;
}
SJoinLogicNode
;
...
...
@@ -329,6 +330,7 @@ typedef struct SInterpFuncPhysiNode {
typedef
struct
SJoinPhysiNode
{
SPhysiNode
node
;
EJoinType
joinType
;
SNode
*
pMergeCondition
;
SNode
*
pOnConditions
;
SNodeList
*
pTargets
;
}
SJoinPhysiNode
;
...
...
include/libs/nodes/querynodes.h
浏览文件 @
a08a6e57
...
...
@@ -376,6 +376,7 @@ void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewrit
typedef
enum
ECollectColType
{
COLLECT_COL_TYPE_COL
=
1
,
COLLECT_COL_TYPE_TAG
,
COLLECT_COL_TYPE_ALL
}
ECollectColType
;
int32_t
nodesCollectColumns
(
SSelectStmt
*
pSelect
,
ESqlClause
clause
,
const
char
*
pTableAlias
,
ECollectColType
type
,
SNodeList
**
pCols
);
int32_t
nodesCollectColumnsFromNode
(
SNode
*
node
,
const
char
*
pTableAlias
,
ECollectColType
type
,
SNodeList
**
pCols
);
typedef
bool
(
*
FFuncClassifier
)(
int32_t
funcId
);
int32_t
nodesCollectFuncs
(
SSelectStmt
*
pSelect
,
ESqlClause
clause
,
FFuncClassifier
classifier
,
SNodeList
**
pFuncs
);
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
a08a6e57
...
...
@@ -274,15 +274,14 @@ static void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeEps) {
SDnodeEp
dnodeEp
=
{
0
};
dnodeEp
.
id
=
pDnode
->
id
;
dnodeEp
.
isMnode
=
0
;
dnodeEp
.
ep
.
port
=
pDnode
->
port
;
memcpy
(
dnodeEp
.
ep
.
fqdn
,
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
sdbRelease
(
pSdb
,
pDnode
);
dnodeEp
.
isMnode
=
0
;
if
(
mndIsMnode
(
pMnode
,
pDnode
->
id
))
{
dnodeEp
.
isMnode
=
1
;
}
sdbRelease
(
pSdb
,
pDnode
);
taosArrayPush
(
pDnodeEps
,
&
dnodeEp
);
}
}
...
...
source/dnode/mnode/sdb/src/sdb.c
浏览文件 @
a08a6e57
...
...
@@ -131,7 +131,7 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) {
hashType
=
TSDB_DATA_TYPE_BINARY
;
}
SHashObj
*
hash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
hashType
),
true
,
HASH_
NO
_LOCK
);
SHashObj
*
hash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
hashType
),
true
,
HASH_
ENTRY
_LOCK
);
if
(
hash
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
a08a6e57
...
...
@@ -680,7 +680,7 @@ typedef struct SJoinOperatorInfo {
SSDataBlock
*
pRight
;
int32_t
rightPos
;
SColumnInfo
rightCol
;
SNode
*
p
OnCondition
;
SNode
*
p
CondAfterMerge
;
}
SJoinOperatorInfo
;
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
...
...
source/libs/executor/src/joinoperator.c
浏览文件 @
a08a6e57
...
...
@@ -53,13 +53,28 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
SNode
*
p
OnCondition
=
pJoinNode
->
pOnConditions
;
if
(
nodeType
(
p
On
Condition
)
==
QUERY_NODE_OPERATOR
)
{
SOperatorNode
*
pNode
=
(
SOperatorNode
*
)
p
On
Condition
;
SNode
*
p
MergeCondition
=
pJoinNode
->
pMergeCondition
;
if
(
nodeType
(
p
Merge
Condition
)
==
QUERY_NODE_OPERATOR
)
{
SOperatorNode
*
pNode
=
(
SOperatorNode
*
)
p
Merge
Condition
;
setJoinColumnInfo
(
&
pInfo
->
leftCol
,
(
SColumnNode
*
)
pNode
->
pLeft
);
setJoinColumnInfo
(
&
pInfo
->
rightCol
,
(
SColumnNode
*
)
pNode
->
pRight
);
}
else
if
(
nodeType
(
pOnCondition
)
==
QUERY_NODE_LOGIC_CONDITION
)
{
extractTimeCondition
(
pInfo
,
(
SLogicConditionNode
*
)
pOnCondition
);
}
else
{
ASSERT
(
false
);
}
if
(
pJoinNode
->
pOnConditions
!=
NULL
&&
pJoinNode
->
node
.
pConditions
!=
NULL
)
{
pInfo
->
pCondAfterMerge
=
nodesMakeNode
(
QUERY_NODE_LOGIC_CONDITION
);
SLogicConditionNode
*
pLogicCond
=
(
SLogicConditionNode
*
)(
pInfo
->
pCondAfterMerge
);
pLogicCond
->
pParameterList
=
nodesMakeList
();
nodesListMakeAppend
(
&
pLogicCond
->
pParameterList
,
nodesCloneNode
(
pJoinNode
->
pOnConditions
));
nodesListMakeAppend
(
&
pLogicCond
->
pParameterList
,
nodesCloneNode
(
pJoinNode
->
node
.
pConditions
));
pLogicCond
->
condType
=
LOGIC_COND_TYPE_AND
;
}
else
if
(
pJoinNode
->
pOnConditions
!=
NULL
)
{
pInfo
->
pCondAfterMerge
=
nodesCloneNode
(
pJoinNode
->
pOnConditions
);
}
else
if
(
pJoinNode
->
node
.
pConditions
!=
NULL
)
{
pInfo
->
pCondAfterMerge
=
nodesCloneNode
(
pJoinNode
->
node
.
pConditions
);
}
else
{
pInfo
->
pCondAfterMerge
=
NULL
;
}
pOperator
->
fpSet
=
...
...
@@ -88,15 +103,12 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
void
destroyMergeJoinOperator
(
void
*
param
,
int32_t
numOfOutput
)
{
SJoinOperatorInfo
*
pJoinOperator
=
(
SJoinOperatorInfo
*
)
param
;
nodesDestroyNode
(
pJoinOperator
->
pCondAfterMerge
);
}
SSDataBlock
*
doMergeJoin
(
struct
SOperatorInfo
*
pOperator
)
{
static
void
doMergeJoinImpl
(
struct
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pRes
)
{
SJoinOperatorInfo
*
pJoinInfo
=
pOperator
->
info
;
SSDataBlock
*
pRes
=
pJoinInfo
->
pRes
;
blockDataCleanup
(
pRes
);
blockDataEnsureCapacity
(
pRes
,
4096
);
int32_t
nrows
=
0
;
while
(
1
)
{
...
...
@@ -181,7 +193,28 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
break
;
}
}
}
SSDataBlock
*
doMergeJoin
(
struct
SOperatorInfo
*
pOperator
)
{
SJoinOperatorInfo
*
pJoinInfo
=
pOperator
->
info
;
SSDataBlock
*
pRes
=
pJoinInfo
->
pRes
;
blockDataCleanup
(
pRes
);
blockDataEnsureCapacity
(
pRes
,
4096
);
while
(
true
)
{
int32_t
numOfRowsBefore
=
pRes
->
info
.
rows
;
doMergeJoinImpl
(
pOperator
,
pRes
);
int32_t
numOfNewRows
=
pRes
->
info
.
rows
-
numOfRowsBefore
;
if
(
numOfNewRows
==
0
)
{
break
;
}
if
(
pJoinInfo
->
pCondAfterMerge
!=
NULL
)
{
doFilter
(
pJoinInfo
->
pCondAfterMerge
,
pRes
);
}
if
(
pRes
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
)
{
break
;
}
}
return
(
pRes
->
info
.
rows
>
0
)
?
pRes
:
NULL
;
}
...
...
source/libs/function/src/tudf.c
浏览文件 @
a08a6e57
...
...
@@ -1565,6 +1565,10 @@ void constructUdfService(void *argsThread) {
//TODO return value of uv_run
uv_run
(
&
udfc
->
uvLoop
,
UV_RUN_DEFAULT
);
uv_loop_close
(
&
udfc
->
uvLoop
);
uv_walk
(
&
udfc
->
uvLoop
,
udfUdfdCloseWalkCb
,
NULL
);
uv_run
(
&
udfc
->
uvLoop
,
UV_RUN_DEFAULT
);
uv_loop_close
(
&
udfc
->
uvLoop
);
}
int32_t
udfcOpen
()
{
...
...
source/libs/nodes/src/nodesCloneFuncs.c
浏览文件 @
a08a6e57
...
...
@@ -368,6 +368,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
static
int32_t
logicJoinCopy
(
const
SJoinLogicNode
*
pSrc
,
SJoinLogicNode
*
pDst
)
{
COPY_BASE_OBJECT_FIELD
(
node
,
logicNodeCopy
);
COPY_SCALAR_FIELD
(
joinType
);
CLONE_NODE_FIELD
(
pMergeCondition
);
CLONE_NODE_FIELD
(
pOnConditions
);
COPY_SCALAR_FIELD
(
isSingleTableJoin
);
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
a08a6e57
...
...
@@ -1254,6 +1254,7 @@ static int32_t jsonToLogicPlan(const SJson* pJson, void* pObj) {
static
const
char
*
jkJoinLogicPlanJoinType
=
"JoinType"
;
static
const
char
*
jkJoinLogicPlanOnConditions
=
"OnConditions"
;
static
const
char
*
jkJoinLogicPlanMergeCondition
=
"MergeConditions"
;
static
int32_t
logicJoinNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SJoinLogicNode
*
pNode
=
(
const
SJoinLogicNode
*
)
pObj
;
...
...
@@ -1262,6 +1263,9 @@ static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkJoinLogicPlanJoinType
,
pNode
->
joinType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkJoinLogicPlanMergeCondition
,
nodeToJson
,
pNode
->
pMergeCondition
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkJoinLogicPlanOnConditions
,
nodeToJson
,
pNode
->
pOnConditions
);
}
...
...
@@ -1617,6 +1621,7 @@ static int32_t jsonToPhysiProjectNode(const SJson* pJson, void* pObj) {
}
static
const
char
*
jkJoinPhysiPlanJoinType
=
"JoinType"
;
static
const
char
*
jkJoinPhysiPlanMergeCondition
=
"MergeCondition"
;
static
const
char
*
jkJoinPhysiPlanOnConditions
=
"OnConditions"
;
static
const
char
*
jkJoinPhysiPlanTargets
=
"Targets"
;
...
...
@@ -1627,6 +1632,9 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkJoinPhysiPlanJoinType
,
pNode
->
joinType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkJoinPhysiPlanMergeCondition
,
nodeToJson
,
pNode
->
pMergeCondition
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkJoinPhysiPlanOnConditions
,
nodeToJson
,
pNode
->
pOnConditions
);
}
...
...
@@ -1648,6 +1656,9 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkJoinPhysiPlanOnConditions
,
&
pNode
->
pOnConditions
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkJoinPhysiPlanMergeCondition
,
&
pNode
->
pMergeCondition
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkJoinPhysiPlanTargets
,
&
pNode
->
pTargets
);
}
...
...
source/libs/nodes/src/nodesTraverseFuncs.c
浏览文件 @
a08a6e57
...
...
@@ -470,6 +470,9 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN
:
{
SJoinPhysiNode
*
pJoin
=
(
SJoinPhysiNode
*
)
pNode
;
res
=
walkPhysiNode
((
SPhysiNode
*
)
pNode
,
order
,
walker
,
pContext
);
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
)
{
res
=
walkPhysiPlan
(
pJoin
->
pMergeCondition
,
order
,
walker
,
pContext
);
}
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
)
{
res
=
walkPhysiPlan
(
pJoin
->
pOnConditions
,
order
,
walker
,
pContext
);
}
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
a08a6e57
...
...
@@ -718,6 +718,7 @@ void nodesDestroyNode(SNode* pNode) {
case
QUERY_NODE_LOGIC_PLAN_JOIN
:
{
SJoinLogicNode
*
pLogicNode
=
(
SJoinLogicNode
*
)
pNode
;
destroyLogicNode
((
SLogicNode
*
)
pLogicNode
);
nodesDestroyNode
(
pLogicNode
->
pMergeCondition
);
nodesDestroyNode
(
pLogicNode
->
pOnConditions
);
break
;
}
...
...
@@ -828,6 +829,7 @@ void nodesDestroyNode(SNode* pNode) {
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN
:
{
SJoinPhysiNode
*
pPhyNode
=
(
SJoinPhysiNode
*
)
pNode
;
destroyPhysiNode
((
SPhysiNode
*
)
pPhyNode
);
nodesDestroyNode
(
pPhyNode
->
pMergeCondition
);
nodesDestroyNode
(
pPhyNode
->
pOnConditions
);
nodesDestroyList
(
pPhyNode
->
pTargets
);
break
;
...
...
@@ -1493,6 +1495,38 @@ int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char*
return
TSDB_CODE_SUCCESS
;
}
int32_t
nodesCollectColumnsFromNode
(
SNode
*
node
,
const
char
*
pTableAlias
,
ECollectColType
type
,
SNodeList
**
pCols
)
{
if
(
NULL
==
pCols
)
{
return
TSDB_CODE_FAILED
;
}
SCollectColumnsCxt
cxt
=
{
.
errCode
=
TSDB_CODE_SUCCESS
,
.
pTableAlias
=
pTableAlias
,
.
collectType
=
type
,
.
pCols
=
(
NULL
==
*
pCols
?
nodesMakeList
()
:
*
pCols
),
.
pColHash
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
)};
if
(
NULL
==
cxt
.
pCols
||
NULL
==
cxt
.
pColHash
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
*
pCols
=
NULL
;
nodesWalkExpr
(
node
,
collectColumns
,
&
cxt
);
taosHashCleanup
(
cxt
.
pColHash
);
if
(
TSDB_CODE_SUCCESS
!=
cxt
.
errCode
)
{
nodesDestroyList
(
cxt
.
pCols
);
return
cxt
.
errCode
;
}
if
(
LIST_LENGTH
(
cxt
.
pCols
)
>
0
)
{
*
pCols
=
cxt
.
pCols
;
}
else
{
nodesDestroyList
(
cxt
.
pCols
);
}
return
TSDB_CODE_SUCCESS
;
}
typedef
struct
SCollectFuncsCxt
{
int32_t
errCode
;
FFuncClassifier
classifier
;
...
...
source/libs/planner/src/planOptimizer.c
浏览文件 @
a08a6e57
...
...
@@ -480,12 +480,18 @@ static int32_t pushDownCondOptPushCondToProject(SOptimizeContext* pCxt, SProject
return
pushDownCondOptAppendCond
(
&
pProject
->
node
.
pConditions
,
pCond
);
}
static
int32_t
pushDownCondOptPushCondToJoin
(
SOptimizeContext
*
pCxt
,
SJoinLogicNode
*
pJoin
,
SNode
**
pCond
)
{
return
pushDownCondOptAppendCond
(
&
pJoin
->
node
.
pConditions
,
pCond
);
}
static
int32_t
pushDownCondOptPushCondToChild
(
SOptimizeContext
*
pCxt
,
SLogicNode
*
pChild
,
SNode
**
pCond
)
{
switch
(
nodeType
(
pChild
))
{
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
return
pushDownCondOptPushCondToScan
(
pCxt
,
(
SScanLogicNode
*
)
pChild
,
pCond
);
case
QUERY_NODE_LOGIC_PLAN_PROJECT
:
return
pushDownCondOptPushCondToProject
(
pCxt
,
(
SProjectLogicNode
*
)
pChild
,
pCond
);
case
QUERY_NODE_LOGIC_PLAN_JOIN
:
return
pushDownCondOptPushCondToJoin
(
pCxt
,
(
SJoinLogicNode
*
)
pChild
,
pCond
);
default:
break
;
}
...
...
@@ -554,13 +560,83 @@ static int32_t pushDownCondOptCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogic
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
pushDownCondOptPartJoinOnCondLogicCond
(
SJoinLogicNode
*
pJoin
,
SNode
**
ppMergeCond
,
SNode
**
ppOnCond
)
{
SLogicConditionNode
*
pLogicCond
=
(
SLogicConditionNode
*
)(
pJoin
->
pOnConditions
);
int32_t
code
=
TSDB_CODE_SUCCESS
;
SNodeList
*
pOnConds
=
NULL
;
SNode
*
pCond
=
NULL
;
FOREACH
(
pCond
,
pLogicCond
->
pParameterList
)
{
if
(
pushDownCondOptIsPriKeyEqualCond
(
pJoin
,
pCond
))
{
*
ppMergeCond
=
nodesCloneNode
(
pCond
);
}
else
{
code
=
nodesListMakeAppend
(
&
pOnConds
,
nodesCloneNode
(
pCond
));
}
}
SNode
*
pTempOnCond
=
NULL
;
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesMergeConds
(
&
pTempOnCond
,
&
pOnConds
);
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
*
ppMergeCond
)
{
*
ppOnCond
=
pTempOnCond
;
nodesDestroyNode
(
pJoin
->
pOnConditions
);
pJoin
->
pOnConditions
=
NULL
;
return
TSDB_CODE_SUCCESS
;
}
else
{
nodesDestroyList
(
pOnConds
);
nodesDestroyNode
(
pTempOnCond
);
return
TSDB_CODE_PLAN_INTERNAL_ERROR
;
}
}
static
int32_t
pushDownCondOptPartJoinOnCond
(
SJoinLogicNode
*
pJoin
,
SNode
**
ppMergeCond
,
SNode
**
ppOnCond
)
{
if
(
QUERY_NODE_LOGIC_CONDITION
==
nodeType
(
pJoin
->
pOnConditions
)
&&
LOGIC_COND_TYPE_AND
==
((
SLogicConditionNode
*
)(
pJoin
->
pOnConditions
))
->
condType
)
{
return
pushDownCondOptPartJoinOnCondLogicCond
(
pJoin
,
ppMergeCond
,
ppOnCond
);
}
if
(
pushDownCondOptIsPriKeyEqualCond
(
pJoin
,
pJoin
->
pOnConditions
))
{
*
ppMergeCond
=
nodesCloneNode
(
pJoin
->
pOnConditions
);
*
ppOnCond
=
NULL
;
nodesDestroyNode
(
pJoin
->
pOnConditions
);
pJoin
->
pOnConditions
=
NULL
;
return
TSDB_CODE_SUCCESS
;
}
else
{
return
TSDB_CODE_PLAN_INTERNAL_ERROR
;
}
}
static
int32_t
pushDownCondOptJoinExtractMergeCond
(
SOptimizeContext
*
pCxt
,
SJoinLogicNode
*
pJoin
)
{
int32_t
code
=
pushDownCondOptCheckJoinOnCond
(
pCxt
,
pJoin
);
SNode
*
pJoinMergeCond
=
NULL
;
SNode
*
pJoinOnCond
=
NULL
;
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
pushDownCondOptPartJoinOnCond
(
pJoin
,
&
pJoinMergeCond
,
&
pJoinOnCond
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pJoin
->
pMergeCondition
=
pJoinMergeCond
;
pJoin
->
pOnConditions
=
pJoinOnCond
;
}
else
{
nodesDestroyNode
(
pJoinMergeCond
);
nodesDestroyNode
(
pJoinOnCond
);
}
return
code
;
}
static
int32_t
pushDownCondOptDealJoin
(
SOptimizeContext
*
pCxt
,
SJoinLogicNode
*
pJoin
)
{
if
(
OPTIMIZE_FLAG_TEST_MASK
(
pJoin
->
node
.
optimizedFlag
,
OPTIMIZE_FLAG_PUSH_DOWN_CONDE
))
{
return
TSDB_CODE_SUCCESS
;
}
if
(
NULL
==
pJoin
->
node
.
pConditions
)
{
return
pushDownCondOptCheckJoinOnCond
(
pCxt
,
pJoin
);
int32_t
code
=
pushDownCondOptJoinExtractMergeCond
(
pCxt
,
pJoin
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
OPTIMIZE_FLAG_SET_MASK
(
pJoin
->
node
.
optimizedFlag
,
OPTIMIZE_FLAG_PUSH_DOWN_CONDE
);
pCxt
->
optimized
=
true
;
}
return
code
;
}
SNode
*
pOnCond
=
NULL
;
...
...
@@ -579,10 +655,13 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p
pushDownCondOptPushCondToChild
(
pCxt
,
(
SLogicNode
*
)
nodesListGetNode
(
pJoin
->
node
.
pChildren
,
1
),
&
pRightChildCond
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
pushDownCondOptJoinExtractMergeCond
(
pCxt
,
pJoin
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
OPTIMIZE_FLAG_SET_MASK
(
pJoin
->
node
.
optimizedFlag
,
OPTIMIZE_FLAG_PUSH_DOWN_CONDE
);
pCxt
->
optimized
=
true
;
code
=
pushDownCondOptCheckJoinOnCond
(
pCxt
,
pJoin
);
}
else
{
nodesDestroyNode
(
pOnCond
);
nodesDestroyNode
(
pLeftChildCond
);
...
...
@@ -720,7 +799,8 @@ static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAg
// TODO: remove it after full implementation of pushing down to child
if
(
1
!=
LIST_LENGTH
(
pAgg
->
node
.
pChildren
)
||
QUERY_NODE_LOGIC_PLAN_SCAN
!=
nodeType
(
nodesListGetNode
(
pAgg
->
node
.
pChildren
,
0
))
&&
QUERY_NODE_LOGIC_PLAN_PROJECT
!=
nodeType
(
nodesListGetNode
(
pAgg
->
node
.
pChildren
,
0
)))
{
QUERY_NODE_LOGIC_PLAN_PROJECT
!=
nodeType
(
nodesListGetNode
(
pAgg
->
node
.
pChildren
,
0
))
&&
QUERY_NODE_LOGIC_PLAN_JOIN
!=
nodeType
(
nodesListGetNode
(
pAgg
->
node
.
pChildren
,
0
)))
{
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
a08a6e57
...
...
@@ -612,10 +612,8 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
int32_t
code
=
TSDB_CODE_SUCCESS
;
pJoin
->
joinType
=
pJoinLogicNode
->
joinType
;
if
(
NULL
!=
pJoinLogicNode
->
pOnConditions
)
{
code
=
setNodeSlotId
(
pCxt
,
pLeftDesc
->
dataBlockId
,
pRightDesc
->
dataBlockId
,
pJoinLogicNode
->
pOnConditions
,
&
pJoin
->
pOnConditions
);
}
setNodeSlotId
(
pCxt
,
pLeftDesc
->
dataBlockId
,
pRightDesc
->
dataBlockId
,
pJoinLogicNode
->
pMergeCondition
,
&
pJoin
->
pMergeCondition
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
setListSlotId
(
pCxt
,
pLeftDesc
->
dataBlockId
,
pRightDesc
->
dataBlockId
,
pJoinLogicNode
->
node
.
pTargets
,
&
pJoin
->
pTargets
);
...
...
@@ -623,6 +621,21 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
addDataBlockSlots
(
pCxt
,
pJoin
->
pTargets
,
pJoin
->
node
.
pOutputDataBlockDesc
);
}
SNodeList
*
condCols
=
nodesMakeList
();
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pJoinLogicNode
->
pOnConditions
)
{
code
=
nodesCollectColumnsFromNode
(
pJoinLogicNode
->
pOnConditions
,
NULL
,
COLLECT_COL_TYPE_ALL
,
&
condCols
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
addDataBlockSlots
(
pCxt
,
condCols
,
pJoin
->
node
.
pOutputDataBlockDesc
);
nodesDestroyList
(
condCols
);
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pJoinLogicNode
->
pOnConditions
)
{
code
=
setNodeSlotId
(
pCxt
,
((
SPhysiNode
*
)
pJoin
)
->
pOutputDataBlockDesc
->
dataBlockId
,
-
1
,
pJoinLogicNode
->
pOnConditions
,
&
pJoin
->
pOnConditions
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
setConditionsSlotId
(
pCxt
,
(
const
SLogicNode
*
)
pJoinLogicNode
,
(
SPhysiNode
*
)
pJoin
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录