Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
6c5ec227
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
6c5ec227
编写于
6月 02, 2022
作者:
X
Xiaoyu Wang
提交者:
GitHub
6月 02, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #13417 from taosdata/feature/3.0_wxy
feat: stream interval distributed split
上级
0bade865
e794d542
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
162 addition
and
71 deletion
+162
-71
include/libs/nodes/nodes.h
include/libs/nodes/nodes.h
+1
-0
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+23
-14
include/libs/planner/planner.h
include/libs/planner/planner.h
+1
-1
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+15
-20
source/libs/nodes/src/nodesCloneFuncs.c
source/libs/nodes/src/nodesCloneFuncs.c
+2
-1
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+8
-0
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+4
-0
source/libs/planner/src/planLogicCreater.c
source/libs/planner/src/planLogicCreater.c
+1
-0
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+14
-3
source/libs/planner/src/planSpliter.c
source/libs/planner/src/planSpliter.c
+87
-32
source/libs/planner/test/planOtherTest.cpp
source/libs/planner/test/planOtherTest.cpp
+6
-0
未找到文件。
include/libs/nodes/nodes.h
浏览文件 @
6c5ec227
...
...
@@ -212,6 +212,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_FILL
,
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW
,
...
...
include/libs/nodes/plannodes.h
浏览文件 @
6c5ec227
...
...
@@ -106,21 +106,28 @@ typedef struct SMergeLogicNode {
typedef
enum
EWindowType
{
WINDOW_TYPE_INTERVAL
=
1
,
WINDOW_TYPE_SESSION
,
WINDOW_TYPE_STATE
}
EWindowType
;
typedef
enum
EStreamIntervalAlgorithm
{
STREAM_INTERVAL_ALGO_FINAL
=
1
,
STREAM_INTERVAL_ALGO_SEMI
,
STREAM_INTERVAL_ALGO_SINGLE
}
EStreamIntervalAlgorithm
;
typedef
struct
SWindowLogicNode
{
SLogicNode
node
;
EWindowType
winType
;
SNodeList
*
pFuncs
;
int64_t
interval
;
int64_t
offset
;
int64_t
sliding
;
int8_t
intervalUnit
;
int8_t
slidingUnit
;
int64_t
sessionGap
;
SNode
*
pTspk
;
SNode
*
pStateExpr
;
int8_t
triggerType
;
int64_t
watermark
;
double
filesFactor
;
SLogicNode
node
;
EWindowType
winType
;
SNodeList
*
pFuncs
;
int64_t
interval
;
int64_t
offset
;
int64_t
sliding
;
int8_t
intervalUnit
;
int8_t
slidingUnit
;
int64_t
sessionGap
;
SNode
*
pTspk
;
SNode
*
pStateExpr
;
int8_t
triggerType
;
int64_t
watermark
;
double
filesFactor
;
EStreamIntervalAlgorithm
stmInterAlgo
;
}
SWindowLogicNode
;
typedef
struct
SFillLogicNode
{
...
...
@@ -301,6 +308,8 @@ typedef struct SIntervalPhysiNode {
}
SIntervalPhysiNode
;
typedef
SIntervalPhysiNode
SStreamIntervalPhysiNode
;
typedef
SIntervalPhysiNode
SStreamFinalIntervalPhysiNode
;
typedef
SIntervalPhysiNode
SStreamSemiIntervalPhysiNode
;
typedef
struct
SFillPhysiNode
{
SPhysiNode
node
;
...
...
include/libs/planner/planner.h
浏览文件 @
6c5ec227
...
...
@@ -36,7 +36,7 @@ typedef struct SPlanContext {
int64_t
watermark
;
char
*
pMsg
;
int32_t
msgLen
;
double
filesFactor
;
double
filesFactor
;
}
SPlanContext
;
// Create the physical plan for the query, according to the AST.
...
...
source/libs/function/src/builtins.c
浏览文件 @
6c5ec227
...
...
@@ -463,12 +463,9 @@ static bool validateStateOper(const SValueNode* pVal) {
if
(
TSDB_DATA_TYPE_BINARY
!=
pVal
->
node
.
resType
.
type
)
{
return
false
;
}
return
(
0
==
strcasecmp
(
varDataVal
(
pVal
->
datum
.
p
),
"GT"
)
||
0
==
strcasecmp
(
varDataVal
(
pVal
->
datum
.
p
),
"GE"
)
||
0
==
strcasecmp
(
varDataVal
(
pVal
->
datum
.
p
),
"LT"
)
||
0
==
strcasecmp
(
varDataVal
(
pVal
->
datum
.
p
),
"LE"
)
||
0
==
strcasecmp
(
varDataVal
(
pVal
->
datum
.
p
),
"EQ"
)
||
0
==
strcasecmp
(
varDataVal
(
pVal
->
datum
.
p
),
"NE"
));
return
(
0
==
strcasecmp
(
varDataVal
(
pVal
->
datum
.
p
),
"GT"
)
||
0
==
strcasecmp
(
varDataVal
(
pVal
->
datum
.
p
),
"GE"
)
||
0
==
strcasecmp
(
varDataVal
(
pVal
->
datum
.
p
),
"LT"
)
||
0
==
strcasecmp
(
varDataVal
(
pVal
->
datum
.
p
),
"LE"
)
||
0
==
strcasecmp
(
varDataVal
(
pVal
->
datum
.
p
),
"EQ"
)
||
0
==
strcasecmp
(
varDataVal
(
pVal
->
datum
.
p
),
"NE"
));
}
static
int32_t
translateStateCount
(
SFunctionNode
*
pFunc
,
char
*
pErrBuf
,
int32_t
len
)
{
...
...
@@ -552,7 +549,6 @@ static int32_t translateStateDuration(SFunctionNode* pFunc, char* pErrBuf, int32
"STATEDURATION function time unit parameter should be greater than db precision"
);
}
pValue
->
notReserved
=
true
;
}
...
...
@@ -837,7 +833,7 @@ static int32_t translateConcatImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t
int32_t
resultBytes
=
0
;
int32_t
sepBytes
=
0
;
//concat_ws separator should be constant string
//
concat_ws separator should be constant string
if
(
hasSep
)
{
SNode
*
pPara
=
nodesListGetNode
(
pFunc
->
pParameterList
,
0
);
if
(
nodeType
(
pPara
)
!=
QUERY_NODE_VALUE
)
{
...
...
@@ -963,7 +959,7 @@ static bool validateTimezoneFormat(const SValueNode* pVal) {
return
false
;
}
char
*
tz
=
varDataVal
(
pVal
->
datum
.
p
);
char
*
tz
=
varDataVal
(
pVal
->
datum
.
p
);
int32_t
len
=
varDataLen
(
pVal
->
datum
.
p
);
if
(
len
==
0
)
{
...
...
@@ -1007,20 +1003,20 @@ static bool validateTimezoneFormat(const SValueNode* pVal) {
}
void
static
addTimezoneParam
(
SNodeList
*
pList
)
{
char
buf
[
6
]
=
{
0
};
time_t
t
=
taosTime
(
NULL
);
struct
tm
*
tmInfo
=
taosLocalTime
(
&
t
,
NULL
);
char
buf
[
6
]
=
{
0
};
time_t
t
=
taosTime
(
NULL
);
struct
tm
*
tmInfo
=
taosLocalTime
(
&
t
,
NULL
);
strftime
(
buf
,
sizeof
(
buf
),
"%z"
,
tmInfo
);
int32_t
len
=
(
int32_t
)
strlen
(
buf
);
SValueNode
*
pVal
=
(
SValueNode
*
)
nodesMakeNode
(
QUERY_NODE_VALUE
);
pVal
->
literal
=
strndup
(
buf
,
len
);
pVal
->
isDuration
=
false
;
pVal
->
isDuration
=
false
;
pVal
->
translate
=
true
;
pVal
->
node
.
resType
.
type
=
TSDB_DATA_TYPE_BINARY
;
pVal
->
node
.
resType
.
bytes
=
len
+
VARSTR_HEADER_SIZE
;
pVal
->
node
.
resType
.
precision
=
TSDB_TIME_PRECISION_MILLI
;
pVal
->
datum
.
p
=
taosMemoryCalloc
(
1
,
len
+
VARSTR_HEADER_SIZE
+
1
);
pVal
->
datum
.
p
=
taosMemoryCalloc
(
1
,
len
+
VARSTR_HEADER_SIZE
+
1
);
varDataSetLen
(
pVal
->
datum
.
p
,
len
);
strncpy
(
varDataVal
(
pVal
->
datum
.
p
),
pVal
->
literal
,
len
);
...
...
@@ -1033,25 +1029,24 @@ static int32_t translateToIso8601(SFunctionNode* pFunc, char* pErrBuf, int32_t l
return
invaildFuncParaNumErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
//param0
//
param0
uint8_t
paraType
=
((
SExprNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
0
))
->
resType
.
type
;
if
(
!
IS_INTEGER_TYPE
(
paraType
)
&&
TSDB_DATA_TYPE_TIMESTAMP
!=
paraType
)
{
return
invaildFuncParaTypeErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
//param1
//
param1
if
(
numOfParams
==
2
)
{
SValueNode
*
pValue
=
(
SValueNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
1
);
if
(
!
validateTimezoneFormat
(
pValue
))
{
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_ERROR
,
"Invalid timzone format"
);
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_ERROR
,
"Invalid timzone format"
);
}
}
else
{
//
add default client timezone
}
else
{
//
add default client timezone
addTimezoneParam
(
pFunc
->
pParameterList
);
}
//set result type
//
set result type
pFunc
->
node
.
resType
=
(
SDataType
){.
bytes
=
64
,
.
type
=
TSDB_DATA_TYPE_BINARY
};
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/nodes/src/nodesCloneFuncs.c
浏览文件 @
6c5ec227
...
...
@@ -142,7 +142,7 @@ static SNode* valueNodeCopy(const SValueNode* pSrc, SValueNode* pDst) {
break
;
case
TSDB_DATA_TYPE_NCHAR
:
case
TSDB_DATA_TYPE_VARCHAR
:
case
TSDB_DATA_TYPE_VARBINARY
:{
case
TSDB_DATA_TYPE_VARBINARY
:
{
int32_t
len
=
varDataTLen
(
pSrc
->
datum
.
p
)
+
1
;
pDst
->
datum
.
p
=
taosMemoryCalloc
(
1
,
len
);
if
(
NULL
==
pDst
->
datum
.
p
)
{
...
...
@@ -399,6 +399,7 @@ static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pD
COPY_SCALAR_FIELD
(
triggerType
);
COPY_SCALAR_FIELD
(
watermark
);
COPY_SCALAR_FIELD
(
filesFactor
);
COPY_SCALAR_FIELD
(
stmInterAlgo
);
return
(
SNode
*
)
pDst
;
}
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
6c5ec227
...
...
@@ -230,6 +230,10 @@ const char* nodesNodeName(ENodeType type) {
return
"PhysiInterval"
;
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
return
"PhysiStreamInterval"
;
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
return
"PhysiStreamFinalInterval"
;
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
:
return
"PhysiStreamSemiInterval"
;
case
QUERY_NODE_PHYSICAL_PLAN_FILL
:
return
"PhysiFill"
;
case
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
:
...
...
@@ -3611,6 +3615,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return
physiSortNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
:
return
physiIntervalNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_FILL
:
return
physiFillNodeToJson
(
pObj
,
pJson
);
...
...
@@ -3728,6 +3734,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return
jsonToPhysiSortNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
:
return
jsonToPhysiIntervalNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_FILL
:
return
jsonToPhysiFillNode
(
pJson
,
pObj
);
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
6c5ec227
...
...
@@ -260,6 +260,10 @@ SNodeptr nodesMakeNode(ENodeType type) {
return
makeNode
(
type
,
sizeof
(
SIntervalPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
return
makeNode
(
type
,
sizeof
(
SStreamIntervalPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
return
makeNode
(
type
,
sizeof
(
SStreamFinalIntervalPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
:
return
makeNode
(
type
,
sizeof
(
SStreamSemiIntervalPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_FILL
:
return
makeNode
(
type
,
sizeof
(
SFillPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
:
...
...
source/libs/planner/src/planLogicCreater.c
浏览文件 @
6c5ec227
...
...
@@ -562,6 +562,7 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva
pWindow
->
sliding
=
(
NULL
!=
pInterval
->
pSliding
?
((
SValueNode
*
)
pInterval
->
pSliding
)
->
datum
.
i
:
pWindow
->
interval
);
pWindow
->
slidingUnit
=
(
NULL
!=
pInterval
->
pSliding
?
((
SValueNode
*
)
pInterval
->
pSliding
)
->
unit
:
pWindow
->
intervalUnit
);
pWindow
->
stmInterAlgo
=
STREAM_INTERVAL_ALGO_SINGLE
;
pWindow
->
pTspk
=
nodesCloneNode
(
pInterval
->
pCol
);
if
(
NULL
==
pWindow
->
pTspk
)
{
...
...
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
6c5ec227
...
...
@@ -526,10 +526,10 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
pScan
->
accountId
=
pCxt
->
pPlanCxt
->
acctId
;
if
(
0
==
strcmp
(
pScanLogicNode
->
tableName
.
tname
,
TSDB_INS_TABLE_USER_TABLES
))
{
vgroupInfoToNodeAddr
(
pScanLogicNode
->
pVgroupList
->
vgroups
,
&
pSubplan
->
execNode
);
SQueryNodeLoad
node
=
{
.
addr
=
pSubplan
->
execNode
,
.
load
=
0
};
SQueryNodeLoad
node
=
{.
addr
=
pSubplan
->
execNode
,
.
load
=
0
};
taosArrayPush
(
pCxt
->
pExecNodeList
,
&
pSubplan
->
execNode
);
}
else
{
SQueryNodeLoad
node
=
{
.
addr
=
{.
nodeId
=
MNODE_HANDLE
,
.
epSet
=
pCxt
->
pPlanCxt
->
mgmtEpSet
},
.
load
=
0
};
SQueryNodeLoad
node
=
{.
addr
=
{.
nodeId
=
MNODE_HANDLE
,
.
epSet
=
pCxt
->
pPlanCxt
->
mgmtEpSet
},
.
load
=
0
};
taosArrayPush
(
pCxt
->
pExecNodeList
,
&
node
);
}
pScan
->
mgmtEpSet
=
pCxt
->
pPlanCxt
->
mgmtEpSet
;
...
...
@@ -933,11 +933,22 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
return
code
;
}
static
ENodeType
getIntervalOperatorType
(
bool
streamQuery
,
EStreamIntervalAlgorithm
stmAlgo
)
{
if
(
streamQuery
)
{
return
STREAM_INTERVAL_ALGO_FINAL
==
stmAlgo
?
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
(
STREAM_INTERVAL_ALGO_SEMI
==
stmAlgo
?
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
:
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
);
}
else
{
return
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
;
}
}
static
int32_t
createIntervalPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SWindowLogicNode
*
pWindowLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SIntervalPhysiNode
*
pInterval
=
(
SIntervalPhysiNode
*
)
makePhysiNode
(
pCxt
,
getPrecision
(
pChildren
),
(
SLogicNode
*
)
pWindowLogicNode
,
(
pCxt
->
pPlanCxt
->
streamQuery
?
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
));
getIntervalOperatorType
(
pCxt
->
pPlanCxt
->
streamQuery
,
pWindowLogicNode
->
stmInterAlgo
));
if
(
NULL
==
pInterval
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
source/libs/planner/src/planSpliter.c
浏览文件 @
6c5ec227
...
...
@@ -24,9 +24,10 @@
#define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
typedef
struct
SSplitContext
{
uint64_t
queryId
;
int32_t
groupId
;
bool
split
;
SPlanContext
*
pPlanCxt
;
uint64_t
queryId
;
int32_t
groupId
;
bool
split
;
}
SSplitContext
;
typedef
int32_t
(
*
FSplit
)(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
);
...
...
@@ -36,7 +37,7 @@ typedef struct SSplitRule {
FSplit
splitFunc
;
}
SSplitRule
;
typedef
bool
(
*
FSplFindSplitNode
)(
SLogicSubplan
*
pSubplan
,
void
*
pInfo
);
typedef
bool
(
*
FSplFindSplitNode
)(
S
SplitContext
*
pCxt
,
S
LogicSubplan
*
pSubplan
,
void
*
pInfo
);
static
void
splSetSubplanVgroups
(
SLogicSubplan
*
pSubplan
,
SLogicNode
*
pNode
)
{
if
(
QUERY_NODE_LOGIC_PLAN_SCAN
==
nodeType
(
pNode
))
{
...
...
@@ -63,19 +64,29 @@ static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SLogicNode* pNod
return
pSubplan
;
}
static
int32_t
splCreateExchangeNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SLogicNode
*
pSplitNode
,
ESubplanType
subplanType
)
{
static
int32_t
splCreateExchangeNode
(
SSplitContext
*
pCxt
,
SLogicNode
*
pChild
,
SExchangeLogicNode
**
pOutput
)
{
SExchangeLogicNode
*
pExchange
=
nodesMakeNode
(
QUERY_NODE_LOGIC_PLAN_EXCHANGE
);
if
(
NULL
==
pExchange
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pExchange
->
srcGroupId
=
pCxt
->
groupId
;
pExchange
->
node
.
precision
=
p
SplitNode
->
precision
;
pExchange
->
node
.
pTargets
=
nodesCloneList
(
p
SplitNode
->
pTargets
);
pExchange
->
node
.
precision
=
p
Child
->
precision
;
pExchange
->
node
.
pTargets
=
nodesCloneList
(
p
Child
->
pTargets
);
if
(
NULL
==
pExchange
->
node
.
pTargets
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
*
pOutput
=
pExchange
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
splCreateExchangeNodeForSubplan
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SLogicNode
*
pSplitNode
,
ESubplanType
subplanType
)
{
SExchangeLogicNode
*
pExchange
=
NULL
;
if
(
TSDB_CODE_SUCCESS
!=
splCreateExchangeNode
(
pCxt
,
pSplitNode
,
&
pExchange
))
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pSubplan
->
subplanType
=
subplanType
;
if
(
NULL
==
pSplitNode
->
pParent
)
{
...
...
@@ -97,7 +108,7 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla
static
bool
splMatch
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
int32_t
flag
,
FSplFindSplitNode
func
,
void
*
pInfo
)
{
if
(
!
SPLIT_FLAG_TEST_MASK
(
pSubplan
->
splitFlag
,
flag
))
{
if
(
func
(
pSubplan
,
pInfo
))
{
if
(
func
(
p
Cxt
,
p
Subplan
,
pInfo
))
{
return
true
;
}
}
...
...
@@ -125,41 +136,47 @@ static bool stbSplHasGatherExecFunc(const SNodeList* pFuncs) {
return
false
;
}
static
bool
stbSplIsMultiTbScan
(
SScanLogicNode
*
pScan
)
{
return
(
NULL
!=
pScan
->
pVgroupList
&&
pScan
->
pVgroupList
->
numOfVgroups
>
1
);
static
bool
stbSplIsMultiTbScan
(
bool
streamQuery
,
SScanLogicNode
*
pScan
)
{
return
(
NULL
!=
pScan
->
pVgroupList
&&
pScan
->
pVgroupList
->
numOfVgroups
>
1
)
||
(
streamQuery
&&
TSDB_SUPER_TABLE
==
pScan
->
pMeta
->
tableType
);
}
static
bool
stbSplHasMultiTbScan
(
SLogicNode
*
pNode
)
{
static
bool
stbSplHasMultiTbScan
(
bool
streamQuery
,
SLogicNode
*
pNode
)
{
if
(
1
!=
LIST_LENGTH
(
pNode
->
pChildren
))
{
return
false
;
}
SNode
*
pChild
=
nodesListGetNode
(
pNode
->
pChildren
,
0
);
return
(
QUERY_NODE_LOGIC_PLAN_SCAN
==
nodeType
(
pChild
)
&&
stbSplIsMultiTbScan
((
SScanLogicNode
*
)
pChild
));
return
(
QUERY_NODE_LOGIC_PLAN_SCAN
==
nodeType
(
pChild
)
&&
stbSplIsMultiTbScan
(
streamQuery
,
(
SScanLogicNode
*
)
pChild
));
}
static
bool
stbSplNeedSplit
(
SLogicNode
*
pNode
)
{
static
bool
stbSplNeedSplit
(
bool
streamQuery
,
SLogicNode
*
pNode
)
{
switch
(
nodeType
(
pNode
))
{
// case QUERY_NODE_LOGIC_PLAN_AGG:
// return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(pNode);
case
QUERY_NODE_LOGIC_PLAN_WINDOW
:
return
!
stbSplHasGatherExecFunc
(((
SAggLogicNode
*
)
pNode
)
->
pAggFuncs
)
&&
stbSplHasMultiTbScan
(
pNode
);
case
QUERY_NODE_LOGIC_PLAN_WINDOW
:
{
SWindowLogicNode
*
pWindow
=
(
SWindowLogicNode
*
)
pNode
;
if
(
WINDOW_TYPE_INTERVAL
!=
pWindow
->
winType
)
{
return
false
;
}
return
!
stbSplHasGatherExecFunc
(
pWindow
->
pFuncs
)
&&
stbSplHasMultiTbScan
(
streamQuery
,
pNode
);
}
// case QUERY_NODE_LOGIC_PLAN_SORT:
// return stbSplHasMultiTbScan(pNode);
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
return
stbSplIsMultiTbScan
((
SScanLogicNode
*
)
pNode
);
return
stbSplIsMultiTbScan
(
streamQuery
,
(
SScanLogicNode
*
)
pNode
);
default:
break
;
}
return
false
;
}
static
SLogicNode
*
stbSplMatchByNode
(
SLogicNode
*
pNode
)
{
if
(
stbSplNeedSplit
(
pNode
))
{
static
SLogicNode
*
stbSplMatchByNode
(
bool
streamQuery
,
SLogicNode
*
pNode
)
{
if
(
stbSplNeedSplit
(
streamQuery
,
pNode
))
{
return
pNode
;
}
SNode
*
pChild
;
FOREACH
(
pChild
,
pNode
->
pChildren
)
{
SLogicNode
*
pSplitNode
=
stbSplMatchByNode
((
SLogicNode
*
)
pChild
);
SLogicNode
*
pSplitNode
=
stbSplMatchByNode
(
streamQuery
,
(
SLogicNode
*
)
pChild
);
if
(
NULL
!=
pSplitNode
)
{
return
pSplitNode
;
}
...
...
@@ -167,8 +184,8 @@ static SLogicNode* stbSplMatchByNode(SLogicNode* pNode) {
return
NULL
;
}
static
bool
stbSplFindSplitNode
(
SLogicSubplan
*
pSubplan
,
SStableSplitInfo
*
pInfo
)
{
SLogicNode
*
pSplitNode
=
stbSplMatchByNode
(
pSubplan
->
pNode
);
static
bool
stbSplFindSplitNode
(
S
SplitContext
*
pCxt
,
S
LogicSubplan
*
pSubplan
,
SStableSplitInfo
*
pInfo
)
{
SLogicNode
*
pSplitNode
=
stbSplMatchByNode
(
p
Cxt
->
pPlanCxt
->
streamQuery
,
p
Subplan
->
pNode
);
if
(
NULL
!=
pSplitNode
)
{
pInfo
->
pSplitNode
=
pSplitNode
;
pInfo
->
pSubplan
=
pSubplan
;
...
...
@@ -301,7 +318,7 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, S
return
code
;
}
static
int32_t
stbSplSplitWindowNode
(
SSplitContext
*
pCxt
,
SStableSplitInfo
*
pInfo
)
{
static
int32_t
stbSplSplitWindowNode
ForBatch
(
SSplitContext
*
pCxt
,
SStableSplitInfo
*
pInfo
)
{
SLogicNode
*
pPartWindow
=
NULL
;
int32_t
code
=
stbSplCreatePartWindowNode
((
SWindowLogicNode
*
)
pInfo
->
pSplitNode
,
&
pPartWindow
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
...
...
@@ -315,8 +332,41 @@ static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInf
return
code
;
}
static
int32_t
stbSplCreateExchangeNode
(
SSplitContext
*
pCxt
,
SLogicNode
*
pParent
,
SLogicNode
*
pPartChild
)
{
SExchangeLogicNode
*
pExchange
=
NULL
;
int32_t
code
=
splCreateExchangeNode
(
pCxt
,
pPartChild
,
&
pExchange
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListMakeAppend
(
&
pParent
->
pChildren
,
pExchange
);
}
return
code
;
}
static
int32_t
stbSplSplitWindowNodeForStream
(
SSplitContext
*
pCxt
,
SStableSplitInfo
*
pInfo
)
{
SLogicNode
*
pPartWindow
=
NULL
;
int32_t
code
=
stbSplCreatePartWindowNode
((
SWindowLogicNode
*
)
pInfo
->
pSplitNode
,
&
pPartWindow
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
((
SWindowLogicNode
*
)
pPartWindow
)
->
stmInterAlgo
=
STREAM_INTERVAL_ALGO_SEMI
;
((
SWindowLogicNode
*
)
pInfo
->
pSplitNode
)
->
stmInterAlgo
=
STREAM_INTERVAL_ALGO_FINAL
;
code
=
stbSplCreateExchangeNode
(
pCxt
,
pInfo
->
pSplitNode
,
pPartWindow
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListMakeStrictAppend
(
&
pInfo
->
pSubplan
->
pChildren
,
splCreateScanSubplan
(
pCxt
,
pPartWindow
,
SPLIT_FLAG_STABLE_SPLIT
));
}
pInfo
->
pSubplan
->
subplanType
=
SUBPLAN_TYPE_MERGE
;
return
code
;
}
static
int32_t
stbSplSplitWindowNode
(
SSplitContext
*
pCxt
,
SStableSplitInfo
*
pInfo
)
{
if
(
pCxt
->
pPlanCxt
->
streamQuery
)
{
return
stbSplSplitWindowNodeForStream
(
pCxt
,
pInfo
);
}
else
{
return
stbSplSplitWindowNodeForBatch
(
pCxt
,
pInfo
);
}
}
static
int32_t
stbSplSplitScanNode
(
SSplitContext
*
pCxt
,
SStableSplitInfo
*
pInfo
)
{
int32_t
code
=
splCreateExchangeNode
(
pCxt
,
pInfo
->
pSubplan
,
pInfo
->
pSplitNode
,
SUBPLAN_TYPE_MERGE
);
int32_t
code
=
splCreateExchangeNode
ForSubplan
(
pCxt
,
pInfo
->
pSubplan
,
pInfo
->
pSplitNode
,
SUBPLAN_TYPE_MERGE
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListMakeStrictAppend
(
&
pInfo
->
pSubplan
->
pChildren
,
splCreateScanSubplan
(
pCxt
,
pInfo
->
pSplitNode
,
SPLIT_FLAG_STABLE_SPLIT
));
...
...
@@ -325,6 +375,10 @@ static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
}
static
int32_t
stableSplit
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
)
{
if
(
pCxt
->
pPlanCxt
->
rSmaQuery
)
{
return
TSDB_CODE_SUCCESS
;
}
SStableSplitInfo
info
=
{
0
};
if
(
!
splMatch
(
pCxt
,
pSubplan
,
SPLIT_FLAG_STABLE_SPLIT
,
(
FSplFindSplitNode
)
stbSplFindSplitNode
,
&
info
))
{
return
TSDB_CODE_SUCCESS
;
...
...
@@ -375,7 +429,7 @@ static SJoinLogicNode* sigTbJoinSplMatchByNode(SLogicNode* pNode) {
return
NULL
;
}
static
bool
sigTbJoinSplFindSplitNode
(
SLogicSubplan
*
pSubplan
,
SSigTbJoinSplitInfo
*
pInfo
)
{
static
bool
sigTbJoinSplFindSplitNode
(
S
SplitContext
*
pCxt
,
S
LogicSubplan
*
pSubplan
,
SSigTbJoinSplitInfo
*
pInfo
)
{
SJoinLogicNode
*
pJoin
=
sigTbJoinSplMatchByNode
(
pSubplan
->
pNode
);
if
(
NULL
!=
pJoin
)
{
pInfo
->
pJoin
=
pJoin
;
...
...
@@ -390,7 +444,7 @@ static int32_t singleTableJoinSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan
if
(
!
splMatch
(
pCxt
,
pSubplan
,
0
,
(
FSplFindSplitNode
)
sigTbJoinSplFindSplitNode
,
&
info
))
{
return
TSDB_CODE_SUCCESS
;
}
int32_t
code
=
splCreateExchangeNode
(
pCxt
,
info
.
pSubplan
,
info
.
pSplitNode
,
info
.
pSubplan
->
subplanType
);
int32_t
code
=
splCreateExchangeNode
ForSubplan
(
pCxt
,
info
.
pSubplan
,
info
.
pSplitNode
,
info
.
pSubplan
->
subplanType
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListMakeStrictAppend
(
&
info
.
pSubplan
->
pChildren
,
splCreateScanSubplan
(
pCxt
,
info
.
pSplitNode
,
0
));
}
...
...
@@ -489,7 +543,7 @@ static SLogicNode* unAllSplMatchByNode(SLogicNode* pNode) {
return
NULL
;
}
static
bool
unAllSplFindSplitNode
(
SLogicSubplan
*
pSubplan
,
SUnionAllSplitInfo
*
pInfo
)
{
static
bool
unAllSplFindSplitNode
(
S
SplitContext
*
pCxt
,
S
LogicSubplan
*
pSubplan
,
SUnionAllSplitInfo
*
pInfo
)
{
SLogicNode
*
pSplitNode
=
unAllSplMatchByNode
(
pSubplan
->
pNode
);
if
(
NULL
!=
pSplitNode
)
{
pInfo
->
pProject
=
(
SProjectLogicNode
*
)
pSplitNode
;
...
...
@@ -581,7 +635,7 @@ static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* p
return
nodesListMakeAppend
(
&
pAgg
->
node
.
pChildren
,
pExchange
);
}
static
bool
unDistSplFindSplitNode
(
SLogicSubplan
*
pSubplan
,
SUnionDistinctSplitInfo
*
pInfo
)
{
static
bool
unDistSplFindSplitNode
(
S
SplitContext
*
pCxt
,
S
LogicSubplan
*
pSubplan
,
SUnionDistinctSplitInfo
*
pInfo
)
{
SLogicNode
*
pSplitNode
=
unDistSplMatchByNode
(
pSubplan
->
pNode
);
if
(
NULL
!=
pSplitNode
)
{
pInfo
->
pAgg
=
(
SAggLogicNode
*
)
pSplitNode
;
...
...
@@ -623,9 +677,10 @@ static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
taosMemoryFree
(
pStr
);
}
static
int32_t
applySplitRule
(
SLogicSubplan
*
pSubplan
)
{
SSplitContext
cxt
=
{.
queryId
=
pSubplan
->
id
.
queryId
,
.
groupId
=
pSubplan
->
id
.
groupId
+
1
,
.
split
=
false
};
bool
split
=
false
;
static
int32_t
applySplitRule
(
SPlanContext
*
pCxt
,
SLogicSubplan
*
pSubplan
)
{
SSplitContext
cxt
=
{
.
pPlanCxt
=
pCxt
,
.
queryId
=
pSubplan
->
id
.
queryId
,
.
groupId
=
pSubplan
->
id
.
groupId
+
1
,
.
split
=
false
};
bool
split
=
false
;
do
{
split
=
false
;
for
(
int32_t
i
=
0
;
i
<
splitRuleNum
;
++
i
)
{
...
...
@@ -672,7 +727,7 @@ int32_t splitLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SLogicSubplan
pSubplan
->
id
.
groupId
=
1
;
setLogicNodeParent
(
pSubplan
->
pNode
);
int32_t
code
=
applySplitRule
(
pSubplan
);
int32_t
code
=
applySplitRule
(
p
Cxt
,
p
Subplan
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pLogicSubplan
=
pSubplan
;
}
else
{
...
...
source/libs/planner/test/planOtherTest.cpp
浏览文件 @
6c5ec227
...
...
@@ -33,6 +33,12 @@ TEST_F(PlanOtherTest, createStream) {
"interval(10s)"
);
}
TEST_F
(
PlanOtherTest
,
createStreamUseSTable
)
{
useDb
(
"root"
,
"test"
);
run
(
"create stream if not exists s1 as select count(*) from st1 interval(10s)"
);
}
TEST_F
(
PlanOtherTest
,
createSmaIndex
)
{
useDb
(
"root"
,
"test"
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录