Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d2886dee
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d2886dee
编写于
6月 01, 2021
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' into hotfix/TD-4449
上级
3c01a281
f572be8f
变更
28
展开全部
隐藏空白更改
内联
并排
Showing
28 changed file
with
733 addition
and
760 deletion
+733
-760
documentation20/cn/03.architecture/02.replica/docs.md
documentation20/cn/03.architecture/02.replica/docs.md
+3
-3
documentation20/cn/03.architecture/docs.md
documentation20/cn/03.architecture/docs.md
+1
-1
documentation20/cn/08.connector/docs.md
documentation20/cn/08.connector/docs.md
+1
-1
documentation20/cn/12.taos-sql/docs.md
documentation20/cn/12.taos-sql/docs.md
+4
-3
src/client/inc/tscGlobalmerge.h
src/client/inc/tscGlobalmerge.h
+14
-12
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+9
-5
src/client/inc/tschemautil.h
src/client/inc/tschemautil.h
+0
-93
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+10
-137
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+2
-3
src/client/src/tscGlobalmerge.c
src/client/src/tscGlobalmerge.c
+68
-76
src/client/src/tscLocal.c
src/client/src/tscLocal.c
+8
-4
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+161
-152
src/client/src/tscPrepare.c
src/client/src/tscPrepare.c
+13
-13
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+15
-3
src/client/src/tscServer.c
src/client/src/tscServer.c
+7
-15
src/client/src/tscStream.c
src/client/src/tscStream.c
+0
-1
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+9
-10
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+153
-124
src/connector/go
src/connector/go
+1
-1
src/connector/grafanaplugin
src/connector/grafanaplugin
+1
-1
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+6
-11
src/query/inc/qPlan.h
src/query/inc/qPlan.h
+2
-0
src/query/inc/qSqlparser.h
src/query/inc/qSqlparser.h
+1
-2
src/query/inc/qTableMeta.h
src/query/inc/qTableMeta.h
+202
-0
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+23
-1
src/query/src/qPlan.c
src/query/src/qPlan.c
+1
-1
src/query/src/qTableMeta.c
src/query/src/qTableMeta.c
+17
-87
tests/script/general/parser/import_file.sim
tests/script/general/parser/import_file.sim
+1
-0
未找到文件。
documentation20/cn/03.architecture/02.replica/docs.md
浏览文件 @
d2886dee
...
...
@@ -107,9 +107,9 @@ TDengine采取的是Master-Slave模式进行同步,与流行的RAFT一致性
![
replica-forward.png
](
page://images/architecture/replica-forward.png
)
1.
应用对写请求做基本的合法性检查,通过,则给
改
请求包打上一个版本号(version, 单调递增)
1.
应用对写请求做基本的合法性检查,通过,则给
该
请求包打上一个版本号(version, 单调递增)
2.
应用将打上版本号的写请求封装一个WAL Head, 写入WAL(Write Ahead Log)
3.
应用调用API syncForwardToPeer,如
多
vnode B是slave状态,sync模块将包含WAL Head的数据包通过Forward消息发送给vnode B,否则就不转发。
3.
应用调用API syncForwardToPeer,如
果
vnode B是slave状态,sync模块将包含WAL Head的数据包通过Forward消息发送给vnode B,否则就不转发。
4.
vnode B收到Forward消息后,调用回调函数writeToCache, 交给应用处理
5.
vnode B应用在写入成功后,都需要调用syncAckForward通知sync模块已经写入成功。
6.
如果quorum大于1,vnode B需要等待应用的回复确认,收到确认后,vnode B发送Forward Response消息给node A。
...
...
@@ -219,7 +219,7 @@ Arbitrator的程序tarbitrator.c在复制模块的同一目录, 编译整个系
不同之处:
-
选举流程不一样:Raft里任何一个节点是candidate时,主动向其他节点发出vote request
, 如果超过半数回答Yes, 这个candidate就成为Leader,开始一个新的term. 而TDengine的实现里,节点上线、离线或角色改变都会触发状态消息在节点组类传播,等节点组里状态稳定一致之后才触发选举流程,因为状态稳定一致,基于同样的状态信息,每个节点做出的决定会是一致的,一旦某个节点符合成为master的条件,无需其他节点认可,它会自动将自己设为master。TDengine里,任何一个节点检测到其他节点或自己的角色发生改变,就会给节点组内其他节点进行广播的
。Raft里不存在这样的机制,因此需要投票来解决。
-
选举流程不一样:Raft里任何一个节点是candidate时,主动向其他节点发出vote request
,如果超过半数回答Yes,这个candidate就成为Leader,开始一个新的term。而TDengine的实现里,节点上线、离线或角色改变都会触发状态消息在节点组内传播,等节点组里状态稳定一致之后才触发选举流程,因为状态稳定一致,基于同样的状态信息,每个节点做出的决定会是一致的,一旦某个节点符合成为master的条件,无需其他节点认可,它会自动将自己设为master。TDengine里,任何一个节点检测到其他节点或自己的角色发生改变,就会向节点组内其他节点进行广播
。Raft里不存在这样的机制,因此需要投票来解决。
-
对WAL的一条记录,Raft用term + index来做唯一标识。但TDengine只用version(类似index),在TDengine实现里,仅仅用version是完全可行的, 因为TDengine的选举机制,没有term的概念。
如果整个虚拟节点组全部宕机,重启,但不是所有虚拟节点都上线,这个时候TDengine是不会选出master的,因为未上线的节点有可能有最高version的数据。而RAFT协议,只要超过半数上线,就会选出Leader。
...
...
documentation20/cn/03.architecture/docs.md
浏览文件 @
d2886dee
...
...
@@ -343,7 +343,7 @@ TDengine采用数据驱动的方式让缓存中的数据写入硬盘进行持久
对于采集的数据,一般有保留时长,这个时长由系统配置参数keep决定。超过这个设置天数的数据文件,将被系统自动删除,释放存储空间。
给定days与keep两个参数,一个
vnode总的数据文件数为:keep/days
。总的数据文件个数不宜过大,也不宜过小。10到100以内合适。基于这个原则,可以设置合理的days。 目前的版本,参数keep可以修改,但对于参数days,一但设置后,不可修改。
给定days与keep两个参数,一个
典型工作状态的vnode中总的数据文件数为:
`向上取整(keep/days)+1`
个
。总的数据文件个数不宜过大,也不宜过小。10到100以内合适。基于这个原则,可以设置合理的days。 目前的版本,参数keep可以修改,但对于参数days,一但设置后,不可修改。
在每个数据文件里,一张表的数据是一块一块存储的。一张表可以有一到多个数据文件块。在一个文件块里,数据是列式存储的,占用的是一片连续的存储空间,这样大大提高读取速度。文件块的大小由系统参数maxRows(每块最大记录条数)决定,缺省值为4096。这个值不宜过大,也不宜过小。过大,定位具体时间段的数据的搜索时间会变长,影响读取速度;过小,数据块的索引太大,压缩效率偏低,也影响读取速度。
...
...
documentation20/cn/08.connector/docs.md
浏览文件 @
d2886dee
...
...
@@ -516,7 +516,7 @@ conn.close()
-
_TDengineCursor_ 类
参考python中help(taos.TDengineCursor)。
这个类对应客户端进行的写入、查询操作。在客户端多线程的场景下,这个游标实例必须保持线程独享,不能
夸
线程共享使用,否则会导致返回结果出现错误。
这个类对应客户端进行的写入、查询操作。在客户端多线程的场景下,这个游标实例必须保持线程独享,不能
跨
线程共享使用,否则会导致返回结果出现错误。
-
_connect_ 方法
...
...
documentation20/cn/12.taos-sql/docs.md
浏览文件 @
d2886dee
...
...
@@ -37,7 +37,7 @@ taos> DESCRIBE meters;
-
Epoch Time:时间戳也可以是一个长整数,表示从 1970-01-01 08:00:00.000 开始的毫秒数
-
时间可以加减,比如 now-2h,表明查询时刻向前推 2 个小时(最近 2 小时)。数字后面的时间单位可以是 u(微秒)、a(毫秒)、s(秒)、m(分)、h(小时)、d(天)、w(周)。 比如
`select * from t1 where ts > now-2w and ts <= now-1w`
,表示查询两周前整整一周的数据。在指定降频操作(down sampling)的时间窗口(interval)时,时间单位还可以使用 n(自然月) 和 y(自然年)。
TDengine 缺省的时间戳是毫秒精度,但通过
修改配置参数 enableMicrosecond
就可以支持微秒。
TDengine 缺省的时间戳是毫秒精度,但通过
在 CREATE DATABASE 时传递的 PRECISION 参数
就可以支持微秒。
在TDengine中,普通表的数据模型中可使用以下 10 种数据类型。
...
...
@@ -400,6 +400,7 @@ TDengine 缺省的时间戳是毫秒精度,但通过修改配置参数 enableM
tb2_name (tb2_field1_name, ...) [USING stb2_name TAGS (tag_value2, ...)] VALUES (field1_value1, ...) (field1_value2, ...) ...;
```
以自动建表的方式,同时向表tb1_name和tb2_name中按列分别插入多条记录。
说明:
`(tb1_field1_name, ...)`
的部分可以省略掉,这样就是使用全列模式写入——也即在 VALUES 部分提供的数据,必须为数据表的每个列都显式地提供数据。全列写入速度会远快于指定列,因此建议尽可能采用全列写入方式,此时空列可以填入NULL。
从 2.0.20.5 版本开始,子表的列名可以不跟在子表名称后面,而是可以放在 TAGS 和 VALUES 之间,例如像下面这样写:
```
mysql
INSERT INTO tb1_name [USING stb1_name TAGS (tag_value1, ...)] (tb1_field1_name, ...) VALUES (field1_value1, ...) (field1_value2, ...) ...;
...
...
@@ -423,9 +424,9 @@ Query OK, 1 row(s) in set (0.001029s)
taos> SHOW TABLES;
Query OK, 0 row(s) in set (0.000946s)
taos> INSERT INTO d1001 USING meters TAGS('Beijing.Chaoyang', 2);
taos> INSERT INTO d1001 USING meters TAGS('Beijing.Chaoyang', 2)
VALUES('a')
;
DB error: invalid SQL:
keyword VALUES or FILE required
DB error: invalid SQL:
'a' (invalid timestamp) (0.039494s)
taos> SHOW TABLES;
table_name | created_time | columns | stable_name |
...
...
src/client/inc/tsc
LocalM
erge.h
→
src/client/inc/tsc
Globalm
erge.h
浏览文件 @
d2886dee
...
...
@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSC
LOC
ALMERGE_H
#define TDENGINE_TSC
LOC
ALMERGE_H
#ifndef TDENGINE_TSC
GLOB
ALMERGE_H
#define TDENGINE_TSC
GLOB
ALMERGE_H
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -24,7 +24,7 @@ extern "C" {
#include "qFill.h"
#include "taosmsg.h"
#include "tlosertree.h"
#include "
tsclient
.h"
#include "
qExecutor
.h"
#define MAX_NUM_OF_SUBQUERY_RETRY 3
...
...
@@ -38,7 +38,7 @@ typedef struct SLocalDataSource {
tFilePage
filePage
;
}
SLocalDataSource
;
typedef
struct
S
Loc
alMerger
{
typedef
struct
S
Glob
alMerger
{
SLocalDataSource
**
pLocalDataSrc
;
int32_t
numOfBuffer
;
int32_t
numOfCompleted
;
...
...
@@ -48,20 +48,22 @@ typedef struct SLocalMerger {
tOrderDescriptor
*
pDesc
;
tExtMemBuffer
**
pExtMemBuffer
;
// disk-based buffer
char
*
buf
;
// temp buffer
}
SLocalMerger
;
}
SGlobalMerger
;
struct
SSqlObj
;
typedef
struct
SRetrieveSupport
{
tExtMemBuffer
**
pExtMemBuffer
;
// for build loser tree
tOrderDescriptor
*
pOrderDescriptor
;
int32_t
subqueryIndex
;
// index of current vnode in vnode list
SSqlObj
*
pParentSql
;
struct
SSqlObj
*
pParentSql
;
tFilePage
*
localBuffer
;
// temp buffer, there is a buffer for each vnode to
uint32_t
numOfRetry
;
// record the number of retry times
}
SRetrieveSupport
;
int32_t
tsc
LocalReducerEnvCreate
(
SQueryInfo
*
pQueryInfo
,
tExtMemBuffer
***
pMemBuffer
,
int32_t
numOfSub
,
tOrderDescriptor
**
pDesc
,
uint32_t
nBufferSize
,
int64_t
id
);
int32_t
tsc
CreateGlobalMergerEnv
(
SQueryInfo
*
pQueryInfo
,
tExtMemBuffer
***
pMemBuffer
,
int32_t
numOfSub
,
tOrderDescriptor
**
pDesc
,
uint32_t
nBufferSize
,
int64_t
id
);
void
tsc
LocalReducerEnvDestroy
(
tExtMemBuffer
**
pMemBuffer
,
tOrderDescriptor
*
pDesc
,
int32_t
numOfVnodes
);
void
tsc
DestroyGlobalMergerEnv
(
tExtMemBuffer
**
pMemBuffer
,
tOrderDescriptor
*
pDesc
,
int32_t
numOfVnodes
);
int32_t
saveToBuffer
(
tExtMemBuffer
*
pMemoryBuf
,
tOrderDescriptor
*
pDesc
,
tFilePage
*
pPage
,
void
*
data
,
int32_t
numOfRows
,
int32_t
orderType
);
...
...
@@ -71,13 +73,13 @@ int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tF
/*
* create local reducer to launch the second-stage reduce process at client site
*/
int32_t
tscCreate
Loc
alMerger
(
tExtMemBuffer
**
pMemBuffer
,
int32_t
numOfBuffer
,
tOrderDescriptor
*
pDesc
,
SQueryInfo
*
pQueryInfo
,
S
Loc
alMerger
**
pMerger
,
int64_t
id
);
int32_t
tscCreate
Glob
alMerger
(
tExtMemBuffer
**
pMemBuffer
,
int32_t
numOfBuffer
,
tOrderDescriptor
*
pDesc
,
SQueryInfo
*
pQueryInfo
,
S
Glob
alMerger
**
pMerger
,
int64_t
id
);
void
tscDestroy
LocalMerger
(
SLocalMerger
*
pLocal
Merger
);
void
tscDestroy
GlobalMerger
(
SGlobalMerger
*
p
Merger
);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSC
LOC
ALMERGE_H
#endif // TDENGINE_TSC
GLOB
ALMERGE_H
src/client/inc/tscUtil.h
浏览文件 @
d2886dee
...
...
@@ -20,13 +20,13 @@
extern
"C"
{
#endif
#include "tsched.h"
#include "exception.h"
#include "os.h"
#include "qExtbuffer.h"
#include "taosdef.h"
#include "tbuffer.h"
#include "tscLocalMerge.h"
#include "tscGlobalmerge.h"
#include "tsched.h"
#include "tsclient.h"
#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
...
...
@@ -63,7 +63,7 @@ typedef struct SJoinSupporter {
SArray
*
exprList
;
SFieldInfo
fieldsInfo
;
STagCond
tagCond
;
SGroupbyExpr
groupInfo
;
// group by info
SGroupbyExpr
groupInfo
;
// group by info
struct
STSBuf
*
pTSBuf
;
// the TSBuf struct that holds the compressed timestamp array
FILE
*
f
;
// temporary file in order to create TSBuf
char
path
[
PATH_MAX
];
// temporary file path, todo dynamic allocate memory
...
...
@@ -110,7 +110,7 @@ void* tscDestroyBlockArrayList(SArray* pDataBlockList);
void
*
tscDestroyBlockHashTable
(
SHashObj
*
pBlockHashTable
,
bool
removeMeta
);
int32_t
tscCopyDataBlockToPayload
(
SSqlObj
*
pSql
,
STableDataBlocks
*
pDataBlock
);
int32_t
tscMergeTableDataBlocks
(
S
SqlObj
*
pSql
,
bool
freeBlockMap
);
int32_t
tscMergeTableDataBlocks
(
S
InsertStatementParam
*
pInsertParam
,
bool
freeBlockMap
);
int32_t
tscGetDataBlockFromList
(
SHashObj
*
pHashList
,
int64_t
id
,
int32_t
size
,
int32_t
startOffset
,
int32_t
rowSize
,
SName
*
pName
,
STableMeta
*
pTableMeta
,
STableDataBlocks
**
dataBlocks
,
SArray
*
pBlockList
);
...
...
@@ -284,6 +284,7 @@ void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src);
SSqlObj
*
createSimpleSubObj
(
SSqlObj
*
pSql
,
__async_cb_func_t
fp
,
void
*
param
,
int32_t
cmd
);
void
registerSqlObj
(
SSqlObj
*
pSql
);
void
tscInitResForMerge
(
SSqlRes
*
pRes
);
SSqlObj
*
createSubqueryObj
(
SSqlObj
*
pSql
,
int16_t
tableIndex
,
__async_cb_func_t
fp
,
void
*
param
,
int32_t
cmd
,
SSqlObj
*
pPrevSql
);
void
addGroupInfoForSubquery
(
SSqlObj
*
pParentObj
,
SSqlObj
*
pSql
,
int32_t
subClauseIndex
,
int32_t
tableIndex
);
...
...
@@ -328,12 +329,15 @@ SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo);
int32_t
tscCreateQueryFromQueryInfo
(
SQueryInfo
*
pQueryInfo
,
SQueryAttr
*
pQueryAttr
,
void
*
addr
);
void
tsCreateSQLFunctionCtx
(
SQueryInfo
*
pQueryInfo
,
SQLFunctionCtx
*
pCtx
,
SSchema
*
pSchema
);
void
*
createQInfoFromQueryNode
(
SQueryInfo
*
pQueryInfo
,
S
ExprInfo
*
pExprs
,
S
TableGroupInfo
*
pTableGroupInfo
,
SOperatorInfo
*
pOperator
,
char
*
sql
,
void
*
addr
,
int32_t
stage
);
void
*
createQInfoFromQueryNode
(
SQueryInfo
*
pQueryInfo
,
STableGroupInfo
*
pTableGroupInfo
,
SOperatorInfo
*
pOperator
,
char
*
sql
,
void
*
addr
,
int32_t
stage
);
void
*
malloc_throw
(
size_t
size
);
void
*
calloc_throw
(
size_t
nmemb
,
size_t
size
);
char
*
strdup_throw
(
const
char
*
str
);
bool
vgroupInfoIdentical
(
SNewVgroupInfo
*
pExisted
,
SVgroupMsg
*
src
);
SNewVgroupInfo
createNewVgroupInfo
(
SVgroupMsg
*
pVgroupMsg
);
#ifdef __cplusplus
}
#endif
...
...
src/client/inc/tschemautil.h
已删除
100644 → 0
浏览文件 @
3c01a281
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCHEMAUTIL_H
#define TDENGINE_TSCHEMAUTIL_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include "taosmsg.h"
#include "tsclient.h"
#include "ttoken.h"
/**
* get the number of tags of this table
* @param pTableMeta
* @return
*/
int32_t
tscGetNumOfTags
(
const
STableMeta
*
pTableMeta
);
/**
* get the number of columns of this table
* @param pTableMeta
* @return
*/
int32_t
tscGetNumOfColumns
(
const
STableMeta
*
pTableMeta
);
/**
* get the basic info of this table
* @param pTableMeta
* @return
*/
STableComInfo
tscGetTableInfo
(
const
STableMeta
*
pTableMeta
);
/**
* get the schema
* @param pTableMeta
* @return
*/
SSchema
*
tscGetTableSchema
(
const
STableMeta
*
pTableMeta
);
/**
* get the tag schema
* @param pMeta
* @return
*/
SSchema
*
tscGetTableTagSchema
(
const
STableMeta
*
pMeta
);
/**
* get the column schema according to the column index
* @param pMeta
* @param colIndex
* @return
*/
SSchema
*
tscGetTableColumnSchema
(
const
STableMeta
*
pMeta
,
int32_t
colIndex
);
/**
* get the column schema according to the column id
* @param pTableMeta
* @param colId
* @return
*/
SSchema
*
tscGetColumnSchemaById
(
STableMeta
*
pTableMeta
,
int16_t
colId
);
/**
* create the table meta from the msg
* @param pTableMetaMsg
* @param size size of the table meta
* @return
*/
STableMeta
*
tscCreateTableMetaFromMsg
(
STableMetaMsg
*
pTableMetaMsg
);
bool
vgroupInfoIdentical
(
SNewVgroupInfo
*
pExisted
,
SVgroupMsg
*
src
);
SNewVgroupInfo
createNewVgroupInfo
(
SVgroupMsg
*
pVgroupMsg
);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCHEMAUTIL_H
src/client/inc/tsclient.h
浏览文件 @
d2886dee
...
...
@@ -40,17 +40,9 @@ extern "C" {
// forward declaration
struct
SSqlInfo
;
struct
SLocalMerger
;
typedef
void
(
*
__async_cb_func_t
)(
void
*
param
,
TAOS_RES
*
tres
,
int32_t
numOfRows
);
typedef
struct
STableComInfo
{
uint8_t
numOfTags
;
uint8_t
precision
;
int16_t
numOfColumns
;
int32_t
rowSize
;
}
STableComInfo
;
typedef
struct
SNewVgroupInfo
{
int32_t
vgId
;
int8_t
inUse
;
...
...
@@ -66,34 +58,6 @@ typedef struct CChildTableMeta {
uint64_t
suid
;
// super table id
}
CChildTableMeta
;
typedef
struct
STableMeta
{
int32_t
vgId
;
STableId
id
;
uint8_t
tableType
;
char
sTableName
[
TSDB_TABLE_FNAME_LEN
];
// super table name
uint64_t
suid
;
// super table id
int16_t
sversion
;
int16_t
tversion
;
STableComInfo
tableInfo
;
SSchema
schema
[];
// if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
}
STableMeta
;
typedef
struct
STableMetaInfo
{
STableMeta
*
pTableMeta
;
// table meta, cached in client side and acquired by name
uint32_t
tableMetaSize
;
SVgroupsInfo
*
vgroupList
;
SArray
*
pVgroupTables
;
// SArray<SVgroupTableInfo>
/*
* 1. keep the vgroup index during the multi-vnode super table projection query
* 2. keep the vgroup index for multi-vnode insertion
*/
int32_t
vgroupIndex
;
SName
name
;
char
aliasName
[
TSDB_TABLE_NAME_LEN
];
// alias name of table specified in query sql
SArray
*
tagColList
;
// SArray<SColumn*>, involved tag columns
}
STableMetaInfo
;
typedef
struct
SColumnIndex
{
int16_t
tableIndex
;
int16_t
columnIndex
;
...
...
@@ -111,44 +75,6 @@ typedef struct SInternalField {
SExprInfo
*
pExpr
;
}
SInternalField
;
typedef
struct
SFieldInfo
{
int16_t
numOfOutput
;
// number of column in result
TAOS_FIELD
*
final
;
SArray
*
internalField
;
// SArray<SInternalField>
}
SFieldInfo
;
typedef
struct
SCond
{
uint64_t
uid
;
int32_t
len
;
// length of tag query condition data
char
*
cond
;
}
SCond
;
typedef
struct
SJoinNode
{
uint64_t
uid
;
int16_t
tagColId
;
SArray
*
tsJoin
;
SArray
*
tagJoin
;
}
SJoinNode
;
typedef
struct
SJoinInfo
{
bool
hasJoin
;
SJoinNode
*
joinTables
[
TSDB_MAX_JOIN_TABLE_NUM
];
}
SJoinInfo
;
typedef
struct
STagCond
{
// relation between tbname list and query condition, including : TK_AND or TK_OR
int16_t
relType
;
// tbname query condition, only support tbname query condition on one table
SCond
tbnameCond
;
// join condition, only support two tables join currently
SJoinInfo
joinInfo
;
// for different table, the query condition must be seperated
SArray
*
pCond
;
}
STagCond
;
typedef
struct
SParamInfo
{
int32_t
idx
;
uint8_t
type
;
...
...
@@ -191,57 +117,6 @@ typedef struct STableDataBlocks {
SParamInfo
*
params
;
}
STableDataBlocks
;
typedef
struct
SQueryInfo
{
int16_t
command
;
// the command may be different for each subclause, so keep it seperately.
uint32_t
type
;
// query/insert type
STimeWindow
window
;
// the whole query time window
SInterval
interval
;
// tumble time window
SSessionWindow
sessionWindow
;
// session time window
SGroupbyExpr
groupbyExpr
;
// groupby tags info
SArray
*
colList
;
// SArray<SColumn*>
SFieldInfo
fieldsInfo
;
SArray
*
exprList
;
// SArray<SExprInfo*>
SArray
*
exprList1
;
// final exprlist in case of arithmetic expression exists
SLimitVal
limit
;
SLimitVal
slimit
;
STagCond
tagCond
;
SOrderVal
order
;
int16_t
fillType
;
// final result fill type
int16_t
numOfTables
;
STableMetaInfo
**
pTableMetaInfo
;
struct
STSBuf
*
tsBuf
;
int64_t
*
fillVal
;
// default value for fill
char
*
msg
;
// pointer to the pCmd->payload to keep error message temporarily
int64_t
clauseLimit
;
// limit for current sub clause
int64_t
prjOffset
;
// offset value in the original sql expression, only applied at client side
int64_t
vgroupLimit
;
// table limit in case of super table projection query + global order + limit
int32_t
udColumnId
;
// current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
int16_t
resColumnId
;
// result column id
bool
distinctTag
;
// distinct tag or not
int32_t
round
;
// 0/1/....
int32_t
bufLen
;
char
*
buf
;
SQInfo
*
pQInfo
;
// global merge operator
SQueryAttr
*
pQueryAttr
;
// query object
struct
SQueryInfo
*
sibling
;
// sibling
SArray
*
pUpstream
;
// SArray<struct SQueryInfo>
struct
SQueryInfo
*
pDownstream
;
int32_t
havingFieldNum
;
bool
stableQuery
;
bool
groupbyColumn
;
bool
simpleAgg
;
bool
arithmeticOnAgg
;
bool
projectionQuery
;
bool
hasFilter
;
bool
onlyTagQuery
;
}
SQueryInfo
;
typedef
struct
{
STableMeta
*
pTableMeta
;
SVgroupsInfo
*
pVgroupInfo
;
...
...
@@ -255,9 +130,13 @@ typedef struct SInsertStatementParam {
int8_t
schemaAttached
;
// denote if submit block is built with table schema or not
STagData
tagData
;
// NOTE: pTagData->data is used as a variant length array
int32_t
batchSize
;
// for parameter ('?') binding and batch processing
int32_t
numOfParams
;
char
msg
[
512
];
// error message
char
*
sql
;
// current sql statement position
uint32_t
insertType
;
// insert data from [file|sql statement| bound statement]
uint64_t
objectId
;
// sql object id
char
*
sql
;
// current sql statement position
}
SInsertStatementParam
;
// TODO extract sql parser supporter
...
...
@@ -266,13 +145,9 @@ typedef struct {
uint8_t
msgType
;
SInsertStatementParam
insertParam
;
char
reserve1
[
3
];
// fix bus error on arm32
union
{
int32_t
count
;
};
int32_t
count
;
// todo remove it
char
*
curSql
;
// current sql, resume position of sql after parsing paused
char
reserve2
[
3
];
// fix bus error on arm32
int16_t
numOfCols
;
char
reserve3
[
2
];
// fix bus error on arm32
uint32_t
allocSize
;
...
...
@@ -283,8 +158,6 @@ typedef struct {
SQueryInfo
*
pQueryInfo
;
SQueryInfo
*
active
;
// current active query info
int32_t
batchSize
;
// for parameter ('?') binding and batch processing
int32_t
numOfParams
;
STagData
tagData
;
// NOTE: pTagData->data is used as a variant length array
int32_t
resColumnId
;
}
SSqlCmd
;
...
...
@@ -320,7 +193,7 @@ typedef struct {
TAOS_FIELD
*
final
;
SArithmeticSupport
*
pArithSup
;
// support the arithmetic expression calculation on agg functions
struct
S
LocalMerger
*
pLocal
Merger
;
struct
S
GlobalMerger
*
p
Merger
;
}
SSqlRes
;
typedef
struct
{
...
...
@@ -447,7 +320,7 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void
tscSetResRawPtrRv
(
SSqlRes
*
pRes
,
SQueryInfo
*
pQueryInfo
,
SSDataBlock
*
pBlock
);
void
handleDownstreamOperator
(
SSqlObj
**
pSqlList
,
int32_t
numOfUpstream
,
SQueryInfo
*
px
,
SSqlRes
*
pOutput
);
void
destroyTableNameList
(
S
SqlCmd
*
pCmd
);
void
destroyTableNameList
(
S
InsertStatementParam
*
pInsertParam
);
void
tscResetSqlCmd
(
SSqlCmd
*
pCmd
,
bool
removeMeta
);
...
...
@@ -479,7 +352,7 @@ void waitForQueryRsp(void *param, TAOS_RES *tres, int code);
void
doAsyncQuery
(
STscObj
*
pObj
,
SSqlObj
*
pSql
,
__async_cb_func_t
fp
,
void
*
param
,
const
char
*
sqlstr
,
size_t
sqlLen
);
void
tscImportDataFromFile
(
SSqlObj
*
pSql
);
void
tscInitResObjForLocalQuery
(
SSqlObj
*
pObj
,
int32_t
numOfRes
,
int32_t
rowLen
);
struct
SGlobalMerger
*
tscInitResObjForLocalQuery
(
int32_t
numOfRes
,
int32_t
rowLen
,
uint64_t
id
);
bool
tscIsUpdateQuery
(
SSqlObj
*
pSql
);
char
*
tscGetSqlStr
(
SSqlObj
*
pSql
);
bool
tscIsQueryWithLimit
(
SSqlObj
*
pSql
);
...
...
@@ -489,7 +362,7 @@ void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32
char
*
tscGetErrorMsgPayload
(
SSqlCmd
*
pCmd
);
int32_t
tscInvalid
SQLErr
Msg
(
char
*
msg
,
const
char
*
additionalInfo
,
const
char
*
sql
);
int32_t
tscInvalid
Operation
Msg
(
char
*
msg
,
const
char
*
additionalInfo
,
const
char
*
sql
);
int32_t
tscSQLSyntaxErrMsg
(
char
*
msg
,
const
char
*
additionalInfo
,
const
char
*
sql
);
int32_t
tscValidateSqlInfo
(
SSqlObj
*
pSql
,
struct
SSqlInfo
*
pInfo
);
...
...
src/client/src/tscAsync.c
浏览文件 @
d2886dee
...
...
@@ -22,7 +22,7 @@
#include "tscSubquery.h"
#include "tscUtil.h"
#include "tsched.h"
#include "
tschemautil
.h"
#include "
qTableMeta
.h"
#include "tsclient.h"
static
void
tscAsyncQueryRowsForNextVnode
(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
);
...
...
@@ -58,7 +58,6 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
strntolower
(
pSql
->
sqlstr
,
sqlstr
,
(
int32_t
)
sqlLen
);
tscDebugL
(
"0x%"
PRIx64
" SQL: %s"
,
pSql
->
self
,
pSql
->
sqlstr
);
pCmd
->
curSql
=
pSql
->
sqlstr
;
pCmd
->
resColumnId
=
TSDB_RES_COL_ID
;
int32_t
code
=
tsParseSql
(
pSql
,
true
);
...
...
@@ -221,7 +220,7 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
tscResetForNextRetrieve
(
pRes
);
// handle
the sub queries of join query
// handle
outer query based on the already retrieved nest query results.
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
);
if
(
pQueryInfo
->
pUpstream
!=
NULL
&&
taosArrayGetSize
(
pQueryInfo
->
pUpstream
)
>
0
)
{
SSchedMsg
schedMsg
=
{
0
};
...
...
src/client/src/tsc
LocalM
erge.c
→
src/client/src/tsc
Globalm
erge.c
浏览文件 @
d2886dee
...
...
@@ -13,15 +13,19 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tscLocalMerge.h"
#include "tscSubquery.h"
#include "os.h"
#include "texpr.h"
#include "tlosertree.h"
#include "tscGlobalmerge.h"
#include "tscSubquery.h"
#include "tscLog.h"
#include "tsclient.h"
#include "qUtil.h"
#define COLMODEL_GET_VAL(data, schema, rowId, colId) \
(data + (schema)->pFields[colId].offset * ((schema)->capacity) + (rowId) * (schema)->pFields[colId].field.bytes)
typedef
struct
SCompareParam
{
SLocalDataSource
**
pLocalData
;
tOrderDescriptor
*
pDesc
;
...
...
@@ -29,9 +33,18 @@ typedef struct SCompareParam {
int32_t
groupOrderType
;
}
SCompareParam
;
bool
needToMergeRv
(
SSDataBlock
*
pBlock
,
SArray
*
columnIndex
,
int32_t
index
,
char
**
buf
);
static
bool
needToMerge
(
SSDataBlock
*
pBlock
,
SArray
*
columnIndexList
,
int32_t
index
,
char
**
buf
)
{
int32_t
ret
=
0
;
size_t
size
=
taosArrayGetSize
(
columnIndexList
);
if
(
size
>
0
)
{
ret
=
compare_aRv
(
pBlock
,
columnIndexList
,
(
int32_t
)
size
,
index
,
buf
,
TSDB_ORDER_ASC
);
}
// if ret == 0, means the result belongs to the same group
return
(
ret
==
0
);
}
int32_t
treeComparator
(
const
void
*
pLeft
,
const
void
*
pRight
,
void
*
param
)
{
static
int32_t
treeComparator
(
const
void
*
pLeft
,
const
void
*
pRight
,
void
*
param
)
{
int32_t
pLeftIdx
=
*
(
int32_t
*
)
pLeft
;
int32_t
pRightIdx
=
*
(
int32_t
*
)
pRight
;
...
...
@@ -57,16 +70,16 @@ int32_t treeComparator(const void *pLeft, const void *pRight, void *param) {
}
}
int32_t
tscCreate
Loc
alMerger
(
tExtMemBuffer
**
pMemBuffer
,
int32_t
numOfBuffer
,
tOrderDescriptor
*
pDesc
,
SQueryInfo
*
pQueryInfo
,
S
Loc
alMerger
**
pMerger
,
int64_t
id
)
{
int32_t
tscCreate
Glob
alMerger
(
tExtMemBuffer
**
pMemBuffer
,
int32_t
numOfBuffer
,
tOrderDescriptor
*
pDesc
,
SQueryInfo
*
pQueryInfo
,
S
Glob
alMerger
**
pMerger
,
int64_t
id
)
{
if
(
pMemBuffer
==
NULL
)
{
tsc
LocalReducerEnvDestroy
(
pMemBuffer
,
pDesc
,
numOfBuffer
);
tsc
DestroyGlobalMergerEnv
(
pMemBuffer
,
pDesc
,
numOfBuffer
);
tscError
(
"0x%"
PRIx64
" %p pMemBuffer is NULL"
,
id
,
pMemBuffer
);
return
TSDB_CODE_TSC_APP_ERROR
;
}
if
(
pDesc
->
pColumnModel
==
NULL
)
{
tsc
LocalReducerEnvDestroy
(
pMemBuffer
,
pDesc
,
numOfBuffer
);
tsc
DestroyGlobalMergerEnv
(
pMemBuffer
,
pDesc
,
numOfBuffer
);
tscError
(
"0x%"
PRIx64
" no local buffer or intermediate result format model"
,
id
);
return
TSDB_CODE_TSC_APP_ERROR
;
}
...
...
@@ -83,7 +96,7 @@ int32_t tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tO
}
if
(
numOfFlush
==
0
||
numOfBuffer
==
0
)
{
tsc
LocalReducerEnvDestroy
(
pMemBuffer
,
pDesc
,
numOfBuffer
);
tsc
DestroyGlobalMergerEnv
(
pMemBuffer
,
pDesc
,
numOfBuffer
);
tscDebug
(
"0x%"
PRIx64
" no data to retrieve"
,
id
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -92,15 +105,15 @@ int32_t tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tO
tscError
(
"0x%"
PRIx64
" Invalid value of buffer capacity %d and page size %d "
,
id
,
pDesc
->
pColumnModel
->
capacity
,
pMemBuffer
[
0
]
->
pageSize
);
tsc
LocalReducerEnvDestroy
(
pMemBuffer
,
pDesc
,
numOfBuffer
);
tsc
DestroyGlobalMergerEnv
(
pMemBuffer
,
pDesc
,
numOfBuffer
);
return
TSDB_CODE_TSC_APP_ERROR
;
}
*
pMerger
=
(
S
LocalMerger
*
)
calloc
(
1
,
sizeof
(
SLoc
alMerger
));
*
pMerger
=
(
S
GlobalMerger
*
)
calloc
(
1
,
sizeof
(
SGlob
alMerger
));
if
((
*
pMerger
)
==
NULL
)
{
tscError
(
"0x%"
PRIx64
" failed to create local merge structure, out of memory"
,
id
);
tsc
LocalReducerEnvDestroy
(
pMemBuffer
,
pDesc
,
numOfBuffer
);
tsc
DestroyGlobalMergerEnv
(
pMemBuffer
,
pDesc
,
numOfBuffer
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
...
...
@@ -160,7 +173,7 @@ int32_t tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tO
// no data actually, no need to merge result.
if
(
idx
==
0
)
{
tscDebug
(
"0x%"
PRIx64
" retrieved no data"
,
id
);
tsc
LocalReducerEnvDestroy
(
pMemBuffer
,
pDesc
,
numOfBuffer
);
tsc
DestroyGlobalMergerEnv
(
pMemBuffer
,
pDesc
,
numOfBuffer
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -198,7 +211,7 @@ int32_t tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tO
}
// restore the limitation value at the last stage
if
(
tscOrderedProjectionQueryOnSTable
(
pQueryInfo
,
0
)
)
{
if
(
pQueryInfo
->
orderProjectQuery
)
{
pQueryInfo
->
limit
.
limit
=
pQueryInfo
->
clauseLimit
;
pQueryInfo
->
limit
.
offset
=
pQueryInfo
->
prjOffset
;
}
...
...
@@ -297,28 +310,28 @@ int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePa
return
0
;
}
void
tscDestroy
LocalMerger
(
SLocalMerger
*
pLocal
Merger
)
{
if
(
p
Local
Merger
==
NULL
)
{
void
tscDestroy
GlobalMerger
(
SGlobalMerger
*
p
Merger
)
{
if
(
pMerger
==
NULL
)
{
return
;
}
for
(
int32_t
i
=
0
;
i
<
p
Local
Merger
->
numOfBuffer
;
++
i
)
{
tfree
(
p
Local
Merger
->
pLocalDataSrc
[
i
]);
for
(
int32_t
i
=
0
;
i
<
pMerger
->
numOfBuffer
;
++
i
)
{
tfree
(
pMerger
->
pLocalDataSrc
[
i
]);
}
p
Local
Merger
->
numOfBuffer
=
0
;
tsc
LocalReducerEnvDestroy
(
pLocalMerger
->
pExtMemBuffer
,
pLocalMerger
->
pDesc
,
pLocal
Merger
->
numOfVnode
);
pMerger
->
numOfBuffer
=
0
;
tsc
DestroyGlobalMergerEnv
(
pMerger
->
pExtMemBuffer
,
pMerger
->
pDesc
,
p
Merger
->
numOfVnode
);
p
Local
Merger
->
numOfCompleted
=
0
;
pMerger
->
numOfCompleted
=
0
;
if
(
p
Local
Merger
->
pLoserTree
)
{
tfree
(
p
Local
Merger
->
pLoserTree
->
param
);
tfree
(
p
Local
Merger
->
pLoserTree
);
if
(
pMerger
->
pLoserTree
)
{
tfree
(
pMerger
->
pLoserTree
->
param
);
tfree
(
pMerger
->
pLoserTree
);
}
tfree
(
p
Local
Merger
->
buf
);
tfree
(
p
Local
Merger
->
pLocalDataSrc
);
free
(
p
Local
Merger
);
tfree
(
pMerger
->
buf
);
tfree
(
pMerger
->
pLocalDataSrc
);
free
(
pMerger
);
}
static
int32_t
createOrderDescriptor
(
tOrderDescriptor
**
pOrderDesc
,
SQueryInfo
*
pQueryInfo
,
SColumnModel
*
pModel
)
{
...
...
@@ -329,7 +342,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SQueryInfo*
}
// primary timestamp column is involved in final result
if
(
pQueryInfo
->
interval
.
interval
!=
0
||
tscOrderedProjectionQueryOnSTable
(
pQueryInfo
,
0
)
)
{
if
(
pQueryInfo
->
interval
.
interval
!=
0
||
pQueryInfo
->
orderProjectQuery
)
{
numOfGroupByCols
++
;
}
...
...
@@ -392,7 +405,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SQueryInfo*
}
}
int32_t
tsc
LocalReducerEnvCreate
(
SQueryInfo
*
pQueryInfo
,
tExtMemBuffer
***
pMemBuffer
,
int32_t
numOfSub
,
int32_t
tsc
CreateGlobalMergerEnv
(
SQueryInfo
*
pQueryInfo
,
tExtMemBuffer
***
pMemBuffer
,
int32_t
numOfSub
,
tOrderDescriptor
**
pOrderDesc
,
uint32_t
nBufferSizes
,
int64_t
id
)
{
SSchema
*
pSchema
=
NULL
;
SColumnModel
*
pModel
=
NULL
;
...
...
@@ -456,7 +469,7 @@ int32_t tscLocalReducerEnvCreate(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBu
* @param pDesc
* @param numOfVnodes
*/
void
tsc
LocalReducerEnvDestroy
(
tExtMemBuffer
**
pMemBuffer
,
tOrderDescriptor
*
pDesc
,
int32_t
numOfVnodes
)
{
void
tsc
DestroyGlobalMergerEnv
(
tExtMemBuffer
**
pMemBuffer
,
tOrderDescriptor
*
pDesc
,
int32_t
numOfVnodes
)
{
tOrderDescDestroy
(
pDesc
);
for
(
int32_t
i
=
0
;
i
<
numOfVnodes
;
++
i
)
{
pMemBuffer
[
i
]
=
destoryExtMemBuffer
(
pMemBuffer
[
i
]);
...
...
@@ -467,12 +480,12 @@ void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDe
/**
*
* @param p
LocalMerge
* @param p
Merger
* @param pOneInterDataSrc
* @param treeList
* @return the number of remain input source. if ret == 0, all data has been handled
*/
int32_t
loadNewDataFromDiskFor
(
S
LocalMerger
*
pLocalMerge
,
SLocalDataSource
*
pOneInterDataSrc
,
int32_t
loadNewDataFromDiskFor
(
S
GlobalMerger
*
pMerger
,
SLocalDataSource
*
pOneInterDataSrc
,
bool
*
needAdjustLoserTree
)
{
pOneInterDataSrc
->
rowIdx
=
0
;
pOneInterDataSrc
->
pageId
+=
1
;
...
...
@@ -489,17 +502,17 @@ int32_t loadNewDataFromDiskFor(SLocalMerger *pLocalMerge, SLocalDataSource *pOne
#endif
*
needAdjustLoserTree
=
true
;
}
else
{
p
LocalMerge
->
numOfCompleted
+=
1
;
p
Merger
->
numOfCompleted
+=
1
;
pOneInterDataSrc
->
rowIdx
=
-
1
;
pOneInterDataSrc
->
pageId
=
-
1
;
*
needAdjustLoserTree
=
true
;
}
return
p
LocalMerge
->
numOfBuffer
;
return
p
Merger
->
numOfBuffer
;
}
void
adjustLoserTreeFromNewData
(
S
LocalMerger
*
pLocalMerge
,
SLocalDataSource
*
pOneInterDataSrc
,
void
adjustLoserTreeFromNewData
(
S
GlobalMerger
*
pMerger
,
SLocalDataSource
*
pOneInterDataSrc
,
SLoserTreeInfo
*
pTree
)
{
/*
* load a new data page into memory for intermediate dataset source,
...
...
@@ -507,7 +520,7 @@ void adjustLoserTreeFromNewData(SLocalMerger *pLocalMerge, SLocalDataSource *pOn
*/
bool
needToAdjust
=
true
;
if
(
pOneInterDataSrc
->
filePage
.
num
<=
pOneInterDataSrc
->
rowIdx
)
{
loadNewDataFromDiskFor
(
p
LocalMerge
,
pOneInterDataSrc
,
&
needToAdjust
);
loadNewDataFromDiskFor
(
p
Merger
,
pOneInterDataSrc
,
&
needToAdjust
);
}
/*
...
...
@@ -515,7 +528,7 @@ void adjustLoserTreeFromNewData(SLocalMerger *pLocalMerge, SLocalDataSource *pOn
* if the loser tree is rebuild completed, we do not need to adjust
*/
if
(
needToAdjust
)
{
int32_t
leafNodeIdx
=
pTree
->
pNode
[
0
].
index
+
p
LocalMerge
->
numOfBuffer
;
int32_t
leafNodeIdx
=
pTree
->
pNode
[
0
].
index
+
p
Merger
->
numOfBuffer
;
#ifdef _DEBUG_VIEW
printf
(
"before adjust:
\t
"
);
...
...
@@ -567,7 +580,7 @@ static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput
}
}
static
void
doExecuteFinalMerge
Rv
(
SOperatorInfo
*
pOperator
,
int32_t
numOfExpr
,
SSDataBlock
*
pBlock
)
{
static
void
doExecuteFinalMerge
(
SOperatorInfo
*
pOperator
,
int32_t
numOfExpr
,
SSDataBlock
*
pBlock
)
{
SMultiwayMergeInfo
*
pInfo
=
pOperator
->
info
;
SQLFunctionCtx
*
pCtx
=
pInfo
->
binfo
.
pCtx
;
...
...
@@ -579,7 +592,7 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
if
(
pInfo
->
hasPrev
)
{
if
(
needToMerge
Rv
(
pBlock
,
pInfo
->
orderColumnList
,
i
,
pInfo
->
prevRow
))
{
if
(
needToMerge
(
pBlock
,
pInfo
->
orderColumnList
,
i
,
pInfo
->
prevRow
))
{
for
(
int32_t
j
=
0
;
j
<
numOfExpr
;
++
j
)
{
pCtx
[
j
].
pInput
=
add
[
j
]
+
pCtx
[
j
].
inputBytes
*
i
;
}
...
...
@@ -654,45 +667,27 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S
tfree
(
add
);
}
bool
needToMergeRv
(
SSDataBlock
*
pBlock
,
SArray
*
columnIndexList
,
int32_t
index
,
char
**
buf
)
{
int32_t
ret
=
0
;
size_t
size
=
taosArrayGetSize
(
columnIndexList
);
if
(
size
>
0
)
{
ret
=
compare_aRv
(
pBlock
,
columnIndexList
,
(
int32_t
)
size
,
index
,
buf
,
TSDB_ORDER_ASC
);
}
// if ret == 0, means the result belongs to the same group
return
(
ret
==
0
);
static
bool
isAllSourcesCompleted
(
SGlobalMerger
*
pMerger
)
{
return
(
pMerger
->
numOfBuffer
==
pMerger
->
numOfCompleted
);
}
static
bool
isAllSourcesCompleted
(
SLocalMerger
*
pLocalMerge
)
{
return
(
pLocalMerge
->
numOfBuffer
==
pLocalMerge
->
numOfCompleted
);
}
void
tscInitResObjForLocalQuery
(
SSqlObj
*
pSql
,
int32_t
numOfRes
,
int32_t
rowLen
)
{
SSqlRes
*
pRes
=
&
pSql
->
res
;
if
(
pRes
->
pLocalMerger
!=
NULL
)
{
tscDestroyLocalMerger
(
pRes
->
pLocalMerger
);
pRes
->
pLocalMerger
=
NULL
;
tscDebug
(
"0x%"
PRIx64
" free local reducer finished"
,
pSql
->
self
);
SGlobalMerger
*
tscInitResObjForLocalQuery
(
int32_t
numOfRes
,
int32_t
rowLen
,
uint64_t
id
)
{
SGlobalMerger
*
pMerger
=
calloc
(
1
,
sizeof
(
SGlobalMerger
));
if
(
pMerger
==
NULL
)
{
tscDebug
(
"0x%"
PRIx64
" free local reducer finished"
,
id
);
return
NULL
;
}
pRes
->
qId
=
1
;
// hack to pass the safety check in fetch_row function
pRes
->
numOfRows
=
0
;
pRes
->
row
=
0
;
pRes
->
rspType
=
0
;
// used as a flag to denote if taos_retrieved() has been called yet
pRes
->
pLocalMerger
=
(
SLocalMerger
*
)
calloc
(
1
,
sizeof
(
SLocalMerger
));
/*
* One more byte space is required, since the sprintf function needs one additional space to put '\0' at
* the end of string
*/
size_t
size
=
numOfRes
*
rowLen
+
1
;
p
Res
->
pLocal
Merger
->
buf
=
calloc
(
1
,
size
);
pRes
->
data
=
pRes
->
pLocalMerger
->
buf
;
pMerger
->
buf
=
calloc
(
1
,
size
);
return
pMerger
;
}
// todo remove it
int32_t
doArithmeticCalculate
(
SQueryInfo
*
pQueryInfo
,
tFilePage
*
pOutput
,
int32_t
rowSize
,
int32_t
finalRowSize
)
{
int32_t
maxRowSize
=
MAX
(
rowSize
,
finalRowSize
);
char
*
pbuf
=
calloc
(
1
,
(
size_t
)(
pOutput
->
num
*
maxRowSize
));
...
...
@@ -736,9 +731,6 @@ int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_
return
offset
;
}
#define COLMODEL_GET_VAL(data, schema, rowId, colId) \
(data + (schema)->pFields[colId].offset * ((schema)->capacity) + (rowId) * (schema)->pFields[colId].field.bytes)
static
void
appendOneRowToDataBlock
(
SSDataBlock
*
pBlock
,
char
*
buf
,
SColumnModel
*
pModel
,
int32_t
rowIndex
,
int32_t
maxRows
)
{
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
numOfCols
;
++
i
)
{
...
...
@@ -760,7 +752,7 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) {
SMultiwayMergeInfo
*
pInfo
=
pOperator
->
info
;
S
Loc
alMerger
*
pMerger
=
pInfo
->
pMerge
;
S
Glob
alMerger
*
pMerger
=
pInfo
->
pMerge
;
SLoserTreeInfo
*
pTree
=
pMerger
->
pLoserTree
;
pInfo
->
binfo
.
pRes
->
info
.
rows
=
0
;
...
...
@@ -844,7 +836,7 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) {
return
(
pInfo
->
binfo
.
pRes
->
info
.
rows
>
0
)
?
pInfo
->
binfo
.
pRes
:
NULL
;
}
static
bool
isSameGroup
Rv
(
SArray
*
orderColumnList
,
SSDataBlock
*
pBlock
,
char
**
dataCols
)
{
static
bool
isSameGroup
(
SArray
*
orderColumnList
,
SSDataBlock
*
pBlock
,
char
**
dataCols
)
{
int32_t
numOfCols
=
(
int32_t
)
taosArrayGetSize
(
orderColumnList
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColIndex
*
pIndex
=
taosArrayGet
(
orderColumnList
,
i
);
...
...
@@ -892,7 +884,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
}
}
doExecuteFinalMerge
Rv
(
pOperator
,
pOperator
->
numOfOutput
,
pAggInfo
->
pExistBlock
);
doExecuteFinalMerge
(
pOperator
,
pOperator
->
numOfOutput
,
pAggInfo
->
pExistBlock
);
savePrevOrderColumns
(
pAggInfo
->
currentGroupColData
,
pAggInfo
->
groupColumnList
,
pAggInfo
->
pExistBlock
,
0
,
&
pAggInfo
->
hasGroupColData
);
...
...
@@ -913,7 +905,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
}
if
(
pAggInfo
->
hasGroupColData
)
{
bool
sameGroup
=
isSameGroup
Rv
(
pAggInfo
->
groupColumnList
,
pBlock
,
pAggInfo
->
currentGroupColData
);
bool
sameGroup
=
isSameGroup
(
pAggInfo
->
groupColumnList
,
pBlock
,
pAggInfo
->
currentGroupColData
);
if
(
!
sameGroup
)
{
*
newgroup
=
true
;
pAggInfo
->
hasDataBlockForNewGroup
=
true
;
...
...
@@ -927,7 +919,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
setInputDataBlock
(
pOperator
,
pAggInfo
->
binfo
.
pCtx
,
pBlock
,
TSDB_ORDER_ASC
);
updateOutputBuf
(
&
pAggInfo
->
binfo
,
&
pAggInfo
->
bufCapacity
,
pBlock
->
info
.
rows
*
pAggInfo
->
resultRowFactor
);
doExecuteFinalMerge
Rv
(
pOperator
,
pOperator
->
numOfOutput
,
pBlock
);
doExecuteFinalMerge
(
pOperator
,
pOperator
->
numOfOutput
,
pBlock
);
savePrevOrderColumns
(
pAggInfo
->
currentGroupColData
,
pAggInfo
->
groupColumnList
,
pBlock
,
0
,
&
pAggInfo
->
hasGroupColData
);
handleData
=
true
;
}
...
...
src/client/src/tscLocal.c
浏览文件 @
d2886dee
...
...
@@ -20,7 +20,7 @@
#include "tname.h"
#include "tscLog.h"
#include "tscUtil.h"
#include "
tschemautil
.h"
#include "
qTableMeta
.h"
#include "tsclient.h"
#include "taos.h"
#include "tscSubquery.h"
...
...
@@ -71,7 +71,9 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
numOfRows
=
numOfRows
+
tscGetNumOfTags
(
pMeta
);
}
tscInitResObjForLocalQuery
(
pSql
,
totalNumOfRows
,
rowLen
);
pSql
->
res
.
pMerger
=
tscInitResObjForLocalQuery
(
totalNumOfRows
,
rowLen
,
pSql
->
self
);
tscInitResForMerge
(
&
pSql
->
res
);
SSchema
*
pSchema
=
tscGetTableSchema
(
pMeta
);
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
...
...
@@ -433,7 +435,8 @@ static int32_t tscSCreateSetValueToResObj(SSqlObj *pSql, int32_t rowLen, const c
if
(
strlen
(
ddl
)
==
0
)
{
}
tscInitResObjForLocalQuery
(
pSql
,
numOfRows
,
rowLen
);
pSql
->
res
.
pMerger
=
tscInitResObjForLocalQuery
(
numOfRows
,
rowLen
,
pSql
->
self
);
tscInitResForMerge
(
&
pSql
->
res
);
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fieldsInfo
,
0
);
char
*
dst
=
pRes
->
data
+
tscFieldInfoGetOffset
(
pQueryInfo
,
0
)
*
numOfRows
;
...
...
@@ -882,7 +885,8 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
TAOS_FIELD
f
=
tscCreateField
((
int8_t
)
type
,
columnName
,
(
int16_t
)
valueLength
);
tscFieldInfoAppend
(
&
pQueryInfo
->
fieldsInfo
,
&
f
);
tscInitResObjForLocalQuery
(
pSql
,
1
,
(
int32_t
)
valueLength
);
pSql
->
res
.
pMerger
=
tscInitResObjForLocalQuery
(
1
,
(
int32_t
)
valueLength
,
pSql
->
self
);
tscInitResForMerge
(
&
pSql
->
res
);
SInternalField
*
pInfo
=
tscFieldInfoGetInternalField
(
&
pQueryInfo
->
fieldsInfo
,
0
);
pInfo
->
pExpr
=
taosArrayGetP
(
pQueryInfo
->
exprList
,
0
);
...
...
src/client/src/tscParseInsert.c
浏览文件 @
d2886dee
此差异已折叠。
点击以展开。
src/client/src/tscPrepare.c
浏览文件 @
d2886dee
...
...
@@ -1085,7 +1085,7 @@ static int insertStmtExecute(STscStmt* stmt) {
fillTablesColumnsNull
(
stmt
->
pSql
);
int
code
=
tscMergeTableDataBlocks
(
stmt
->
pSql
,
false
);
int
code
=
tscMergeTableDataBlocks
(
&
stmt
->
pSql
->
cmd
.
insertParam
,
false
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
@@ -1185,7 +1185,7 @@ static int insertBatchStmtExecute(STscStmt* pStmt) {
fillTablesColumnsNull
(
pStmt
->
pSql
);
if
((
code
=
tscMergeTableDataBlocks
(
pStmt
->
pSql
,
false
))
!=
TSDB_CODE_SUCCESS
)
{
if
((
code
=
tscMergeTableDataBlocks
(
&
pStmt
->
pSql
->
cmd
.
insertParam
,
false
))
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
@@ -1213,7 +1213,7 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) {
}
int32_t
index
=
0
;
SStrToken
sToken
=
tStrGetToken
(
pCmd
->
curS
ql
,
&
index
,
false
);
SStrToken
sToken
=
tStrGetToken
(
pCmd
->
insertParam
.
s
ql
,
&
index
,
false
);
if
(
sToken
.
n
==
0
)
{
return
TSDB_CODE_TSC_INVALID_OPERATION
;
}
...
...
@@ -1232,7 +1232,7 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) {
pStmt
->
mtb
.
tagSet
=
true
;
sToken
=
tStrGetToken
(
pCmd
->
curS
ql
,
&
index
,
false
);
sToken
=
tStrGetToken
(
pCmd
->
insertParam
.
s
ql
,
&
index
,
false
);
if
(
sToken
.
n
>
0
&&
sToken
.
type
==
TK_VALUES
)
{
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1241,18 +1241,18 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) {
return
TSDB_CODE_TSC_INVALID_OPERATION
;
}
sToken
=
tStrGetToken
(
pCmd
->
curS
ql
,
&
index
,
false
);
sToken
=
tStrGetToken
(
pCmd
->
insertParam
.
s
ql
,
&
index
,
false
);
if
(
sToken
.
n
<=
0
||
((
sToken
.
type
!=
TK_ID
)
&&
(
sToken
.
type
!=
TK_STRING
)))
{
return
TSDB_CODE_TSC_INVALID_OPERATION
;
}
pStmt
->
mtb
.
stbname
=
sToken
;
sToken
=
tStrGetToken
(
pCmd
->
curS
ql
,
&
index
,
false
);
sToken
=
tStrGetToken
(
pCmd
->
insertParam
.
s
ql
,
&
index
,
false
);
if
(
sToken
.
n
<=
0
||
sToken
.
type
!=
TK_TAGS
)
{
return
TSDB_CODE_TSC_INVALID_OPERATION
;
}
sToken
=
tStrGetToken
(
pCmd
->
curS
ql
,
&
index
,
false
);
sToken
=
tStrGetToken
(
pCmd
->
insertParam
.
s
ql
,
&
index
,
false
);
if
(
sToken
.
n
<=
0
||
sToken
.
type
!=
TK_LP
)
{
return
TSDB_CODE_TSC_INVALID_OPERATION
;
}
...
...
@@ -1262,7 +1262,7 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) {
int32_t
loopCont
=
1
;
while
(
loopCont
)
{
sToken
=
tStrGetToken
(
pCmd
->
curS
ql
,
&
index
,
false
);
sToken
=
tStrGetToken
(
pCmd
->
insertParam
.
s
ql
,
&
index
,
false
);
if
(
sToken
.
n
<=
0
)
{
return
TSDB_CODE_TSC_INVALID_OPERATION
;
}
...
...
@@ -1285,7 +1285,7 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) {
return
TSDB_CODE_TSC_INVALID_OPERATION
;
}
sToken
=
tStrGetToken
(
pCmd
->
curS
ql
,
&
index
,
false
);
sToken
=
tStrGetToken
(
pCmd
->
insertParam
.
s
ql
,
&
index
,
false
);
if
(
sToken
.
n
<=
0
||
(
sToken
.
type
!=
TK_VALUES
&&
sToken
.
type
!=
TK_LP
))
{
return
TSDB_CODE_TSC_INVALID_OPERATION
;
}
...
...
@@ -1468,7 +1468,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
if
(
tscIsInsertData
(
pSql
->
sqlstr
))
{
pStmt
->
isInsert
=
true
;
pSql
->
cmd
.
numOfParams
=
0
;
pSql
->
cmd
.
insertParam
.
numOfParams
=
0
;
pSql
->
cmd
.
batchSize
=
0
;
registerSqlObj
(
pSql
);
...
...
@@ -1565,7 +1565,7 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
tscDebug
(
"0x%"
PRIx64
" SQL: %s"
,
pSql
->
self
,
pSql
->
sqlstr
);
pSql
->
cmd
.
numOfParams
=
0
;
pSql
->
cmd
.
insertParam
.
numOfParams
=
0
;
pSql
->
cmd
.
batchSize
=
0
;
if
(
taosHashGetSize
(
pCmd
->
insertParam
.
pTableBlockHashList
)
>
0
)
{
...
...
@@ -1851,8 +1851,8 @@ int taos_stmt_num_params(TAOS_STMT *stmt, int *nums) {
if
(
pStmt
->
isInsert
)
{
SSqlObj
*
pSql
=
pStmt
->
pSql
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
*
nums
=
pCmd
->
numOfParams
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
*
nums
=
pCmd
->
insertParam
.
numOfParams
;
return
TSDB_CODE_SUCCESS
;
}
else
{
SNormalStmt
*
normal
=
&
pStmt
->
normal
;
...
...
src/client/src/tscSQLParser.c
浏览文件 @
d2886dee
...
...
@@ -28,7 +28,7 @@
#include "tname.h"
#include "tscLog.h"
#include "tscUtil.h"
#include "
tschemautil
.h"
#include "
qTableMeta
.h"
#include "tsclient.h"
#include "tstrbuild.h"
#include "ttoken.h"
...
...
@@ -195,7 +195,7 @@ static bool validateDebugFlag(int32_t v) {
* is not needed in the final error message.
*/
static
int32_t
invalidOperationMsg
(
char
*
dstBuffer
,
const
char
*
errMsg
)
{
return
tscInvalid
SQLErr
Msg
(
dstBuffer
,
errMsg
,
NULL
);
return
tscInvalid
Operation
Msg
(
dstBuffer
,
errMsg
,
NULL
);
}
static
int
setColumnFilterInfoForTimestamp
(
SSqlCmd
*
pCmd
,
SQueryInfo
*
pQueryInfo
,
tVariant
*
pVar
)
{
...
...
@@ -6603,7 +6603,7 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
if
(
!
findColumnIndex
)
{
tdDestroyKVRowBuilder
(
&
kvRowBuilder
);
return
tscInvalid
SQLErr
Msg
(
pCmd
->
payload
,
"invalid tag name"
,
sToken
->
z
);
return
tscInvalid
Operation
Msg
(
pCmd
->
payload
,
"invalid tag name"
,
sToken
->
z
);
}
}
}
else
{
...
...
@@ -7456,6 +7456,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
return
TSDB_CODE_TSC_INVALID_OPERATION
;
}
// validate the query filter condition info
if
(
pSqlNode
->
pWhere
!=
NULL
)
{
if
(
validateWhereNode
(
pQueryInfo
,
&
pSqlNode
->
pWhere
,
pSql
)
!=
TSDB_CODE_SUCCESS
)
{
return
TSDB_CODE_TSC_INVALID_OPERATION
;
...
...
@@ -7467,6 +7468,16 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
pQueryInfo
->
window
.
ekey
=
pQueryInfo
->
window
.
ekey
/
1000
;
}
}
// validate the interval info
if
(
validateIntervalNode
(
pSql
,
pQueryInfo
,
pSqlNode
)
!=
TSDB_CODE_SUCCESS
)
{
return
TSDB_CODE_TSC_INVALID_OPERATION
;
}
else
{
if
(
isTimeWindowQuery
(
pQueryInfo
)
&&
(
validateFunctionsInIntervalOrGroupbyQuery
(
pCmd
,
pQueryInfo
)
!=
TSDB_CODE_SUCCESS
))
{
return
TSDB_CODE_TSC_INVALID_OPERATION
;
}
}
}
else
{
pQueryInfo
->
command
=
TSDB_SQL_SELECT
;
...
...
@@ -7612,6 +7623,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
pQueryInfo
->
groupbyColumn
=
tscGroupbyColumn
(
pQueryInfo
);
pQueryInfo
->
arithmeticOnAgg
=
tsIsArithmeticQueryOnAggResult
(
pQueryInfo
);
pQueryInfo
->
orderProjectQuery
=
tscOrderedProjectionQueryOnSTable
(
pQueryInfo
,
0
);
SExprInfo
**
p
=
NULL
;
int32_t
numOfExpr
=
0
;
...
...
src/client/src/tscServer.c
浏览文件 @
d2886dee
...
...
@@ -15,15 +15,15 @@
#include "os.h"
#include "tcmdtype.h"
#include "tlockfree.h"
#include "trpc.h"
#include "tsc
LocalM
erge.h"
#include "tsc
Globalm
erge.h"
#include "tscLog.h"
#include "tscProfile.h"
#include "tscUtil.h"
#include "
tschemautil
.h"
#include "
qTableMeta
.h"
#include "tsclient.h"
#include "ttimer.h"
#include "tlockfree.h"
#include "qPlan.h"
int
(
*
tscBuildMsg
[
TSDB_SQL_MAX
])(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
)
=
{
0
};
...
...
@@ -1598,7 +1598,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
return
code
;
}
if
(
pRes
->
p
Local
Merger
==
NULL
)
{
// no result from subquery, so abort here directly.
if
(
pRes
->
pMerger
==
NULL
)
{
// no result from subquery, so abort here directly.
(
*
pSql
->
fp
)(
pSql
->
param
,
pSql
,
pRes
->
numOfRows
);
return
code
;
}
...
...
@@ -1615,15 +1615,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
taosArrayPush
(
group
,
&
tableKeyInfo
);
taosArrayPush
(
tableGroupInfo
.
pGroupList
,
&
group
);
// todo remove it
SExprInfo
*
list
=
calloc
(
tscNumOfExprs
(
pQueryInfo
),
sizeof
(
SExprInfo
));
for
(
int32_t
i
=
0
;
i
<
tscNumOfExprs
(
pQueryInfo
);
++
i
)
{
SExprInfo
*
pExprInfo
=
tscExprGet
(
pQueryInfo
,
i
);
list
[
i
]
=
*
pExprInfo
;
}
pQueryInfo
->
pQInfo
=
createQInfoFromQueryNode
(
pQueryInfo
,
list
,
&
tableGroupInfo
,
NULL
,
NULL
,
pRes
->
pLocalMerger
,
MERGE_STAGE
);
tfree
(
list
);
pQueryInfo
->
pQInfo
=
createQInfoFromQueryNode
(
pQueryInfo
,
&
tableGroupInfo
,
NULL
,
NULL
,
pRes
->
pMerger
,
MERGE_STAGE
);
}
uint64_t
localQueryId
=
0
;
...
...
@@ -2402,8 +2394,8 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
char
*
pMsg
=
(
char
*
)
pInfoMsg
+
sizeof
(
STableInfoMsg
);
// tag data exists
if
(
autocreate
&&
pSql
->
cmd
.
tagData
.
dataLen
!=
0
)
{
pMsg
=
serializeTagData
(
&
pSql
->
cmd
.
tagData
,
pMsg
);
if
(
autocreate
&&
pSql
->
cmd
.
insertParam
.
tagData
.
dataLen
!=
0
)
{
pMsg
=
serializeTagData
(
&
pSql
->
cmd
.
insertParam
.
tagData
,
pMsg
);
}
pNew
->
cmd
.
payloadLen
=
(
int32_t
)(
pMsg
-
(
char
*
)
pInfoMsg
);
...
...
src/client/src/tscStream.c
浏览文件 @
d2886dee
...
...
@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <tschemautil.h>
#include "os.h"
#include "taosmsg.h"
#include "tscLog.h"
...
...
src/client/src/tscSubquery.c
浏览文件 @
d2886dee
...
...
@@ -21,7 +21,7 @@
#include "tcompare.h"
#include "tscLog.h"
#include "tscSubquery.h"
#include "
tschemautil
.h"
#include "
qTableMeta
.h"
#include "tsclient.h"
#include "qUtil.h"
#include "qPlan.h"
...
...
@@ -2446,7 +2446,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
assert
(
pState
->
numOfSub
>
0
);
int32_t
ret
=
tsc
LocalReducerEnvCreate
(
pQueryInfo
,
&
pMemoryBuf
,
pSql
->
subState
.
numOfSub
,
&
pDesc
,
nBufferSize
,
pSql
->
self
);
int32_t
ret
=
tsc
CreateGlobalMergerEnv
(
pQueryInfo
,
&
pMemoryBuf
,
pSql
->
subState
.
numOfSub
,
&
pDesc
,
nBufferSize
,
pSql
->
self
);
if
(
ret
!=
0
)
{
pRes
->
code
=
ret
;
tscAsyncResultOnError
(
pSql
);
...
...
@@ -2459,7 +2459,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
if
(
pSql
->
pSubs
==
NULL
)
{
tfree
(
pSql
->
pSubs
);
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tsc
LocalReducerEnvDestroy
(
pMemoryBuf
,
pDesc
,
pState
->
numOfSub
);
tsc
DestroyGlobalMergerEnv
(
pMemoryBuf
,
pDesc
,
pState
->
numOfSub
);
tscAsyncResultOnError
(
pSql
);
return
ret
;
...
...
@@ -2526,13 +2526,13 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tscError
(
"0x%"
PRIx64
" failed to prepare subquery structure and launch subqueries"
,
pSql
->
self
);
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tsc
LocalReducerEnvDestroy
(
pMemoryBuf
,
pDesc
,
pState
->
numOfSub
);
tsc
DestroyGlobalMergerEnv
(
pMemoryBuf
,
pDesc
,
pState
->
numOfSub
);
doCleanupSubqueries
(
pSql
,
i
);
return
pRes
->
code
;
// free all allocated resource
}
if
(
pRes
->
code
==
TSDB_CODE_TSC_QUERY_CANCELLED
)
{
tsc
LocalReducerEnvDestroy
(
pMemoryBuf
,
pDesc
,
pState
->
numOfSub
);
tsc
DestroyGlobalMergerEnv
(
pMemoryBuf
,
pDesc
,
pState
->
numOfSub
);
doCleanupSubqueries
(
pSql
,
i
);
return
pRes
->
code
;
}
...
...
@@ -2697,7 +2697,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
tstrerror
(
pParentSql
->
res
.
code
));
// release allocated resource
tsc
LocalReducerEnvDestroy
(
trsupport
->
pExtMemBuffer
,
trsupport
->
pOrderDescriptor
,
tsc
DestroyGlobalMergerEnv
(
trsupport
->
pExtMemBuffer
,
trsupport
->
pOrderDescriptor
,
pState
->
numOfSub
);
tscFreeRetrieveSup
(
pSql
);
...
...
@@ -2772,7 +2772,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
SQueryInfo
*
pPQueryInfo
=
tscGetQueryInfo
(
&
pParentSql
->
cmd
);
tscClearInterpInfo
(
pPQueryInfo
);
code
=
tscCreate
LocalMerger
(
trsupport
->
pExtMemBuffer
,
pState
->
numOfSub
,
pDesc
,
pPQueryInfo
,
&
pParentSql
->
res
.
pLocal
Merger
,
pParentSql
->
self
);
code
=
tscCreate
GlobalMerger
(
trsupport
->
pExtMemBuffer
,
pState
->
numOfSub
,
pDesc
,
pPQueryInfo
,
&
pParentSql
->
res
.
p
Merger
,
pParentSql
->
self
);
pParentSql
->
res
.
code
=
code
;
if
(
code
==
TSDB_CODE_SUCCESS
&&
trsupport
->
pExtMemBuffer
==
NULL
)
{
...
...
@@ -3499,9 +3499,8 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
return
hasData
;
}
// todo remove pExprs
void
*
createQInfoFromQueryNode
(
SQueryInfo
*
pQueryInfo
,
SExprInfo
*
pExprs
,
STableGroupInfo
*
pTableGroupInfo
,
SOperatorInfo
*
pSourceOperator
,
char
*
sql
,
void
*
merger
,
int32_t
stage
)
{
void
*
createQInfoFromQueryNode
(
SQueryInfo
*
pQueryInfo
,
STableGroupInfo
*
pTableGroupInfo
,
SOperatorInfo
*
pSourceOperator
,
char
*
sql
,
void
*
merger
,
int32_t
stage
)
{
assert
(
pQueryInfo
!=
NULL
);
SQInfo
*
pQInfo
=
(
SQInfo
*
)
calloc
(
1
,
sizeof
(
SQInfo
));
if
(
pQInfo
==
NULL
)
{
...
...
src/client/src/tscUtil.c
浏览文件 @
d2886dee
此差异已折叠。
点击以展开。
go
@
050667e5
比较
8ce6d865
...
050667e5
Subproject commit
8ce6d86558afc8c0b50c10f990fd2b4270cf06fc
Subproject commit
050667e5b4d0eafa5387e4283e713559b421203f
grafanaplugin
@
32e2c97a
比较
3530c6df
...
32e2c97a
Subproject commit 3
530c6df097134a410bacec6b3cd013ef38a61aa
Subproject commit 3
2e2c97a4cf7bedaa99f5d6dd8cb036e7f4470df
src/query/inc/qExecutor.h
浏览文件 @
d2886dee
...
...
@@ -22,6 +22,7 @@
#include "qFill.h"
#include "qResultbuf.h"
#include "qSqlparser.h"
#include "qTableMeta.h"
#include "qTsbuf.h"
#include "query.h"
#include "taosdef.h"
...
...
@@ -70,14 +71,6 @@ typedef struct SResultRowPool {
SArray
*
pData
;
// SArray<void*>
}
SResultRowPool
;
typedef
struct
SGroupbyExpr
{
int16_t
tableIndex
;
SArray
*
columnInfo
;
// SArray<SColIndex>, group by columns information
int16_t
numOfGroupCols
;
// todo remove it
int16_t
orderIndex
;
// order by column index
int16_t
orderType
;
// order by type: asc/desc
}
SGroupbyExpr
;
typedef
struct
SResultRow
{
int32_t
pageId
;
// pageId & rowId is the position of current result in disk-based output buffer
int32_t
offset
:
29
;
// row index in buffer page
...
...
@@ -216,7 +209,7 @@ typedef struct SQueryAttr {
int32_t
intermediateResultRowSize
;
// intermediate result row size, in case of top-k query.
int32_t
maxTableColumnWidth
;
int32_t
tagLen
;
// tag value length of current query
SGroupbyExpr
*
pGroupbyExpr
;
SGroupbyExpr
*
pGroupbyExpr
;
SExprInfo
*
pExpr1
;
SExprInfo
*
pExpr2
;
...
...
@@ -475,10 +468,10 @@ typedef struct SDistinctOperatorInfo {
int64_t
outputCapacity
;
}
SDistinctOperatorInfo
;
struct
S
Loc
alMerger
;
struct
S
Glob
alMerger
;
typedef
struct
SMultiwayMergeInfo
{
struct
S
Loc
alMerger
*
pMerge
;
struct
S
Glob
alMerger
*
pMerge
;
SOptrBasicInfo
binfo
;
int32_t
bufCapacity
;
int64_t
seed
;
...
...
@@ -559,6 +552,8 @@ int32_t createFilterInfo(SQueryAttr* pQueryAttr, uint64_t qId);
void
freeColumnFilterInfo
(
SColumnFilterInfo
*
pFilter
,
int32_t
numOfFilters
);
STableQueryInfo
*
createTableQueryInfo
(
SQueryAttr
*
pQueryAttr
,
void
*
pTable
,
bool
groupbyColumn
,
STimeWindow
win
,
void
*
buf
);
STableQueryInfo
*
createTmpTableQueryInfo
(
STimeWindow
win
);
int32_t
buildArithmeticExprFromMsg
(
SExprInfo
*
pArithExprInfo
,
void
*
pQueryMsg
);
bool
isQueryKilled
(
SQInfo
*
pQInfo
);
...
...
src/query/inc/qPlan.h
浏览文件 @
d2886dee
...
...
@@ -16,6 +16,8 @@
#ifndef TDENGINE_QPLAN_H
#define TDENGINE_QPLAN_H
#include "qExecutor.h"
struct
SQueryInfo
;
typedef
struct
SQueryNodeBasicInfo
{
...
...
src/query/inc/qSqlparser.h
浏览文件 @
d2886dee
...
...
@@ -142,7 +142,7 @@ typedef struct SCreateTableSql {
}
colInfo
;
SArray
*
childTableInfo
;
// SArray<SCreatedTableInfo>
SSqlNode
*
pSelect
;
SSqlNode
*
pSelect
;
}
SCreateTableSql
;
typedef
struct
SAlterTableInfo
{
...
...
@@ -258,7 +258,6 @@ SArray *tVariantListInsert(SArray *pList, tVariant *pVar, uint8_t sortOrder, int
SArray
*
tVariantListAppendToken
(
SArray
*
pList
,
SStrToken
*
pAliasToken
,
uint8_t
sortOrder
);
SRelationInfo
*
setTableNameList
(
SRelationInfo
*
pFromInfo
,
SStrToken
*
pName
,
SStrToken
*
pAlias
);
//SRelationInfo *setSubquery(SRelationInfo* pFromInfo, SRelElementPair* p);
void
*
destroyRelationInfo
(
SRelationInfo
*
pFromInfo
);
SRelationInfo
*
addSubqueryElem
(
SRelationInfo
*
pRelationInfo
,
SArray
*
pSub
,
SStrToken
*
pAlias
);
...
...
src/query/inc/qTableMeta.h
0 → 100644
浏览文件 @
d2886dee
#ifndef TDENGINE_QTABLEUTIL_H
#define TDENGINE_QTABLEUTIL_H
#include "tsdb.h" //todo tsdb should not be here
#include "qSqlparser.h"
typedef
struct
SFieldInfo
{
int16_t
numOfOutput
;
// number of column in result
TAOS_FIELD
*
final
;
SArray
*
internalField
;
// SArray<SInternalField>
}
SFieldInfo
;
typedef
struct
SCond
{
uint64_t
uid
;
int32_t
len
;
// length of tag query condition data
char
*
cond
;
}
SCond
;
typedef
struct
SJoinNode
{
uint64_t
uid
;
int16_t
tagColId
;
SArray
*
tsJoin
;
SArray
*
tagJoin
;
}
SJoinNode
;
typedef
struct
SJoinInfo
{
bool
hasJoin
;
SJoinNode
*
joinTables
[
TSDB_MAX_JOIN_TABLE_NUM
];
}
SJoinInfo
;
typedef
struct
STagCond
{
// relation between tbname list and query condition, including : TK_AND or TK_OR
int16_t
relType
;
// tbname query condition, only support tbname query condition on one table
SCond
tbnameCond
;
// join condition, only support two tables join currently
SJoinInfo
joinInfo
;
// for different table, the query condition must be seperated
SArray
*
pCond
;
}
STagCond
;
typedef
struct
SGroupbyExpr
{
int16_t
tableIndex
;
SArray
*
columnInfo
;
// SArray<SColIndex>, group by columns information
int16_t
numOfGroupCols
;
// todo remove it
int16_t
orderIndex
;
// order by column index
int16_t
orderType
;
// order by type: asc/desc
}
SGroupbyExpr
;
typedef
struct
STableComInfo
{
uint8_t
numOfTags
;
uint8_t
precision
;
int16_t
numOfColumns
;
int32_t
rowSize
;
}
STableComInfo
;
typedef
struct
STableMeta
{
int32_t
vgId
;
STableId
id
;
uint8_t
tableType
;
char
sTableName
[
TSDB_TABLE_FNAME_LEN
];
// super table name
uint64_t
suid
;
// super table id
int16_t
sversion
;
int16_t
tversion
;
STableComInfo
tableInfo
;
SSchema
schema
[];
// if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
}
STableMeta
;
typedef
struct
STableMetaInfo
{
STableMeta
*
pTableMeta
;
// table meta, cached in client side and acquired by name
uint32_t
tableMetaSize
;
SVgroupsInfo
*
vgroupList
;
SArray
*
pVgroupTables
;
// SArray<SVgroupTableInfo>
/*
* 1. keep the vgroup index during the multi-vnode super table projection query
* 2. keep the vgroup index for multi-vnode insertion
*/
int32_t
vgroupIndex
;
SName
name
;
char
aliasName
[
TSDB_TABLE_NAME_LEN
];
// alias name of table specified in query sql
SArray
*
tagColList
;
// SArray<SColumn*>, involved tag columns
}
STableMetaInfo
;
struct
SQInfo
;
// global merge operator
struct
SQueryAttr
;
// query object
typedef
struct
SQueryInfo
{
int16_t
command
;
// the command may be different for each subclause, so keep it seperately.
uint32_t
type
;
// query/insert type
STimeWindow
window
;
// the whole query time window
SInterval
interval
;
// tumble time window
SSessionWindow
sessionWindow
;
// session time window
SGroupbyExpr
groupbyExpr
;
// groupby tags info
SArray
*
colList
;
// SArray<SColumn*>
SFieldInfo
fieldsInfo
;
SArray
*
exprList
;
// SArray<SExprInfo*>
SArray
*
exprList1
;
// final exprlist in case of arithmetic expression exists
SLimitVal
limit
;
SLimitVal
slimit
;
STagCond
tagCond
;
SOrderVal
order
;
int16_t
fillType
;
// final result fill type
int16_t
numOfTables
;
STableMetaInfo
**
pTableMetaInfo
;
struct
STSBuf
*
tsBuf
;
int64_t
*
fillVal
;
// default value for fill
char
*
msg
;
// pointer to the pCmd->payload to keep error message temporarily
int64_t
clauseLimit
;
// limit for current sub clause
int64_t
prjOffset
;
// offset value in the original sql expression, only applied at client side
int64_t
vgroupLimit
;
// table limit in case of super table projection query + global order + limit
int32_t
udColumnId
;
// current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
int16_t
resColumnId
;
// result column id
bool
distinctTag
;
// distinct tag or not
int32_t
round
;
// 0/1/....
int32_t
bufLen
;
char
*
buf
;
struct
SQInfo
*
pQInfo
;
// global merge operator
struct
SQueryAttr
*
pQueryAttr
;
// query object
struct
SQueryInfo
*
sibling
;
// sibling
SArray
*
pUpstream
;
// SArray<struct SQueryInfo>
struct
SQueryInfo
*
pDownstream
;
int32_t
havingFieldNum
;
bool
stableQuery
;
bool
groupbyColumn
;
bool
simpleAgg
;
bool
arithmeticOnAgg
;
bool
projectionQuery
;
bool
hasFilter
;
bool
onlyTagQuery
;
bool
orderProjectQuery
;
}
SQueryInfo
;
/**
* get the number of tags of this table
* @param pTableMeta
* @return
*/
int32_t
tscGetNumOfTags
(
const
STableMeta
*
pTableMeta
);
/**
* get the number of columns of this table
* @param pTableMeta
* @return
*/
int32_t
tscGetNumOfColumns
(
const
STableMeta
*
pTableMeta
);
/**
* get the basic info of this table
* @param pTableMeta
* @return
*/
STableComInfo
tscGetTableInfo
(
const
STableMeta
*
pTableMeta
);
/**
* get the schema
* @param pTableMeta
* @return
*/
SSchema
*
tscGetTableSchema
(
const
STableMeta
*
pTableMeta
);
/**
* get the tag schema
* @param pMeta
* @return
*/
SSchema
*
tscGetTableTagSchema
(
const
STableMeta
*
pMeta
);
/**
* get the column schema according to the column index
* @param pMeta
* @param colIndex
* @return
*/
SSchema
*
tscGetTableColumnSchema
(
const
STableMeta
*
pMeta
,
int32_t
colIndex
);
/**
* get the column schema according to the column id
* @param pTableMeta
* @param colId
* @return
*/
SSchema
*
tscGetColumnSchemaById
(
STableMeta
*
pTableMeta
,
int16_t
colId
);
/**
* create the table meta from the msg
* @param pTableMetaMsg
* @param size size of the table meta
* @return
*/
STableMeta
*
tscCreateTableMetaFromMsg
(
STableMetaMsg
*
pTableMetaMsg
);
#endif // TDENGINE_QTABLEUTIL_H
src/query/src/qExecutor.c
浏览文件 @
d2886dee
...
...
@@ -1718,7 +1718,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case
OP_TimeWindow
:
{
pRuntimeEnv
->
proot
=
createTimeIntervalOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
setTableScanFilterOperatorInfo
(
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
info
,
pRuntimeEnv
->
proot
);
int32_t
opType
=
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
operatorType
;
if
(
opType
!=
OP_DummyInput
&&
opType
!=
OP_Join
)
{
setTableScanFilterOperatorInfo
(
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
info
,
pRuntimeEnv
->
proot
);
}
break
;
}
case
OP_Groupby
:
{
...
...
@@ -3267,6 +3270,25 @@ STableQueryInfo *createTableQueryInfo(SQueryAttr* pQueryAttr, void* pTable, bool
return
pTableQueryInfo
;
}
STableQueryInfo
*
createTmpTableQueryInfo
(
STimeWindow
win
)
{
STableQueryInfo
*
pTableQueryInfo
=
calloc
(
1
,
sizeof
(
STableQueryInfo
));
pTableQueryInfo
->
win
=
win
;
pTableQueryInfo
->
lastKey
=
win
.
skey
;
pTableQueryInfo
->
pTable
=
NULL
;
pTableQueryInfo
->
cur
.
vgroupIndex
=
-
1
;
// set more initial size of interval/groupby query
int32_t
initialSize
=
16
;
int32_t
code
=
initResultRowInfo
(
&
pTableQueryInfo
->
resInfo
,
initialSize
,
TSDB_DATA_TYPE_INT
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
NULL
;
}
return
pTableQueryInfo
;
}
void
destroyTableQueryInfoImpl
(
STableQueryInfo
*
pTableQueryInfo
)
{
if
(
pTableQueryInfo
==
NULL
)
{
return
;
...
...
src/query/src/qPlan.c
浏览文件 @
d2886dee
#include "os.h"
#include "
tschemautil
.h"
#include "
qTableMeta
.h"
#include "qPlan.h"
#include "qExecutor.h"
#include "qUtil.h"
...
...
src/
client/src/tscSchemaUtil
.c
→
src/
query/src/qTableMeta
.c
浏览文件 @
d2886dee
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "taosmsg.h"
#include "
tschemautil
.h"
#include "
qTableMeta
.h"
#include "ttokendef.h"
#include "taosdef.h"
#include "tutil.h"
#include "tsclient.h"
int32_t
tscGetNumOfTags
(
const
STableMeta
*
pTableMeta
)
{
assert
(
pTableMeta
!=
NULL
);
STableComInfo
tinfo
=
tscGetTableInfo
(
pTableMeta
);
if
(
pTableMeta
->
tableType
==
TSDB_NORMAL_TABLE
)
{
assert
(
tinfo
.
numOfTags
==
0
);
return
0
;
}
if
(
pTableMeta
->
tableType
==
TSDB_SUPER_TABLE
||
pTableMeta
->
tableType
==
TSDB_CHILD_TABLE
)
{
return
tinfo
.
numOfTags
;
}
assert
(
tinfo
.
numOfTags
==
0
);
return
0
;
}
int32_t
tscGetNumOfColumns
(
const
STableMeta
*
pTableMeta
)
{
assert
(
pTableMeta
!=
NULL
);
// table created according to super table, use data from super table
STableComInfo
tinfo
=
tscGetTableInfo
(
pTableMeta
);
return
tinfo
.
numOfColumns
;
...
...
@@ -54,10 +39,10 @@ SSchema *tscGetTableSchema(const STableMeta *pTableMeta) {
SSchema
*
tscGetTableTagSchema
(
const
STableMeta
*
pTableMeta
)
{
assert
(
pTableMeta
!=
NULL
&&
(
pTableMeta
->
tableType
==
TSDB_SUPER_TABLE
||
pTableMeta
->
tableType
==
TSDB_CHILD_TABLE
));
STableComInfo
tinfo
=
tscGetTableInfo
(
pTableMeta
);
assert
(
tinfo
.
numOfTags
>
0
);
return
tscGetTableColumnSchema
(
pTableMeta
,
tinfo
.
numOfColumns
);
}
...
...
@@ -68,7 +53,7 @@ STableComInfo tscGetTableInfo(const STableMeta* pTableMeta) {
SSchema
*
tscGetTableColumnSchema
(
const
STableMeta
*
pTableMeta
,
int32_t
colIndex
)
{
assert
(
pTableMeta
!=
NULL
);
SSchema
*
pSchema
=
(
SSchema
*
)
pTableMeta
->
schema
;
return
&
pSchema
[
colIndex
];
}
...
...
@@ -88,7 +73,7 @@ SSchema* tscGetColumnSchemaById(STableMeta* pTableMeta, int16_t colId) {
STableMeta
*
tscCreateTableMetaFromMsg
(
STableMetaMsg
*
pTableMetaMsg
)
{
assert
(
pTableMetaMsg
!=
NULL
&&
pTableMetaMsg
->
numOfColumns
>=
2
&&
pTableMetaMsg
->
numOfTags
>=
0
);
int32_t
schemaSize
=
(
pTableMetaMsg
->
numOfColumns
+
pTableMetaMsg
->
numOfTags
)
*
sizeof
(
SSchema
);
STableMeta
*
pTableMeta
=
calloc
(
1
,
sizeof
(
STableMeta
)
+
schemaSize
);
...
...
@@ -97,11 +82,11 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg) {
pTableMeta
->
suid
=
pTableMetaMsg
->
suid
;
pTableMeta
->
tableInfo
=
(
STableComInfo
)
{
.
numOfTags
=
pTableMetaMsg
->
numOfTags
,
.
precision
=
pTableMetaMsg
->
precision
,
.
numOfColumns
=
pTableMetaMsg
->
numOfColumns
,
.
numOfTags
=
pTableMetaMsg
->
numOfTags
,
.
precision
=
pTableMetaMsg
->
precision
,
.
numOfColumns
=
pTableMetaMsg
->
numOfColumns
,
};
pTableMeta
->
id
.
tid
=
pTableMetaMsg
->
tid
;
pTableMeta
->
id
.
uid
=
pTableMetaMsg
->
uid
;
...
...
@@ -109,69 +94,14 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg) {
pTableMeta
->
tversion
=
pTableMetaMsg
->
tversion
;
tstrncpy
(
pTableMeta
->
sTableName
,
pTableMetaMsg
->
sTableName
,
TSDB_TABLE_FNAME_LEN
);
memcpy
(
pTableMeta
->
schema
,
pTableMetaMsg
->
schema
,
schemaSize
);
int32_t
numOfTotalCols
=
pTableMeta
->
tableInfo
.
numOfColumns
;
for
(
int32_t
i
=
0
;
i
<
numOfTotalCols
;
++
i
)
{
pTableMeta
->
tableInfo
.
rowSize
+=
pTableMeta
->
schema
[
i
].
bytes
;
}
return
pTableMeta
;
}
bool
vgroupInfoIdentical
(
SNewVgroupInfo
*
pExisted
,
SVgroupMsg
*
src
)
{
assert
(
pExisted
!=
NULL
&&
src
!=
NULL
);
if
(
pExisted
->
numOfEps
!=
src
->
numOfEps
)
{
return
false
;
}
for
(
int32_t
i
=
0
;
i
<
pExisted
->
numOfEps
;
++
i
)
{
if
(
pExisted
->
ep
[
i
].
port
!=
src
->
epAddr
[
i
].
port
)
{
return
false
;
}
if
(
strncmp
(
pExisted
->
ep
[
i
].
fqdn
,
src
->
epAddr
[
i
].
fqdn
,
tListLen
(
pExisted
->
ep
[
i
].
fqdn
))
!=
0
)
{
return
false
;
}
}
return
true
;
}
SNewVgroupInfo
createNewVgroupInfo
(
SVgroupMsg
*
pVgroupMsg
)
{
assert
(
pVgroupMsg
!=
NULL
);
SNewVgroupInfo
info
=
{
0
};
info
.
numOfEps
=
pVgroupMsg
->
numOfEps
;
info
.
vgId
=
pVgroupMsg
->
vgId
;
info
.
inUse
=
0
;
// 0 is the default value of inUse in case of multiple replica
assert
(
info
.
numOfEps
>=
1
&&
info
.
vgId
>=
1
);
for
(
int32_t
i
=
0
;
i
<
pVgroupMsg
->
numOfEps
;
++
i
)
{
tstrncpy
(
info
.
ep
[
i
].
fqdn
,
pVgroupMsg
->
epAddr
[
i
].
fqdn
,
TSDB_FQDN_LEN
);
info
.
ep
[
i
].
port
=
pVgroupMsg
->
epAddr
[
i
].
port
;
}
return
info
;
}
// todo refactor
UNUSED_FUNC
static
FORCE_INLINE
char
*
skipSegments
(
char
*
input
,
char
delim
,
int32_t
num
)
{
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
while
(
*
input
!=
0
&&
*
input
++
!=
delim
)
{
};
}
return
input
;
}
UNUSED_FUNC
static
FORCE_INLINE
size_t
copy
(
char
*
dst
,
const
char
*
src
,
char
delimiter
)
{
size_t
len
=
0
;
while
(
*
src
!=
delimiter
&&
*
src
!=
0
)
{
*
dst
++
=
*
src
++
;
len
++
;
}
return
len
;
return
pTableMeta
;
}
tests/script/general/parser/import_file.sim
浏览文件 @
d2886dee
...
...
@@ -18,6 +18,7 @@ system general/parser/gendata.sh
sql create table tbx (ts TIMESTAMP, collect_area NCHAR(12), device_id BINARY(16), imsi BINARY(16), imei BINARY(16), mdn BINARY(10), net_type BINARY(4), mno NCHAR(4), province NCHAR(10), city NCHAR(16), alarm BINARY(2))
print ====== create tables success, starting import data
sql import into tbx file '~/data.sql'
sql import into tbx file '~/data.sql'
sql select count(*) from tbx
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录