Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
edd0295a
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
edd0295a
编写于
6月 02, 2022
作者:
X
Xiaoyu Wang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat: interval distributed split
上级
b445afb5
变更
15
隐藏空白更改
内联
并排
Showing
15 changed file
with
753 addition
and
141 deletion
+753
-141
include/libs/function/functionMgt.h
include/libs/function/functionMgt.h
+3
-0
include/libs/nodes/nodes.h
include/libs/nodes/nodes.h
+2
-0
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+14
-1
source/libs/function/inc/builtins.h
source/libs/function/inc/builtins.h
+14
-12
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+17
-16
source/libs/function/src/functionMgt.c
source/libs/function/src/functionMgt.c
+78
-0
source/libs/nodes/src/nodesCloneFuncs.c
source/libs/nodes/src/nodesCloneFuncs.c
+10
-1
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+191
-5
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+4
-0
source/libs/planner/inc/planInt.h
source/libs/planner/inc/planInt.h
+1
-0
source/libs/planner/src/planLogicCreater.c
source/libs/planner/src/planLogicCreater.c
+57
-57
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+62
-6
source/libs/planner/src/planSpliter.c
source/libs/planner/src/planSpliter.c
+237
-34
source/libs/planner/src/planUtil.c
source/libs/planner/src/planUtil.c
+51
-0
source/libs/planner/src/planner.c
source/libs/planner/src/planner.c
+12
-9
未找到文件。
include/libs/function/functionMgt.h
浏览文件 @
edd0295a
...
...
@@ -156,6 +156,9 @@ bool fmIsDynamicScanOptimizedFunc(int32_t funcId);
bool
fmIsMultiResFunc
(
int32_t
funcId
);
bool
fmIsRepeatScanFunc
(
int32_t
funcId
);
bool
fmIsUserDefinedFunc
(
int32_t
funcId
);
bool
fmIsDistExecFunc
(
int32_t
funcId
);
int32_t
fmGetDistMethod
(
const
SFunctionNode
*
pFunc
,
SFunctionNode
**
pPartialFunc
,
SFunctionNode
**
pMergeFunc
);
typedef
enum
EFuncDataRequired
{
FUNC_DATA_REQUIRED_DATA_LOAD
=
1
,
...
...
include/libs/nodes/nodes.h
浏览文件 @
edd0295a
...
...
@@ -190,6 +190,7 @@ typedef enum ENodeType {
QUERY_NODE_LOGIC_PLAN_PROJECT
,
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF
,
QUERY_NODE_LOGIC_PLAN_EXCHANGE
,
QUERY_NODE_LOGIC_PLAN_MERGE
,
QUERY_NODE_LOGIC_PLAN_WINDOW
,
QUERY_NODE_LOGIC_PLAN_FILL
,
QUERY_NODE_LOGIC_PLAN_SORT
,
...
...
@@ -207,6 +208,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_JOIN
,
QUERY_NODE_PHYSICAL_PLAN_AGG
,
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
,
QUERY_NODE_PHYSICAL_PLAN_MERGE
,
QUERY_NODE_PHYSICAL_PLAN_SORT
,
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
,
...
...
include/libs/nodes/plannodes.h
浏览文件 @
edd0295a
...
...
@@ -94,9 +94,15 @@ typedef struct SVnodeModifLogicNode {
typedef
struct
SExchangeLogicNode
{
SLogicNode
node
;
int32_t
srcGroupId
;
uint8_t
precision
;
}
SExchangeLogicNode
;
typedef
struct
SMergeLogicNode
{
SLogicNode
node
;
SNodeList
*
pMergeKeys
;
int32_t
numOfChannels
;
int32_t
srcGroupId
;
}
SMergeLogicNode
;
typedef
enum
EWindowType
{
WINDOW_TYPE_INTERVAL
=
1
,
WINDOW_TYPE_SESSION
,
WINDOW_TYPE_STATE
}
EWindowType
;
typedef
struct
SWindowLogicNode
{
...
...
@@ -265,6 +271,13 @@ typedef struct SExchangePhysiNode {
SNodeList
*
pSrcEndPoints
;
// element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode
}
SExchangePhysiNode
;
typedef
struct
SMergePhysiNode
{
SPhysiNode
node
;
SNodeList
*
pMergeKeys
;
int32_t
numOfChannels
;
int32_t
srcGroupId
;
}
SMergePhysiNode
;
typedef
struct
SWinodwPhysiNode
{
SPhysiNode
node
;
SNodeList
*
pExprs
;
// these are expression list of parameter expression of function
...
...
source/libs/function/inc/builtins.h
浏览文件 @
edd0295a
...
...
@@ -26,22 +26,24 @@ typedef int32_t (*FTranslateFunc)(SFunctionNode* pFunc, char* pErrBuf, int32_t l
typedef
EFuncDataRequired
(
*
FFuncDataRequired
)(
SFunctionNode
*
pFunc
,
STimeWindow
*
pTimeWindow
);
typedef
struct
SBuiltinFuncDefinition
{
c
har
name
[
FUNCTION_NAME_MAX_LENGTH
]
;
EFunctionType
type
;
uint64_t
classification
;
FTranslateFunc
translateFunc
;
FFuncDataRequired
dataRequiredFunc
;
FExecGetEnv
getEnvFunc
;
FExecInit
initFunc
;
FExecProcess
processFunc
;
c
onst
char
*
name
;
EFunctionType
type
;
uint64_t
classification
;
FTranslateFunc
translateFunc
;
FFuncDataRequired
dataRequiredFunc
;
FExecGetEnv
getEnvFunc
;
FExecInit
initFunc
;
FExecProcess
processFunc
;
FScalarExecProcess
sprocessFunc
;
FExecFinalize
finalizeFunc
;
FExecProcess
invertFunc
;
FExecCombine
combineFunc
;
FExecFinalize
finalizeFunc
;
FExecProcess
invertFunc
;
FExecCombine
combineFunc
;
const
char
*
pPartialFunc
;
const
char
*
pMergeFunc
;
}
SBuiltinFuncDefinition
;
extern
const
SBuiltinFuncDefinition
funcMgtBuiltins
[];
extern
const
int
funcMgtBuiltinsNum
;
extern
const
int
funcMgtBuiltinsNum
;
#ifdef __cplusplus
}
...
...
source/libs/function/src/builtins.c
浏览文件 @
edd0295a
...
...
@@ -156,14 +156,14 @@ static int32_t translatePercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
return
invaildFuncParaNumErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
//param0
//
param0
SNode
*
pParamNode0
=
nodesListGetNode
(
pFunc
->
pParameterList
,
0
);
if
(
nodeType
(
pParamNode0
)
!=
QUERY_NODE_COLUMN
)
{
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_ERROR
,
"The first parameter of PERCENTILE function can only be column"
);
}
//param1
//
param1
SValueNode
*
pValue
=
(
SValueNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
1
);
if
(
pValue
->
datum
.
i
<
0
||
pValue
->
datum
.
i
>
100
)
{
...
...
@@ -178,7 +178,7 @@ static int32_t translatePercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
return
invaildFuncParaTypeErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
//set result type
//
set result type
pFunc
->
node
.
resType
=
(
SDataType
){.
bytes
=
tDataTypes
[
TSDB_DATA_TYPE_DOUBLE
].
bytes
,
.
type
=
TSDB_DATA_TYPE_DOUBLE
};
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -197,14 +197,14 @@ static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
return
invaildFuncParaNumErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
//param0
//
param0
SNode
*
pParamNode0
=
nodesListGetNode
(
pFunc
->
pParameterList
,
0
);
if
(
nodeType
(
pParamNode0
)
!=
QUERY_NODE_COLUMN
)
{
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_ERROR
,
"The first parameter of APERCENTILE function can only be column"
);
}
//param1
//
param1
SNode
*
pParamNode1
=
nodesListGetNode
(
pFunc
->
pParameterList
,
1
);
if
(
nodeType
(
pParamNode1
)
!=
QUERY_NODE_VALUE
)
{
return
invaildFuncParaTypeErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
...
...
@@ -223,7 +223,7 @@ static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
return
invaildFuncParaTypeErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
//param2
//
param2
if
(
3
==
numOfParams
)
{
uint8_t
para3Type
=
((
SExprNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
2
))
->
resType
.
type
;
if
(
!
IS_VAR_DATA_TYPE
(
para3Type
))
{
...
...
@@ -263,14 +263,14 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return
invaildFuncParaTypeErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
//param0
//
param0
SNode
*
pParamNode0
=
nodesListGetNode
(
pFunc
->
pParameterList
,
0
);
if
(
nodeType
(
pParamNode0
)
!=
QUERY_NODE_COLUMN
)
{
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_ERROR
,
"The first parameter of TOP/BOTTOM function can only be column"
);
}
//param1
//
param1
SNode
*
pParamNode1
=
nodesListGetNode
(
pFunc
->
pParameterList
,
1
);
if
(
nodeType
(
pParamNode1
)
!=
QUERY_NODE_VALUE
)
{
return
invaildFuncParaTypeErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
...
...
@@ -287,7 +287,7 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
pValue
->
notReserved
=
true
;
//set result type
//
set result type
SDataType
*
pType
=
&
((
SExprNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
0
))
->
resType
;
pFunc
->
node
.
resType
=
(
SDataType
){.
bytes
=
pType
->
bytes
,
.
type
=
pType
->
type
};
return
TSDB_CODE_SUCCESS
;
...
...
@@ -659,8 +659,8 @@ static int32_t translateTail(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if
(
pValue
->
datum
.
i
<
((
i
>
1
)
?
0
:
1
)
||
pValue
->
datum
.
i
>
100
)
{
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_ERROR
,
"TAIL function second parameter should be in range [1, 100], "
"third parameter should be in range [0, 100]"
);
"TAIL function second parameter should be in range [1, 100], "
"third parameter should be in range [0, 100]"
);
}
pValue
->
notReserved
=
true
;
...
...
@@ -721,7 +721,7 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return
invaildFuncParaNumErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
//param0
//
param0
SNode
*
pParamNode0
=
nodesListGetNode
(
pFunc
->
pParameterList
,
0
);
if
(
nodeType
(
pParamNode0
)
!=
QUERY_NODE_COLUMN
)
{
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_ERROR
,
...
...
@@ -729,12 +729,11 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
}
uint8_t
colType
=
((
SExprNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
0
))
->
resType
.
type
;
if
(
!
IS_SIGNED_NUMERIC_TYPE
(
colType
)
&&
!
IS_FLOAT_TYPE
(
colType
)
&&
TSDB_DATA_TYPE_BOOL
!=
colType
)
{
if
(
!
IS_SIGNED_NUMERIC_TYPE
(
colType
)
&&
!
IS_FLOAT_TYPE
(
colType
)
&&
TSDB_DATA_TYPE_BOOL
!=
colType
)
{
return
invaildFuncParaTypeErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
//param1
//
param1
if
(
numOfParams
==
2
)
{
uint8_t
paraType
=
((
SExprNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
1
))
->
resType
.
type
;
if
(
!
IS_INTEGER_TYPE
(
paraType
))
{
...
...
@@ -852,7 +851,7 @@ static int32_t translateSubstr(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
if
(
3
==
numOfParams
)
{
SExprNode
*
p2
=
(
SExprNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
2
);
uint8_t
para2Type
=
p2
->
resType
.
type
;
uint8_t
para2Type
=
p2
->
resType
.
type
;
if
(
!
IS_INTEGER_TYPE
(
para2Type
))
{
return
invaildFuncParaTypeErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
...
...
@@ -993,6 +992,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
finalizeFunc
=
functionFinalize
,
.
invertFunc
=
countInvertFunction
,
.
combineFunc
=
combineFunction
,
// .pPartialFunc = "count",
// .pMergeFunc = "sum"
},
{
.
name
=
"sum"
,
...
...
source/libs/function/src/functionMgt.c
浏览文件 @
edd0295a
...
...
@@ -199,3 +199,81 @@ bool fmIsInvertible(int32_t funcId) {
}
return
res
;
}
static
SFunctionNode
*
createFunction
(
const
char
*
pName
,
SNodeList
*
pParameterList
)
{
SFunctionNode
*
pFunc
=
nodesMakeNode
(
QUERY_NODE_FUNCTION
);
if
(
NULL
==
pFunc
)
{
return
NULL
;
}
strcpy
(
pFunc
->
functionName
,
pName
);
pFunc
->
pParameterList
=
pParameterList
;
char
msg
[
64
]
=
{
0
};
if
(
TSDB_CODE_SUCCESS
!=
fmGetFuncInfo
(
pFunc
,
msg
,
sizeof
(
msg
)))
{
nodesDestroyNode
(
pFunc
);
return
NULL
;
}
return
pFunc
;
}
static
SColumnNode
*
createColumnByFunc
(
const
SFunctionNode
*
pFunc
)
{
SColumnNode
*
pCol
=
nodesMakeNode
(
QUERY_NODE_COLUMN
);
if
(
NULL
==
pCol
)
{
return
NULL
;
}
strcpy
(
pCol
->
colName
,
pFunc
->
node
.
aliasName
);
pCol
->
node
.
resType
=
pFunc
->
node
.
resType
;
return
pCol
;
}
bool
fmIsDistExecFunc
(
int32_t
funcId
)
{
if
(
!
fmIsVectorFunc
(
funcId
))
{
return
true
;
}
return
(
NULL
!=
funcMgtBuiltins
[
funcId
].
pPartialFunc
&&
NULL
!=
funcMgtBuiltins
[
funcId
].
pMergeFunc
);
}
static
int32_t
createPartialFunction
(
const
SFunctionNode
*
pSrcFunc
,
SFunctionNode
**
pPartialFunc
)
{
SNodeList
*
pParameterList
=
nodesCloneList
(
pSrcFunc
->
pParameterList
);
if
(
NULL
==
pParameterList
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
*
pPartialFunc
=
createFunction
(
funcMgtBuiltins
[
pSrcFunc
->
funcId
].
pPartialFunc
,
pParameterList
);
if
(
NULL
==
*
pPartialFunc
)
{
nodesDestroyList
(
pParameterList
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
snprintf
((
*
pPartialFunc
)
->
node
.
aliasName
,
sizeof
((
*
pPartialFunc
)
->
node
.
aliasName
),
"%s.%p"
,
(
*
pPartialFunc
)
->
functionName
,
pSrcFunc
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
createMergeFunction
(
const
SFunctionNode
*
pSrcFunc
,
const
SFunctionNode
*
pPartialFunc
,
SFunctionNode
**
pMergeFunc
)
{
SNodeList
*
pParameterList
=
NULL
;
nodesListMakeStrictAppend
(
&
pParameterList
,
createColumnByFunc
(
pPartialFunc
));
*
pMergeFunc
=
createFunction
(
funcMgtBuiltins
[
pSrcFunc
->
funcId
].
pMergeFunc
,
pParameterList
);
if
(
NULL
==
*
pMergeFunc
)
{
nodesDestroyList
(
pParameterList
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
strcpy
((
*
pMergeFunc
)
->
node
.
aliasName
,
pSrcFunc
->
node
.
aliasName
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
fmGetDistMethod
(
const
SFunctionNode
*
pFunc
,
SFunctionNode
**
pPartialFunc
,
SFunctionNode
**
pMergeFunc
)
{
if
(
!
fmIsDistExecFunc
(
pFunc
->
funcId
))
{
return
TSDB_CODE_FAILED
;
}
int32_t
code
=
createPartialFunction
(
pFunc
,
pPartialFunc
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
createMergeFunction
(
pFunc
,
*
pPartialFunc
,
pMergeFunc
);
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
nodesDestroyNode
(
*
pPartialFunc
);
nodesDestroyNode
(
*
pMergeFunc
);
}
return
code
;
}
source/libs/nodes/src/nodesCloneFuncs.c
浏览文件 @
edd0295a
...
...
@@ -366,7 +366,14 @@ static SNode* logicVnodeModifCopy(const SVnodeModifLogicNode* pSrc, SVnodeModifL
static
SNode
*
logicExchangeCopy
(
const
SExchangeLogicNode
*
pSrc
,
SExchangeLogicNode
*
pDst
)
{
COPY_BASE_OBJECT_FIELD
(
node
,
logicNodeCopy
);
COPY_SCALAR_FIELD
(
srcGroupId
);
COPY_SCALAR_FIELD
(
precision
);
return
(
SNode
*
)
pDst
;
}
static
SNode
*
logicMergeCopy
(
const
SMergeLogicNode
*
pSrc
,
SMergeLogicNode
*
pDst
)
{
COPY_BASE_OBJECT_FIELD
(
node
,
logicNodeCopy
);
CLONE_NODE_LIST_FIELD
(
pMergeKeys
);
COPY_SCALAR_FIELD
(
numOfChannels
);
COPY_SCALAR_FIELD
(
srcGroupId
);
return
(
SNode
*
)
pDst
;
}
...
...
@@ -529,6 +536,8 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
return
logicVnodeModifCopy
((
const
SVnodeModifLogicNode
*
)
pNode
,
(
SVnodeModifLogicNode
*
)
pDst
);
case
QUERY_NODE_LOGIC_PLAN_EXCHANGE
:
return
logicExchangeCopy
((
const
SExchangeLogicNode
*
)
pNode
,
(
SExchangeLogicNode
*
)
pDst
);
case
QUERY_NODE_LOGIC_PLAN_MERGE
:
return
logicMergeCopy
((
const
SMergeLogicNode
*
)
pNode
,
(
SMergeLogicNode
*
)
pDst
);
case
QUERY_NODE_LOGIC_PLAN_WINDOW
:
return
logicWindowCopy
((
const
SWindowLogicNode
*
)
pNode
,
(
SWindowLogicNode
*
)
pDst
);
case
QUERY_NODE_LOGIC_PLAN_FILL
:
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
edd0295a
...
...
@@ -190,6 +190,8 @@ const char* nodesNodeName(ENodeType type) {
return
"LogicVnodeModif"
;
case
QUERY_NODE_LOGIC_PLAN_EXCHANGE
:
return
"LogicExchange"
;
case
QUERY_NODE_LOGIC_PLAN_MERGE
:
return
"LogicMerge"
;
case
QUERY_NODE_LOGIC_PLAN_WINDOW
:
return
"LogicWindow"
;
case
QUERY_NODE_LOGIC_PLAN_FILL
:
...
...
@@ -220,6 +222,8 @@ const char* nodesNodeName(ENodeType type) {
return
"PhysiAgg"
;
case
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
:
return
"PhysiExchange"
;
case
QUERY_NODE_PHYSICAL_PLAN_MERGE
:
return
"PhysiMerge"
;
case
QUERY_NODE_PHYSICAL_PLAN_SORT
:
return
"PhysiSort"
;
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
...
...
@@ -596,7 +600,6 @@ static int32_t jsonToLogicProjectNode(const SJson* pJson, void* pObj) {
}
static
const
char
*
jkExchangeLogicPlanSrcGroupId
=
"SrcGroupId"
;
static
const
char
*
jkExchangeLogicPlanSrcPrecision
=
"Precision"
;
static
int32_t
logicExchangeNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SExchangeLogicNode
*
pNode
=
(
const
SExchangeLogicNode
*
)
pObj
;
...
...
@@ -605,9 +608,6 @@ static int32_t logicExchangeNodeToJson(const void* pObj, SJson* pJson) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkExchangeLogicPlanSrcGroupId
,
pNode
->
srcGroupId
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkExchangeLogicPlanSrcPrecision
,
pNode
->
precision
);
}
return
code
;
}
...
...
@@ -619,8 +619,144 @@ static int32_t jsonToLogicExchangeNode(const SJson* pJson, void* pObj) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetIntValue
(
pJson
,
jkExchangeLogicPlanSrcGroupId
,
&
pNode
->
srcGroupId
);
}
return
code
;
}
static
const
char
*
jkMergeLogicPlanMergeKeys
=
"MergeKeys"
;
static
const
char
*
jkMergeLogicPlanNumOfChannels
=
"NumOfChannels"
;
static
const
char
*
jkMergeLogicPlanSrcGroupId
=
"SrcGroupId"
;
static
int32_t
logicMergeNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SMergeLogicNode
*
pNode
=
(
const
SMergeLogicNode
*
)
pObj
;
int32_t
code
=
logicPlanNodeToJson
(
pObj
,
pJson
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodeListToJson
(
pJson
,
jkMergeLogicPlanMergeKeys
,
pNode
->
pMergeKeys
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetUTinyIntValue
(
pJson
,
jkExchangeLogicPlanSrcPrecision
,
&
pNode
->
precision
);
code
=
tjsonAddIntegerToObject
(
pJson
,
jkMergeLogicPlanNumOfChannels
,
pNode
->
numOfChannels
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkMergeLogicPlanSrcGroupId
,
pNode
->
srcGroupId
);
}
return
code
;
}
static
int32_t
jsonToLogicMergeNode
(
const
SJson
*
pJson
,
void
*
pObj
)
{
SMergeLogicNode
*
pNode
=
(
SMergeLogicNode
*
)
pObj
;
int32_t
code
=
jsonToLogicPlanNode
(
pJson
,
pObj
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkMergeLogicPlanMergeKeys
,
&
pNode
->
pMergeKeys
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetIntValue
(
pJson
,
jkMergeLogicPlanNumOfChannels
,
&
pNode
->
numOfChannels
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetIntValue
(
pJson
,
jkMergeLogicPlanSrcGroupId
,
&
pNode
->
srcGroupId
);
}
return
code
;
}
static
const
char
*
jkWindowLogicPlanWinType
=
"WinType"
;
static
const
char
*
jkWindowLogicPlanFuncs
=
"Funcs"
;
static
const
char
*
jkWindowLogicPlanInterval
=
"Interval"
;
static
const
char
*
jkWindowLogicPlanOffset
=
"Offset"
;
static
const
char
*
jkWindowLogicPlanSliding
=
"Sliding"
;
static
const
char
*
jkWindowLogicPlanIntervalUnit
=
"IntervalUnit"
;
static
const
char
*
jkWindowLogicPlanSlidingUnit
=
"SlidingUnit"
;
static
const
char
*
jkWindowLogicPlanSessionGap
=
"SessionGap"
;
static
const
char
*
jkWindowLogicPlanTspk
=
"Tspk"
;
static
const
char
*
jkWindowLogicPlanStateExpr
=
"StateExpr"
;
static
const
char
*
jkWindowLogicPlanTriggerType
=
"TriggerType"
;
static
const
char
*
jkWindowLogicPlanWatermark
=
"Watermark"
;
static
int32_t
logicWindowNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SWindowLogicNode
*
pNode
=
(
const
SWindowLogicNode
*
)
pObj
;
int32_t
code
=
logicPlanNodeToJson
(
pObj
,
pJson
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkWindowLogicPlanWinType
,
pNode
->
winType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodeListToJson
(
pJson
,
jkWindowLogicPlanFuncs
,
pNode
->
pFuncs
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkWindowLogicPlanInterval
,
pNode
->
interval
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkWindowLogicPlanOffset
,
pNode
->
offset
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkWindowLogicPlanSliding
,
pNode
->
sliding
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkWindowLogicPlanIntervalUnit
,
pNode
->
intervalUnit
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkWindowLogicPlanSlidingUnit
,
pNode
->
slidingUnit
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkWindowLogicPlanSessionGap
,
pNode
->
sessionGap
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkWindowLogicPlanTspk
,
nodeToJson
,
pNode
->
pTspk
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkWindowLogicPlanStateExpr
,
nodeToJson
,
pNode
->
pStateExpr
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkWindowLogicPlanTriggerType
,
pNode
->
triggerType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkWindowLogicPlanWatermark
,
pNode
->
watermark
);
}
return
code
;
}
static
int32_t
jsonToLogicWindowNode
(
const
SJson
*
pJson
,
void
*
pObj
)
{
SWindowLogicNode
*
pNode
=
(
SWindowLogicNode
*
)
pObj
;
int32_t
code
=
jsonToLogicPlanNode
(
pJson
,
pObj
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkWindowLogicPlanWinType
,
pNode
->
winType
,
code
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkWindowLogicPlanFuncs
,
&
pNode
->
pFuncs
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetBigIntValue
(
pJson
,
jkWindowLogicPlanInterval
,
&
pNode
->
interval
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetBigIntValue
(
pJson
,
jkWindowLogicPlanOffset
,
&
pNode
->
offset
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetBigIntValue
(
pJson
,
jkWindowLogicPlanSliding
,
&
pNode
->
sliding
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetTinyIntValue
(
pJson
,
jkWindowLogicPlanIntervalUnit
,
&
pNode
->
intervalUnit
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetTinyIntValue
(
pJson
,
jkWindowLogicPlanSlidingUnit
,
&
pNode
->
slidingUnit
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetBigIntValue
(
pJson
,
jkWindowLogicPlanSessionGap
,
&
pNode
->
sessionGap
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkWindowLogicPlanTspk
,
&
pNode
->
pTspk
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkWindowLogicPlanStateExpr
,
&
pNode
->
pStateExpr
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetTinyIntValue
(
pJson
,
jkWindowLogicPlanTriggerType
,
&
pNode
->
triggerType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetBigIntValue
(
pJson
,
jkWindowLogicPlanWatermark
,
&
pNode
->
watermark
);
}
return
code
;
...
...
@@ -1453,6 +1589,44 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) {
return
code
;
}
static
const
char
*
jkMergePhysiPlanMergeKeys
=
"MergeKeys"
;
static
const
char
*
jkMergePhysiPlanNumOfChannels
=
"NumOfChannels"
;
static
const
char
*
jkMergePhysiPlanSrcGroupId
=
"SrcGroupId"
;
static
int32_t
physiMergeNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SMergePhysiNode
*
pNode
=
(
const
SMergePhysiNode
*
)
pObj
;
int32_t
code
=
physicPlanNodeToJson
(
pObj
,
pJson
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodeListToJson
(
pJson
,
jkMergePhysiPlanMergeKeys
,
pNode
->
pMergeKeys
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkMergePhysiPlanNumOfChannels
,
pNode
->
numOfChannels
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkMergePhysiPlanSrcGroupId
,
pNode
->
srcGroupId
);
}
return
code
;
}
static
int32_t
jsonToPhysiMergeNode
(
const
SJson
*
pJson
,
void
*
pObj
)
{
SMergePhysiNode
*
pNode
=
(
SMergePhysiNode
*
)
pObj
;
int32_t
code
=
jsonToPhysicPlanNode
(
pJson
,
pObj
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkMergePhysiPlanMergeKeys
,
&
pNode
->
pMergeKeys
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetIntValue
(
pJson
,
jkMergePhysiPlanNumOfChannels
,
&
pNode
->
numOfChannels
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetIntValue
(
pJson
,
jkMergePhysiPlanSrcGroupId
,
&
pNode
->
srcGroupId
);
}
return
code
;
}
static
const
char
*
jkSortPhysiPlanExprs
=
"Exprs"
;
static
const
char
*
jkSortPhysiPlanSortKeys
=
"SortKeys"
;
static
const
char
*
jkSortPhysiPlanTargets
=
"Targets"
;
...
...
@@ -3388,6 +3562,10 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
break
;
case
QUERY_NODE_LOGIC_PLAN_EXCHANGE
:
return
logicExchangeNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_LOGIC_PLAN_MERGE
:
return
logicMergeNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_LOGIC_PLAN_WINDOW
:
return
logicWindowNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_LOGIC_PLAN_FILL
:
return
logicFillNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_LOGIC_PLAN_SORT
:
...
...
@@ -3414,6 +3592,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return
physiAggNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
:
return
physiExchangeNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_MERGE
:
return
physiMergeNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_SORT
:
return
physiSortNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
...
...
@@ -3499,6 +3679,10 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return
jsonToLogicProjectNode
(
pJson
,
pObj
);
case
QUERY_NODE_LOGIC_PLAN_EXCHANGE
:
return
jsonToLogicExchangeNode
(
pJson
,
pObj
);
case
QUERY_NODE_LOGIC_PLAN_MERGE
:
return
jsonToLogicMergeNode
(
pJson
,
pObj
);
case
QUERY_NODE_LOGIC_PLAN_WINDOW
:
return
jsonToLogicWindowNode
(
pJson
,
pObj
);
case
QUERY_NODE_LOGIC_PLAN_FILL
:
return
jsonToLogicFillNode
(
pJson
,
pObj
);
case
QUERY_NODE_LOGIC_PLAN_SORT
:
...
...
@@ -3525,6 +3709,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return
jsonToPhysiAggNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
:
return
jsonToPhysiExchangeNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_MERGE
:
return
jsonToPhysiMergeNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_SORT
:
return
jsonToPhysiSortNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
edd0295a
...
...
@@ -222,6 +222,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return
makeNode
(
type
,
sizeof
(
SVnodeModifLogicNode
));
case
QUERY_NODE_LOGIC_PLAN_EXCHANGE
:
return
makeNode
(
type
,
sizeof
(
SExchangeLogicNode
));
case
QUERY_NODE_LOGIC_PLAN_MERGE
:
return
makeNode
(
type
,
sizeof
(
SMergeLogicNode
));
case
QUERY_NODE_LOGIC_PLAN_WINDOW
:
return
makeNode
(
type
,
sizeof
(
SWindowLogicNode
));
case
QUERY_NODE_LOGIC_PLAN_FILL
:
...
...
@@ -252,6 +254,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return
makeNode
(
type
,
sizeof
(
SAggPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
:
return
makeNode
(
type
,
sizeof
(
SExchangePhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_MERGE
:
return
makeNode
(
type
,
sizeof
(
SMergePhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_SORT
:
return
makeNode
(
type
,
sizeof
(
SSortPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
...
...
source/libs/planner/inc/planInt.h
浏览文件 @
edd0295a
...
...
@@ -36,6 +36,7 @@ extern "C" {
#define planTrace(param, ...) qTrace("PLAN: " param, __VA_ARGS__)
int32_t
generateUsageErrMsg
(
char
*
pBuf
,
int32_t
len
,
int32_t
errCode
,
...);
int32_t
createColumnByRewriteExps
(
SNodeList
*
pExprs
,
SNodeList
**
pList
);
int32_t
createLogicPlan
(
SPlanContext
*
pCxt
,
SLogicNode
**
pLogicNode
);
int32_t
optimizeLogicPlan
(
SPlanContext
*
pCxt
,
SLogicNode
*
pLogicNode
);
...
...
source/libs/planner/src/planLogicCreater.c
浏览文件 @
edd0295a
...
...
@@ -132,56 +132,56 @@ static int32_t createChildLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelec
return
code
;
}
typedef
struct
SCreateColumnCxt
{
int32_t
errCode
;
SNodeList
*
pList
;
}
SCreateColumnCxt
;
static
EDealRes
doCreateColumn
(
SNode
*
pNode
,
void
*
pContext
)
{
SCreateColumnCxt
*
pCxt
=
(
SCreateColumnCxt
*
)
pContext
;
switch
(
nodeType
(
pNode
))
{
case
QUERY_NODE_COLUMN
:
{
SNode
*
pCol
=
nodesCloneNode
(
pNode
);
if
(
NULL
==
pCol
)
{
return
DEAL_RES_ERROR
;
}
return
(
TSDB_CODE_SUCCESS
==
nodesListAppend
(
pCxt
->
pList
,
pCol
)
?
DEAL_RES_IGNORE_CHILD
:
DEAL_RES_ERROR
);
}
case
QUERY_NODE_OPERATOR
:
case
QUERY_NODE_LOGIC_CONDITION
:
case
QUERY_NODE_FUNCTION
:
{
SExprNode
*
pExpr
=
(
SExprNode
*
)
pNode
;
SColumnNode
*
pCol
=
(
SColumnNode
*
)
nodesMakeNode
(
QUERY_NODE_COLUMN
);
if
(
NULL
==
pCol
)
{
return
DEAL_RES_ERROR
;
}
pCol
->
node
.
resType
=
pExpr
->
resType
;
strcpy
(
pCol
->
colName
,
pExpr
->
aliasName
);
return
(
TSDB_CODE_SUCCESS
==
nodesListAppend
(
pCxt
->
pList
,
pCol
)
?
DEAL_RES_IGNORE_CHILD
:
DEAL_RES_ERROR
);
}
default:
break
;
}
return
DEAL_RES_CONTINUE
;
}
static
int32_t
createColumnByRewriteExps
(
SLogicPlanContext
*
pCxt
,
SNodeList
*
pExprs
,
SNodeList
**
pList
)
{
SCreateColumnCxt
cxt
=
{.
errCode
=
TSDB_CODE_SUCCESS
,
.
pList
=
(
NULL
==
*
pList
?
nodesMakeList
()
:
*
pList
)};
if
(
NULL
==
cxt
.
pList
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
nodesWalkExprs
(
pExprs
,
doCreateColumn
,
&
cxt
);
if
(
TSDB_CODE_SUCCESS
!=
cxt
.
errCode
)
{
nodesDestroyList
(
cxt
.
pList
);
return
cxt
.
errCode
;
}
if
(
NULL
==
*
pList
)
{
*
pList
=
cxt
.
pList
;
}
return
cxt
.
errCode
;
}
//
typedef struct SCreateColumnCxt {
//
int32_t errCode;
//
SNodeList* pList;
//
} SCreateColumnCxt;
//
static EDealRes doCreateColumn(SNode* pNode, void* pContext) {
//
SCreateColumnCxt* pCxt = (SCreateColumnCxt*)pContext;
//
switch (nodeType(pNode)) {
//
case QUERY_NODE_COLUMN: {
//
SNode* pCol = nodesCloneNode(pNode);
//
if (NULL == pCol) {
//
return DEAL_RES_ERROR;
//
}
//
return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
//
}
//
case QUERY_NODE_OPERATOR:
//
case QUERY_NODE_LOGIC_CONDITION:
//
case QUERY_NODE_FUNCTION: {
//
SExprNode* pExpr = (SExprNode*)pNode;
//
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
//
if (NULL == pCol) {
//
return DEAL_RES_ERROR;
//
}
//
pCol->node.resType = pExpr->resType;
//
strcpy(pCol->colName, pExpr->aliasName);
//
return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
//
}
//
default:
//
break;
//
}
//
return DEAL_RES_CONTINUE;
//
}
// static int32_t createColumnByRewriteExps(
SNodeList* pExprs, SNodeList** pList) {
//
SCreateColumnCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pList = (NULL == *pList ? nodesMakeList() : *pList)};
//
if (NULL == cxt.pList) {
//
return TSDB_CODE_OUT_OF_MEMORY;
//
}
//
nodesWalkExprs(pExprs, doCreateColumn, &cxt);
//
if (TSDB_CODE_SUCCESS != cxt.errCode) {
//
nodesDestroyList(cxt.pList);
//
return cxt.errCode;
//
}
//
if (NULL == *pList) {
//
*pList = cxt.pList;
//
}
//
return cxt.errCode;
//
}
static
EScanType
getScanType
(
SLogicPlanContext
*
pCxt
,
SNodeList
*
pScanPseudoCols
,
SNodeList
*
pScanCols
,
STableMeta
*
pMeta
)
{
...
...
@@ -293,10 +293,10 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
// set output
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
createColumnByRewriteExps
(
p
Cxt
,
p
Scan
->
pScanCols
,
&
pScan
->
node
.
pTargets
);
code
=
createColumnByRewriteExps
(
pScan
->
pScanCols
,
&
pScan
->
node
.
pTargets
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
createColumnByRewriteExps
(
p
Cxt
,
p
Scan
->
pScanPseudoCols
,
&
pScan
->
node
.
pTargets
);
code
=
createColumnByRewriteExps
(
pScan
->
pScanPseudoCols
,
&
pScan
->
node
.
pTargets
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
...
...
@@ -461,10 +461,10 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
// set the output
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pAgg
->
pGroupKeys
)
{
code
=
createColumnByRewriteExps
(
p
Cxt
,
p
Agg
->
pGroupKeys
,
&
pAgg
->
node
.
pTargets
);
code
=
createColumnByRewriteExps
(
pAgg
->
pGroupKeys
,
&
pAgg
->
node
.
pTargets
);
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pAgg
->
pAggFuncs
)
{
code
=
createColumnByRewriteExps
(
p
Cxt
,
p
Agg
->
pAggFuncs
,
&
pAgg
->
node
.
pTargets
);
code
=
createColumnByRewriteExps
(
pAgg
->
pAggFuncs
,
&
pAgg
->
node
.
pTargets
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
...
...
@@ -490,7 +490,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
createColumnByRewriteExps
(
p
Cxt
,
p
Window
->
pFuncs
,
&
pWindow
->
node
.
pTargets
);
code
=
createColumnByRewriteExps
(
pWindow
->
pFuncs
,
&
pWindow
->
node
.
pTargets
);
}
pSelect
->
hasAggFuncs
=
false
;
...
...
@@ -760,7 +760,7 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe
// set the output
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
createColumnByRewriteExps
(
p
Cxt
,
p
Agg
->
pGroupKeys
,
&
pAgg
->
node
.
pTargets
);
code
=
createColumnByRewriteExps
(
pAgg
->
pGroupKeys
,
&
pAgg
->
node
.
pTargets
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
...
...
@@ -907,7 +907,7 @@ static int32_t createSetOpAggLogicNode(SLogicPlanContext* pCxt, SSetOperator* pS
// set the output
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
createColumnByRewriteExps
(
p
Cxt
,
p
Agg
->
pGroupKeys
,
&
pAgg
->
node
.
pTargets
);
code
=
createColumnByRewriteExps
(
pAgg
->
pGroupKeys
,
&
pAgg
->
node
.
pTargets
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
...
...
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
edd0295a
...
...
@@ -835,7 +835,7 @@ static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChild
static
int32_t
doCreateExchangePhysiNode
(
SPhysiPlanContext
*
pCxt
,
SExchangeLogicNode
*
pExchangeLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SExchangePhysiNode
*
pExchange
=
(
SExchangePhysiNode
*
)
makePhysiNode
(
pCxt
,
pExchangeLogicNode
->
precision
,
(
SLogicNode
*
)
pExchangeLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
);
pCxt
,
pExchangeLogicNode
->
node
.
precision
,
(
SLogicNode
*
)
pExchangeLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
);
if
(
NULL
==
pExchange
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -845,10 +845,11 @@ static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogic
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
createStreamScanPhysiNodeByExchange
(
SPhysiPlanContext
*
pCxt
,
SExchangeLogicNode
*
pExchangeLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SScanPhysiNode
*
pScan
=
(
SScanPhysiNode
*
)
makePhysiNode
(
pCxt
,
pExchangeLogicNode
->
precision
,
(
SLogicNode
*
)
pExchangeLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
);
pCxt
,
pExchangeLogicNode
->
node
.
precision
,
(
SLogicNode
*
)
pExchangeLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
);
if
(
NULL
==
pScan
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -949,7 +950,8 @@ static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList*
SWindowLogicNode
*
pWindowLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SSessionWinodwPhysiNode
*
pSession
=
(
SSessionWinodwPhysiNode
*
)
makePhysiNode
(
pCxt
,
getPrecision
(
pChildren
),
(
SLogicNode
*
)
pWindowLogicNode
,
(
pCxt
->
pPlanCxt
->
streamQuery
?
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW
:
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
));
(
pCxt
->
pPlanCxt
->
streamQuery
?
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW
:
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
));
if
(
NULL
==
pSession
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -1132,6 +1134,54 @@ static int32_t createFillPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
return
code
;
}
static
int32_t
createExchangePhysiNodeByMerge
(
SMergePhysiNode
*
pMerge
)
{
SExchangePhysiNode
*
pExchange
=
nodesMakeNode
(
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
);
if
(
NULL
==
pExchange
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pExchange
->
srcGroupId
=
pMerge
->
srcGroupId
;
pExchange
->
node
.
pParent
=
(
SPhysiNode
*
)
pMerge
;
pExchange
->
node
.
pOutputDataBlockDesc
=
nodesCloneNode
(
pMerge
->
node
.
pOutputDataBlockDesc
);
if
(
NULL
==
pExchange
->
node
.
pOutputDataBlockDesc
)
{
nodesDestroyNode
(
pExchange
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
return
nodesListMakeStrictAppend
(
&
pMerge
->
node
.
pChildren
,
pExchange
);
}
static
int32_t
createMergePhysiNode
(
SPhysiPlanContext
*
pCxt
,
SMergeLogicNode
*
pMergeLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SMergePhysiNode
*
pMerge
=
(
SMergePhysiNode
*
)
makePhysiNode
(
pCxt
,
pMergeLogicNode
->
node
.
precision
,
(
SLogicNode
*
)
pMergeLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_MERGE
);
if
(
NULL
==
pMerge
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pMerge
->
numOfChannels
=
pMergeLogicNode
->
numOfChannels
;
pMerge
->
srcGroupId
=
pMergeLogicNode
->
srcGroupId
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
for
(
int32_t
i
=
0
;
i
<
pMerge
->
numOfChannels
;
++
i
)
{
code
=
createExchangePhysiNodeByMerge
(
pMerge
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
break
;
}
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
setListSlotId
(
pCxt
,
pMerge
->
node
.
pOutputDataBlockDesc
->
dataBlockId
,
-
1
,
pMergeLogicNode
->
pMergeKeys
,
&
pMerge
->
pMergeKeys
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pPhyNode
=
(
SPhysiNode
*
)
pMerge
;
}
else
{
nodesDestroyNode
(
pMerge
);
}
return
code
;
}
static
int32_t
doCreatePhysiNode
(
SPhysiPlanContext
*
pCxt
,
SLogicNode
*
pLogicNode
,
SSubplan
*
pSubplan
,
SNodeList
*
pChildren
,
SPhysiNode
**
pPhyNode
)
{
switch
(
nodeType
(
pLogicNode
))
{
...
...
@@ -1153,6 +1203,8 @@ static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode
return
createPartitionPhysiNode
(
pCxt
,
pChildren
,
(
SPartitionLogicNode
*
)
pLogicNode
,
pPhyNode
);
case
QUERY_NODE_LOGIC_PLAN_FILL
:
return
createFillPhysiNode
(
pCxt
,
pChildren
,
(
SFillLogicNode
*
)
pLogicNode
,
pPhyNode
);
case
QUERY_NODE_LOGIC_PLAN_MERGE
:
return
createMergePhysiNode
(
pCxt
,
(
SMergeLogicNode
*
)
pLogicNode
,
pPhyNode
);
default:
break
;
}
...
...
@@ -1183,9 +1235,13 @@ static int32_t createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode,
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
(
*
pPhyNode
)
->
pChildren
=
pChildren
;
SNode
*
pChild
;
FOREACH
(
pChild
,
(
*
pPhyNode
)
->
pChildren
)
{
((
SPhysiNode
*
)
pChild
)
->
pParent
=
(
*
pPhyNode
);
}
if
(
LIST_LENGTH
(
pChildren
)
>
0
)
{
(
*
pPhyNode
)
->
pChildren
=
pChildren
;
SNode
*
pChild
;
FOREACH
(
pChild
,
(
*
pPhyNode
)
->
pChildren
)
{
((
SPhysiNode
*
)
pChild
)
->
pParent
=
(
*
pPhyNode
);
}
}
else
{
nodesDestroyList
(
pChildren
);
}
}
else
{
nodesDestroyList
(
pChildren
);
}
...
...
source/libs/planner/src/planSpliter.c
浏览文件 @
edd0295a
...
...
@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "functionMgt.h"
#include "planInt.h"
#define SPLIT_FLAG_MASK(n) (1 << n)
...
...
@@ -37,7 +38,17 @@ typedef struct SSplitRule {
typedef
bool
(
*
FSplFindSplitNode
)(
SLogicSubplan
*
pSubplan
,
void
*
pInfo
);
static
SLogicSubplan
*
splCreateSubplan
(
SSplitContext
*
pCxt
,
SLogicNode
*
pNode
,
int32_t
flag
)
{
static
void
splSetSubplanVgroups
(
SLogicSubplan
*
pSubplan
,
SLogicNode
*
pNode
)
{
if
(
QUERY_NODE_LOGIC_PLAN_SCAN
==
nodeType
(
pNode
))
{
TSWAP
(
pSubplan
->
pVgroupList
,
((
SScanLogicNode
*
)
pNode
)
->
pVgroupList
);
}
else
{
if
(
1
==
LIST_LENGTH
(
pNode
->
pChildren
))
{
splSetSubplanVgroups
(
pSubplan
,
(
SLogicNode
*
)
nodesListGetNode
(
pNode
->
pChildren
,
0
));
}
}
}
static
SLogicSubplan
*
splCreateScanSubplan
(
SSplitContext
*
pCxt
,
SLogicNode
*
pNode
,
int32_t
flag
)
{
SLogicSubplan
*
pSubplan
=
nodesMakeNode
(
QUERY_NODE_LOGIC_SUBPLAN
);
if
(
NULL
==
pSubplan
)
{
return
NULL
;
...
...
@@ -45,10 +56,9 @@ static SLogicSubplan* splCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode, i
pSubplan
->
id
.
queryId
=
pCxt
->
queryId
;
pSubplan
->
id
.
groupId
=
pCxt
->
groupId
;
pSubplan
->
subplanType
=
SUBPLAN_TYPE_SCAN
;
pSubplan
->
pNode
=
(
SLogicNode
*
)
nodesCloneNode
(
pNode
);
if
(
QUERY_NODE_LOGIC_PLAN_SCAN
==
nodeType
(
pNode
))
{
TSWAP
(
pSubplan
->
pVgroupList
,
((
SScanLogicNode
*
)
pSubplan
->
pNode
)
->
pVgroupList
);
}
pSubplan
->
pNode
=
pNode
;
pSubplan
->
pNode
->
pParent
=
NULL
;
splSetSubplanVgroups
(
pSubplan
,
pNode
);
SPLIT_FLAG_SET_MASK
(
pSubplan
->
splitFlag
,
flag
);
return
pSubplan
;
}
...
...
@@ -60,7 +70,7 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pExchange
->
srcGroupId
=
pCxt
->
groupId
;
pExchange
->
precision
=
pSplitNode
->
precision
;
pExchange
->
node
.
precision
=
pSplitNode
->
precision
;
pExchange
->
node
.
pTargets
=
nodesCloneList
(
pSplitNode
->
pTargets
);
if
(
NULL
==
pExchange
->
node
.
pTargets
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -77,7 +87,7 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla
FOREACH
(
pNode
,
pSplitNode
->
pParent
->
pChildren
)
{
if
(
nodesEqualNode
(
pNode
,
pSplitNode
))
{
REPLACE_NODE
(
pExchange
);
nodesDestroyNode
(
pNode
)
;
pExchange
->
node
.
pParent
=
pSplitNode
->
pParent
;
return
TSDB_CODE_SUCCESS
;
}
}
...
...
@@ -101,13 +111,50 @@ static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag,
}
typedef
struct
SStableSplitInfo
{
S
ScanLogicNode
*
pScan
;
SLogicSubplan
*
pSubplan
;
S
LogicNode
*
pSplitNode
;
SLogicSubplan
*
pSubplan
;
}
SStableSplitInfo
;
static
bool
stbSplHasGatherExecFunc
(
const
SNodeList
*
pFuncs
)
{
SNode
*
pFunc
=
NULL
;
FOREACH
(
pFunc
,
pFuncs
)
{
if
(
!
fmIsDistExecFunc
(((
SFunctionNode
*
)
pFunc
)
->
funcId
))
{
return
true
;
}
}
return
false
;
}
static
bool
stbSplIsMultiTbScan
(
SScanLogicNode
*
pScan
)
{
return
(
NULL
!=
pScan
->
pVgroupList
&&
pScan
->
pVgroupList
->
numOfVgroups
>
1
);
}
static
bool
stbSplHasMultiTbScan
(
SLogicNode
*
pNode
)
{
if
(
1
!=
LIST_LENGTH
(
pNode
->
pChildren
))
{
return
false
;
}
SNode
*
pChild
=
nodesListGetNode
(
pNode
->
pChildren
,
0
);
return
(
QUERY_NODE_LOGIC_PLAN_SCAN
==
nodeType
(
pChild
)
&&
stbSplIsMultiTbScan
((
SScanLogicNode
*
)
pChild
));
}
static
bool
stbSplNeedSplit
(
SLogicNode
*
pNode
)
{
switch
(
nodeType
(
pNode
))
{
// case QUERY_NODE_LOGIC_PLAN_AGG:
// return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(pNode);
case
QUERY_NODE_LOGIC_PLAN_WINDOW
:
return
!
stbSplHasGatherExecFunc
(((
SAggLogicNode
*
)
pNode
)
->
pAggFuncs
)
&&
stbSplHasMultiTbScan
(
pNode
);
// case QUERY_NODE_LOGIC_PLAN_SORT:
// return stbSplHasMultiTbScan(pNode);
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
return
stbSplIsMultiTbScan
((
SScanLogicNode
*
)
pNode
);
default:
break
;
}
return
false
;
}
static
SLogicNode
*
stbSplMatchByNode
(
SLogicNode
*
pNode
)
{
if
(
QUERY_NODE_LOGIC_PLAN_SCAN
==
nodeType
(
pNode
)
&&
NULL
!=
((
SScanLogicNode
*
)
pNode
)
->
pVgroupList
&&
((
SScanLogicNode
*
)
pNode
)
->
pVgroupList
->
numOfVgroups
>
1
)
{
if
(
stbSplNeedSplit
(
pNode
))
{
return
pNode
;
}
SNode
*
pChild
;
...
...
@@ -123,22 +170,178 @@ static SLogicNode* stbSplMatchByNode(SLogicNode* pNode) {
static
bool
stbSplFindSplitNode
(
SLogicSubplan
*
pSubplan
,
SStableSplitInfo
*
pInfo
)
{
SLogicNode
*
pSplitNode
=
stbSplMatchByNode
(
pSubplan
->
pNode
);
if
(
NULL
!=
pSplitNode
)
{
pInfo
->
pS
can
=
(
SScanLogicNode
*
)
pSplitNode
;
pInfo
->
pS
plitNode
=
pSplitNode
;
pInfo
->
pSubplan
=
pSubplan
;
}
return
NULL
!=
pSplitNode
;
}
static
int32_t
stbSplRewriteFuns
(
const
SNodeList
*
pFuncs
,
SNodeList
**
pPartialFuncs
,
SNodeList
**
pMergeFuncs
)
{
SNode
*
pNode
=
NULL
;
FOREACH
(
pNode
,
pFuncs
)
{
SFunctionNode
*
pFunc
=
(
SFunctionNode
*
)
pNode
;
SFunctionNode
*
pPartFunc
=
NULL
;
SFunctionNode
*
pMergeFunc
=
NULL
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
fmIsWindowPseudoColumnFunc
(
pFunc
->
funcId
))
{
pPartFunc
=
nodesCloneNode
(
pFunc
);
pMergeFunc
=
nodesCloneNode
(
pFunc
);
if
(
NULL
==
pPartFunc
||
NULL
==
pMergeFunc
)
{
nodesDestroyNode
(
pPartFunc
);
nodesDestroyNode
(
pMergeFunc
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
}
else
{
code
=
fmGetDistMethod
(
pFunc
,
&
pPartFunc
,
&
pMergeFunc
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListMakeStrictAppend
(
pPartialFuncs
,
pPartFunc
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListMakeStrictAppend
(
pMergeFuncs
,
pMergeFunc
);
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
nodesDestroyList
(
*
pPartialFuncs
);
nodesDestroyList
(
*
pMergeFuncs
);
return
code
;
}
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
stbSplAppendWStart
(
SNodeList
*
pFuncs
,
int32_t
*
pIndex
)
{
int32_t
index
=
0
;
SNode
*
pFunc
=
NULL
;
FOREACH
(
pFunc
,
pFuncs
)
{
if
(
FUNCTION_TYPE_WSTARTTS
==
((
SFunctionNode
*
)
pFunc
)
->
funcType
)
{
*
pIndex
=
index
;
return
TSDB_CODE_SUCCESS
;
}
++
index
;
}
SFunctionNode
*
pWStart
=
nodesMakeNode
(
QUERY_NODE_FUNCTION
);
if
(
NULL
==
pWStart
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
strcpy
(
pWStart
->
functionName
,
"_wstartts"
);
snprintf
(
pWStart
->
node
.
aliasName
,
sizeof
(
pWStart
->
node
.
aliasName
),
"%s.%p"
,
pWStart
->
functionName
,
pWStart
);
int32_t
code
=
fmGetFuncInfo
(
pWStart
,
NULL
,
0
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListStrictAppend
(
pFuncs
,
pWStart
);
}
*
pIndex
=
index
;
return
code
;
}
static
int32_t
stbSplCreatePartWindowNode
(
SWindowLogicNode
*
pMergeWindow
,
SLogicNode
**
pPartWindow
)
{
SNodeList
*
pFunc
=
pMergeWindow
->
pFuncs
;
pMergeWindow
->
pFuncs
=
NULL
;
SNodeList
*
pTargets
=
pMergeWindow
->
node
.
pTargets
;
pMergeWindow
->
node
.
pTargets
=
NULL
;
SNodeList
*
pChildren
=
pMergeWindow
->
node
.
pChildren
;
pMergeWindow
->
node
.
pChildren
=
NULL
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
SWindowLogicNode
*
pPartWin
=
nodesCloneNode
(
pMergeWindow
);
if
(
NULL
==
pPartWin
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pMergeWindow
->
node
.
pTargets
=
pTargets
;
pPartWin
->
node
.
pChildren
=
pChildren
;
code
=
stbSplRewriteFuns
(
pFunc
,
&
pPartWin
->
pFuncs
,
&
pMergeWindow
->
pFuncs
);
}
int32_t
index
=
0
;
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
stbSplAppendWStart
(
pPartWin
->
pFuncs
,
&
index
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
createColumnByRewriteExps
(
pPartWin
->
pFuncs
,
&
pPartWin
->
node
.
pTargets
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
nodesDestroyNode
(
pMergeWindow
->
pTspk
);
pMergeWindow
->
pTspk
=
nodesCloneNode
(
nodesListGetNode
(
pPartWin
->
node
.
pTargets
,
index
));
if
(
NULL
==
pMergeWindow
->
pTspk
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
}
nodesDestroyList
(
pFunc
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pPartWindow
=
(
SLogicNode
*
)
pPartWin
;
}
else
{
nodesDestroyNode
(
pPartWin
);
}
return
code
;
}
static
int32_t
stbSplCreateMergeNode
(
SSplitContext
*
pCxt
,
SLogicNode
*
pParent
,
SLogicNode
*
pPartChild
)
{
SMergeLogicNode
*
pMerge
=
nodesMakeNode
(
QUERY_NODE_LOGIC_PLAN_MERGE
);
if
(
NULL
==
pMerge
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pMerge
->
numOfChannels
=
((
SScanLogicNode
*
)
nodesListGetNode
(
pPartChild
->
pChildren
,
0
))
->
pVgroupList
->
numOfVgroups
;
pMerge
->
srcGroupId
=
pCxt
->
groupId
;
pMerge
->
node
.
pParent
=
pParent
;
pMerge
->
node
.
precision
=
pPartChild
->
precision
;
int32_t
code
=
nodesListMakeStrictAppend
(
&
pMerge
->
pMergeKeys
,
nodesCloneNode
(((
SWindowLogicNode
*
)
pParent
)
->
pTspk
));
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pMerge
->
node
.
pTargets
=
nodesCloneList
(
pPartChild
->
pTargets
);
if
(
NULL
==
pMerge
->
node
.
pTargets
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListMakeAppend
(
&
pParent
->
pChildren
,
pMerge
);
}
return
code
;
}
static
int32_t
stbSplSplitWindowNode
(
SSplitContext
*
pCxt
,
SStableSplitInfo
*
pInfo
)
{
SLogicNode
*
pPartWindow
=
NULL
;
int32_t
code
=
stbSplCreatePartWindowNode
((
SWindowLogicNode
*
)
pInfo
->
pSplitNode
,
&
pPartWindow
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
stbSplCreateMergeNode
(
pCxt
,
pInfo
->
pSplitNode
,
pPartWindow
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListMakeStrictAppend
(
&
pInfo
->
pSubplan
->
pChildren
,
splCreateScanSubplan
(
pCxt
,
pPartWindow
,
SPLIT_FLAG_STABLE_SPLIT
));
}
pInfo
->
pSubplan
->
subplanType
=
SUBPLAN_TYPE_MERGE
;
return
code
;
}
static
int32_t
stbSplSplitScanNode
(
SSplitContext
*
pCxt
,
SStableSplitInfo
*
pInfo
)
{
int32_t
code
=
splCreateExchangeNode
(
pCxt
,
pInfo
->
pSubplan
,
pInfo
->
pSplitNode
,
SUBPLAN_TYPE_MERGE
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListMakeStrictAppend
(
&
pInfo
->
pSubplan
->
pChildren
,
splCreateScanSubplan
(
pCxt
,
pInfo
->
pSplitNode
,
SPLIT_FLAG_STABLE_SPLIT
));
}
return
code
;
}
static
int32_t
stableSplit
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
)
{
SStableSplitInfo
info
=
{
0
};
if
(
!
splMatch
(
pCxt
,
pSubplan
,
SPLIT_FLAG_STABLE_SPLIT
,
(
FSplFindSplitNode
)
stbSplFindSplitNode
,
&
info
))
{
return
TSDB_CODE_SUCCESS
;
}
int32_t
code
=
nodesListMakeStrictAppend
(
&
info
.
pSubplan
->
pChildren
,
splCreateSubplan
(
pCxt
,
(
SLogicNode
*
)
info
.
pScan
,
SPLIT_FLAG_STABLE_SPLIT
));
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
splCreateExchangeNode
(
pCxt
,
info
.
pSubplan
,
(
SLogicNode
*
)
info
.
pScan
,
SUBPLAN_TYPE_MERGE
);
int32_t
code
=
TSDB_CODE_SUCCESS
;
switch
(
nodeType
(
info
.
pSplitNode
))
{
case
QUERY_NODE_LOGIC_PLAN_WINDOW
:
code
=
stbSplSplitWindowNode
(
pCxt
,
&
info
);
break
;
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
code
=
stbSplSplitScanNode
(
pCxt
,
&
info
);
break
;
default:
break
;
}
++
(
pCxt
->
groupId
);
pCxt
->
split
=
true
;
return
code
;
...
...
@@ -187,9 +390,9 @@ static int32_t singleTableJoinSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan
if
(
!
splMatch
(
pCxt
,
pSubplan
,
0
,
(
FSplFindSplitNode
)
sigTbJoinSplFindSplitNode
,
&
info
))
{
return
TSDB_CODE_SUCCESS
;
}
int32_t
code
=
nodesListMakeStrictAppend
(
&
info
.
pSubplan
->
pChildren
,
splCreateSubplan
(
pCxt
,
info
.
pSplitNode
,
0
)
);
int32_t
code
=
splCreateExchangeNode
(
pCxt
,
info
.
pSubplan
,
info
.
pSplitNode
,
info
.
pSubplan
->
subplanType
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
splCreateExchangeNode
(
pCxt
,
info
.
pSubplan
,
info
.
pSplitNode
,
info
.
pSubplan
->
subplanType
);
code
=
nodesListMakeStrictAppend
(
&
info
.
pSubplan
->
pChildren
,
splCreateScanSubplan
(
pCxt
,
info
.
pSplitNode
,
0
)
);
}
++
(
pCxt
->
groupId
);
pCxt
->
split
=
true
;
...
...
@@ -272,13 +475,13 @@ typedef struct SUnionAllSplitInfo {
SLogicSubplan
*
pSubplan
;
}
SUnionAllSplitInfo
;
static
SLogicNode
*
un
ionAl
lMatchByNode
(
SLogicNode
*
pNode
)
{
static
SLogicNode
*
un
AllSp
lMatchByNode
(
SLogicNode
*
pNode
)
{
if
(
QUERY_NODE_LOGIC_PLAN_PROJECT
==
nodeType
(
pNode
)
&&
LIST_LENGTH
(
pNode
->
pChildren
)
>
1
)
{
return
pNode
;
}
SNode
*
pChild
;
FOREACH
(
pChild
,
pNode
->
pChildren
)
{
SLogicNode
*
pSplitNode
=
un
ionAl
lMatchByNode
((
SLogicNode
*
)
pChild
);
SLogicNode
*
pSplitNode
=
un
AllSp
lMatchByNode
((
SLogicNode
*
)
pChild
);
if
(
NULL
!=
pSplitNode
)
{
return
pSplitNode
;
}
...
...
@@ -286,8 +489,8 @@ static SLogicNode* unionAllMatchByNode(SLogicNode* pNode) {
return
NULL
;
}
static
bool
un
ionAl
lFindSplitNode
(
SLogicSubplan
*
pSubplan
,
SUnionAllSplitInfo
*
pInfo
)
{
SLogicNode
*
pSplitNode
=
un
ionAl
lMatchByNode
(
pSubplan
->
pNode
);
static
bool
un
AllSp
lFindSplitNode
(
SLogicSubplan
*
pSubplan
,
SUnionAllSplitInfo
*
pInfo
)
{
SLogicNode
*
pSplitNode
=
un
AllSp
lMatchByNode
(
pSubplan
->
pNode
);
if
(
NULL
!=
pSplitNode
)
{
pInfo
->
pProject
=
(
SProjectLogicNode
*
)
pSplitNode
;
pInfo
->
pSubplan
=
pSubplan
;
...
...
@@ -295,13 +498,13 @@ static bool unionAllFindSplitNode(SLogicSubplan* pSubplan, SUnionAllSplitInfo* p
return
NULL
!=
pSplitNode
;
}
static
int32_t
un
ionAl
lCreateExchangeNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SProjectLogicNode
*
pProject
)
{
static
int32_t
un
AllSp
lCreateExchangeNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SProjectLogicNode
*
pProject
)
{
SExchangeLogicNode
*
pExchange
=
nodesMakeNode
(
QUERY_NODE_LOGIC_PLAN_EXCHANGE
);
if
(
NULL
==
pExchange
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pExchange
->
srcGroupId
=
pCxt
->
groupId
;
pExchange
->
precision
=
pProject
->
node
.
precision
;
pExchange
->
node
.
precision
=
pProject
->
node
.
precision
;
pExchange
->
node
.
pTargets
=
nodesCloneList
(
pProject
->
node
.
pTargets
);
if
(
NULL
==
pExchange
->
node
.
pTargets
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -329,13 +532,13 @@ static int32_t unionAllCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pS
static
int32_t
unionAllSplit
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
)
{
SUnionAllSplitInfo
info
=
{
0
};
if
(
!
splMatch
(
pCxt
,
pSubplan
,
0
,
(
FSplFindSplitNode
)
un
ionAl
lFindSplitNode
,
&
info
))
{
if
(
!
splMatch
(
pCxt
,
pSubplan
,
0
,
(
FSplFindSplitNode
)
un
AllSp
lFindSplitNode
,
&
info
))
{
return
TSDB_CODE_SUCCESS
;
}
int32_t
code
=
unionSplitSubplan
(
pCxt
,
info
.
pSubplan
,
(
SLogicNode
*
)
info
.
pProject
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
un
ionAl
lCreateExchangeNode
(
pCxt
,
info
.
pSubplan
,
info
.
pProject
);
code
=
un
AllSp
lCreateExchangeNode
(
pCxt
,
info
.
pSubplan
,
info
.
pProject
);
}
++
(
pCxt
->
groupId
);
pCxt
->
split
=
true
;
...
...
@@ -347,13 +550,13 @@ typedef struct SUnionDistinctSplitInfo {
SLogicSubplan
*
pSubplan
;
}
SUnionDistinctSplitInfo
;
static
SLogicNode
*
un
ionDistinct
MatchByNode
(
SLogicNode
*
pNode
)
{
static
SLogicNode
*
un
DistSpl
MatchByNode
(
SLogicNode
*
pNode
)
{
if
(
QUERY_NODE_LOGIC_PLAN_AGG
==
nodeType
(
pNode
)
&&
LIST_LENGTH
(
pNode
->
pChildren
)
>
1
)
{
return
pNode
;
}
SNode
*
pChild
;
FOREACH
(
pChild
,
pNode
->
pChildren
)
{
SLogicNode
*
pSplitNode
=
un
ionDistinct
MatchByNode
((
SLogicNode
*
)
pChild
);
SLogicNode
*
pSplitNode
=
un
DistSpl
MatchByNode
((
SLogicNode
*
)
pChild
);
if
(
NULL
!=
pSplitNode
)
{
return
pSplitNode
;
}
...
...
@@ -361,13 +564,13 @@ static SLogicNode* unionDistinctMatchByNode(SLogicNode* pNode) {
return
NULL
;
}
static
int32_t
unCreateExchangeNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SAggLogicNode
*
pAgg
)
{
static
int32_t
un
DistSpl
CreateExchangeNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SAggLogicNode
*
pAgg
)
{
SExchangeLogicNode
*
pExchange
=
nodesMakeNode
(
QUERY_NODE_LOGIC_PLAN_EXCHANGE
);
if
(
NULL
==
pExchange
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pExchange
->
srcGroupId
=
pCxt
->
groupId
;
// pExchange->precision = pScan->pMeta->tableInfo
.precision;
pExchange
->
node
.
precision
=
pAgg
->
node
.
precision
;
pExchange
->
node
.
pTargets
=
nodesCloneList
(
pAgg
->
pGroupKeys
);
if
(
NULL
==
pExchange
->
node
.
pTargets
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -378,8 +581,8 @@ static int32_t unCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan
return
nodesListMakeAppend
(
&
pAgg
->
node
.
pChildren
,
pExchange
);
}
static
bool
un
ionDistinct
FindSplitNode
(
SLogicSubplan
*
pSubplan
,
SUnionDistinctSplitInfo
*
pInfo
)
{
SLogicNode
*
pSplitNode
=
un
ionDistinct
MatchByNode
(
pSubplan
->
pNode
);
static
bool
un
DistSpl
FindSplitNode
(
SLogicSubplan
*
pSubplan
,
SUnionDistinctSplitInfo
*
pInfo
)
{
SLogicNode
*
pSplitNode
=
un
DistSpl
MatchByNode
(
pSubplan
->
pNode
);
if
(
NULL
!=
pSplitNode
)
{
pInfo
->
pAgg
=
(
SAggLogicNode
*
)
pSplitNode
;
pInfo
->
pSubplan
=
pSubplan
;
...
...
@@ -389,13 +592,13 @@ static bool unionDistinctFindSplitNode(SLogicSubplan* pSubplan, SUnionDistinctSp
static
int32_t
unionDistinctSplit
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
)
{
SUnionDistinctSplitInfo
info
=
{
0
};
if
(
!
splMatch
(
pCxt
,
pSubplan
,
0
,
(
FSplFindSplitNode
)
un
ionDistinct
FindSplitNode
,
&
info
))
{
if
(
!
splMatch
(
pCxt
,
pSubplan
,
0
,
(
FSplFindSplitNode
)
un
DistSpl
FindSplitNode
,
&
info
))
{
return
TSDB_CODE_SUCCESS
;
}
int32_t
code
=
unionSplitSubplan
(
pCxt
,
info
.
pSubplan
,
(
SLogicNode
*
)
info
.
pAgg
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
unCreateExchangeNode
(
pCxt
,
info
.
pSubplan
,
info
.
pAgg
);
code
=
un
DistSpl
CreateExchangeNode
(
pCxt
,
info
.
pSubplan
,
info
.
pAgg
);
}
++
(
pCxt
->
groupId
);
pCxt
->
split
=
true
;
...
...
source/libs/planner/src/planUtil.c
浏览文件 @
edd0295a
...
...
@@ -34,3 +34,54 @@ int32_t generateUsageErrMsg(char* pBuf, int32_t len, int32_t errCode, ...) {
va_end
(
vArgList
);
return
errCode
;
}
typedef
struct
SCreateColumnCxt
{
int32_t
errCode
;
SNodeList
*
pList
;
}
SCreateColumnCxt
;
static
EDealRes
doCreateColumn
(
SNode
*
pNode
,
void
*
pContext
)
{
SCreateColumnCxt
*
pCxt
=
(
SCreateColumnCxt
*
)
pContext
;
switch
(
nodeType
(
pNode
))
{
case
QUERY_NODE_COLUMN
:
{
SNode
*
pCol
=
nodesCloneNode
(
pNode
);
if
(
NULL
==
pCol
)
{
return
DEAL_RES_ERROR
;
}
return
(
TSDB_CODE_SUCCESS
==
nodesListAppend
(
pCxt
->
pList
,
pCol
)
?
DEAL_RES_IGNORE_CHILD
:
DEAL_RES_ERROR
);
}
case
QUERY_NODE_OPERATOR
:
case
QUERY_NODE_LOGIC_CONDITION
:
case
QUERY_NODE_FUNCTION
:
{
SExprNode
*
pExpr
=
(
SExprNode
*
)
pNode
;
SColumnNode
*
pCol
=
(
SColumnNode
*
)
nodesMakeNode
(
QUERY_NODE_COLUMN
);
if
(
NULL
==
pCol
)
{
return
DEAL_RES_ERROR
;
}
pCol
->
node
.
resType
=
pExpr
->
resType
;
strcpy
(
pCol
->
colName
,
pExpr
->
aliasName
);
return
(
TSDB_CODE_SUCCESS
==
nodesListAppend
(
pCxt
->
pList
,
pCol
)
?
DEAL_RES_IGNORE_CHILD
:
DEAL_RES_ERROR
);
}
default:
break
;
}
return
DEAL_RES_CONTINUE
;
}
int32_t
createColumnByRewriteExps
(
SNodeList
*
pExprs
,
SNodeList
**
pList
)
{
SCreateColumnCxt
cxt
=
{.
errCode
=
TSDB_CODE_SUCCESS
,
.
pList
=
(
NULL
==
*
pList
?
nodesMakeList
()
:
*
pList
)};
if
(
NULL
==
cxt
.
pList
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
nodesWalkExprs
(
pExprs
,
doCreateColumn
,
&
cxt
);
if
(
TSDB_CODE_SUCCESS
!=
cxt
.
errCode
)
{
nodesDestroyList
(
cxt
.
pList
);
return
cxt
.
errCode
;
}
if
(
NULL
==
*
pList
)
{
*
pList
=
cxt
.
pList
;
}
return
cxt
.
errCode
;
}
source/libs/planner/src/planner.c
浏览文件 @
edd0295a
...
...
@@ -58,16 +58,19 @@ static int32_t setSubplanExecutionNode(SPhysiNode* pNode, int32_t groupId, SDown
if
(
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
==
nodeType
(
pNode
))
{
SExchangePhysiNode
*
pExchange
=
(
SExchangePhysiNode
*
)
pNode
;
if
(
pExchange
->
srcGroupId
==
groupId
)
{
if
(
NULL
==
pExchange
->
pSrcEndPoints
)
{
pExchange
->
pSrcEndPoints
=
nodesMakeList
();
if
(
NULL
==
pExchange
->
pSrcEndPoints
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
if
(
TSDB_CODE_SUCCESS
!=
nodesListStrictAppend
(
pExchange
->
pSrcEndPoints
,
nodesCloneNode
(
pSource
)))
{
return
TSDB_CODE_OUT_OF_MEMORY
;
return
nodesListMakeStrictAppend
(
&
pExchange
->
pSrcEndPoints
,
nodesCloneNode
(
pSource
));
}
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_MERGE
==
nodeType
(
pNode
))
{
SMergePhysiNode
*
pMerge
=
(
SMergePhysiNode
*
)
pNode
;
if
(
pMerge
->
srcGroupId
==
groupId
)
{
SExchangePhysiNode
*
pExchange
=
(
SExchangePhysiNode
*
)
nodesListGetNode
(
pMerge
->
node
.
pChildren
,
pMerge
->
numOfChannels
-
1
);
if
(
1
==
pMerge
->
numOfChannels
)
{
pMerge
->
numOfChannels
=
LIST_LENGTH
(
pMerge
->
node
.
pChildren
);
}
else
{
--
(
pMerge
->
numOfChannels
);
}
return
TSDB_CODE_SUCCESS
;
return
nodesListMakeStrictAppend
(
&
pExchange
->
pSrcEndPoints
,
nodesCloneNode
(
pSource
))
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录