Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a3c11889
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a3c11889
编写于
5月 31, 2021
作者:
B
bryanchang0603
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' into test/TD-4439
上级
39342aec
ad46a6be
变更
28
隐藏空白更改
内联
并排
Showing
28 changed file
with
864 addition
and
148 deletion
+864
-148
documentation20/cn/03.architecture/02.replica/docs.md
documentation20/cn/03.architecture/02.replica/docs.md
+3
-3
documentation20/cn/03.architecture/docs.md
documentation20/cn/03.architecture/docs.md
+1
-1
documentation20/cn/08.connector/docs.md
documentation20/cn/08.connector/docs.md
+1
-1
documentation20/cn/12.taos-sql/docs.md
documentation20/cn/12.taos-sql/docs.md
+4
-3
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+12
-19
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+6
-8
src/common/inc/tname.h
src/common/inc/tname.h
+0
-2
src/common/src/tname.c
src/common/src/tname.c
+0
-9
src/inc/taosdef.h
src/inc/taosdef.h
+0
-1
src/inc/tsdb.h
src/inc/tsdb.h
+5
-1
src/kit/taosdemo/taosdemo.c
src/kit/taosdemo/taosdemo.c
+26
-22
src/query/src/qAggMain.c
src/query/src/qAggMain.c
+74
-42
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+16
-17
src/query/src/qUtil.c
src/query/src/qUtil.c
+10
-4
src/tsdb/inc/tsdbCommit.h
src/tsdb/inc/tsdbCommit.h
+1
-0
src/tsdb/inc/tsdbint.h
src/tsdb/inc/tsdbint.h
+1
-0
src/tsdb/src/tsdbCommit.c
src/tsdb/src/tsdbCommit.c
+1
-2
src/tsdb/src/tsdbFS.c
src/tsdb/src/tsdbFS.c
+77
-4
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+7
-1
src/tsdb/src/tsdbSync.c
src/tsdb/src/tsdbSync.c
+33
-8
src/util/inc/tarray.h
src/util/inc/tarray.h
+8
-0
src/util/src/tarray.c
src/util/src/tarray.c
+5
-0
tests/examples/c/apitest.c
tests/examples/c/apitest.c
+471
-0
tests/script/api/batchprepare.c
tests/script/api/batchprepare.c
+5
-0
tests/script/fullGeneralSuite.sim
tests/script/fullGeneralSuite.sim
+1
-0
tests/script/general/compute/block_dist.sim
tests/script/general/compute/block_dist.sim
+94
-0
tests/script/general/compute/testSuite.sim
tests/script/general/compute/testSuite.sim
+1
-0
tests/script/regressionSuite.sim
tests/script/regressionSuite.sim
+1
-0
未找到文件。
documentation20/cn/03.architecture/02.replica/docs.md
浏览文件 @
a3c11889
...
@@ -107,9 +107,9 @@ TDengine采取的是Master-Slave模式进行同步,与流行的RAFT一致性
...
@@ -107,9 +107,9 @@ TDengine采取的是Master-Slave模式进行同步,与流行的RAFT一致性


