Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
eff3783a
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
eff3783a
编写于
12月 16, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/dnode3
上级
0e0af577
39de3fe0
变更
28
展开全部
隐藏空白更改
内联
并排
Showing
28 changed file
with
1187 addition
and
546 deletion
+1187
-546
include/common/taosmsg.h
include/common/taosmsg.h
+11
-13
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+11
-51
include/libs/parser/parser.h
include/libs/parser/parser.h
+7
-5
include/libs/planner/planner.h
include/libs/planner/planner.h
+73
-32
include/libs/planner/plannerOp.h
include/libs/planner/plannerOp.h
+48
-0
include/libs/query/query.h
include/libs/query/query.h
+51
-6
include/util/tarray.h
include/util/tarray.h
+1
-1
source/common/inc/commonInt.h
source/common/inc/commonInt.h
+1
-0
source/libs/catalog/CMakeLists.txt
source/libs/catalog/CMakeLists.txt
+2
-0
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+11
-8
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+389
-162
source/libs/catalog/test/CMakeLists.txt
source/libs/catalog/test/CMakeLists.txt
+18
-0
source/libs/catalog/test/catalogTests.cpp
source/libs/catalog/test/catalogTests.cpp
+152
-0
source/libs/index/inc/index_fst.h
source/libs/index/inc/index_fst.h
+4
-0
source/libs/index/src/index_fst.c
source/libs/index/src/index_fst.c
+6
-2
source/libs/index/src/index_fst_automation.c
source/libs/index/src/index_fst_automation.c
+3
-2
source/libs/index/test/indexTests.cpp
source/libs/index/test/indexTests.cpp
+52
-3
source/libs/parser/src/astValidate.c
source/libs/parser/src/astValidate.c
+1
-1
source/libs/parser/src/insertParser.c
source/libs/parser/src/insertParser.c
+17
-19
source/libs/parser/src/parserUtil.c
source/libs/parser/src/parserUtil.c
+0
-23
source/libs/parser/test/CMakeLists.txt
source/libs/parser/test/CMakeLists.txt
+7
-4
source/libs/parser/test/insertTest.cpp
source/libs/parser/test/insertTest.cpp
+21
-0
source/libs/planner/inc/plannerInt.h
source/libs/planner/inc/plannerInt.h
+21
-60
source/libs/planner/src/physicalPlan.c
source/libs/planner/src/physicalPlan.c
+135
-11
source/libs/planner/src/planner.c
source/libs/planner/src/planner.c
+10
-24
source/libs/query/src/querymsg.c
source/libs/query/src/querymsg.c
+133
-117
source/libs/scheduler/CMakeLists.txt
source/libs/scheduler/CMakeLists.txt
+1
-1
source/util/src/tarray.c
source/util/src/tarray.c
+1
-1
未找到文件。
include/common/taosmsg.h
浏览文件 @
eff3783a
...
...
@@ -221,8 +221,7 @@ typedef struct SBuildTableMetaInput {
typedef
struct
SBuildUseDBInput
{
char
db
[
TSDB_TABLE_FNAME_LEN
];
int32_t
vgroupVersion
;
int32_t
dbGroupVersion
;
int32_t
vgVersion
;
}
SBuildUseDBInput
;
...
...
@@ -628,8 +627,7 @@ typedef struct {
typedef
struct
{
char
db
[
TSDB_TABLE_FNAME_LEN
];
int8_t
ignoreNotExists
;
int32_t
vgroupVersion
;
int32_t
dbGroupVersion
;
int32_t
vgVersion
;
int32_t
reserve
[
8
];
}
SUseDbMsg
;
...
...
@@ -809,6 +807,9 @@ typedef struct SSTableVgroupMsg {
typedef
struct
SVgroupInfo
{
int32_t
vgId
;
int32_t
hashBegin
;
int32_t
hashEnd
;
int8_t
inUse
;
int8_t
numOfEps
;
SEpAddrMsg
epAddr
[
TSDB_MAX_REPLICA
];
}
SVgroupInfo
;
...
...
@@ -842,7 +843,7 @@ typedef struct {
int32_t
tversion
;
uint64_t
tuid
;
uint64_t
suid
;
SVgroupMsg
vgroup
;
int32_t
vgId
;
SSchema
pSchema
[];
}
STableMetaMsg
;
...
...
@@ -864,15 +865,12 @@ typedef struct {
}
STagData
;
typedef
struct
{
int32_t
vgroupNum
;
int32_t
vgroupVersion
;
char
db
[
TSDB_TABLE_FNAME_LEN
];
int32_t
dbVgroupVersion
;
int32_t
dbVgroupNum
;
int32_t
dbHashRange
;
char
db
[
TSDB_FULL_DB_NAME_LEN
];
int32_t
vgVersion
;
int32_t
vgNum
;
int8_t
hashMethod
;
SVgroupInfo
vgroupInfo
[];
//int32_t vgIdList[];
}
SUseDbRspMsg
;
}
SUseDbRsp
;
...
...
include/libs/catalog/catalog.h
浏览文件 @
eff3783a
...
...
@@ -32,7 +32,7 @@ extern "C" {
struct
SCatalog
;
typedef
struct
SCatalogReq
{
char
clusterId
[
TSDB_CLUSTER_ID_LEN
];
//????
char
dbName
[
TSDB_DB_NAME_LEN
];
SArray
*
pTableName
;
// table full name
SArray
*
pUdf
;
// udf name
bool
qNodeRequired
;
// valid qnode
...
...
@@ -45,42 +45,10 @@ typedef struct SMetaData {
SEpSet
*
pEpSet
;
// qnode epset list
}
SMetaData
;
typedef
struct
STableComInfo
{
uint8_t
numOfTags
;
// the number of tags in schema
uint8_t
precision
;
// the number of precision
int16_t
numOfColumns
;
// the number of columns
int32_t
rowSize
;
// row size of the schema
}
STableComInfo
;
/*
* ASSERT(sizeof(SCTableMeta) == 24)
* ASSERT(tableType == TSDB_CHILD_TABLE)
* The cached child table meta info. For each child table, 24 bytes are required to keep the essential table info.
*/
typedef
struct
SCTableMeta
{
int32_t
vgId
:
24
;
int8_t
tableType
;
uint64_t
uid
;
uint64_t
suid
;
}
SCTableMeta
;
/*
* Note that the first 24 bytes of STableMeta are identical to SCTableMeta, it is safe to cast a STableMeta to be a SCTableMeta.
*/
typedef
struct
STableMeta
{
int32_t
vgId
:
24
;
int8_t
tableType
;
uint64_t
uid
;
uint64_t
suid
;
// if the table is TSDB_CHILD_TABLE, the following information is acquired from the corresponding super table meta info
int16_t
sversion
;
int16_t
tversion
;
STableComInfo
tableInfo
;
SSchema
schema
[];
}
STableMeta
;
typedef
struct
SCatalogCfg
{
bool
enableVgroupCache
;
uint32_t
maxTblCacheNum
;
uint32_t
maxDBCacheNum
;
}
SCatalogCfg
;
int32_t
catalogInit
(
SCatalogCfg
*
cfg
);
...
...
@@ -93,22 +61,14 @@ int32_t catalogInit(SCatalogCfg *cfg);
*/
int32_t
catalogGetHandle
(
const
char
*
clusterId
,
struct
SCatalog
**
catalogHandle
);
int32_t
catalogGetVgroupVersion
(
struct
SCatalog
*
pCatalog
,
int32_t
*
version
);
int32_t
catalogGetVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
SArray
**
pVgroupList
);
int32_t
catalogUpdateVgroup
(
struct
SCatalog
*
pCatalog
,
SVgroupListInfo
*
pVgroup
);
int32_t
catalogGetDBVgroupVersion
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
int32_t
*
version
);
int32_t
catalogGetDBVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
dbName
,
int32_t
forceUpdate
,
SDBVgroupInfo
*
*
dbInfo
);
int32_t
catalogUpdateDBVgroup
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
SDBVgroupInfo
*
dbInfo
);
int32_t
catalogGetDBVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
dbName
,
int32_t
forceUpdate
,
SDBVgroupInfo
*
dbInfo
);
int32_t
catalogUpdateDBVgroup
Cache
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
SDBVgroupInfo
*
dbInfo
);
int32_t
catalogGetTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
p
TableName
,
STableMeta
*
pTableMeta
);
int32_t
catalogRenewTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
STableMeta
*
pTableMeta
);
int32_t
catalogRenewAndGetTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
STableMeta
*
pTableMeta
,
STableMeta
*
pNew
TableMeta
);
int32_t
catalogGetTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
p
DBName
,
const
char
*
pTableName
,
STableMeta
*
*
pTableMeta
);
int32_t
catalogRenewTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
);
int32_t
catalogRenewAndGetTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
STableMeta
**
p
TableMeta
);
/**
...
...
@@ -117,7 +77,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const
* @pVgroupList - array of SVgroupInfo
* @return
*/
int32_t
catalogGetTableVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pTableName
,
SArray
*
pVgroupList
);
int32_t
catalogGetTableVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
p
DBName
,
const
char
*
p
TableName
,
SArray
*
pVgroupList
);
/**
...
...
@@ -129,7 +89,7 @@ int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSe
* @param pMetaData
* @return
*/
int32_t
catalogGetAllMeta
(
struct
SCatalog
*
pCatalog
,
const
SEpSet
*
pMgmtEps
,
const
SCatalogReq
*
pReq
,
SMetaData
*
pRsp
);
int32_t
catalogGetAllMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
SCatalogReq
*
pReq
,
SMetaData
*
pRsp
);
int32_t
catalogGetQnodeList
(
struct
SCatalog
*
pCatalog
,
const
SEpSet
*
pMgmtEps
,
SEpSet
*
pQnodeEpSet
);
...
...
include/libs/parser/parser.h
浏览文件 @
eff3783a
...
...
@@ -132,13 +132,15 @@ struct SInsertStmtInfo;
bool
qIsInsertSql
(
const
char
*
pStr
,
size_t
length
);
typedef
struct
SParseContext
{
const
char
*
pSql
;
// sql string
size_t
sqlLen
;
// length of the sql string
int64_t
id
;
// operator id, generated by uuid generator
const
char
*
pAcctId
;
const
char
*
pDbname
;
const
SEpSet
*
pEpSet
;
void
*
pRpc
;
const
char
*
pClusterId
;
const
SEpSet
*
pEpSet
;
int64_t
id
;
// query id, generated by uuid generator
int8_t
schemaAttached
;
// denote if submit block is built with table schema or not
const
char
*
pSql
;
// sql string
size_t
sqlLen
;
// length of the sql string
char
*
pMsg
;
// extended error message if exists to help avoid the problem in sql statement.
int32_t
msgLen
;
// max length of the msg
}
SParseContext
;
...
...
include/libs/planner/planner.h
浏览文件 @
eff3783a
...
...
@@ -20,52 +20,92 @@
extern
"C"
{
#endif
#include "taosmsg.h"
#define QUERY_TYPE_MERGE 1
#define QUERY_TYPE_PARTIAL 2
#define QUERY_TYPE_SCAN 3
enum
OPERATOR_TYPE_E
{
OP_TableScan
=
1
,
OP_DataBlocksOptScan
=
2
,
OP_TableSeqScan
=
3
,
OP_TagScan
=
4
,
OP_TableBlockInfoScan
=
5
,
OP_Aggregate
=
6
,
OP_Project
=
7
,
OP_Groupby
=
8
,
OP_Limit
=
9
,
OP_SLimit
=
10
,
OP_TimeWindow
=
11
,
OP_SessionWindow
=
12
,
OP_StateWindow
=
22
,
OP_Fill
=
13
,
OP_MultiTableAggregate
=
14
,
OP_MultiTableTimeInterval
=
15
,
// OP_DummyInput = 16, //TODO remove it after fully refactor.
// OP_MultiwayMergeSort = 17, // multi-way data merge into one input stream.
// OP_GlobalAggregate = 18, // global merge for the multi-way data sources.
OP_Filter
=
19
,
OP_Distinct
=
20
,
OP_Join
=
21
,
OP_AllTimeWindow
=
23
,
OP_AllMultiTableTimeInterval
=
24
,
OP_Order
=
25
,
OP_Exchange
=
26
,
OP_Unknown
,
#define INCLUDE_AS_ENUM
#include "plannerOp.h"
#undef INCLUDE_AS_ENUM
OP_TotalNum
};
struct
SEpSet
;
struct
SQueryPlanNode
;
struct
SPhyNode
;
struct
SQueryStmtInfo
;
typedef
SSchema
SSlotSchema
;
typedef
struct
SDataBlockSchema
{
SSlotSchema
*
pSchema
;
int32_t
numOfCols
;
// number of columns
}
SDataBlockSchema
;
typedef
struct
SQueryNodeBasicInfo
{
int32_t
type
;
// operator type
const
char
*
name
;
// operator name
}
SQueryNodeBasicInfo
;
typedef
struct
SPhyNode
{
SQueryNodeBasicInfo
info
;
SArray
*
pTargets
;
// target list to be computed or scanned at this node
SArray
*
pConditions
;
// implicitly-ANDed qual conditions
SDataBlockSchema
targetSchema
;
// children plan to generated result for current node to process
// in case of join, multiple plan nodes exist.
SArray
*
pChildren
;
struct
SPhyNode
*
pParent
;
}
SPhyNode
;
typedef
struct
SScanPhyNode
{
SPhyNode
node
;
uint64_t
uid
;
// unique id of the table
int8_t
tableType
;
}
SScanPhyNode
;
typedef
SScanPhyNode
SSystemTableScanPhyNode
;
typedef
SScanPhyNode
STagScanPhyNode
;
typedef
struct
STableScanPhyNode
{
SScanPhyNode
scan
;
uint8_t
scanFlag
;
// denotes reversed scan of data or not
STimeWindow
window
;
SArray
*
pTagsConditions
;
// implicitly-ANDed tag qual conditions
}
STableScanPhyNode
;
typedef
STableScanPhyNode
STableSeqScanPhyNode
;
typedef
struct
SProjectPhyNode
{
SPhyNode
node
;
}
SProjectPhyNode
;
typedef
struct
SExchangePhyNode
{
SPhyNode
node
;
uint64_t
templateId
;
SArray
*
pSourceEpSet
;
// SEpSet
}
SExchangePhyNode
;
typedef
struct
SSubplanId
{
uint64_t
queryId
;
uint64_t
templateId
;
uint64_t
subplanId
;
}
SSubplanId
;
typedef
struct
SSubplan
{
int32_t
type
;
// QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN
SArray
*
pDatasource
;
// the datasource subplan,from which to fetch the result
struct
SPhyNode
*
pNode
;
// physical plan of current subplan
SSubplanId
id
;
// unique id of the subplan
int32_t
type
;
// QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN
int32_t
level
;
// the execution level of current subplan, starting from 0.
SEpSet
execEpSet
;
// for the scan sub plan, the optional execution node
SArray
*
pChildern
;
// the datasource subplan,from which to fetch the result
SArray
*
pParents
;
// the data destination subplan, get data from current subplan
SPhyNode
*
pNode
;
// physical plan of current subplan
}
SSubplan
;
typedef
struct
SQueryDag
{
SArray
*
*
pSubplans
;
SArray
*
pSubplans
;
// Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
}
SQueryDag
;
/**
...
...
@@ -75,6 +115,7 @@ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet*
int32_t
qExplainQuery
(
const
struct
SQueryStmtInfo
*
pQueryInfo
,
struct
SEpSet
*
pQnode
,
char
**
str
);
/**
* Convert to subplan to string for the scheduler to send to the executor
*/
...
...
include/libs/planner/plannerOp.h
0 → 100644
浏览文件 @
eff3783a
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if defined(INCLUDE_AS_ENUM) // enum define mode
#undef OP_ENUM_MACRO
#define OP_ENUM_MACRO(op) OP_##op,
#elif defined(INCLUDE_AS_NAME) // comment define mode
#undef OP_ENUM_MACRO
#define OP_ENUM_MACRO(op) #op,
#else
#error To use this include file, first define either INCLUDE_AS_ENUM or INCLUDE_AS_NAME
#endif
OP_ENUM_MACRO
(
TableScan
)
OP_ENUM_MACRO
(
DataBlocksOptScan
)
OP_ENUM_MACRO
(
TableSeqScan
)
OP_ENUM_MACRO
(
TagScan
)
OP_ENUM_MACRO
(
TableBlockInfoScan
)
OP_ENUM_MACRO
(
Aggregate
)
OP_ENUM_MACRO
(
Project
)
OP_ENUM_MACRO
(
Groupby
)
OP_ENUM_MACRO
(
Limit
)
OP_ENUM_MACRO
(
SLimit
)
OP_ENUM_MACRO
(
TimeWindow
)
OP_ENUM_MACRO
(
SessionWindow
)
OP_ENUM_MACRO
(
StateWindow
)
OP_ENUM_MACRO
(
Fill
)
OP_ENUM_MACRO
(
MultiTableAggregate
)
OP_ENUM_MACRO
(
MultiTableTimeInterval
)
OP_ENUM_MACRO
(
Filter
)
OP_ENUM_MACRO
(
Distinct
)
OP_ENUM_MACRO
(
Join
)
OP_ENUM_MACRO
(
AllTimeWindow
)
OP_ENUM_MACRO
(
AllMultiTableTimeInterval
)
OP_ENUM_MACRO
(
Order
)
OP_ENUM_MACRO
(
Exchange
)
include/libs/query/query.h
浏览文件 @
eff3783a
...
...
@@ -21,21 +21,66 @@ extern "C" {
#endif
#include "tarray.h"
#include "thash.h"
typedef
SVgroupListRspMsg
SVgroupListInfo
;
typedef
struct
STableComInfo
{
uint8_t
numOfTags
;
// the number of tags in schema
uint8_t
precision
;
// the number of precision
int16_t
numOfColumns
;
// the number of columns
int32_t
rowSize
;
// row size of the schema
}
STableComInfo
;
/*
* ASSERT(sizeof(SCTableMeta) == 24)
* ASSERT(tableType == TSDB_CHILD_TABLE)
* The cached child table meta info. For each child table, 24 bytes are required to keep the essential table info.
*/
typedef
struct
SCTableMeta
{
int32_t
vgId
:
24
;
int8_t
tableType
;
uint64_t
uid
;
uint64_t
suid
;
}
SCTableMeta
;
/*
* Note that the first 24 bytes of STableMeta are identical to SCTableMeta, it is safe to cast a STableMeta to be a SCTableMeta.
*/
typedef
struct
STableMeta
{
//BEGIN: KEEP THIS PART SAME WITH SCTableMeta
int32_t
vgId
:
24
;
int8_t
tableType
;
uint64_t
uid
;
uint64_t
suid
;
//END: KEEP THIS PART SAME WITH SCTableMeta
// if the table is TSDB_CHILD_TABLE, the following information is acquired from the corresponding super table meta info
int16_t
sversion
;
int16_t
tversion
;
STableComInfo
tableInfo
;
SSchema
schema
[];
}
STableMeta
;
typedef
struct
SDBVgroupInfo
{
int32_t
vgroupVersion
;
SArray
*
vgI
d
;
int32_t
hashRange
;
int32_t
vgVersion
;
int8_t
hashMetho
d
;
SHashObj
*
vgInfo
;
//key:vgId, value:SVgroupInfo
}
SDBVgroupInfo
;
typedef
struct
SUseDbOutput
{
SVgroupListInfo
*
vgroupList
;
char
db
[
TSDB_TABLE_FNAME_LEN
];
SDBVgroupInfo
*
dbVgroup
;
char
db
[
TSDB_FULL_DB_NAME_LEN
];
SDBVgroupInfo
dbVgroup
;
}
SUseDbOutput
;
typedef
struct
STableMetaOutput
{
int32_t
metaNum
;
char
ctbFname
[
TSDB_TABLE_FNAME_LEN
];
char
tbFname
[
TSDB_TABLE_FNAME_LEN
];
SCTableMeta
ctbMeta
;
STableMeta
*
tbMeta
;
}
STableMetaOutput
;
extern
int32_t
(
*
queryBuildMsg
[
TSDB_MSG_TYPE_MAX
])(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
);
extern
int32_t
(
*
queryProcessMsgRsp
[
TSDB_MSG_TYPE_MAX
])(
void
*
output
,
char
*
msg
,
int32_t
msgSize
);
...
...
include/util/tarray.h
浏览文件 @
eff3783a
...
...
@@ -41,7 +41,7 @@ typedef struct SArray {
* @param elemSize
* @return
*/
void
*
taosArrayInit
(
size_t
size
,
size_t
elemSize
);
SArray
*
taosArrayInit
(
size_t
size
,
size_t
elemSize
);
/**
*
...
...
source/common/inc/commonInt.h
浏览文件 @
eff3783a
...
...
@@ -20,6 +20,7 @@
extern
"C"
{
#endif
extern
bool
tIsValidSchema
(
struct
SSchema
*
pSchema
,
int32_t
numOfCols
,
int32_t
numOfTags
);
...
...
source/libs/catalog/CMakeLists.txt
浏览文件 @
eff3783a
...
...
@@ -10,3 +10,5 @@ target_link_libraries(
catalog
PRIVATE os util common transport query
)
ADD_SUBDIRECTORY
(
test
)
\ No newline at end of file
source/libs/catalog/inc/catalogInt.h
浏览文件 @
eff3783a
...
...
@@ -24,16 +24,16 @@ extern "C" {
#include "common.h"
#include "tlog.h"
#define CTG_DEFAULT_CLUSTER_NUMBER 6
#define CTG_DEFAULT_VGROUP_NUMBER 100
#define CTG_DEFAULT_DB_NUMBER 20
#define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6
#define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100
#define CTG_DEFAULT_CACHE_DB_NUMBER 20
#define CTG_DEFAULT_CACHE_TABLEMETA_NUMBER 100000
#define CTG_DEFAULT_INVALID_VERSION (-1)
typedef
struct
SVgroupListCache
{
int32_t
vgroupVersion
;
SHashObj
*
cache
;
// key:vgId, value:SVgroupInfo*
SArray
*
arrayCache
;
// SVgroupInfo
SHashObj
*
cache
;
// key:vgId, value:SVgroupInfo
}
SVgroupListCache
;
typedef
struct
SDBVgroupCache
{
...
...
@@ -41,20 +41,23 @@ typedef struct SDBVgroupCache {
}
SDBVgroupCache
;
typedef
struct
STableMetaCache
{
SHashObj
*
cache
;
//key:fulltablename, value:STableMeta
SHashObj
*
cache
;
//key:fulltablename, value:STableMeta
SHashObj
*
stableCache
;
//key:suid, value:STableMeta*
}
STableMetaCache
;
typedef
struct
SCatalog
{
SVgroupListCache
vgroupCache
;
SDBVgroupCache
dbCache
;
STableMetaCache
tableCache
;
SDBVgroupCache
dbCache
;
STableMetaCache
tableCache
;
}
SCatalog
;
typedef
struct
SCatalogMgmt
{
void
*
pMsgSender
;
// used to send messsage to mnode to fetch necessary metadata
SHashObj
*
pCluster
;
// items cached for each cluster, the hash key is the cluster-id got from mgmt node
SCatalogCfg
cfg
;
}
SCatalogMgmt
;
typedef
uint32_t
(
*
tableNameHashFp
)(
const
char
*
,
uint32_t
);
extern
int32_t
ctgDebugFlag
;
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
eff3783a
此差异已折叠。
点击以展开。
source/libs/catalog/test/CMakeLists.txt
0 → 100644
浏览文件 @
eff3783a
MESSAGE
(
STATUS
"build catalog unit test"
)
# GoogleTest requires at least C++11
SET
(
CMAKE_CXX_STANDARD 11
)
AUX_SOURCE_DIRECTORY
(
${
CMAKE_CURRENT_SOURCE_DIR
}
SOURCE_LIST
)
ADD_EXECUTABLE
(
catalogTest
${
SOURCE_LIST
}
)
TARGET_LINK_LIBRARIES
(
catalogTest
PUBLIC os util common catalog transport gtest query
)
TARGET_INCLUDE_DIRECTORIES
(
catalogTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/catalog/"
PRIVATE
"
${
CMAKE_SOURCE_DIR
}
/source/libs/catalog/inc"
)
source/libs/catalog/test/catalogTests.cpp
浏览文件 @
eff3783a
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <tglobal.h>
#include <iostream>
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "os.h"
#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
#include "catalog.h"
namespace
{
}
TEST
(
testCase
,
normalCase
)
{
char
*
clusterId
=
"cluster1"
;
struct
SCatalog
*
pCtg
=
NULL
;
int32_t
code
=
catalogInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetHandle
(
clusterId
,
&
pCtg
);
ASSERT_EQ
(
code
,
0
);
}
/*
TEST(testCase, normalCase) {
SSqlInfo info1 = doGenerateAST("select top(a*b / 99, 20) from `t.1abc` interval(10s, 1s)");
ASSERT_EQ(info1.valid, true);
char msg[128] = {0};
SMsgBuf buf;
buf.len = 128;
buf.buf = msg;
SSqlNode* pNode = (SSqlNode*) taosArrayGetP(((SArray*)info1.sub.node), 0);
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
ASSERT_EQ(code, 0);
SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
SQueryStmtInfo* pQueryInfo = createQueryInfo();
setTableMetaInfo(pQueryInfo, &req);
SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.sub.node, 0);
ret = validateSqlNode(pSqlNode, pQueryInfo, &buf);
ASSERT_EQ(ret, 0);
SArray* pExprList = pQueryInfo->exprList[0];
int32_t num = tsCompatibleModel? 2:1;
ASSERT_EQ(taosArrayGetSize(pExprList), num);
SExprInfo* p1 = (SExprInfo*) taosArrayGetP(pExprList, 1);
ASSERT_EQ(p1->base.pColumns->uid, 110);
ASSERT_EQ(p1->base.numOfParams, 1);
ASSERT_EQ(p1->base.resSchema.type, TSDB_DATA_TYPE_DOUBLE);
ASSERT_STRCASEEQ(p1->base.resSchema.name, "top(a*b / 99, 20)");
ASSERT_EQ(p1->base.pColumns->flag, TSDB_COL_TMP);
ASSERT_STRCASEEQ(p1->base.token, "top(a*b / 99, 20)");
ASSERT_EQ(p1->base.interBytes, 16);
ASSERT_EQ(p1->pExpr->nodeType, TEXPR_FUNCTION_NODE);
ASSERT_STREQ(p1->pExpr->_function.functionName, "top");
tExprNode* pParam = p1->pExpr->_function.pChild[0];
ASSERT_EQ(pParam->nodeType, TEXPR_COL_NODE);
ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 3);
ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2);
struct SQueryPlanNode* n = nullptr;
code = createQueryPlan(pQueryInfo, &n);
char* str = NULL;
queryPlanToString(n, &str);
printf("%s\n", str);
destroyQueryInfo(pQueryInfo);
qParserClearupMetaRequestInfo(&req);
destroySqlInfo(&info1);
}
TEST(testCase, displayPlan) {
generateLogicplan("select count(*) from `t.1abc`");
generateLogicplan("select count(*)+ 22 from `t.1abc`");
generateLogicplan("select count(*)+ 22 from `t.1abc` interval(1h, 20s) sliding(10m) limit 20,30");
generateLogicplan("select count(*) from `t.1abc` group by a");
generateLogicplan("select count(A+B) from `t.1abc` group by a");
generateLogicplan("select count(length(a)+b) from `t.1abc` group by a");
generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s)");
generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s) order by 1 desc ");
generateLogicplan("select count(*),sum(a),avg(b),min(a+b)+99 from `t.1abc`");
generateLogicplan("select count(*), min(a) + 99 from `t.1abc`");
generateLogicplan("select count(length(count(*) + 22)) from `t.1abc`");
generateLogicplan("select concat(concat(a,b), concat(a,b)) from `t.1abc` limit 20");
generateLogicplan("select count(*), first(a), last(b) from `t.1abc` state_window(a)");
generateLogicplan("select count(*), first(a), last(b) from `t.1abc` session(ts, 20s)");
// order by + group by column + limit offset
generateLogicplan("select top(a, 20) k from `t.1abc` order by k asc limit 3 offset 1");
// fill
generateLogicplan("select min(a) from `t.1abc` where ts>now and ts<now+2h interval(1s) fill(linear)");
// union + union all
// join
// Aggregate(count(*) [count(*) #5056], sum(a) [sum(a) #5057], avg(b) [avg(b) #5058], min(a+b) [min(a+b) #5060])
// Projection(cols: [a+b #5059]) filters:(nil)
// Projection(cols: [ts #0], [a #1], [b #2]) filters:(nil)
// TableScan(t.1abc #110) time_range: -9223372036854775808 - 9223372036854775807
}
*/
int
main
(
int
argc
,
char
**
argv
)
{
testing
::
InitGoogleTest
(
&
argc
,
argv
);
return
RUN_ALL_TESTS
();
}
source/libs/index/inc/index_fst.h
浏览文件 @
eff3783a
...
...
@@ -30,6 +30,7 @@ extern "C" {
typedef
struct
Fst
Fst
;
typedef
struct
FstNode
FstNode
;
typedef
struct
StreamWithState
StreamWithState
;
typedef
enum
{
Included
,
Excluded
,
Unbounded
}
FstBound
;
...
...
@@ -283,6 +284,9 @@ Output fstEmptyFinalOutput(Fst *fst, bool *null);
FstStreamBuilder
*
fstSearch
(
Fst
*
fst
,
AutomationCtx
*
ctx
);
FstStreamWithStateBuilder
*
fstSearchWithState
(
Fst
*
fst
,
AutomationCtx
*
ctx
);
// into stream to expand later
StreamWithState
*
streamBuilderIntoStream
(
FstStreamBuilder
*
sb
);
bool
fstVerify
(
Fst
*
fst
);
...
...
source/libs/index/src/index_fst.c
浏览文件 @
eff3783a
...
...
@@ -1094,6 +1094,10 @@ bool fstGet(Fst *fst, FstSlice *b, Output *out) {
FstStreamBuilder
*
fstSearch
(
Fst
*
fst
,
AutomationCtx
*
ctx
)
{
return
fstStreamBuilderCreate
(
fst
,
ctx
);
}
StreamWithState
*
streamBuilderIntoStream
(
FstStreamBuilder
*
sb
)
{
if
(
sb
==
NULL
)
{
return
NULL
;
}
return
streamWithStateCreate
(
sb
->
fst
,
sb
->
aut
,
sb
->
min
,
sb
->
max
);
}
FstStreamWithStateBuilder
*
fstSearchWithState
(
Fst
*
fst
,
AutomationCtx
*
ctx
)
{
return
fstStreamBuilderCreate
(
fst
,
ctx
);
}
...
...
@@ -1119,7 +1123,7 @@ CompiledAddr fstGetRootAddr(Fst *fst) {
Output
fstEmptyFinalOutput
(
Fst
*
fst
,
bool
*
null
)
{
Output
res
=
0
;
FstNode
*
node
=
fst
->
root
;
FstNode
*
node
=
fst
GetRoot
(
fst
)
;
if
(
FST_NODE_IS_FINAL
(
node
))
{
*
null
=
false
;
res
=
FST_NODE_FINAL_OUTPUT
(
node
);
...
...
@@ -1176,7 +1180,7 @@ bool fstBoundWithDataIsEmpty(FstBoundWithData *bound) {
bool
fstBoundWithDataIsIncluded
(
FstBoundWithData
*
bound
)
{
return
bound
->
type
==
Included
?
true
:
fals
e
;
return
bound
->
type
==
Excluded
?
false
:
tru
e
;
}
void
fstBoundDestroy
(
FstBoundWithData
*
bound
)
{
...
...
source/libs/index/src/index_fst_automation.c
浏览文件 @
eff3783a
...
...
@@ -68,7 +68,7 @@ StartWithStateValue *startWithStateValueDump(StartWithStateValue *sv) {
// prefix query, impl later
static
void
*
prefixStart
(
AutomationCtx
*
ctx
)
{
StartWithStateValue
*
data
=
(
StartWithStateValue
*
)(
ctx
->
data
);
StartWithStateValue
*
data
=
(
StartWithStateValue
*
)(
ctx
->
st
data
);
return
startWithStateValueDump
(
data
);
};
static
bool
prefixIsMatch
(
AutomationCtx
*
ctx
,
void
*
sv
)
{
...
...
@@ -145,7 +145,8 @@ AutomationCtx* automCtxCreate(void *data,AutomationType atype) {
StartWithStateValue
*
sv
=
NULL
;
if
(
atype
==
AUTOMATION_PREFIX
)
{
sv
=
startWithStateValueCreate
(
Running
,
FST_INT
,
0
);
int
val
=
0
;
sv
=
startWithStateValueCreate
(
Running
,
FST_INT
,
&
val
);
ctx
->
stdata
=
(
void
*
)
sv
;
}
else
if
(
atype
==
AUTMMATION_MATCH
)
{
...
...
source/libs/index/test/indexTests.cpp
浏览文件 @
eff3783a
...
...
@@ -59,9 +59,22 @@ class FstReadMemory {
return
ok
;
}
// add later
bool
Search
(
const
std
::
string
&
key
,
std
::
vector
<
uint64_t
>
&
result
)
{
bool
Search
(
AutomationCtx
*
ctx
,
std
::
vector
<
uint64_t
>
&
result
)
{
FstStreamBuilder
*
sb
=
fstSearch
(
_fst
,
ctx
);
StreamWithState
*
st
=
streamBuilderIntoStream
(
sb
);
StreamWithStateResult
*
rt
=
NULL
;
while
((
rt
=
streamWithStateNextWith
(
st
,
NULL
))
!=
NULL
)
{
result
.
push_back
((
uint64_t
)(
rt
->
out
.
out
));
}
return
true
;
}
bool
SearchWithTimeCostUs
(
AutomationCtx
*
ctx
,
std
::
vector
<
uint64_t
>
&
result
)
{
int64_t
s
=
taosGetTimestampUs
();
bool
ok
=
this
->
Search
(
ctx
,
result
);
int64_t
e
=
taosGetTimestampUs
();
return
ok
;
}
~
FstReadMemory
()
{
fstCountingWriterDestroy
(
_w
);
...
...
@@ -186,11 +199,43 @@ void checkFstPerf() {
printf
(
"success to init fst read"
);
}
Performance_fstReadRecords
(
m
);
delete
m
;
}
void
checkFstPrefixSearch
()
{
FstWriter
*
fw
=
new
FstWriter
;
int64_t
s
=
taosGetTimestampUs
();
int
count
=
2
;
std
::
string
key
(
"ab"
);
for
(
int
i
=
0
;
i
<
count
;
i
++
)
{
key
[
1
]
=
key
[
1
]
+
i
;
fw
->
Put
(
key
,
i
);
}
int64_t
e
=
taosGetTimestampUs
();
std
::
cout
<<
"insert data count : "
<<
count
<<
"elapas time: "
<<
e
-
s
<<
std
::
endl
;
delete
fw
;
FstReadMemory
*
m
=
new
FstReadMemory
(
1024
*
64
);
if
(
m
->
init
()
==
false
)
{
std
::
cout
<<
"init readMemory failed"
<<
std
::
endl
;
delete
m
;
return
;
}
// prefix search
std
::
vector
<
uint64_t
>
result
;
AutomationCtx
*
ctx
=
automCtxCreate
((
void
*
)
"ab"
,
AUTOMATION_PREFIX
);
m
->
Search
(
ctx
,
result
);
assert
(
result
.
size
()
==
count
);
for
(
int
i
=
0
;
i
<
result
.
size
();
i
++
)
{
assert
(
result
[
i
]
==
i
);
// check result
}
free
(
ctx
);
delete
m
;
}
void
validateFst
()
{
int
val
=
100
;
int
count
=
100
;
...
...
@@ -209,6 +254,8 @@ void validateFst() {
FstReadMemory
*
m
=
new
FstReadMemory
(
1024
*
64
);
if
(
m
->
init
()
==
false
)
{
std
::
cout
<<
"init readMemory failed"
<<
std
::
endl
;
delete
m
;
return
;
}
{
...
...
@@ -230,10 +277,12 @@ void validateFst() {
}
}
delete
m
;
}
int
main
(
int
argc
,
char
**
argv
)
{
checkFstPerf
();
//checkFstPrefixSearch();
return
1
;
}
...
...
source/libs/parser/src/astValidate.c
浏览文件 @
eff3783a
...
...
@@ -4088,7 +4088,7 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer
}
// load the meta data from catalog
code
=
catalogGetAllMeta
(
pCatalog
,
NULL
,
&
req
,
&
data
);
code
=
catalogGetAllMeta
(
pCatalog
,
NULL
,
NULL
,
&
req
,
&
data
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
source/libs/parser/src/insertParser.c
浏览文件 @
eff3783a
...
...
@@ -71,8 +71,7 @@ typedef struct SInsertParseContext {
const
char
*
pSql
;
SMsgBuf
msg
;
struct
SCatalog
*
pCatalog
;
SMetaData
meta
;
// need release
const
STableMeta
*
pTableMeta
;
STableMeta
*
pTableMeta
;
SHashObj
*
pTableBlockHashObj
;
// data block for each table. need release
int32_t
totalNum
;
SInsertStmtInfo
*
pOutput
;
...
...
@@ -165,29 +164,29 @@ static int32_t skipInsertInto(SInsertParseContext* pCxt) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
build
TableName
(
SInsertParseContext
*
pCxt
,
SToken
*
pStname
,
SArray
*
tableNameList
)
{
static
int32_t
build
Name
(
SInsertParseContext
*
pCxt
,
SToken
*
pStname
,
char
*
fullDbName
,
char
*
tableName
)
{
if
(
parserValidateIdToken
(
pStname
)
!=
TSDB_CODE_SUCCESS
)
{
return
buildSyntaxErrMsg
(
&
pCxt
->
msg
,
"invalid table name"
,
pStname
->
z
);
}
SName
name
=
{
0
};
strcpy
(
name
.
dbname
,
pCxt
->
pComCxt
->
pDbname
);
strncpy
(
name
.
tname
,
pStname
->
z
,
pStname
->
n
);
taosArrayPush
(
tableNameList
,
&
name
);
char
*
p
=
strnchr
(
pStname
->
z
,
TS_PATH_DELIMITER
[
0
],
pStname
->
n
,
false
);
if
(
NULL
!=
p
)
{
// db.table
strcpy
(
fullDbName
,
pCxt
->
pComCxt
->
pAcctId
);
fullDbName
[
strlen
(
pCxt
->
pComCxt
->
pAcctId
)]
=
TS_PATH_DELIMITER
[
0
];
strncpy
(
fullDbName
,
pStname
->
z
,
p
-
pStname
->
z
);
strncpy
(
tableName
,
p
+
1
,
pStname
->
n
-
(
p
-
pStname
->
z
)
-
1
);
}
else
{
snprintf
(
fullDbName
,
TSDB_FULL_DB_NAME_LEN
,
"%s.%s"
,
pCxt
->
pComCxt
->
pAcctId
,
pCxt
->
pComCxt
->
pDbname
);
strncpy
(
tableName
,
pStname
->
z
,
pStname
->
n
);
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
buildMetaReq
(
SInsertParseContext
*
pCxt
,
SToken
*
pStname
,
SCatalogReq
*
pMetaReq
)
{
pMetaReq
->
pTableName
=
taosArrayInit
(
4
,
sizeof
(
SName
));
return
buildTableName
(
pCxt
,
pStname
,
pMetaReq
->
pTableName
);
}
static
int32_t
getTableMeta
(
SInsertParseContext
*
pCxt
,
SToken
*
pTname
)
{
SCatalogReq
req
;
CHECK_CODE
(
buildMetaReq
(
pCxt
,
pTname
,
&
req
))
;
CHECK_CODE
(
catalogGetTableMeta
(
pCxt
->
pCatalog
,
NULL
,
NULL
,
NULL
,
&
pCxt
->
meta
));
//TODO
pCxt
->
pTableMeta
=
(
STableMeta
*
)
taosArrayGetP
(
pCxt
->
meta
.
pTableMeta
,
0
);
char
fullDbName
[
TSDB_FULL_DB_NAME_LEN
]
=
{
0
}
;
char
tableName
[
TSDB_TABLE_NAME_LEN
]
=
{
0
}
;
CHECK_CODE
(
buildName
(
pCxt
,
pTname
,
fullDbName
,
tableName
));
CHECK_CODE
(
catalogGetTableMeta
(
pCxt
->
pCatalog
,
pCxt
->
pComCxt
->
pRpc
,
pCxt
->
pComCxt
->
pEpSet
,
fullDbName
,
tableName
,
&
pCxt
->
pTableMeta
)
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -868,12 +867,11 @@ int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) {
.
pOutput
=
*
pInfo
};
CHECK_CODE
(
catalogGetHandle
(
NULL
,
&
context
.
pCatalog
));
//TODO
if
(
NULL
==
context
.
pTableBlockHashObj
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
CHECK_CODE
(
catalogGetHandle
(
pContext
->
pClusterId
,
&
context
.
pCatalog
));
CHECK_CODE
(
skipInsertInto
(
&
context
));
CHECK_CODE
(
parseInsertBody
(
&
context
));
...
...
source/libs/parser/src/parserUtil.c
浏览文件 @
eff3783a
...
...
@@ -1464,29 +1464,6 @@ int32_t copyTagData(STagData* dst, const STagData* src) {
return
0
;
}
STableMeta
*
createSuperTableMeta
(
STableMetaMsg
*
pChild
)
{
assert
(
pChild
!=
NULL
);
int32_t
total
=
pChild
->
numOfColumns
+
pChild
->
numOfTags
;
STableMeta
*
pTableMeta
=
calloc
(
1
,
sizeof
(
STableMeta
)
+
sizeof
(
SSchema
)
*
total
);
pTableMeta
->
tableType
=
TSDB_SUPER_TABLE
;
pTableMeta
->
tableInfo
.
numOfTags
=
pChild
->
numOfTags
;
pTableMeta
->
tableInfo
.
numOfColumns
=
pChild
->
numOfColumns
;
pTableMeta
->
tableInfo
.
precision
=
pChild
->
precision
;
pTableMeta
->
uid
=
pChild
->
suid
;
pTableMeta
->
tversion
=
pChild
->
tversion
;
pTableMeta
->
sversion
=
pChild
->
sversion
;
memcpy
(
pTableMeta
->
schema
,
pChild
->
pSchema
,
sizeof
(
SSchema
)
*
total
);
int32_t
num
=
pTableMeta
->
tableInfo
.
numOfColumns
;
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
pTableMeta
->
tableInfo
.
rowSize
+=
pTableMeta
->
schema
[
i
].
bytes
;
}
return
pTableMeta
;
}
uint32_t
getTableMetaSize
(
const
STableMeta
*
pTableMeta
)
{
assert
(
pTableMeta
!=
NULL
);
...
...
source/libs/parser/test/CMakeLists.txt
浏览文件 @
eff3783a
...
...
@@ -6,13 +6,16 @@ SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY
(
${
CMAKE_CURRENT_SOURCE_DIR
}
SOURCE_LIST
)
ADD_EXECUTABLE
(
parserTest
${
SOURCE_LIST
}
)
TARGET_LINK_LIBRARIES
(
parserTest
PUBLIC os util common parser catalog transport gtest function planner query
)
TARGET_INCLUDE_DIRECTORIES
(
parserTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/parser/"
PRIVATE
"
${
CMAKE_SOURCE_DIR
}
/source/libs/parser/inc"
)
TARGET_LINK_LIBRARIES
(
parserTest
PUBLIC os util common parser catalog transport gtest function planner query
)
TARGET_LINK_OPTIONS
(
parserTest PRIVATE -Wl,-wrap,malloc
)
source/libs/parser/test/insertTest.cpp
浏览文件 @
eff3783a
...
...
@@ -27,6 +27,27 @@ namespace {
}
}
extern
"C"
{
#include <execinfo.h>
void
*
__real_malloc
(
size_t
);
void
*
__wrap_malloc
(
size_t
c
)
{
// printf("My MALLOC called: %d\n", c);
// void *array[32];
// int size = backtrace(array, 32);
// char **symbols = backtrace_symbols(array, size);
// for (int i = 0; i < size; ++i) {
// cout << symbols[i] << endl;
// }
// free(symbols);
return
__real_malloc
(
c
);
}
}
// syntax:
// INSERT INTO
// tb_name
...
...
source/libs/planner/inc/plannerInt.h
浏览文件 @
eff3783a
...
...
@@ -23,25 +23,23 @@ extern "C" {
#include "common.h"
#include "tarray.h"
#include "planner.h"
#include "parser.h"
#include "taosmsg.h"
enum
LOGIC_PLAN_E
{
LP_SCAN
=
1
,
LP_SESSION
=
2
,
LP_STATE
=
3
,
LP_INTERVAL
=
4
,
LP_FILL
=
5
,
LP_AGG
=
6
,
LP_JOIN
=
7
,
LP_PROJECT
=
8
,
LP_DISTINCT
=
9
,
LP_ORDER
=
10
};
typedef
struct
SQueryNodeBasicInfo
{
int32_t
type
;
// operator type
char
*
name
;
// operator name
}
SQueryNodeBasicInfo
;
#define QNODE_TAGSCAN 1
#define QNODE_TABLESCAN 2
#define QNODE_PROJECT 3
#define QNODE_AGGREGATE 4
#define QNODE_GROUPBY 5
#define QNODE_LIMIT 6
#define QNODE_JOIN 7
#define QNODE_DISTINCT 8
#define QNODE_SORT 9
#define QNODE_UNION 10
#define QNODE_TIMEWINDOW 11
#define QNODE_SESSIONWINDOW 12
#define QNODE_STATEWINDOW 13
#define QNODE_FILL 14
typedef
struct
SQueryDistPlanNodeInfo
{
bool
stableQuery
;
// super table query or not
...
...
@@ -52,8 +50,9 @@ typedef struct SQueryDistPlanNodeInfo {
}
SQueryDistPlanNodeInfo
;
typedef
struct
SQueryTableInfo
{
char
*
tableName
;
uint64_t
uid
;
char
*
tableName
;
// to be deleted
uint64_t
uid
;
// to be deleted
STableMetaInfo
*
pMeta
;
STimeWindow
window
;
}
SQueryTableInfo
;
...
...
@@ -64,50 +63,12 @@ typedef struct SQueryPlanNode {
SArray
*
pExpr
;
// the query functions or sql aggregations
int32_t
numOfExpr
;
// number of result columns, which is also the number of pExprs
void
*
pExtInfo
;
// additional information
//
previous
operator to generated result for current node to process
//
children
operator to generated result for current node to process
// in case of join, multiple prev nodes exist.
SArray
*
p
PrevNodes
;
// upstream nodes
struct
SQueryPlanNode
*
nextNode
;
SArray
*
p
Children
;
// upstream nodes
struct
SQueryPlanNode
*
pParent
;
}
SQueryPlanNode
;
typedef
SSchema
SSlotSchema
;
typedef
struct
SDataBlockSchema
{
int32_t
index
;
SSlotSchema
*
pSchema
;
int32_t
numOfCols
;
// number of columns
}
SDataBlockSchema
;
typedef
struct
SPhyNode
{
SQueryNodeBasicInfo
info
;
SArray
*
pTargets
;
// target list to be computed or scanned at this node
SArray
*
pConditions
;
// implicitly-ANDed qual conditions
SDataBlockSchema
targetSchema
;
// children plan to generated result for current node to process
// in case of join, multiple plan nodes exist.
SArray
*
pChildren
;
}
SPhyNode
;
typedef
struct
SScanPhyNode
{
SPhyNode
node
;
uint64_t
uid
;
// unique id of the table
}
SScanPhyNode
;
typedef
SScanPhyNode
STagScanPhyNode
;
typedef
SScanPhyNode
SSystemTableScanPhyNode
;
typedef
struct
SMultiTableScanPhyNode
{
SScanPhyNode
scan
;
SArray
*
pTagsConditions
;
// implicitly-ANDed tag qual conditions
}
SMultiTableScanPhyNode
;
typedef
SMultiTableScanPhyNode
SMultiTableSeqScanPhyNode
;
typedef
struct
SProjectPhyNode
{
SPhyNode
node
;
}
SProjectPhyNode
;
/**
* Optimize the query execution plan, currently not implement yet.
* @param pQueryNode
...
...
source/libs/planner/src/physicalPlan.c
浏览文件 @
eff3783a
...
...
@@ -14,23 +14,147 @@
*/
#include "plannerInt.h"
#include "parser.h"
static
const
char
*
gOpName
[]
=
{
"Unknown"
,
#define INCLUDE_AS_NAME
#include "plannerOp.h"
#undef INCLUDE_AS_NAME
};
typedef
struct
SPlanContext
{
struct
SCatalog
*
pCatalog
;
struct
SQueryDag
*
pDag
;
SSubplan
*
pCurrentSubplan
;
SSubplanId
nextId
;
}
SPlanContext
;
static
void
toDataBlockSchema
(
SQueryPlanNode
*
pPlanNode
,
SDataBlockSchema
*
dataBlockSchema
)
{
SWAP
(
dataBlockSchema
->
pSchema
,
pPlanNode
->
pSchema
,
SSchema
*
);
dataBlockSchema
->
numOfCols
=
pPlanNode
->
numOfCols
;
}
static
SPhyNode
*
initPhyNode
(
SQueryPlanNode
*
pPlanNode
,
int32_t
type
,
int32_t
size
)
{
SPhyNode
*
node
=
(
SPhyNode
*
)
calloc
(
1
,
size
);
node
->
info
.
type
=
type
;
node
->
info
.
name
=
gOpName
[
type
];
SWAP
(
node
->
pTargets
,
pPlanNode
->
pExpr
,
SArray
*
);
toDataBlockSchema
(
pPlanNode
,
&
(
node
->
targetSchema
));
}
static
SPhyNode
*
createTagScanNode
(
SQueryPlanNode
*
pPlanNode
)
{
return
initPhyNode
(
pPlanNode
,
OP_TagScan
,
sizeof
(
STagScanPhyNode
));
}
static
SSubplan
*
initSubplan
(
SPlanContext
*
pCxt
,
int32_t
type
)
{
SSubplan
*
subplan
=
calloc
(
1
,
sizeof
(
SSubplan
));
subplan
->
id
=
pCxt
->
nextId
;
++
(
pCxt
->
nextId
.
subplanId
);
subplan
->
type
=
type
;
subplan
->
level
=
0
;
if
(
NULL
!=
pCxt
->
pCurrentSubplan
)
{
subplan
->
level
=
pCxt
->
pCurrentSubplan
->
level
+
1
;
if
(
NULL
==
pCxt
->
pCurrentSubplan
->
pChildern
)
{
pCxt
->
pCurrentSubplan
->
pChildern
=
taosArrayInit
(
TARRAY_MIN_SIZE
,
POINTER_BYTES
);
}
taosArrayPush
(
pCxt
->
pCurrentSubplan
->
pChildern
,
subplan
);
subplan
->
pParents
=
taosArrayInit
(
TARRAY_MIN_SIZE
,
POINTER_BYTES
);
taosArrayPush
(
subplan
->
pParents
,
pCxt
->
pCurrentSubplan
);
}
pCxt
->
pCurrentSubplan
=
subplan
;
return
subplan
;
}
static
uint8_t
getScanFlag
(
SQueryPlanNode
*
pPlanNode
)
{
// todo
return
MASTER_SCAN
;
}
static
SPhyNode
*
createTableScanNode
(
SPlanContext
*
pCxt
,
SQueryPlanNode
*
pPlanNode
,
SQueryTableInfo
*
pTable
)
{
STableScanPhyNode
*
node
=
(
STableScanPhyNode
*
)
initPhyNode
(
pPlanNode
,
OP_TableScan
,
sizeof
(
STableScanPhyNode
));
node
->
scan
.
uid
=
pTable
->
pMeta
->
pTableMeta
->
uid
;
node
->
scan
.
tableType
=
pTable
->
pMeta
->
pTableMeta
->
tableType
;
node
->
scanFlag
=
getScanFlag
(
pPlanNode
);
node
->
window
=
pTable
->
window
;
// todo tag cond
}
static
void
vgroupToEpSet
(
const
SVgroupMsg
*
vg
,
SEpSet
*
epSet
)
{
// todo
}
static
void
splitSubplanBySTable
(
SPlanContext
*
pCxt
,
SQueryPlanNode
*
pPlanNode
,
SQueryTableInfo
*
pTable
)
{
SVgroupsInfo
*
vgroupList
=
pTable
->
pMeta
->
vgroupList
;
for
(
int32_t
i
=
0
;
i
<
pTable
->
pMeta
->
vgroupList
->
numOfVgroups
;
++
i
)
{
SSubplan
*
subplan
=
initSubplan
(
pCxt
,
QUERY_TYPE_SCAN
);
vgroupToEpSet
(
&
(
pTable
->
pMeta
->
vgroupList
->
vgroups
[
i
]),
&
subplan
->
execEpSet
);
subplan
->
pNode
=
createTableScanNode
(
pCxt
,
pPlanNode
,
pTable
);
// todo reset pCxt->pCurrentSubplan
}
}
static
SPhyNode
*
createExchangeNode
()
{
SPhyNode
*
createScanNode
(
SQueryPlanNode
*
pPlanNode
)
{
return
NULL
;
}
SPhyNode
*
createPhyNode
(
SQueryPlanNode
*
node
)
{
switch
(
node
->
info
.
type
)
{
case
LP_SCAN
:
return
createScanNode
(
node
);
static
SPhyNode
*
createScanNode
(
SPlanContext
*
pCxt
,
SQueryPlanNode
*
pPlanNode
)
{
SQueryTableInfo
*
pTable
=
(
SQueryTableInfo
*
)
pPlanNode
->
pExtInfo
;
if
(
TSDB_SUPER_TABLE
==
pTable
->
pMeta
->
pTableMeta
->
tableType
)
{
splitSubplanBySTable
(
pCxt
,
pPlanNode
,
pTable
);
return
createExchangeNode
(
pCxt
,
pTable
);
}
return
NULL
;
return
createTableScanNode
(
pCxt
,
pPlanNode
,
pTable
)
;
}
SPhyNode
*
createSubplan
(
SQueryPlanNode
*
pSubquery
)
{
return
NULL
;
static
SPhyNode
*
createPhyNode
(
SPlanContext
*
pCxt
,
SQueryPlanNode
*
pPlanNode
)
{
SPhyNode
*
node
=
NULL
;
switch
(
pPlanNode
->
info
.
type
)
{
case
QNODE_TAGSCAN
:
node
=
createTagScanNode
(
pPlanNode
);
break
;
case
QNODE_TABLESCAN
:
node
=
createScanNode
(
pCxt
,
pPlanNode
);
break
;
default:
assert
(
false
);
}
if
(
pPlanNode
->
pChildren
!=
NULL
&&
taosArrayGetSize
(
pPlanNode
->
pChildren
)
>
0
)
{
node
->
pChildren
=
taosArrayInit
(
4
,
POINTER_BYTES
);
size_t
size
=
taosArrayGetSize
(
pPlanNode
->
pChildren
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SPhyNode
*
child
=
createPhyNode
(
pCxt
,
taosArrayGet
(
pPlanNode
->
pChildren
,
i
));
child
->
pParent
=
node
;
taosArrayPush
(
node
->
pChildren
,
&
child
);
}
}
return
node
;
}
static
void
createSubplanByLevel
(
SPlanContext
*
pCxt
,
SQueryPlanNode
*
pRoot
)
{
SSubplan
*
subplan
=
initSubplan
(
pCxt
,
QUERY_TYPE_MERGE
);
subplan
->
pNode
=
createPhyNode
(
pCxt
,
pRoot
);
SArray
*
l0
=
taosArrayInit
(
TARRAY_MIN_SIZE
,
POINTER_BYTES
);
taosArrayPush
(
l0
,
&
subplan
);
taosArrayPush
(
pCxt
->
pDag
->
pSubplans
,
&
l0
);
// todo deal subquery
}
int32_t
createDag
(
SQueryPlanNode
*
pQueryNode
,
struct
SCatalog
*
pCatalog
,
SQueryDag
**
pDag
)
{
SPlanContext
context
=
{
.
pCatalog
=
pCatalog
,
.
pDag
=
calloc
(
1
,
sizeof
(
SQueryDag
)),
.
pCurrentSubplan
=
NULL
};
if
(
NULL
==
context
.
pDag
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
context
.
pDag
->
pSubplans
=
taosArrayInit
(
TARRAY_MIN_SIZE
,
POINTER_BYTES
);
createSubplanByLevel
(
&
context
,
pQueryNode
);
*
pDag
=
context
.
pDag
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
createDag
(
struct
SQueryPlanNode
*
pQueryNode
,
struct
SEpSet
*
pQnode
,
struct
SQueryDag
**
pDag
)
{
return
0
;
int32_t
subPlanToString
(
struct
SSubplan
*
pPhyNode
,
char
**
str
)
{
return
TSDB_CODE_SUCCESS
;
}
source/libs/planner/src/planner.c
浏览文件 @
eff3783a
...
...
@@ -18,21 +18,6 @@
#include "parser.h"
#include "plannerInt.h"
#define QNODE_TAGSCAN 1
#define QNODE_TABLESCAN 2
#define QNODE_PROJECT 3
#define QNODE_AGGREGATE 4
#define QNODE_GROUPBY 5
#define QNODE_LIMIT 6
#define QNODE_JOIN 7
#define QNODE_DISTINCT 8
#define QNODE_SORT 9
#define QNODE_UNION 10
#define QNODE_TIMEWINDOW 11
#define QNODE_SESSIONWINDOW 12
#define QNODE_STATEWINDOW 13
#define QNODE_FILL 14
typedef
struct
SFillEssInfo
{
int32_t
fillType
;
// fill type
int64_t
*
val
;
// fill value
...
...
@@ -104,12 +89,13 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla
taosArrayPush
(
pNode
->
pExpr
,
&
pExpr
[
i
]);
}
pNode
->
p
PrevNodes
=
taosArrayInit
(
4
,
POINTER_BYTES
);
pNode
->
p
Children
=
taosArrayInit
(
4
,
POINTER_BYTES
);
for
(
int32_t
i
=
0
;
i
<
numOfPrev
;
++
i
)
{
taosArrayPush
(
pNode
->
p
PrevNodes
,
&
prev
[
i
]);
taosArrayPush
(
pNode
->
p
Children
,
&
prev
[
i
]);
}
switch
(
type
)
{
case
QNODE_TAGSCAN
:
case
QNODE_TABLESCAN
:
{
SQueryTableInfo
*
info
=
calloc
(
1
,
sizeof
(
SQueryTableInfo
));
memcpy
(
info
,
pExtInfo
,
sizeof
(
SQueryTableInfo
));
...
...
@@ -177,7 +163,7 @@ static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMe
SArray
*
pExprs
,
SArray
*
tableCols
)
{
if
(
pQueryInfo
->
info
.
onlyTagQuery
)
{
int32_t
num
=
(
int32_t
)
taosArrayGetSize
(
pExprs
);
SQueryPlanNode
*
pNode
=
createQueryNode
(
QNODE_TAGSCAN
,
"TableTagScan"
,
NULL
,
0
,
pExprs
->
pData
,
num
,
NULL
);
SQueryPlanNode
*
pNode
=
createQueryNode
(
QNODE_TAGSCAN
,
"TableTagScan"
,
NULL
,
0
,
pExprs
->
pData
,
num
,
info
);
if
(
pQueryInfo
->
info
.
distinct
)
{
pNode
=
createQueryNode
(
QNODE_DISTINCT
,
"Distinct"
,
&
pNode
,
1
,
pExprs
->
pData
,
num
,
NULL
);
...
...
@@ -386,14 +372,14 @@ static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) {
tfree
(
pQueryNode
->
info
.
name
);
// dropAllExprInfo(pQueryNode->pExpr);
if
(
pQueryNode
->
p
PrevNodes
!=
NULL
)
{
int32_t
size
=
(
int32_t
)
taosArrayGetSize
(
pQueryNode
->
p
PrevNodes
);
if
(
pQueryNode
->
p
Children
!=
NULL
)
{
int32_t
size
=
(
int32_t
)
taosArrayGetSize
(
pQueryNode
->
p
Children
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SQueryPlanNode
*
p
=
taosArrayGetP
(
pQueryNode
->
p
PrevNodes
,
i
);
SQueryPlanNode
*
p
=
taosArrayGetP
(
pQueryNode
->
p
Children
,
i
);
doDestroyQueryNode
(
p
);
}
taosArrayDestroy
(
pQueryNode
->
p
PrevNodes
);
taosArrayDestroy
(
pQueryNode
->
p
Children
);
}
tfree
(
pQueryNode
);
...
...
@@ -607,8 +593,8 @@ int32_t printExprInfo(const char* buf, const SQueryPlanNode* pQueryNode, int32_t
int32_t
queryPlanToStringImpl
(
char
*
buf
,
SQueryPlanNode
*
pQueryNode
,
int32_t
level
,
int32_t
totalLen
)
{
int32_t
len
=
doPrintPlan
(
buf
,
pQueryNode
,
level
,
totalLen
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pQueryNode
->
p
PrevNodes
);
++
i
)
{
SQueryPlanNode
*
p1
=
taosArrayGetP
(
pQueryNode
->
p
PrevNodes
,
i
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pQueryNode
->
p
Children
);
++
i
)
{
SQueryPlanNode
*
p1
=
taosArrayGetP
(
pQueryNode
->
p
Children
,
i
);
int32_t
len1
=
queryPlanToStringImpl
(
buf
,
p1
,
level
+
1
,
len
);
len
=
len1
;
}
...
...
source/libs/query/src/querymsg.c
浏览文件 @
eff3783a
...
...
@@ -21,17 +21,6 @@ int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msg
int32_t
(
*
queryProcessMsgRsp
[
TSDB_MSG_TYPE_MAX
])(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
=
{
0
};
int32_t
queryBuildVgroupListReqMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
)
{
if
(
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
*
msgLen
=
0
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryBuildTableMetaReqMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
)
{
if
(
NULL
==
input
||
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
...
...
@@ -81,8 +70,7 @@ int32_t queryBuildUseDbMsg(void* input, char **msg, int32_t msgSize, int32_t *ms
strncpy
(
bMsg
->
db
,
bInput
->
db
,
sizeof
(
bMsg
->
db
));
bMsg
->
db
[
sizeof
(
bMsg
->
db
)
-
1
]
=
0
;
bMsg
->
vgroupVersion
=
bInput
->
vgroupVersion
;
bMsg
->
dbGroupVersion
=
bInput
->
dbGroupVersion
;
bMsg
->
vgVersion
=
bInput
->
vgVersion
;
*
msgLen
=
(
int32_t
)
sizeof
(
*
bMsg
);
...
...
@@ -90,168 +78,196 @@ int32_t queryBuildUseDbMsg(void* input, char **msg, int32_t msgSize, int32_t *ms
}
int32_t
queryProcessVgroupListRsp
(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
{
int32_t
queryProcessUseDBRsp
(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
{
if
(
NULL
==
output
||
NULL
==
msg
||
msgSize
<=
0
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
SVgroupListRspMsg
*
pRsp
=
(
SVgroupListRspMsg
*
)
msg
;
pRsp
->
vgroupNum
=
htonl
(
pRsp
->
vgroupNum
);
pRsp
->
vgroupVersion
=
htonl
(
pRsp
->
vgroupVersion
);
SUseDbRsp
*
pRsp
=
(
SUseDbRsp
*
)
msg
;
SUseDbOutput
*
pOut
=
(
SUseDbOutput
*
)
output
;
int32_t
code
=
0
;
if
(
pRsp
->
vgroupNum
<
0
)
{
qError
(
"
vgroup number[%d] in rsp is invalid"
,
pRsp
->
vgroupNum
);
if
(
msgSize
<=
sizeof
(
*
pRsp
)
)
{
qError
(
"
invalid use db rsp msg size, msgSize:%d"
,
msgSize
);
return
TSDB_CODE_TSC_VALUE_OUT_OF_RANGE
;
}
pRsp
->
vgVersion
=
htonl
(
pRsp
->
vgVersion
);
pRsp
->
vgNum
=
htonl
(
pRsp
->
vgNum
);
if
(
pRsp
->
vg
roupVersion
<
0
)
{
qError
(
"
vgroup vgroupVersion[%d] in rsp is invalid"
,
pRsp
->
vgroupVersion
);
return
TSDB_CODE_TSC_
VALUE_OUT_OF_RANG
E
;
if
(
pRsp
->
vg
Num
<
0
)
{
qError
(
"
invalid db[%s] vgroup number[%d]"
,
pRsp
->
db
,
pRsp
->
vgNum
);
return
TSDB_CODE_TSC_
INVALID_VALU
E
;
}
if
(
msgSize
!=
(
pRsp
->
vgroupNum
*
sizeof
(
pRsp
->
vgroupInfo
[
0
])
+
sizeof
(
*
pRsp
)))
{
qError
(
"vgroup list msg size mis-match, msgSize:%d, vgroup number:%d"
,
msgSize
,
pRsp
->
vgroupNum
);
int32_t
expectSize
=
pRsp
->
vgNum
*
sizeof
(
pRsp
->
vgroupInfo
[
0
])
+
sizeof
(
*
pRsp
);
if
(
msgSize
!=
expectSize
)
{
qError
(
"use db rsp size mis-match, msgSize:%d, expected:%d, vgnumber:%d"
,
msgSize
,
expectSize
,
pRsp
->
vgNum
);
return
TSDB_CODE_TSC_VALUE_OUT_OF_RANGE
;
}
// keep SVgroupListInfo/SVgroupListRspMsg the same
*
(
SVgroupListInfo
**
)
output
=
(
SVgroupListInfo
*
)
msg
;
if
(
pRsp
->
vgroupNum
==
0
)
{
return
TSDB_CODE_SUCCESS
;
pOut
->
dbVgroup
.
vgVersion
=
pRsp
->
vgVersion
;
pOut
->
dbVgroup
.
hashMethod
=
pRsp
->
hashMethod
;
pOut
->
dbVgroup
.
vgInfo
=
taosHashInit
(
pRsp
->
vgNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pOut
->
dbVgroup
.
vgInfo
)
{
qError
(
"hash init[%d] failed"
,
pRsp
->
vgNum
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
for
(
int32_t
i
=
0
;
i
<
pRsp
->
vg
roup
Num
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pRsp
->
vgNum
;
++
i
)
{
pRsp
->
vgroupInfo
[
i
].
vgId
=
htonl
(
pRsp
->
vgroupInfo
[
i
].
vgId
);
pRsp
->
vgroupInfo
[
i
].
hashBegin
=
htonl
(
pRsp
->
vgroupInfo
[
i
].
hashBegin
);
pRsp
->
vgroupInfo
[
i
].
hashEnd
=
htonl
(
pRsp
->
vgroupInfo
[
i
].
hashEnd
);
for
(
int32_t
n
=
0
;
n
<
pRsp
->
vgroupInfo
[
i
].
numOfEps
;
++
n
)
{
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
=
htonl
(
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
);
}
}
return
TSDB_CODE_SUCCESS
;
}
if
(
0
!=
taosHashPut
(
pOut
->
dbVgroup
.
vgInfo
,
&
pRsp
->
vgroupInfo
[
i
].
vgId
,
sizeof
(
pRsp
->
vgroupInfo
[
i
].
vgId
),
&
pRsp
->
vgroupInfo
[
i
],
sizeof
(
pRsp
->
vgroupInfo
[
i
])))
{
qError
(
"hash push failed"
);
goto
_return
;
}
}
memcpy
(
pOut
->
db
,
pRsp
->
db
,
sizeof
(
pOut
->
db
));
return
code
;
int32_t
queryProcessUseDBRsp
(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
{
if
(
NULL
==
output
||
NULL
==
msg
||
msgSize
<=
0
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
_return:
if
(
pOut
)
{
tfree
(
pOut
->
dbVgroup
.
vgInfo
)
;
}
return
code
;
}
SUseDbRspMsg
*
pRsp
=
(
SUseDbRspMsg
*
)
msg
;
SUseDbOutput
*
pOut
=
(
SUseDbOutput
*
)
output
;
int32_t
code
=
0
;
if
(
msgSize
<=
sizeof
(
*
pRsp
))
{
qError
(
"invalid use db rsp msg size, msgSize:%d"
,
msgSize
);
return
TSDB_CODE_TSC_VALUE_OUT_OF_RANGE
;
static
int32_t
queryConvertTableMetaMsg
(
STableMetaMsg
*
pMetaMsg
)
{
pMetaMsg
->
numOfTags
=
htonl
(
pMetaMsg
->
numOfTags
);
pMetaMsg
->
numOfColumns
=
htonl
(
pMetaMsg
->
numOfColumns
);
pMetaMsg
->
sversion
=
htonl
(
pMetaMsg
->
sversion
);
pMetaMsg
->
tversion
=
htonl
(
pMetaMsg
->
tversion
);
pMetaMsg
->
tuid
=
htobe64
(
pMetaMsg
->
tuid
);
pMetaMsg
->
suid
=
htobe64
(
pMetaMsg
->
suid
);
pMetaMsg
->
vgId
=
htonl
(
pMetaMsg
->
vgId
);
if
(
pMetaMsg
->
numOfTags
<
0
||
pMetaMsg
->
numOfTags
>
TSDB_MAX_TAGS
)
{
qError
(
"invalid numOfTags[%d] in table meta rsp msg"
,
pMetaMsg
->
numOfTags
);
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
pRsp
->
vgroupVersion
=
htonl
(
pRsp
->
vgroupVersion
);
pRsp
->
dbVgroupVersion
=
htonl
(
pRsp
->
dbVgroupVersion
);
pRsp
->
vgroupNum
=
htonl
(
pRsp
->
vgroupNum
);
pRsp
->
dbVgroupNum
=
htonl
(
pRsp
->
dbVgroupNum
);
if
(
pMetaMsg
->
numOfColumns
>
TSDB_MAX_COLUMNS
||
pMetaMsg
->
numOfColumns
<=
0
)
{
qError
(
"invalid numOfColumns[%d] in table meta rsp msg"
,
pMetaMsg
->
numOfColumns
);
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
if
(
p
Rsp
->
vgroupNum
<
0
)
{
qError
(
"invalid
vgroup number[%d]"
,
pRsp
->
vgroupNum
);
if
(
p
MetaMsg
->
tableType
!=
TSDB_SUPER_TABLE
&&
pMetaMsg
->
tableType
!=
TSDB_CHILD_TABLE
&&
pMetaMsg
->
tableType
!=
TSDB_NORMAL_TABLE
)
{
qError
(
"invalid
tableType[%d] in table meta rsp msg"
,
pMetaMsg
->
tableType
);
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
if
(
p
Rsp
->
dbVgroupNum
<
0
)
{
qError
(
"invalid
db vgroup number[%d]"
,
pRsp
->
dbVgroupNum
);
if
(
p
MetaMsg
->
sversion
<
0
)
{
qError
(
"invalid
sversion[%d] in table meta rsp msg"
,
pMetaMsg
->
sversion
);
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
int32_t
expectSize
=
pRsp
->
vgroupNum
*
sizeof
(
pRsp
->
vgroupInfo
[
0
])
+
pRsp
->
dbVgroupNum
*
sizeof
(
int32_t
)
+
sizeof
(
*
pRsp
);
if
(
msgSize
!=
expectSize
)
{
qError
(
"vgroup list msg size mis-match, msgSize:%d, expected:%d, vgroup number:%d, db vgroup number:%d"
,
msgSize
,
expectSize
,
pRsp
->
vgroupNum
,
pRsp
->
dbVgroupNum
);
return
TSDB_CODE_TSC_VALUE_OUT_OF_RANGE
;
if
(
pMetaMsg
->
tversion
<
0
)
{
qError
(
"invalid tversion[%d] in table meta rsp msg"
,
pMetaMsg
->
tversion
);
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
SSchema
*
pSchema
=
pMetaMsg
->
pSchema
;
if
(
pRsp
->
vgroupVersion
<
0
)
{
qInfo
(
"no new vgroup list info"
);
if
(
pRsp
->
vgroupNum
!=
0
)
{
qError
(
"invalid vgroup number[%d] for no new vgroup list case"
,
pRsp
->
vgroupNum
);
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
}
else
{
int32_t
s
=
sizeof
(
*
pOut
->
vgroupList
)
+
sizeof
(
pOut
->
vgroupList
->
vgroupInfo
[
0
])
*
pRsp
->
vgroupNum
;
pOut
->
vgroupList
=
calloc
(
1
,
s
);
if
(
NULL
==
pOut
->
vgroupList
)
{
qError
(
"calloc size[%d] failed"
,
s
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
int32_t
numOfTotalCols
=
pMetaMsg
->
numOfColumns
+
pMetaMsg
->
numOfTags
;
for
(
int
i
=
0
;
i
<
numOfTotalCols
;
++
i
)
{
pSchema
->
bytes
=
htonl
(
pSchema
->
bytes
);
pSchema
->
colId
=
htonl
(
pSchema
->
colId
);
p
Out
->
vgroupList
->
vgroupNum
=
pRsp
->
vgroupNum
;
pOut
->
vgroupList
->
vgroupVersion
=
pRsp
->
vgroupVersion
;
p
Schema
++
;
}
for
(
int32_t
i
=
0
;
i
<
pRsp
->
vgroupNum
;
++
i
)
{
pRsp
->
vgroupInfo
[
i
].
vgId
=
htonl
(
pRsp
->
vgroupInfo
[
i
].
vgId
);
for
(
int32_t
n
=
0
;
n
<
pRsp
->
vgroupInfo
[
i
].
numOfEps
;
++
n
)
{
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
=
htonl
(
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
);
}
if
(
pMetaMsg
->
pSchema
[
0
].
colId
!=
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
qError
(
"invalid colId[%d] for the first column in table meta rsp msg"
,
pMetaMsg
->
pSchema
[
0
].
colId
);
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
memcpy
(
&
pOut
->
vgroupList
->
vgroupInfo
[
i
],
&
pRsp
->
vgroupInfo
[
i
],
sizeof
(
pRsp
->
vgroupInfo
[
i
]));
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryCreateTableMetaFromMsg
(
STableMetaMsg
*
msg
,
bool
isSuperTable
,
STableMeta
**
pMeta
)
{
int32_t
total
=
msg
->
numOfColumns
+
msg
->
numOfTags
;
int32_t
metaSize
=
sizeof
(
STableMeta
)
+
sizeof
(
SSchema
)
*
total
;
STableMeta
*
pTableMeta
=
calloc
(
1
,
metaSize
);
if
(
NULL
==
pTableMeta
)
{
qError
(
"calloc size[%d] failed"
,
metaSize
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
pTableMeta
->
tableType
=
isSuperTable
?
TSDB_SUPER_TABLE
:
msg
->
tableType
;
pTableMeta
->
uid
=
msg
->
suid
;
pTableMeta
->
suid
=
msg
->
suid
;
pTableMeta
->
sversion
=
msg
->
sversion
;
pTableMeta
->
tversion
=
msg
->
tversion
;
pTableMeta
->
tableInfo
.
numOfTags
=
msg
->
numOfTags
;
pTableMeta
->
tableInfo
.
precision
=
msg
->
precision
;
pTableMeta
->
tableInfo
.
numOfColumns
=
msg
->
numOfColumns
;
for
(
int32_t
i
=
0
;
i
<
msg
->
numOfColumns
;
++
i
)
{
pTableMeta
->
tableInfo
.
rowSize
+=
pTableMeta
->
schema
[
i
].
bytes
;
}
int32_t
*
vgIdList
=
(
int32_t
*
)((
char
*
)
pRsp
->
vgroupInfo
+
sizeof
(
pRsp
->
vgroupInfo
[
0
])
*
pRsp
->
vgroupNum
);
memcpy
(
pTableMeta
->
schema
,
msg
->
pSchema
,
sizeof
(
SSchema
)
*
total
);
memcpy
(
pOut
->
db
,
pRsp
->
db
,
sizeof
(
pOut
->
db
))
;
*
pMeta
=
pTableMeta
;
if
(
pRsp
->
dbVgroupVersion
<
0
)
{
qInfo
(
"no new vgroup info for db[%s]"
,
pRsp
->
db
);
}
else
{
pOut
->
dbVgroup
=
calloc
(
1
,
sizeof
(
*
pOut
->
dbVgroup
));
if
(
NULL
==
pOut
->
dbVgroup
)
{
qError
(
"calloc size[%d] failed"
,
(
int32_t
)
sizeof
(
*
pOut
->
dbVgroup
));
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
_exit
;
}
return
TSDB_CODE_SUCCESS
;
}
pOut
->
dbVgroup
->
vgId
=
taosArrayInit
(
pRsp
->
dbVgroupNum
,
sizeof
(
int32_t
));
if
(
NULL
==
pOut
->
dbVgroup
->
vgId
)
{
qError
(
"taosArrayInit size[%d] failed"
,
pRsp
->
dbVgroupNum
);
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
_exit
;
}
pOut
->
dbVgroup
->
vgroupVersion
=
pRsp
->
dbVgroupVersion
;
pOut
->
dbVgroup
->
hashRange
=
htonl
(
pRsp
->
dbHashRange
);
for
(
int32_t
i
=
0
;
i
<
pRsp
->
dbVgroupNum
;
++
i
)
{
*
(
vgIdList
+
i
)
=
htonl
(
*
(
vgIdList
+
i
))
;
taosArrayPush
(
pOut
->
dbVgroup
->
vgId
,
vgIdList
+
i
)
;
}
int32_t
queryProcessTableMetaRsp
(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
{
STableMetaMsg
*
pMetaMsg
=
(
STableMetaMsg
*
)
msg
;
int32_t
code
=
queryConvertTableMetaMsg
(
pMetaMsg
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
return
code
;
STableMetaOutput
*
pOut
=
(
STableMetaOutput
*
)
output
;
if
(
!
tIsValidSchema
(
pMetaMsg
->
pSchema
,
pMetaMsg
->
numOfColumns
,
pMetaMsg
->
numOfTags
))
{
qError
(
"validate table meta schema in rsp msg failed"
);
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
if
(
pMetaMsg
->
tableType
==
TSDB_CHILD_TABLE
)
{
pOut
->
metaNum
=
2
;
memcpy
(
pOut
->
ctbFname
,
pMetaMsg
->
tbFname
,
sizeof
(
pOut
->
ctbFname
));
memcpy
(
pOut
->
tbFname
,
pMetaMsg
->
stbFname
,
sizeof
(
pOut
->
tbFname
));
pOut
->
ctbMeta
.
vgId
=
pMetaMsg
->
vgId
;
pOut
->
ctbMeta
.
tableType
=
pMetaMsg
->
tableType
;
pOut
->
ctbMeta
.
uid
=
pMetaMsg
->
tuid
;
pOut
->
ctbMeta
.
suid
=
pMetaMsg
->
suid
;
_exit:
if
(
pOut
->
dbVgroup
&&
pOut
->
dbVgroup
->
vgId
)
{
taosArrayDestroy
(
pOut
->
dbVgroup
->
vgId
);
pOut
->
dbVgroup
->
vgId
=
NULL
;
code
=
queryCreateTableMetaFromMsg
(
pMetaMsg
,
true
,
&
pOut
->
tbMeta
);
}
else
{
pOut
->
metaNum
=
1
;
memcpy
(
pOut
->
tbFname
,
pMetaMsg
->
tbFname
,
sizeof
(
pOut
->
tbFname
));
code
=
queryCreateTableMetaFromMsg
(
pMetaMsg
,
false
,
&
pOut
->
tbMeta
);
}
tfree
(
pOut
->
dbVgroup
);
tfree
(
pOut
->
vgroupList
);
return
code
;
}
void
msgInit
()
{
queryBuildMsg
[
TSDB_MSG_TYPE_TABLE_META
]
=
queryBuildTableMetaReqMsg
;
queryBuildMsg
[
TSDB_MSG_TYPE_VGROUP_LIST
]
=
queryBuildVgroupListReqMsg
;
queryBuildMsg
[
TSDB_MSG_TYPE_USE_DB
]
=
queryBuildUseDbMsg
;
//tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = tscProcessTableMetaRsp;
queryProcessMsgRsp
[
TSDB_MSG_TYPE_VGROUP_LIST
]
=
queryProcessVgroupListRsp
;
queryProcessMsgRsp
[
TSDB_MSG_TYPE_TABLE_META
]
=
queryProcessTableMetaRsp
;
queryProcessMsgRsp
[
TSDB_MSG_TYPE_USE_DB
]
=
queryProcessUseDBRsp
;
/*
...
...
source/libs/scheduler/CMakeLists.txt
浏览文件 @
eff3783a
...
...
@@ -9,5 +9,5 @@ target_include_directories(
target_link_libraries
(
scheduler
PRIVATE os util planner
PRIVATE os util planner
common
)
\ No newline at end of file
source/util/src/tarray.c
浏览文件 @
eff3783a
...
...
@@ -17,7 +17,7 @@
#include "tarray.h"
#include "talgo.h"
void
*
taosArrayInit
(
size_t
size
,
size_t
elemSize
)
{
SArray
*
taosArrayInit
(
size_t
size
,
size_t
elemSize
)
{
assert
(
elemSize
>
0
);
if
(
size
<
TARRAY_MIN_SIZE
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录