Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
927a2b37
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
927a2b37
编写于
12月 18, 2021
作者:
S
Shengliang Guan
提交者:
GitHub
12月 18, 2021
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #9184 from taosdata/feature/3.0_wxy
TD-12194 Improve the planner interface.
上级
51490bdb
1afa3de4
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
221 addition
and
16 deletion
+221
-16
include/libs/planner/planner.h
include/libs/planner/planner.h
+7
-2
source/libs/planner/inc/plannerInt.h
source/libs/planner/inc/plannerInt.h
+2
-1
source/libs/planner/src/physicalPlan.c
source/libs/planner/src/physicalPlan.c
+17
-13
source/libs/planner/src/physicalPlanJson.c
source/libs/planner/src/physicalPlanJson.c
+191
-0
source/libs/planner/src/planner.c
source/libs/planner/src/planner.c
+4
-0
未找到文件。
include/libs/planner/planner.h
浏览文件 @
927a2b37
...
...
@@ -108,7 +108,7 @@ typedef struct SProjectPhyNode {
typedef
struct
SExchangePhyNode
{
SPhyNode
node
;
uint64_t
srcTemplateId
;
// template id of datasource suplans
SArray
*
pS
ourceEpSet
;
// SEpSet
, scheduler fill by calling qSetSuplanExecutionNode
SArray
*
pS
rcEndPoints
;
// SEpAddrMsg
, scheduler fill by calling qSetSuplanExecutionNode
}
SExchangePhyNode
;
typedef
struct
SSubplanId
{
...
...
@@ -129,6 +129,7 @@ typedef struct SSubplan {
typedef
struct
SQueryDag
{
uint64_t
queryId
;
int32_t
numOfSubplans
;
SArray
*
pSubplans
;
// Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
}
SQueryDag
;
...
...
@@ -137,7 +138,11 @@ typedef struct SQueryDag {
*/
int32_t
qCreateQueryDag
(
const
struct
SQueryStmtInfo
*
pQueryInfo
,
struct
SEpSet
*
pQnode
,
struct
SQueryDag
**
pDag
);
int32_t
qSetSubplanExecutionNode
(
SSubplan
*
subplan
,
SArray
*
eps
);
// Set datasource of this subplan, multiple calls may be made to a subplan.
// @subplan subplan to be schedule
// @templateId templateId of a group of datasource subplans of this @subplan
// @eps Execution location of this group of datasource subplans, is an array of SEpAddr structures
int32_t
qSetSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
SArray
*
eps
);
int32_t
qExplainQuery
(
const
struct
SQueryStmtInfo
*
pQueryInfo
,
struct
SEpSet
*
pQnode
,
char
**
str
);
...
...
source/libs/planner/inc/plannerInt.h
浏览文件 @
927a2b37
...
...
@@ -100,8 +100,9 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str);
int32_t
queryPlanToSql
(
struct
SQueryPlanNode
*
pQueryNode
,
char
**
sql
);
int32_t
createDag
(
SQueryPlanNode
*
pQueryNode
,
struct
SCatalog
*
pCatalog
,
SQueryDag
**
pDag
);
int32_t
setSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
SArray
*
eps
);
int32_t
subPlanToString
(
const
SSubplan
*
pPhyNode
,
char
**
str
);
int32_t
stringToSubplan
(
const
char
*
str
,
SSubplan
**
subplan
);
/**
* Destroy the query plan object.
...
...
source/libs/planner/src/physicalPlan.c
浏览文件 @
927a2b37
...
...
@@ -19,6 +19,13 @@
#define STORE_CURRENT_SUBPLAN(cxt) SSubplan* _ = cxt->pCurrentSubplan
#define RECOVERY_CURRENT_SUBPLAN(cxt) cxt->pCurrentSubplan = _
typedef
struct
SPlanContext
{
struct
SCatalog
*
pCatalog
;
struct
SQueryDag
*
pDag
;
SSubplan
*
pCurrentSubplan
;
SSubplanId
nextId
;
}
SPlanContext
;
static
const
char
*
gOpName
[]
=
{
"Unknown"
,
#define INCLUDE_AS_NAME
...
...
@@ -26,12 +33,14 @@ static const char* gOpName[] = {
#undef INCLUDE_AS_NAME
};
typedef
struct
SPlanContext
{
struct
SCatalog
*
pCatalog
;
struct
SQueryDag
*
pDag
;
SSubplan
*
pCurrentSubplan
;
SSubplanId
nextId
;
}
SPlanContext
;
int32_t
opNameToOpType
(
const
char
*
name
)
{
for
(
int32_t
i
=
1
;
i
<
sizeof
(
gOpName
)
/
sizeof
(
gOpName
[
0
]);
++
i
)
{
if
(
strcmp
(
name
,
gOpName
[
i
]))
{
return
i
;
}
}
return
OP_Unknown
;
}
static
void
toDataBlockSchema
(
SQueryPlanNode
*
pPlanNode
,
SDataBlockSchema
*
dataBlockSchema
)
{
SWAP
(
dataBlockSchema
->
pSchema
,
pPlanNode
->
pSchema
,
SSchema
*
);
...
...
@@ -216,11 +225,6 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
return
TSDB_CODE_SUCCESS
;
}
int32_t
opNameToOpType
(
const
char
*
name
)
{
for
(
int32_t
i
=
1
;
i
<
sizeof
(
gOpName
)
/
sizeof
(
gOpName
[
0
]);
++
i
)
{
if
(
strcmp
(
name
,
gOpName
[
i
]))
{
return
i
;
}
}
return
OP_Unknown
;
int32_t
setSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
SArray
*
eps
)
{
//todo
}
source/libs/planner/src/physicalPlanJson.c
浏览文件 @
927a2b37
...
...
@@ -463,6 +463,191 @@ static bool exprInfoFromJson(const cJSON* json, void* obj) {
return
res
;
}
static
const
char
*
jkTimeWindowStartKey
=
"StartKey"
;
static
const
char
*
jkTimeWindowEndKey
=
"EndKey"
;
static
bool
timeWindowToJson
(
const
void
*
obj
,
cJSON
*
json
)
{
const
STimeWindow
*
win
=
(
const
STimeWindow
*
)
obj
;
bool
res
=
cJSON_AddNumberToObject
(
json
,
jkTimeWindowStartKey
,
win
->
skey
);
if
(
res
)
{
res
=
cJSON_AddNumberToObject
(
json
,
jkTimeWindowEndKey
,
win
->
ekey
);
}
return
res
;
}
static
bool
timeWindowFromJson
(
const
cJSON
*
json
,
void
*
obj
)
{
STimeWindow
*
win
=
(
STimeWindow
*
)
obj
;
win
->
skey
=
getNumber
(
json
,
jkTimeWindowStartKey
);
win
->
ekey
=
getNumber
(
json
,
jkTimeWindowEndKey
);
return
true
;
}
static
const
char
*
jkScanNodeTableId
=
"TableId"
;
static
const
char
*
jkScanNodeTableType
=
"TableType"
;
static
bool
scanNodeToJson
(
const
void
*
obj
,
cJSON
*
json
)
{
const
SScanPhyNode
*
scan
=
(
const
SScanPhyNode
*
)
obj
;
bool
res
=
cJSON_AddNumberToObject
(
json
,
jkScanNodeTableId
,
scan
->
uid
);
if
(
res
)
{
res
=
cJSON_AddNumberToObject
(
json
,
jkScanNodeTableType
,
scan
->
tableType
);
}
return
res
;
}
static
bool
scanNodeFromJson
(
const
cJSON
*
json
,
void
*
obj
)
{
SScanPhyNode
*
scan
=
(
SScanPhyNode
*
)
obj
;
scan
->
uid
=
getNumber
(
json
,
jkScanNodeTableId
);
scan
->
tableType
=
getNumber
(
json
,
jkScanNodeTableType
);
return
true
;
}
static
const
char
*
jkTableScanNodeFlag
=
"Flag"
;
static
const
char
*
jkTableScanNodeWindow
=
"Window"
;
static
const
char
*
jkTableScanNodeTagsConditions
=
"TagsConditions"
;
static
bool
tableScanNodeToJson
(
const
void
*
obj
,
cJSON
*
json
)
{
const
STableScanPhyNode
*
scan
=
(
const
STableScanPhyNode
*
)
obj
;
bool
res
=
scanNodeToJson
(
obj
,
json
);
if
(
res
)
{
res
=
cJSON_AddNumberToObject
(
json
,
jkTableScanNodeFlag
,
scan
->
scanFlag
);
}
if
(
res
)
{
res
=
addObject
(
json
,
jkTableScanNodeWindow
,
timeWindowToJson
,
&
scan
->
window
);
}
if
(
res
)
{
res
=
addArray
(
json
,
jkTableScanNodeTagsConditions
,
exprInfoToJson
,
scan
->
pTagsConditions
);
}
return
res
;
}
static
bool
tableScanNodeFromJson
(
const
cJSON
*
json
,
void
*
obj
)
{
STableScanPhyNode
*
scan
=
(
STableScanPhyNode
*
)
obj
;
bool
res
=
scanNodeFromJson
(
json
,
obj
);
if
(
res
)
{
scan
->
scanFlag
=
getNumber
(
json
,
jkTableScanNodeFlag
);
}
if
(
res
)
{
res
=
fromObject
(
json
,
jkTableScanNodeWindow
,
timeWindowFromJson
,
&
scan
->
window
,
true
);
}
if
(
res
)
{
res
=
fromArray
(
json
,
jkTableScanNodeTagsConditions
,
exprInfoFromJson
,
&
scan
->
pTagsConditions
,
sizeof
(
SExprInfo
));
}
return
res
;
}
static
const
char
*
jkEpAddrFqdn
=
"Fqdn"
;
static
const
char
*
jkEpAddrPort
=
"Port"
;
static
bool
epAddrToJson
(
const
void
*
obj
,
cJSON
*
json
)
{
const
SEpAddrMsg
*
ep
=
(
const
SEpAddrMsg
*
)
obj
;
bool
res
=
cJSON_AddStringToObject
(
json
,
jkEpAddrFqdn
,
ep
->
fqdn
);
if
(
res
)
{
res
=
cJSON_AddNumberToObject
(
json
,
jkEpAddrPort
,
ep
->
port
);
}
return
res
;
}
static
bool
epAddrFromJson
(
const
cJSON
*
json
,
void
*
obj
)
{
SEpAddrMsg
*
ep
=
(
SEpAddrMsg
*
)
obj
;
copyString
(
json
,
jkEpAddrFqdn
,
ep
->
fqdn
);
ep
->
port
=
getNumber
(
json
,
jkEpAddrPort
);
return
true
;
}
static
const
char
*
jkExchangeNodeSrcTemplateId
=
"SrcTemplateId"
;
static
const
char
*
jkExchangeNodeSrcEndPoints
=
"SrcEndPoints"
;
static
bool
exchangeNodeToJson
(
const
void
*
obj
,
cJSON
*
json
)
{
const
SExchangePhyNode
*
exchange
=
(
const
SExchangePhyNode
*
)
obj
;
bool
res
=
cJSON_AddNumberToObject
(
json
,
jkExchangeNodeSrcTemplateId
,
exchange
->
srcTemplateId
);
if
(
res
)
{
res
=
addArray
(
json
,
jkExchangeNodeSrcEndPoints
,
epAddrToJson
,
exchange
->
pSrcEndPoints
);
}
return
res
;
}
static
bool
exchangeNodeFromJson
(
const
cJSON
*
json
,
void
*
obj
)
{
SExchangePhyNode
*
exchange
=
(
SExchangePhyNode
*
)
obj
;
exchange
->
srcTemplateId
=
getNumber
(
json
,
jkExchangeNodeSrcTemplateId
);
return
fromArray
(
json
,
jkExchangeNodeSrcEndPoints
,
epAddrFromJson
,
&
exchange
->
pSrcEndPoints
,
sizeof
(
SEpAddrMsg
));
}
static
bool
specificPhyNodeToJson
(
const
void
*
obj
,
cJSON
*
json
)
{
const
SPhyNode
*
phyNode
=
(
const
SPhyNode
*
)
obj
;
switch
(
phyNode
->
info
.
type
)
{
case
OP_TableScan
:
case
OP_DataBlocksOptScan
:
case
OP_TableSeqScan
:
return
tableScanNodeToJson
(
obj
,
json
);
case
OP_TagScan
:
case
OP_SystemTableScan
:
return
scanNodeToJson
(
obj
,
json
);
case
OP_Aggregate
:
break
;
// todo
case
OP_Project
:
return
true
;
case
OP_Groupby
:
case
OP_Limit
:
case
OP_SLimit
:
case
OP_TimeWindow
:
case
OP_SessionWindow
:
case
OP_StateWindow
:
case
OP_Fill
:
case
OP_MultiTableAggregate
:
case
OP_MultiTableTimeInterval
:
case
OP_Filter
:
case
OP_Distinct
:
case
OP_Join
:
case
OP_AllTimeWindow
:
case
OP_AllMultiTableTimeInterval
:
case
OP_Order
:
break
;
// todo
case
OP_Exchange
:
return
exchangeNodeToJson
(
obj
,
json
);
default:
break
;
}
return
false
;
}
static
bool
specificPhyNodeFromJson
(
const
cJSON
*
json
,
void
*
obj
)
{
SPhyNode
*
phyNode
=
(
SPhyNode
*
)
obj
;
switch
(
phyNode
->
info
.
type
)
{
case
OP_TableScan
:
case
OP_DataBlocksOptScan
:
case
OP_TableSeqScan
:
return
tableScanNodeFromJson
(
json
,
obj
);
case
OP_TagScan
:
case
OP_SystemTableScan
:
return
scanNodeFromJson
(
json
,
obj
);
case
OP_Aggregate
:
break
;
// todo
case
OP_Project
:
return
true
;
case
OP_Groupby
:
case
OP_Limit
:
case
OP_SLimit
:
case
OP_TimeWindow
:
case
OP_SessionWindow
:
case
OP_StateWindow
:
case
OP_Fill
:
case
OP_MultiTableAggregate
:
case
OP_MultiTableTimeInterval
:
case
OP_Filter
:
case
OP_Distinct
:
case
OP_Join
:
case
OP_AllTimeWindow
:
case
OP_AllMultiTableTimeInterval
:
case
OP_Order
:
break
;
// todo
case
OP_Exchange
:
return
exchangeNodeFromJson
(
json
,
obj
);
default:
break
;
}
return
false
;
}
static
const
char
*
jkPnodeName
=
"Name"
;
static
const
char
*
jkPnodeTargets
=
"Targets"
;
static
const
char
*
jkPnodeConditions
=
"Conditions"
;
...
...
@@ -484,6 +669,9 @@ static bool phyNodeToJson(const void* obj, cJSON* jNode) {
if
(
res
)
{
res
=
addArray
(
jNode
,
jkPnodeChildren
,
phyNodeToJson
,
phyNode
->
pChildren
);
}
if
(
res
)
{
res
=
addObject
(
jNode
,
phyNode
->
info
.
name
,
specificPhyNodeToJson
,
phyNode
);
}
return
res
;
}
...
...
@@ -501,6 +689,9 @@ static bool phyNodeFromJson(const cJSON* json, void* obj) {
if
(
res
)
{
res
=
fromArray
(
json
,
jkPnodeChildren
,
phyNodeFromJson
,
&
node
->
pChildren
,
sizeof
(
SSlotSchema
));
}
if
(
res
)
{
res
=
fromObject
(
json
,
node
->
info
.
name
,
specificPhyNodeFromJson
,
node
,
true
);
}
return
res
;
}
...
...
source/libs/planner/src/planner.c
浏览文件 @
927a2b37
...
...
@@ -46,6 +46,10 @@ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet*
return
TSDB_CODE_SUCCESS
;
}
int32_t
qSetSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
SArray
*
eps
)
{
return
setSubplanExecutionNode
(
subplan
,
templateId
,
eps
);
}
int32_t
qSubPlanToString
(
const
SSubplan
*
subplan
,
char
**
str
)
{
return
subPlanToString
(
subplan
,
str
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录