1.
应用对写请求做基本的合法性检查,通过,则给
改
请求包打上一个版本号(version, 单调递增)
1.
应用对写请求做基本的合法性检查,通过,则给
该
请求包打上一个版本号(version, 单调递增)
2.
应用将打上版本号的写请求封装一个WAL Head, 写入WAL(Write Ahead Log)
2.
应用将打上版本号的写请求封装一个WAL Head, 写入WAL(Write Ahead Log)
3.
应用调用API syncForwardToPeer,如
多
vnode B是slave状态,sync模块将包含WAL Head的数据包通过Forward消息发送给vnode B,否则就不转发。
3.
应用调用API syncForwardToPeer,如
果
vnode B是slave状态,sync模块将包含WAL Head的数据包通过Forward消息发送给vnode B,否则就不转发。
4.
vnode B收到Forward消息后,调用回调函数writeToCache, 交给应用处理
4.
vnode B收到Forward消息后,调用回调函数writeToCache, 交给应用处理
5.
vnode B应用在写入成功后,都需要调用syncAckForward通知sync模块已经写入成功。
5.
vnode B应用在写入成功后,都需要调用syncAckForward通知sync模块已经写入成功。
6.
如果quorum大于1,vnode B需要等待应用的回复确认,收到确认后,vnode B发送Forward Response消息给node A。
6.
如果quorum大于1,vnode B需要等待应用的回复确认,收到确认后,vnode B发送Forward Response消息给node A。
...
@@ -219,7 +219,7 @@ Arbitrator的程序tarbitrator.c在复制模块的同一目录, 编译整个系
...
@@ -219,7 +219,7 @@ Arbitrator的程序tarbitrator.c在复制模块的同一目录, 编译整个系
不同之处:
不同之处:
-
选举流程不一样:Raft里任何一个节点是candidate时,主动向其他节点发出vote request
, 如果超过半数回答Yes, 这个candidate就成为Leader,开始一个新的term. 而TDengine的实现里,节点上线、离线或角色改变都会触发状态消息在节点组类传播,等节点组里状态稳定一致之后才触发选举流程,因为状态稳定一致,基于同样的状态信息,每个节点做出的决定会是一致的,一旦某个节点符合成为master的条件,无需其他节点认可,它会自动将自己设为master。TDengine里,任何一个节点检测到其他节点或自己的角色发生改变,就会给节点组内其他节点进行广播的
。Raft里不存在这样的机制,因此需要投票来解决。
-
选举流程不一样:Raft里任何一个节点是candidate时,主动向其他节点发出vote request
,如果超过半数回答Yes,这个candidate就成为Leader,开始一个新的term。而TDengine的实现里,节点上线、离线或角色改变都会触发状态消息在节点组内传播,等节点组里状态稳定一致之后才触发选举流程,因为状态稳定一致,基于同样的状态信息,每个节点做出的决定会是一致的,一旦某个节点符合成为master的条件,无需其他节点认可,它会自动将自己设为master。TDengine里,任何一个节点检测到其他节点或自己的角色发生改变,就会向节点组内其他节点进行广播
。Raft里不存在这样的机制,因此需要投票来解决。
-
对WAL的一条记录,Raft用term + index来做唯一标识。但TDengine只用version(类似index),在TDengine实现里,仅仅用version是完全可行的, 因为TDengine的选举机制,没有term的概念。
-
对WAL的一条记录,Raft用term + index来做唯一标识。但TDengine只用version(类似index),在TDengine实现里,仅仅用version是完全可行的, 因为TDengine的选举机制,没有term的概念。
如果整个虚拟节点组全部宕机,重启,但不是所有虚拟节点都上线,这个时候TDengine是不会选出master的,因为未上线的节点有可能有最高version的数据。而RAFT协议,只要超过半数上线,就会选出Leader。
如果整个虚拟节点组全部宕机,重启,但不是所有虚拟节点都上线,这个时候TDengine是不会选出master的,因为未上线的节点有可能有最高version的数据。而RAFT协议,只要超过半数上线,就会选出Leader。
...
...
documentation20/cn/03.architecture/docs.md
浏览文件 @
a3c11889
...
@@ -343,7 +343,7 @@ TDengine采用数据驱动的方式让缓存中的数据写入硬盘进行持久
...
@@ -343,7 +343,7 @@ TDengine采用数据驱动的方式让缓存中的数据写入硬盘进行持久
对于采集的数据,一般有保留时长,这个时长由系统配置参数keep决定。超过这个设置天数的数据文件,将被系统自动删除,释放存储空间。
对于采集的数据,一般有保留时长,这个时长由系统配置参数keep决定。超过这个设置天数的数据文件,将被系统自动删除,释放存储空间。
给定days与keep两个参数,一个
vnode总的数据文件数为:keep/days
。总的数据文件个数不宜过大,也不宜过小。10到100以内合适。基于这个原则,可以设置合理的days。 目前的版本,参数keep可以修改,但对于参数days,一但设置后,不可修改。
给定days与keep两个参数,一个
典型工作状态的vnode中总的数据文件数为:
`向上取整(keep/days)+1`
个
。总的数据文件个数不宜过大,也不宜过小。10到100以内合适。基于这个原则,可以设置合理的days。 目前的版本,参数keep可以修改,但对于参数days,一但设置后,不可修改。
在每个数据文件里,一张表的数据是一块一块存储的。一张表可以有一到多个数据文件块。在一个文件块里,数据是列式存储的,占用的是一片连续的存储空间,这样大大提高读取速度。文件块的大小由系统参数maxRows(每块最大记录条数)决定,缺省值为4096。这个值不宜过大,也不宜过小。过大,定位具体时间段的数据的搜索时间会变长,影响读取速度;过小,数据块的索引太大,压缩效率偏低,也影响读取速度。
在每个数据文件里,一张表的数据是一块一块存储的。一张表可以有一到多个数据文件块。在一个文件块里,数据是列式存储的,占用的是一片连续的存储空间,这样大大提高读取速度。文件块的大小由系统参数maxRows(每块最大记录条数)决定,缺省值为4096。这个值不宜过大,也不宜过小。过大,定位具体时间段的数据的搜索时间会变长,影响读取速度;过小,数据块的索引太大,压缩效率偏低,也影响读取速度。
...
...
documentation20/cn/08.connector/docs.md
浏览文件 @
a3c11889
...
@@ -516,7 +516,7 @@ conn.close()
...
@@ -516,7 +516,7 @@ conn.close()
-
_TDengineCursor_ 类
-
_TDengineCursor_ 类
参考python中help(taos.TDengineCursor)。
参考python中help(taos.TDengineCursor)。
这个类对应客户端进行的写入、查询操作。在客户端多线程的场景下,这个游标实例必须保持线程独享,不能
夸
线程共享使用,否则会导致返回结果出现错误。
这个类对应客户端进行的写入、查询操作。在客户端多线程的场景下,这个游标实例必须保持线程独享,不能
跨
线程共享使用,否则会导致返回结果出现错误。
-
_connect_ 方法
-
_connect_ 方法
...
...
documentation20/cn/12.taos-sql/docs.md
浏览文件 @
a3c11889
...
@@ -37,7 +37,7 @@ taos> DESCRIBE meters;
...
@@ -37,7 +37,7 @@ taos> DESCRIBE meters;
-
Epoch Time:时间戳也可以是一个长整数,表示从 1970-01-01 08:00:00.000 开始的毫秒数
-
Epoch Time:时间戳也可以是一个长整数,表示从 1970-01-01 08:00:00.000 开始的毫秒数
-
时间可以加减,比如 now-2h,表明查询时刻向前推 2 个小时(最近 2 小时)。数字后面的时间单位可以是 u(微秒)、a(毫秒)、s(秒)、m(分)、h(小时)、d(天)、w(周)。 比如
`select * from t1 where ts > now-2w and ts <= now-1w`
,表示查询两周前整整一周的数据。在指定降频操作(down sampling)的时间窗口(interval)时,时间单位还可以使用 n(自然月) 和 y(自然年)。
-
时间可以加减,比如 now-2h,表明查询时刻向前推 2 个小时(最近 2 小时)。数字后面的时间单位可以是 u(微秒)、a(毫秒)、s(秒)、m(分)、h(小时)、d(天)、w(周)。 比如
`select * from t1 where ts > now-2w and ts <= now-1w`
,表示查询两周前整整一周的数据。在指定降频操作(down sampling)的时间窗口(interval)时,时间单位还可以使用 n(自然月) 和 y(自然年)。
TDengine 缺省的时间戳是毫秒精度,但通过
修改配置参数 enableMicrosecond
就可以支持微秒。
TDengine 缺省的时间戳是毫秒精度,但通过
在 CREATE DATABASE 时传递的 PRECISION 参数
就可以支持微秒。
在TDengine中,普通表的数据模型中可使用以下 10 种数据类型。
在TDengine中,普通表的数据模型中可使用以下 10 种数据类型。
...
@@ -400,6 +400,7 @@ TDengine 缺省的时间戳是毫秒精度,但通过修改配置参数 enableM
...
@@ -400,6 +400,7 @@ TDengine 缺省的时间戳是毫秒精度,但通过修改配置参数 enableM
tb2_name (tb2_field1_name, ...) [USING stb2_name TAGS (tag_value2, ...)] VALUES (field1_value1, ...) (field1_value2, ...) ...;
tb2_name (tb2_field1_name, ...) [USING stb2_name TAGS (tag_value2, ...)] VALUES (field1_value1, ...) (field1_value2, ...) ...;
```
```
以自动建表的方式,同时向表tb1_name和tb2_name中按列分别插入多条记录。
以自动建表的方式,同时向表tb1_name和tb2_name中按列分别插入多条记录。
说明:
`(tb1_field1_name, ...)`
的部分可以省略掉,这样就是使用全列模式写入——也即在 VALUES 部分提供的数据,必须为数据表的每个列都显式地提供数据。全列写入速度会远快于指定列,因此建议尽可能采用全列写入方式,此时空列可以填入NULL。
从 2.0.20.5 版本开始,子表的列名可以不跟在子表名称后面,而是可以放在 TAGS 和 VALUES 之间,例如像下面这样写:
从 2.0.20.5 版本开始,子表的列名可以不跟在子表名称后面,而是可以放在 TAGS 和 VALUES 之间,例如像下面这样写:
```
mysql
```
mysql
INSERT INTO tb1_name [USING stb1_name TAGS (tag_value1, ...)] (tb1_field1_name, ...) VALUES (field1_value1, ...) (field1_value2, ...) ...;
INSERT INTO tb1_name [USING stb1_name TAGS (tag_value1, ...)] (tb1_field1_name, ...) VALUES (field1_value1, ...) (field1_value2, ...) ...;
...
@@ -423,9 +424,9 @@ Query OK, 1 row(s) in set (0.001029s)
...
@@ -423,9 +424,9 @@ Query OK, 1 row(s) in set (0.001029s)
taos> SHOW TABLES;
taos> SHOW TABLES;
Query OK, 0 row(s) in set (0.000946s)
Query OK, 0 row(s) in set (0.000946s)
taos> INSERT INTO d1001 USING meters TAGS('Beijing.Chaoyang', 2);
taos> INSERT INTO d1001 USING meters TAGS('Beijing.Chaoyang', 2)
VALUES('a')
;
DB error: invalid SQL:
keyword VALUES or FILE required
DB error: invalid SQL:
'a' (invalid timestamp) (0.039494s)
taos> SHOW TABLES;
taos> SHOW TABLES;
table_name | created_time | columns | stable_name |
table_name | created_time | columns | stable_name |
...
...
src/client/src/tscSQLParser.c
浏览文件 @
a3c11889
...
@@ -41,11 +41,11 @@
...
@@ -41,11 +41,11 @@
#define TSWINDOW_IS_EQUAL(t1, t2) (((t1).skey == (t2).skey) && ((t1).ekey == (t2).ekey))
#define TSWINDOW_IS_EQUAL(t1, t2) (((t1).skey == (t2).skey) && ((t1).ekey == (t2).ekey))
// -1 is tbname column index, so here use the -
3
as the initial value
// -1 is tbname column index, so here use the -
2
as the initial value
#define COLUMN_INDEX_INITIAL_VAL (-
3
)
#define COLUMN_INDEX_INITIAL_VAL (-
2
)
#define COLUMN_INDEX_INITIALIZER \
#define COLUMN_INDEX_INITIALIZER \
{ COLUMN_INDEX_INITIAL_VAL, COLUMN_INDEX_INITIAL_VAL }
{ COLUMN_INDEX_INITIAL_VAL, COLUMN_INDEX_INITIAL_VAL }
#define COLUMN_INDEX_VALIDE(index) (((index).tableIndex >= 0) && ((index).columnIndex >= TSDB_
BLOCK_DIST
_COLUMN_INDEX))
#define COLUMN_INDEX_VALIDE(index) (((index).tableIndex >= 0) && ((index).columnIndex >= TSDB_
TBNAME
_COLUMN_INDEX))
#define TBNAME_LIST_SEP ","
#define TBNAME_LIST_SEP ","
typedef
struct
SColumnList
{
// todo refactor
typedef
struct
SColumnList
{
// todo refactor
...
@@ -1873,9 +1873,6 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
...
@@ -1873,9 +1873,6 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
if
(
index
.
columnIndex
==
TSDB_TBNAME_COLUMN_INDEX
)
{
if
(
index
.
columnIndex
==
TSDB_TBNAME_COLUMN_INDEX
)
{
SSchema
*
colSchema
=
tGetTbnameColumnSchema
();
SSchema
*
colSchema
=
tGetTbnameColumnSchema
();
tscAddFuncInSelectClause
(
pQueryInfo
,
startPos
,
TSDB_FUNC_TAGPRJ
,
&
index
,
colSchema
,
TSDB_COL_TAG
,
getNewResColId
(
pCmd
));
tscAddFuncInSelectClause
(
pQueryInfo
,
startPos
,
TSDB_FUNC_TAGPRJ
,
&
index
,
colSchema
,
TSDB_COL_TAG
,
getNewResColId
(
pCmd
));
}
else
if
(
index
.
columnIndex
==
TSDB_BLOCK_DIST_COLUMN_INDEX
)
{
SSchema
colSchema
=
tGetBlockDistColumnSchema
();
tscAddFuncInSelectClause
(
pQueryInfo
,
startPos
,
TSDB_FUNC_PRJ
,
&
index
,
&
colSchema
,
TSDB_COL_TAG
,
getNewResColId
(
pCmd
));
}
else
{
}
else
{
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
index
.
tableIndex
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
index
.
tableIndex
);
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
...
@@ -2487,7 +2484,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
...
@@ -2487,7 +2484,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg2
);
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg2
);
}
}
SColumnIndex
index
=
{.
tableIndex
=
0
,
.
columnIndex
=
TSDB_BLOCK_DIST_COLUMN_INDEX
,};
SColumnIndex
index
=
{.
tableIndex
=
0
,
.
columnIndex
=
0
,};
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
index
.
tableIndex
);
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
index
.
tableIndex
);
SSchema
s
=
{.
name
=
"block_dist"
,
.
type
=
TSDB_DATA_TYPE_BINARY
};
SSchema
s
=
{.
name
=
"block_dist"
,
.
type
=
TSDB_DATA_TYPE_BINARY
};
...
@@ -2495,10 +2492,16 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
...
@@ -2495,10 +2492,16 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
int16_t
resType
=
0
;
int16_t
resType
=
0
;
int16_t
bytes
=
0
;
int16_t
bytes
=
0
;
getResultDataInfo
(
TSDB_DATA_TYPE_INT
,
4
,
TSDB_FUNC_BLKINFO
,
0
,
&
resType
,
&
bytes
,
&
inter
,
0
,
0
);
getResultDataInfo
(
TSDB_DATA_TYPE_INT
,
4
,
TSDB_FUNC_BLKINFO
,
0
,
&
resType
,
&
bytes
,
&
inter
,
0
,
0
);
s
.
bytes
=
bytes
;
s
.
bytes
=
bytes
;
s
.
type
=
(
uint8_t
)
resType
;
s
.
type
=
(
uint8_t
)
resType
;
SExprInfo
*
pExpr
=
tscAddFuncInSelectClause
(
pQueryInfo
,
0
,
TSDB_FUNC_BLKINFO
,
&
index
,
&
s
,
TSDB_COL_TAG
,
getNewResColId
(
pCmd
));
SExprInfo
*
pExpr
=
tscExprInsert
(
pQueryInfo
,
0
,
TSDB_FUNC_BLKINFO
,
&
index
,
resType
,
bytes
,
getNewResColId
(
pCmd
),
bytes
,
0
);
tstrncpy
(
pExpr
->
base
.
aliasName
,
s
.
name
,
sizeof
(
pExpr
->
base
.
aliasName
));
SColumnList
ids
=
createColumnList
(
1
,
index
.
tableIndex
,
index
.
columnIndex
);
insertResultField
(
pQueryInfo
,
0
,
&
ids
,
bytes
,
s
.
type
,
s
.
name
,
pExpr
);
pExpr
->
base
.
numOfParams
=
1
;
pExpr
->
base
.
numOfParams
=
1
;
pExpr
->
base
.
param
[
0
].
i64
=
pTableMetaInfo
->
pTableMeta
->
tableInfo
.
rowSize
;
pExpr
->
base
.
param
[
0
].
i64
=
pTableMetaInfo
->
pTableMeta
->
tableInfo
.
rowSize
;
pExpr
->
base
.
param
[
0
].
nType
=
TSDB_DATA_TYPE_BIGINT
;
pExpr
->
base
.
param
[
0
].
nType
=
TSDB_DATA_TYPE_BIGINT
;
...
@@ -2545,14 +2548,6 @@ static bool isTablenameToken(SStrToken* token) {
...
@@ -2545,14 +2548,6 @@ static bool isTablenameToken(SStrToken* token) {
return
(
strncasecmp
(
TSQL_TBNAME_L
,
tmpToken
.
z
,
tmpToken
.
n
)
==
0
&&
tmpToken
.
n
==
strlen
(
TSQL_TBNAME_L
));
return
(
strncasecmp
(
TSQL_TBNAME_L
,
tmpToken
.
z
,
tmpToken
.
n
)
==
0
&&
tmpToken
.
n
==
strlen
(
TSQL_TBNAME_L
));
}
}
static
bool
isTableBlockDistToken
(
SStrToken
*
token
)
{
SStrToken
tmpToken
=
*
token
;
SStrToken
tableToken
=
{
0
};
extractTableNameFromToken
(
&
tmpToken
,
&
tableToken
);
return
(
strncasecmp
(
TSQL_BLOCK_DIST
,
tmpToken
.
z
,
tmpToken
.
n
)
==
0
&&
tmpToken
.
n
==
strlen
(
TSQL_BLOCK_DIST_L
));
}
static
int16_t
doGetColumnIndex
(
SQueryInfo
*
pQueryInfo
,
int32_t
index
,
SStrToken
*
pToken
)
{
static
int16_t
doGetColumnIndex
(
SQueryInfo
*
pQueryInfo
,
int32_t
index
,
SStrToken
*
pToken
)
{
STableMeta
*
pTableMeta
=
tscGetMetaInfo
(
pQueryInfo
,
index
)
->
pTableMeta
;
STableMeta
*
pTableMeta
=
tscGetMetaInfo
(
pQueryInfo
,
index
)
->
pTableMeta
;
...
@@ -2582,8 +2577,6 @@ int32_t doGetColumnIndexByName(SSqlCmd* pCmd, SStrToken* pToken, SQueryInfo* pQu
...
@@ -2582,8 +2577,6 @@ int32_t doGetColumnIndexByName(SSqlCmd* pCmd, SStrToken* pToken, SQueryInfo* pQu
if
(
isTablenameToken
(
pToken
))
{
if
(
isTablenameToken
(
pToken
))
{
pIndex
->
columnIndex
=
TSDB_TBNAME_COLUMN_INDEX
;
pIndex
->
columnIndex
=
TSDB_TBNAME_COLUMN_INDEX
;
}
else
if
(
isTableBlockDistToken
(
pToken
))
{
pIndex
->
columnIndex
=
TSDB_BLOCK_DIST_COLUMN_INDEX
;
}
else
if
(
strncasecmp
(
pToken
->
z
,
DEFAULT_PRIMARY_TIMESTAMP_COL_NAME
,
pToken
->
n
)
==
0
)
{
}
else
if
(
strncasecmp
(
pToken
->
z
,
DEFAULT_PRIMARY_TIMESTAMP_COL_NAME
,
pToken
->
n
)
==
0
)
{
pIndex
->
columnIndex
=
PRIMARYKEY_TIMESTAMP_COL_INDEX
;
pIndex
->
columnIndex
=
PRIMARYKEY_TIMESTAMP_COL_INDEX
;
}
else
{
}
else
{
...
...
src/client/src/tscUtil.c
浏览文件 @
a3c11889
...
@@ -527,7 +527,7 @@ bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) {
...
@@ -527,7 +527,7 @@ bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) {
bool
isBlockDistQuery
(
SQueryInfo
*
pQueryInfo
)
{
bool
isBlockDistQuery
(
SQueryInfo
*
pQueryInfo
)
{
size_t
numOfExprs
=
tscNumOfExprs
(
pQueryInfo
);
size_t
numOfExprs
=
tscNumOfExprs
(
pQueryInfo
);
SExprInfo
*
pExpr
=
tscExprGet
(
pQueryInfo
,
0
);
SExprInfo
*
pExpr
=
tscExprGet
(
pQueryInfo
,
0
);
return
(
numOfExprs
==
1
&&
pExpr
->
base
.
colInfo
.
colId
==
TSDB_BLOCK_DIST_COLUMN_INDEX
);
return
(
numOfExprs
==
1
&&
pExpr
->
base
.
functionId
==
TSDB_FUNC_BLKINFO
);
}
}
void
tscClearInterpInfo
(
SQueryInfo
*
pQueryInfo
)
{
void
tscClearInterpInfo
(
SQueryInfo
*
pQueryInfo
)
{
...
@@ -2048,16 +2048,14 @@ SExprInfo* tscExprCreate(SQueryInfo* pQueryInfo, int16_t functionId, SColumnInde
...
@@ -2048,16 +2048,14 @@ SExprInfo* tscExprCreate(SQueryInfo* pQueryInfo, int16_t functionId, SColumnInde
p
->
colInfo
.
colId
=
TSDB_TBNAME_COLUMN_INDEX
;
p
->
colInfo
.
colId
=
TSDB_TBNAME_COLUMN_INDEX
;
p
->
colBytes
=
s
->
bytes
;
p
->
colBytes
=
s
->
bytes
;
p
->
colType
=
s
->
type
;
p
->
colType
=
s
->
type
;
}
else
if
(
pColIndex
->
columnIndex
==
TSDB_BLOCK_DIST_COLUMN_INDEX
)
{
SSchema
s
=
tGetBlockDistColumnSchema
();
p
->
colInfo
.
colId
=
TSDB_BLOCK_DIST_COLUMN_INDEX
;
p
->
colBytes
=
s
.
bytes
;
p
->
colType
=
s
.
type
;
}
else
if
(
pColIndex
->
columnIndex
<=
TSDB_UD_COLUMN_INDEX
)
{
}
else
if
(
pColIndex
->
columnIndex
<=
TSDB_UD_COLUMN_INDEX
)
{
p
->
colInfo
.
colId
=
pColIndex
->
columnIndex
;
p
->
colInfo
.
colId
=
pColIndex
->
columnIndex
;
p
->
colBytes
=
size
;
p
->
colBytes
=
size
;
p
->
colType
=
type
;
p
->
colType
=
type
;
}
else
if
(
functionId
==
TSDB_FUNC_BLKINFO
)
{
p
->
colInfo
.
colId
=
pColIndex
->
columnIndex
;
p
->
colBytes
=
TSDB_MAX_BINARY_LEN
;
p
->
colType
=
TSDB_DATA_TYPE_BINARY
;
}
else
{
}
else
{
if
(
TSDB_COL_IS_TAG
(
colType
))
{
if
(
TSDB_COL_IS_TAG
(
colType
))
{
SSchema
*
pSchema
=
tscGetTableTagSchema
(
pTableMetaInfo
->
pTableMeta
);
SSchema
*
pSchema
=
tscGetTableTagSchema
(
pTableMetaInfo
->
pTableMeta
);
...
@@ -2553,7 +2551,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId, int32_t
...
@@ -2553,7 +2551,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId, int32_t
return
false
;
return
false
;
}
}
if
(
colId
==
TSDB_TBNAME_COLUMN_INDEX
||
colId
==
TSDB_BLOCK_DIST_COLUMN_INDEX
||
(
colId
<=
TSDB_UD_COLUMN_INDEX
&&
numOfParams
==
2
))
{
if
(
colId
==
TSDB_TBNAME_COLUMN_INDEX
||
(
colId
<=
TSDB_UD_COLUMN_INDEX
&&
numOfParams
==
2
))
{
return
true
;
return
true
;
}
}
...
...
src/common/inc/tname.h
浏览文件 @
a3c11889
...
@@ -92,8 +92,6 @@ size_t tableIdPrefix(const char* name, char* prefix, int32_t len);
...
@@ -92,8 +92,6 @@ size_t tableIdPrefix(const char* name, char* prefix, int32_t len);
void
extractTableNameFromToken
(
SStrToken
*
pToken
,
SStrToken
*
pTable
);
void
extractTableNameFromToken
(
SStrToken
*
pToken
,
SStrToken
*
pTable
);
SSchema
tGetBlockDistColumnSchema
();
SSchema
tGetUserSpecifiedColumnSchema
(
tVariant
*
pVal
,
SStrToken
*
exprStr
,
const
char
*
name
);
SSchema
tGetUserSpecifiedColumnSchema
(
tVariant
*
pVal
,
SStrToken
*
exprStr
,
const
char
*
name
);
bool
tscValidateTableNameLength
(
size_t
len
);
bool
tscValidateTableNameLength
(
size_t
len
);
...
...
src/common/src/tname.c
浏览文件 @
a3c11889
...
@@ -33,15 +33,6 @@ size_t tableIdPrefix(const char* name, char* prefix, int32_t len) {
...
@@ -33,15 +33,6 @@ size_t tableIdPrefix(const char* name, char* prefix, int32_t len) {
return
strlen
(
prefix
);
return
strlen
(
prefix
);
}
}
SSchema
tGetBlockDistColumnSchema
()
{
SSchema
s
=
{
0
};
s
.
bytes
=
TSDB_MAX_BINARY_LEN
;;
s
.
type
=
TSDB_DATA_TYPE_BINARY
;
s
.
colId
=
TSDB_BLOCK_DIST_COLUMN_INDEX
;
tstrncpy
(
s
.
name
,
TSQL_BLOCK_DIST_L
,
TSDB_COL_NAME_LEN
);
return
s
;
}
SSchema
tGetUserSpecifiedColumnSchema
(
tVariant
*
pVal
,
SStrToken
*
exprStr
,
const
char
*
name
)
{
SSchema
tGetUserSpecifiedColumnSchema
(
tVariant
*
pVal
,
SStrToken
*
exprStr
,
const
char
*
name
)
{
SSchema
s
=
{
0
};
SSchema
s
=
{
0
};
...
...
src/inc/taosdef.h
浏览文件 @
a3c11889
...
@@ -244,7 +244,6 @@ do { \
...
@@ -244,7 +244,6 @@ do { \
#define TSDB_MAX_REPLICA 5
#define TSDB_MAX_REPLICA 5
#define TSDB_TBNAME_COLUMN_INDEX (-1)
#define TSDB_TBNAME_COLUMN_INDEX (-1)
#define TSDB_BLOCK_DIST_COLUMN_INDEX (-2)
#define TSDB_UD_COLUMN_INDEX (-1000)
#define TSDB_UD_COLUMN_INDEX (-1000)
#define TSDB_RES_COL_ID (-5000)
#define TSDB_RES_COL_ID (-5000)
...
...
src/inc/tsdb.h
浏览文件 @
a3c11889
...
@@ -215,7 +215,7 @@ typedef struct SDataBlockInfo {
...
@@ -215,7 +215,7 @@ typedef struct SDataBlockInfo {
}
SDataBlockInfo
;
}
SDataBlockInfo
;
typedef
struct
SFileBlockInfo
{
typedef
struct
SFileBlockInfo
{
int32_t
num
OfRows
;
int32_t
num
BlocksOfStep
;
}
SFileBlockInfo
;
}
SFileBlockInfo
;
typedef
struct
{
typedef
struct
{
...
@@ -229,11 +229,15 @@ typedef struct {
...
@@ -229,11 +229,15 @@ typedef struct {
SHashObj
*
map
;
// speedup acquire the tableQueryInfo by table uid
SHashObj
*
map
;
// speedup acquire the tableQueryInfo by table uid
}
STableGroupInfo
;
}
STableGroupInfo
;
#define TSDB_BLOCK_DIST_STEP_ROWS 16
typedef
struct
{
typedef
struct
{
uint16_t
rowSize
;
uint16_t
rowSize
;
uint16_t
numOfFiles
;
uint16_t
numOfFiles
;
uint32_t
numOfTables
;
uint32_t
numOfTables
;
uint64_t
totalSize
;
uint64_t
totalSize
;
uint64_t
totalRows
;
int32_t
maxRows
;
int32_t
minRows
;
int32_t
firstSeekTimeUs
;
int32_t
firstSeekTimeUs
;
uint32_t
numOfRowsInMemTable
;
uint32_t
numOfRowsInMemTable
;
SArray
*
dataBlockInfos
;
SArray
*
dataBlockInfos
;
...
...
src/kit/taosdemo/taosdemo.c
浏览文件 @
a3c11889
...
@@ -2910,8 +2910,8 @@ static void* createTable(void *sarg)
...
@@ -2910,8 +2910,8 @@ static void* createTable(void *sarg)
int
buff_len
;
int
buff_len
;
buff_len
=
BUFFER_SIZE
/
8
;
buff_len
=
BUFFER_SIZE
/
8
;
char
*
buffer
=
calloc
(
buff_len
,
1
);
pThreadInfo
->
buffer
=
calloc
(
buff_len
,
1
);
if
(
buffer
==
NULL
)
{
if
(
pThreadInfo
->
buffer
==
NULL
)
{
errorPrint
(
"%s() LN%d, Memory allocated failed!
\n
"
,
__func__
,
__LINE__
);
errorPrint
(
"%s() LN%d, Memory allocated failed!
\n
"
,
__func__
,
__LINE__
);
exit
(
-
1
);
exit
(
-
1
);
}
}
...
@@ -2926,7 +2926,7 @@ static void* createTable(void *sarg)
...
@@ -2926,7 +2926,7 @@ static void* createTable(void *sarg)
for
(
uint64_t
i
=
pThreadInfo
->
start_table_from
;
for
(
uint64_t
i
=
pThreadInfo
->
start_table_from
;
i
<=
pThreadInfo
->
end_table_to
;
i
++
)
{
i
<=
pThreadInfo
->
end_table_to
;
i
++
)
{
if
(
0
==
g_Dbs
.
use_metric
)
{
if
(
0
==
g_Dbs
.
use_metric
)
{
snprintf
(
buffer
,
buff_len
,
snprintf
(
pThreadInfo
->
buffer
,
buff_len
,
"create table if not exists %s.%s%"
PRIu64
" %s;"
,
"create table if not exists %s.%s%"
PRIu64
" %s;"
,
pThreadInfo
->
db_name
,
pThreadInfo
->
db_name
,
g_args
.
tb_prefix
,
i
,
g_args
.
tb_prefix
,
i
,
...
@@ -2935,13 +2935,13 @@ static void* createTable(void *sarg)
...
@@ -2935,13 +2935,13 @@ static void* createTable(void *sarg)
if
(
superTblInfo
==
NULL
)
{
if
(
superTblInfo
==
NULL
)
{
errorPrint
(
"%s() LN%d, use metric, but super table info is NULL
\n
"
,
errorPrint
(
"%s() LN%d, use metric, but super table info is NULL
\n
"
,
__func__
,
__LINE__
);
__func__
,
__LINE__
);
free
(
buffer
);
free
(
pThreadInfo
->
buffer
);
exit
(
-
1
);
exit
(
-
1
);
}
else
{
}
else
{
if
(
0
==
len
)
{
if
(
0
==
len
)
{
batchNum
=
0
;
batchNum
=
0
;
memset
(
buffer
,
0
,
buff_len
);
memset
(
pThreadInfo
->
buffer
,
0
,
buff_len
);
len
+=
snprintf
(
buffer
+
len
,
len
+=
snprintf
(
pThreadInfo
->
buffer
+
len
,
buff_len
-
len
,
"create table "
);
buff_len
-
len
,
"create table "
);
}
}
char
*
tagsValBuf
=
NULL
;
char
*
tagsValBuf
=
NULL
;
...
@@ -2953,10 +2953,10 @@ static void* createTable(void *sarg)
...
@@ -2953,10 +2953,10 @@ static void* createTable(void *sarg)
i
%
superTblInfo
->
tagSampleCount
);
i
%
superTblInfo
->
tagSampleCount
);
}
}
if
(
NULL
==
tagsValBuf
)
{
if
(
NULL
==
tagsValBuf
)
{
free
(
buffer
);
free
(
pThreadInfo
->
buffer
);
return
NULL
;
return
NULL
;
}
}
len
+=
snprintf
(
buffer
+
len
,
len
+=
snprintf
(
pThreadInfo
->
buffer
+
len
,
buff_len
-
len
,
buff_len
-
len
,
"if not exists %s.%s%"
PRIu64
" using %s.%s tags %s "
,
"if not exists %s.%s%"
PRIu64
" using %s.%s tags %s "
,
pThreadInfo
->
db_name
,
superTblInfo
->
childTblPrefix
,
pThreadInfo
->
db_name
,
superTblInfo
->
childTblPrefix
,
...
@@ -2973,9 +2973,10 @@ static void* createTable(void *sarg)
...
@@ -2973,9 +2973,10 @@ static void* createTable(void *sarg)
}
}
len
=
0
;
len
=
0
;
if
(
0
!=
queryDbExec
(
pThreadInfo
->
taos
,
buffer
,
NO_INSERT_TYPE
,
false
)){
if
(
0
!=
queryDbExec
(
pThreadInfo
->
taos
,
pThreadInfo
->
buffer
,
errorPrint
(
"queryDbExec() failed. buffer:
\n
%s
\n
"
,
buffer
);
NO_INSERT_TYPE
,
false
)){
free
(
buffer
);
errorPrint
(
"queryDbExec() failed. buffer:
\n
%s
\n
"
,
pThreadInfo
->
buffer
);
free
(
pThreadInfo
->
buffer
);
return
NULL
;
return
NULL
;
}
}
...
@@ -2988,12 +2989,13 @@ static void* createTable(void *sarg)
...
@@ -2988,12 +2989,13 @@ static void* createTable(void *sarg)
}
}
if
(
0
!=
len
)
{
if
(
0
!=
len
)
{
if
(
0
!=
queryDbExec
(
pThreadInfo
->
taos
,
buffer
,
NO_INSERT_TYPE
,
false
))
{
if
(
0
!=
queryDbExec
(
pThreadInfo
->
taos
,
pThreadInfo
->
buffer
,
errorPrint
(
"queryDbExec() failed. buffer:
\n
%s
\n
"
,
buffer
);
NO_INSERT_TYPE
,
false
))
{
errorPrint
(
"queryDbExec() failed. buffer:
\n
%s
\n
"
,
pThreadInfo
->
buffer
);
}
}
}
}
free
(
buffer
);
free
(
pThreadInfo
->
buffer
);
return
NULL
;
return
NULL
;
}
}
...
@@ -4399,8 +4401,9 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
...
@@ -4399,8 +4401,9 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
cJSON
*
resubAfterConsume
=
cJSON
*
resubAfterConsume
=
cJSON_GetObjectItem
(
specifiedQuery
,
"resubAfterConsume"
);
cJSON_GetObjectItem
(
specifiedQuery
,
"resubAfterConsume"
);
if
(
resubAfterConsume
if
((
resubAfterConsume
)
&&
resubAfterConsume
->
type
==
cJSON_Number
)
{
&&
(
resubAfterConsume
->
type
==
cJSON_Number
)
&&
(
resubAfterConsume
->
valueint
>=
0
))
{
g_queryInfo
.
specifiedQueryInfo
.
resubAfterConsume
[
j
]
g_queryInfo
.
specifiedQueryInfo
.
resubAfterConsume
[
j
]
=
resubAfterConsume
->
valueint
;
=
resubAfterConsume
->
valueint
;
}
else
if
(
!
resubAfterConsume
)
{
}
else
if
(
!
resubAfterConsume
)
{
...
@@ -4561,14 +4564,15 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
...
@@ -4561,14 +4564,15 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
}
}
cJSON
*
superResubAfterConsume
=
cJSON
*
superResubAfterConsume
=
cJSON_GetObjectItem
(
superQuery
,
"endAfterConsume"
);
cJSON_GetObjectItem
(
superQuery
,
"resubAfterConsume"
);
if
(
superResubAfterConsume
if
((
superResubAfterConsume
)
&&
superResubAfterConsume
->
type
==
cJSON_Number
)
{
&&
(
superResubAfterConsume
->
type
==
cJSON_Number
)
g_queryInfo
.
superQueryInfo
.
endAfterConsume
=
&&
(
superResubAfterConsume
->
valueint
>=
0
))
{
g_queryInfo
.
superQueryInfo
.
resubAfterConsume
=
superResubAfterConsume
->
valueint
;
superResubAfterConsume
->
valueint
;
}
else
if
(
!
superResubAfterConsume
)
{
}
else
if
(
!
superResubAfterConsume
)
{
// default value is -1, which mean do not resub
// default value is -1, which mean do not resub
g_queryInfo
.
superQueryInfo
.
end
AfterConsume
=
-
1
;
g_queryInfo
.
superQueryInfo
.
resub
AfterConsume
=
-
1
;
}
}
// supert table sqls
// supert table sqls
...
@@ -4932,7 +4936,7 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
...
@@ -4932,7 +4936,7 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
case
REST_IFACE
:
case
REST_IFACE
:
if
(
0
!=
postProceSql
(
g_Dbs
.
host
,
&
g_Dbs
.
serv_addr
,
g_Dbs
.
port
,
if
(
0
!=
postProceSql
(
g_Dbs
.
host
,
&
g_Dbs
.
serv_addr
,
g_Dbs
.
port
,
pThreadInfo
->
buffer
,
NULL
/* not set result file */
))
{
pThreadInfo
->
buffer
,
pThreadInfo
))
{
affectedRows
=
-
1
;
affectedRows
=
-
1
;
printf
(
"========restful return fail, threadID[%d]
\n
"
,
printf
(
"========restful return fail, threadID[%d]
\n
"
,
pThreadInfo
->
threadID
);
pThreadInfo
->
threadID
);
...
...
src/query/src/qAggMain.c
浏览文件 @
a3c11889
...
@@ -19,6 +19,7 @@
...
@@ -19,6 +19,7 @@
#include "texpr.h"
#include "texpr.h"
#include "ttype.h"
#include "ttype.h"
#include "tsdb.h"
#include "tsdb.h"
#include "tglobal.h"
#include "qAggMain.h"
#include "qAggMain.h"
#include "qFill.h"
#include "qFill.h"
...
@@ -4828,51 +4829,81 @@ void blockInfo_func(SQLFunctionCtx* pCtx) {
...
@@ -4828,51 +4829,81 @@ void blockInfo_func(SQLFunctionCtx* pCtx) {
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
}
}
static
void
mergeTableBlockDist
(
STableBlockDist
*
pDist
,
const
STableBlockDist
*
pSrc
)
{
static
void
mergeTableBlockDist
(
SResultRowCellInfo
*
pResInfo
,
const
STableBlockDist
*
pSrc
)
{
STableBlockDist
*
pDist
=
(
STableBlockDist
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
assert
(
pDist
!=
NULL
&&
pSrc
!=
NULL
);
assert
(
pDist
!=
NULL
&&
pSrc
!=
NULL
);
pDist
->
numOfTables
+=
pSrc
->
numOfTables
;
pDist
->
numOfTables
+=
pSrc
->
numOfTables
;
pDist
->
numOfRowsInMemTable
+=
pSrc
->
numOfRowsInMemTable
;
pDist
->
numOfRowsInMemTable
+=
pSrc
->
numOfRowsInMemTable
;
pDist
->
numOfFiles
+=
pSrc
->
numOfFiles
;
pDist
->
numOfFiles
+=
pSrc
->
numOfFiles
;
pDist
->
totalSize
+=
pSrc
->
totalSize
;
pDist
->
totalSize
+=
pSrc
->
totalSize
;
pDist
->
totalRows
+=
pSrc
->
totalRows
;
if
(
pDist
->
dataBlockInfos
==
NULL
)
{
if
(
pResInfo
->
hasResult
==
DATA_SET_FLAG
)
{
pDist
->
dataBlockInfos
=
taosArrayInit
(
4
,
sizeof
(
SFileBlockInfo
));
pDist
->
maxRows
=
MAX
(
pDist
->
maxRows
,
pSrc
->
maxRows
);
pDist
->
minRows
=
MIN
(
pDist
->
minRows
,
pSrc
->
minRows
);
}
else
{
pDist
->
maxRows
=
pSrc
->
maxRows
;
pDist
->
minRows
=
pSrc
->
minRows
;
int32_t
numSteps
=
tsMaxRowsInFileBlock
/
TSDB_BLOCK_DIST_STEP_ROWS
;
pDist
->
dataBlockInfos
=
taosArrayInit
(
numSteps
,
sizeof
(
SFileBlockInfo
));
taosArraySetSize
(
pDist
->
dataBlockInfos
,
numSteps
);
}
}
taosArrayAddBatch
(
pDist
->
dataBlockInfos
,
pSrc
->
dataBlockInfos
->
pData
,
(
int32_t
)
taosArrayGetSize
(
pSrc
->
dataBlockInfos
));
size_t
steps
=
taosArrayGetSize
(
pDist
->
dataBlockInfos
);
for
(
int32_t
i
=
0
;
i
<
steps
;
++
i
)
{
int32_t
srcNumBlocks
=
((
SFileBlockInfo
*
)
taosArrayGet
(
pSrc
->
dataBlockInfos
,
i
))
->
numBlocksOfStep
;
SFileBlockInfo
*
blockInfo
=
(
SFileBlockInfo
*
)
taosArrayGet
(
pDist
->
dataBlockInfos
,
i
);
blockInfo
->
numBlocksOfStep
+=
srcNumBlocks
;
}
}
}
void
block_func_merge
(
SQLFunctionCtx
*
pCtx
)
{
void
block_func_merge
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
STableBlockDist
*
pDist
=
(
STableBlockDist
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
STableBlockDist
info
=
{
0
};
STableBlockDist
info
=
{
0
};
int32_t
len
=
*
(
int32_t
*
)
pCtx
->
pInput
;
int32_t
len
=
*
(
int32_t
*
)
pCtx
->
pInput
;
blockDistInfoFromBinary
(((
char
*
)
pCtx
->
pInput
)
+
sizeof
(
int32_t
),
len
,
&
info
);
blockDistInfoFromBinary
(((
char
*
)
pCtx
->
pInput
)
+
sizeof
(
int32_t
),
len
,
&
info
);
mergeTableBlockDist
(
pDist
,
&
info
);
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
mergeTableBlockDist
(
pResInfo
,
&
info
);
pResInfo
->
numOfRes
=
1
;
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
}
}
static
int32_t
doGetPercentile
(
const
SArray
*
pArray
,
double
rate
)
{
void
getPercentiles
(
STableBlockDist
*
pTableBlockDist
,
int64_t
totalBlocks
,
int32_t
numOfPercents
,
int32_t
len
=
(
int32_t
)
taosArrayGetSize
(
pArray
);
double
*
percents
,
int32_t
*
percentiles
)
{
if
(
len
<=
0
)
{
if
(
totalBlocks
==
0
)
{
return
0
;
for
(
int32_t
i
=
0
;
i
<
numOfPercents
;
++
i
)
{
percentiles
[
i
]
=
0
;
}
return
;
}
}
assert
(
rate
>=
0
&&
rate
<=
1
.
0
);
SArray
*
blocksInfos
=
pTableBlockDist
->
dataBlockInfos
;
int
idx
=
(
int32_t
)((
len
-
1
)
*
rate
);
size_t
numSteps
=
taosArrayGetSize
(
blocksInfos
);
size_t
cumulativeBlocks
=
0
;
return
((
SFileBlockInfo
*
)(
taosArrayGet
(
pArray
,
idx
)))
->
numOfRows
;
int
percentIndex
=
0
;
}
for
(
int32_t
indexStep
=
0
;
indexStep
<
numSteps
;
++
indexStep
)
{
int32_t
numStepBlocks
=
((
SFileBlockInfo
*
)
taosArrayGet
(
blocksInfos
,
indexStep
))
->
numBlocksOfStep
;
if
(
numStepBlocks
==
0
)
continue
;
cumulativeBlocks
+=
numStepBlocks
;
static
int
compareBlockInfo
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
while
(
percentIndex
<
numOfPercents
)
{
int32_t
left
=
((
SFileBlockInfo
*
)
pLeft
)
->
numOfRows
;
double
blockRank
=
totalBlocks
*
percents
[
percentIndex
];
int32_t
right
=
((
SFileBlockInfo
*
)
pRight
)
->
numOfRows
;
if
(
blockRank
<=
cumulativeBlocks
)
{
percentiles
[
percentIndex
]
=
indexStep
;
++
percentIndex
;
}
else
{
break
;
}
}
}
if
(
left
>
right
)
return
1
;
for
(
int32_t
i
=
0
;
i
<
numOfPercents
;
++
i
)
{
if
(
left
<
right
)
return
-
1
;
percentiles
[
i
]
=
(
percentiles
[
i
]
+
1
)
*
TSDB_BLOCK_DIST_STEP_ROWS
-
TSDB_BLOCK_DIST_STEP_ROWS
/
2
;
return
0
;
}
}
}
void
generateBlockDistResult
(
STableBlockDist
*
pTableBlockDist
,
char
*
result
)
{
void
generateBlockDistResult
(
STableBlockDist
*
pTableBlockDist
,
char
*
result
)
{
...
@@ -4880,40 +4911,41 @@ void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) {
...
@@ -4880,40 +4911,41 @@ void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) {
return
;
return
;
}
}
int64_t
min
=
INT64_MAX
,
max
=
INT64_MIN
,
avg
=
0
;
SArray
*
blockInfos
=
pTableBlockDist
->
dataBlockInfos
;
SArray
*
blockInfos
=
pTableBlockDist
->
dataBlockInfos
;
uint64_t
totalRows
=
pTableBlockDist
->
totalRows
;
int64_t
totalRows
=
0
,
totalBlocks
=
taosArrayGetSize
(
blockInfos
);
size_t
numSteps
=
taosArrayGetSize
(
blockInfos
);
int64_t
totalBlocks
=
0
;
int64_t
min
=
-
1
,
max
=
-
1
,
avg
=
0
;
for
(
size_t
i
=
0
;
i
<
taosArrayGetSize
(
blockInfos
)
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
numSteps
;
i
++
)
{
SFileBlockInfo
*
blockInfo
=
taosArrayGet
(
blockInfos
,
i
);
SFileBlockInfo
*
blockInfo
=
taosArrayGet
(
blockInfos
,
i
);
int64_t
rows
=
blockInfo
->
numOfRows
;
int64_t
blocks
=
blockInfo
->
numBlocksOfStep
;
totalBlocks
+=
blocks
;
min
=
MIN
(
min
,
rows
);
max
=
MAX
(
max
,
rows
);
totalRows
+=
rows
;
}
}
avg
=
totalBlocks
>
0
?
(
int64_t
)(
totalRows
/
totalBlocks
)
:
0
;
avg
=
totalBlocks
>
0
?
(
int64_t
)(
totalRows
/
totalBlocks
)
:
0
;
taosArraySort
(
blockInfos
,
compareBlockInfo
);
min
=
totalBlocks
>
0
?
pTableBlockDist
->
minRows
:
0
;
max
=
totalBlocks
>
0
?
pTableBlockDist
->
maxRows
:
0
;
double
percents
[]
=
{
0
.
05
,
0
.
10
,
0
.
20
,
0
.
30
,
0
.
40
,
0
.
50
,
0
.
60
,
0
.
70
,
0
.
80
,
0
.
90
,
0
.
95
,
0
.
99
};
int32_t
percentiles
[]
=
{
-
1
,
-
1
,
-
1
,
-
1
,
-
1
,
-
1
,
-
1
,
-
1
,
-
1
,
-
1
,
-
1
,
-
1
};
assert
(
sizeof
(
percents
)
/
sizeof
(
double
)
==
sizeof
(
percentiles
)
/
sizeof
(
int32_t
));
getPercentiles
(
pTableBlockDist
,
totalBlocks
,
sizeof
(
percents
)
/
sizeof
(
double
),
percents
,
percentiles
);
uint64_t
totalLen
=
pTableBlockDist
->
totalSize
;
uint64_t
totalLen
=
pTableBlockDist
->
totalSize
;
int32_t
rowSize
=
pTableBlockDist
->
rowSize
;
int32_t
rowSize
=
pTableBlockDist
->
rowSize
;
double
compRatio
=
(
totalRows
>
0
)
?
((
double
)(
totalLen
)
/
(
rowSize
*
totalRows
))
:
1
;
int
sz
=
sprintf
(
result
+
VARSTR_HEADER_SIZE
,
int
sz
=
sprintf
(
result
+
VARSTR_HEADER_SIZE
,
"summary:
\n\t
"
"summary:
\n\t
"
"5th=[%d], 10th=[%d], 20th=[%d], 30th=[%d], 40th=[%d], 50th=[%d]
\n\t
"
"5th=[%d], 10th=[%d], 20th=[%d], 30th=[%d], 40th=[%d], 50th=[%d]
\n\t
"
"60th=[%d], 70th=[%d], 80th=[%d], 90th=[%d], 95th=[%d], 99th=[%d]
\n\t
"
"60th=[%d], 70th=[%d], 80th=[%d], 90th=[%d], 95th=[%d], 99th=[%d]
\n\t
"
"Min=[%"
PRId64
"(Rows)] Max=[%"
PRId64
"(Rows)] Avg=[%"
PRId64
"(Rows)] Stddev=[%.2f]
\n\t
"
"Min=[%"
PRId64
"(Rows)] Max=[%"
PRId64
"(Rows)] Avg=[%"
PRId64
"(Rows)] Stddev=[%.2f]
\n\t
"
"Rows=[%"
PRI
d64
"], Blocks=[%"
PRId64
"], Size=[%.3f(Kb)] Comp=[%.2f%%
]
\n\t
"
"Rows=[%"
PRI
u64
"], Blocks=[%"
PRId64
"], Size=[%.3f(Kb)] Comp=[%.2f
]
\n\t
"
"RowsInMem=[%d]
\n\t
SeekHeaderTime=[%d(us)]"
,
"RowsInMem=[%d]
\n\t
SeekHeaderTime=[%d(us)]"
,
doGetPercentile
(
blockInfos
,
0
.
05
),
doGetPercentile
(
blockInfos
,
0
.
10
),
percentiles
[
0
],
percentiles
[
1
],
percentiles
[
2
],
percentiles
[
3
],
percentiles
[
4
],
percentiles
[
5
],
doGetPercentile
(
blockInfos
,
0
.
20
),
doGetPercentile
(
blockInfos
,
0
.
30
),
percentiles
[
6
],
percentiles
[
7
],
percentiles
[
8
],
percentiles
[
9
],
percentiles
[
10
],
percentiles
[
11
],
doGetPercentile
(
blockInfos
,
0
.
40
),
doGetPercentile
(
blockInfos
,
0
.
50
),
doGetPercentile
(
blockInfos
,
0
.
60
),
doGetPercentile
(
blockInfos
,
0
.
70
),
doGetPercentile
(
blockInfos
,
0
.
80
),
doGetPercentile
(
blockInfos
,
0
.
90
),
doGetPercentile
(
blockInfos
,
0
.
95
),
doGetPercentile
(
blockInfos
,
0
.
99
),
min
,
max
,
avg
,
0
.
0
,
min
,
max
,
avg
,
0
.
0
,
totalRows
,
totalBlocks
,
totalLen
/
1024
.
0
,
(
double
)(
totalLen
*
100
.
0
)
/
(
rowSize
*
totalRows
)
,
totalRows
,
totalBlocks
,
totalLen
/
1024
.
0
,
compRatio
,
pTableBlockDist
->
numOfRowsInMemTable
,
pTableBlockDist
->
firstSeekTimeUs
);
pTableBlockDist
->
numOfRowsInMemTable
,
pTableBlockDist
->
firstSeekTimeUs
);
varDataSetLen
(
result
,
sz
);
varDataSetLen
(
result
,
sz
);
UNUSED
(
sz
);
UNUSED
(
sz
);
...
...
src/query/src/qExecutor.c
浏览文件 @
a3c11889
...
@@ -935,7 +935,7 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
...
@@ -935,7 +935,7 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
setArithParams
((
SArithmeticSupport
*
)
pCtx
[
i
].
param
[
1
].
pz
,
&
pOperator
->
pExpr
[
i
],
pBlock
);
setArithParams
((
SArithmeticSupport
*
)
pCtx
[
i
].
param
[
1
].
pz
,
&
pOperator
->
pExpr
[
i
],
pBlock
);
}
else
{
}
else
{
SColIndex
*
pCol
=
&
pOperator
->
pExpr
[
i
].
base
.
colInfo
;
SColIndex
*
pCol
=
&
pOperator
->
pExpr
[
i
].
base
.
colInfo
;
if
(
TSDB_COL_IS_NORMAL_COL
(
pCol
->
flag
)
||
(
pC
ol
->
colId
==
TSDB_BLOCK_DIST_COLUMN_INDEX
)
||
if
(
TSDB_COL_IS_NORMAL_COL
(
pCol
->
flag
)
||
(
pC
tx
[
i
].
functionId
==
TSDB_FUNC_BLKINFO
)
||
(
TSDB_COL_IS_TAG
(
pCol
->
flag
)
&&
pOperator
->
pRuntimeEnv
->
scanFlag
==
MERGE_STAGE
))
{
(
TSDB_COL_IS_TAG
(
pCol
->
flag
)
&&
pOperator
->
pRuntimeEnv
->
scanFlag
==
MERGE_STAGE
))
{
SColIndex
*
pColIndex
=
&
pOperator
->
pExpr
[
i
].
base
.
colInfo
;
SColIndex
*
pColIndex
=
&
pOperator
->
pExpr
[
i
].
base
.
colInfo
;
SColumnInfoData
*
p
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pColIndex
->
colIndex
);
SColumnInfoData
*
p
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pColIndex
->
colIndex
);
...
@@ -3771,7 +3771,7 @@ void queryCostStatis(SQInfo *pQInfo) {
...
@@ -3771,7 +3771,7 @@ void queryCostStatis(SQInfo *pQInfo) {
//
//
// int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
// int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
//
//
// qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", num
OfRows
:%d, numOfRes:%d, lastKey:%"PRId64, GET_QID(pRuntimeEnv),
// qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", num
BlocksOfStep
:%d, numOfRes:%d, lastKey:%"PRId64, GET_QID(pRuntimeEnv),
// pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
// pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
//}
//}
...
@@ -4347,7 +4347,12 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) {
...
@@ -4347,7 +4347,12 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) {
STableBlockDist
tableBlockDist
=
{
0
};
STableBlockDist
tableBlockDist
=
{
0
};
tableBlockDist
.
numOfTables
=
(
int32_t
)
pOperator
->
pRuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
;
tableBlockDist
.
numOfTables
=
(
int32_t
)
pOperator
->
pRuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
;
tableBlockDist
.
dataBlockInfos
=
taosArrayInit
(
512
,
sizeof
(
SFileBlockInfo
));
int32_t
numRowSteps
=
tsMaxRowsInFileBlock
/
TSDB_BLOCK_DIST_STEP_ROWS
;
tableBlockDist
.
dataBlockInfos
=
taosArrayInit
(
numRowSteps
,
sizeof
(
SFileBlockInfo
));
taosArraySetSize
(
tableBlockDist
.
dataBlockInfos
,
numRowSteps
);
tableBlockDist
.
maxRows
=
INT_MIN
;
tableBlockDist
.
minRows
=
INT_MAX
;
tsdbGetFileBlocksDistInfo
(
pTableScanInfo
->
pQueryHandle
,
&
tableBlockDist
);
tsdbGetFileBlocksDistInfo
(
pTableScanInfo
->
pQueryHandle
,
&
tableBlockDist
);
tableBlockDist
.
numOfRowsInMemTable
=
(
int32_t
)
tsdbGetNumOfRowsInMemTable
(
pTableScanInfo
->
pQueryHandle
);
tableBlockDist
.
numOfRowsInMemTable
=
(
int32_t
)
tsdbGetNumOfRowsInMemTable
(
pTableScanInfo
->
pQueryHandle
);
...
@@ -4429,7 +4434,7 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRu
...
@@ -4429,7 +4434,7 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRu
SColumnInfoData
infoData
=
{{
0
}};
SColumnInfoData
infoData
=
{{
0
}};
infoData
.
info
.
type
=
TSDB_DATA_TYPE_BINARY
;
infoData
.
info
.
type
=
TSDB_DATA_TYPE_BINARY
;
infoData
.
info
.
bytes
=
1024
;
infoData
.
info
.
bytes
=
1024
;
infoData
.
info
.
colId
=
TSDB_BLOCK_DIST_COLUMN_INDEX
;
infoData
.
info
.
colId
=
0
;
taosArrayPush
(
pInfo
->
block
.
pDataBlock
,
&
infoData
);
taosArrayPush
(
pInfo
->
block
.
pDataBlock
,
&
infoData
);
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
...
@@ -5958,10 +5963,7 @@ static int32_t getColumnIndexInSource(SQueriedTableInfo *pTableInfo, SSqlExpr *p
...
@@ -5958,10 +5963,7 @@ static int32_t getColumnIndexInSource(SQueriedTableInfo *pTableInfo, SSqlExpr *p
if
(
TSDB_COL_IS_TAG
(
pExpr
->
colInfo
.
flag
))
{
if
(
TSDB_COL_IS_TAG
(
pExpr
->
colInfo
.
flag
))
{
if
(
pExpr
->
colInfo
.
colId
==
TSDB_TBNAME_COLUMN_INDEX
)
{
if
(
pExpr
->
colInfo
.
colId
==
TSDB_TBNAME_COLUMN_INDEX
)
{
return
TSDB_TBNAME_COLUMN_INDEX
;
return
TSDB_TBNAME_COLUMN_INDEX
;
}
else
if
(
pExpr
->
colInfo
.
colId
==
TSDB_BLOCK_DIST_COLUMN_INDEX
)
{
return
TSDB_BLOCK_DIST_COLUMN_INDEX
;
}
}
while
(
j
<
pTableInfo
->
numOfTags
)
{
while
(
j
<
pTableInfo
->
numOfTags
)
{
if
(
pExpr
->
colInfo
.
colId
==
pTagCols
[
j
].
colId
)
{
if
(
pExpr
->
colInfo
.
colId
==
pTagCols
[
j
].
colId
)
{
...
@@ -6531,14 +6533,14 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
...
@@ -6531,14 +6533,14 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
type
=
TSDB_DATA_TYPE_DOUBLE
;
type
=
TSDB_DATA_TYPE_DOUBLE
;
bytes
=
tDataTypes
[
type
].
bytes
;
bytes
=
tDataTypes
[
type
].
bytes
;
}
else
if
(
pExprs
[
i
].
base
.
functionId
==
TSDB_FUNC_BLKINFO
)
{
SSchema
s
=
{.
type
=
TSDB_DATA_TYPE_BINARY
,
.
bytes
=
TSDB_MAX_BINARY_LEN
};
type
=
s
.
type
;
bytes
=
s
.
bytes
;
}
else
if
(
pExprs
[
i
].
base
.
colInfo
.
colId
==
TSDB_TBNAME_COLUMN_INDEX
&&
pExprs
[
i
].
base
.
functionId
==
TSDB_FUNC_TAGPRJ
)
{
// parse the normal column
}
else
if
(
pExprs
[
i
].
base
.
colInfo
.
colId
==
TSDB_TBNAME_COLUMN_INDEX
&&
pExprs
[
i
].
base
.
functionId
==
TSDB_FUNC_TAGPRJ
)
{
// parse the normal column
SSchema
*
s
=
tGetTbnameColumnSchema
();
SSchema
*
s
=
tGetTbnameColumnSchema
();
type
=
s
->
type
;
type
=
s
->
type
;
bytes
=
s
->
bytes
;
bytes
=
s
->
bytes
;
}
else
if
(
pExprs
[
i
].
base
.
colInfo
.
colId
==
TSDB_BLOCK_DIST_COLUMN_INDEX
)
{
SSchema
s
=
tGetBlockDistColumnSchema
();
type
=
s
.
type
;
bytes
=
s
.
bytes
;
}
else
if
(
pExprs
[
i
].
base
.
colInfo
.
colId
<=
TSDB_UD_COLUMN_INDEX
&&
pExprs
[
i
].
base
.
colInfo
.
colId
>
TSDB_RES_COL_ID
)
{
}
else
if
(
pExprs
[
i
].
base
.
colInfo
.
colId
<=
TSDB_UD_COLUMN_INDEX
&&
pExprs
[
i
].
base
.
colInfo
.
colId
>
TSDB_RES_COL_ID
)
{
// it is a user-defined constant value column
// it is a user-defined constant value column
assert
(
pExprs
[
i
].
base
.
functionId
==
TSDB_FUNC_PRJ
);
assert
(
pExprs
[
i
].
base
.
functionId
==
TSDB_FUNC_PRJ
);
...
@@ -6551,7 +6553,7 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
...
@@ -6551,7 +6553,7 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
}
else
{
}
else
{
int32_t
j
=
getColumnIndexInSource
(
pTableInfo
,
&
pExprs
[
i
].
base
,
pTagCols
);
int32_t
j
=
getColumnIndexInSource
(
pTableInfo
,
&
pExprs
[
i
].
base
,
pTagCols
);
if
(
TSDB_COL_IS_TAG
(
pExprs
[
i
].
base
.
colInfo
.
flag
))
{
if
(
TSDB_COL_IS_TAG
(
pExprs
[
i
].
base
.
colInfo
.
flag
))
{
if
(
j
<
TSDB_
BLOCK_DIST
_COLUMN_INDEX
||
j
>=
pTableInfo
->
numOfTags
)
{
if
(
j
<
TSDB_
TBNAME
_COLUMN_INDEX
||
j
>=
pTableInfo
->
numOfTags
)
{
return
TSDB_CODE_QRY_INVALID_MSG
;
return
TSDB_CODE_QRY_INVALID_MSG
;
}
}
}
else
{
}
else
{
...
@@ -6787,9 +6789,6 @@ static void doUpdateExprColumnIndex(SQueryAttr *pQueryAttr) {
...
@@ -6787,9 +6789,6 @@ static void doUpdateExprColumnIndex(SQueryAttr *pQueryAttr) {
assert
(
f
<
pQueryAttr
->
numOfCols
);
assert
(
f
<
pQueryAttr
->
numOfCols
);
}
else
if
(
pColIndex
->
colId
<=
TSDB_UD_COLUMN_INDEX
)
{
}
else
if
(
pColIndex
->
colId
<=
TSDB_UD_COLUMN_INDEX
)
{
// do nothing for user-defined constant value result columns
// do nothing for user-defined constant value result columns
}
else
if
(
pColIndex
->
colId
==
TSDB_BLOCK_DIST_COLUMN_INDEX
)
{
pColIndex
->
colIndex
=
0
;
// only one source column, so it must be 0;
assert
(
pQueryAttr
->
numOfOutput
==
1
);
}
else
{
}
else
{
int32_t
f
=
0
;
int32_t
f
=
0
;
for
(
f
=
0
;
f
<
pQueryAttr
->
numOfTags
;
++
f
)
{
for
(
f
=
0
;
f
<
pQueryAttr
->
numOfTags
;
++
f
)
{
...
@@ -6799,7 +6798,7 @@ static void doUpdateExprColumnIndex(SQueryAttr *pQueryAttr) {
...
@@ -6799,7 +6798,7 @@ static void doUpdateExprColumnIndex(SQueryAttr *pQueryAttr) {
}
}
}
}
assert
(
f
<
pQueryAttr
->
numOfTags
||
pColIndex
->
colId
==
TSDB_TBNAME_COLUMN_INDEX
||
pColIndex
->
colId
==
TSDB_BLOCK_DIST_COLUMN_INDEX
);
assert
(
f
<
pQueryAttr
->
numOfTags
||
pColIndex
->
colId
==
TSDB_TBNAME_COLUMN_INDEX
);
}
}
}
}
}
}
...
@@ -6991,7 +6990,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
...
@@ -6991,7 +6990,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
colIdCheck
(
pQueryAttr
,
pQInfo
->
qId
);
colIdCheck
(
pQueryAttr
,
pQInfo
->
qId
);
// todo refactor
// todo refactor
pQInfo
->
query
.
queryBlockDist
=
(
numOfOutput
==
1
&&
pExprs
[
0
].
base
.
colInfo
.
colId
==
TSDB_BLOCK_DIST_COLUMN_INDEX
);
pQInfo
->
query
.
queryBlockDist
=
(
numOfOutput
==
1
&&
pExprs
[
0
].
base
.
functionId
==
TSDB_FUNC_BLKINFO
);
qDebug
(
"qmsg:%p QInfo:0x%"
PRIx64
"-%p created"
,
pQueryMsg
,
pQInfo
->
qId
,
pQInfo
);
qDebug
(
"qmsg:%p QInfo:0x%"
PRIx64
"-%p created"
,
pQueryMsg
,
pQInfo
->
qId
,
pQInfo
);
return
pQInfo
;
return
pQInfo
;
...
...
src/query/src/qUtil.c
浏览文件 @
a3c11889
...
@@ -581,6 +581,9 @@ void blockDistInfoToBinary(STableBlockDist* pDist, struct SBufferWriter* bw) {
...
@@ -581,6 +581,9 @@ void blockDistInfoToBinary(STableBlockDist* pDist, struct SBufferWriter* bw) {
tbufWriteUint32
(
bw
,
pDist
->
numOfTables
);
tbufWriteUint32
(
bw
,
pDist
->
numOfTables
);
tbufWriteUint16
(
bw
,
pDist
->
numOfFiles
);
tbufWriteUint16
(
bw
,
pDist
->
numOfFiles
);
tbufWriteUint64
(
bw
,
pDist
->
totalSize
);
tbufWriteUint64
(
bw
,
pDist
->
totalSize
);
tbufWriteUint64
(
bw
,
pDist
->
totalRows
);
tbufWriteInt32
(
bw
,
pDist
->
maxRows
);
tbufWriteInt32
(
bw
,
pDist
->
minRows
);
tbufWriteUint32
(
bw
,
pDist
->
numOfRowsInMemTable
);
tbufWriteUint32
(
bw
,
pDist
->
numOfRowsInMemTable
);
tbufWriteUint64
(
bw
,
taosArrayGetSize
(
pDist
->
dataBlockInfos
));
tbufWriteUint64
(
bw
,
taosArrayGetSize
(
pDist
->
dataBlockInfos
));
...
@@ -616,13 +619,16 @@ void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDi
...
@@ -616,13 +619,16 @@ void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDi
pDist
->
numOfTables
=
tbufReadUint32
(
&
br
);
pDist
->
numOfTables
=
tbufReadUint32
(
&
br
);
pDist
->
numOfFiles
=
tbufReadUint16
(
&
br
);
pDist
->
numOfFiles
=
tbufReadUint16
(
&
br
);
pDist
->
totalSize
=
tbufReadUint64
(
&
br
);
pDist
->
totalSize
=
tbufReadUint64
(
&
br
);
pDist
->
totalRows
=
tbufReadUint64
(
&
br
);
pDist
->
maxRows
=
tbufReadInt32
(
&
br
);
pDist
->
minRows
=
tbufReadInt32
(
&
br
);
pDist
->
numOfRowsInMemTable
=
tbufReadUint32
(
&
br
);
pDist
->
numOfRowsInMemTable
=
tbufReadUint32
(
&
br
);
int64_t
num
OfBlock
s
=
tbufReadUint64
(
&
br
);
int64_t
num
Step
s
=
tbufReadUint64
(
&
br
);
bool
comp
=
tbufReadUint8
(
&
br
);
bool
comp
=
tbufReadUint8
(
&
br
);
uint32_t
compLen
=
tbufReadUint32
(
&
br
);
uint32_t
compLen
=
tbufReadUint32
(
&
br
);
size_t
originalLen
=
(
size_t
)
(
num
OfBlocks
*
sizeof
(
SFileBlockInfo
));
size_t
originalLen
=
(
size_t
)
(
num
Steps
*
sizeof
(
SFileBlockInfo
));
char
*
outputBuf
=
NULL
;
char
*
outputBuf
=
NULL
;
if
(
comp
)
{
if
(
comp
)
{
...
@@ -633,12 +639,12 @@ void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDi
...
@@ -633,12 +639,12 @@ void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDi
int32_t
orignalLen
=
tsDecompressString
(
compStr
,
compLen
,
1
,
outputBuf
,
int32_t
orignalLen
=
tsDecompressString
(
compStr
,
compLen
,
1
,
outputBuf
,
(
int32_t
)
originalLen
,
ONE_STAGE_COMP
,
NULL
,
0
);
(
int32_t
)
originalLen
,
ONE_STAGE_COMP
,
NULL
,
0
);
assert
(
orignalLen
==
num
OfBlocks
*
sizeof
(
SFileBlockInfo
));
assert
(
orignalLen
==
num
Steps
*
sizeof
(
SFileBlockInfo
));
}
else
{
}
else
{
outputBuf
=
(
char
*
)
tbufReadBinary
(
&
br
,
&
originalLen
);
outputBuf
=
(
char
*
)
tbufReadBinary
(
&
br
,
&
originalLen
);
}
}
pDist
->
dataBlockInfos
=
taosArrayFromList
(
outputBuf
,
(
uint32_t
)
numOfBlock
s
,
sizeof
(
SFileBlockInfo
));
pDist
->
dataBlockInfos
=
taosArrayFromList
(
outputBuf
,
(
uint32_t
)
numStep
s
,
sizeof
(
SFileBlockInfo
));
if
(
comp
)
{
if
(
comp
)
{
tfree
(
outputBuf
);
tfree
(
outputBuf
);
}
}
...
...
src/tsdb/inc/tsdbCommit.h
浏览文件 @
a3c11889
...
@@ -33,6 +33,7 @@ void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn);
...
@@ -33,6 +33,7 @@ void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn);
int
tsdbEncodeKVRecord
(
void
**
buf
,
SKVRecord
*
pRecord
);
int
tsdbEncodeKVRecord
(
void
**
buf
,
SKVRecord
*
pRecord
);
void
*
tsdbDecodeKVRecord
(
void
*
buf
,
SKVRecord
*
pRecord
);
void
*
tsdbDecodeKVRecord
(
void
*
buf
,
SKVRecord
*
pRecord
);
void
*
tsdbCommitData
(
STsdbRepo
*
pRepo
);
void
*
tsdbCommitData
(
STsdbRepo
*
pRepo
);
int
tsdbApplyRtn
(
STsdbRepo
*
pRepo
);
static
FORCE_INLINE
int
tsdbGetFidLevel
(
int
fid
,
SRtn
*
pRtn
)
{
static
FORCE_INLINE
int
tsdbGetFidLevel
(
int
fid
,
SRtn
*
pRtn
)
{
if
(
fid
>=
pRtn
->
maxFid
)
{
if
(
fid
>=
pRtn
->
maxFid
)
{
...
...
src/tsdb/inc/tsdbint.h
浏览文件 @
a3c11889
...
@@ -86,6 +86,7 @@ struct STsdbRepo {
...
@@ -86,6 +86,7 @@ struct STsdbRepo {
SMemTable
*
mem
;
SMemTable
*
mem
;
SMemTable
*
imem
;
SMemTable
*
imem
;
STsdbFS
*
fs
;
STsdbFS
*
fs
;
SRtn
rtn
;
tsem_t
readyToCommit
;
tsem_t
readyToCommit
;
pthread_mutex_t
mutex
;
pthread_mutex_t
mutex
;
bool
repoLocked
;
bool
repoLocked
;
...
...
src/tsdb/src/tsdbCommit.c
浏览文件 @
a3c11889
...
@@ -86,7 +86,6 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError);
...
@@ -86,7 +86,6 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError);
static
bool
tsdbCanAddSubBlock
(
SCommitH
*
pCommith
,
SBlock
*
pBlock
,
SMergeInfo
*
pInfo
);
static
bool
tsdbCanAddSubBlock
(
SCommitH
*
pCommith
,
SBlock
*
pBlock
,
SMergeInfo
*
pInfo
);
static
void
tsdbLoadAndMergeFromCache
(
SDataCols
*
pDataCols
,
int
*
iter
,
SCommitIter
*
pCommitIter
,
SDataCols
*
pTarget
,
static
void
tsdbLoadAndMergeFromCache
(
SDataCols
*
pDataCols
,
int
*
iter
,
SCommitIter
*
pCommitIter
,
SDataCols
*
pTarget
,
TSKEY
maxKey
,
int
maxRows
,
int8_t
update
);
TSKEY
maxKey
,
int
maxRows
,
int8_t
update
);
static
int
tsdbApplyRtn
(
STsdbRepo
*
pRepo
);
static
int
tsdbApplyRtnOnFSet
(
STsdbRepo
*
pRepo
,
SDFileSet
*
pSet
,
SRtn
*
pRtn
);
static
int
tsdbApplyRtnOnFSet
(
STsdbRepo
*
pRepo
,
SDFileSet
*
pSet
,
SRtn
*
pRtn
);
void
*
tsdbCommitData
(
STsdbRepo
*
pRepo
)
{
void
*
tsdbCommitData
(
STsdbRepo
*
pRepo
)
{
...
@@ -1431,7 +1430,7 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p
...
@@ -1431,7 +1430,7 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p
return
false
;
return
false
;
}
}
static
int
tsdbApplyRtn
(
STsdbRepo
*
pRepo
)
{
int
tsdbApplyRtn
(
STsdbRepo
*
pRepo
)
{
SRtn
rtn
;
SRtn
rtn
;
SFSIter
fsiter
;
SFSIter
fsiter
;
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
...
...
src/tsdb/src/tsdbFS.c
浏览文件 @
a3c11889
...
@@ -33,7 +33,9 @@ static int tsdbScanDataDir(STsdbRepo *pRepo);
...
@@ -33,7 +33,9 @@ static int tsdbScanDataDir(STsdbRepo *pRepo);
static
bool
tsdbIsTFileInFS
(
STsdbFS
*
pfs
,
const
TFILE
*
pf
);
static
bool
tsdbIsTFileInFS
(
STsdbFS
*
pfs
,
const
TFILE
*
pf
);
static
int
tsdbRestoreCurrent
(
STsdbRepo
*
pRepo
);
static
int
tsdbRestoreCurrent
(
STsdbRepo
*
pRepo
);
static
int
tsdbComparTFILE
(
const
void
*
arg1
,
const
void
*
arg2
);
static
int
tsdbComparTFILE
(
const
void
*
arg1
,
const
void
*
arg2
);
static
void
tsdbScanAndTryFixDFilesHeader
(
STsdbRepo
*
pRepo
);
static
void
tsdbScanAndTryFixDFilesHeader
(
STsdbRepo
*
pRepo
,
int32_t
*
nExpired
);
static
int
tsdbProcessExpiredFS
(
STsdbRepo
*
pRepo
);
static
int
tsdbCreateMeta
(
STsdbRepo
*
pRepo
);
// ================== CURRENT file header info
// ================== CURRENT file header info
static
int
tsdbEncodeFSHeader
(
void
**
buf
,
SFSHeader
*
pHeader
)
{
static
int
tsdbEncodeFSHeader
(
void
**
buf
,
SFSHeader
*
pHeader
)
{
...
@@ -212,6 +214,8 @@ STsdbFS *tsdbNewFS(STsdbCfg *pCfg) {
...
@@ -212,6 +214,8 @@ STsdbFS *tsdbNewFS(STsdbCfg *pCfg) {
return
NULL
;
return
NULL
;
}
}
pfs
->
intxn
=
false
;
pfs
->
nstatus
=
tsdbNewFSStatus
(
maxFSet
);
pfs
->
nstatus
=
tsdbNewFSStatus
(
maxFSet
);
if
(
pfs
->
nstatus
==
NULL
)
{
if
(
pfs
->
nstatus
==
NULL
)
{
tsdbFreeFS
(
pfs
);
tsdbFreeFS
(
pfs
);
...
@@ -234,22 +238,84 @@ void *tsdbFreeFS(STsdbFS *pfs) {
...
@@ -234,22 +238,84 @@ void *tsdbFreeFS(STsdbFS *pfs) {
return
NULL
;
return
NULL
;
}
}
static
int
tsdbProcessExpiredFS
(
STsdbRepo
*
pRepo
)
{
tsdbStartFSTxn
(
pRepo
,
0
,
0
);
if
(
tsdbCreateMeta
(
pRepo
)
<
0
)
{
tsdbError
(
"vgId:%d failed to create meta since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
if
(
tsdbApplyRtn
(
pRepo
)
<
0
)
{
tsdbEndFSTxnWithError
(
REPO_FS
(
pRepo
));
tsdbError
(
"vgId:%d failed to apply rtn since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
if
(
tsdbEndFSTxn
(
pRepo
)
<
0
)
{
tsdbError
(
"vgId:%d failed to end fs txn since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
return
0
;
}
static
int
tsdbCreateMeta
(
STsdbRepo
*
pRepo
)
{
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
SMFile
*
pOMFile
=
pfs
->
cstatus
->
pmf
;
SMFile
mf
;
SDiskID
did
;
if
(
pOMFile
!=
NULL
)
{
// keep the old meta file
tsdbUpdateMFile
(
pfs
,
pOMFile
);
return
0
;
}
// Create a new meta file
did
.
level
=
TFS_PRIMARY_LEVEL
;
did
.
id
=
TFS_PRIMARY_ID
;
tsdbInitMFile
(
&
mf
,
did
,
REPO_ID
(
pRepo
),
FS_TXN_VERSION
(
REPO_FS
(
pRepo
)));
if
(
tsdbCreateMFile
(
&
mf
,
true
)
<
0
)
{
tsdbError
(
"vgId:%d failed to create META file since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
tsdbInfo
(
"vgId:%d meta file %s is created"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
&
mf
));
if
(
tsdbUpdateMFileHeader
(
&
mf
)
<
0
)
{
tsdbError
(
"vgId:%d failed to update META file header since %s, revert it"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
tsdbApplyMFileChange
(
&
mf
,
pOMFile
);
return
-
1
;
}
TSDB_FILE_FSYNC
(
&
mf
);
tsdbCloseMFile
(
&
mf
);
tsdbUpdateMFile
(
pfs
,
&
mf
);
return
0
;
}
int
tsdbOpenFS
(
STsdbRepo
*
pRepo
)
{
int
tsdbOpenFS
(
STsdbRepo
*
pRepo
)
{
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
char
current
[
TSDB_FILENAME_LEN
]
=
"
\0
"
;
char
current
[
TSDB_FILENAME_LEN
]
=
"
\0
"
;
int
nExpired
=
0
;
ASSERT
(
pfs
!=
NULL
);
ASSERT
(
pfs
!=
NULL
);
tsdbGetTxnFname
(
REPO_ID
(
pRepo
),
TSDB_TXN_CURR_FILE
,
current
);
tsdbGetTxnFname
(
REPO_ID
(
pRepo
),
TSDB_TXN_CURR_FILE
,
current
);
tsdbGetRtnSnap
(
pRepo
,
&
pRepo
->
rtn
);
if
(
access
(
current
,
F_OK
)
==
0
)
{
if
(
access
(
current
,
F_OK
)
==
0
)
{
if
(
tsdbOpenFSFromCurrent
(
pRepo
)
<
0
)
{
if
(
tsdbOpenFSFromCurrent
(
pRepo
)
<
0
)
{
tsdbError
(
"vgId:%d failed to open FS since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
tsdbError
(
"vgId:%d failed to open FS since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
return
-
1
;
}
}
tsdbScanAndTryFixDFilesHeader
(
pRepo
);
tsdbScanAndTryFixDFilesHeader
(
pRepo
,
&
nExpired
);
if
(
nExpired
>
0
)
{
tsdbProcessExpiredFS
(
pRepo
);
}
}
else
{
}
else
{
// should skip expired fileset inside of the function
if
(
tsdbRestoreCurrent
(
pRepo
)
<
0
)
{
if
(
tsdbRestoreCurrent
(
pRepo
)
<
0
)
{
tsdbError
(
"vgId:%d failed to restore current file since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
tsdbError
(
"vgId:%d failed to restore current file since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
return
-
1
;
...
@@ -1110,6 +1176,11 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
...
@@ -1110,6 +1176,11 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
ASSERT
(
tvid
==
REPO_ID
(
pRepo
));
ASSERT
(
tvid
==
REPO_ID
(
pRepo
));
if
(
tfid
<
pRepo
->
rtn
.
minFid
)
{
// skip file expired
++
index
;
continue
;
}
if
(
ftype
==
0
)
{
if
(
ftype
==
0
)
{
fset
.
fid
=
tfid
;
fset
.
fid
=
tfid
;
}
else
{
}
else
{
...
@@ -1206,7 +1277,7 @@ static int tsdbComparTFILE(const void *arg1, const void *arg2) {
...
@@ -1206,7 +1277,7 @@ static int tsdbComparTFILE(const void *arg1, const void *arg2) {
}
}
}
}
static
void
tsdbScanAndTryFixDFilesHeader
(
STsdbRepo
*
pRepo
)
{
static
void
tsdbScanAndTryFixDFilesHeader
(
STsdbRepo
*
pRepo
,
int32_t
*
nExpired
)
{
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
SFSStatus
*
pStatus
=
pfs
->
cstatus
;
SFSStatus
*
pStatus
=
pfs
->
cstatus
;
SDFInfo
info
;
SDFInfo
info
;
...
@@ -1214,7 +1285,9 @@ static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo) {
...
@@ -1214,7 +1285,9 @@ static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo) {
for
(
size_t
i
=
0
;
i
<
taosArrayGetSize
(
pStatus
->
df
);
i
++
)
{
for
(
size_t
i
=
0
;
i
<
taosArrayGetSize
(
pStatus
->
df
);
i
++
)
{
SDFileSet
fset
;
SDFileSet
fset
;
tsdbInitDFileSetEx
(
&
fset
,
(
SDFileSet
*
)
taosArrayGet
(
pStatus
->
df
,
i
));
tsdbInitDFileSetEx
(
&
fset
,
(
SDFileSet
*
)
taosArrayGet
(
pStatus
->
df
,
i
));
if
(
fset
.
fid
<
pRepo
->
rtn
.
minFid
)
{
++*
nExpired
;
}
tsdbDebug
(
"vgId:%d scan DFileSet %d header"
,
REPO_ID
(
pRepo
),
fset
.
fid
);
tsdbDebug
(
"vgId:%d scan DFileSet %d header"
,
REPO_ID
(
pRepo
),
fset
.
fid
);
if
(
tsdbOpenDFileSet
(
&
fset
,
O_RDWR
)
<
0
)
{
if
(
tsdbOpenDFileSet
(
&
fset
,
O_RDWR
)
<
0
)
{
...
...
src/tsdb/src/tsdbRead.c
浏览文件 @
a3c11889
...
@@ -2128,6 +2128,7 @@ int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist
...
@@ -2128,6 +2128,7 @@ int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist
STsdbQueryHandle
*
pQueryHandle
=
(
STsdbQueryHandle
*
)
queryHandle
;
STsdbQueryHandle
*
pQueryHandle
=
(
STsdbQueryHandle
*
)
queryHandle
;
pTableBlockInfo
->
totalSize
=
0
;
pTableBlockInfo
->
totalSize
=
0
;
pTableBlockInfo
->
totalRows
=
0
;
STsdbFS
*
pFileHandle
=
REPO_FS
(
pQueryHandle
->
pTsdb
);
STsdbFS
*
pFileHandle
=
REPO_FS
(
pQueryHandle
->
pTsdb
);
// find the start data block in file
// find the start data block in file
...
@@ -2201,7 +2202,12 @@ int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist
...
@@ -2201,7 +2202,12 @@ int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist
pTableBlockInfo
->
totalSize
+=
pBlock
[
j
].
len
;
pTableBlockInfo
->
totalSize
+=
pBlock
[
j
].
len
;
int32_t
numOfRows
=
pBlock
[
j
].
numOfRows
;
int32_t
numOfRows
=
pBlock
[
j
].
numOfRows
;
taosArrayPush
(
pTableBlockInfo
->
dataBlockInfos
,
&
numOfRows
);
pTableBlockInfo
->
totalRows
+=
numOfRows
;
if
(
numOfRows
>
pTableBlockInfo
->
maxRows
)
pTableBlockInfo
->
maxRows
=
numOfRows
;
if
(
numOfRows
<
pTableBlockInfo
->
minRows
)
pTableBlockInfo
->
minRows
=
numOfRows
;
int32_t
stepIndex
=
(
numOfRows
-
1
)
/
TSDB_BLOCK_DIST_STEP_ROWS
;
SFileBlockInfo
*
blockInfo
=
(
SFileBlockInfo
*
)
taosArrayGet
(
pTableBlockInfo
->
dataBlockInfos
,
stepIndex
);
blockInfo
->
numBlocksOfStep
++
;
}
}
}
}
}
}
...
...
src/tsdb/src/tsdbSync.c
浏览文件 @
a3c11889
...
@@ -424,24 +424,42 @@ static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) {
...
@@ -424,24 +424,42 @@ static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) {
}
}
if
(
tsdbSendDecision
(
pSynch
,
false
)
<
0
)
{
if
(
tsdbSendDecision
(
pSynch
,
false
)
<
0
)
{
tsdbError
(
"vgId:%d, filed to send decision since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
tsdbError
(
"vgId:%d, f
a
iled to send decision since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
return
-
1
;
}
}
}
else
{
}
else
{
// Need to copy from remote
// Need to copy from remote
tsdbInfo
(
"vgId:%d, fileset:%d will be received"
,
REPO_ID
(
pRepo
),
pSynch
->
pdf
->
fid
);
int
fidLevel
=
tsdbGetFidLevel
(
pSynch
->
pdf
->
fid
,
&
(
pSynch
->
rtn
));
if
(
fidLevel
<
0
)
{
// expired fileset
// Notify remote to send there file here
tsdbInfo
(
"vgId:%d, fileset:%d will be skipped as expired"
,
REPO_ID
(
pRepo
),
pSynch
->
pdf
->
fid
);
if
(
tsdbSendDecision
(
pSynch
,
true
)
<
0
)
{
if
(
tsdbSendDecision
(
pSynch
,
false
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to send decision since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
tsdbError
(
"vgId:%d, failed to send decision since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
return
-
1
;
}
// Move forward
if
(
tsdbRecvDFileSetInfo
(
pSynch
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to recv fileset since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
if
(
pLSet
)
{
pLSet
=
tsdbFSIterNext
(
&
fsiter
);
}
// Next loop
continue
;
}
else
{
tsdbInfo
(
"vgId:%d, fileset:%d will be received"
,
REPO_ID
(
pRepo
),
pSynch
->
pdf
->
fid
);
// Notify remote to send there file here
if
(
tsdbSendDecision
(
pSynch
,
true
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to send decision since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
}
}
// Create local files and copy from remote
// Create local files and copy from remote
SDiskID
did
;
SDiskID
did
;
SDFileSet
fset
;
SDFileSet
fset
;
tfsAllocDisk
(
tsdbGetFidLevel
(
pSynch
->
pdf
->
fid
,
&
(
pSynch
->
rtn
))
,
&
(
did
.
level
),
&
(
did
.
id
));
tfsAllocDisk
(
fidLevel
,
&
(
did
.
level
),
&
(
did
.
id
));
if
(
did
.
level
==
TFS_UNDECIDED_LEVEL
)
{
if
(
did
.
level
==
TFS_UNDECIDED_LEVEL
)
{
terrno
=
TSDB_CODE_TDB_NO_AVAIL_DISK
;
terrno
=
TSDB_CODE_TDB_NO_AVAIL_DISK
;
tsdbError
(
"vgId:%d, failed allc disk since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
tsdbError
(
"vgId:%d, failed allc disk since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
...
@@ -548,6 +566,13 @@ static int32_t tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet) {
...
@@ -548,6 +566,13 @@ static int32_t tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet) {
STsdbRepo
*
pRepo
=
pSynch
->
pRepo
;
STsdbRepo
*
pRepo
=
pSynch
->
pRepo
;
bool
toSend
=
false
;
bool
toSend
=
false
;
// skip expired fileset
if
(
pSet
&&
tsdbGetFidLevel
(
pSet
->
fid
,
&
(
pSynch
->
rtn
))
<
0
)
{
tsdbInfo
(
"vgId:%d, don't sync send since fileset:%d smaller than minFid:%d"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
pSynch
->
rtn
.
minFid
);
return
0
;
}
if
(
tsdbSendDFileSetInfo
(
pSynch
,
pSet
)
<
0
)
{
if
(
tsdbSendDFileSetInfo
(
pSynch
,
pSet
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to send fileset:%d info since %s"
,
REPO_ID
(
pRepo
),
pSet
?
pSet
->
fid
:
-
1
,
tstrerror
(
terrno
));
tsdbError
(
"vgId:%d, failed to send fileset:%d info since %s"
,
REPO_ID
(
pRepo
),
pSet
?
pSet
->
fid
:
-
1
,
tstrerror
(
terrno
));
return
-
1
;
return
-
1
;
...
...
src/util/inc/tarray.h
浏览文件 @
a3c11889
...
@@ -106,6 +106,14 @@ void* taosArrayGetLast(const SArray* pArray);
...
@@ -106,6 +106,14 @@ void* taosArrayGetLast(const SArray* pArray);
*/
*/
size_t
taosArrayGetSize
(
const
SArray
*
pArray
);
size_t
taosArrayGetSize
(
const
SArray
*
pArray
);
/**
* set the size of array
* @param pArray
* @param size size of the array
* @return
*/
void
taosArraySetSize
(
SArray
*
pArray
,
size_t
size
);
/**
/**
* insert data into array
* insert data into array
* @param pArray
* @param pArray
...
...
src/util/src/tarray.c
浏览文件 @
a3c11889
...
@@ -115,6 +115,11 @@ void* taosArrayGetLast(const SArray* pArray) {
...
@@ -115,6 +115,11 @@ void* taosArrayGetLast(const SArray* pArray) {
size_t
taosArrayGetSize
(
const
SArray
*
pArray
)
{
return
pArray
->
size
;
}
size_t
taosArrayGetSize
(
const
SArray
*
pArray
)
{
return
pArray
->
size
;
}
void
taosArraySetSize
(
SArray
*
pArray
,
size_t
size
)
{
assert
(
size
<=
pArray
->
capacity
);
pArray
->
size
=
size
;
}
void
*
taosArrayInsert
(
SArray
*
pArray
,
size_t
index
,
void
*
pData
)
{
void
*
taosArrayInsert
(
SArray
*
pArray
,
size_t
index
,
void
*
pData
)
{
if
(
pArray
==
NULL
||
pData
==
NULL
)
{
if
(
pArray
==
NULL
||
pData
==
NULL
)
{
return
NULL
;
return
NULL
;
...
...
tests/examples/c/apitest.c
浏览文件 @
a3c11889
...
@@ -402,6 +402,471 @@ void verify_prepare(TAOS* taos) {
...
@@ -402,6 +402,471 @@ void verify_prepare(TAOS* taos) {
taos_stmt_close
(
stmt
);
taos_stmt_close
(
stmt
);
}
}
void
verify_prepare2
(
TAOS
*
taos
)
{
TAOS_RES
*
result
=
taos_query
(
taos
,
"drop database if exists test;"
);
taos_free_result
(
result
);
usleep
(
100000
);
result
=
taos_query
(
taos
,
"create database test;"
);
int
code
=
taos_errno
(
result
);
if
(
code
!=
0
)
{
printf
(
"
\033
[31mfailed to create database, reason:%s
\033
[0m
\n
"
,
taos_errstr
(
result
));
taos_free_result
(
result
);
return
;
}
taos_free_result
(
result
);
usleep
(
100000
);
taos_select_db
(
taos
,
"test"
);
// create table
const
char
*
sql
=
"create table m1 (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, bin binary(40), blob nchar(10))"
;
result
=
taos_query
(
taos
,
sql
);
code
=
taos_errno
(
result
);
if
(
code
!=
0
)
{
printf
(
"
\033
[31mfailed to create table, reason:%s
\033
[0m
\n
"
,
taos_errstr
(
result
));
taos_free_result
(
result
);
return
;
}
taos_free_result
(
result
);
// insert 10 records
struct
{
int64_t
ts
[
10
];
int8_t
b
[
10
];
int8_t
v1
[
10
];
int16_t
v2
[
10
];
int32_t
v4
[
10
];
int64_t
v8
[
10
];
float
f4
[
10
];
double
f8
[
10
];
char
bin
[
10
][
40
];
char
blob
[
10
][
80
];
}
v
;
int32_t
*
t8_len
=
malloc
(
sizeof
(
int32_t
)
*
10
);
int32_t
*
t16_len
=
malloc
(
sizeof
(
int32_t
)
*
10
);
int32_t
*
t32_len
=
malloc
(
sizeof
(
int32_t
)
*
10
);
int32_t
*
t64_len
=
malloc
(
sizeof
(
int32_t
)
*
10
);
int32_t
*
float_len
=
malloc
(
sizeof
(
int32_t
)
*
10
);
int32_t
*
double_len
=
malloc
(
sizeof
(
int32_t
)
*
10
);
int32_t
*
bin_len
=
malloc
(
sizeof
(
int32_t
)
*
10
);
int32_t
*
blob_len
=
malloc
(
sizeof
(
int32_t
)
*
10
);
TAOS_STMT
*
stmt
=
taos_stmt_init
(
taos
);
TAOS_MULTI_BIND
params
[
10
];
char
is_null
[
10
]
=
{
0
};
params
[
0
].
buffer_type
=
TSDB_DATA_TYPE_TIMESTAMP
;
params
[
0
].
buffer_length
=
sizeof
(
v
.
ts
[
0
]);
params
[
0
].
buffer
=
v
.
ts
;
params
[
0
].
length
=
t64_len
;
params
[
0
].
is_null
=
is_null
;
params
[
0
].
num
=
10
;
params
[
1
].
buffer_type
=
TSDB_DATA_TYPE_BOOL
;
params
[
1
].
buffer_length
=
sizeof
(
v
.
b
[
0
]);
params
[
1
].
buffer
=
v
.
b
;
params
[
1
].
length
=
t8_len
;
params
[
1
].
is_null
=
is_null
;
params
[
1
].
num
=
10
;
params
[
2
].
buffer_type
=
TSDB_DATA_TYPE_TINYINT
;
params
[
2
].
buffer_length
=
sizeof
(
v
.
v1
[
0
]);
params
[
2
].
buffer
=
v
.
v1
;
params
[
2
].
length
=
t8_len
;
params
[
2
].
is_null
=
is_null
;
params
[
2
].
num
=
10
;
params
[
3
].
buffer_type
=
TSDB_DATA_TYPE_SMALLINT
;
params
[
3
].
buffer_length
=
sizeof
(
v
.
v2
[
0
]);
params
[
3
].
buffer
=
v
.
v2
;
params
[
3
].
length
=
t16_len
;
params
[
3
].
is_null
=
is_null
;
params
[
3
].
num
=
10
;
params
[
4
].
buffer_type
=
TSDB_DATA_TYPE_INT
;
params
[
4
].
buffer_length
=
sizeof
(
v
.
v4
[
0
]);
params
[
4
].
buffer
=
v
.
v4
;
params
[
4
].
length
=
t32_len
;
params
[
4
].
is_null
=
is_null
;
params
[
4
].
num
=
10
;
params
[
5
].
buffer_type
=
TSDB_DATA_TYPE_BIGINT
;
params
[
5
].
buffer_length
=
sizeof
(
v
.
v8
[
0
]);
params
[
5
].
buffer
=
v
.
v8
;
params
[
5
].
length
=
t64_len
;
params
[
5
].
is_null
=
is_null
;
params
[
5
].
num
=
10
;
params
[
6
].
buffer_type
=
TSDB_DATA_TYPE_FLOAT
;
params
[
6
].
buffer_length
=
sizeof
(
v
.
f4
[
0
]);
params
[
6
].
buffer
=
v
.
f4
;
params
[
6
].
length
=
float_len
;
params
[
6
].
is_null
=
is_null
;
params
[
6
].
num
=
10
;
params
[
7
].
buffer_type
=
TSDB_DATA_TYPE_DOUBLE
;
params
[
7
].
buffer_length
=
sizeof
(
v
.
f8
[
0
]);
params
[
7
].
buffer
=
v
.
f8
;
params
[
7
].
length
=
double_len
;
params
[
7
].
is_null
=
is_null
;
params
[
7
].
num
=
10
;
params
[
8
].
buffer_type
=
TSDB_DATA_TYPE_BINARY
;
params
[
8
].
buffer_length
=
sizeof
(
v
.
bin
[
0
]);
params
[
8
].
buffer
=
v
.
bin
;
params
[
8
].
length
=
bin_len
;
params
[
8
].
is_null
=
is_null
;
params
[
8
].
num
=
10
;
params
[
9
].
buffer_type
=
TSDB_DATA_TYPE_NCHAR
;
params
[
9
].
buffer_length
=
sizeof
(
v
.
blob
[
0
]);
params
[
9
].
buffer
=
v
.
blob
;
params
[
9
].
length
=
blob_len
;
params
[
9
].
is_null
=
is_null
;
params
[
9
].
num
=
10
;
sql
=
"insert into ? values(?,?,?,?,?,?,?,?,?,?)"
;
code
=
taos_stmt_prepare
(
stmt
,
sql
,
0
);
if
(
code
!=
0
){
printf
(
"
\033
[31mfailed to execute taos_stmt_prepare. code:0x%x
\033
[0m
\n
"
,
code
);
}
code
=
taos_stmt_set_tbname
(
stmt
,
"m1"
);
if
(
code
!=
0
){
printf
(
"
\033
[31mfailed to execute taos_stmt_prepare. code:0x%x
\033
[0m
\n
"
,
code
);
}
int64_t
ts
=
1591060628000
;
for
(
int
i
=
0
;
i
<
10
;
++
i
)
{
v
.
ts
[
i
]
=
ts
++
;
is_null
[
i
]
=
0
;
v
.
b
[
i
]
=
(
int8_t
)
i
%
2
;
v
.
v1
[
i
]
=
(
int8_t
)
i
;
v
.
v2
[
i
]
=
(
int16_t
)(
i
*
2
);
v
.
v4
[
i
]
=
(
int32_t
)(
i
*
4
);
v
.
v8
[
i
]
=
(
int64_t
)(
i
*
8
);
v
.
f4
[
i
]
=
(
float
)(
i
*
40
);
v
.
f8
[
i
]
=
(
double
)(
i
*
80
);
for
(
int
j
=
0
;
j
<
sizeof
(
v
.
bin
[
0
])
-
1
;
++
j
)
{
v
.
bin
[
i
][
j
]
=
(
char
)(
i
+
'0'
);
}
strcpy
(
v
.
blob
[
i
],
"一二三四五六七八九十"
);
t8_len
[
i
]
=
sizeof
(
int8_t
);
t16_len
[
i
]
=
sizeof
(
int16_t
);
t32_len
[
i
]
=
sizeof
(
int32_t
);
t64_len
[
i
]
=
sizeof
(
int64_t
);
float_len
[
i
]
=
sizeof
(
float
);
double_len
[
i
]
=
sizeof
(
double
);
bin_len
[
i
]
=
sizeof
(
v
.
bin
[
0
]);
blob_len
[
i
]
=
(
int32_t
)
strlen
(
v
.
blob
[
i
]);
}
taos_stmt_bind_param_batch
(
stmt
,
params
);
taos_stmt_add_batch
(
stmt
);
if
(
taos_stmt_execute
(
stmt
)
!=
0
)
{
printf
(
"
\033
[31mfailed to execute insert statement.
\033
[0m
\n
"
);
return
;
}
taos_stmt_close
(
stmt
);
// query the records
stmt
=
taos_stmt_init
(
taos
);
taos_stmt_prepare
(
stmt
,
"SELECT * FROM m1 WHERE v1 > ? AND v2 < ?"
,
0
);
TAOS_BIND
qparams
[
2
];
int8_t
v1
=
5
;
int16_t
v2
=
15
;
qparams
[
0
].
buffer_type
=
TSDB_DATA_TYPE_TINYINT
;
qparams
[
0
].
buffer_length
=
sizeof
(
v1
);
qparams
[
0
].
buffer
=
&
v1
;
qparams
[
0
].
length
=
&
qparams
[
0
].
buffer_length
;
qparams
[
0
].
is_null
=
NULL
;
qparams
[
1
].
buffer_type
=
TSDB_DATA_TYPE_SMALLINT
;
qparams
[
1
].
buffer_length
=
sizeof
(
v2
);
qparams
[
1
].
buffer
=
&
v2
;
qparams
[
1
].
length
=
&
qparams
[
1
].
buffer_length
;
qparams
[
1
].
is_null
=
NULL
;
taos_stmt_bind_param
(
stmt
,
qparams
);
if
(
taos_stmt_execute
(
stmt
)
!=
0
)
{
printf
(
"
\033
[31mfailed to execute select statement.
\033
[0m
\n
"
);
return
;
}
result
=
taos_stmt_use_result
(
stmt
);
TAOS_ROW
row
;
int
rows
=
0
;
int
num_fields
=
taos_num_fields
(
result
);
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
result
);
char
temp
[
256
];
// fetch the records row by row
while
((
row
=
taos_fetch_row
(
result
)))
{
rows
++
;
taos_print_row
(
temp
,
row
,
fields
,
num_fields
);
printf
(
"%s
\n
"
,
temp
);
}
taos_free_result
(
result
);
taos_stmt_close
(
stmt
);
}
void
verify_prepare3
(
TAOS
*
taos
)
{
TAOS_RES
*
result
=
taos_query
(
taos
,
"drop database if exists test;"
);
taos_free_result
(
result
);
usleep
(
100000
);
result
=
taos_query
(
taos
,
"create database test;"
);
int
code
=
taos_errno
(
result
);
if
(
code
!=
0
)
{
printf
(
"
\033
[31mfailed to create database, reason:%s
\033
[0m
\n
"
,
taos_errstr
(
result
));
taos_free_result
(
result
);
return
;
}
taos_free_result
(
result
);
usleep
(
100000
);
taos_select_db
(
taos
,
"test"
);
// create table
const
char
*
sql
=
"create stable st1 (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, bin binary(40), blob nchar(10)) tags (id1 int, id2 binary(40))"
;
result
=
taos_query
(
taos
,
sql
);
code
=
taos_errno
(
result
);
if
(
code
!=
0
)
{
printf
(
"
\033
[31mfailed to create table, reason:%s
\033
[0m
\n
"
,
taos_errstr
(
result
));
taos_free_result
(
result
);
return
;
}
taos_free_result
(
result
);
TAOS_BIND
tags
[
2
];
int32_t
id1
=
1
;
char
id2
[
40
]
=
"abcdefghijklmnopqrstuvwxyz0123456789"
;
uintptr_t
id2_len
=
strlen
(
id2
);
tags
[
0
].
buffer_type
=
TSDB_DATA_TYPE_INT
;
tags
[
0
].
buffer_length
=
sizeof
(
int
);
tags
[
0
].
buffer
=
&
id1
;
tags
[
0
].
length
=
NULL
;
tags
[
0
].
is_null
=
NULL
;
tags
[
1
].
buffer_type
=
TSDB_DATA_TYPE_BINARY
;
tags
[
1
].
buffer_length
=
sizeof
(
id2
);
tags
[
1
].
buffer
=
id2
;
tags
[
1
].
length
=
&
id2_len
;
tags
[
1
].
is_null
=
NULL
;
// insert 10 records
struct
{
int64_t
ts
[
10
];
int8_t
b
[
10
];
int8_t
v1
[
10
];
int16_t
v2
[
10
];
int32_t
v4
[
10
];
int64_t
v8
[
10
];
float
f4
[
10
];
double
f8
[
10
];
char
bin
[
10
][
40
];
char
blob
[
10
][
80
];
}
v
;
int32_t
*
t8_len
=
malloc
(
sizeof
(
int32_t
)
*
10
);
int32_t
*
t16_len
=
malloc
(
sizeof
(
int32_t
)
*
10
);
int32_t
*
t32_len
=
malloc
(
sizeof
(
int32_t
)
*
10
);
int32_t
*
t64_len
=
malloc
(
sizeof
(
int32_t
)
*
10
);
int32_t
*
float_len
=
malloc
(
sizeof
(
int32_t
)
*
10
);
int32_t
*
double_len
=
malloc
(
sizeof
(
int32_t
)
*
10
);
int32_t
*
bin_len
=
malloc
(
sizeof
(
int32_t
)
*
10
);
int32_t
*
blob_len
=
malloc
(
sizeof
(
int32_t
)
*
10
);
TAOS_STMT
*
stmt
=
taos_stmt_init
(
taos
);
TAOS_MULTI_BIND
params
[
10
];
char
is_null
[
10
]
=
{
0
};
params
[
0
].
buffer_type
=
TSDB_DATA_TYPE_TIMESTAMP
;
params
[
0
].
buffer_length
=
sizeof
(
v
.
ts
[
0
]);
params
[
0
].
buffer
=
v
.
ts
;
params
[
0
].
length
=
t64_len
;
params
[
0
].
is_null
=
is_null
;
params
[
0
].
num
=
10
;
params
[
1
].
buffer_type
=
TSDB_DATA_TYPE_BOOL
;
params
[
1
].
buffer_length
=
sizeof
(
v
.
b
[
0
]);
params
[
1
].
buffer
=
v
.
b
;
params
[
1
].
length
=
t8_len
;
params
[
1
].
is_null
=
is_null
;
params
[
1
].
num
=
10
;
params
[
2
].
buffer_type
=
TSDB_DATA_TYPE_TINYINT
;
params
[
2
].
buffer_length
=
sizeof
(
v
.
v1
[
0
]);
params
[
2
].
buffer
=
v
.
v1
;
params
[
2
].
length
=
t8_len
;
params
[
2
].
is_null
=
is_null
;
params
[
2
].
num
=
10
;
params
[
3
].
buffer_type
=
TSDB_DATA_TYPE_SMALLINT
;
params
[
3
].
buffer_length
=
sizeof
(
v
.
v2
[
0
]);
params
[
3
].
buffer
=
v
.
v2
;
params
[
3
].
length
=
t16_len
;
params
[
3
].
is_null
=
is_null
;
params
[
3
].
num
=
10
;
params
[
4
].
buffer_type
=
TSDB_DATA_TYPE_INT
;
params
[
4
].
buffer_length
=
sizeof
(
v
.
v4
[
0
]);
params
[
4
].
buffer
=
v
.
v4
;
params
[
4
].
length
=
t32_len
;
params
[
4
].
is_null
=
is_null
;
params
[
4
].
num
=
10
;
params
[
5
].
buffer_type
=
TSDB_DATA_TYPE_BIGINT
;
params
[
5
].
buffer_length
=
sizeof
(
v
.
v8
[
0
]);
params
[
5
].
buffer
=
v
.
v8
;
params
[
5
].
length
=
t64_len
;
params
[
5
].
is_null
=
is_null
;
params
[
5
].
num
=
10
;
params
[
6
].
buffer_type
=
TSDB_DATA_TYPE_FLOAT
;
params
[
6
].
buffer_length
=
sizeof
(
v
.
f4
[
0
]);
params
[
6
].
buffer
=
v
.
f4
;
params
[
6
].
length
=
float_len
;
params
[
6
].
is_null
=
is_null
;
params
[
6
].
num
=
10
;
params
[
7
].
buffer_type
=
TSDB_DATA_TYPE_DOUBLE
;
params
[
7
].
buffer_length
=
sizeof
(
v
.
f8
[
0
]);
params
[
7
].
buffer
=
v
.
f8
;
params
[
7
].
length
=
double_len
;
params
[
7
].
is_null
=
is_null
;
params
[
7
].
num
=
10
;
params
[
8
].
buffer_type
=
TSDB_DATA_TYPE_BINARY
;
params
[
8
].
buffer_length
=
sizeof
(
v
.
bin
[
0
]);
params
[
8
].
buffer
=
v
.
bin
;
params
[
8
].
length
=
bin_len
;
params
[
8
].
is_null
=
is_null
;
params
[
8
].
num
=
10
;
params
[
9
].
buffer_type
=
TSDB_DATA_TYPE_NCHAR
;
params
[
9
].
buffer_length
=
sizeof
(
v
.
blob
[
0
]);
params
[
9
].
buffer
=
v
.
blob
;
params
[
9
].
length
=
blob_len
;
params
[
9
].
is_null
=
is_null
;
params
[
9
].
num
=
10
;
sql
=
"insert into ? using st1 tags(?,?) values(?,?,?,?,?,?,?,?,?,?)"
;
code
=
taos_stmt_prepare
(
stmt
,
sql
,
0
);
if
(
code
!=
0
){
printf
(
"
\033
[31mfailed to execute taos_stmt_prepare. code:0x%x
\033
[0m
\n
"
,
code
);
}
code
=
taos_stmt_set_tbname_tags
(
stmt
,
"m1"
,
tags
);
if
(
code
!=
0
){
printf
(
"
\033
[31mfailed to execute taos_stmt_prepare. code:0x%x
\033
[0m
\n
"
,
code
);
}
int64_t
ts
=
1591060628000
;
for
(
int
i
=
0
;
i
<
10
;
++
i
)
{
v
.
ts
[
i
]
=
ts
++
;
is_null
[
i
]
=
0
;
v
.
b
[
i
]
=
(
int8_t
)
i
%
2
;
v
.
v1
[
i
]
=
(
int8_t
)
i
;
v
.
v2
[
i
]
=
(
int16_t
)(
i
*
2
);
v
.
v4
[
i
]
=
(
int32_t
)(
i
*
4
);
v
.
v8
[
i
]
=
(
int64_t
)(
i
*
8
);
v
.
f4
[
i
]
=
(
float
)(
i
*
40
);
v
.
f8
[
i
]
=
(
double
)(
i
*
80
);
for
(
int
j
=
0
;
j
<
sizeof
(
v
.
bin
[
0
])
-
1
;
++
j
)
{
v
.
bin
[
i
][
j
]
=
(
char
)(
i
+
'0'
);
}
strcpy
(
v
.
blob
[
i
],
"一二三四五六七八九十"
);
t8_len
[
i
]
=
sizeof
(
int8_t
);
t16_len
[
i
]
=
sizeof
(
int16_t
);
t32_len
[
i
]
=
sizeof
(
int32_t
);
t64_len
[
i
]
=
sizeof
(
int64_t
);
float_len
[
i
]
=
sizeof
(
float
);
double_len
[
i
]
=
sizeof
(
double
);
bin_len
[
i
]
=
sizeof
(
v
.
bin
[
0
]);
blob_len
[
i
]
=
(
int32_t
)
strlen
(
v
.
blob
[
i
]);
}
taos_stmt_bind_param_batch
(
stmt
,
params
);
taos_stmt_add_batch
(
stmt
);
if
(
taos_stmt_execute
(
stmt
)
!=
0
)
{
printf
(
"
\033
[31mfailed to execute insert statement.
\033
[0m
\n
"
);
return
;
}
taos_stmt_close
(
stmt
);
// query the records
stmt
=
taos_stmt_init
(
taos
);
taos_stmt_prepare
(
stmt
,
"SELECT * FROM m1 WHERE v1 > ? AND v2 < ?"
,
0
);
TAOS_BIND
qparams
[
2
];
int8_t
v1
=
5
;
int16_t
v2
=
15
;
qparams
[
0
].
buffer_type
=
TSDB_DATA_TYPE_TINYINT
;
qparams
[
0
].
buffer_length
=
sizeof
(
v1
);
qparams
[
0
].
buffer
=
&
v1
;
qparams
[
0
].
length
=
&
qparams
[
0
].
buffer_length
;
qparams
[
0
].
is_null
=
NULL
;
qparams
[
1
].
buffer_type
=
TSDB_DATA_TYPE_SMALLINT
;
qparams
[
1
].
buffer_length
=
sizeof
(
v2
);
qparams
[
1
].
buffer
=
&
v2
;
qparams
[
1
].
length
=
&
qparams
[
1
].
buffer_length
;
qparams
[
1
].
is_null
=
NULL
;
taos_stmt_bind_param
(
stmt
,
qparams
);
if
(
taos_stmt_execute
(
stmt
)
!=
0
)
{
printf
(
"
\033
[31mfailed to execute select statement.
\033
[0m
\n
"
);
return
;
}
result
=
taos_stmt_use_result
(
stmt
);
TAOS_ROW
row
;
int
rows
=
0
;
int
num_fields
=
taos_num_fields
(
result
);
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
result
);
char
temp
[
256
];
// fetch the records row by row
while
((
row
=
taos_fetch_row
(
result
)))
{
rows
++
;
taos_print_row
(
temp
,
row
,
fields
,
num_fields
);
printf
(
"%s
\n
"
,
temp
);
}
taos_free_result
(
result
);
taos_stmt_close
(
stmt
);
}
void
retrieve_callback
(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
)
void
retrieve_callback
(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
)
{
{
if
(
numOfRows
>
0
)
{
if
(
numOfRows
>
0
)
{
...
@@ -493,6 +958,12 @@ int main(int argc, char *argv[]) {
...
@@ -493,6 +958,12 @@ int main(int argc, char *argv[]) {
printf
(
"************ verify prepare *************
\n
"
);
printf
(
"************ verify prepare *************
\n
"
);
verify_prepare
(
taos
);
verify_prepare
(
taos
);
printf
(
"************ verify prepare2 *************
\n
"
);
verify_prepare2
(
taos
);
printf
(
"************ verify prepare3 *************
\n
"
);
verify_prepare3
(
taos
);
printf
(
"************ verify stream *************
\n
"
);
printf
(
"************ verify stream *************
\n
"
);
verify_stream
(
taos
);
verify_stream
(
taos
);
printf
(
"done
\n
"
);
printf
(
"done
\n
"
);
...
...
tests/script/api/batchprepare.c
浏览文件 @
a3c11889
...
@@ -1046,6 +1046,7 @@ int stmt_funcb_autoctb1(TAOS_STMT *stmt) {
...
@@ -1046,6 +1046,7 @@ int stmt_funcb_autoctb1(TAOS_STMT *stmt) {
free
(
params
);
free
(
params
);
free
(
is_null
);
free
(
is_null
);
free
(
no_null
);
free
(
no_null
);
free
(
tags
);
return
0
;
return
0
;
}
}
...
@@ -1258,6 +1259,7 @@ int stmt_funcb_autoctb2(TAOS_STMT *stmt) {
...
@@ -1258,6 +1259,7 @@ int stmt_funcb_autoctb2(TAOS_STMT *stmt) {
free
(
params
);
free
(
params
);
free
(
is_null
);
free
(
is_null
);
free
(
no_null
);
free
(
no_null
);
free
(
tags
);
return
0
;
return
0
;
}
}
...
@@ -1446,6 +1448,7 @@ int stmt_funcb_autoctb3(TAOS_STMT *stmt) {
...
@@ -1446,6 +1448,7 @@ int stmt_funcb_autoctb3(TAOS_STMT *stmt) {
free
(
params
);
free
(
params
);
free
(
is_null
);
free
(
is_null
);
free
(
no_null
);
free
(
no_null
);
free
(
tags
);
return
0
;
return
0
;
}
}
...
@@ -1635,6 +1638,7 @@ int stmt_funcb_autoctb_e1(TAOS_STMT *stmt) {
...
@@ -1635,6 +1638,7 @@ int stmt_funcb_autoctb_e1(TAOS_STMT *stmt) {
free
(
params
);
free
(
params
);
free
(
is_null
);
free
(
is_null
);
free
(
no_null
);
free
(
no_null
);
free
(
tags
);
return
0
;
return
0
;
}
}
...
@@ -1849,6 +1853,7 @@ int stmt_funcb_autoctb_e2(TAOS_STMT *stmt) {
...
@@ -1849,6 +1853,7 @@ int stmt_funcb_autoctb_e2(TAOS_STMT *stmt) {
free
(
params
);
free
(
params
);
free
(
is_null
);
free
(
is_null
);
free
(
no_null
);
free
(
no_null
);
free
(
tags
);
return
0
;
return
0
;
}
}
...
...
tests/script/fullGeneralSuite.sim
浏览文件 @
a3c11889
...
@@ -33,6 +33,7 @@ run general/compute/percentile.sim
...
@@ -33,6 +33,7 @@ run general/compute/percentile.sim
run general/compute/stddev.sim
run general/compute/stddev.sim
run general/compute/sum.sim
run general/compute/sum.sim
run general/compute/top.sim
run general/compute/top.sim
run general/compute/block_dist.sim
run general/db/alter_option.sim
run general/db/alter_option.sim
run general/db/alter_tables_d2.sim
run general/db/alter_tables_d2.sim
run general/db/alter_tables_v1.sim
run general/db/alter_tables_v1.sim
...
...
tests/script/general/compute/block_dist.sim
0 → 100644
浏览文件 @
a3c11889
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/exec.sh -n dnode1 -s start
sleep 2000
sql connect
$dbPrefix = m_di_db
$tbPrefix = m_di_tb
$mtPrefix = m_di_mt
$ntPrefix = m_di_nt
$tbNum = 1
$rowNum = 2000
print =============== step1
$i = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
$nt = $ntPrefix . $i
sql drop database $db -x step1
step1:
sql create database $db
sql use $db
sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int)
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
sql create table $tb using $mt tags( $i )
$x = 0
while $x < $rowNum
$cc = $x * 60000
$ms = 1601481600000 + $cc
sql insert into $tb values ($ms , $x )
$x = $x + 1
endw
$i = $i + 1
endw
sql create table $nt (ts timestamp, tbcol int)
$x = 0
while $x < $rowNum
$cc = $x * 60000
$ms = 1601481600000 + $cc
sql insert into $nt values ($ms , $x )
$x = $x + 1
endw
sleep 100
print =============== step2
$i = 0
$tb = $tbPrefix . $i
sql select _block_dist() from $tb
if $rows != 1 then
print expect 1, actual:$rows
return -1
endi
print =============== step3
$i = 0
$mt = $mtPrefix . $i
sql select _block_dist() from $mt
if $rows != 1 then
print expect 1, actual:$rows
return -1
endi
print =============== step4
$i = 0
$nt = $ntPrefix . $i
sql select _block_dist() from $nt
if $rows != 1 then
print expect 1, actual:$rows
return -1
endi
print =============== clear
sql drop database $db
sql show databases
if $rows != 0 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
tests/script/general/compute/testSuite.sim
浏览文件 @
a3c11889
...
@@ -14,3 +14,4 @@ run general/compute/percentile.sim
...
@@ -14,3 +14,4 @@ run general/compute/percentile.sim
run general/compute/stddev.sim
run general/compute/stddev.sim
run general/compute/sum.sim
run general/compute/sum.sim
run general/compute/top.sim
run general/compute/top.sim
run general/compute/block_dist.sim
tests/script/regressionSuite.sim
浏览文件 @
a3c11889
...
@@ -32,6 +32,7 @@ run general/compute/percentile.sim
...
@@ -32,6 +32,7 @@ run general/compute/percentile.sim
run general/compute/stddev.sim
run general/compute/stddev.sim
run general/compute/sum.sim
run general/compute/sum.sim
run general/compute/top.sim
run general/compute/top.sim
run general/compute/block_dist.sim
run general/db/alter_option.sim
run general/db/alter_option.sim
run general/db/alter_tables_d2.sim
run general/db/alter_tables_d2.sim
run general/db/alter_tables_v1.sim
run general/db/alter_tables_v1.sim
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录