Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
021c5176
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
021c5176
编写于
12月 22, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/dnode3
上级
5335c084
1ce9487f
变更
55
展开全部
隐藏空白更改
内联
并排
Showing
55 changed file
with
1501 addition
and
3250 deletion
+1501
-3250
include/common/taosmsg.h
include/common/taosmsg.h
+34
-16
include/common/tmsgtype.h
include/common/tmsgtype.h
+0
-8
include/common/tname.h
include/common/tname.h
+0
-2
include/dnode/vnode/tq/tq.h
include/dnode/vnode/tq/tq.h
+51
-23
include/dnode/vnode/vnode.h
include/dnode/vnode/vnode.h
+10
-0
include/libs/parser/parsenodes.h
include/libs/parser/parsenodes.h
+167
-0
include/libs/parser/parser.h
include/libs/parser/parser.h
+14
-137
include/libs/planner/planner.h
include/libs/planner/planner.h
+16
-13
include/libs/qcom/query.h
include/libs/qcom/query.h
+16
-4
include/libs/wal/wal.h
include/libs/wal/wal.h
+4
-1
include/util/ttimer.h
include/util/ttimer.h
+2
-0
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+6
-3
source/client/src/clientEnv.c
source/client/src/clientEnv.c
+6
-18
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+23
-9
source/client/src/clientMain.c
source/client/src/clientMain.c
+9
-10
source/client/src/clientMsgHandler.c
source/client/src/clientMsgHandler.c
+29
-2769
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+35
-2
source/common/src/tname.c
source/common/src/tname.c
+5
-30
source/dnode/vnode/tq/CMakeLists.txt
source/dnode/vnode/tq/CMakeLists.txt
+1
-0
source/dnode/vnode/tq/inc/tqInt.h
source/dnode/vnode/tq/inc/tqInt.h
+1
-0
source/dnode/vnode/tq/src/tq.c
source/dnode/vnode/tq/src/tq.c
+182
-41
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+1
-1
source/libs/index/inc/index_tfile.h
source/libs/index/inc/index_tfile.h
+1
-1
source/libs/index/src/index_tfile.c
source/libs/index/src/index_tfile.c
+68
-36
source/libs/parser/inc/astToMsg.h
source/libs/parser/inc/astToMsg.h
+1
-0
source/libs/parser/inc/dataBlockMgt.h
source/libs/parser/inc/dataBlockMgt.h
+1
-1
source/libs/parser/inc/parserInt.h
source/libs/parser/inc/parserInt.h
+9
-1
source/libs/parser/inc/parserUtil.h
source/libs/parser/inc/parserUtil.h
+2
-0
source/libs/parser/src/astGenerator.c
source/libs/parser/src/astGenerator.c
+3
-2
source/libs/parser/src/astToMsg.c
source/libs/parser/src/astToMsg.c
+124
-0
source/libs/parser/src/astValidate.c
source/libs/parser/src/astValidate.c
+183
-4
source/libs/parser/src/dataBlockMgt.c
source/libs/parser/src/dataBlockMgt.c
+0
-3
source/libs/parser/src/insertParser.c
source/libs/parser/src/insertParser.c
+45
-35
source/libs/parser/src/parser.c
source/libs/parser/src/parser.c
+4
-4
source/libs/parser/src/parserUtil.c
source/libs/parser/src/parserUtil.c
+13
-2
source/libs/parser/src/queryInfoUtil.c
source/libs/parser/src/queryInfoUtil.c
+1
-1
source/libs/parser/src/sql.c
source/libs/parser/src/sql.c
+1
-0
source/libs/parser/test/insertParserTest.cpp
source/libs/parser/test/insertParserTest.cpp
+3
-3
source/libs/parser/test/mockCatalog.cpp
source/libs/parser/test/mockCatalog.cpp
+15
-2
source/libs/parser/test/mockCatalogService.cpp
source/libs/parser/test/mockCatalogService.cpp
+12
-7
source/libs/parser/test/mockCatalogService.h
source/libs/parser/test/mockCatalogService.h
+2
-3
source/libs/parser/test/parserTests.cpp
source/libs/parser/test/parserTests.cpp
+6
-3
source/libs/parser/test/plannerTest.cpp
source/libs/parser/test/plannerTest.cpp
+2
-2
source/libs/planner/inc/plannerInt.h
source/libs/planner/inc/plannerInt.h
+6
-2
source/libs/planner/src/logicPlan.c
source/libs/planner/src/logicPlan.c
+36
-12
source/libs/planner/src/physicalPlan.c
source/libs/planner/src/physicalPlan.c
+106
-26
source/libs/planner/src/physicalPlanJson.c
source/libs/planner/src/physicalPlanJson.c
+82
-2
source/libs/planner/src/planner.c
source/libs/planner/src/planner.c
+4
-4
source/libs/planner/test/phyPlanTests.cpp
source/libs/planner/test/phyPlanTests.cpp
+2
-1
source/libs/qcom/CMakeLists.txt
source/libs/qcom/CMakeLists.txt
+2
-0
source/libs/qcom/src/queryUtil.c
source/libs/qcom/src/queryUtil.c
+46
-1
source/libs/qcom/test/CMakeLists.txt
source/libs/qcom/test/CMakeLists.txt
+19
-0
source/libs/qcom/test/queryTest.cpp
source/libs/qcom/test/queryTest.cpp
+83
-0
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+2
-2
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+5
-3
未找到文件。
include/common/taosmsg.h
浏览文件 @
021c5176
...
...
@@ -94,15 +94,15 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" )
// message from mnode to vnode
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CREATE_STB_IN
,
"create-stb-internal"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_ALTER_STB_IN
,
"alter-stb-internal"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_ALTER_STB_IN
,
"alter-stb-internal"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DROP_STB_IN
,
"drop-stb-internal"
)
// message from mnode to mnode
// message from mnode to qnode
// message from mnode to dnode
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CREATE_VNODE_IN
,
"create-vnode-internal"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_ALTER_VNODE_IN
,
"alter-vnode-internal"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DROP_VNODE_IN
,
"drop-vnode-internal"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_AUTH_VNODE_IN
,
"auth-vnode-internal"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DROP_VNODE_IN
,
"drop-vnode-internal"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_AUTH_VNODE_IN
,
"auth-vnode-internal"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_SYNC_VNODE_IN
,
"sync-vnode-internal"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_COMPACT_VNODE_IN
,
"compact-vnode-internal"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CREATE_MNODE_IN
,
"create-mnode-internal"
)
...
...
@@ -289,6 +289,37 @@ typedef struct SSchema {
char
name
[
TSDB_COL_NAME_LEN
];
}
SSchema
;
typedef
struct
{
int32_t
contLen
;
int32_t
vgId
;
int8_t
tableType
;
int16_t
numOfColumns
;
int16_t
numOfTags
;
int32_t
tid
;
int32_t
sversion
;
int32_t
tversion
;
int32_t
tagDataLen
;
int32_t
sqlDataLen
;
uint64_t
uid
;
uint64_t
superTableUid
;
uint64_t
createdTime
;
char
tableFname
[
TSDB_TABLE_FNAME_LEN
];
char
stbFname
[
TSDB_TABLE_FNAME_LEN
];
char
data
[];
}
SMDCreateTableMsg
;
typedef
struct
{
int32_t
len
;
// one create table message
char
tableName
[
TSDB_TABLE_FNAME_LEN
];
int16_t
numOfTags
;
int16_t
numOfColumns
;
int16_t
sqlLen
;
// the length of SQL, it starts after schema , sql is a null-terminated string
int8_t
igExists
;
int8_t
rspMeta
;
int8_t
reserved
[
16
];
char
schema
[];
}
SCreateTableMsg
;
typedef
struct
{
char
name
[
TSDB_TABLE_FNAME_LEN
];
int8_t
igExists
;
...
...
@@ -326,19 +357,6 @@ typedef struct {
uint64_t
suid
;
}
SDropStbInternalMsg
;
typedef
struct
{
SMsgHead
head
;
char
name
[
TSDB_TABLE_FNAME_LEN
];
char
stbFname
[
TSDB_TABLE_FNAME_LEN
];
int8_t
tableType
;
uint64_t
suid
;
int32_t
sversion
;
int32_t
numOfTags
;
int32_t
numOfColumns
;
int32_t
tagDataLen
;
char
data
[];
}
SCreateTableMsg
;
typedef
struct
{
SMsgHead
head
;
char
name
[
TSDB_TABLE_FNAME_LEN
];
...
...
include/common/tmsgtype.h
浏览文件 @
021c5176
...
...
@@ -102,14 +102,6 @@ enum {
TSDB_DEFINE_SQL_TYPE
(
TSDB_SQL_MAX
,
"max"
)
};
// create table operation type
enum
TSQL_CREATE_TABLE_TYPE
{
TSQL_CREATE_TABLE
=
0x1
,
TSQL_CREATE_STABLE
=
0x2
,
TSQL_CREATE_CTABLE
=
0x3
,
TSQL_CREATE_STREAM
=
0x4
,
};
#ifdef __cplusplus
}
#endif
...
...
include/common/tname.h
浏览文件 @
021c5176
...
...
@@ -16,8 +16,6 @@
#ifndef TDENGINE_TNAME_H
#define TDENGINE_TNAME_H
//#include "taosmsg.h"
#define TSDB_DB_NAME_T 1
#define TSDB_TABLE_NAME_T 2
...
...
include/dnode/vnode/tq/tq.h
浏览文件 @
021c5176
...
...
@@ -22,6 +22,8 @@
#include "taoserror.h"
#include "taosmsg.h"
#include "tlist.h"
#include "trpc.h"
#include "ttimer.h"
#include "tutil.h"
#ifdef __cplusplus
...
...
@@ -54,6 +56,7 @@ typedef struct STqSetCurReq {
typedef
struct
STqConsumeReq
{
STqMsgHead
head
;
int64_t
blockingTime
;
// milisec
STqAcks
acks
;
}
STqConsumeReq
;
...
...
@@ -101,33 +104,44 @@ typedef struct STqTopicVhandle {
typedef
struct
STqExec
{
void
*
runtimeEnv
;
SSDataBlock
*
(
*
exec
)(
void
*
runtimeEnv
);
void
*
(
*
assign
)(
void
*
runtimeEnv
,
SSubmitBlk
*
inputData
);
void
*
(
*
assign
)(
void
*
runtimeEnv
,
void
*
inputData
);
void
(
*
clear
)(
void
*
runtimeEnv
);
char
*
(
*
serialize
)(
struct
STqExec
*
);
struct
STqExec
*
(
*
deserialize
)(
char
*
);
}
STqExec
;
typedef
struct
STqRspHandle
{
void
*
handle
;
void
*
ahandle
;
}
STqRspHandle
;
typedef
enum
{
TQ_ITEM_READY
,
TQ_ITEM_PROCESS
,
TQ_ITEM_EMPTY
}
STqItemStatus
;
typedef
struct
STqTopic
STqTopic
;
typedef
struct
STqBufferItem
{
int64_t
offset
;
// executors are identical but not concurrent
// so there must be a copy in each item
STqExec
*
executor
;
int32_t
status
;
int64_t
size
;
void
*
content
;
STqExec
*
executor
;
int32_t
status
;
int64_t
size
;
void
*
content
;
STqTopic
*
pTopic
;
}
STqMsgItem
;
typedef
struct
STqTopic
{
struct
STqTopic
{
// char* topic; //c style, end with '\0'
// int64_t cgId;
// void* ahandle;
// int32_t head;
// int32_t tail;
int64_t
nextConsumeOffset
;
int64_t
floatingCursor
;
int64_t
topicId
;
int32_t
head
;
int32_t
tail
;
void
*
logReader
;
STqMsgItem
buffer
[
TQ_BUFFER_SIZE
];
}
STqTopic
;
};
typedef
struct
STqListHandle
{
STqTopic
topic
;
...
...
@@ -135,13 +149,13 @@ typedef struct STqListHandle {
}
STqList
;
typedef
struct
STqGroup
{
int64_t
clientId
;
int64_t
cgId
;
void
*
ahandle
;
int32_t
topicNum
;
STqList
*
head
;
SList
*
topicList
;
// SList<STqTopic>
void
*
returnMsg
;
// SVReadMsg
int64_t
clientId
;
int64_t
cgId
;
void
*
ahandle
;
int32_t
topicNum
;
STqList
*
head
;
SList
*
topicList
;
// SList<STqTopic>
STqRspHandle
rspHandle
;
}
STqGroup
;
typedef
struct
STqQueryMsg
{
...
...
@@ -149,20 +163,23 @@ typedef struct STqQueryMsg {
struct
STqQueryMsg
*
next
;
}
STqQueryMsg
;
typedef
struct
STqLog
Reader
{
typedef
struct
STqLog
Handle
{
void
*
logHandle
;
int32_t
(
*
logRead
)(
void
*
logHandle
,
void
**
data
,
int64_t
ver
);
void
*
(
*
openLogReader
)(
void
*
logHandle
);
void
(
*
closeLogReader
)(
void
*
logReader
);
int32_t
(
*
logRead
)(
void
*
logReader
,
void
**
data
,
int64_t
ver
);
int64_t
(
*
logGetFirstVer
)(
void
*
logHandle
);
int64_t
(
*
logGetSnapshotVer
)(
void
*
logHandle
);
int64_t
(
*
logGetLastVer
)(
void
*
logHandle
);
}
STqLog
Reader
;
}
STqLog
Handle
;
typedef
struct
STqCfg
{
// TODO
}
STqCfg
;
typedef
struct
STqMemRef
{
SMemAllocatorFactory
*
pAlloctorFactory
;
SMemAllocatorFactory
*
pAlloc
a
torFactory
;
SMemAllocator
*
pAllocator
;
}
STqMemRef
;
...
...
@@ -252,19 +269,30 @@ typedef struct STQ {
// the handle of meta kvstore
char
*
path
;
STqCfg
*
tqConfig
;
STqLog
Reader
*
tqLogReader
;
STqLog
Handle
*
tqLogHandle
;
STqMemRef
tqMemRef
;
STqMetaStore
*
tqMeta
;
}
STQ
;
typedef
struct
STqMgmt
{
int8_t
inited
;
tmr_h
timer
;
}
STqMgmt
;
static
STqMgmt
tqMgmt
;
// init once
int
tqInit
();
void
tqCleanUp
();
// open in each vnode
STQ
*
tqOpen
(
const
char
*
path
,
STqCfg
*
tqConfig
,
STqLog
Reader
*
tqLogReader
,
SMemAllocatorFactory
*
allocFac
);
STQ
*
tqOpen
(
const
char
*
path
,
STqCfg
*
tqConfig
,
STqLog
Handle
*
tqLogHandle
,
SMemAllocatorFactory
*
allocFac
);
void
tqClose
(
STQ
*
);
// void* will be replace by a msg type
int
tqPushMsg
(
STQ
*
,
void
*
msg
,
int64_t
version
);
int
tqCommit
(
STQ
*
);
int
tqConsume
(
STQ
*
,
S
TqConsumeReq
*
);
int
tqConsume
(
STQ
*
,
S
RpcMsg
*
pReq
,
SRpcMsg
**
pRsp
);
int
tqSetCursor
(
STQ
*
,
STqSetCurReq
*
pMsg
);
int
tqBufferSetOffset
(
STqTopic
*
,
int64_t
offset
);
...
...
include/dnode/vnode/vnode.h
浏览文件 @
021c5176
...
...
@@ -122,6 +122,16 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs);
*/
int
vnodeApplyWMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
);
/**
* @brief Process a consume message.
*
* @param pVnode The vnode object.
* @param pMsg The request message
* @param pRsp The response message
* @return int 0 for success, -1 for failure
*/
int
vnodeProcessCMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
);
/**
* @brief Process the sync request
*
...
...
include/libs/parser/parsenodes.h
0 → 100644
浏览文件 @
021c5176
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_PARSENODES_H_
#define _TD_PARSENODES_H_
#ifdef __cplusplus
extern
"C"
{
#endif
#include "catalog.h"
#include "common.h"
#include "function.h"
#include "tmsgtype.h"
#include "tname.h"
#include "tvariant.h"
/*
* The first field of a node of any type is guaranteed to be the int16_t.
* Hence the type of any node can be gotten by casting it to SQueryNode.
*/
typedef
struct
SQueryNode
{
int16_t
type
;
}
SQueryNode
;
#define nodeType(nodeptr) (((const SQueryNode*)(nodeptr))->type)
typedef
struct
SField
{
char
name
[
TSDB_COL_NAME_LEN
];
uint8_t
type
;
int16_t
bytes
;
}
SField
;
typedef
struct
SParseBasicCtx
{
const
char
*
db
;
int32_t
acctId
;
uint64_t
requestId
;
}
SParseBasicCtx
;
typedef
struct
SFieldInfo
{
int16_t
numOfOutput
;
// number of column in result
SField
*
final
;
SArray
*
internalField
;
// SArray<SInternalField>
}
SFieldInfo
;
typedef
struct
SCond
{
uint64_t
uid
;
int32_t
len
;
// length of tag query condition data
char
*
cond
;
}
SCond
;
typedef
struct
SJoinNode
{
uint64_t
uid
;
int16_t
tagColId
;
SArray
*
tsJoin
;
SArray
*
tagJoin
;
}
SJoinNode
;
typedef
struct
SJoinInfo
{
bool
hasJoin
;
SJoinNode
*
joinTables
[
TSDB_MAX_JOIN_TABLE_NUM
];
}
SJoinInfo
;
typedef
struct
STagCond
{
int16_t
relType
;
// relation between tbname list and query condition, including : TK_AND or TK_OR
SCond
tbnameCond
;
// tbname query condition, only support tbname query condition on one table
SJoinInfo
joinInfo
;
// join condition, only support two tables join currently
SArray
*
pCond
;
// for different table, the query condition must be seperated
}
STagCond
;
typedef
struct
STableMetaInfo
{
STableMeta
*
pTableMeta
;
// table meta, cached in client side and acquired by name
SVgroupsInfo
*
vgroupList
;
SName
name
;
char
aliasName
[
TSDB_TABLE_NAME_LEN
];
// alias name of table specified in query sql
SArray
*
tagColList
;
// SArray<SColumn*>, involved tag columns
}
STableMetaInfo
;
typedef
struct
SColumnIndex
{
int16_t
tableIndex
;
int16_t
columnIndex
;
int16_t
type
;
// normal column/tag/ user input constant column
}
SColumnIndex
;
// select statement
typedef
struct
SQueryStmtInfo
{
int16_t
command
;
// the command may be different for each subclause, so keep it seperately.
uint32_t
type
;
// query/insert type
STimeWindow
window
;
// the whole query time window
SInterval
interval
;
// tumble time window
SSessionWindow
sessionWindow
;
// session time window
SStateWindow
stateWindow
;
// state window query
SGroupbyExpr
groupbyExpr
;
// groupby tags info
SArray
*
colList
;
// SArray<SColumn*>
SFieldInfo
fieldsInfo
;
SArray
**
exprList
;
// SArray<SExprInfo*>
SLimit
limit
;
SLimit
slimit
;
STagCond
tagCond
;
SArray
*
colCond
;
SArray
*
order
;
int16_t
numOfTables
;
int16_t
curTableIdx
;
STableMetaInfo
**
pTableMetaInfo
;
struct
STSBuf
*
tsBuf
;
int16_t
fillType
;
// final result fill type
int64_t
*
fillVal
;
// default value for fill
int32_t
numOfFillVal
;
// fill value size
char
*
msg
;
// pointer to the pCmd->payload to keep error message temporarily
int64_t
clauseLimit
;
// limit for current sub clause
int64_t
prjOffset
;
// offset value in the original sql expression, only applied at client side
int64_t
vgroupLimit
;
// table limit in case of super table projection query + global order + limit
int32_t
udColumnId
;
// current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
int32_t
bufLen
;
char
*
buf
;
SArray
*
pUdfInfo
;
struct
SQueryStmtInfo
*
sibling
;
// sibling
struct
SQueryStmtInfo
*
pDownstream
;
SMultiFunctionsDesc
info
;
SArray
*
pUpstream
;
// SArray<struct SQueryStmtInfo>
int32_t
havingFieldNum
;
int32_t
exprListLevelIndex
;
}
SQueryStmtInfo
;
typedef
enum
{
PAYLOAD_TYPE_KV
=
0
,
PAYLOAD_TYPE_RAW
=
1
,
}
EPayloadType
;
typedef
struct
SVgDataBlocks
{
SVgroupInfo
vg
;
int32_t
numOfTables
;
// number of tables in current submit block
uint32_t
size
;
char
*
pData
;
// SMsgDesc + SSubmitMsg + SSubmitBlk + ...
}
SVgDataBlocks
;
typedef
struct
SInsertStmtInfo
{
int16_t
nodeType
;
SArray
*
pDataBlocks
;
// data block for each vgroup, SArray<SVgDataBlocks*>.
int8_t
schemaAttache
;
// denote if submit block is built with table schema or not
uint8_t
payloadType
;
// EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t
insertType
;
// insert data from [file|sql statement| bound statement]
const
char
*
sql
;
// current sql statement position
}
SInsertStmtInfo
;
#ifdef __cplusplus
}
#endif
#endif
/*_TD_PARSENODES_H_*/
include/libs/parser/parser.h
浏览文件 @
021c5176
...
...
@@ -20,108 +20,7 @@
extern
"C"
{
#endif
#include "catalog.h"
#include "common.h"
#include "tname.h"
#include "tvariant.h"
#include "function.h"
typedef
struct
SField
{
char
name
[
TSDB_COL_NAME_LEN
];
uint8_t
type
;
int16_t
bytes
;
}
SField
;
typedef
struct
SFieldInfo
{
int16_t
numOfOutput
;
// number of column in result
SField
*
final
;
SArray
*
internalField
;
// SArray<SInternalField>
}
SFieldInfo
;
typedef
struct
SCond
{
uint64_t
uid
;
int32_t
len
;
// length of tag query condition data
char
*
cond
;
}
SCond
;
typedef
struct
SJoinNode
{
uint64_t
uid
;
int16_t
tagColId
;
SArray
*
tsJoin
;
SArray
*
tagJoin
;
}
SJoinNode
;
typedef
struct
SJoinInfo
{
bool
hasJoin
;
SJoinNode
*
joinTables
[
TSDB_MAX_JOIN_TABLE_NUM
];
}
SJoinInfo
;
typedef
struct
STagCond
{
int16_t
relType
;
// relation between tbname list and query condition, including : TK_AND or TK_OR
SCond
tbnameCond
;
// tbname query condition, only support tbname query condition on one table
SJoinInfo
joinInfo
;
// join condition, only support two tables join currently
SArray
*
pCond
;
// for different table, the query condition must be seperated
}
STagCond
;
typedef
struct
STableMetaInfo
{
STableMeta
*
pTableMeta
;
// table meta, cached in client side and acquired by name
SVgroupsInfo
*
vgroupList
;
SName
name
;
char
aliasName
[
TSDB_TABLE_NAME_LEN
];
// alias name of table specified in query sql
SArray
*
tagColList
;
// SArray<SColumn*>, involved tag columns
}
STableMetaInfo
;
typedef
struct
SQueryStmtInfo
{
int16_t
command
;
// the command may be different for each subclause, so keep it seperately.
uint32_t
type
;
// query/insert type
STimeWindow
window
;
// the whole query time window
SInterval
interval
;
// tumble time window
SSessionWindow
sessionWindow
;
// session time window
SStateWindow
stateWindow
;
// state window query
SGroupbyExpr
groupbyExpr
;
// groupby tags info
SArray
*
colList
;
// SArray<SColumn*>
SFieldInfo
fieldsInfo
;
SArray
**
exprList
;
// SArray<SExprInfo*>
SLimit
limit
;
SLimit
slimit
;
STagCond
tagCond
;
SArray
*
colCond
;
SArray
*
order
;
int16_t
numOfTables
;
int16_t
curTableIdx
;
STableMetaInfo
**
pTableMetaInfo
;
struct
STSBuf
*
tsBuf
;
int16_t
fillType
;
// final result fill type
int64_t
*
fillVal
;
// default value for fill
int32_t
numOfFillVal
;
// fill value size
char
*
msg
;
// pointer to the pCmd->payload to keep error message temporarily
int64_t
clauseLimit
;
// limit for current sub clause
int64_t
prjOffset
;
// offset value in the original sql expression, only applied at client side
int64_t
vgroupLimit
;
// table limit in case of super table projection query + global order + limit
int32_t
udColumnId
;
// current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
int32_t
bufLen
;
char
*
buf
;
SArray
*
pUdfInfo
;
struct
SQueryStmtInfo
*
sibling
;
// sibling
struct
SQueryStmtInfo
*
pDownstream
;
SMultiFunctionsDesc
info
;
SArray
*
pUpstream
;
// SArray<struct SQueryStmtInfo>
int32_t
havingFieldNum
;
int32_t
exprListLevelIndex
;
}
SQueryStmtInfo
;
typedef
struct
SColumnIndex
{
int16_t
tableIndex
;
int16_t
columnIndex
;
int16_t
type
;
// normal column/tag/ user input constant column
}
SColumnIndex
;
struct
SInsertStmtInfo
;
#include "parsenodes.h"
/**
* True will be returned if the input sql string is insert, false otherwise.
...
...
@@ -132,18 +31,16 @@ struct SInsertStmtInfo;
bool
qIsInsertSql
(
const
char
*
pStr
,
size_t
length
);
typedef
struct
SParseContext
{
const
char
*
pAcctId
;
const
char
*
pDbname
;
void
*
pRpc
;
const
char
*
pClusterId
;
struct
SCatalog
*
pCatalog
;
const
SEpSet
*
pEpSet
;
int64_t
id
;
// query id, generated by uuid generator
int8_t
schemaAttached
;
// denote if submit block is built with table schema or not
const
char
*
pSql
;
// sql string
size_t
sqlLen
;
// length of the sql string
char
*
pMsg
;
// extended error message if exists to help avoid the problem in sql statement.
int32_t
msgLen
;
// max length of the msg
SParseBasicCtx
ctx
;
void
*
pRpc
;
struct
SCatalog
*
pCatalog
;
const
SEpSet
*
pEpSet
;
int64_t
id
;
// query id, generated by uuid generator
int8_t
schemaAttached
;
// denote if submit block is built with table schema or not
const
char
*
pSql
;
// sql string
size_t
sqlLen
;
// length of the sql string
char
*
pMsg
;
// extended error message if exists to help avoid the problem in sql statement.
int32_t
msgLen
;
// max length of the msg
}
SParseContext
;
/**
...
...
@@ -154,27 +51,7 @@ typedef struct SParseContext {
* @param msg extended error message if exists.
* @return error code
*/
int32_t
qParseQuerySql
(
const
char
*
pStr
,
size_t
length
,
int64_t
id
,
int32_t
*
type
,
void
**
pOutput
,
int32_t
*
outputLen
,
char
*
msg
,
int32_t
msgLen
);
typedef
enum
{
PAYLOAD_TYPE_KV
=
0
,
PAYLOAD_TYPE_RAW
=
1
,
}
EPayloadType
;
typedef
struct
SVgDataBlocks
{
int64_t
vgId
;
// virtual group id
int32_t
numOfTables
;
// number of tables in current submit block
uint32_t
size
;
char
*
pData
;
// SMsgDesc + SSubmitMsg + SSubmitBlk + ...
}
SVgDataBlocks
;
typedef
struct
SInsertStmtInfo
{
SArray
*
pDataBlocks
;
// data block for each vgroup, SArray<SVgDataBlocks*>.
int8_t
schemaAttache
;
// denote if submit block is built with table schema or not
uint8_t
payloadType
;
// EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t
insertType
;
// insert data from [file|sql statement| bound statement]
const
char
*
sql
;
// current sql statement position
}
SInsertStmtInfo
;
int32_t
qParseQuerySql
(
const
char
*
pStr
,
size_t
length
,
SParseBasicCtx
*
pParseCtx
,
int32_t
*
type
,
void
**
pOutput
,
int32_t
*
outputLen
,
char
*
msg
,
int32_t
msgLen
);
/**
* Parse the insert sql statement.
...
...
@@ -211,9 +88,9 @@ typedef struct SSourceParam {
SExprInfo
*
createExprInfo
(
STableMetaInfo
*
pTableMetaInfo
,
const
char
*
funcName
,
SSourceParam
*
pSource
,
SSchema
*
pResSchema
,
int16_t
interSize
);
int32_t
copyExprInfoList
(
SArray
*
dst
,
const
SArray
*
src
,
uint64_t
uid
,
bool
deepcopy
);
int32_t
copyAllExprInfo
(
SArray
*
dst
,
const
SArray
*
src
,
bool
deepcopy
);
int32_t
getExprFunctionLevel
(
SQueryStmtInfo
*
pQueryInfo
);
int32_t
getExprFunctionLevel
(
const
SQueryStmtInfo
*
pQueryInfo
);
STableMetaInfo
*
getMetaInfo
(
SQueryStmtInfo
*
pQueryInfo
,
int32_t
tableIndex
);
STableMetaInfo
*
getMetaInfo
(
const
SQueryStmtInfo
*
pQueryInfo
,
int32_t
tableIndex
);
SSchema
*
getOneColumnSchema
(
const
STableMeta
*
pTableMeta
,
int32_t
colIndex
);
SSchema
createSchema
(
uint8_t
type
,
int16_t
bytes
,
int16_t
colId
,
const
char
*
name
);
...
...
include/libs/planner/planner.h
浏览文件 @
021c5176
...
...
@@ -25,6 +25,7 @@ extern "C" {
#define QUERY_TYPE_MERGE 1
#define QUERY_TYPE_PARTIAL 2
#define QUERY_TYPE_SCAN 3
#define QUERY_TYPE_MODIFY 4
enum
OPERATOR_TYPE_E
{
OP_Unknown
,
...
...
@@ -58,18 +59,17 @@ typedef struct SQueryNodeBasicInfo {
typedef
struct
SDataSink
{
SQueryNodeBasicInfo
info
;
SDataBlockSchema
schema
;
}
SDataSink
;
typedef
struct
SDataDispatcher
{
SDataSink
sink
;
// todo
}
SDataDispatcher
;
typedef
struct
SDataInserter
{
SDataSink
sink
;
uint64_t
uid
;
// unique id of the table
// todo data field
int32_t
numOfTables
;
uint32_t
size
;
char
*
pData
;
}
SDataInserter
;
typedef
struct
SPhyNode
{
...
...
@@ -119,12 +119,13 @@ typedef struct SSubplanId {
typedef
struct
SSubplan
{
SSubplanId
id
;
// unique id of the subplan
int32_t
type
;
// QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN
int32_t
level
;
// the execution level of current subplan, starting from 0.
SEpSet
execEpSet
;
// for the scan sub plan, the optional execution node
SArray
*
pChildern
;
// the datasource subplan,from which to fetch the result
SArray
*
pParents
;
// the data destination subplan, get data from current subplan
SPhyNode
*
pNode
;
// physical plan of current subplan
int32_t
type
;
// QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN|QUERY_TYPE_MODIFY
int32_t
level
;
// the execution level of current subplan, starting from 0.
SEpSet
execEpSet
;
// for the scan/modify subplan, the optional execution node
SArray
*
pChildern
;
// the datasource subplan,from which to fetch the result
SArray
*
pParents
;
// the data destination subplan, get data from current subplan
SPhyNode
*
pNode
;
// physical plan of current subplan
SDataSink
*
pDataSink
;
// data of the subplan flow into the datasink
}
SSubplan
;
typedef
struct
SQueryDag
{
...
...
@@ -133,10 +134,12 @@ typedef struct SQueryDag {
SArray
*
pSubplans
;
// Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
}
SQueryDag
;
struct
SQueryNode
;
/**
* Create the physical plan for the query, according to the AST.
*/
int32_t
qCreateQueryDag
(
const
struct
SQuery
StmtInfo
*
pQueryInfo
,
struct
SEpSet
*
pQnode
,
struct
SQueryDag
**
pDag
);
int32_t
qCreateQueryDag
(
const
struct
SQuery
Node
*
pQueryInfo
,
struct
SEpSet
*
pQnode
,
struct
SQueryDag
**
pDag
);
// Set datasource of this subplan, multiple calls may be made to a subplan.
// @subplan subplan to be schedule
...
...
@@ -144,12 +147,12 @@ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet*
// @ep one execution location of this group of datasource subplans
int32_t
qSetSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
SEpAddr
*
ep
);
int32_t
qExplainQuery
(
const
struct
SQuery
StmtInfo
*
pQueryInfo
,
struct
SEpSet
*
pQnode
,
char
**
str
);
int32_t
qExplainQuery
(
const
struct
SQuery
Node
*
pQueryInfo
,
struct
SEpSet
*
pQnode
,
char
**
str
);
/**
* Convert to subplan to string for the scheduler to send to the executor
*/
int32_t
qSubPlanToString
(
const
SSubplan
*
subplan
,
char
**
str
);
int32_t
qSubPlanToString
(
const
SSubplan
*
subplan
,
char
**
str
,
int32_t
*
len
);
int32_t
qStringToSubplan
(
const
char
*
str
,
SSubplan
**
subplan
);
...
...
include/libs/qcom/query.h
浏览文件 @
021c5176
...
...
@@ -90,15 +90,27 @@ typedef struct STableMetaOutput {
STableMeta
*
tbMeta
;
}
STableMetaOutput
;
typedef
int32_t
__async_exec_fn_t
(
void
*
param
);
bool
tIsValidSchema
(
struct
SSchema
*
pSchema
,
int32_t
numOfCols
,
int32_t
numOfTags
);
extern
int32_t
(
*
queryBuildMsg
[
TSDB_MSG_TYPE_MAX
])(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
);
extern
int32_t
(
*
queryProcessMsgRsp
[
TSDB_MSG_TYPE_MAX
])(
void
*
output
,
char
*
msg
,
int32_t
msgSize
);
int32_t
initTaskQueue
();
int32_t
cleanupTaskQueue
();
/**
*
* @param execFn The asynchronously execution function
* @param execParam The parameters of the execFn
* @param code The response code during execution the execFn
* @return
*/
int32_t
taosAsyncExec
(
__async_exec_fn_t
execFn
,
void
*
execParam
,
int32_t
*
code
);
SSchema
*
tGetTbnameColumnSchema
();
extern
void
msgInit
();
void
msgInit
();
extern
int32_t
qDebugFlag
;
extern
int32_t
(
*
queryBuildMsg
[
TSDB_MSG_TYPE_MAX
])(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
);
extern
int32_t
(
*
queryProcessMsgRsp
[
TSDB_MSG_TYPE_MAX
])(
void
*
output
,
char
*
msg
,
int32_t
msgSize
);
#define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", qDebugFlag, __VA_ARGS__); }} while(0)
...
...
include/libs/wal/wal.h
浏览文件 @
021c5176
...
...
@@ -174,8 +174,11 @@ SWalReadHandle *walOpenReadHandle(SWal *);
void
walCloseReadHandle
(
SWalReadHandle
*
);
int32_t
walReadWithHandle
(
SWalReadHandle
*
pRead
,
int64_t
ver
);
// deprecated
#if 0
int32_t walRead(SWal *, SWalHead **, int64_t ver);
// int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum);
int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum);
#endif
// lifecycle check
int64_t
walGetFirstVer
(
SWal
*
);
...
...
include/util/ttimer.h
浏览文件 @
021c5176
...
...
@@ -16,6 +16,8 @@
#ifndef _TD_UTIL_TIMER_H
#define _TD_UTIL_TIMER_H
#include "os.h"
#ifdef __cplusplus
extern
"C"
{
#endif
...
...
source/client/inc/clientInt.h
浏览文件 @
021c5176
...
...
@@ -148,13 +148,16 @@ int taos_init();
void
*
createTscObj
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
ip
,
uint32_t
port
,
SAppInstInfo
*
pAppInfo
);
void
destroyTscObj
(
void
*
pObj
);
void
*
createRequest
(
STscObj
*
pObj
,
__taos_async_fn_t
fp
,
void
*
param
,
int32_t
type
);
void
destroyRequest
(
SRequestObj
*
pRequest
);
void
*
createRequest
(
STscObj
*
pObj
,
__taos_async_fn_t
fp
,
void
*
param
,
int32_t
type
);
void
destroyRequest
(
SRequestObj
*
pRequest
);
char
*
getConnectionDB
(
STscObj
*
pObj
);
void
setConnectionDB
(
STscObj
*
pTscObj
,
const
char
*
db
);
void
taos_init_imp
(
void
);
int
taos_options_imp
(
TSDB_OPTION
option
,
const
char
*
str
);
void
*
openTransporter
(
const
char
*
user
,
const
char
*
auth
);
void
*
openTransporter
(
const
char
*
user
,
const
char
*
auth
,
int32_t
numOfThreads
);
void
processMsgFromServer
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
void
initMsgHandleFp
();
...
...
source/client/src/
tsc
Env.c
→
source/client/src/
client
Env.c
浏览文件 @
021c5176
...
...
@@ -13,17 +13,17 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "clientInt.h"
#include "clientLog.h"
#include "os.h"
#include "taosmsg.h"
#include "query.h"
#include "clientInt.h"
#include "clientLog.h"
#include "tcache.h"
#include "tconfig.h"
#include "tglobal.h"
#include "tnote.h"
#include "tref.h"
#include "trpc.h"
#include "tsched.h"
#include "ttime.h"
#include "ttimezone.h"
...
...
@@ -33,10 +33,8 @@
SAppInfo
appInfo
;
int32_t
tscReqRef
=
-
1
;
int32_t
tscConnRef
=
-
1
;
void
*
tscQhandle
=
NULL
;
static
pthread_once_t
tscinit
=
PTHREAD_ONCE_INIT
;
int32_t
tsNumOfThreads
=
1
;
volatile
int32_t
tscInitRes
=
0
;
static
void
registerRequest
(
SRequestObj
*
pRequest
)
{
...
...
@@ -98,12 +96,12 @@ void closeTransporter(STscObj* pTscObj) {
}
// TODO refactor
void
*
openTransporter
(
const
char
*
user
,
const
char
*
auth
)
{
void
*
openTransporter
(
const
char
*
user
,
const
char
*
auth
,
int32_t
numOfThread
)
{
SRpcInit
rpcInit
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localPort
=
0
;
rpcInit
.
label
=
"TSC"
;
rpcInit
.
numOfThreads
=
tsNumOfThreads
;
rpcInit
.
numOfThreads
=
numOfThread
;
rpcInit
.
cfp
=
processMsgFromServer
;
rpcInit
.
sessions
=
tsMaxConnections
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
...
...
@@ -229,18 +227,8 @@ void taos_init_imp(void) {
taosSetCoreDump
(
true
);
double
factor
=
4
.
0
;
int32_t
numOfThreads
=
MAX
((
int
)(
tsNumOfCores
*
tsNumOfThreadsPerCore
/
factor
),
2
);
int32_t
queueSize
=
tsMaxConnections
*
2
;
tscQhandle
=
taosInitScheduler
(
queueSize
,
numOfThreads
,
"tsc"
);
if
(
NULL
==
tscQhandle
)
{
tscError
(
"failed to init task queue"
);
tscInitRes
=
-
1
;
return
;
}
initTaskQueue
();
tscDebug
(
"client task queue is initialized, numOfThreads: %d"
,
numOfThreads
);
tscConnRef
=
taosOpenRef
(
200
,
destroyTscObj
);
tscReqRef
=
taosOpenRef
(
40960
,
doDestroyRequest
);
...
...
source/client/src/clientImpl.c
浏览文件 @
021c5176
...
...
@@ -102,9 +102,8 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
SAppInstInfo
**
pInst
=
taosHashGet
(
appInfo
.
pInstMap
,
key
,
strlen
(
key
));
if
(
pInst
==
NULL
)
{
SAppInstInfo
*
p
=
calloc
(
1
,
sizeof
(
struct
SAppInstInfo
));
p
->
mgmtEp
=
epSet
;
p
->
pTransporter
=
openTransporter
(
user
,
secretEncrypt
);
p
->
pTransporter
=
openTransporter
(
user
,
secretEncrypt
,
tsNumOfCores
);
taosHashPut
(
appInfo
.
pInstMap
,
key
,
strlen
(
key
),
&
p
,
POINTER_BYTES
);
pInst
=
&
p
;
...
...
@@ -152,8 +151,12 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
int32_t
type
=
0
;
void
*
output
=
NULL
;
int32_t
outputLen
=
0
;
code
=
qParseQuerySql
(
pRequest
->
sqlstr
,
sqlLen
,
pRequest
->
requestId
,
&
type
,
&
output
,
&
outputLen
,
pRequest
->
msgBuf
,
ERROR_MSG_BUF_DEFAULT_SIZE
);
if
(
type
==
TSDB_SQL_CREATE_USER
||
type
==
TSDB_SQL_SHOW
||
type
==
TSDB_SQL_DROP_USER
||
type
==
TSDB_SQL_DROP_ACCT
||
type
==
TSDB_SQL_CREATE_DB
||
type
==
TSDB_SQL_CREATE_ACCT
)
{
SParseBasicCtx
c
=
{.
requestId
=
pRequest
->
requestId
,
.
acctId
=
pTscObj
->
acctId
,
.
db
=
getConnectionDB
(
pTscObj
)};
code
=
qParseQuerySql
(
pRequest
->
sqlstr
,
sqlLen
,
&
c
,
&
type
,
&
output
,
&
outputLen
,
pRequest
->
msgBuf
,
ERROR_MSG_BUF_DEFAULT_SIZE
);
if
(
type
==
TSDB_SQL_CREATE_USER
||
type
==
TSDB_SQL_SHOW
||
type
==
TSDB_SQL_DROP_USER
||
type
==
TSDB_SQL_DROP_ACCT
||
type
==
TSDB_SQL_CREATE_DB
||
type
==
TSDB_SQL_CREATE_ACCT
||
type
==
TSDB_SQL_CREATE_TABLE
||
type
==
TSDB_SQL_USE_DB
)
{
pRequest
->
type
=
type
;
pRequest
->
body
.
requestMsg
=
(
SReqMsgInfo
){.
pMsg
=
output
,
.
len
=
outputLen
};
...
...
@@ -164,12 +167,12 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
sendMsgToServer
(
pTscObj
->
pTransporter
,
&
pTscObj
->
pAppInfo
->
mgmtEp
.
epSet
,
&
body
,
&
transporterId
);
tsem_wait
(
&
pRequest
->
body
.
rspSem
);
destroyRequestMsgBody
(
&
body
);
}
else
{
assert
(
0
);
}
tfree
(
c
.
db
);
}
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -437,8 +440,19 @@ void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t
}
}
const
char
*
taos_get_client_info
()
{
return
version
;
}
char
*
getConnectionDB
(
STscObj
*
pObj
)
{
char
*
p
=
NULL
;
pthread_mutex_lock
(
&
pObj
->
mutex
);
p
=
strndup
(
pObj
->
db
,
tListLen
(
pObj
->
db
));
pthread_mutex_unlock
(
&
pObj
->
mutex
);
int
taos_affected_rows
(
TAOS_RES
*
res
)
{
return
1
;
}
return
p
;
}
void
setConnectionDB
(
STscObj
*
pTscObj
,
const
char
*
db
)
{
assert
(
db
!=
NULL
&&
pTscObj
!=
NULL
);
pthread_mutex_lock
(
&
pTscObj
->
mutex
);
tstrncpy
(
pTscObj
->
db
,
db
,
tListLen
(
pTscObj
->
db
));
pthread_mutex_unlock
(
&
pTscObj
->
mutex
);
}
int
taos_result_precision
(
TAOS_RES
*
res
)
{
return
TSDB_TIME_PRECISION_MILLI
;
}
source/client/src/clientMain.c
浏览文件 @
021c5176
#include "os.h"
#include "clientInt.h"
#include "clientLog.h"
#include "
os
.h"
#include "
query
.h"
#include "taosmsg.h"
#include "tcache.h"
#include "tconfig.h"
#include "tglobal.h"
#include "tnote.h"
#include "tref.h"
#include "trpc.h"
#include "tsched.h"
#include "ttime.h"
#include "ttimezone.h"
#define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED 0
...
...
@@ -44,9 +39,7 @@ void taos_cleanup(void) {
tscReqRef
=
-
1
;
taosCloseRef
(
id
);
void
*
p
=
tscQhandle
;
tscQhandle
=
NULL
;
taosCleanUpScheduler
(
p
);
cleanupTaskQueue
();
id
=
tscConnRef
;
tscConnRef
=
-
1
;
...
...
@@ -262,3 +255,9 @@ const char *taos_data_type(int type) {
default:
return
"UNKNOWN"
;
}
}
const
char
*
taos_get_client_info
()
{
return
version
;
}
int
taos_affected_rows
(
TAOS_RES
*
res
)
{
return
1
;
}
int
taos_result_precision
(
TAOS_RES
*
res
)
{
return
TSDB_TIME_PRECISION_MILLI
;
}
source/client/src/clientMsgHandler.c
浏览文件 @
021c5176
此差异已折叠。
点击以展开。
source/client/test/clientTests.cpp
浏览文件 @
021c5176
...
...
@@ -16,15 +16,15 @@
#include <gtest/gtest.h>
#include <taoserror.h>
#include <iostream>
#include "tglobal.h"
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "../inc/clientInt.h"
#include "taos.h"
#include "tglobal.h"
#include "../inc/clientInt.h"
namespace
{
}
// namespace
...
...
@@ -148,3 +148,36 @@ TEST(testCase, create_db_Test) {
taos_close
(
pConn
);
}
TEST
(
testCase
,
use_db_test
)
{
TAOS
*
pConn
=
taos_connect
(
"ubuntu"
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use abc1"
);
TAOS_FIELD
*
pFields
=
taos_fetch_fields
(
pRes
);
ASSERT_TRUE
(
pFields
==
NULL
);
int32_t
numOfFields
=
taos_num_fields
(
pRes
);
ASSERT_EQ
(
numOfFields
,
0
);
taos_close
(
pConn
);
}
TEST
(
testCase
,
create_stable_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"ubuntu"
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use abc1"
);
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create stable st1(ts timestamp, k int) tags(a int)"
);
TAOS_FIELD
*
pFields
=
taos_fetch_fields
(
pRes
);
ASSERT_TRUE
(
pFields
==
NULL
);
int32_t
numOfFields
=
taos_num_fields
(
pRes
);
ASSERT_EQ
(
numOfFields
,
0
);
taos_close
(
pConn
);
}
source/common/src/tname.c
浏览文件 @
021c5176
...
...
@@ -6,21 +6,6 @@
#define VALID_NAME_TYPE(x) ((x) == TSDB_DB_NAME_T || (x) == TSDB_TABLE_NAME_T)
char
*
extractDBName
(
const
char
*
tableId
,
char
*
name
)
{
size_t
offset1
=
strcspn
(
tableId
,
&
TS_PATH_DELIMITER
[
0
]);
size_t
len
=
strcspn
(
&
tableId
[
offset1
+
1
],
&
TS_PATH_DELIMITER
[
0
]);
return
strncpy
(
name
,
&
tableId
[
offset1
+
1
],
len
);
}
// todo remove it
size_t
tableIdPrefix
(
const
char
*
name
,
char
*
prefix
,
int32_t
len
)
{
tstrncpy
(
prefix
,
name
,
len
);
strcat
(
prefix
,
TS_PATH_DELIMITER
);
return
strlen
(
prefix
);
}
bool
tscValidateTableNameLength
(
size_t
len
)
{
return
len
<
TSDB_TABLE_NAME_LEN
;
}
...
...
@@ -125,7 +110,7 @@ int32_t tNameExtractFullName(const SName* name, char* dst) {
return
-
1
;
}
int32_t
len
=
snprintf
(
dst
,
TSDB_FULL_DB_NAME_LEN
,
"%
s
.%s"
,
name
->
acctId
,
name
->
dbname
);
int32_t
len
=
snprintf
(
dst
,
TSDB_FULL_DB_NAME_LEN
,
"%
d
.%s"
,
name
->
acctId
,
name
->
dbname
);
size_t
tnameLen
=
strlen
(
name
->
tname
);
if
(
tnameLen
>
0
)
{
...
...
@@ -141,7 +126,9 @@ int32_t tNameExtractFullName(const SName* name, char* dst) {
int32_t
tNameLen
(
const
SName
*
name
)
{
assert
(
name
!=
NULL
);
int32_t
len
=
(
int32_t
)
strlen
(
name
->
acctId
);
char
tmp
[
12
]
=
{
0
};
int32_t
len
=
sprintf
(
tmp
,
"%d"
,
name
->
acctId
);
int32_t
len1
=
(
int32_t
)
strlen
(
name
->
dbname
);
int32_t
len2
=
(
int32_t
)
strlen
(
name
->
tname
);
...
...
@@ -161,10 +148,6 @@ bool tNameIsValid(const SName* name) {
return
false
;
}
if
(
strlen
(
name
->
acctId
)
<=
0
)
{
return
false
;
}
if
(
name
->
type
==
TSDB_DB_NAME_T
)
{
return
strlen
(
name
->
dbname
)
>
0
;
}
else
{
...
...
@@ -237,13 +220,6 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
return
-
1
;
}
int32_t
len
=
(
int32_t
)(
p
-
str
);
// too long account id or too long db name
// if ((len >= tListLen(dst->acctId)) || (len <= 0)) {
// return -1;
// }
// memcpy (dst->acctId, str, len);
dst
->
acctId
=
strtoll
(
str
,
NULL
,
10
);
}
...
...
@@ -272,9 +248,8 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
dst
->
type
=
TSDB_TABLE_NAME_T
;
char
*
start
=
(
char
*
)
((
p
==
NULL
)
?
str
:
(
p
+
1
));
int32_t
len
=
(
int32_t
)
strlen
(
start
);
// too long account id or too long db name
int32_t
len
=
(
int32_t
)
strlen
(
start
);
if
((
len
>=
tListLen
(
dst
->
tname
))
||
(
len
<=
0
))
{
return
-
1
;
}
...
...
source/dnode/vnode/tq/CMakeLists.txt
浏览文件 @
021c5176
...
...
@@ -12,6 +12,7 @@ target_link_libraries(
PUBLIC os
PUBLIC util
PUBLIC common
PUBLIC transport
)
if
(
${
BUILD_TEST
}
)
...
...
source/dnode/vnode/tq/inc/tqInt.h
浏览文件 @
021c5176
...
...
@@ -18,6 +18,7 @@
#include "tq.h"
#include "tlog.h"
#include "trpc.h"
#ifdef __cplusplus
extern
"C"
{
#endif
...
...
source/dnode/vnode/tq/src/tq.c
浏览文件 @
021c5176
...
...
@@ -35,7 +35,22 @@ void* tqSerializeItem(STqMsgItem* pItem, void* ptr);
const
void
*
tqDeserializeTopic
(
const
void
*
pBytes
,
STqTopic
*
pTopic
);
const
void
*
tqDeserializeItem
(
const
void
*
pBytes
,
STqMsgItem
*
pItem
);
STQ
*
tqOpen
(
const
char
*
path
,
STqCfg
*
tqConfig
,
STqLogReader
*
tqLogReader
,
SMemAllocatorFactory
*
allocFac
)
{
int
tqInit
()
{
int8_t
old
=
atomic_val_compare_exchange_8
(
&
tqMgmt
.
inited
,
0
,
1
);
if
(
old
==
1
)
return
0
;
tqMgmt
.
timer
=
taosTmrInit
(
0
,
0
,
0
,
"TQ"
);
return
0
;
}
void
tqCleanUp
()
{
int8_t
old
=
atomic_val_compare_exchange_8
(
&
tqMgmt
.
inited
,
1
,
0
);
if
(
old
==
0
)
return
;
taosTmrStop
(
tqMgmt
.
timer
);
taosTmrCleanUp
(
tqMgmt
.
timer
);
}
STQ
*
tqOpen
(
const
char
*
path
,
STqCfg
*
tqConfig
,
STqLogHandle
*
tqLogHandle
,
SMemAllocatorFactory
*
allocFac
)
{
STQ
*
pTq
=
malloc
(
sizeof
(
STQ
));
if
(
pTq
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
...
...
@@ -43,9 +58,9 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemA
}
pTq
->
path
=
strdup
(
path
);
pTq
->
tqConfig
=
tqConfig
;
pTq
->
tqLog
Reader
=
tqLogReader
;
pTq
->
tqLog
Handle
=
tqLogHandle
;
#if 0
pTq->tqMemRef.pAlloctorFactory = allocFac;
pTq->tqMemRef.pAlloc
a
torFactory = allocFac;
pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
if (pTq->tqMemRef.pAllocator == NULL) {
// TODO: error code of buffer pool
...
...
@@ -53,16 +68,24 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemA
#endif
pTq
->
tqMeta
=
tqStoreOpen
(
path
,
(
FTqSerialize
)
tqSerializeGroup
,
(
FTqDeserialize
)
tqDeserializeGroup
,
free
,
0
);
if
(
pTq
->
tqMeta
==
NULL
)
{
// TODO: free STQ
free
(
pTq
);
#if 0
allocFac->destroy(allocFac, pTq->tqMemRef.pAllocator);
#endif
return
NULL
;
}
return
pTq
;
}
void
tqClose
(
STQ
*
pTq
)
{
// TODO
}
static
int
tqProtoCheck
(
STqMsgHead
*
pMsg
)
{
return
pMsg
->
protoVer
==
0
;
}
static
int
tqProtoCheck
(
STqMsgHead
*
pMsg
)
{
// TODO
return
pMsg
->
protoVer
==
0
;
}
static
int
tqAckOneTopic
(
STqTopic
*
pTopic
,
STqOneAck
*
pAck
,
STqQueryMsg
**
ppQuery
)
{
// clean old item and move forward
...
...
@@ -126,6 +149,13 @@ int tqCreateGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId, STqGroup
*
ppGroup
=
pGroup
;
memset
(
pGroup
,
0
,
sizeof
(
STqGroup
));
pGroup
->
topicList
=
tdListNew
(
sizeof
(
STqTopic
));
if
(
pGroup
->
topicList
==
NULL
)
{
free
(
pGroup
);
return
-
1
;
}
*
ppGroup
=
pGroup
;
return
0
;
}
...
...
@@ -154,46 +184,55 @@ int tqDropGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
return
0
;
}
static
int
tqFetch
(
STqGroup
*
pGroup
,
void
**
msg
)
{
STqList
*
h
ead
=
pGroup
->
head
;
STqList
*
node
=
h
ead
;
static
int
tqFetch
(
STqGroup
*
pGroup
,
STqConsumeRsp
**
pRsp
)
{
STqList
*
pH
ead
=
pGroup
->
head
;
STqList
*
pNode
=
pH
ead
;
int
totSize
=
0
;
int
numOfMsgs
=
0
;
// TODO: make it a macro
int
sizeLimit
=
4
*
1024
;
STqMsgContent
*
buffer
=
malloc
(
sizeLimit
);
if
(
buffer
==
NULL
)
{
// TODO:memory insufficient
int
sizeLimit
=
4
*
1024
;
void
*
ptr
=
realloc
(
*
pRsp
,
sizeof
(
STqConsumeRsp
)
+
sizeLimit
);
if
(
ptr
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
return
-
1
;
}
*
pRsp
=
ptr
;
STqMsgContent
*
buffer
=
(
*
pRsp
)
->
msgs
;
// iterate the list to get msgs of all topics
// until all topic iterated or msgs over sizeLimit
while
(
n
ode
->
next
)
{
node
=
n
ode
->
next
;
STqTopic
*
topicHandle
=
&
n
ode
->
topic
;
int
idx
=
topicHandle
->
nextConsumeOffset
%
TQ_BUFFER_SIZE
;
if
(
topicHandle
->
buffer
[
idx
].
content
!=
NULL
&&
topicHandle
->
buffer
[
idx
].
offset
==
topicHandle
->
nextConsumeOffset
)
{
totSize
+=
topicHandle
->
buffer
[
idx
].
size
;
while
(
pN
ode
->
next
)
{
pNode
=
pN
ode
->
next
;
STqTopic
*
pTopic
=
&
pN
ode
->
topic
;
int
idx
=
pTopic
->
nextConsumeOffset
%
TQ_BUFFER_SIZE
;
if
(
pTopic
->
buffer
[
idx
].
content
!=
NULL
&&
pTopic
->
buffer
[
idx
].
offset
==
pTopic
->
nextConsumeOffset
)
{
totSize
+=
pTopic
->
buffer
[
idx
].
size
;
if
(
totSize
>
sizeLimit
)
{
void
*
ptr
=
realloc
(
buffer
,
totSize
);
void
*
ptr
=
realloc
(
*
pRsp
,
sizeof
(
STqConsumeRsp
)
+
totSize
);
if
(
ptr
==
NULL
)
{
totSize
-=
topicHandle
->
buffer
[
idx
].
size
;
// TODO:memory insufficient
totSize
-=
pTopic
->
buffer
[
idx
].
size
;
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
// return msgs already copied
break
;
}
*
pRsp
=
ptr
;
break
;
}
*
((
int64_t
*
)
buffer
)
=
topicHandle
->
topicId
;
*
((
int64_t
*
)
buffer
)
=
pTopic
->
topicId
;
buffer
=
POINTER_SHIFT
(
buffer
,
sizeof
(
int64_t
));
*
((
int64_t
*
)
buffer
)
=
topicHandle
->
buffer
[
idx
].
size
;
*
((
int64_t
*
)
buffer
)
=
pTopic
->
buffer
[
idx
].
size
;
buffer
=
POINTER_SHIFT
(
buffer
,
sizeof
(
int64_t
));
memcpy
(
buffer
,
topicHandle
->
buffer
[
idx
].
content
,
topicHandle
->
buffer
[
idx
].
size
);
buffer
=
POINTER_SHIFT
(
buffer
,
topicHandle
->
buffer
[
idx
].
size
);
memcpy
(
buffer
,
pTopic
->
buffer
[
idx
].
content
,
pTopic
->
buffer
[
idx
].
size
);
buffer
=
POINTER_SHIFT
(
buffer
,
pTopic
->
buffer
[
idx
].
size
);
numOfMsgs
++
;
if
(
totSize
>
sizeLimit
)
{
break
;
}
}
}
return
totSize
;
(
*
pRsp
)
->
bodySize
=
totSize
;
return
numOfMsgs
;
}
STqGroup
*
tqGetGroup
(
STQ
*
pTq
,
int64_t
clientId
)
{
return
tqHandleGet
(
pTq
->
tqMeta
,
clientId
);
}
...
...
@@ -275,28 +314,130 @@ int tqSetCursor(STQ* pTq, STqSetCurReq* pMsg) {
return
0
;
}
int
tqConsume
(
STQ
*
pTq
,
STqConsumeReq
*
pMsg
)
{
// temporary
int
tqProcessCMsg
(
STQ
*
pTq
,
STqConsumeReq
*
pMsg
,
STqRspHandle
*
pRsp
)
{
int64_t
clientId
=
pMsg
->
head
.
clientId
;
STqGroup
*
pGroup
=
tqGetGroup
(
pTq
,
clientId
);
if
(
pGroup
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_GROUP_NOT_SET
;
return
-
1
;
}
pGroup
->
rspHandle
.
handle
=
pRsp
->
handle
;
pGroup
->
rspHandle
.
ahandle
=
pRsp
->
ahandle
;
STqConsumeRsp
*
pRsp
=
(
STqConsumeRsp
*
)
pMsg
;
int
numOfMsgs
=
tqFetch
(
pGroup
,
(
void
**
)
&
pRsp
->
msgs
);
if
(
numOfMsgs
<
0
)
{
return
0
;
}
int
tqConsume
(
STQ
*
pTq
,
SRpcMsg
*
pReq
,
SRpcMsg
**
pRsp
)
{
STqConsumeReq
*
pMsg
=
pReq
->
pCont
;
int64_t
clientId
=
pMsg
->
head
.
clientId
;
STqGroup
*
pGroup
=
tqGetGroup
(
pTq
,
clientId
);
if
(
pGroup
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_GROUP_NOT_SET
;
return
-
1
;
}
if
(
numOfMsgs
==
0
)
{
SList
*
topicList
=
pGroup
->
topicList
;
int
totSize
=
0
;
int
numOfMsgs
=
0
;
int
sizeLimit
=
4096
;
STqConsumeRsp
*
pCsmRsp
=
(
*
pRsp
)
->
pCont
;
void
*
ptr
=
realloc
((
*
pRsp
)
->
pCont
,
sizeof
(
STqConsumeRsp
)
+
sizeLimit
);
if
(
ptr
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
return
-
1
;
}
(
*
pRsp
)
->
pCont
=
ptr
;
SListIter
iter
;
tdListInitIter
(
topicList
,
&
iter
,
TD_LIST_FORWARD
);
STqMsgContent
*
buffer
=
NULL
;
SArray
*
pArray
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
SListNode
*
pn
;
while
((
pn
=
tdListNext
(
&
iter
))
!=
NULL
)
{
STqTopic
*
pTopic
=
*
(
STqTopic
**
)
pn
->
data
;
int
idx
=
pTopic
->
floatingCursor
%
TQ_BUFFER_SIZE
;
STqMsgItem
*
pItem
=
&
pTopic
->
buffer
[
idx
];
if
(
pItem
->
content
!=
NULL
&&
pItem
->
offset
==
pTopic
->
floatingCursor
)
{
if
(
pItem
->
status
==
TQ_ITEM_READY
)
{
//if has data
totSize
+=
pTopic
->
buffer
[
idx
].
size
;
if
(
totSize
>
sizeLimit
)
{
void
*
ptr
=
realloc
((
*
pRsp
)
->
pCont
,
sizeof
(
STqConsumeRsp
)
+
totSize
);
if
(
ptr
==
NULL
)
{
totSize
-=
pTopic
->
buffer
[
idx
].
size
;
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
// return msgs already copied
break
;
}
(
*
pRsp
)
->
pCont
=
ptr
;
break
;
}
*
((
int64_t
*
)
buffer
)
=
htobe64
(
pTopic
->
topicId
);
buffer
=
POINTER_SHIFT
(
buffer
,
sizeof
(
int64_t
));
*
((
int64_t
*
)
buffer
)
=
htobe64
(
pTopic
->
buffer
[
idx
].
size
);
buffer
=
POINTER_SHIFT
(
buffer
,
sizeof
(
int64_t
));
memcpy
(
buffer
,
pTopic
->
buffer
[
idx
].
content
,
pTopic
->
buffer
[
idx
].
size
);
buffer
=
POINTER_SHIFT
(
buffer
,
pTopic
->
buffer
[
idx
].
size
);
numOfMsgs
++
;
if
(
totSize
>
sizeLimit
)
{
break
;
}
}
else
if
(
pItem
->
status
==
TQ_ITEM_PROCESS
)
{
//if not have data but in process
}
else
if
(
pItem
->
status
==
TQ_ITEM_EMPTY
){
//if not have data and not in process
int32_t
old
=
atomic_val_compare_exchange_32
(
&
pItem
->
status
,
TQ_ITEM_EMPTY
,
TQ_ITEM_PROCESS
);
if
(
old
!=
TQ_ITEM_EMPTY
)
{
continue
;
}
pItem
->
offset
=
pTopic
->
floatingCursor
;
taosArrayPush
(
pArray
,
&
pItem
);
}
else
{
ASSERT
(
0
);
}
}
}
if
(
numOfMsgs
>
0
)
{
// set code and other msg
rpcSendResponse
(
*
pRsp
);
}
else
{
// most recent data has been fetched
// enable timer for blocking wait
// once new data written during wait time
// launch query and response
// once new data written when waiting, launch query and rsp
}
// fetched a num of msgs, rpc response
for
(
int
i
=
0
;
i
<
pArray
->
size
;
i
++
)
{
STqMsgItem
*
pItem
=
taosArrayGet
(
pArray
,
i
);
//read from wal
void
*
raw
=
NULL
;
/*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/
int
code
=
pTq
->
tqLogHandle
->
logRead
(
pItem
->
pTopic
->
logReader
,
&
raw
,
pItem
->
offset
);
if
(
code
<
0
)
{
//TODO: error
}
//get msgType
//if submitblk
pItem
->
executor
->
assign
(
pItem
->
executor
->
runtimeEnv
,
raw
);
SSDataBlock
*
content
=
pItem
->
executor
->
exec
(
pItem
->
executor
->
runtimeEnv
);
pItem
->
content
=
content
;
//if other type, send just put into buffer
/*pItem->content = raw;*/
int32_t
old
=
atomic_val_compare_exchange_32
(
&
pItem
->
status
,
TQ_ITEM_PROCESS
,
TQ_ITEM_READY
);
ASSERT
(
old
==
TQ_ITEM_PROCESS
);
}
taosArrayDestroy
(
pArray
);
return
0
;
}
...
...
@@ -378,10 +519,10 @@ void* tqSerializeTopic(STqTopic* pTopic, void* ptr) {
ptr
=
POINTER_SHIFT
(
ptr
,
sizeof
(
int64_t
));
*
(
int64_t
*
)
ptr
=
pTopic
->
topicId
;
ptr
=
POINTER_SHIFT
(
ptr
,
sizeof
(
int64_t
));
*
(
int32_t
*
)
ptr
=
pTopic
->
head
;
ptr
=
POINTER_SHIFT
(
ptr
,
sizeof
(
int32_t
));
*
(
int32_t
*
)
ptr
=
pTopic
->
tail
;
ptr
=
POINTER_SHIFT
(
ptr
,
sizeof
(
int32_t
));
/**(int32_t*)ptr = pTopic->head;*/
/*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
/**(int32_t*)ptr = pTopic->tail;*/
/*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
for
(
int
i
=
0
;
i
<
TQ_BUFFER_SIZE
;
i
++
)
{
ptr
=
tqSerializeItem
(
&
pTopic
->
buffer
[
i
],
ptr
);
}
...
...
@@ -435,10 +576,10 @@ const void* tqDeserializeTopic(const void* pBytes, STqTopic* topic) {
ptr
=
POINTER_SHIFT
(
ptr
,
sizeof
(
int64_t
));
topic
->
topicId
=
*
(
int64_t
*
)
ptr
;
ptr
=
POINTER_SHIFT
(
ptr
,
sizeof
(
int64_t
));
topic
->
head
=
*
(
int32_t
*
)
ptr
;
ptr
=
POINTER_SHIFT
(
ptr
,
sizeof
(
int32_t
));
topic
->
tail
=
*
(
int32_t
*
)
ptr
;
ptr
=
POINTER_SHIFT
(
ptr
,
sizeof
(
int32_t
));
/*topic->head = *(int32_t*)ptr;*/
/*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
/*topic->tail = *(int32_t*)ptr;*/
/*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
for
(
int
i
=
0
;
i
<
TQ_BUFFER_SIZE
;
i
++
)
{
ptr
=
tqDeserializeItem
(
ptr
,
&
topic
->
buffer
[
i
]);
}
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
021c5176
...
...
@@ -605,7 +605,7 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* p
SName
*
name
=
taosArrayGet
(
pReq
->
pTableName
,
i
);
STableMeta
*
pTableMeta
=
NULL
;
snprintf
(
dbName
,
sizeof
(
dbName
),
"%
s
.%s"
,
name
->
acctId
,
name
->
dbname
);
snprintf
(
dbName
,
sizeof
(
dbName
),
"%
d
.%s"
,
name
->
acctId
,
name
->
dbname
);
CTG_ERR_JRET
(
catalogGetTableMeta
(
pCatalog
,
pRpc
,
pMgmtEps
,
dbName
,
name
->
tname
,
&
pTableMeta
));
...
...
source/libs/index/inc/index_tfile.h
浏览文件 @
021c5176
...
...
@@ -47,7 +47,6 @@ typedef struct TFileHeader {
typedef
struct
TFileCacheKey
{
uint64_t
suid
;
uint8_t
colType
;
int32_t
version
;
char
*
colName
;
int32_t
nColName
;
}
TFileCacheKey
;
...
...
@@ -67,6 +66,7 @@ typedef struct TFileWriter {
uint32_t
offset
;
}
TFileWriter
;
// multi reader and single write
typedef
struct
TFileReader
{
T_REF_DECLARE
()
Fst
*
fst
;
...
...
source/libs/index/src/index_tfile.c
浏览文件 @
021c5176
...
...
@@ -38,11 +38,14 @@ static int tfileWriteHeader(TFileWriter* writer);
static
int
tfileWriteFstOffset
(
TFileWriter
*
tw
,
int32_t
offset
);
static
int
tfileWriteData
(
TFileWriter
*
write
,
TFileValue
*
tval
);
static
int
tfileReadLoadHeader
(
TFileReader
*
reader
);
static
int
tfileReadLoadFst
(
TFileReader
*
reader
);
static
int
tfileReadLoadTableIds
(
TFileReader
*
reader
,
int32_t
offset
,
SArray
*
result
);
static
int
tfileReadLoadHeader
(
TFileReader
*
reader
);
static
int
tfileReadLoadFst
(
TFileReader
*
reader
);
static
int
tfileReadLoadTableIds
(
TFileReader
*
reader
,
int32_t
offset
,
SArray
*
result
);
static
void
tfileReadRef
(
TFileReader
*
reader
);
static
void
tfileReadUnRef
(
TFileReader
*
reader
);
static
int
tfileGetFileList
(
const
char
*
path
,
SArray
*
result
);
static
int
tfileRmExpireFile
(
SArray
*
result
);
static
void
tfileDestroyFileName
(
void
*
elem
);
static
int
tfileCompare
(
const
void
*
a
,
const
void
*
b
);
static
int
tfileParseFileName
(
const
char
*
filename
,
uint64_t
*
suid
,
int
*
colId
,
int
*
version
);
...
...
@@ -58,6 +61,8 @@ TFileCache* tfileCacheCreate(const char* path) {
SArray
*
files
=
taosArrayInit
(
4
,
sizeof
(
void
*
));
tfileGetFileList
(
path
,
files
);
taosArraySort
(
files
,
tfileCompare
);
tfileRmExpireFile
(
files
);
uint64_t
suid
;
int32_t
colId
,
version
;
for
(
size_t
i
=
0
;
i
<
taosArrayGetSize
(
files
);
i
++
)
{
...
...
@@ -66,29 +71,29 @@ TFileCache* tfileCacheCreate(const char* path) {
indexInfo
(
"try parse invalid file: %s, skip it"
,
file
);
continue
;
}
WriterCtx
*
wc
=
writerCtxCreate
(
TFile
,
file
,
true
,
1024
*
64
);
if
(
wc
==
NULL
)
{
indexError
(
"failed to open index: %s"
,
file
);
goto
End
;
}
TFileReader
*
reader
=
tfileReaderCreate
(
wc
);
if
(
0
!=
tfileReadLoadHeader
(
reader
))
{
tfileReaderDestroy
(
reader
);
indexError
(
"failed to load index header, index file: %s"
,
file
);
goto
End
;
}
if
(
0
!=
tfileReadLoadFst
(
reader
))
{
tfileReaderDestroy
(
reader
);
indexError
(
"failed to load index fst, index file: %s"
,
file
);
goto
End
;
}
tfileReadRef
(
reader
);
// loader fst and validate it
TFileHeader
*
header
=
&
reader
->
header
;
TFileCacheKey
key
=
{.
suid
=
header
->
suid
,
.
version
=
header
->
version
,
.
colName
=
header
->
colName
,
.
nColName
=
strlen
(
header
->
colName
),
.
colType
=
header
->
colType
};
TFileCacheKey
key
=
{.
suid
=
header
->
suid
,
.
colName
=
header
->
colName
,
.
nColName
=
strlen
(
header
->
colName
),
.
colType
=
header
->
colType
};
char
buf
[
128
]
=
{
0
};
tfileSerialCacheKey
(
&
key
,
buf
);
...
...
@@ -110,7 +115,7 @@ void tfileCacheDestroy(TFileCache* tcache) {
TFileReader
*
p
=
*
reader
;
indexInfo
(
"drop table cache suid: %"
PRIu64
", colName: %s, colType: %d"
,
p
->
header
.
suid
,
p
->
header
.
colName
,
p
->
header
.
colType
);
tfileRead
erDestroy
(
p
);
tfileRead
UnRef
(
p
);
reader
=
taosHashIterate
(
tcache
->
tableCache
,
reader
);
}
taosHashCleanup
(
tcache
->
tableCache
);
...
...
@@ -120,12 +125,24 @@ void tfileCacheDestroy(TFileCache* tcache) {
TFileReader
*
tfileCacheGet
(
TFileCache
*
tcache
,
TFileCacheKey
*
key
)
{
char
buf
[
128
]
=
{
0
};
tfileSerialCacheKey
(
key
,
buf
);
TFileReader
*
reader
=
taosHashGet
(
tcache
->
tableCache
,
buf
,
strlen
(
buf
));
tfileReadRef
(
reader
);
return
reader
;
}
void
tfileCachePut
(
TFileCache
*
tcache
,
TFileCacheKey
*
key
,
TFileReader
*
reader
)
{
char
buf
[
128
]
=
{
0
};
tfileSerialCacheKey
(
key
,
buf
);
// remove last version index reader
TFileReader
**
p
=
taosHashGet
(
tcache
->
tableCache
,
buf
,
strlen
(
buf
));
if
(
*
p
!=
NULL
)
{
TFileReader
*
oldReader
=
*
p
;
taosHashRemove
(
tcache
->
tableCache
,
buf
,
strlen
(
buf
));
tfileReadUnRef
(
oldReader
);
}
tfileReadRef
(
reader
);
taosHashPut
(
tcache
->
tableCache
,
buf
,
strlen
(
buf
),
&
reader
,
sizeof
(
void
*
));
return
;
}
...
...
@@ -147,25 +164,29 @@ void tfileReaderDestroy(TFileReader* reader) {
}
int
tfileReaderSearch
(
TFileReader
*
reader
,
SIndexTermQuery
*
query
,
SArray
*
result
)
{
SIndexTerm
*
term
=
query
->
term
;
SIndexTerm
*
term
=
query
->
term
;
EIndexQueryType
qtype
=
query
->
qType
;
int
ret
=
-
1
;
// refactor to callback later
if
(
q
uery
->
qT
ype
==
QUERY_TERM
)
{
if
(
q
t
ype
==
QUERY_TERM
)
{
uint64_t
offset
;
FstSlice
key
=
fstSliceCreate
(
term
->
colVal
,
term
->
nColVal
);
if
(
fstGet
(
reader
->
fst
,
&
key
,
&
offset
))
{
return
tfileReadLoadTableIds
(
reader
,
offset
,
result
);
indexInfo
(
"index: %"
PRIu64
", col: %s, colVal: %s, found table info in tindex"
,
term
->
suid
,
term
->
colName
,
term
->
colVal
);
ret
=
tfileReadLoadTableIds
(
reader
,
offset
,
result
);
}
else
{
indexInfo
(
"index: %"
PRIu64
", col: %s, colVal: %s, not found in tindex"
,
term
->
suid
,
term
->
colName
,
term
->
colVal
);
indexInfo
(
"index: %"
PRIu64
", col: %s, colVal: %s, not found
table info
in tindex"
,
term
->
suid
,
term
->
colName
,
term
->
colVal
);
}
return
0
;
}
else
if
(
q
uery
->
qT
ype
==
QUERY_PREFIX
)
{
fstSliceDestroy
(
&
key
)
;
}
else
if
(
q
t
ype
==
QUERY_PREFIX
)
{
// handle later
//
}
else
{
// handle later
}
return
0
;
tfileReadUnRef
(
reader
);
return
ret
;
}
TFileWriter
*
tfileWriterCreate
(
WriterCtx
*
ctx
,
TFileHeader
*
header
)
{
...
...
@@ -209,7 +230,6 @@ int tfileWriterPut(TFileWriter* tw, void* data) {
int32_t
tbsz
=
taosArrayGetSize
(
v
->
tableId
);
fstOffset
+=
TF_TABLE_TATOAL_SIZE
(
tbsz
);
}
// check result or not
tfileWriteFstOffset
(
tw
,
fstOffset
);
for
(
size_t
i
=
0
;
i
<
sz
;
i
++
)
{
...
...
@@ -237,6 +257,7 @@ int tfileWriterPut(TFileWriter* tw, void* data) {
// write reversed data in buf to tindex
tw
->
ctx
->
write
(
tw
->
ctx
,
buf
,
offset
);
}
tfree
(
buf
);
// write fst
for
(
size_t
i
=
0
;
i
<
sz
;
i
++
)
{
...
...
@@ -244,11 +265,8 @@ int tfileWriterPut(TFileWriter* tw, void* data) {
TFileValue
*
v
=
taosArrayGetP
((
SArray
*
)
data
,
i
);
if
(
tfileWriteData
(
tw
,
v
)
==
0
)
{
//
//
}
}
tfree
(
buf
);
return
0
;
}
void
tfileWriterDestroy
(
TFileWriter
*
tw
)
{
...
...
@@ -270,17 +288,19 @@ void IndexTFileDestroy(IndexTFile* tfile) {
}
int
indexTFileSearch
(
void
*
tfile
,
SIndexTermQuery
*
query
,
SArray
*
result
)
{
if
(
tfile
==
NULL
)
{
return
-
1
;
}
int
ret
=
-
1
;
if
(
tfile
==
NULL
)
{
return
ret
;
}
IndexTFile
*
pTfile
=
(
IndexTFile
*
)
tfile
;
SIndexTerm
*
term
=
query
->
term
;
TFileCacheKey
key
=
{.
suid
=
term
->
suid
,
.
colType
=
term
->
colType
,
.
version
=
0
,
.
colName
=
term
->
colName
,
.
nColName
=
term
->
nColName
};
TFileCacheKey
key
=
{.
suid
=
term
->
suid
,
.
colType
=
term
->
colType
,
.
colName
=
term
->
colName
,
.
nColName
=
term
->
nColName
};
TFileReader
*
reader
=
tfileCacheGet
(
pTfile
->
cache
,
&
key
);
TFileReader
*
reader
=
tfileCacheGet
(
pTfile
->
cache
,
&
key
);
return
tfileReaderSearch
(
reader
,
query
,
result
);
}
int
indexTFilePut
(
void
*
tfile
,
SIndexTerm
*
term
,
uint64_t
uid
)
{
TFileWriterOpt
wOpt
=
{.
suid
=
term
->
suid
,
.
colType
=
term
->
colType
,
.
colName
=
term
->
colName
,
.
nColName
=
term
->
nColName
,
.
version
=
1
};
// TFileWriterOpt wOpt = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName, .version =
// 1};
return
0
;
}
...
...
@@ -309,8 +329,7 @@ static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) {
return
0
;
}
static
int
tfileWriteHeader
(
TFileWriter
*
writer
)
{
char
buf
[
TFILE_HEADER_NO_FST
]
=
{
0
};
char
*
p
=
buf
;
char
buf
[
TFILE_HEADER_NO_FST
]
=
{
0
};
TFileHeader
*
header
=
&
writer
->
header
;
memcpy
(
buf
,
(
char
*
)
header
,
sizeof
(
buf
));
...
...
@@ -338,8 +357,7 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
}
static
int
tfileReadLoadHeader
(
TFileReader
*
reader
)
{
// TODO simple tfile header later
char
buf
[
TFILE_HEADER_SIZE
]
=
{
0
};
char
*
p
=
buf
;
char
buf
[
TFILE_HEADER_SIZE
]
=
{
0
};
int64_t
nread
=
reader
->
ctx
->
read
(
reader
->
ctx
,
buf
,
sizeof
(
buf
));
assert
(
nread
==
sizeof
(
buf
));
...
...
@@ -368,20 +386,32 @@ static int tfileReadLoadFst(TFileReader* reader) {
static
int
tfileReadLoadTableIds
(
TFileReader
*
reader
,
int32_t
offset
,
SArray
*
result
)
{
int32_t
nid
;
WriterCtx
*
ctx
=
reader
->
ctx
;
int32_t
nread
=
ctx
->
readFrom
(
ctx
,
(
char
*
)
&
nid
,
sizeof
(
nid
),
offset
);
int32_t
nread
=
ctx
->
readFrom
(
ctx
,
(
char
*
)
&
nid
,
sizeof
(
nid
),
offset
);
assert
(
sizeof
(
nid
)
==
nread
);
char
*
buf
=
calloc
(
1
,
sizeof
(
uint64_t
)
*
nid
);
int32_t
total
=
sizeof
(
uint64_t
)
*
nid
;
char
*
buf
=
calloc
(
1
,
total
);
if
(
buf
==
NULL
)
{
return
-
1
;
}
nread
=
ctx
->
read
(
ctx
,
buf
,
sizeof
(
uint64_t
)
*
nid
);
uint64_t
*
ids
=
(
uint64_t
*
)
buf
;
nread
=
ctx
->
read
(
ctx
,
buf
,
total
);
assert
(
total
==
nread
);
for
(
int32_t
i
=
0
;
i
<
nid
;
i
++
)
{
taosArrayPush
(
result
,
ids
+
i
);
taosArrayPush
(
result
,
(
uint64_t
*
)
buf
+
i
);
}
free
(
buf
);
return
0
;
}
static
void
tfileReadRef
(
TFileReader
*
reader
)
{
int
ref
=
T_REF_INC
(
reader
);
UNUSED
(
ref
);
}
static
void
tfileReadUnRef
(
TFileReader
*
reader
)
{
int
ref
=
T_REF_DEC
(
reader
);
if
(
ref
==
0
)
{
tfileReaderDestroy
(
reader
);
}
}
static
int
tfileGetFileList
(
const
char
*
path
,
SArray
*
result
)
{
DIR
*
dir
=
opendir
(
path
);
...
...
@@ -397,6 +427,10 @@ static int tfileGetFileList(const char* path, SArray* result) {
closedir
(
dir
);
return
0
;
}
static
int
tfileRmExpireFile
(
SArray
*
result
)
{
// TODO(yihao): remove expire tindex after restart
return
0
;
}
static
void
tfileDestroyFileName
(
void
*
elem
)
{
char
*
p
=
*
(
char
**
)
elem
;
free
(
p
);
...
...
@@ -423,7 +457,5 @@ static void tfileSerialCacheKey(TFileCacheKey* key, char* buf) {
SERIALIZE_VAR_TO_BUF
(
buf
,
'_'
,
char
);
SERIALIZE_MEM_TO_BUF
(
buf
,
key
,
colType
);
SERIALIZE_VAR_TO_BUF
(
buf
,
'_'
,
char
);
SERIALIZE_MEM_TO_BUF
(
buf
,
key
,
version
);
SERIALIZE_VAR_TO_BUF
(
buf
,
'_'
,
char
);
SERIALIZE_STR_MEM_TO_BUF
(
buf
,
key
,
colName
,
key
->
nColName
);
}
source/libs/parser/inc/astToMsg.h
浏览文件 @
021c5176
...
...
@@ -9,5 +9,6 @@ SCreateAcctMsg* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, in
SDropUserMsg
*
buildDropUserMsg
(
SSqlInfo
*
pInfo
,
int32_t
*
outputLen
,
int64_t
id
,
char
*
msgBuf
,
int32_t
msgLen
);
SShowMsg
*
buildShowMsg
(
SShowInfo
*
pShowInfo
,
int64_t
id
,
char
*
msgBuf
,
int32_t
msgLen
);
SCreateDbMsg
*
buildCreateDbMsg
(
SCreateDbInfo
*
pCreateDbInfo
,
char
*
msgBuf
,
int32_t
msgLen
);
SCreateStbMsg
*
buildCreateTableMsg
(
SCreateTableSql
*
pCreateTableSql
,
int32_t
*
len
,
SParseBasicCtx
*
pParseCtx
,
SMsgBuf
*
pMsgBuf
);
#endif // TDENGINE_ASTTOMSG_H
source/libs/parser/inc/dataBlockMgt.h
浏览文件 @
021c5176
...
...
@@ -78,7 +78,7 @@ typedef struct {
typedef
struct
STableDataBlocks
{
int8_t
tsSource
;
// where does the UNIX timestamp come from, server or client
bool
ordered
;
// if current rows are ordered or not
int
64
_t
vgId
;
// virtual group id
int
32
_t
vgId
;
// virtual group id
int64_t
prevTS
;
// previous timestamp, recorded to decide if the records array is ts ascending
int32_t
numOfTables
;
// number of tables in current submit block
int32_t
rowSize
;
// row size for current table
...
...
source/libs/parser/inc/parserInt.h
浏览文件 @
021c5176
...
...
@@ -38,6 +38,14 @@ typedef struct SMsgBuf {
char
*
buf
;
}
SMsgBuf
;
// create table operation type
enum
TSQL_CREATE_TABLE_TYPE
{
TSQL_CREATE_TABLE
=
0x1
,
TSQL_CREATE_STABLE
=
0x2
,
TSQL_CREATE_CTABLE
=
0x3
,
TSQL_CREATE_STREAM
=
0x4
,
};
void
clearTableMetaInfo
(
STableMetaInfo
*
pTableMetaInfo
);
void
clearAllTableMetaInfo
(
SQueryStmtInfo
*
pQueryInfo
,
bool
removeMeta
,
uint64_t
id
);
...
...
@@ -60,7 +68,7 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pSqlInfo, SQ
* @param type
* @return
*/
int32_t
qParserValidateDclSqlNode
(
SSqlInfo
*
pInfo
,
int64_t
id
,
void
**
output
,
int32_t
*
outputLen
,
int32_t
*
type
,
char
*
msgBuf
,
int32_t
msgBufLen
);
int32_t
qParserValidateDclSqlNode
(
SSqlInfo
*
pInfo
,
SParseBasicCtx
*
pCtx
,
void
**
output
,
int32_t
*
outputLen
,
int32_t
*
type
,
char
*
msgBuf
,
int32_t
msgBufLen
);
/**
* Evaluate the numeric and timestamp arithmetic expression in the WHERE clause.
...
...
source/libs/parser/inc/parserUtil.h
浏览文件 @
021c5176
...
...
@@ -67,6 +67,8 @@ int32_t getExprFunctionId(SExprInfo *pExprInfo);
STableMeta
*
tableMetaDup
(
const
STableMeta
*
pTableMeta
);
bool
isDclSqlStatement
(
SSqlInfo
*
pSqlInfo
);
bool
isDdlSqlStatement
(
SSqlInfo
*
pSqlInfo
);
bool
isDqlSqlStatement
(
SSqlInfo
*
pSqlInfo
);
#ifdef __cplusplus
}
...
...
source/libs/parser/src/astGenerator.c
浏览文件 @
021c5176
...
...
@@ -13,9 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "taos.h"
#include "os.h"
#include "astGenerator.h"
#include <parserInt.h>
#include "os.h"
#include "taos.h"
#include "tmsgtype.h"
SArray
*
tListItemAppend
(
SArray
*
pList
,
SVariant
*
pVar
,
uint8_t
sortOrder
)
{
...
...
source/libs/parser/src/astToMsg.c
浏览文件 @
021c5176
#include <astGenerator.h>
#include "parserInt.h"
#include "parserUtil.h"
...
...
@@ -219,3 +220,126 @@ SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, char* msgBuf, int32
return
pCreateMsg
;
}
int32_t
createSName
(
SName
*
pName
,
SToken
*
pTableName
,
SParseBasicCtx
*
pParseCtx
,
SMsgBuf
*
pMsgBuf
)
{
const
char
*
msg1
=
"name too long"
;
const
char
*
msg2
=
"acctId too long"
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
char
*
p
=
strnchr
(
pTableName
->
z
,
TS_PATH_DELIMITER
[
0
],
pTableName
->
n
,
false
);
if
(
p
!=
NULL
)
{
// db has been specified in sql string so we ignore current db path
code
=
tNameSetAcctId
(
pName
,
pParseCtx
->
acctId
);
if
(
code
!=
0
)
{
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg2
);
}
char
name
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
strncpy
(
name
,
pTableName
->
z
,
pTableName
->
n
);
code
=
tNameFromString
(
pName
,
name
,
T_NAME_DB
|
T_NAME_TABLE
);
if
(
code
!=
0
)
{
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg1
);
}
}
else
{
// get current DB name first, and then set it into path
if
(
pTableName
->
n
>=
TSDB_TABLE_NAME_LEN
)
{
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg1
);
}
tNameSetDbName
(
pName
,
pParseCtx
->
acctId
,
pParseCtx
->
db
,
strlen
(
pParseCtx
->
db
));
char
name
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
strncpy
(
name
,
pTableName
->
z
,
pTableName
->
n
);
code
=
tNameFromString
(
pName
,
name
,
T_NAME_TABLE
);
if
(
code
!=
0
)
{
code
=
buildInvalidOperationMsg
(
pMsgBuf
,
msg1
);
}
}
return
code
;
}
SCreateStbMsg
*
buildCreateTableMsg
(
SCreateTableSql
*
pCreateTableSql
,
int32_t
*
len
,
SParseBasicCtx
*
pParseCtx
,
SMsgBuf
*
pMsgBuf
)
{
SSchema
*
pSchema
;
int32_t
numOfCols
=
(
int32_t
)
taosArrayGetSize
(
pCreateTableSql
->
colInfo
.
pColumns
);
int32_t
numOfTags
=
(
int32_t
)
taosArrayGetSize
(
pCreateTableSql
->
colInfo
.
pTagColumns
);
SCreateStbMsg
*
pCreateTableMsg
=
(
SCreateStbMsg
*
)
calloc
(
1
,
sizeof
(
SCreateStbMsg
)
+
(
numOfCols
+
numOfTags
)
*
sizeof
(
SSchema
));
char
*
pMsg
=
NULL
;
int8_t
type
=
pCreateTableSql
->
type
;
if
(
type
==
TSQL_CREATE_TABLE
)
{
// create by using super table, tags value
#if 0
SArray* list = pInfo->pCreateTableInfo->childTableInfo;
int32_t numOfTables = (int32_t)taosArrayGetSize(list);
pCreateTableMsg->numOfTables = htonl(numOfTables);
pMsg = (char*)pCreateMsg;
for (int32_t i = 0; i < numOfTables; ++i) {
SCreateTableMsg* pCreate = (SCreateTableMsg*)pMsg;
pCreate->numOfColumns = htons(pCmd->numOfCols);
pCreate->numOfTags = htons(pCmd->count);
pMsg += sizeof(SCreateTableMsg);
SCreatedTableInfo* p = taosArrayGet(list, i);
strcpy(pCreate->tableName, p->fullname);
pCreate->igExists = (p->igExist) ? 1 : 0;
// use dbinfo from table id without modifying current db info
pMsg = serializeTagData(&p->tagdata, pMsg);
int32_t len = (int32_t)(pMsg - (char*)pCreate);
pCreate->len = htonl(len);
}
#endif
}
else
{
// create (super) table
SName
n
=
{
0
};
int32_t
code
=
createSName
(
&
n
,
&
pCreateTableSql
->
name
,
pParseCtx
,
pMsgBuf
);
if
(
code
!=
0
)
{
return
NULL
;
}
code
=
tNameExtractFullName
(
&
n
,
pCreateTableMsg
->
name
);
if
(
code
!=
0
)
{
buildInvalidOperationMsg
(
pMsgBuf
,
"invalid table name or database not specified"
);
return
NULL
;
}
pCreateTableMsg
->
igExists
=
pCreateTableSql
->
existCheck
?
1
:
0
;
pCreateTableMsg
->
numOfColumns
=
htonl
(
numOfCols
);
pCreateTableMsg
->
numOfTags
=
htonl
(
numOfTags
);
pSchema
=
(
SSchema
*
)
pCreateTableMsg
->
pSchema
;
for
(
int
i
=
0
;
i
<
numOfCols
;
++
i
)
{
TAOS_FIELD
*
pField
=
taosArrayGet
(
pCreateTableSql
->
colInfo
.
pColumns
,
i
);
pSchema
->
type
=
pField
->
type
;
pSchema
->
bytes
=
htonl
(
pField
->
bytes
);
strcpy
(
pSchema
->
name
,
pField
->
name
);
pSchema
++
;
}
for
(
int32_t
i
=
0
;
i
<
numOfTags
;
++
i
)
{
TAOS_FIELD
*
pField
=
taosArrayGet
(
pCreateTableSql
->
colInfo
.
pTagColumns
,
i
);
pSchema
->
type
=
pField
->
type
;
pSchema
->
bytes
=
htonl
(
pField
->
bytes
);
strcpy
(
pSchema
->
name
,
pField
->
name
);
pSchema
++
;
}
pMsg
=
(
char
*
)
pSchema
;
}
int32_t
msgLen
=
(
int32_t
)(
pMsg
-
(
char
*
)
pCreateTableMsg
);
*
len
=
msgLen
;
return
pCreateTableMsg
;
}
source/libs/parser/src/astValidate.c
浏览文件 @
021c5176
...
...
@@ -4171,7 +4171,144 @@ static int32_t doCheckDbOptions(SCreateDbMsg* pCreate, SMsgBuf* pMsgBuf) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
qParserValidateDclSqlNode
(
SSqlInfo
*
pInfo
,
int64_t
id
,
void
**
output
,
int32_t
*
outputLen
,
int32_t
*
type
,
char
*
msgBuf
,
int32_t
msgBufLen
)
{
/* is contained in pFieldList or not */
static
bool
has
(
SArray
*
pFieldList
,
int32_t
startIndex
,
const
char
*
name
)
{
size_t
numOfCols
=
taosArrayGetSize
(
pFieldList
);
for
(
int32_t
j
=
startIndex
;
j
<
numOfCols
;
++
j
)
{
TAOS_FIELD
*
field
=
taosArrayGet
(
pFieldList
,
j
);
if
(
strncasecmp
(
name
,
field
->
name
,
sizeof
(
field
->
name
)
-
1
)
==
0
)
return
true
;
}
return
false
;
}
static
int32_t
validateTableColumns
(
SArray
*
pFieldList
,
int32_t
maxRowLength
,
int32_t
maxColumns
,
SMsgBuf
*
pMsgBuf
)
{
const
char
*
msg2
=
"row length exceeds max length"
;
const
char
*
msg3
=
"duplicated column names"
;
const
char
*
msg4
=
"invalid data type"
;
const
char
*
msg5
=
"invalid binary/nchar column length"
;
const
char
*
msg6
=
"invalid column name"
;
const
char
*
msg7
=
"too many columns"
;
const
char
*
msg8
=
"illegal number of columns"
;
size_t
numOfCols
=
taosArrayGetSize
(
pFieldList
);
if
(
numOfCols
>
maxColumns
)
{
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg7
);
}
int32_t
rowLen
=
0
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
TAOS_FIELD
*
pField
=
taosArrayGet
(
pFieldList
,
i
);
if
(
!
isValidDataType
(
pField
->
type
))
{
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg4
);
}
if
(
pField
->
bytes
==
0
)
{
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg5
);
}
if
((
pField
->
type
==
TSDB_DATA_TYPE_BINARY
&&
(
pField
->
bytes
<=
0
||
pField
->
bytes
>
TSDB_MAX_BINARY_LEN
))
||
(
pField
->
type
==
TSDB_DATA_TYPE_NCHAR
&&
(
pField
->
bytes
<=
0
||
pField
->
bytes
>
TSDB_MAX_NCHAR_LEN
)))
{
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg5
);
}
SToken
nameToken
=
{.
z
=
pField
->
name
,
.
n
=
strlen
(
pField
->
name
),
.
type
=
TK_ID
};
if
(
parserValidateNameToken
(
&
nameToken
)
!=
TSDB_CODE_SUCCESS
)
{
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg6
);
}
// field name must be unique
if
(
has
(
pFieldList
,
i
+
1
,
pField
->
name
)
==
true
)
{
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg3
);
}
rowLen
+=
pField
->
bytes
;
}
// max row length must be less than TSDB_MAX_BYTES_PER_ROW
if
(
rowLen
>
maxRowLength
)
{
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg2
);
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
validateTableColumnInfo
(
SArray
*
pFieldList
,
SMsgBuf
*
pMsgBuf
)
{
assert
(
pFieldList
!=
NULL
);
const
char
*
msg1
=
"first column must be timestamp"
;
const
char
*
msg2
=
"row length exceeds max length"
;
const
char
*
msg3
=
"duplicated column names"
;
const
char
*
msg4
=
"invalid data type"
;
const
char
*
msg5
=
"invalid binary/nchar column length"
;
const
char
*
msg6
=
"invalid column name"
;
const
char
*
msg7
=
"too many columns"
;
const
char
*
msg8
=
"illegal number of columns"
;
// first column must be timestamp
TAOS_FIELD
*
pField
=
taosArrayGet
(
pFieldList
,
0
);
if
(
pField
->
type
!=
TSDB_DATA_TYPE_TIMESTAMP
)
{
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg1
);
}
// number of fields no less than 2
size_t
numOfCols
=
taosArrayGetSize
(
pFieldList
);
if
(
numOfCols
<=
1
)
{
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg8
);
}
return
validateTableColumns
(
pFieldList
,
TSDB_MAX_BYTES_PER_ROW
,
TSDB_MAX_COLUMNS
,
pMsgBuf
);
}
static
int32_t
validateTagParams
(
SArray
*
pTagsList
,
SArray
*
pFieldList
,
SMsgBuf
*
pMsgBuf
)
{
assert
(
pTagsList
!=
NULL
);
const
char
*
msg1
=
"invalid number of tag columns"
;
const
char
*
msg3
=
"duplicated column names"
;
// number of fields at least 1
size_t
numOfTags
=
taosArrayGetSize
(
pTagsList
);
if
(
numOfTags
<
1
)
{
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg1
);
}
// field name must be unique
for
(
int32_t
i
=
0
;
i
<
numOfTags
;
++
i
)
{
TAOS_FIELD
*
p
=
taosArrayGet
(
pTagsList
,
i
);
if
(
has
(
pFieldList
,
0
,
p
->
name
)
==
true
)
{
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg3
);
}
}
return
validateTableColumns
(
pFieldList
,
TSDB_MAX_TAGS_LEN
,
TSDB_MAX_TAGS
,
pMsgBuf
);
}
int32_t
doCheckForCreateTable
(
SSqlInfo
*
pInfo
,
SMsgBuf
*
pMsgBuf
)
{
const
char
*
msg1
=
"invalid table name"
;
SCreateTableSql
*
pCreateTable
=
pInfo
->
pCreateTableInfo
;
SArray
*
pFieldList
=
pCreateTable
->
colInfo
.
pColumns
;
SArray
*
pTagList
=
pCreateTable
->
colInfo
.
pTagColumns
;
assert
(
pFieldList
!=
NULL
);
// if sql specifies db, use it, otherwise use default db
SToken
*
pzTableName
=
&
(
pCreateTable
->
name
);
bool
dbIncluded
=
false
;
if
(
parserValidateNameToken
(
pzTableName
)
!=
TSDB_CODE_SUCCESS
)
{
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg1
);
}
if
(
validateTableColumnInfo
(
pFieldList
,
pMsgBuf
)
!=
TSDB_CODE_SUCCESS
||
(
pTagList
!=
NULL
&&
validateTagParams
(
pTagList
,
pFieldList
,
pMsgBuf
)
!=
TSDB_CODE_SUCCESS
))
{
return
TSDB_CODE_TSC_INVALID_OPERATION
;
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
qParserValidateDclSqlNode
(
SSqlInfo
*
pInfo
,
SParseBasicCtx
*
pCtx
,
void
**
output
,
int32_t
*
outputLen
,
int32_t
*
type
,
char
*
msgBuf
,
int32_t
msgBufLen
)
{
int32_t
code
=
0
;
SMsgBuf
m
=
{.
buf
=
msgBuf
,
.
len
=
msgBufLen
};
...
...
@@ -4224,7 +4361,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, int64_t id, void** output, in
}
}
*
output
=
buildUserManipulationMsg
(
pInfo
,
outputLen
,
i
d
,
msgBuf
,
msgBufLen
);
*
output
=
buildUserManipulationMsg
(
pInfo
,
outputLen
,
pCtx
->
requestI
d
,
msgBuf
,
msgBufLen
);
break
;
}
...
...
@@ -4260,13 +4397,13 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, int64_t id, void** output, in
}
}
*
output
=
buildAcctManipulationMsg
(
pInfo
,
outputLen
,
i
d
,
msgBuf
,
msgBufLen
);
*
output
=
buildAcctManipulationMsg
(
pInfo
,
outputLen
,
pCtx
->
requestI
d
,
msgBuf
,
msgBufLen
);
break
;
}
case
TSDB_SQL_DROP_ACCT
:
case
TSDB_SQL_DROP_USER
:
{
*
output
=
buildDropUserMsg
(
pInfo
,
outputLen
,
i
d
,
msgBuf
,
msgBufLen
);
*
output
=
buildDropUserMsg
(
pInfo
,
outputLen
,
pCtx
->
requestI
d
,
msgBuf
,
msgBufLen
);
break
;
}
...
...
@@ -4275,6 +4412,28 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, int64_t id, void** output, in
break
;
}
case
TSDB_SQL_USE_DB
:
{
const
char
*
msg
=
"invalid db name"
;
SToken
*
pToken
=
taosArrayGet
(
pInfo
->
pMiscInfo
->
a
,
0
);
if
(
parserValidateNameToken
(
pToken
)
!=
TSDB_CODE_SUCCESS
)
{
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg
);
}
SName
n
=
{
0
};
int32_t
ret
=
tNameSetDbName
(
&
n
,
pCtx
->
acctId
,
pToken
->
z
,
pToken
->
n
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg
);
}
SUseDbMsg
*
pUseDbMsg
=
(
SUseDbMsg
*
)
calloc
(
1
,
sizeof
(
SUseDbMsg
));
tNameExtractFullName
(
&
n
,
pUseDbMsg
->
db
);
*
output
=
pUseDbMsg
;
*
outputLen
=
sizeof
(
SUseDbMsg
);
break
;
}
case
TSDB_SQL_ALTER_DB
:
case
TSDB_SQL_CREATE_DB
:
{
const
char
*
msg1
=
"invalid db name"
;
...
...
@@ -4304,6 +4463,26 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, int64_t id, void** output, in
break
;
}
case
TSDB_SQL_CREATE_TABLE
:
{
SCreateTableSql
*
pCreateTable
=
pInfo
->
pCreateTableInfo
;
if
(
pCreateTable
->
type
==
TSQL_CREATE_TABLE
||
pCreateTable
->
type
==
TSQL_CREATE_STABLE
)
{
if
((
code
=
doCheckForCreateTable
(
pInfo
,
pMsgBuf
))
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
*
output
=
buildCreateTableMsg
(
pCreateTable
,
outputLen
,
pCtx
,
pMsgBuf
);
}
else
if
(
pCreateTable
->
type
==
TSQL_CREATE_CTABLE
)
{
// if ((code = doCheckForCreateFromStable(pSql, pInfo)) != TSDB_CODE_SUCCESS) {
// return code;
// }
}
else
if
(
pCreateTable
->
type
==
TSQL_CREATE_STREAM
)
{
// if ((code = doCheckForStream(pSql, pInfo)) != TSDB_CODE_SUCCESS) {
// return code;
}
break
;
}
default:
break
;
}
...
...
source/libs/parser/src/dataBlockMgt.c
浏览文件 @
021c5176
...
...
@@ -123,7 +123,6 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star
dataBuf
->
nAllocSize
=
dataBuf
->
headerSize
*
2
;
}
//dataBuf->pData = calloc(1, dataBuf->nAllocSize);
dataBuf
->
pData
=
malloc
(
dataBuf
->
nAllocSize
);
if
(
dataBuf
->
pData
==
NULL
)
{
tfree
(
dataBuf
);
...
...
@@ -145,8 +144,6 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star
dataBuf
->
tsSource
=
-
1
;
dataBuf
->
vgId
=
dataBuf
->
pTableMeta
->
vgId
;
// tNameAssign(&dataBuf->tableName, name);
assert
(
defaultSize
>
0
&&
pTableMeta
!=
NULL
&&
dataBuf
->
pTableMeta
!=
NULL
);
*
dataBlocks
=
dataBuf
;
...
...
source/libs/parser/src/insertParser.c
浏览文件 @
021c5176
此差异已折叠。
点击以展开。
source/libs/parser/src/parser.c
浏览文件 @
021c5176
...
...
@@ -31,7 +31,7 @@ bool qIsInsertSql(const char* pStr, size_t length) {
}
while
(
1
);
}
int32_t
qParseQuerySql
(
const
char
*
pStr
,
size_t
length
,
int64_t
id
,
int32_t
*
type
,
void
**
pOutput
,
int32_t
*
outputLen
,
char
*
msg
,
int32_t
msgLen
)
{
int32_t
qParseQuerySql
(
const
char
*
pStr
,
size_t
length
,
SParseBasicCtx
*
pParseCtx
,
int32_t
*
type
,
void
**
pOutput
,
int32_t
*
outputLen
,
char
*
msg
,
int32_t
msgLen
)
{
SSqlInfo
info
=
doGenerateAST
(
pStr
);
if
(
!
info
.
valid
)
{
strncpy
(
msg
,
info
.
msg
,
msgLen
);
...
...
@@ -39,8 +39,8 @@ int32_t qParseQuerySql(const char* pStr, size_t length, int64_t id, int32_t *typ
return
terrno
;
}
if
(
isDc
lSqlStatement
(
&
info
))
{
int32_t
code
=
qParserValidateDclSqlNode
(
&
info
,
id
,
pOutput
,
outputLen
,
type
,
msg
,
msgLen
);
if
(
!
isDq
lSqlStatement
(
&
info
))
{
int32_t
code
=
qParserValidateDclSqlNode
(
&
info
,
pParseCtx
,
pOutput
,
outputLen
,
type
,
msg
,
msgLen
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
// do nothing
}
...
...
@@ -53,7 +53,7 @@ int32_t qParseQuerySql(const char* pStr, size_t length, int64_t id, int32_t *typ
struct
SCatalog
*
pCatalog
=
NULL
;
int32_t
code
=
catalogGetHandle
(
NULL
,
&
pCatalog
);
code
=
qParserValidateSqlNode
(
pCatalog
,
&
info
,
pQueryInfo
,
i
d
,
msg
,
msgLen
);
code
=
qParserValidateSqlNode
(
pCatalog
,
&
info
,
pQueryInfo
,
pParseCtx
->
requestI
d
,
msg
,
msgLen
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
*
pOutput
=
pQueryInfo
;
}
...
...
source/libs/parser/src/parserUtil.c
浏览文件 @
021c5176
...
...
@@ -1167,7 +1167,7 @@ void cleanupTagCond(STagCond* pTagCond) {
* @param tableIndex denote the table index for join query, where more than one table exists
* @return
*/
STableMetaInfo
*
getMetaInfo
(
SQueryStmtInfo
*
pQueryInfo
,
int32_t
tableIndex
)
{
STableMetaInfo
*
getMetaInfo
(
const
SQueryStmtInfo
*
pQueryInfo
,
int32_t
tableIndex
)
{
assert
(
pQueryInfo
!=
NULL
);
if
(
pQueryInfo
->
pTableMetaInfo
==
NULL
)
{
assert
(
pQueryInfo
->
numOfTables
==
0
);
...
...
@@ -1613,7 +1613,18 @@ uint32_t convertRelationalOperator(SToken *pToken) {
}
bool
isDclSqlStatement
(
SSqlInfo
*
pSqlInfo
)
{
return
(
pSqlInfo
->
type
!=
TSDB_SQL_SELECT
);
int32_t
type
=
pSqlInfo
->
type
;
return
(
type
==
TSDB_SQL_CREATE_USER
||
type
==
TSDB_SQL_CREATE_ACCT
||
type
==
TSDB_SQL_DROP_USER
||
type
==
TSDB_SQL_DROP_ACCT
||
type
==
TSDB_SQL_SHOW
);
}
bool
isDdlSqlStatement
(
SSqlInfo
*
pSqlInfo
)
{
int32_t
type
=
pSqlInfo
->
type
;
return
(
type
==
TSDB_SQL_CREATE_TABLE
||
type
==
TSDB_SQL_CREATE_DB
||
type
==
TSDB_SQL_DROP_DB
);
}
bool
isDqlSqlStatement
(
SSqlInfo
*
pSqlInfo
)
{
return
pSqlInfo
->
type
==
TSDB_SQL_SELECT
;
}
#if 0
...
...
source/libs/parser/src/queryInfoUtil.c
浏览文件 @
021c5176
...
...
@@ -354,7 +354,7 @@ bool tscHasColumnFilter(SQueryStmtInfo* pQueryInfo) {
return
false
;
}
int32_t
getExprFunctionLevel
(
SQueryStmtInfo
*
pQueryInfo
)
{
int32_t
getExprFunctionLevel
(
const
SQueryStmtInfo
*
pQueryInfo
)
{
int32_t
n
=
10
;
int32_t
level
=
0
;
...
...
source/libs/parser/src/sql.c
浏览文件 @
021c5176
...
...
@@ -31,6 +31,7 @@
#include <assert.h>
#include <stdbool.h>
#include "astGenerator.h"
#include "parserInt.h"
#include "tmsgtype.h"
#include "ttoken.h"
#include "ttokendef.h"
...
...
source/libs/parser/test/insertParserTest.cpp
浏览文件 @
021c5176
...
...
@@ -43,8 +43,8 @@ protected:
void
bind
(
const
char
*
sql
)
{
reset
();
cxt_
.
pAcctId
=
acctId_
.
c_str
(
);
cxt_
.
pDbname
=
db_
.
c_str
();
cxt_
.
ctx
.
acctId
=
atoi
(
acctId_
.
c_str
()
);
cxt_
.
ctx
.
db
=
(
char
*
)
db_
.
c_str
();
strcpy
(
sqlBuf_
,
sql
);
cxt_
.
sqlLen
=
strlen
(
sql
);
sqlBuf_
[
cxt_
.
sqlLen
]
=
'\0'
;
...
...
@@ -69,7 +69,7 @@ protected:
cout
<<
"schemaAttache:"
<<
(
int32_t
)
res_
->
schemaAttache
<<
", payloadType:"
<<
(
int32_t
)
res_
->
payloadType
<<
", insertType:"
<<
res_
->
insertType
<<
", numOfVgs:"
<<
num
<<
endl
;
for
(
size_t
i
=
0
;
i
<
num
;
++
i
)
{
SVgDataBlocks
*
vg
=
(
SVgDataBlocks
*
)
taosArrayGetP
(
res_
->
pDataBlocks
,
i
);
cout
<<
"vgId:"
<<
vg
->
vgId
<<
", numOfTables:"
<<
vg
->
numOfTables
<<
", dataSize:"
<<
vg
->
size
<<
endl
;
cout
<<
"vgId:"
<<
vg
->
vg
.
vg
Id
<<
", numOfTables:"
<<
vg
->
numOfTables
<<
", dataSize:"
<<
vg
->
size
<<
endl
;
SMsgDesc
*
desc
=
(
SMsgDesc
*
)(
vg
->
pData
);
cout
<<
"numOfVnodes:"
<<
ntohl
(
desc
->
numOfVnodes
)
<<
endl
;
SSubmitMsg
*
submit
=
(
SSubmitMsg
*
)(
desc
+
1
);
...
...
source/libs/parser/test/mockCatalog.cpp
浏览文件 @
021c5176
此差异已折叠。
点击以展开。
source/libs/parser/test/mockCatalogService.cpp
浏览文件 @
021c5176
此差异已折叠。
点击以展开。
source/libs/parser/test/mockCatalogService.h
浏览文件 @
021c5176
...
...
@@ -57,9 +57,8 @@ public:
void
showTables
()
const
;
std
::
shared_ptr
<
MockTableMeta
>
getTableMeta
(
const
std
::
string
&
db
,
const
std
::
string
&
tbname
)
const
;
// mock interface
int32_t
catalogGetHandle
(
const
char
*
clusterId
,
struct
SCatalog
**
catalogHandle
)
const
;
int32_t
catalogGetTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
STableMeta
**
pTableMeta
)
const
;
int32_t
catalogGetTableMeta
(
const
char
*
pDBName
,
const
char
*
pTableName
,
STableMeta
**
pTableMeta
)
const
;
int32_t
catalogGetTableHashVgroup
(
const
char
*
pDBName
,
const
char
*
pTableName
,
SVgroupInfo
*
vgInfo
)
const
;
private:
std
::
unique_ptr
<
MockCatalogServiceImpl
>
impl_
;
...
...
source/libs/parser/test/parserTests.cpp
浏览文件 @
021c5176
此差异已折叠。
点击以展开。
source/libs/parser/test/plannerTest.cpp
浏览文件 @
021c5176
...
...
@@ -93,7 +93,7 @@ void generateLogicplan(const char* sql) {
ASSERT_EQ
(
ret
,
0
);
struct
SQueryPlanNode
*
n
=
nullptr
;
code
=
createQueryPlan
(
pQueryInfo
,
&
n
);
code
=
createQueryPlan
(
(
const
SQueryNode
*
)
pQueryInfo
,
&
n
);
char
*
str
=
NULL
;
queryPlanToString
(
n
,
&
str
);
...
...
@@ -156,7 +156,7 @@ TEST(testCase, planner_test) {
ASSERT_EQ
(
pQueryInfo
->
fieldsInfo
.
numOfOutput
,
2
);
struct
SQueryPlanNode
*
n
=
nullptr
;
code
=
createQueryPlan
(
pQueryInfo
,
&
n
);
code
=
createQueryPlan
(
(
const
SQueryNode
*
)
pQueryInfo
,
&
n
);
char
*
str
=
NULL
;
queryPlanToString
(
n
,
&
str
);
...
...
source/libs/planner/inc/plannerInt.h
浏览文件 @
021c5176
此差异已折叠。
点击以展开。
source/libs/planner/src/logicPlan.c
浏览文件 @
021c5176
此差异已折叠。
点击以展开。
source/libs/planner/src/physicalPlan.c
浏览文件 @
021c5176
此差异已折叠。
点击以展开。
source/libs/planner/src/physicalPlanJson.c
浏览文件 @
021c5176
此差异已折叠。
点击以展开。
source/libs/planner/src/planner.c
浏览文件 @
021c5176
此差异已折叠。
点击以展开。
source/libs/planner/test/phyPlanTests.cpp
浏览文件 @
021c5176
此差异已折叠。
点击以展开。
source/libs/qcom/CMakeLists.txt
浏览文件 @
021c5176
...
...
@@ -10,3 +10,5 @@ target_link_libraries(
qcom
PRIVATE os util transport
)
ADD_SUBDIRECTORY
(
test
)
source/libs/qcom/src/queryUtil.c
浏览文件 @
021c5176
此差异已折叠。
点击以展开。
source/libs/qcom/test/CMakeLists.txt
0 → 100644
浏览文件 @
021c5176
此差异已折叠。
点击以展开。
source/libs/qcom/test/queryTest.cpp
0 → 100644
浏览文件 @
021c5176
此差异已折叠。
点击以展开。
source/libs/scheduler/src/scheduler.c
浏览文件 @
021c5176
此差异已折叠。
点击以展开。
source/libs/wal/src/walRead.c
浏览文件 @
021c5176
此差异已折叠。
点击以展开。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录