Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2081bf76
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
2081bf76
编写于
12月 16, 2021
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into catalog_dev
上级
3b872546
021d6997
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
342 addition
and
158 deletion
+342
-158
include/libs/parser/parser.h
include/libs/parser/parser.h
+7
-5
include/libs/planner/planner.h
include/libs/planner/planner.h
+73
-32
include/libs/planner/plannerOp.h
include/libs/planner/plannerOp.h
+48
-0
include/util/tarray.h
include/util/tarray.h
+1
-1
source/libs/parser/src/insertParser.c
source/libs/parser/src/insertParser.c
+17
-19
source/libs/parser/test/CMakeLists.txt
source/libs/parser/test/CMakeLists.txt
+7
-4
source/libs/parser/test/insertTest.cpp
source/libs/parser/test/insertTest.cpp
+21
-0
source/libs/planner/inc/plannerInt.h
source/libs/planner/inc/plannerInt.h
+21
-60
source/libs/planner/src/physicalPlan.c
source/libs/planner/src/physicalPlan.c
+135
-11
source/libs/planner/src/planner.c
source/libs/planner/src/planner.c
+10
-24
source/libs/scheduler/CMakeLists.txt
source/libs/scheduler/CMakeLists.txt
+1
-1
source/util/src/tarray.c
source/util/src/tarray.c
+1
-1
未找到文件。
include/libs/parser/parser.h
浏览文件 @
2081bf76
...
...
@@ -132,13 +132,15 @@ struct SInsertStmtInfo;
bool
qIsInsertSql
(
const
char
*
pStr
,
size_t
length
);
typedef
struct
SParseContext
{
const
char
*
pSql
;
// sql string
size_t
sqlLen
;
// length of the sql string
int64_t
id
;
// operator id, generated by uuid generator
const
char
*
pAcctId
;
const
char
*
pDbname
;
const
SEpSet
*
pEpSet
;
void
*
pRpc
;
const
char
*
pClusterId
;
const
SEpSet
*
pEpSet
;
int64_t
id
;
// query id, generated by uuid generator
int8_t
schemaAttached
;
// denote if submit block is built with table schema or not
const
char
*
pSql
;
// sql string
size_t
sqlLen
;
// length of the sql string
char
*
pMsg
;
// extended error message if exists to help avoid the problem in sql statement.
int32_t
msgLen
;
// max length of the msg
}
SParseContext
;
...
...
include/libs/planner/planner.h
浏览文件 @
2081bf76
...
...
@@ -20,52 +20,92 @@
extern
"C"
{
#endif
#include "taosmsg.h"
#define QUERY_TYPE_MERGE 1
#define QUERY_TYPE_PARTIAL 2
#define QUERY_TYPE_SCAN 3
enum
OPERATOR_TYPE_E
{
OP_TableScan
=
1
,
OP_DataBlocksOptScan
=
2
,
OP_TableSeqScan
=
3
,
OP_TagScan
=
4
,
OP_TableBlockInfoScan
=
5
,
OP_Aggregate
=
6
,
OP_Project
=
7
,
OP_Groupby
=
8
,
OP_Limit
=
9
,
OP_SLimit
=
10
,
OP_TimeWindow
=
11
,
OP_SessionWindow
=
12
,
OP_StateWindow
=
22
,
OP_Fill
=
13
,
OP_MultiTableAggregate
=
14
,
OP_MultiTableTimeInterval
=
15
,
// OP_DummyInput = 16, //TODO remove it after fully refactor.
// OP_MultiwayMergeSort = 17, // multi-way data merge into one input stream.
// OP_GlobalAggregate = 18, // global merge for the multi-way data sources.
OP_Filter
=
19
,
OP_Distinct
=
20
,
OP_Join
=
21
,
OP_AllTimeWindow
=
23
,
OP_AllMultiTableTimeInterval
=
24
,
OP_Order
=
25
,
OP_Exchange
=
26
,
OP_Unknown
,
#define INCLUDE_AS_ENUM
#include "plannerOp.h"
#undef INCLUDE_AS_ENUM
OP_TotalNum
};
struct
SEpSet
;
struct
SQueryPlanNode
;
struct
SPhyNode
;
struct
SQueryStmtInfo
;
typedef
SSchema
SSlotSchema
;
typedef
struct
SDataBlockSchema
{
SSlotSchema
*
pSchema
;
int32_t
numOfCols
;
// number of columns
}
SDataBlockSchema
;
typedef
struct
SQueryNodeBasicInfo
{
int32_t
type
;
// operator type
const
char
*
name
;
// operator name
}
SQueryNodeBasicInfo
;
typedef
struct
SPhyNode
{
SQueryNodeBasicInfo
info
;
SArray
*
pTargets
;
// target list to be computed or scanned at this node
SArray
*
pConditions
;
// implicitly-ANDed qual conditions
SDataBlockSchema
targetSchema
;
// children plan to generated result for current node to process
// in case of join, multiple plan nodes exist.
SArray
*
pChildren
;
struct
SPhyNode
*
pParent
;
}
SPhyNode
;
typedef
struct
SScanPhyNode
{
SPhyNode
node
;
uint64_t
uid
;
// unique id of the table
int8_t
tableType
;
}
SScanPhyNode
;
typedef
SScanPhyNode
SSystemTableScanPhyNode
;
typedef
SScanPhyNode
STagScanPhyNode
;
typedef
struct
STableScanPhyNode
{
SScanPhyNode
scan
;
uint8_t
scanFlag
;
// denotes reversed scan of data or not
STimeWindow
window
;
SArray
*
pTagsConditions
;
// implicitly-ANDed tag qual conditions
}
STableScanPhyNode
;
typedef
STableScanPhyNode
STableSeqScanPhyNode
;
typedef
struct
SProjectPhyNode
{
SPhyNode
node
;
}
SProjectPhyNode
;
typedef
struct
SExchangePhyNode
{
SPhyNode
node
;
uint64_t
templateId
;
SArray
*
pSourceEpSet
;
// SEpSet
}
SExchangePhyNode
;
typedef
struct
SSubplanId
{
uint64_t
queryId
;
uint64_t
templateId
;
uint64_t
subplanId
;
}
SSubplanId
;
typedef
struct
SSubplan
{
int32_t
type
;
// QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN
SArray
*
pDatasource
;
// the datasource subplan,from which to fetch the result
struct
SPhyNode
*
pNode
;
// physical plan of current subplan
SSubplanId
id
;
// unique id of the subplan
int32_t
type
;
// QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN
int32_t
level
;
// the execution level of current subplan, starting from 0.
SEpSet
execEpSet
;
// for the scan sub plan, the optional execution node
SArray
*
pChildern
;
// the datasource subplan,from which to fetch the result
SArray
*
pParents
;
// the data destination subplan, get data from current subplan
SPhyNode
*
pNode
;
// physical plan of current subplan
}
SSubplan
;
typedef
struct
SQueryDag
{
SArray
*
*
pSubplans
;
SArray
*
pSubplans
;
// Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
}
SQueryDag
;
/**
...
...
@@ -75,6 +115,7 @@ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet*
int32_t
qExplainQuery
(
const
struct
SQueryStmtInfo
*
pQueryInfo
,
struct
SEpSet
*
pQnode
,
char
**
str
);
/**
* Convert to subplan to string for the scheduler to send to the executor
*/
...
...
include/libs/planner/plannerOp.h
0 → 100644
浏览文件 @
2081bf76
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if defined(INCLUDE_AS_ENUM) // enum define mode
#undef OP_ENUM_MACRO
#define OP_ENUM_MACRO(op) OP_##op,
#elif defined(INCLUDE_AS_NAME) // comment define mode
#undef OP_ENUM_MACRO
#define OP_ENUM_MACRO(op) #op,
#else
#error To use this include file, first define either INCLUDE_AS_ENUM or INCLUDE_AS_NAME
#endif
OP_ENUM_MACRO
(
TableScan
)
OP_ENUM_MACRO
(
DataBlocksOptScan
)
OP_ENUM_MACRO
(
TableSeqScan
)
OP_ENUM_MACRO
(
TagScan
)
OP_ENUM_MACRO
(
TableBlockInfoScan
)
OP_ENUM_MACRO
(
Aggregate
)
OP_ENUM_MACRO
(
Project
)
OP_ENUM_MACRO
(
Groupby
)
OP_ENUM_MACRO
(
Limit
)
OP_ENUM_MACRO
(
SLimit
)
OP_ENUM_MACRO
(
TimeWindow
)
OP_ENUM_MACRO
(
SessionWindow
)
OP_ENUM_MACRO
(
StateWindow
)
OP_ENUM_MACRO
(
Fill
)
OP_ENUM_MACRO
(
MultiTableAggregate
)
OP_ENUM_MACRO
(
MultiTableTimeInterval
)
OP_ENUM_MACRO
(
Filter
)
OP_ENUM_MACRO
(
Distinct
)
OP_ENUM_MACRO
(
Join
)
OP_ENUM_MACRO
(
AllTimeWindow
)
OP_ENUM_MACRO
(
AllMultiTableTimeInterval
)
OP_ENUM_MACRO
(
Order
)
OP_ENUM_MACRO
(
Exchange
)
include/util/tarray.h
浏览文件 @
2081bf76
...
...
@@ -41,7 +41,7 @@ typedef struct SArray {
* @param elemSize
* @return
*/
void
*
taosArrayInit
(
size_t
size
,
size_t
elemSize
);
SArray
*
taosArrayInit
(
size_t
size
,
size_t
elemSize
);
/**
*
...
...
source/libs/parser/src/insertParser.c
浏览文件 @
2081bf76
...
...
@@ -71,8 +71,7 @@ typedef struct SInsertParseContext {
const
char
*
pSql
;
SMsgBuf
msg
;
struct
SCatalog
*
pCatalog
;
SMetaData
meta
;
// need release
const
STableMeta
*
pTableMeta
;
STableMeta
*
pTableMeta
;
SHashObj
*
pTableBlockHashObj
;
// data block for each table. need release
int32_t
totalNum
;
SInsertStmtInfo
*
pOutput
;
...
...
@@ -165,29 +164,29 @@ static int32_t skipInsertInto(SInsertParseContext* pCxt) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
build
TableName
(
SInsertParseContext
*
pCxt
,
SToken
*
pStname
,
SArray
*
tableNameList
)
{
static
int32_t
build
Name
(
SInsertParseContext
*
pCxt
,
SToken
*
pStname
,
char
*
fullDbName
,
char
*
tableName
)
{
if
(
parserValidateIdToken
(
pStname
)
!=
TSDB_CODE_SUCCESS
)
{
return
buildSyntaxErrMsg
(
&
pCxt
->
msg
,
"invalid table name"
,
pStname
->
z
);
}
SName
name
=
{
0
};
strcpy
(
name
.
dbname
,
pCxt
->
pComCxt
->
pDbname
);
strncpy
(
name
.
tname
,
pStname
->
z
,
pStname
->
n
);
taosArrayPush
(
tableNameList
,
&
name
);
char
*
p
=
strnchr
(
pStname
->
z
,
TS_PATH_DELIMITER
[
0
],
pStname
->
n
,
false
);
if
(
NULL
!=
p
)
{
// db.table
strcpy
(
fullDbName
,
pCxt
->
pComCxt
->
pAcctId
);
fullDbName
[
strlen
(
pCxt
->
pComCxt
->
pAcctId
)]
=
TS_PATH_DELIMITER
[
0
];
strncpy
(
fullDbName
,
pStname
->
z
,
p
-
pStname
->
z
);
strncpy
(
tableName
,
p
+
1
,
pStname
->
n
-
(
p
-
pStname
->
z
)
-
1
);
}
else
{
snprintf
(
fullDbName
,
TSDB_FULL_DB_NAME_LEN
,
"%s.%s"
,
pCxt
->
pComCxt
->
pAcctId
,
pCxt
->
pComCxt
->
pDbname
);
strncpy
(
tableName
,
pStname
->
z
,
pStname
->
n
);
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
buildMetaReq
(
SInsertParseContext
*
pCxt
,
SToken
*
pStname
,
SCatalogReq
*
pMetaReq
)
{
pMetaReq
->
pTableName
=
taosArrayInit
(
4
,
sizeof
(
SName
));
return
buildTableName
(
pCxt
,
pStname
,
pMetaReq
->
pTableName
);
}
static
int32_t
getTableMeta
(
SInsertParseContext
*
pCxt
,
SToken
*
pTname
)
{
SCatalogReq
req
;
CHECK_CODE
(
buildMetaReq
(
pCxt
,
pTname
,
&
req
))
;
CHECK_CODE
(
catalogGetTableMeta
(
pCxt
->
pCatalog
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
));
//TODO
pCxt
->
pTableMeta
=
(
STableMeta
*
)
taosArrayGetP
(
pCxt
->
meta
.
pTableMeta
,
0
);
char
fullDbName
[
TSDB_FULL_DB_NAME_LEN
]
=
{
0
}
;
char
tableName
[
TSDB_TABLE_NAME_LEN
]
=
{
0
}
;
CHECK_CODE
(
buildName
(
pCxt
,
pTname
,
fullDbName
,
tableName
));
CHECK_CODE
(
catalogGetTableMeta
(
pCxt
->
pCatalog
,
pCxt
->
pComCxt
->
pRpc
,
pCxt
->
pComCxt
->
pEpSet
,
fullDbName
,
tableName
,
&
pCxt
->
pTableMeta
)
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -868,12 +867,11 @@ int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) {
.
pOutput
=
*
pInfo
};
CHECK_CODE
(
catalogGetHandle
(
NULL
,
&
context
.
pCatalog
));
//TODO
if
(
NULL
==
context
.
pTableBlockHashObj
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
CHECK_CODE
(
catalogGetHandle
(
pContext
->
pClusterId
,
&
context
.
pCatalog
));
CHECK_CODE
(
skipInsertInto
(
&
context
));
CHECK_CODE
(
parseInsertBody
(
&
context
));
...
...
source/libs/parser/test/CMakeLists.txt
浏览文件 @
2081bf76
...
...
@@ -6,13 +6,16 @@ SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY
(
${
CMAKE_CURRENT_SOURCE_DIR
}
SOURCE_LIST
)
ADD_EXECUTABLE
(
parserTest
${
SOURCE_LIST
}
)
TARGET_LINK_LIBRARIES
(
parserTest
PUBLIC os util common parser catalog transport gtest function planner query
)
TARGET_INCLUDE_DIRECTORIES
(
parserTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/parser/"
PRIVATE
"
${
CMAKE_SOURCE_DIR
}
/source/libs/parser/inc"
)
TARGET_LINK_LIBRARIES
(
parserTest
PUBLIC os util common parser catalog transport gtest function planner query
)
TARGET_LINK_OPTIONS
(
parserTest PRIVATE -Wl,-wrap,malloc
)
source/libs/parser/test/insertTest.cpp
浏览文件 @
2081bf76
...
...
@@ -27,6 +27,27 @@ namespace {
}
}
extern
"C"
{
#include <execinfo.h>
void
*
__real_malloc
(
size_t
);
void
*
__wrap_malloc
(
size_t
c
)
{
// printf("My MALLOC called: %d\n", c);
// void *array[32];
// int size = backtrace(array, 32);
// char **symbols = backtrace_symbols(array, size);
// for (int i = 0; i < size; ++i) {
// cout << symbols[i] << endl;
// }
// free(symbols);
return
__real_malloc
(
c
);
}
}
// syntax:
// INSERT INTO
// tb_name
...
...
source/libs/planner/inc/plannerInt.h
浏览文件 @
2081bf76
...
...
@@ -23,25 +23,23 @@ extern "C" {
#include "common.h"
#include "tarray.h"
#include "planner.h"
#include "parser.h"
#include "taosmsg.h"
enum
LOGIC_PLAN_E
{
LP_SCAN
=
1
,
LP_SESSION
=
2
,
LP_STATE
=
3
,
LP_INTERVAL
=
4
,
LP_FILL
=
5
,
LP_AGG
=
6
,
LP_JOIN
=
7
,
LP_PROJECT
=
8
,
LP_DISTINCT
=
9
,
LP_ORDER
=
10
};
typedef
struct
SQueryNodeBasicInfo
{
int32_t
type
;
// operator type
char
*
name
;
// operator name
}
SQueryNodeBasicInfo
;
#define QNODE_TAGSCAN 1
#define QNODE_TABLESCAN 2
#define QNODE_PROJECT 3
#define QNODE_AGGREGATE 4
#define QNODE_GROUPBY 5
#define QNODE_LIMIT 6
#define QNODE_JOIN 7
#define QNODE_DISTINCT 8
#define QNODE_SORT 9
#define QNODE_UNION 10
#define QNODE_TIMEWINDOW 11
#define QNODE_SESSIONWINDOW 12
#define QNODE_STATEWINDOW 13
#define QNODE_FILL 14
typedef
struct
SQueryDistPlanNodeInfo
{
bool
stableQuery
;
// super table query or not
...
...
@@ -52,8 +50,9 @@ typedef struct SQueryDistPlanNodeInfo {
}
SQueryDistPlanNodeInfo
;
typedef
struct
SQueryTableInfo
{
char
*
tableName
;
uint64_t
uid
;
char
*
tableName
;
// to be deleted
uint64_t
uid
;
// to be deleted
STableMetaInfo
*
pMeta
;
STimeWindow
window
;
}
SQueryTableInfo
;
...
...
@@ -64,50 +63,12 @@ typedef struct SQueryPlanNode {
SArray
*
pExpr
;
// the query functions or sql aggregations
int32_t
numOfExpr
;
// number of result columns, which is also the number of pExprs
void
*
pExtInfo
;
// additional information
//
previous
operator to generated result for current node to process
//
children
operator to generated result for current node to process
// in case of join, multiple prev nodes exist.
SArray
*
p
PrevNodes
;
// upstream nodes
struct
SQueryPlanNode
*
nextNode
;
SArray
*
p
Children
;
// upstream nodes
struct
SQueryPlanNode
*
pParent
;
}
SQueryPlanNode
;
typedef
SSchema
SSlotSchema
;
typedef
struct
SDataBlockSchema
{
int32_t
index
;
SSlotSchema
*
pSchema
;
int32_t
numOfCols
;
// number of columns
}
SDataBlockSchema
;
typedef
struct
SPhyNode
{
SQueryNodeBasicInfo
info
;
SArray
*
pTargets
;
// target list to be computed or scanned at this node
SArray
*
pConditions
;
// implicitly-ANDed qual conditions
SDataBlockSchema
targetSchema
;
// children plan to generated result for current node to process
// in case of join, multiple plan nodes exist.
SArray
*
pChildren
;
}
SPhyNode
;
typedef
struct
SScanPhyNode
{
SPhyNode
node
;
uint64_t
uid
;
// unique id of the table
}
SScanPhyNode
;
typedef
SScanPhyNode
STagScanPhyNode
;
typedef
SScanPhyNode
SSystemTableScanPhyNode
;
typedef
struct
SMultiTableScanPhyNode
{
SScanPhyNode
scan
;
SArray
*
pTagsConditions
;
// implicitly-ANDed tag qual conditions
}
SMultiTableScanPhyNode
;
typedef
SMultiTableScanPhyNode
SMultiTableSeqScanPhyNode
;
typedef
struct
SProjectPhyNode
{
SPhyNode
node
;
}
SProjectPhyNode
;
/**
* Optimize the query execution plan, currently not implement yet.
* @param pQueryNode
...
...
source/libs/planner/src/physicalPlan.c
浏览文件 @
2081bf76
...
...
@@ -14,23 +14,147 @@
*/
#include "plannerInt.h"
#include "parser.h"
static
const
char
*
gOpName
[]
=
{
"Unknown"
,
#define INCLUDE_AS_NAME
#include "plannerOp.h"
#undef INCLUDE_AS_NAME
};
typedef
struct
SPlanContext
{
struct
SCatalog
*
pCatalog
;
struct
SQueryDag
*
pDag
;
SSubplan
*
pCurrentSubplan
;
SSubplanId
nextId
;
}
SPlanContext
;
static
void
toDataBlockSchema
(
SQueryPlanNode
*
pPlanNode
,
SDataBlockSchema
*
dataBlockSchema
)
{
SWAP
(
dataBlockSchema
->
pSchema
,
pPlanNode
->
pSchema
,
SSchema
*
);
dataBlockSchema
->
numOfCols
=
pPlanNode
->
numOfCols
;
}
static
SPhyNode
*
initPhyNode
(
SQueryPlanNode
*
pPlanNode
,
int32_t
type
,
int32_t
size
)
{
SPhyNode
*
node
=
(
SPhyNode
*
)
calloc
(
1
,
size
);
node
->
info
.
type
=
type
;
node
->
info
.
name
=
gOpName
[
type
];
SWAP
(
node
->
pTargets
,
pPlanNode
->
pExpr
,
SArray
*
);
toDataBlockSchema
(
pPlanNode
,
&
(
node
->
targetSchema
));
}
static
SPhyNode
*
createTagScanNode
(
SQueryPlanNode
*
pPlanNode
)
{
return
initPhyNode
(
pPlanNode
,
OP_TagScan
,
sizeof
(
STagScanPhyNode
));
}
static
SSubplan
*
initSubplan
(
SPlanContext
*
pCxt
,
int32_t
type
)
{
SSubplan
*
subplan
=
calloc
(
1
,
sizeof
(
SSubplan
));
subplan
->
id
=
pCxt
->
nextId
;
++
(
pCxt
->
nextId
.
subplanId
);
subplan
->
type
=
type
;
subplan
->
level
=
0
;
if
(
NULL
!=
pCxt
->
pCurrentSubplan
)
{
subplan
->
level
=
pCxt
->
pCurrentSubplan
->
level
+
1
;
if
(
NULL
==
pCxt
->
pCurrentSubplan
->
pChildern
)
{
pCxt
->
pCurrentSubplan
->
pChildern
=
taosArrayInit
(
TARRAY_MIN_SIZE
,
POINTER_BYTES
);
}
taosArrayPush
(
pCxt
->
pCurrentSubplan
->
pChildern
,
subplan
);
subplan
->
pParents
=
taosArrayInit
(
TARRAY_MIN_SIZE
,
POINTER_BYTES
);
taosArrayPush
(
subplan
->
pParents
,
pCxt
->
pCurrentSubplan
);
}
pCxt
->
pCurrentSubplan
=
subplan
;
return
subplan
;
}
static
uint8_t
getScanFlag
(
SQueryPlanNode
*
pPlanNode
)
{
// todo
return
MASTER_SCAN
;
}
static
SPhyNode
*
createTableScanNode
(
SPlanContext
*
pCxt
,
SQueryPlanNode
*
pPlanNode
,
SQueryTableInfo
*
pTable
)
{
STableScanPhyNode
*
node
=
(
STableScanPhyNode
*
)
initPhyNode
(
pPlanNode
,
OP_TableScan
,
sizeof
(
STableScanPhyNode
));
node
->
scan
.
uid
=
pTable
->
pMeta
->
pTableMeta
->
uid
;
node
->
scan
.
tableType
=
pTable
->
pMeta
->
pTableMeta
->
tableType
;
node
->
scanFlag
=
getScanFlag
(
pPlanNode
);
node
->
window
=
pTable
->
window
;
// todo tag cond
}
static
void
vgroupToEpSet
(
const
SVgroupMsg
*
vg
,
SEpSet
*
epSet
)
{
// todo
}
static
void
splitSubplanBySTable
(
SPlanContext
*
pCxt
,
SQueryPlanNode
*
pPlanNode
,
SQueryTableInfo
*
pTable
)
{
SVgroupsInfo
*
vgroupList
=
pTable
->
pMeta
->
vgroupList
;
for
(
int32_t
i
=
0
;
i
<
pTable
->
pMeta
->
vgroupList
->
numOfVgroups
;
++
i
)
{
SSubplan
*
subplan
=
initSubplan
(
pCxt
,
QUERY_TYPE_SCAN
);
vgroupToEpSet
(
&
(
pTable
->
pMeta
->
vgroupList
->
vgroups
[
i
]),
&
subplan
->
execEpSet
);
subplan
->
pNode
=
createTableScanNode
(
pCxt
,
pPlanNode
,
pTable
);
// todo reset pCxt->pCurrentSubplan
}
}
static
SPhyNode
*
createExchangeNode
()
{
SPhyNode
*
createScanNode
(
SQueryPlanNode
*
pPlanNode
)
{
return
NULL
;
}
SPhyNode
*
createPhyNode
(
SQueryPlanNode
*
node
)
{
switch
(
node
->
info
.
type
)
{
case
LP_SCAN
:
return
createScanNode
(
node
);
static
SPhyNode
*
createScanNode
(
SPlanContext
*
pCxt
,
SQueryPlanNode
*
pPlanNode
)
{
SQueryTableInfo
*
pTable
=
(
SQueryTableInfo
*
)
pPlanNode
->
pExtInfo
;
if
(
TSDB_SUPER_TABLE
==
pTable
->
pMeta
->
pTableMeta
->
tableType
)
{
splitSubplanBySTable
(
pCxt
,
pPlanNode
,
pTable
);
return
createExchangeNode
(
pCxt
,
pTable
);
}
return
NULL
;
return
createTableScanNode
(
pCxt
,
pPlanNode
,
pTable
)
;
}
SPhyNode
*
createSubplan
(
SQueryPlanNode
*
pSubquery
)
{
return
NULL
;
static
SPhyNode
*
createPhyNode
(
SPlanContext
*
pCxt
,
SQueryPlanNode
*
pPlanNode
)
{
SPhyNode
*
node
=
NULL
;
switch
(
pPlanNode
->
info
.
type
)
{
case
QNODE_TAGSCAN
:
node
=
createTagScanNode
(
pPlanNode
);
break
;
case
QNODE_TABLESCAN
:
node
=
createScanNode
(
pCxt
,
pPlanNode
);
break
;
default:
assert
(
false
);
}
if
(
pPlanNode
->
pChildren
!=
NULL
&&
taosArrayGetSize
(
pPlanNode
->
pChildren
)
>
0
)
{
node
->
pChildren
=
taosArrayInit
(
4
,
POINTER_BYTES
);
size_t
size
=
taosArrayGetSize
(
pPlanNode
->
pChildren
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SPhyNode
*
child
=
createPhyNode
(
pCxt
,
taosArrayGet
(
pPlanNode
->
pChildren
,
i
));
child
->
pParent
=
node
;
taosArrayPush
(
node
->
pChildren
,
&
child
);
}
}
return
node
;
}
static
void
createSubplanByLevel
(
SPlanContext
*
pCxt
,
SQueryPlanNode
*
pRoot
)
{
SSubplan
*
subplan
=
initSubplan
(
pCxt
,
QUERY_TYPE_MERGE
);
subplan
->
pNode
=
createPhyNode
(
pCxt
,
pRoot
);
SArray
*
l0
=
taosArrayInit
(
TARRAY_MIN_SIZE
,
POINTER_BYTES
);
taosArrayPush
(
l0
,
&
subplan
);
taosArrayPush
(
pCxt
->
pDag
->
pSubplans
,
&
l0
);
// todo deal subquery
}
int32_t
createDag
(
SQueryPlanNode
*
pQueryNode
,
struct
SCatalog
*
pCatalog
,
SQueryDag
**
pDag
)
{
SPlanContext
context
=
{
.
pCatalog
=
pCatalog
,
.
pDag
=
calloc
(
1
,
sizeof
(
SQueryDag
)),
.
pCurrentSubplan
=
NULL
};
if
(
NULL
==
context
.
pDag
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
context
.
pDag
->
pSubplans
=
taosArrayInit
(
TARRAY_MIN_SIZE
,
POINTER_BYTES
);
createSubplanByLevel
(
&
context
,
pQueryNode
);
*
pDag
=
context
.
pDag
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
createDag
(
struct
SQueryPlanNode
*
pQueryNode
,
struct
SEpSet
*
pQnode
,
struct
SQueryDag
**
pDag
)
{
return
0
;
int32_t
subPlanToString
(
struct
SSubplan
*
pPhyNode
,
char
**
str
)
{
return
TSDB_CODE_SUCCESS
;
}
source/libs/planner/src/planner.c
浏览文件 @
2081bf76
...
...
@@ -18,21 +18,6 @@
#include "parser.h"
#include "plannerInt.h"
#define QNODE_TAGSCAN 1
#define QNODE_TABLESCAN 2
#define QNODE_PROJECT 3
#define QNODE_AGGREGATE 4
#define QNODE_GROUPBY 5
#define QNODE_LIMIT 6
#define QNODE_JOIN 7
#define QNODE_DISTINCT 8
#define QNODE_SORT 9
#define QNODE_UNION 10
#define QNODE_TIMEWINDOW 11
#define QNODE_SESSIONWINDOW 12
#define QNODE_STATEWINDOW 13
#define QNODE_FILL 14
typedef
struct
SFillEssInfo
{
int32_t
fillType
;
// fill type
int64_t
*
val
;
// fill value
...
...
@@ -104,12 +89,13 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla
taosArrayPush
(
pNode
->
pExpr
,
&
pExpr
[
i
]);
}
pNode
->
p
PrevNodes
=
taosArrayInit
(
4
,
POINTER_BYTES
);
pNode
->
p
Children
=
taosArrayInit
(
4
,
POINTER_BYTES
);
for
(
int32_t
i
=
0
;
i
<
numOfPrev
;
++
i
)
{
taosArrayPush
(
pNode
->
p
PrevNodes
,
&
prev
[
i
]);
taosArrayPush
(
pNode
->
p
Children
,
&
prev
[
i
]);
}
switch
(
type
)
{
case
QNODE_TAGSCAN
:
case
QNODE_TABLESCAN
:
{
SQueryTableInfo
*
info
=
calloc
(
1
,
sizeof
(
SQueryTableInfo
));
memcpy
(
info
,
pExtInfo
,
sizeof
(
SQueryTableInfo
));
...
...
@@ -177,7 +163,7 @@ static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMe
SArray
*
pExprs
,
SArray
*
tableCols
)
{
if
(
pQueryInfo
->
info
.
onlyTagQuery
)
{
int32_t
num
=
(
int32_t
)
taosArrayGetSize
(
pExprs
);
SQueryPlanNode
*
pNode
=
createQueryNode
(
QNODE_TAGSCAN
,
"TableTagScan"
,
NULL
,
0
,
pExprs
->
pData
,
num
,
NULL
);
SQueryPlanNode
*
pNode
=
createQueryNode
(
QNODE_TAGSCAN
,
"TableTagScan"
,
NULL
,
0
,
pExprs
->
pData
,
num
,
info
);
if
(
pQueryInfo
->
info
.
distinct
)
{
pNode
=
createQueryNode
(
QNODE_DISTINCT
,
"Distinct"
,
&
pNode
,
1
,
pExprs
->
pData
,
num
,
NULL
);
...
...
@@ -386,14 +372,14 @@ static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) {
tfree
(
pQueryNode
->
info
.
name
);
// dropAllExprInfo(pQueryNode->pExpr);
if
(
pQueryNode
->
p
PrevNodes
!=
NULL
)
{
int32_t
size
=
(
int32_t
)
taosArrayGetSize
(
pQueryNode
->
p
PrevNodes
);
if
(
pQueryNode
->
p
Children
!=
NULL
)
{
int32_t
size
=
(
int32_t
)
taosArrayGetSize
(
pQueryNode
->
p
Children
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SQueryPlanNode
*
p
=
taosArrayGetP
(
pQueryNode
->
p
PrevNodes
,
i
);
SQueryPlanNode
*
p
=
taosArrayGetP
(
pQueryNode
->
p
Children
,
i
);
doDestroyQueryNode
(
p
);
}
taosArrayDestroy
(
pQueryNode
->
p
PrevNodes
);
taosArrayDestroy
(
pQueryNode
->
p
Children
);
}
tfree
(
pQueryNode
);
...
...
@@ -607,8 +593,8 @@ int32_t printExprInfo(const char* buf, const SQueryPlanNode* pQueryNode, int32_t
int32_t
queryPlanToStringImpl
(
char
*
buf
,
SQueryPlanNode
*
pQueryNode
,
int32_t
level
,
int32_t
totalLen
)
{
int32_t
len
=
doPrintPlan
(
buf
,
pQueryNode
,
level
,
totalLen
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pQueryNode
->
p
PrevNodes
);
++
i
)
{
SQueryPlanNode
*
p1
=
taosArrayGetP
(
pQueryNode
->
p
PrevNodes
,
i
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pQueryNode
->
p
Children
);
++
i
)
{
SQueryPlanNode
*
p1
=
taosArrayGetP
(
pQueryNode
->
p
Children
,
i
);
int32_t
len1
=
queryPlanToStringImpl
(
buf
,
p1
,
level
+
1
,
len
);
len
=
len1
;
}
...
...
source/libs/scheduler/CMakeLists.txt
浏览文件 @
2081bf76
...
...
@@ -9,5 +9,5 @@ target_include_directories(
target_link_libraries
(
scheduler
PRIVATE os util planner
PRIVATE os util planner
common
)
\ No newline at end of file
source/util/src/tarray.c
浏览文件 @
2081bf76
...
...
@@ -17,7 +17,7 @@
#include "tarray.h"
#include "talgo.h"
void
*
taosArrayInit
(
size_t
size
,
size_t
elemSize
)
{
SArray
*
taosArrayInit
(
size_t
size
,
size_t
elemSize
)
{
assert
(
elemSize
>
0
);
if
(
size
<
TARRAY_MIN_SIZE
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录