Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ec5eeea7
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
ec5eeea7
编写于
3月 23, 2022
作者:
X
Xiaoyu Wang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
reorganize physical plan code
上级
a4bb0e66
变更
22
展开全部
隐藏空白更改
内联
并排
Showing
22 changed file
with
1226 addition
and
716 deletion
+1226
-716
include/libs/nodes/nodes.h
include/libs/nodes/nodes.h
+2
-0
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+13
-3
include/libs/nodes/querynodes.h
include/libs/nodes/querynodes.h
+1
-1
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+1
-1
source/libs/nodes/src/nodesCloneFuncs.c
source/libs/nodes/src/nodesCloneFuncs.c
+2
-2
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+138
-21
source/libs/nodes/src/nodesTraverseFuncs.c
source/libs/nodes/src/nodesTraverseFuncs.c
+7
-2
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+13
-1
source/libs/parser/inc/parAst.h
source/libs/parser/inc/parAst.h
+1
-1
source/libs/parser/inc/sql.y
source/libs/parser/inc/sql.y
+1
-1
source/libs/parser/src/parAstCreater.c
source/libs/parser/src/parAstCreater.c
+2
-2
source/libs/parser/src/parUtil.c
source/libs/parser/src/parUtil.c
+1
-0
source/libs/parser/src/sql.c
source/libs/parser/src/sql.c
+184
-185
source/libs/parser/test/parserAstTest.cpp
source/libs/parser/test/parserAstTest.cpp
+73
-21
source/libs/planner/inc/planInt.h
source/libs/planner/inc/planInt.h
+4
-3
source/libs/planner/src/planLogicCreater.c
source/libs/planner/src/planLogicCreater.c
+38
-23
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+20
-0
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+416
-400
source/libs/planner/src/planScaleOut.c
source/libs/planner/src/planScaleOut.c
+203
-0
source/libs/planner/src/planSpliter.c
source/libs/planner/src/planSpliter.c
+50
-8
source/libs/planner/src/planner.c
source/libs/planner/src/planner.c
+14
-6
source/libs/planner/test/plannerTest.cpp
source/libs/planner/test/plannerTest.cpp
+42
-35
未找到文件。
include/libs/nodes/nodes.h
浏览文件 @
ec5eeea7
...
...
@@ -135,6 +135,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
,
QUERY_NODE_PHYSICAL_PLAN_SORT
,
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
,
QUERY_NODE_PHYSICAL_PLAN_DISPATCH
,
QUERY_NODE_PHYSICAL_PLAN_INSERT
,
QUERY_NODE_PHYSICAL_SUBPLAN
,
...
...
@@ -169,6 +170,7 @@ void nodesDestroyNode(SNodeptr pNode);
SNodeList
*
nodesMakeList
();
int32_t
nodesListAppend
(
SNodeList
*
pList
,
SNodeptr
pNode
);
int32_t
nodesListStrictAppend
(
SNodeList
*
pList
,
SNodeptr
pNode
);
int32_t
nodesListMakeAppend
(
SNodeList
**
pList
,
SNodeptr
pNode
);
int32_t
nodesListAppendList
(
SNodeList
*
pTarget
,
SNodeList
*
pSrc
);
int32_t
nodesListStrictAppendList
(
SNodeList
*
pTarget
,
SNodeList
*
pSrc
);
SListCell
*
nodesListErase
(
SNodeList
*
pList
,
SListCell
*
pCell
);
...
...
include/libs/nodes/plannodes.h
浏览文件 @
ec5eeea7
...
...
@@ -95,6 +95,7 @@ typedef struct SWindowLogicNode {
int8_t
intervalUnit
;
int8_t
slidingUnit
;
SFillNode
*
pFill
;
int64_t
sessionGap
;
}
SWindowLogicNode
;
typedef
enum
ESubplanType
{
...
...
@@ -110,7 +111,7 @@ typedef struct SSubplanId {
int32_t
subplanId
;
}
SSubplanId
;
typedef
struct
S
SubLogicP
lan
{
typedef
struct
S
LogicSubp
lan
{
ENodeType
type
;
SSubplanId
id
;
SNodeList
*
pChildren
;
...
...
@@ -120,7 +121,7 @@ typedef struct SSubLogicPlan {
SVgroupsInfo
*
pVgroupList
;
int32_t
level
;
int32_t
splitFlag
;
}
S
SubLogicP
lan
;
}
S
LogicSubp
lan
;
typedef
struct
SQueryLogicPlan
{
ENodeType
type
;
...
...
@@ -213,10 +214,14 @@ typedef struct SExchangePhysiNode {
SNodeList
*
pSrcEndPoints
;
// element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode
}
SExchangePhysiNode
;
typedef
struct
S
Interval
PhysiNode
{
typedef
struct
S
Winodw
PhysiNode
{
SPhysiNode
node
;
SNodeList
*
pExprs
;
// these are expression list of parameter expression of function
SNodeList
*
pFuncs
;
}
SWinodwPhysiNode
;
typedef
struct
SIntervalPhysiNode
{
SWinodwPhysiNode
window
;
int64_t
interval
;
int64_t
offset
;
int64_t
sliding
;
...
...
@@ -225,6 +230,11 @@ typedef struct SIntervalPhysiNode {
SFillNode
*
pFill
;
}
SIntervalPhysiNode
;
typedef
struct
SSessionWinodwPhysiNode
{
SWinodwPhysiNode
window
;
int64_t
gap
;
}
SSessionWinodwPhysiNode
;
typedef
struct
SDataSinkNode
{
ENodeType
type
;
SDataBlockDescNode
*
pInputDataBlockDesc
;
...
...
include/libs/nodes/querynodes.h
浏览文件 @
ec5eeea7
...
...
@@ -191,8 +191,8 @@ typedef struct SStateWindowNode {
typedef
struct
SSessionWindowNode
{
ENodeType
type
;
// QUERY_NODE_SESSION_WINDOW
int64_t
gap
;
// gap between two session window(in microseconds)
SNode
*
pCol
;
SNode
*
pGap
;
// gap between two session window(in microseconds)
}
SSessionWindowNode
;
typedef
struct
SIntervalWindowNode
{
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
ec5eeea7
...
...
@@ -8342,7 +8342,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
SIntervalPhysiNode
*
pIntervalPhyNode
=
(
SIntervalPhysiNode
*
)
pPhyNode
;
int32_t
num
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pIntervalPhyNode
->
pFuncs
,
NULL
,
&
num
);
SExprInfo
*
pExprInfo
=
createExprInfo
(
pIntervalPhyNode
->
window
.
pFuncs
,
NULL
,
&
num
);
SSDataBlock
*
pResBlock
=
createOutputBuf_rv1
(
pPhyNode
->
pOutputDataBlockDesc
);
SInterval
interval
=
{.
interval
=
pIntervalPhyNode
->
interval
,
.
sliding
=
pIntervalPhyNode
->
sliding
,
.
intervalUnit
=
'a'
,
.
slidingUnit
=
'a'
};
...
...
source/libs/nodes/src/nodesCloneFuncs.c
浏览文件 @
ec5eeea7
...
...
@@ -276,7 +276,7 @@ static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pD
return
(
SNode
*
)
pDst
;
}
static
SNode
*
logicSubplanCopy
(
const
S
SubLogicPlan
*
pSrc
,
SSubLogicP
lan
*
pDst
)
{
static
SNode
*
logicSubplanCopy
(
const
S
LogicSubplan
*
pSrc
,
SLogicSubp
lan
*
pDst
)
{
CLONE_NODE_FIELD
(
pNode
);
COPY_SCALAR_FIELD
(
subplanType
);
return
(
SNode
*
)
pDst
;
...
...
@@ -358,7 +358,7 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
case
QUERY_NODE_LOGIC_PLAN_WINDOW
:
return
logicWindowCopy
((
const
SWindowLogicNode
*
)
pNode
,
(
SWindowLogicNode
*
)
pDst
);
case
QUERY_NODE_LOGIC_SUBPLAN
:
return
logicSubplanCopy
((
const
S
SubLogicPlan
*
)
pNode
,
(
SSubLogicP
lan
*
)
pDst
);
return
logicSubplanCopy
((
const
S
LogicSubplan
*
)
pNode
,
(
SLogicSubp
lan
*
)
pDst
);
default:
break
;
}
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
ec5eeea7
...
...
@@ -70,6 +70,14 @@ const char* nodesNodeName(ENodeType type) {
return
"SlotDesc"
;
case
QUERY_NODE_COLUMN_DEF
:
return
"ColumnDef"
;
case
QUERY_NODE_DOWNSTREAM_SOURCE
:
return
"DownstreamSource"
;
case
QUERY_NODE_DATABASE_OPTIONS
:
return
"DatabaseOptions"
;
case
QUERY_NODE_TABLE_OPTIONS
:
return
"TableOptions"
;
case
QUERY_NODE_INDEX_OPTIONS
:
return
"IndexOptions"
;
case
QUERY_NODE_SET_OPERATOR
:
return
"SetOperator"
;
case
QUERY_NODE_SELECT_STMT
:
...
...
@@ -78,16 +86,76 @@ const char* nodesNodeName(ENodeType type) {
return
"VnodeModifStmt"
;
case
QUERY_NODE_CREATE_DATABASE_STMT
:
return
"CreateDatabaseStmt"
;
case
QUERY_NODE_DROP_DATABASE_STMT
:
return
"DropDatabaseStmt"
;
case
QUERY_NODE_ALTER_DATABASE_STMT
:
return
"AlterDatabaseStmt"
;
case
QUERY_NODE_CREATE_TABLE_STMT
:
return
"CreateTableStmt"
;
case
QUERY_NODE_CREATE_SUBTABLE_CLAUSE
:
return
"CreateSubtableClause"
;
case
QUERY_NODE_CREATE_MULTI_TABLE_STMT
:
return
"CreateMultiTableStmt"
;
case
QUERY_NODE_DROP_TABLE_CLAUSE
:
return
"DropTableClause"
;
case
QUERY_NODE_DROP_TABLE_STMT
:
return
"DropTableStmt"
;
case
QUERY_NODE_DROP_SUPER_TABLE_STMT
:
return
"DropSuperTableStmt"
;
case
QUERY_NODE_ALTER_TABLE_STMT
:
return
"AlterTableStmt"
;
case
QUERY_NODE_CREATE_USER_STMT
:
return
"CreateUserStmt"
;
case
QUERY_NODE_ALTER_USER_STMT
:
return
"AlterUserStmt"
;
case
QUERY_NODE_DROP_USER_STMT
:
return
"DropUserStmt"
;
case
QUERY_NODE_USE_DATABASE_STMT
:
return
"UseDatabaseStmt"
;
case
QUERY_NODE_CREATE_DNODE_STMT
:
return
"CreateDnodeStmt"
;
case
QUERY_NODE_DROP_DNODE_STMT
:
return
"DropDnodeStmt"
;
case
QUERY_NODE_ALTER_DNODE_STMT
:
return
"AlterDnodeStmt"
;
case
QUERY_NODE_CREATE_INDEX_STMT
:
return
"CreateIndexStmt"
;
case
QUERY_NODE_DROP_INDEX_STMT
:
return
"DropIndexStmt"
;
case
QUERY_NODE_CREATE_QNODE_STMT
:
return
"CreateQnodeStmt"
;
case
QUERY_NODE_DROP_QNODE_STMT
:
return
"DropQnodeStmt"
;
case
QUERY_NODE_CREATE_TOPIC_STMT
:
return
"CreateTopicStmt"
;
case
QUERY_NODE_DROP_TOPIC_STMT
:
return
"DropTopicStmt"
;
case
QUERY_NODE_ALTER_LOCAL_STMT
:
return
"AlterLocalStmt"
;
case
QUERY_NODE_SHOW_DATABASES_STMT
:
return
"ShowDatabaseStmt"
;
case
QUERY_NODE_SHOW_TABLES_STMT
:
return
"ShowTablesStmt"
;
case
QUERY_NODE_CREATE_TOPIC_STMT
:
return
"CreateTopicStmt"
;
case
QUERY_NODE_SHOW_STABLES_STMT
:
return
"ShowStablesStmt"
;
case
QUERY_NODE_SHOW_USERS_STMT
:
return
"ShowUsersStmt"
;
case
QUERY_NODE_SHOW_DNODES_STMT
:
return
"ShowDnodesStmt"
;
case
QUERY_NODE_SHOW_VGROUPS_STMT
:
return
"ShowVgroupsStmt"
;
case
QUERY_NODE_SHOW_MNODES_STMT
:
return
"ShowMnodesStmt"
;
case
QUERY_NODE_SHOW_MODULES_STMT
:
return
"ShowModulesStmt"
;
case
QUERY_NODE_SHOW_QNODES_STMT
:
return
"ShowQnodesStmt"
;
case
QUERY_NODE_SHOW_FUNCTIONS_STMT
:
return
"ShowFunctionsStmt"
;
case
QUERY_NODE_SHOW_INDEXES_STMT
:
return
"ShowIndexesStmt"
;
case
QUERY_NODE_SHOW_STREAMS_STMT
:
return
"ShowStreamsStmt"
;
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
return
"LogicScan"
;
case
QUERY_NODE_LOGIC_PLAN_JOIN
:
...
...
@@ -98,6 +166,10 @@ const char* nodesNodeName(ENodeType type) {
return
"LogicProject"
;
case
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF
:
return
"LogicVnodeModif"
;
case
QUERY_NODE_LOGIC_PLAN_EXCHANGE
:
return
"LogicExchange"
;
case
QUERY_NODE_LOGIC_PLAN_WINDOW
:
return
"LogicWindow"
;
case
QUERY_NODE_LOGIC_SUBPLAN
:
return
"LogicSubplan"
;
case
QUERY_NODE_LOGIC_PLAN
:
...
...
@@ -846,8 +918,37 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) {
return
code
;
}
static
const
char
*
jkIntervalPhysiPlanExprs
=
"Exprs"
;
static
const
char
*
jkIntervalPhysiPlanFuncs
=
"Funcs"
;
static
const
char
*
jkWindowPhysiPlanExprs
=
"Exprs"
;
static
const
char
*
jkWindowPhysiPlanFuncs
=
"Funcs"
;
static
int32_t
physiWindowNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SWinodwPhysiNode
*
pNode
=
(
const
SWinodwPhysiNode
*
)
pObj
;
int32_t
code
=
physicPlanNodeToJson
(
pObj
,
pJson
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodeListToJson
(
pJson
,
jkWindowPhysiPlanExprs
,
pNode
->
pExprs
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodeListToJson
(
pJson
,
jkWindowPhysiPlanFuncs
,
pNode
->
pFuncs
);
}
return
code
;
}
static
int32_t
jsonToPhysiWindowNode
(
const
SJson
*
pJson
,
void
*
pObj
)
{
SWinodwPhysiNode
*
pNode
=
(
SWinodwPhysiNode
*
)
pObj
;
int32_t
code
=
jsonToPhysicPlanNode
(
pJson
,
pObj
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkWindowPhysiPlanExprs
,
&
pNode
->
pExprs
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkWindowPhysiPlanFuncs
,
&
pNode
->
pFuncs
);
}
return
code
;
}
static
const
char
*
jkIntervalPhysiPlanInterval
=
"Interval"
;
static
const
char
*
jkIntervalPhysiPlanOffset
=
"Offset"
;
static
const
char
*
jkIntervalPhysiPlanSliding
=
"Sliding"
;
...
...
@@ -858,13 +959,7 @@ static const char* jkIntervalPhysiPlanFill = "Fill";
static
int32_t
physiIntervalNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SIntervalPhysiNode
*
pNode
=
(
const
SIntervalPhysiNode
*
)
pObj
;
int32_t
code
=
physicPlanNodeToJson
(
pObj
,
pJson
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodeListToJson
(
pJson
,
jkIntervalPhysiPlanExprs
,
pNode
->
pExprs
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodeListToJson
(
pJson
,
jkIntervalPhysiPlanFuncs
,
pNode
->
pFuncs
);
}
int32_t
code
=
physiWindowNodeToJson
(
pObj
,
pJson
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkIntervalPhysiPlanInterval
,
pNode
->
interval
);
}
...
...
@@ -890,13 +985,7 @@ static int32_t physiIntervalNodeToJson(const void* pObj, SJson* pJson) {
static
int32_t
jsonToPhysiIntervalNode
(
const
SJson
*
pJson
,
void
*
pObj
)
{
SIntervalPhysiNode
*
pNode
=
(
SIntervalPhysiNode
*
)
pObj
;
int32_t
code
=
jsonToPhysicPlanNode
(
pJson
,
pObj
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkIntervalPhysiPlanExprs
,
&
pNode
->
pExprs
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkIntervalPhysiPlanFuncs
,
&
pNode
->
pFuncs
);
}
int32_t
code
=
jsonToPhysiWindowNode
(
pJson
,
pObj
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetBigIntValue
(
pJson
,
jkIntervalPhysiPlanInterval
,
&
pNode
->
interval
);
}
...
...
@@ -919,6 +1008,30 @@ static int32_t jsonToPhysiIntervalNode(const SJson* pJson, void* pObj) {
return
code
;
}
static
const
char
*
jkSessionWindowPhysiPlanGap
=
"Gap"
;
static
int32_t
physiSessionWindowNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SSessionWinodwPhysiNode
*
pNode
=
(
const
SSessionWinodwPhysiNode
*
)
pObj
;
int32_t
code
=
physiWindowNodeToJson
(
pObj
,
pJson
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkSessionWindowPhysiPlanGap
,
pNode
->
gap
);
}
return
code
;
}
static
int32_t
jsonToPhysiSessionWindowNode
(
const
SJson
*
pJson
,
void
*
pObj
)
{
SSessionWinodwPhysiNode
*
pNode
=
(
SSessionWinodwPhysiNode
*
)
pObj
;
int32_t
code
=
jsonToPhysiWindowNode
(
pJson
,
pObj
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetNumberValue
(
pJson
,
jkSessionWindowPhysiPlanGap
,
pNode
->
gap
);
}
return
code
;
}
static
const
char
*
jkDataSinkInputDataBlockDesc
=
"InputDataBlockDesc"
;
static
int32_t
physicDataSinkNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
...
...
@@ -2066,6 +2179,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
break
;
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
return
physiIntervalNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
:
return
physiSessionWindowNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_DISPATCH
:
return
physiDispatchNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_INSERT
:
...
...
@@ -2075,7 +2190,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case
QUERY_NODE_PHYSICAL_PLAN
:
return
planToJson
(
pObj
,
pJson
);
default:
assert
(
0
);
//
assert(0);
break
;
}
nodesWarn
(
"specificNodeToJson unknown node = %s"
,
nodesNodeName
(
nodeType
(
pObj
)));
...
...
@@ -2149,14 +2264,16 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return
jsonToPhysiAggNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
:
return
jsonToPhysiExchangeNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
return
jsonToPhysiIntervalNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
:
return
jsonToPhysiSessionWindowNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_DISPATCH
:
return
jsonToPhysiDispatchNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_SUBPLAN
:
return
jsonToSubplan
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN
:
return
jsonToPlan
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
return
jsonToPhysiIntervalNode
(
pJson
,
pObj
);
default:
assert
(
0
);
break
;
...
...
source/libs/nodes/src/nodesTraverseFuncs.c
浏览文件 @
ec5eeea7
...
...
@@ -79,9 +79,14 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
case
QUERY_NODE_STATE_WINDOW
:
res
=
walkNode
(((
SStateWindowNode
*
)
pNode
)
->
pCol
,
order
,
walker
,
pContext
);
break
;
case
QUERY_NODE_SESSION_WINDOW
:
res
=
walkNode
(((
SSessionWindowNode
*
)
pNode
)
->
pCol
,
order
,
walker
,
pContext
);
case
QUERY_NODE_SESSION_WINDOW
:
{
SSessionWindowNode
*
pSession
=
(
SSessionWindowNode
*
)
pNode
;
res
=
walkNode
(
pSession
->
pCol
,
order
,
walker
,
pContext
);
if
(
DEAL_RES_ERROR
!=
res
)
{
res
=
walkNode
(
pSession
->
pGap
,
order
,
walker
,
pContext
);
}
break
;
}
case
QUERY_NODE_INTERVAL_WINDOW
:
{
SIntervalWindowNode
*
pInterval
=
(
SIntervalWindowNode
*
)
pNode
;
res
=
walkNode
(
pInterval
->
pInterval
,
order
,
walker
,
pContext
);
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
ec5eeea7
...
...
@@ -160,7 +160,7 @@ SNodeptr nodesMakeNode(ENodeType type) {
case
QUERY_NODE_LOGIC_PLAN_WINDOW
:
return
makeNode
(
type
,
sizeof
(
SWindowLogicNode
));
case
QUERY_NODE_LOGIC_SUBPLAN
:
return
makeNode
(
type
,
sizeof
(
S
SubLogicP
lan
));
return
makeNode
(
type
,
sizeof
(
S
LogicSubp
lan
));
case
QUERY_NODE_LOGIC_PLAN
:
return
makeNode
(
type
,
sizeof
(
SQueryLogicPlan
));
case
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN
:
...
...
@@ -332,6 +332,7 @@ int32_t nodesListAppend(SNodeList* pList, SNodeptr pNode) {
int32_t
nodesListStrictAppend
(
SNodeList
*
pList
,
SNodeptr
pNode
)
{
if
(
NULL
==
pNode
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
int32_t
code
=
nodesListAppend
(
pList
,
pNode
);
...
...
@@ -341,6 +342,17 @@ int32_t nodesListStrictAppend(SNodeList* pList, SNodeptr pNode) {
return
code
;
}
int32_t
nodesListMakeAppend
(
SNodeList
**
pList
,
SNodeptr
pNode
)
{
if
(
NULL
==
*
pList
)
{
*
pList
=
nodesMakeList
();
if
(
NULL
==
*
pList
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
return
nodesListAppend
(
*
pList
,
pNode
);
}
int32_t
nodesListAppendList
(
SNodeList
*
pTarget
,
SNodeList
*
pSrc
)
{
if
(
NULL
==
pTarget
||
NULL
==
pSrc
)
{
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/parser/inc/parAst.h
浏览文件 @
ec5eeea7
...
...
@@ -100,7 +100,7 @@ SNode* createTempTableNode(SAstCreateContext* pCxt, SNode* pSubquery, const STok
SNode
*
createJoinTableNode
(
SAstCreateContext
*
pCxt
,
EJoinType
type
,
SNode
*
pLeft
,
SNode
*
pRight
,
SNode
*
pJoinCond
);
SNode
*
createLimitNode
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pLimit
,
const
SToken
*
pOffset
);
SNode
*
createOrderByExprNode
(
SAstCreateContext
*
pCxt
,
SNode
*
pExpr
,
EOrder
order
,
ENullOrder
nullOrder
);
SNode
*
createSessionWindowNode
(
SAstCreateContext
*
pCxt
,
SNode
*
pCol
,
const
SToken
*
pVal
);
SNode
*
createSessionWindowNode
(
SAstCreateContext
*
pCxt
,
SNode
*
pCol
,
SNode
*
pGap
);
SNode
*
createStateWindowNode
(
SAstCreateContext
*
pCxt
,
SNode
*
pCol
);
SNode
*
createIntervalWindowNode
(
SAstCreateContext
*
pCxt
,
SNode
*
pInterval
,
SNode
*
pOffset
,
SNode
*
pSliding
,
SNode
*
pFill
);
SNode
*
createFillNode
(
SAstCreateContext
*
pCxt
,
EFillMode
mode
,
SNode
*
pValues
);
...
...
source/libs/parser/inc/sql.y
浏览文件 @
ec5eeea7
...
...
@@ -624,7 +624,7 @@ partition_by_clause_opt(A) ::= PARTITION BY expression_list(B).
twindow_clause_opt(A) ::= . { A = NULL; }
twindow_clause_opt(A) ::=
SESSION NK_LP column_reference(B) NK_COMMA
NK_INTEGER(C) NK_RP. { A = createSessionWindowNode(pCxt, releaseRawExprNode(pCxt, B), &
C); }
SESSION NK_LP column_reference(B) NK_COMMA
duration_literal(C) NK_RP. { A = createSessionWindowNode(pCxt, releaseRawExprNode(pCxt, B),
C); }
twindow_clause_opt(A) ::= STATE_WINDOW NK_LP column_reference(B) NK_RP. { A = createStateWindowNode(pCxt, releaseRawExprNode(pCxt, B)); }
twindow_clause_opt(A) ::=
INTERVAL NK_LP duration_literal(B) NK_RP sliding_opt(C) fill_opt(D). { A = createIntervalWindowNode(pCxt, releaseRawExprNode(pCxt, B), NULL, C, D); }
...
...
source/libs/parser/src/parAstCreater.c
浏览文件 @
ec5eeea7
...
...
@@ -679,11 +679,11 @@ SNode* createOrderByExprNode(SAstCreateContext* pCxt, SNode* pExpr, EOrder order
return
(
SNode
*
)
orderByExpr
;
}
SNode
*
createSessionWindowNode
(
SAstCreateContext
*
pCxt
,
SNode
*
pCol
,
const
SToken
*
pVal
)
{
SNode
*
createSessionWindowNode
(
SAstCreateContext
*
pCxt
,
SNode
*
pCol
,
SNode
*
pGap
)
{
SSessionWindowNode
*
session
=
(
SSessionWindowNode
*
)
nodesMakeNode
(
QUERY_NODE_SESSION_WINDOW
);
CHECK_OUT_OF_MEM
(
session
);
session
->
pCol
=
pCol
;
// session->gap = getInteger(pVal)
;
session
->
pGap
=
pGap
;
return
(
SNode
*
)
session
;
}
...
...
source/libs/parser/src/parUtil.c
浏览文件 @
ec5eeea7
...
...
@@ -71,6 +71,7 @@ int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...) {
va_start
(
vArgList
,
errCode
);
vsnprintf
(
pBuf
->
buf
,
pBuf
->
len
,
getSyntaxErrFormat
(
errCode
),
vArgList
);
va_end
(
vArgList
);
terrno
=
errCode
;
return
errCode
;
}
...
...
source/libs/parser/src/sql.c
浏览文件 @
ec5eeea7
此差异已折叠。
点击以展开。
source/libs/parser/test/parserAstTest.cpp
浏览文件 @
ec5eeea7
...
...
@@ -44,6 +44,9 @@ protected:
query_
=
nullptr
;
bool
res
=
runImpl
(
parseCode
,
translateCode
);
qDestroyQuery
(
query_
);
if
(
!
res
)
{
dump
();
}
return
res
;
}
...
...
@@ -53,26 +56,40 @@ private:
bool
runImpl
(
int32_t
parseCode
,
int32_t
translateCode
)
{
int32_t
code
=
doParse
(
&
cxt_
,
&
query_
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
cout
<<
"sql:["
<<
cxt_
.
pSql
<<
"] parser code:"
<<
tstrerror
(
code
)
<<
", msg:"
<<
errMagBuf_
<<
endl
;
return
(
TSDB_CODE_SUCCESS
!
=
parseCode
);
parseErrStr_
=
string
(
"code:"
)
+
tstrerror
(
code
)
+
string
(
", msg:"
)
+
errMagBuf_
;
return
(
terrno
=
=
parseCode
);
}
if
(
TSDB_CODE_SUCCESS
!=
parseCode
)
{
return
false
;
}
string
parserStr
=
toString
(
query_
->
pRoot
);
parsedAstStr_
=
toString
(
query_
->
pRoot
);
code
=
doTranslate
(
&
cxt_
,
query_
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
cout
<<
"sql:["
<<
cxt_
.
pSql
<<
"] translate code:"
<<
code
<<
", "
<<
translateCode
<<
", msg:"
<<
errMagBuf_
<<
endl
;
return
(
code
==
translateCode
);
translateErrStr_
=
string
(
"code:"
)
+
tstrerror
(
code
)
+
string
(
", msg:"
)
+
errMagBuf_
;
return
(
terrno
==
translateCode
);
}
cout
<<
"input sql : ["
<<
cxt_
.
pSql
<<
"]"
<<
endl
;
cout
<<
"parser output: "
<<
endl
;
cout
<<
parserStr
<<
endl
;
cout
<<
"translate output: "
<<
endl
;
cout
<<
toString
(
query_
->
pRoot
)
<<
endl
;
translatedAstStr_
=
toString
(
query_
->
pRoot
);
return
(
TSDB_CODE_SUCCESS
==
translateCode
);
}
void
dump
()
{
cout
<<
"input sql : ["
<<
cxt_
.
pSql
<<
"]"
<<
endl
;
if
(
!
parseErrStr_
.
empty
())
{
cout
<<
"parse error: "
<<
parseErrStr_
<<
endl
;
}
if
(
!
parsedAstStr_
.
empty
())
{
cout
<<
"parse output: "
<<
endl
;
cout
<<
parsedAstStr_
<<
endl
;
}
if
(
!
translateErrStr_
.
empty
())
{
cout
<<
"translate error: "
<<
translateErrStr_
<<
endl
;
}
if
(
!
translatedAstStr_
.
empty
())
{
cout
<<
"translate output: "
<<
endl
;
cout
<<
translatedAstStr_
<<
endl
;
}
}
string
toString
(
const
SNode
*
pRoot
,
bool
format
=
false
)
{
char
*
pStr
=
NULL
;
int32_t
len
=
0
;
...
...
@@ -91,6 +108,10 @@ private:
memset
(
errMagBuf_
,
0
,
max_err_len
);
cxt_
.
pMsg
=
errMagBuf_
;
cxt_
.
msgLen
=
max_err_len
;
parseErrStr_
.
clear
();
parsedAstStr_
.
clear
();
translateErrStr_
.
clear
();
translatedAstStr_
.
clear
();
}
string
acctId_
;
...
...
@@ -99,8 +120,50 @@ private:
string
sqlBuf_
;
SParseContext
cxt_
;
SQuery
*
query_
;
string
parseErrStr_
;
string
parsedAstStr_
;
string
translateErrStr_
;
string
translatedAstStr_
;
};
TEST_F
(
ParserTest
,
createAccount
)
{
setDatabase
(
"root"
,
"test"
);
bind
(
"create account ac_wxy pass '123456'"
);
ASSERT_TRUE
(
run
(
TSDB_CODE_PAR_EXPRIE_STATEMENT
));
}
TEST_F
(
ParserTest
,
alterAccount
)
{
setDatabase
(
"root"
,
"test"
);
bind
(
"alter account ac_wxy pass '123456'"
);
ASSERT_TRUE
(
run
(
TSDB_CODE_PAR_EXPRIE_STATEMENT
));
}
TEST_F
(
ParserTest
,
createUser
)
{
setDatabase
(
"root"
,
"test"
);
bind
(
"create user wxy pass '123456'"
);
ASSERT_TRUE
(
run
());
}
TEST_F
(
ParserTest
,
alterUser
)
{
setDatabase
(
"root"
,
"test"
);
bind
(
"alter user wxy pass '123456'"
);
ASSERT_TRUE
(
run
());
bind
(
"alter user wxy privilege 'write'"
);
ASSERT_TRUE
(
run
());
}
TEST_F
(
ParserTest
,
dropUser
)
{
setDatabase
(
"root"
,
"test"
);
bind
(
"drop user wxy"
);
ASSERT_TRUE
(
run
());
}
TEST_F
(
ParserTest
,
selectSimple
)
{
setDatabase
(
"root"
,
"test"
);
...
...
@@ -295,12 +358,6 @@ TEST_F(ParserTest, selectSemanticError) {
ASSERT_TRUE
(
run
(
TSDB_CODE_SUCCESS
,
TSDB_CODE_PAR_NOT_SELECTED_EXPRESSION
));
}
TEST_F
(
ParserTest
,
createUser
)
{
setDatabase
(
"root"
,
"test"
);
bind
(
"create user wxy pass '123456'"
);
ASSERT_TRUE
(
run
());
}
TEST_F
(
ParserTest
,
showUsers
)
{
setDatabase
(
"root"
,
"test"
);
...
...
@@ -309,12 +366,7 @@ TEST_F(ParserTest, showUsers) {
ASSERT_TRUE
(
run
());
}
TEST_F
(
ParserTest
,
alterAccount
)
{
setDatabase
(
"root"
,
"test"
);
bind
(
"alter account ac_wxy pass '123456'"
);
ASSERT_TRUE
(
run
(
TSDB_CODE_PAR_EXPRIE_STATEMENT
));
}
TEST_F
(
ParserTest
,
createDnode
)
{
setDatabase
(
"root"
,
"test"
);
...
...
source/libs/planner/inc/planInt.h
浏览文件 @
ec5eeea7
...
...
@@ -56,9 +56,10 @@ extern "C" {
#define planTrace(param, ...) qTrace("PLAN: " param, __VA_ARGS__)
int32_t
createLogicPlan
(
SPlanContext
*
pCxt
,
SLogicNode
**
pLogicNode
);
int32_t
optimize
(
SPlanContext
*
pCxt
,
SLogicNode
*
pLogicNode
);
int32_t
applySplitRule
(
SSubLogicPlan
*
pSubplan
);
int32_t
createPhysiPlan
(
SPlanContext
*
pCxt
,
SLogicNode
*
pLogicNode
,
SQueryPlan
**
pPlan
,
SArray
*
pExecNodeList
);
int32_t
optimizeLogicPlan
(
SPlanContext
*
pCxt
,
SLogicNode
*
pLogicNode
);
int32_t
splitLogicPlan
(
SPlanContext
*
pCxt
,
SLogicNode
*
pLogicNode
,
SLogicSubplan
**
pLogicSubplan
);
int32_t
scaleOutLogicPlan
(
SPlanContext
*
pCxt
,
SLogicSubplan
*
pLogicSubplan
,
SQueryLogicPlan
**
pLogicPlan
);
int32_t
createPhysiPlan
(
SPlanContext
*
pCxt
,
SQueryLogicPlan
*
pLogicPlan
,
SQueryPlan
**
pPlan
,
SArray
*
pExecNodeList
);
#ifdef __cplusplus
}
...
...
source/libs/planner/src/planLogicCreater.c
浏览文件 @
ec5eeea7
...
...
@@ -411,6 +411,38 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
return
code
;
}
static
int32_t
createWindowLogicNodeFinalize
(
SLogicPlanContext
*
pCxt
,
SSelectStmt
*
pSelect
,
SWindowLogicNode
*
pWindow
,
SLogicNode
**
pLogicNode
)
{
int32_t
code
=
nodesCollectFuncs
(
pSelect
,
fmIsAggFunc
,
&
pWindow
->
pFuncs
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
rewriteExpr
(
pWindow
->
pFuncs
,
pSelect
,
SQL_CLAUSE_WINDOW
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
createColumnByRewriteExps
(
pCxt
,
pWindow
->
pFuncs
,
&
pWindow
->
node
.
pTargets
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pLogicNode
=
(
SLogicNode
*
)
pWindow
;
}
else
{
nodesDestroyNode
(
pWindow
);
}
return
code
;
}
static
int32_t
createWindowLogicNodeBySession
(
SLogicPlanContext
*
pCxt
,
SSessionWindowNode
*
pSession
,
SSelectStmt
*
pSelect
,
SLogicNode
**
pLogicNode
)
{
SWindowLogicNode
*
pWindow
=
nodesMakeNode
(
QUERY_NODE_LOGIC_PLAN_WINDOW
);
if
(
NULL
==
pWindow
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pWindow
->
winType
=
WINDOW_TYPE_SESSION
;
pWindow
->
sessionGap
=
((
SValueNode
*
)
pSession
->
pGap
)
->
datum
.
i
;
return
createWindowLogicNodeFinalize
(
pCxt
,
pSelect
,
pWindow
,
pLogicNode
);
}
static
int32_t
createWindowLogicNodeByInterval
(
SLogicPlanContext
*
pCxt
,
SIntervalWindowNode
*
pInterval
,
SSelectStmt
*
pSelect
,
SLogicNode
**
pLogicNode
)
{
SWindowLogicNode
*
pWindow
=
nodesMakeNode
(
QUERY_NODE_LOGIC_PLAN_WINDOW
);
if
(
NULL
==
pWindow
)
{
...
...
@@ -424,34 +456,15 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva
pWindow
->
sliding
=
(
NULL
!=
pInterval
->
pSliding
?
((
SValueNode
*
)
pInterval
->
pSliding
)
->
datum
.
i
:
pWindow
->
interval
);
pWindow
->
slidingUnit
=
(
NULL
!=
pInterval
->
pSliding
?
((
SValueNode
*
)
pInterval
->
pSliding
)
->
unit
:
pWindow
->
intervalUnit
);
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
NULL
!=
pInterval
->
pFill
)
{
pWindow
->
pFill
=
nodesCloneNode
(
pInterval
->
pFill
);
if
(
NULL
==
pWindow
->
pFill
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
nodesDestroyNode
(
pWindow
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesCollectFuncs
(
pSelect
,
fmIsAggFunc
,
&
pWindow
->
pFuncs
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
rewriteExpr
(
pWindow
->
pFuncs
,
pSelect
,
SQL_CLAUSE_WINDOW
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
createColumnByRewriteExps
(
pCxt
,
pWindow
->
pFuncs
,
&
pWindow
->
node
.
pTargets
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pLogicNode
=
(
SLogicNode
*
)
pWindow
;
}
else
{
nodesDestroyNode
(
pWindow
);
}
return
code
;
return
createWindowLogicNodeFinalize
(
pCxt
,
pSelect
,
pWindow
,
pLogicNode
);
}
static
int32_t
createWindowLogicNode
(
SLogicPlanContext
*
pCxt
,
SSelectStmt
*
pSelect
,
SLogicNode
**
pLogicNode
)
{
...
...
@@ -460,8 +473,10 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele
}
switch
(
nodeType
(
pSelect
->
pWindow
))
{
case
QUERY_NODE_SESSION_WINDOW
:
return
createWindowLogicNodeBySession
(
pCxt
,
(
SSessionWindowNode
*
)
pSelect
->
pWindow
,
pSelect
,
pLogicNode
);
case
QUERY_NODE_INTERVAL_WINDOW
:
return
createWindowLogicNodeByInterval
(
pCxt
,
(
SIntervalWindowNode
*
)
pSelect
->
pWindow
,
pSelect
,
pLogicNode
);
return
createWindowLogicNodeByInterval
(
pCxt
,
(
SIntervalWindowNode
*
)
pSelect
->
pWindow
,
pSelect
,
pLogicNode
);
default:
break
;
}
...
...
source/libs/planner/src/planOptimizer.c
0 → 100644
浏览文件 @
ec5eeea7
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "planInt.h"
int32_t
optimizeLogicPlan
(
SPlanContext
*
pCxt
,
SLogicNode
*
pLogicNode
)
{
return
TSDB_CODE_SUCCESS
;
}
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
ec5eeea7
此差异已折叠。
点击以展开。
source/libs/planner/src/planScaleOut.c
0 → 100644
浏览文件 @
ec5eeea7
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "planInt.h"
typedef
struct
SScaleOutContext
{
SPlanContext
*
pPlanCxt
;
int32_t
subplanId
;
}
SScaleOutContext
;
static
SLogicSubplan
*
singleCloneSubLogicPlan
(
SScaleOutContext
*
pCxt
,
SLogicSubplan
*
pSrc
,
int32_t
level
)
{
SLogicSubplan
*
pDst
=
nodesMakeNode
(
QUERY_NODE_LOGIC_SUBPLAN
);
if
(
NULL
==
pDst
->
pNode
)
{
return
NULL
;
}
pDst
->
pNode
=
nodesCloneNode
(
pSrc
->
pNode
);
if
(
NULL
==
pDst
->
pNode
)
{
nodesDestroyNode
(
pDst
);
return
NULL
;
}
pDst
->
subplanType
=
pSrc
->
subplanType
;
pDst
->
level
=
level
;
pDst
->
id
.
queryId
=
pSrc
->
id
.
queryId
;
pDst
->
id
.
groupId
=
pSrc
->
id
.
groupId
;
pDst
->
id
.
subplanId
=
pCxt
->
subplanId
++
;
return
pDst
;
}
static
int32_t
scaleOutForModify
(
SScaleOutContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
int32_t
level
,
SNodeList
*
pGroup
)
{
SVnodeModifLogicNode
*
pNode
=
(
SVnodeModifLogicNode
*
)
pSubplan
->
pNode
;
size_t
numOfVgroups
=
taosArrayGetSize
(
pNode
->
pDataBlocks
);
for
(
int32_t
i
=
0
;
i
<
numOfVgroups
;
++
i
)
{
SLogicSubplan
*
pNewSubplan
=
singleCloneSubLogicPlan
(
pCxt
,
pSubplan
,
level
);
if
(
NULL
==
pNewSubplan
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
((
SVnodeModifLogicNode
*
)
pNewSubplan
->
pNode
)
->
pVgDataBlocks
=
(
SVgDataBlocks
*
)
taosArrayGetP
(
pNode
->
pDataBlocks
,
i
);
if
(
TSDB_CODE_SUCCESS
!=
nodesListStrictAppend
(
pGroup
,
pNewSubplan
))
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
scaleOutForMerge
(
SScaleOutContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
int32_t
level
,
SNodeList
*
pGroup
)
{
return
nodesListStrictAppend
(
pGroup
,
singleCloneSubLogicPlan
(
pCxt
,
pSubplan
,
level
));
}
static
int32_t
doSetScanVgroup
(
SLogicNode
*
pNode
,
const
SVgroupInfo
*
pVgroup
,
bool
*
pFound
)
{
if
(
QUERY_NODE_LOGIC_PLAN_SCAN
==
nodeType
(
pNode
))
{
SScanLogicNode
*
pScan
=
(
SScanLogicNode
*
)
pNode
;
pScan
->
pVgroupList
=
calloc
(
1
,
sizeof
(
SVgroupsInfo
)
+
sizeof
(
SVgroupInfo
));
if
(
NULL
==
pScan
->
pVgroupList
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
memcpy
(
pScan
->
pVgroupList
->
vgroups
,
pVgroup
,
sizeof
(
SVgroupInfo
));
*
pFound
=
true
;
return
TSDB_CODE_SUCCESS
;
}
SNode
*
pChild
=
NULL
;
FOREACH
(
pChild
,
pNode
->
pChildren
)
{
int32_t
code
=
doSetScanVgroup
((
SLogicNode
*
)
pChild
,
pVgroup
,
pFound
);
if
(
TSDB_CODE_SUCCESS
!=
code
||
*
pFound
)
{
return
code
;
}
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
setScanVgroup
(
SLogicNode
*
pNode
,
const
SVgroupInfo
*
pVgroup
)
{
bool
found
=
false
;
return
doSetScanVgroup
(
pNode
,
pVgroup
,
&
found
);
}
static
int32_t
scaleOutForScan
(
SScaleOutContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
int32_t
level
,
SNodeList
*
pGroup
)
{
if
(
pSubplan
->
pVgroupList
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
for
(
int32_t
i
=
0
;
i
<
pSubplan
->
pVgroupList
->
numOfVgroups
;
++
i
)
{
SLogicSubplan
*
pNewSubplan
=
singleCloneSubLogicPlan
(
pCxt
,
pSubplan
,
level
);
if
(
NULL
==
pNewSubplan
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
code
=
setScanVgroup
(
pNewSubplan
->
pNode
,
pSubplan
->
pVgroupList
->
vgroups
+
i
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListStrictAppend
(
pGroup
,
pNewSubplan
);
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
break
;
}
}
return
code
;
}
else
{
return
scaleOutForMerge
(
pCxt
,
pSubplan
,
level
,
pGroup
);
}
}
static
int32_t
pushHierarchicalPlan
(
SNodeList
*
pParentsGroup
,
SNodeList
*
pCurrentGroup
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
bool
topLevel
=
(
0
==
LIST_LENGTH
(
pParentsGroup
));
SNode
*
pChild
=
NULL
;
FOREACH
(
pChild
,
pCurrentGroup
)
{
if
(
topLevel
)
{
code
=
nodesListAppend
(
pParentsGroup
,
pChild
);
}
else
{
SNode
*
pParent
=
NULL
;
FOREACH
(
pParent
,
pParentsGroup
)
{
code
=
nodesListMakeAppend
(
&
(((
SLogicSubplan
*
)
pParent
)
->
pChildren
),
pChild
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListMakeAppend
(
&
(((
SLogicSubplan
*
)
pChild
)
->
pParents
),
pParent
);
}
}
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
break
;
}
}
return
code
;
}
static
int32_t
doScaleOut
(
SScaleOutContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
int32_t
*
pLevel
,
SNodeList
*
pParentsGroup
)
{
SNodeList
*
pCurrentGroup
=
nodesMakeList
();
if
(
NULL
==
pCurrentGroup
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
switch
(
pSubplan
->
subplanType
)
{
case
SUBPLAN_TYPE_MERGE
:
code
=
scaleOutForMerge
(
pCxt
,
pSubplan
,
*
pLevel
,
pCurrentGroup
);
break
;
case
SUBPLAN_TYPE_SCAN
:
code
=
scaleOutForScan
(
pCxt
,
pSubplan
,
*
pLevel
,
pCurrentGroup
);
break
;
case
SUBPLAN_TYPE_MODIFY
:
code
=
scaleOutForModify
(
pCxt
,
pSubplan
,
*
pLevel
,
pCurrentGroup
);
break
;
default:
break
;
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
pushHierarchicalPlan
(
pParentsGroup
,
pCurrentGroup
);
++
(
*
pLevel
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
SNode
*
pChild
;
FOREACH
(
pChild
,
pSubplan
->
pChildren
)
{
code
=
doScaleOut
(
pCxt
,
(
SLogicSubplan
*
)
pChild
,
pLevel
,
pCurrentGroup
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
break
;
}
}
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
nodesDestroyList
(
pCurrentGroup
);
}
return
code
;
}
static
SQueryLogicPlan
*
makeQueryLogicPlan
()
{
SQueryLogicPlan
*
pLogicPlan
=
(
SQueryLogicPlan
*
)
nodesMakeNode
(
QUERY_NODE_LOGIC_PLAN
);
if
(
NULL
==
pLogicPlan
)
{
return
NULL
;
}
pLogicPlan
->
pTopSubplans
=
nodesMakeList
();
if
(
NULL
==
pLogicPlan
->
pTopSubplans
)
{
nodesDestroyNode
(
pLogicPlan
);
return
NULL
;
}
return
pLogicPlan
;
}
int32_t
scaleOutLogicPlan
(
SPlanContext
*
pCxt
,
SLogicSubplan
*
pLogicSubplan
,
SQueryLogicPlan
**
pLogicPlan
)
{
SQueryLogicPlan
*
pPlan
=
makeQueryLogicPlan
();
if
(
NULL
==
pPlan
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
SScaleOutContext
cxt
=
{
.
pPlanCxt
=
pCxt
,
.
subplanId
=
1
};
int32_t
code
=
doScaleOut
(
&
cxt
,
pLogicSubplan
,
&
(
pPlan
->
totalLevel
),
pPlan
->
pTopSubplans
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pLogicPlan
=
pPlan
;
}
else
{
nodesDestroyNode
(
pPlan
);
}
return
code
;
}
source/libs/planner/src/planSpliter.c
浏览文件 @
ec5eeea7
...
...
@@ -29,7 +29,7 @@ typedef struct SSplitContext {
void
*
pInfo
;
}
SSplitContext
;
typedef
int32_t
(
*
FMatch
)(
SSplitContext
*
pCxt
,
S
SubLogicP
lan
*
pSubplan
);
typedef
int32_t
(
*
FMatch
)(
SSplitContext
*
pCxt
,
S
LogicSubp
lan
*
pSubplan
);
typedef
int32_t
(
*
FSplit
)(
SSplitContext
*
pCxt
);
typedef
struct
SSplitRule
{
...
...
@@ -40,7 +40,7 @@ typedef struct SSplitRule {
typedef
struct
SStsInfo
{
SScanLogicNode
*
pScan
;
S
SubLogicP
lan
*
pSubplan
;
S
LogicSubp
lan
*
pSubplan
;
}
SStsInfo
;
static
SLogicNode
*
stsMatchByNode
(
SLogicNode
*
pNode
)
{
...
...
@@ -58,7 +58,7 @@ static SLogicNode* stsMatchByNode(SLogicNode* pNode) {
return
NULL
;
}
static
int32_t
stsMatch
(
SSplitContext
*
pCxt
,
S
SubLogicP
lan
*
pSubplan
)
{
static
int32_t
stsMatch
(
SSplitContext
*
pCxt
,
S
LogicSubp
lan
*
pSubplan
)
{
if
(
SPLIT_FLAG_TEST_MASK
(
pSubplan
->
splitFlag
,
SPLIT_FLAG_STS
))
{
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -74,7 +74,7 @@ static int32_t stsMatch(SSplitContext* pCxt, SSubLogicPlan* pSubplan) {
}
SNode
*
pChild
;
FOREACH
(
pChild
,
pSubplan
->
pChildren
)
{
int32_t
code
=
stsMatch
(
pCxt
,
(
S
SubLogicP
lan
*
)
pChild
);
int32_t
code
=
stsMatch
(
pCxt
,
(
S
LogicSubp
lan
*
)
pChild
);
if
(
TSDB_CODE_SUCCESS
!=
code
||
pCxt
->
match
)
{
return
code
;
}
...
...
@@ -82,8 +82,8 @@ static int32_t stsMatch(SSplitContext* pCxt, SSubLogicPlan* pSubplan) {
return
TSDB_CODE_SUCCESS
;
}
static
S
SubLogicP
lan
*
stsCreateScanSubplan
(
SSplitContext
*
pCxt
,
SScanLogicNode
*
pScan
)
{
S
SubLogicP
lan
*
pSubplan
=
nodesMakeNode
(
QUERY_NODE_LOGIC_SUBPLAN
);
static
S
LogicSubp
lan
*
stsCreateScanSubplan
(
SSplitContext
*
pCxt
,
SScanLogicNode
*
pScan
)
{
S
LogicSubp
lan
*
pSubplan
=
nodesMakeNode
(
QUERY_NODE_LOGIC_SUBPLAN
);
if
(
NULL
==
pSubplan
)
{
return
NULL
;
}
...
...
@@ -95,7 +95,7 @@ static SSubLogicPlan* stsCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode*
return
pSubplan
;
}
static
int32_t
stsCreateExchangeNode
(
SSplitContext
*
pCxt
,
S
SubLogicP
lan
*
pSubplan
,
SScanLogicNode
*
pScan
)
{
static
int32_t
stsCreateExchangeNode
(
SSplitContext
*
pCxt
,
S
LogicSubp
lan
*
pSubplan
,
SScanLogicNode
*
pScan
)
{
SExchangeLogicNode
*
pExchange
=
nodesMakeNode
(
QUERY_NODE_LOGIC_PLAN_EXCHANGE
);
if
(
NULL
==
pExchange
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -145,7 +145,7 @@ static const SSplitRule splitRuleSet[] = {
static
const
int32_t
splitRuleNum
=
(
sizeof
(
splitRuleSet
)
/
sizeof
(
SSplitRule
));
int32_t
applySplitRule
(
SSubLogicP
lan
*
pSubplan
)
{
static
int32_t
applySplitRule
(
SLogicSubp
lan
*
pSubplan
)
{
SSplitContext
cxt
=
{
.
errCode
=
TSDB_CODE_SUCCESS
,
.
groupId
=
pSubplan
->
id
.
groupId
+
1
,
.
match
=
false
,
.
pInfo
=
NULL
};
bool
split
=
false
;
do
{
...
...
@@ -164,3 +164,45 @@ int32_t applySplitRule(SSubLogicPlan* pSubplan) {
}
while
(
split
);
return
TSDB_CODE_SUCCESS
;
}
static
void
doSetLogicNodeParent
(
SLogicNode
*
pNode
,
SLogicNode
*
pParent
)
{
pNode
->
pParent
=
pParent
;
SNode
*
pChild
;
FOREACH
(
pChild
,
pNode
->
pChildren
)
{
doSetLogicNodeParent
((
SLogicNode
*
)
pChild
,
pNode
);
}
}
static
void
setLogicNodeParent
(
SLogicNode
*
pNode
)
{
doSetLogicNodeParent
(
pNode
,
NULL
);
}
int32_t
splitLogicPlan
(
SPlanContext
*
pCxt
,
SLogicNode
*
pLogicNode
,
SLogicSubplan
**
pLogicSubplan
)
{
SLogicSubplan
*
pSubplan
=
(
SLogicSubplan
*
)
nodesMakeNode
(
QUERY_NODE_LOGIC_SUBPLAN
);
if
(
NULL
==
pSubplan
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pSubplan
->
pNode
=
nodesCloneNode
(
pLogicNode
);
if
(
NULL
==
pSubplan
->
pNode
)
{
nodesDestroyNode
(
pSubplan
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
if
(
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF
==
nodeType
(
pLogicNode
))
{
pSubplan
->
subplanType
=
SUBPLAN_TYPE_MODIFY
;
TSWAP
(((
SVnodeModifLogicNode
*
)
pLogicNode
)
->
pDataBlocks
,
((
SVnodeModifLogicNode
*
)
pSubplan
->
pNode
)
->
pDataBlocks
,
SArray
*
);
}
else
{
pSubplan
->
subplanType
=
SUBPLAN_TYPE_SCAN
;
}
pSubplan
->
id
.
queryId
=
pCxt
->
queryId
;
setLogicNodeParent
(
pSubplan
->
pNode
);
int32_t
code
=
applySplitRule
(
pSubplan
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pLogicSubplan
=
pSubplan
;
}
else
{
nodesDestroyNode
(
pSubplan
);
}
return
code
;
}
\ No newline at end of file
source/libs/planner/src/planner.c
浏览文件 @
ec5eeea7
...
...
@@ -17,20 +17,28 @@
#include "planInt.h"
int32_t
optimize
(
SPlanContext
*
pCxt
,
SLogicNode
*
pLogicNode
)
{
return
TSDB_CODE_SUCCESS
;
}
int32_t
qCreateQueryPlan
(
SPlanContext
*
pCxt
,
SQueryPlan
**
pPlan
,
SArray
*
pExecNodeList
)
{
SLogicNode
*
pLogicNode
=
NULL
;
SLogicSubplan
*
pLogicSubplan
=
NULL
;
SQueryLogicPlan
*
pLogicPlan
=
NULL
;
int32_t
code
=
createLogicPlan
(
pCxt
,
&
pLogicNode
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
optimize
(
pCxt
,
pLogicNode
);
code
=
optimizeLogicPlan
(
pCxt
,
pLogicNode
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
splitLogicPlan
(
pCxt
,
pLogicNode
,
&
pLogicSubplan
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
scaleOutLogicPlan
(
pCxt
,
pLogicSubplan
,
&
pLogicPlan
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
createPhysiPlan
(
pCxt
,
pLogic
Node
,
pPlan
,
pExecNodeList
);
code
=
createPhysiPlan
(
pCxt
,
pLogic
Plan
,
pPlan
,
pExecNodeList
);
}
nodesDestroyNode
(
pLogicNode
);
nodesDestroyNode
(
pLogicSubplan
);
nodesDestroyNode
(
pLogicPlan
);
return
code
;
}
...
...
source/libs/planner/test/plannerTest.cpp
浏览文件 @
ec5eeea7
...
...
@@ -26,11 +26,6 @@ using namespace testing;
class
PlannerTest
:
public
Test
{
protected:
enum
TestTarget
{
TEST_LOGIC_PLAN
,
TEST_PHYSICAL_PLAN
};
void
setDatabase
(
const
string
&
acctId
,
const
string
&
db
)
{
acctId_
=
acctId
;
db_
=
db
;
...
...
@@ -46,7 +41,7 @@ protected:
cxt_
.
pSql
=
sqlBuf_
.
c_str
();
}
bool
run
(
TestTarget
target
=
TEST_PHYSICAL_PLAN
)
{
bool
run
()
{
int32_t
code
=
qParseQuerySql
(
&
cxt_
,
&
query_
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -56,12 +51,12 @@ protected:
const
string
syntaxTreeStr
=
toString
(
query_
->
pRoot
,
false
);
SLogicNode
*
pLogic
Plan
=
nullptr
;
SLogicNode
*
pLogic
Node
=
nullptr
;
SPlanContext
cxt
=
{
.
queryId
=
1
,
.
acctId
=
0
};
setPlanContext
(
query_
,
&
cxt
);
code
=
createLogicPlan
(
&
cxt
,
&
pLogic
Plan
);
code
=
createLogicPlan
(
&
cxt
,
&
pLogic
Node
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
cout
<<
"sql:["
<<
cxt_
.
pSql
<<
"]
logic p
lan code:"
<<
code
<<
", strerror:"
<<
tstrerror
(
code
)
<<
endl
;
cout
<<
"sql:["
<<
cxt_
.
pSql
<<
"]
createLogicP
lan code:"
<<
code
<<
", strerror:"
<<
tstrerror
(
code
)
<<
endl
;
return
false
;
}
...
...
@@ -69,24 +64,37 @@ protected:
cout
<<
"syntax test : "
<<
endl
;
cout
<<
syntaxTreeStr
<<
endl
;
cout
<<
"unformatted logic plan : "
<<
endl
;
cout
<<
toString
((
const
SNode
*
)
pLogicPlan
,
false
)
<<
endl
;
if
(
TEST_PHYSICAL_PLAN
==
target
)
{
SQueryPlan
*
pPlan
=
nullptr
;
code
=
createPhysiPlan
(
&
cxt
,
pLogicPlan
,
&
pPlan
,
NULL
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
cout
<<
"sql:["
<<
cxt_
.
pSql
<<
"] physical plan code:"
<<
code
<<
", strerror:"
<<
tstrerror
(
code
)
<<
endl
;
return
false
;
}
cout
<<
"unformatted physical plan : "
<<
endl
;
cout
<<
toString
((
const
SNode
*
)
pPlan
,
false
)
<<
endl
;
SNode
*
pNode
;
FOREACH
(
pNode
,
pPlan
->
pSubplans
)
{
SNode
*
pSubplan
;
FOREACH
(
pSubplan
,
((
SNodeListNode
*
)
pNode
)
->
pNodeList
)
{
cout
<<
"unformatted physical subplan : "
<<
endl
;
cout
<<
toString
(
pSubplan
,
false
)
<<
endl
;
}
cout
<<
toString
((
const
SNode
*
)
pLogicNode
,
false
)
<<
endl
;
SLogicSubplan
*
pLogicSubplan
=
nullptr
;
code
=
splitLogicPlan
(
&
cxt
,
pLogicNode
,
&
pLogicSubplan
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
cout
<<
"sql:["
<<
cxt_
.
pSql
<<
"] splitLogicPlan code:"
<<
code
<<
", strerror:"
<<
tstrerror
(
code
)
<<
endl
;
return
false
;
}
SQueryLogicPlan
*
pLogicPlan
=
NULL
;
code
=
scaleOutLogicPlan
(
&
cxt
,
pLogicSubplan
,
&
pLogicPlan
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
cout
<<
"sql:["
<<
cxt_
.
pSql
<<
"] createPhysiPlan code:"
<<
code
<<
", strerror:"
<<
tstrerror
(
code
)
<<
endl
;
return
false
;
}
SQueryPlan
*
pPlan
=
nullptr
;
code
=
createPhysiPlan
(
&
cxt
,
pLogicPlan
,
&
pPlan
,
NULL
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
cout
<<
"sql:["
<<
cxt_
.
pSql
<<
"] createPhysiPlan code:"
<<
code
<<
", strerror:"
<<
tstrerror
(
code
)
<<
endl
;
return
false
;
}
cout
<<
"unformatted physical plan : "
<<
endl
;
cout
<<
toString
((
const
SNode
*
)
pPlan
,
false
)
<<
endl
;
SNode
*
pNode
;
FOREACH
(
pNode
,
pPlan
->
pSubplans
)
{
SNode
*
pSubplan
;
FOREACH
(
pSubplan
,
((
SNodeListNode
*
)
pNode
)
->
pNodeList
)
{
cout
<<
"unformatted physical subplan : "
<<
endl
;
cout
<<
toString
(
pSubplan
,
false
)
<<
endl
;
}
}
...
...
@@ -120,14 +128,6 @@ private:
cout
<<
"sql:["
<<
cxt_
.
pSql
<<
"] toString code:"
<<
code
<<
", strerror:"
<<
tstrerror
(
code
)
<<
endl
;
return
string
();
}
SNode
*
pNode
;
code
=
nodesStringToNode
(
pStr
,
&
pNode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tfree
(
pStr
);
cout
<<
"sql:["
<<
cxt_
.
pSql
<<
"] toObject code:"
<<
code
<<
", strerror:"
<<
tstrerror
(
code
)
<<
endl
;
return
string
();
}
nodesDestroyNode
(
pNode
);
string
str
(
pStr
);
tfree
(
pStr
);
return
str
;
...
...
@@ -185,6 +185,13 @@ TEST_F(PlannerTest, interval) {
ASSERT_TRUE
(
run
());
}
TEST_F
(
PlannerTest
,
sessionWindow
)
{
setDatabase
(
"root"
,
"test"
);
bind
(
"SELECT count(*) FROM t1 session(ts, 10s)"
);
ASSERT_TRUE
(
run
());
}
TEST_F
(
PlannerTest
,
showTables
)
{
setDatabase
(
"root"
,
"test"
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录