提交 63d8057a 编写于 作者: G Ganlin Zhao

Merge branch '3.0' into enh/TD-17659

......@@ -5,7 +5,7 @@ title: 数据写入
## 写入语法
```
```sql
INSERT INTO
tb_name
[USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
......@@ -18,46 +18,64 @@ INSERT INTO
...];
```
## 插入一条或多条记录
**关于时间戳**
1. TDengine 要求插入的数据必须要有时间戳,插入数据的时间戳要注意以下几点:
2. 时间戳不同的格式语法会有不同的精度影响。字符串格式的时间戳写法不受所在 DATABASE 的时间精度设置影响;而长整形格式的时间戳写法会受到所在 DATABASE 的时间精度设置影响。例如,时间戳"2021-07-13 16:16:48"的 UNIX 秒数为 1626164208。则其在毫秒精度下需要写作 1626164208000,在微秒精度设置下就需要写为 1626164208000000,纳秒精度设置下需要写为 1626164208000000000。
3. 一次插入多行数据时,不要把首列的时间戳的值都写 NOW。否则会导致语句中的多条记录使用相同的时间戳,于是就可能出现相互覆盖以致这些数据行无法全部被正确保存。其原因在于,NOW 函数在执行中会被解析为所在 SQL 语句的客户端执行时间,出现在同一语句中的多个 NOW 标记也就会被替换为完全相同的时间戳取值。
允许插入的最老记录的时间戳,是相对于当前服务器时间,减去配置的 KEEP 值(数据保留的天数)。允许插入的最新记录的时间戳,是相对于当前服务器时间,加上配置的 DURATION 值(数据文件存储数据的时间跨度,单位为天)。KEEP 和 DURATION 都是可以在创建数据库时指定的,缺省值分别是 3650 天和 10 天。
**语法说明**
1. USING 子句是自动建表语法。如果用户在写数据时并不确定某个表是否存在,此时可以在写入数据时使用自动建表语法来创建不存在的表,若该表已存在则不会建立新表。自动建表时,要求必须以超级表为模板,并写明数据表的 TAGS 取值。可以只是指定部分 TAGS 列的取值,未被指定的 TAGS 列将置为 NULL。
2. 可以指定要插入值的列,对于为指定的列数据库将自动填充为 NULL。
3. VALUES 语法表示了要插入的一行或多行数据。
4. FILE 语法表示数据来自于 CSV 文件(英文逗号分隔、英文单引号括住每个值),CSV 文件无需表头。
5. 无论使用哪种语法,均可以在一条 INSERT 语句中同时向多个表插入数据。
6. INSERT 语句是完整解析后再执行的,对如下语句,不会再出现数据错误但建表成功的情况:
```sql
INSERT INTO d1001 USING meters TAGS('Beijing.Chaoyang', 2) VALUES('a');
```
7. 对于向多个子表插入数据的情况,依然会有部分数据写入失败,部分数据写入成功的情况。这是因为多个子表可能分布在不同的 VNODE 上,客户端将 INSERT 语句完整解析后,将数据发往各个涉及的 VNODE 上,每个 VNODE 独立进行写入操作。如果某个 VNODE 因为某些原因(比如网络问题或磁盘故障)导致写入失败,并不会影响其他 VNODE 节点的写入。
## 插入一条记录
指定已经创建好的数据子表的表名,并通过 VALUES 关键字提供一行或多行数据,即可向数据库写入这些数据。例如,执行如下语句可以写入一行记录:
```
```sql
INSERT INTO d1001 VALUES (NOW, 10.2, 219, 0.32);
```
## 插入多条记录
或者,可以通过如下语句写入两行记录:
```
```sql
INSERT INTO d1001 VALUES ('2021-07-13 14:06:32.272', 10.2, 219, 0.32) (1626164208000, 10.15, 217, 0.33);
```
:::note
1. 在第二个例子中,两行记录的首列时间戳使用了不同格式的写法。其中字符串格式的时间戳写法不受所在 DATABASE 的时间精度设置影响;而长整形格式的时间戳写法会受到所在 DATABASE 的时间精度设置影响——例子中的时间戳在毫秒精度下可以写作 1626164208000,而如果是在微秒精度设置下就需要写为 1626164208000000,纳秒精度设置下需要写为 1626164208000000000。
2. 在使用“插入多条记录”方式写入数据时,不能把第一列的时间戳取值都设为 NOW,否则会导致语句中的多条记录使用相同的时间戳,于是就可能出现相互覆盖以致这些数据行无法全部被正确保存。其原因在于,NOW 函数在执行中会被解析为所在 SQL 语句的实际执行时间,出现在同一语句中的多个 NOW 标记也就会被替换为完全相同的时间戳取值。
3. 允许插入的最老记录的时间戳,是相对于当前服务器时间,减去配置的 keep 值(数据保留的天数);允许插入的最新记录的时间戳,是相对于当前服务器时间,加上配置的 days 值(数据文件存储数据的时间跨度,单位为天)。keep 和 days 都是可以在创建数据库时指定的,缺省值分别是 3650 天和 10 天。
:::
## 插入记录,数据对应到指定的列
## 指定列插入
向数据子表中插入记录时,无论插入一行还是多行,都可以让数据对应到指定的列。对于 SQL 语句中没有出现的列,数据库将自动填充为 NULL。主键(时间戳)不能为 NULL。例如:
```
```sql
INSERT INTO d1001 (ts, current, phase) VALUES ('2021-07-13 14:06:33.196', 10.27, 0.31);
```
:::info
如果不指定列,也即使用全列模式——那么在 VALUES 部分提供的数据,必须为数据表的每个列都显式地提供数据。全列模式写入速度会远快于指定列,因此建议尽可能采用全列写入方式,此时空列可以填入 NULL。
:::
## 向多个表插入记录
可以在一条语句中,分别向多个表插入一条或多条记录,并且也可以在插入过程中指定列。例如:
```
```sql
INSERT INTO d1001 VALUES ('2021-07-13 14:06:34.630', 10.2, 219, 0.32) ('2021-07-13 14:06:35.779', 10.15, 217, 0.33)
d1002 (ts, current, phase) VALUES ('2021-07-13 14:06:34.255', 10.27, 0.31;
```
......@@ -66,28 +84,24 @@ INSERT INTO d1001 VALUES ('2021-07-13 14:06:34.630', 10.2, 219, 0.32) ('2021-07-
如果用户在写数据时并不确定某个表是否存在,此时可以在写入数据时使用自动建表语法来创建不存在的表,若该表已存在则不会建立新表。自动建表时,要求必须以超级表为模板,并写明数据表的 TAGS 取值。例如:
```
```sql
INSERT INTO d21001 USING meters TAGS ('California.SanFrancisco', 2) VALUES ('2021-07-13 14:06:32.272', 10.2, 219, 0.32);
```
也可以在自动建表时,只是指定部分 TAGS 列的取值,未被指定的 TAGS 列将置为 NULL。例如:
```
```sql
INSERT INTO d21001 USING meters (groupId) TAGS (2) VALUES ('2021-07-13 14:06:33.196', 10.15, 217, 0.33);
```
自动建表语法也支持在一条语句中向多个表插入记录。例如:
```
```sql
INSERT INTO d21001 USING meters TAGS ('California.SanFrancisco', 2) VALUES ('2021-07-13 14:06:34.630', 10.2, 219, 0.32) ('2021-07-13 14:06:35.779', 10.15, 217, 0.33)
d21002 USING meters (groupId) TAGS (2) VALUES ('2021-07-13 14:06:34.255', 10.15, 217, 0.33)
d21003 USING meters (groupId) TAGS (2) (ts, current, phase) VALUES ('2021-07-13 14:06:34.255', 10.27, 0.31);
```
:::info
在 2.0.20.5 版本之前,在使用自动建表语法并指定列时,子表的列名必须紧跟在子表名称后面,而不能如例子里那样放在 TAGS 和 VALUES 之间。从 2.0.20.5 版本开始,两种写法都可以,但不能在一条 SQL 语句中混用,否则会报语法错误。
:::
## 插入来自文件的数据记录
除了使用 VALUES 关键字插入一行或多行数据外,也可以把要写入的数据放在 CSV 文件中(英文逗号分隔、英文单引号括住每个值)供 SQL 指令读取。其中 CSV 文件无需表头。例如,如果 /tmp/csvfile.csv 文件的内容为:
......@@ -99,51 +113,19 @@ INSERT INTO d21001 USING meters TAGS ('California.SanFrancisco', 2) VALUES ('202
那么通过如下指令可以把这个文件中的数据写入子表中:
```
```sql
INSERT INTO d1001 FILE '/tmp/csvfile.csv';
```
## 插入来自文件的数据记录,并自动建表
从 2.1.5.0 版本开始,支持在插入来自 CSV 文件的数据时,以超级表为模板来自动创建不存在的数据表。例如:
```
```sql
INSERT INTO d21001 USING meters TAGS ('California.SanFrancisco', 2) FILE '/tmp/csvfile.csv';
```
也可以在一条语句中向多个表以自动建表的方式插入记录。例如:
```
```sql
INSERT INTO d21001 USING meters TAGS ('California.SanFrancisco', 2) FILE '/tmp/csvfile_21001.csv'
d21002 USING meters (groupId) TAGS (2) FILE '/tmp/csvfile_21002.csv';
```
## 历史记录写入
可使用 IMPORT 或者 INSERT 命令,IMPORT 的语法,功能与 INSERT 完全一样。
针对 insert 类型的 SQL 语句,我们采用的流式解析策略,在发现后面的错误之前,前面正确的部分 SQL 仍会执行。下面的 SQL 中,INSERT 语句是无效的,但是 d1001 仍会被创建。
```
taos> CREATE TABLE meters(ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS(location BINARY(30), groupId INT);
Query OK, 0 row(s) affected (0.008245s)
taos> SHOW STABLES;
name | created_time | columns | tags | tables |
============================================================================================
meters | 2020-08-06 17:50:27.831 | 4 | 2 | 0 |
Query OK, 1 row(s) in set (0.001029s)
taos> SHOW TABLES;
Query OK, 0 row(s) in set (0.000946s)
taos> INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES('a');
DB error: invalid SQL: 'a' (invalid timestamp) (0.039494s)
taos> SHOW TABLES;
table_name | created_time | columns | stable_name |
======================================================================================================
d1001 | 2020-08-06 17:52:02.097 | 4 | meters |
Query OK, 1 row(s) in set (0.001091s)
```
此差异已折叠。
......@@ -5,8 +5,6 @@ title: "删除数据"
---
删除数据是 TDengine 提供的根据指定时间段删除指定表或超级表中数据记录的功能,方便用户清理由于设备故障等原因产生的异常数据。
注意:本功能只在企业版 2.6.0.0 及以后的版本中提供,如需此功能请点击下面的链接访问[企业版产品](https://www.taosdata.com/products#enterprise-edition-link)
**语法:**
......@@ -17,21 +15,21 @@ DELETE FROM [ db_name. ] tb_name [WHERE condition];
**功能:** 删除指定表或超级表中的数据记录
**参数:**
- `db_name` : 可选参数,指定要删除表所在的数据库名,不填写则在当前数据库中
- `tb_name` : 必填参数,指定要删除数据的表名,可以是普通表、子表,也可以是超级表。
- `condition`: 可选参数,指定删除数据的过滤条件,不指定过滤条件则为表中所有数据,请慎重使用。特别说明,这里的where 条件中只支持对第一列时间列的过滤,如果是超级表,支持对 tag 列过滤。
- `db_name` : 可选参数,指定要删除表所在的数据库名,不填写则在当前数据库中
- `tb_name` : 必填参数,指定要删除数据的表名,可以是普通表、子表,也可以是超级表。
- `condition`: 可选参数,指定删除数据的过滤条件,不指定过滤条件则为表中所有数据,请慎重使用。特别说明,这里的 where 条件中只支持对第一列时间列的过滤。
**特别说明:**
数据删除后不可恢复,请慎重使用。为了确保删除的数据确实是自己要删除的,建议可以先使用 `select` 语句加 `where` 后的删除条件查看要删除的数据内容,确认无误后再执行 `delete` 命令。
数据删除后不可恢复,请慎重使用。为了确保删除的数据确实是自己要删除的,建议可以先使用 `select` 语句加 `where` 后的删除条件查看要删除的数据内容,确认无误后再执行 `delete` 命令。
**示例:**
`meters` 是一个超级表,`groupid` 是 int 类型的 tag 列,现在要删除 `meters` 表中时间小于 2021-10-01 10:40:00.100 且 tag 列 `groupid` 值为 1 的所有数据,sql 如下:
`meters` 是一个超级表,`groupid` 是 int 类型的 tag 列,现在要删除 `meters` 表中时间小于 2021-10-01 10:40:00.100 的所有数据,sql 如下:
```sql
delete from meters where ts < '2021-10-01 10:40:00.100' and groupid=1 ;
delete from meters where ts < '2021-10-01 10:40:00.100' ;
```
执行后显示结果为:
......
......@@ -12,16 +12,16 @@ TDengine 提供的特色查询包括标签切分查询和窗口切分查询。
超级表查询中,当需要针对标签进行数据切分然后在切分出的数据空间内再进行一系列的计算时使用标签切分子句,标签切分的语句如下:
```sql
PARTITION BY tag_list
PARTITION BY part_list
```
其中 `tag_list` 是标签列的列表,还可以包括 tbname 伪列
part_list 可以是任意的标量表达式,包括列、常量、标量函数和它们的组合
TDengine 按如下方式处理标签切分子句:
当 PARTITION BY 和标签一起使用时,TDengine 按如下方式处理标签切分子句:
标签切分子句位于 `WHERE` 子句之后,且不能和 `JOIN` 子句一起使用。
标签切分子句将超级表数据按指定的标签组合进行切分,然后对每个切分的分片进行指定的计算。计算由之后的子句定义(窗口子句、`GROUP BY` 子句或`SELECT` 子句)。
标签切分子句可以和窗口切分子句(或 `GROUP BY` 子句)一起使用,此时后面的子句作用在每个切分的分片上。例如,下面的示例将数据按标签 `location` 进行分组,并对每个组按 10 分钟进行降采样,取其最大值。
- 标签切分子句位于 WHERE 子句之后,且不能和 JOIN 子句一起使用。
- 标签切分子句将超级表数据按指定的标签组合进行切分,每个切分的分片进行指定的计算。计算由之后的子句定义(窗口子句、GROUP BY 子句或 SELECT 子句)。
- 标签切分子句可以和窗口切分子句(或 GROUP BY 子句)一起使用,此时后面的子句作用在每个切分的分片上。例如,将数据按标签 location 进行分组,并对每个组按 10 分钟进行降采样,取其最大值。
```sql
select max(current) from meters partition by location interval(10m)
......
---
sidebar_label: 流式计算
title: 流式计算
---
在时序数据的处理中,经常要对原始数据进行清洗、预处理,再使用时序数据库进行长久的储存。用户通常需要在时序数据库之外再搭建 Kafka、Flink、Spark 等流计算处理引擎,增加了用户的开发成本和维护成本。
使用 TDengine 3.0 的流式计算引擎能够最大限度的减少对这些额外中间件的依赖,真正将数据的写入、预处理、长期存储、复杂分析、实时计算、实时报警触发等功能融为一体,并且,所有这些任务只需要使用 SQL 完成,极大降低了用户的学习成本、使用成本。
## 创建流式计算
```sql
CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name AS subquery
stream_options: {
TRIGGER [AT_ONCE | WINDOW_CLOSE | MAX_DELAY time]
WATERMARK time
}
```
其中 subquery 是 select 普通查询语法的子集:
```sql
subquery: SELECT [DISTINCT] select_list
from_clause
[WHERE condition]
[PARTITION BY tag_list]
[window_clause]
[group_by_clause]
```
不支持 order_by,limit,slimit,fill 语句
例如,如下语句创建流式计算,同时自动创建名为 avg_vol 的超级表,此流计算以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会分别创建子表并写入不同子表。
```sql
CREATE STREAM avg_vol_s INTO avg_vol AS
SELECT _wstartts, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s);
```
## 删除流式计算
```sql
DROP STREAM [IF NOT EXISTS] stream_name
```
仅删除流式计算任务,由流式计算写入的数据不会被删除。
## 展示流式计算
```sql
SHOW STREAMS;
```
## 流式计算的触发模式
在创建流时,可以通过 TRIGGER 指令指定流式计算的触发模式。
对于非窗口计算,流式计算的触发是实时的;对于窗口计算,目前提供 3 种触发模式:
1. AT_ONCE:写入立即触发
2. WINDOW_CLOSE:窗口关闭时触发(窗口关闭由事件时间决定,可配合 watermark 使用,详见《流式计算的乱序数据容忍策略》)
3. MAX_DELAY time:若窗口关闭,则触发计算。若窗口未关闭,且未关闭时长超过 max delay 指定的时间,则触发计算。
由于窗口关闭是由事件时间决定的,如事件流中断、或持续延迟,则事件时间无法更新,可能导致无法得到最新的计算结果。
因此,流式计算提供了以事件时间结合处理时间计算的 MAX_DELAY 触发模式。
MAX_DELAY 模式在窗口关闭时会立即触发计算。此外,当数据写入后,计算触发的时间超过 max delay 指定的时间,则立即触发计算
## 流式计算的乱序数据容忍策略
在创建流时,可以在 stream_option 中指定 watermark。
流式计算通过 watermark 来度量对乱序数据的容忍程度,watermark 默认为 0。
T = 最新事件时间 - watermark
每批到来的数据都会以上述公式更新窗口关闭时间,并将窗口结束时间 < T 的所有打开的窗口关闭,若触发模式为 WINDOW_CLOSE 或 MAX_DELAY,则推送窗口聚合结果。
流式计算的过期数据处理策略
对于已关闭的窗口,再次落入该窗口中的数据被标记为过期数据,对于过期数据,流式计算提供两种处理方式:
1. 直接丢弃:这是常见流式计算引擎提供的默认(甚至是唯一)计算模式
2. 重新计算:从 TSDB 中重新查找对应窗口的所有数据并重新计算得到最新结果
无论在哪种模式下,watermark 都应该被妥善设置,来得到正确结果(直接丢弃模式)或避免频繁触发重算带来的性能开销(重新计算模式)。
## 流式计算的数据填充策略
TODO
## 流式计算与会话窗口(session window)
```sql
window_clause: {
SESSION(ts_col, tol_val)
| STATE_WINDOW(col)
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [FILL(fill_mod_and_val)]
}
```
其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。
## 流式计算的监控与流任务分布查询
TODO
## 流式计算的内存控制与存算分离
TODO
## 流式计算的暂停与恢复
```sql
STOP STREAM stream_name;
RESUME STREAM stream_name;
```
---
sidebar_label: 保留关键字
sidebar_label: 保留关键字
title: TDengine 保留关键字
---
......@@ -58,70 +58,70 @@ title: TDengine 保留关键字
### D
- DATABASE
- DATABASES
- DAYS
- DBS
- DEFERRED
- DATABASE
- DATABASES
- DAYS
- DBS
- DEFERRED
- DELETE
- DELIMITERS
- DESC
- DESCRIBE
- DETACH
- DISTINCT
- DIVIDE
- DNODE
- DNODES
- DOT
- DOUBLE
- DROP
- DESC
- DESCRIBE
- DETACH
- DISTINCT
- DIVIDE
- DNODE
- DNODES
- DOT
- DOUBLE
- DROP
### E
- END
- EQ
- EXISTS
- EXPLAIN
- END
- EQ
- EXISTS
- EXPLAIN
### F
- FAIL
- FILE
- FILL
- FLOAT
- FOR
- FROM
- FSYNC
- FAIL
- FILE
- FILL
- FLOAT
- FOR
- FROM
- FSYNC
### G
- GE
- GLOB
- GE
- GLOB
- GRANTS
- GROUP
- GT
- GROUP
- GT
### H
- HAVING
- HAVING
### I
- ID
- IF
- IGNORE
- IGNORE
- IMMEDIA
- IMPORT
- IN
- IMPORT
- IN
- INITIAL
- INSERT
- INSERT
- INSTEAD
- INT
- INT
- INTEGER
- INTERVA
- INTO
- IS
- ISNULL
- INTO
- IS
- ISNULL
### J
......@@ -130,190 +130,147 @@ title: TDengine 保留关键字
### K
- KEEP
- KEY
- KEY
- KILL
### L
- LE
- LIKE
- LIMIT
- LE
- LIKE
- LIMIT
- LINEAR
- LOCAL
- LP
- LOCAL
- LP
- LSHIFT
- LT
- LT
### M
- MATCH
- MAXROWS
- MINROWS
- MINUS
- MNODES
- MODIFY
- MODULES
- MATCH
- MAXROWS
- MINROWS
- MINUS
- MNODES
- MODIFY
- MODULES
### N
- NE
- NONE
- NOT
- NE
- NONE
- NOT
- NOTNULL
- NOW
- NOW
- NULL
### O
- OF
- OF
- OFFSET
- OR
- ORDER
- OR
- ORDER
### P
- PARTITION
- PASS
- PLUS
- PPS
- PASS
- PLUS
- PPS
- PRECISION
- PREV
- PREV
- PRIVILEGE
### Q
- QTIME
- QTIME
- QUERIE
- QUERY
- QUERY
- QUORUM
### R
- RAISE
- REM
- RAISE
- REM
- REPLACE
- REPLICA
- RESET
- RESET
- RESTRIC
- ROW
- RP
- ROW
- RP
- RSHIFT
### S
- SCORES
- SELECT
- SEMI
- SCORES
- SELECT
- SEMI
- SESSION
- SET
- SHOW
- SLASH
- SET
- SHOW
- SLASH
- SLIDING
- SLIMIT
- SLIMIT
- SMALLIN
- SOFFSET
- STable
- STable
- STableS
- STAR
- STATE
- STAR
- STATE
- STATEMEN
- STATE_WI
- STORAGE
- STREAM
- STREAMS
- STRING
- SYNCDB
- STORAGE
- STREAM
- STREAMS
- STRING
- SYNCDB
### T
- TABLE
- TABLES
- TAG
- TAGS
- TBNAME
- TIMES
- TIMESTAMP
- TINYINT
- TOPIC
- TOPICS
- TRIGGER
- TSERIES
- TABLE
- TABLES
- TAG
- TAGS
- TBNAME
- TIMES
- TIMESTAMP
- TINYINT
- TOPIC
- TOPICS
- TRIGGER
- TSERIES
### U
- UMINUS
- UNION
- UNSIGNED
- UPDATE
- UPLUS
- USE
- USER
- USERS
- USING
- UMINUS
- UNION
- UNSIGNED
- UPDATE
- UPLUS
- USE
- USER
- USERS
- USING
### V
- VALUES
- VARIABLE
- VALUES
- VARIABLE
- VARIABLES
- VGROUPS
- VIEW
- VNODES
- VGROUPS
- VIEW
- VNODES
### W
- WAL
- WHERE
### _
- _C0
- _QSTART
- _QSTOP
- _QDURATION
- _WSTART
- _WSTOP
- _WDURATION
## 特殊说明
### TBNAME
`TBNAME` 可以视为超级表中一个特殊的标签,代表子表的表名。
获取一个超级表所有的子表名及相关的标签信息:
```mysql
SELECT TBNAME, location FROM meters;
```
统计超级表下辖子表数量:
```mysql
SELECT COUNT(TBNAME) FROM meters;
```
以上两个查询均只支持在WHERE条件子句中添加针对标签(TAGS)的过滤条件。例如:
```mysql
taos> SELECT TBNAME, location FROM meters;
tbname | location |
==================================================================
d1004 | California.SanFrancisco |
d1003 | California.SanFrancisco |
d1002 | California.LosAngeles |
d1001 | California.LosAngeles |
Query OK, 4 row(s) in set (0.000881s)
taos> SELECT COUNT(tbname) FROM meters WHERE groupId > 2;
count(tbname) |
========================
2 |
Query OK, 1 row(s) in set (0.001091s)
```
### _QSTART/_QSTOP/_QDURATION
表示查询过滤窗口的起始,结束以及持续时间。
### _WSTART/_WSTOP/_WDURATION
窗口切分聚合查询(例如 interval/session window/state window)中表示每个切分窗口的起始,结束以及持续时间。
### _c0/_ROWTS
_c0 _ROWTS 等价,表示表或超级表的第一列
### \_
- \_C0
- \_QSTART
- \_QSTOP
- \_QDURATION
- \_WSTART
- \_WSTOP
- \_WDURATION
---
sidebar_label: Information内置数据库
title: Information内置数据库
---
---
sidebar_label: 元数据库
title: 元数据库
---
TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数据库元数据、数据库系统信息和状态的访问,例如数据库或表的名称,当前执行的 SQL 语句等。
`INFORMATION_SCHEMA` 是 TDengine 启动时自动创建的数据库,该数据库存储有关 TDengine 维护的所有其他数据库的信息。它包含多个只读表。实际上,这些表都是视图,而不是基表,因此没有与它们关联的文件。所以对这些表只能查询,不能进行 INSERT 等写入操作。
可以使用 USE 语句将 INFORMATION_SCHEMA 设为默认数据库。
INFORMATION_SCHEMA 旨在以一种更一致的方式来提供对 TDengine 支持的各种 SHOW 语句(如 SHOW TABLES、SHOW DATABASES)提供的信息的访问。与 SHOW 语句相比,使用 SELECT ... FROM INFORMATION_SCHEMA.tablename 具有以下优点:
您可以使用 SELECT 语句熟悉的语法,只需要学习一些表名和列名。
您可以对查询结果进行筛选、排序等操作,事实上,您可以使用任意 TDengine 支持的 SELECT 语句对 INFORMATION_SCHEMA 中的表进行查询。
TDengine 在后续演进中可以灵活的添加已有 INFORMATION_SCHEMA 中表的列,而不用担心对既有业务系统造成影响。
此技术与其他数据库系统更具互操作性。例如,Oracle 数据库用户熟悉查询 Oracle 数据字典中的表。
由于 SHOW 语句已经被开发者熟悉的和广泛使用,所以它们仍然是可用的。
本章将详细介绍 `INFORMATION_SCHEMA` 这个内置元数据库中的表和表结构。
## DNODES
提供 dnode 的相关信息。也可以使用 SHOW DNODES 来查询这些信息。
| # | **列名** | **数据类型** | **说明** |
| --- | :------------: | ------------ | --------------------- |
| 1 | vnodes | SMALLINT | dnode 中的 vnode 个数 |
| 2 | support_vnodes | SMALLINT | 支持的 vnode 个数 |
| 3 | status | BINARY(10) | 当前状态 |
| 4 | note | BINARY(256) | 离线原因等信息 |
| 5 | id | SMALLINT | dnode id |
| 6 | endpoint | BINARY(134) | dnode 的地址 |
| 7 | create | TIMESTAMP | 创建时间 |
## MNODES
提供 mnode 的相关信息。也可以使用 SHOW MNODES 来查询这些信息。
| # | **列名** | **数据类型** | **说明** |
| --- | :---------: | ------------ | ------------------ |
| 1 | id | SMALLINT | mnode id |
| 2 | endpoint | BINARY(134) | mnode 的地址 |
| 3 | role | BINARY(10) | 当前角色 |
| 4 | role_time | TIMESTAMP | 成为当前角色的时间 |
| 5 | create_time | TIMESTAMP | 创建时间 |
## MODULES
提供组件的相关信息。也可以使用 SHOW MODULES 来查询这些信息
| # | **列名** | **数据类型** | **说明** |
| --- | :------: | ------------ | ---------- |
| 1 | id | SMALLINT | module id |
| 2 | endpoint | BINARY(134) | 组件的地址 |
| 3 | module | BINARY(10) | 组件状态 |
## QNODES
当前系统中 QNODE 的信息。也可以使用 SHOW QNODES 来查询这些信息。
| # | **列名** | **数据类型** | **说明** |
| --- | :---------: | ------------ | ------------ |
| 1 | id | SMALLINT | module id |
| 2 | endpoint | BINARY(134) | qnode 的地址 |
| 3 | create_time | TIMESTAMP | 创建时间 |
## USER_DATABASES
提供用户创建的数据库对象的相关信息。也可以使用 SHOW DATABASES 来查询这些信息。
TODO
| # | **列名** | **数据类型** | **说明** |
| --- | :---------: | ------------ | ------------------------------------------------ |
| 1 | name | BINARY(32) | 数据库名 |
| 2 | create_time | TIMESTAMP | 创建时间 |
| 3 | ntables | INT | 数据库中表的数量,包含子表和普通表但不包含超级表 |
| 4 | vgroups | INT | 数据库中有多少个 vgroup |
| 5 | replica | INT | 副本数 |
| 6 | quorum | INT | 写成功的确认数 |
| 7 | days | INT | 单文件存储数据的时间跨度 |
| 8 | keep | INT | 数据保留时长 |
| 9 | buffer | INT | 每个 vnode 写缓存的内存块大小,单位 MB |
| 10 | minrows | INT | 文件块中记录的最大条数 |
| 11 | maxrows | INT | 文件块中记录的最小条数 |
| 12 | wallevel | INT | WAL 级别 |
| 13 | fsync | INT | 数据落盘周期 |
| 14 | comp | INT | 数据压缩方式 |
| 15 | precision | BINARY(2) | 时间分辨率 |
| 16 | status | BINARY(10) | 数据库状态 |
## USER_FUNCTIONS
TODO
## USER_INDEXES
提供用户创建的索引的相关信息。也可以使用 SHOW INDEX 来查询这些信息。
| # | **列名** | **数据类型** | **说明** |
| --- | :--------------: | ------------ | ---------------------------------------------------------------------------------- |
| 1 | db_name | BINARY(32) | 包含此索引的表所在的数据库名 |
| 2 | table_name | BINARY(192) | 包含此索引的表的名称 |
| 3 | index_name | BINARY(192) | 索引名 |
| 4 | column_name | BINARY(64) | 建索引的列的列名 |
| 5 | index_type | BINARY(10) | 目前有 SMA 和 FULLTEXT |
| 6 | index_extensions | BINARY(256) | 索引的额外信息。对 SMA 类型的索引,是函数名的列表。对 FULLTEXT 类型的索引为 NULL。 |
## USER_STABLES
提供用户创建的超级表的相关信息。
| # | **列名** | **数据类型** | **说明** |
| --- | :-----------: | ------------ | ------------------------ |
| 1 | stable_name | BINARY(192) | 超级表表名 |
| 2 | db_name | BINARY(64) | 超级表所在的数据库的名称 |
| 3 | create_time | TIMESTAMP | 创建时间 |
| 4 | columns | INT | 列数目 |
| 5 | tags | INT | 标签数目 |
| 6 | last_update | TIMESTAMP | 最后更新时间 |
| 7 | table_comment | BINARY(1024) | 表注释 |
| 8 | watermark | BINARY(64) | 窗口的关闭时间 |
| 9 | max_delay | BINARY(64) | 推送计算结果的最大延迟 |
| 10 | rollup | BINARY(128) | rollup 聚合函数 |
## USER_STREAMS
提供用户创建的流计算的相关信息。
| # | **列名** | **数据类型** | **说明** |
| --- | :---------: | ------------ | --------------------------- |
| 1 | stream_name | BINARY(192) | 流计算名称 |
| 2 | user_name | BINARY(23) | 创建流计算的用户 |
| 3 | dest_table | BINARY(192) | 流计算写入的目标表 |
| 4 | create_time | TIMESTAMP | 创建时间 |
| 5 | sql | BLOB | 创建流计算时提供的 SQL 语句 |
## USER_TABLES
提供用户创建的普通表和子表的相关信息
| # | **列名** | **数据类型** | **说明** |
| --- | :-----------: | ------------ | ---------------- |
| 1 | table_name | BINARY(192) | 表名 |
| 2 | db_name | BINARY(64) | 数据库名 |
| 3 | create_time | TIMESTAMP | 创建时间 |
| 4 | columns | INT | 列数目 |
| 5 | stable_name | BINARY(192) | 所属的超级表表名 |
| 6 | uid | BIGINT | 表 id |
| 7 | vgroup_id | INT | vgroup id |
| 8 | ttl | INT | 表的生命周期 |
| 9 | table_comment | BINARY(1024) | 表注释 |
| 10 | type | BINARY(20) | 表类型 |
## USER_USERS
提供系统中创建的用户的相关信息。
| # | **列名** | **数据类型** | **说明** |
| --- | :---------: | ------------ | -------- |
| 1 | user_name | BINARY(23) | 用户名 |
| 2 | privilege | BINARY(256) | 权限 |
| 3 | create_time | TIMESTAMP | 创建时间 |
## VGROUPS
系统中所有 vgroups 的信息。
| # | **列名** | **数据类型** | **说明** |
| --- | :--------: | ------------ | ---------------------------- |
| 1 | vg_id | INT | vgroup id |
| 2 | db_name | BINARY(32) | 数据库名 |
| 3 | tables | INT | 此 vgroup 内有多少表 |
| 4 | status | BINARY(10) | 此 vgroup 的状态 |
| 5 | onlines | INT | 在线的成员数目 |
| 6 | v1_dnode | INT | 第一个成员所在的 dnode 的 id |
| 7 | v1_status | BINARY(10) | 第一个成员的状态 |
| 8 | v2_dnode | INT | 第二个成员所在的 dnode 的 id |
| 9 | v2_status | BINARY(10) | 第二个成员的状态 |
| 10 | v3_dnode | INT | 第三个成员所在的 dnode 的 id |
| 11 | v3_status | BINARY(10) | 第三个成员的状态 |
| 12 | compacting | INT | compact 状态 |
---
sidebar_label: SHOW 命令
title: 使用 SHOW 命令查看系统元数据
---
除了使用 `select` 语句查询 `INFORMATION_SCHEMA` 数据库中的表获得系统中的各种元数据、系统信息和状态之外,也可以用 `SHOW` 命令来实现同样的目的。
## SHOW ACCOUNTS
```sql
SHOW ACCOUNTS;
```
显示当前系统中所有租户的信息。
注:企业版独有
## SHOW APPS
```sql
SHOW APPS;
```
显示接入集群的应用(客户端)信息。
## SHOW BNODES
```sql
SHOW BNODES;
```
显示当前系统中存在的 BNODE (backup node, 即备份节点)的信息。
## SHOW CLUSTER
```sql
SHOW CLUSTER;
```
显示当前集群的信息
## SHOW CONNECTIONS
```sql
SHOW CONNECTIONS;
```
显示当前系统中存在的连接的信息。
## SHOW CONSUMERS
```sql
SHOW CONSUMERS;
```
显示当前数据库下所有活跃的消费者的信息。
## SHOW CREATE DATABASE
```sql
SHOW CREATE DATABASE db_name;
```
显示 db_name 指定的数据库的创建语句。
## SHOW CREATE STABLE
```sql
SHOW CREATE STABLE [db_name.]stb_name;
```
显示 tb_name 指定的超级表的创建语句
## SHOW CREATE TABLE
```sql
SHOW CREATE TABLE [db_name.]tb_name
```
显示 tb_name 指定的表的创建语句。支持普通表、超级表和子表。
## SHOW DATABASES
```sql
SHOW DATABASES;
```
显示用户定义的所有数据库。
## SHOW DNODES
```sql
SHOW DNODES;
```
显示当前系统中 DNODE 的信息。
## SHOW FUNCTIONS
```sql
SHOW FUNCTIONS;
```
显示用户定义的自定义函数。
## SHOW LICENSE
```sql
SHOW LICENSE;
SHOW GRANTS;
```
显示企业版许可授权的信息。
注:企业版独有
## SHOW INDEXES
```sql
SHOW INDEXES FROM tbl_name [FROM db_name];
```
显示已创建的索引。
## SHOW LOCAL VARIABLES
```sql
SHOW LOCAL VARIABLES;
```
显示当前客户端配置参数的运行值。
## SHOW MNODES
```sql
SHOW MNODES;
```
显示当前系统中 MNODE 的信息。
## SHOW MODULES
```sql
SHOW MODULES;
```
显示当前系统中所安装的组件的信息。
## SHOW QNODES
```sql
SHOW QNODES;
```
显示当前系统中 QNODE (查询节点)的信息。
## SHOW SCORES
```sql
SHOW SCORES;
```
显示系统被许可授权的容量的信息。
注:企业版独有
## SHOW SNODES
```sql
SHOW SNODES;
```
显示当前系统中 SNODE (流计算节点)的信息。
## SHOW STABLES
```sql
SHOW [db_name.]STABLES [LIKE 'pattern'];
```
显示当前数据库下的所有超级表的信息。可以使用 LIKE 对表名进行模糊匹配。
## SHOW STREAMS
```sql
SHOW STREAMS;
```
显示当前系统内所有流计算的信息。
## SHOW SUBSCRIPTIONS
```sql
SHOW SUBSCRIPTIONS;
```
显示当前数据库下的所有的订阅关系
## SHOW TABLES
```sql
SHOW [db_name.]TABLES [LIKE 'pattern'];
```
显示当前数据库下的所有普通表和子表的信息。可以使用 LIKE 对表名进行模糊匹配。
## SHOW TABLE DISTRIBUTED
```sql
SHOW TABLE DISTRIBUTED table_name;
```
显示表的数据分布信息。
## SHOW TAGS
```sql
SHOW TAGS FROM child_table_name [FROM db_name];
```
显示子表的标签信息。
## SHOW TOPICS
```sql
SHOW TOPICS;
```
显示当前数据库下的所有主题的信息。
## SHOW TRANSACTIONS
```sql
SHOW TRANSACTIONS;
```
显示当前系统中正在执行的事务的信息
## SHOW USERS
```sql
SHOW USERS;
```
显示当前系统中所有用户的信息。包括用户自定义的用户和系统默认用户。
## SHOW VARIABLES
```sql
SHOW VARIABLES;
SHOW DNODE dnode_id VARIABLES;
```
显示当前系统中各节点需要相同的配置参数的运行值,也可以指定 DNODE 来查看其的配置参数。
## SHOW VGROUPS
```sql
SHOW [db_name.]VGROUPS;
```
显示当前系统中所有 VGROUP 或某个 db 的 VGROUPS 的信息。
## SHOW VNODES
```sql
SHOW VNODES [dnode_name];
```
显示当前系统中所有 VNODE 或某个 DNODE 的 VNODE 的信息。
---
sidebar_label: 权限管理
title: 权限管理
---
本节讲述如何在 TDengine 中进行权限管理的相关操作。
## 创建用户
```sql
CREATE USER use_name PASS password;
```
创建用户。
use_name最长为23字节。
password最长为128字节,合法字符包括"a-zA-Z0-9!?$%^&*()_–+={[}]:;@~#|<,>.?/",不可以出现单双引号、撇号、反斜杠和空格,且不可以为空。
## 删除用户
```sql
DROP USER user_name;
```
## 授权
```sql
GRANT privileges ON priv_level TO user_name
privileges : {
ALL
| priv_type [, priv_type] ...
}
priv_type : {
READ
| WRITE
}
priv_level : {
dbname.*
| *.*
}
```
对用户授权。
授权级别支持到DATABASE,权限有READ和WRITE两种。
TDengine 有超级用户和普通用户两类用户。超级用户缺省创建为root,拥有所有权限。使用超级用户创建出来的用户为普通用户。在未授权的情况下,普通用户可以创建DATABASE,并拥有自己创建的DATABASE的所有权限,包括删除数据库、修改数据库、查询时序数据和写入时序数据。超级用户可以给普通用户授予其他DATABASE的读写权限,使其可以在此DATABASE上读写数据,但不能对其进行删除和修改数据库的操作。
对于非DATABASE的对象,如USER、DNODE、UDF、QNODE等,普通用户只有读权限(一般为SHOW命令),不能创建和修改。
## 撤销授权
```sql
REVOKE privileges ON priv_level FROM user_name
privileges : {
ALL
| priv_type [, priv_type] ...
}
priv_type : {
READ
| WRITE
}
priv_level : {
dbname.*
| *.*
}
```
收回对用户的授权。
\ No newline at end of file
......@@ -1154,6 +1154,10 @@ typedef struct {
int32_t numOfRetensions;
SArray* pRetensions; // SRetention
void* pTsma;
int32_t walRetentionPeriod;
int64_t walRetentionSize;
int32_t walRollPeriod;
int64_t walSegmentSize;
} SCreateVnodeReq;
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
......
......@@ -103,8 +103,8 @@ typedef struct SWal {
int32_t fsyncSeq;
// meta
SWalVer vers;
TdFilePtr pWriteLogTFile;
TdFilePtr pWriteIdxTFile;
TdFilePtr pLogFile;
TdFilePtr pIdxFile;
int32_t writeCur;
SArray *fileInfoSet; // SArray<SWalFileInfo>
// status
......
......@@ -63,6 +63,7 @@ extern int32_t metaDebugFlag;
extern int32_t udfDebugFlag;
extern int32_t smaDebugFlag;
extern int32_t idxDebugFlag;
extern int32_t tdbDebugFlag;
int32_t taosInitLog(const char *logName, int32_t maxFiles);
void taosCloseLog();
......
......@@ -316,6 +316,7 @@ static int32_t taosAddServerLogCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "udfDebugFlag", udfDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "smaDebugFlag", smaDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "idxDebugFlag", idxDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "tdbDebugFlag", tdbDebugFlag, 0, 255, 0) != 0) return -1;
return 0;
}
......@@ -506,6 +507,7 @@ static void taosSetServerLogCfg(SConfig *pCfg) {
udfDebugFlag = cfgGetItem(pCfg, "udfDebugFlag")->i32;
smaDebugFlag = cfgGetItem(pCfg, "smaDebugFlag")->i32;
idxDebugFlag = cfgGetItem(pCfg, "idxDebugFlag")->i32;
tdbDebugFlag = cfgGetItem(pCfg, "tdbDebugFlag")->i32;
}
static int32_t taosSetClientCfg(SConfig *pCfg) {
......@@ -950,6 +952,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
uError("failed to create tempDir:%s since %s", tsTempDir, terrstr());
return -1;
}
} else if (strcasecmp("tdbDebugFlag", name) == 0) {
tdbDebugFlag = cfgGetItem(pCfg, "tdbDebugFlag")->i32;
} else if (strcasecmp("telemetryReporting", name) == 0) {
tsEnableTelem = cfgGetItem(pCfg, "telemetryReporting")->bval;
} else if (strcasecmp("telemetryInterval", name) == 0) {
......@@ -1151,14 +1155,14 @@ void taosCfgDynamicOptions(const char *option, const char *value) {
}
const char *options[] = {
"dDebugFlag", "vDebugFlag", "mDebugFlag", "wDebugFlag", "sDebugFlag", "tsdbDebugFlag",
"tqDebugFlag", "fsDebugFlag", "udfDebugFlag", "smaDebugFlag", "idxDebugFlag", "tmrDebugFlag",
"uDebugFlag", "smaDebugFlag", "rpcDebugFlag", "qDebugFlag",
"dDebugFlag", "vDebugFlag", "mDebugFlag", "wDebugFlag", "sDebugFlag", "tsdbDebugFlag",
"tqDebugFlag", "fsDebugFlag", "udfDebugFlag", "smaDebugFlag", "idxDebugFlag", "tdbDebugFlag",
"tmrDebugFlag", "uDebugFlag", "smaDebugFlag", "rpcDebugFlag", "qDebugFlag",
};
int32_t *optionVars[] = {
&dDebugFlag, &vDebugFlag, &mDebugFlag, &wDebugFlag, &sDebugFlag, &tsdbDebugFlag,
&tqDebugFlag, &fsDebugFlag, &udfDebugFlag, &smaDebugFlag, &idxDebugFlag, &tmrDebugFlag,
&uDebugFlag, &smaDebugFlag, &rpcDebugFlag, &qDebugFlag,
&dDebugFlag, &vDebugFlag, &mDebugFlag, &wDebugFlag, &sDebugFlag, &tsdbDebugFlag,
&tqDebugFlag, &fsDebugFlag, &udfDebugFlag, &smaDebugFlag, &idxDebugFlag, &tdbDebugFlag,
&tmrDebugFlag, &uDebugFlag, &smaDebugFlag, &rpcDebugFlag, &qDebugFlag,
};
int32_t optionSize = tListLen(options);
......@@ -1204,5 +1208,6 @@ void taosSetAllDebugFlag(int32_t flag) {
taosSetDebugFlag(&udfDebugFlag, "udfDebugFlag", flag);
taosSetDebugFlag(&smaDebugFlag, "smaDebugFlag", flag);
taosSetDebugFlag(&idxDebugFlag, "idxDebugFlag", flag);
taosSetDebugFlag(&tdbDebugFlag, "tdbDebugFlag", flag);
uInfo("all debug flag are set to %d", flag);
}
......@@ -3750,6 +3750,10 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
uint32_t tsmaLen = (uint32_t)(htonl(((SMsgHead *)pReq->pTsma)->contLen));
if (tEncodeBinary(&encoder, (const uint8_t *)pReq->pTsma, tsmaLen) < 0) return -1;
}
if (tEncodeI32(&encoder, pReq->walRetentionPeriod) < 0) return -1;
if (tEncodeI64(&encoder, pReq->walRetentionSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->walRollPeriod) < 0) return -1;
if (tEncodeI64(&encoder, pReq->walSegmentSize) < 0) return -1;
tEndEncode(&encoder);
......@@ -3818,6 +3822,11 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if (tDecodeBinary(&decoder, (uint8_t **)&pReq->pTsma, NULL) < 0) return -1;
}
if (tDecodeI32(&decoder, &pReq->walRetentionPeriod) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->walRetentionSize) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->walRollPeriod) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->walSegmentSize) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
......
......@@ -160,6 +160,13 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
}
pCfg->walCfg.vgId = pCreate->vgId;
pCfg->walCfg.fsyncPeriod = pCreate->fsyncPeriod;
pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
pCfg->walCfg.rollPeriod = pCreate->walRollPeriod;
pCfg->walCfg.retentionSize = pCreate->walRetentionSize;
pCfg->walCfg.segSize = pCreate->walSegmentSize;
pCfg->walCfg.level = pCreate->walLevel;
pCfg->hashBegin = pCreate->hashBegin;
pCfg->hashEnd = pCreate->hashEnd;
pCfg->hashMethod = pCreate->hashMethod;
......
......@@ -302,9 +302,13 @@ typedef struct {
int8_t strict;
int8_t hashMethod; // default is 1
int8_t cacheLast;
int8_t schemaless;
int32_t numOfRetensions;
SArray* pRetensions;
int8_t schemaless;
int32_t walRetentionPeriod;
int64_t walRetentionSize;
int32_t walRollPeriod;
int64_t walSegmentSize;
} SDbCfg;
typedef struct {
......
......@@ -120,6 +120,10 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
SDB_SET_INT8(pRaw, dataPos, pRetension->keepUnit, _OVER)
}
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.schemaless, _OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.walRetentionPeriod, _OVER)
SDB_SET_INT64(pRaw, dataPos, pDb->cfg.walRetentionSize, _OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.walRollPeriod, _OVER)
SDB_SET_INT64(pRaw, dataPos, pDb->cfg.walSegmentSize, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
......@@ -199,6 +203,10 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
}
}
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.schemaless, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.walRetentionPeriod, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pDb->cfg.walRetentionSize, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.walRollPeriod, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pDb->cfg.walSegmentSize, _OVER)
SDB_GET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
taosInitRWLatch(&pDb->lock);
......@@ -318,6 +326,10 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
return -1;
}
if (pCfg->walRetentionPeriod < TSDB_DB_MIN_WAL_RETENTION_PERIOD) return -1;
if (pCfg->walRetentionSize < TSDB_DB_MIN_WAL_RETENTION_SIZE) return -1;
if (pCfg->walRollPeriod < TSDB_DB_MIN_WAL_ROLL_PERIOD) return -1;
if (pCfg->walSegmentSize < TSDB_DB_MIN_WAL_SEGMENT_SIZE) return -1;
terrno = 0;
return terrno;
......@@ -345,6 +357,12 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->cacheLastSize <= 0) pCfg->cacheLastSize = TSDB_DEFAULT_CACHE_SIZE;
if (pCfg->numOfRetensions < 0) pCfg->numOfRetensions = 0;
if (pCfg->schemaless < 0) pCfg->schemaless = TSDB_DB_SCHEMALESS_OFF;
if (pCfg->walRetentionPeriod < 0 && pCfg->walRetentionPeriod != -1)
pCfg->walRetentionPeriod = TSDB_DEFAULT_DB_WAL_RETENTION_PERIOD;
if (pCfg->walRetentionSize < 0 && pCfg->walRetentionSize != -1)
pCfg->walRetentionSize = TSDB_DEFAULT_DB_WAL_RETENTION_SIZE;
if (pCfg->walRollPeriod < 0) pCfg->walRollPeriod = TSDB_DEFAULT_DB_WAL_ROLL_PERIOD;
if (pCfg->walSegmentSize < 0) pCfg->walSegmentSize = TSDB_DEFAULT_DB_WAL_SEGMENT_SIZE;
}
static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
......@@ -457,6 +475,10 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
.cacheLast = pCreate->cacheLast,
.hashMethod = 1,
.schemaless = pCreate->schemaless,
.walRetentionPeriod = pCreate->walRetentionPeriod,
.walRetentionSize = pCreate->walRetentionSize,
.walRollPeriod = pCreate->walRollPeriod,
.walSegmentSize = pCreate->walSegmentSize,
};
dbObj.cfg.numOfRetensions = pCreate->numOfRetensions;
......
......@@ -788,9 +788,9 @@ _OVER:
static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
const char *options[] = {
"debugFlag", "dDebugFlag", "vDebugFlag", "mDebugFlag", "wDebugFlag", "sDebugFlag",
"tsdbDebugFlag", "tqDebugFlag", "fsDebugFlag", "udfDebugFlag", "smaDebugFlag", "idxDebugFlag",
"tmrDebugFlag", "uDebugFlag", "smaDebugFlag", "rpcDebugFlag", "qDebugFlag",
"debugFlag", "dDebugFlag", "vDebugFlag", "mDebugFlag", "wDebugFlag", "sDebugFlag",
"tsdbDebugFlag", "tqDebugFlag", "fsDebugFlag", "udfDebugFlag", "smaDebugFlag", "idxDebugFlag",
"tdbDebugFlag", "tmrDebugFlag", "uDebugFlag", "smaDebugFlag", "rpcDebugFlag", "qDebugFlag",
};
int32_t optionSize = tListLen(options);
......@@ -813,7 +813,6 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
SEpSet epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
SDCfgDnodeReq dcfgReq = {0};
if (strcasecmp(cfgReq.config, "resetlog") == 0) {
strcpy(dcfgReq.config, "resetlog");
......@@ -839,7 +838,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
if (strncasecmp(cfgReq.config, optName, optLen) != 0) continue;
const char *value = cfgReq.value;
int32_t flag = atoi(value);
int32_t flag = atoi(value);
if (flag <= 0) {
flag = atoi(cfgReq.config + optLen + 1);
}
......
......@@ -230,6 +230,10 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
createReq.standby = standby;
createReq.isTsma = pVgroup->isTsma;
createReq.pTsma = pVgroup->pTsma;
createReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
createReq.walRetentionSize = pDb->cfg.walRetentionSize;
createReq.walRollPeriod = pDb->cfg.walRollPeriod;
createReq.walSegmentSize = pDb->cfg.walSegmentSize;
for (int32_t v = 0; v < pVgroup->replica; ++v) {
SReplica *pReplica = &createReq.replicas[v];
......
......@@ -371,8 +371,8 @@ struct SBlockIdx {
struct SMapData {
int32_t nItem;
int32_t *aOffset;
int32_t nData;
int32_t *aOffset;
uint8_t *pData;
};
......
......@@ -180,11 +180,41 @@ int metaClose(SMeta *pMeta) {
return 0;
}
int32_t metaRLock(SMeta *pMeta) { return taosThreadRwlockRdlock(&pMeta->lock); }
int32_t metaRLock(SMeta *pMeta) {
int32_t ret = 0;
int32_t metaWLock(SMeta *pMeta) { return taosThreadRwlockWrlock(&pMeta->lock); }
metaDebug("meta rlock %p B", &pMeta->lock);
int32_t metaULock(SMeta *pMeta) { return taosThreadRwlockUnlock(&pMeta->lock); }
ret = taosThreadRwlockRdlock(&pMeta->lock);
metaDebug("meta rlock %p E", &pMeta->lock);
return ret;
}
int32_t metaWLock(SMeta *pMeta) {
int32_t ret = 0;
metaDebug("meta wlock %p B", &pMeta->lock);
ret = taosThreadRwlockWrlock(&pMeta->lock);
metaDebug("meta wlock %p E", &pMeta->lock);
return ret;
}
int32_t metaULock(SMeta *pMeta) {
int32_t ret = 0;
metaDebug("meta ulock %p B", &pMeta->lock);
ret = taosThreadRwlockUnlock(&pMeta->lock);
metaDebug("meta ulock %p E", &pMeta->lock);
return ret;
}
static int tbDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
STbDbKey *pTbDbKey1 = (STbDbKey *)pKey1;
......@@ -259,7 +289,7 @@ static int ctbIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL
static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
STagIdxKey *pTagIdxKey1 = (STagIdxKey *)pKey1;
STagIdxKey *pTagIdxKey2 = (STagIdxKey *)pKey2;
tb_uid_t uid1, uid2;
tb_uid_t uid1 = 0, uid2 = 0;
int c;
// compare suid
......@@ -287,14 +317,15 @@ static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL
// all not NULL, compr tag vals
c = doCompare(pTagIdxKey1->data, pTagIdxKey2->data, pTagIdxKey1->type, 0);
if (c) return c;
}
if (IS_VAR_DATA_TYPE(pTagIdxKey1->type)) {
uid1 = *(tb_uid_t *)(pTagIdxKey1->data + varDataTLen(pTagIdxKey1->data));
uid2 = *(tb_uid_t *)(pTagIdxKey2->data + varDataTLen(pTagIdxKey2->data));
} else {
uid1 = *(tb_uid_t *)(pTagIdxKey1->data + tDataTypes[pTagIdxKey1->type].bytes);
uid2 = *(tb_uid_t *)(pTagIdxKey2->data + tDataTypes[pTagIdxKey2->type].bytes);
}
// both null or tag values are equal, then continue to compare uids
if (IS_VAR_DATA_TYPE(pTagIdxKey1->type)) {
uid1 = *(tb_uid_t *)(pTagIdxKey1->data + varDataTLen(pTagIdxKey1->data));
uid2 = *(tb_uid_t *)(pTagIdxKey2->data + varDataTLen(pTagIdxKey2->data));
} else {
uid1 = *(tb_uid_t *)(pTagIdxKey1->data + tDataTypes[pTagIdxKey1->type].bytes);
uid2 = *(tb_uid_t *)(pTagIdxKey2->data + tDataTypes[pTagIdxKey2->type].bytes);
}
// compare uid
......
......@@ -583,7 +583,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->execHandle.execTb.suid = req.suid;
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
tqDebug("vgId:%d, tq try get suid:%" PRId64, pTq->pVnode->config.vgId, req.suid);
tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, req.suid);
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid);
......
......@@ -24,6 +24,8 @@ void tMapDataReset(SMapData *pMapData) {
void tMapDataClear(SMapData *pMapData) {
tFree((uint8_t *)pMapData->aOffset);
tFree(pMapData->pData);
pMapData->pData = NULL;
pMapData->aOffset = NULL;
}
int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)) {
......
......@@ -40,8 +40,8 @@ const SVnodeCfg vnodeCfgDefault = {.vgId = -1,
.vgId = -1,
.fsyncPeriod = 0,
.retentionPeriod = -1,
.rollPeriod = -1,
.segSize = -1,
.rollPeriod = 0,
.segSize = 0,
.retentionSize = -1,
.level = TAOS_WAL_WRITE,
},
......
......@@ -206,13 +206,13 @@ static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool
}
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnode *pVnode = pInfo->ahandle;
int32_t vgId = pVnode->config.vgId;
int32_t code = 0;
SRpcMsg *pMsg = NULL;
int32_t arrayPos = 0;
SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg*));
bool *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));
SVnode *pVnode = pInfo->ahandle;
int32_t vgId = pVnode->config.vgId;
int32_t code = 0;
SRpcMsg *pMsg = NULL;
int32_t arrayPos = 0;
SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg *));
bool *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));
vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);
for (int32_t msg = 0; msg < numOfMsgs; msg++) {
......@@ -501,34 +501,56 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon
}
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SVnode *pVnode = pFsm->data;
vTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
if (cbMeta.code == 0) {
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index;
rpcMsg.info.conn.applyTerm = cbMeta.term;
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else {
SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
vError("vgId:%d, sync commit error, msgtype:%d,%s, error:0x%X, errmsg:%s", syncGetVgId(pVnode->sync), pMsg->msgType,
TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code));
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
if (cbMeta.isWeak == 0) {
SVnode *pVnode = pFsm->data;
vTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
if (cbMeta.code == 0) {
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index;
rpcMsg.info.conn.applyTerm = cbMeta.term;
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else {
SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
vError("vgId:%d, sync commit error, msgtype:%d,%s, error:0x%X, errmsg:%s", syncGetVgId(pVnode->sync),
pMsg->msgType, TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code));
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
}
}
}
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SVnode *pVnode = pFsm->data;
vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
if (cbMeta.isWeak == 1) {
SVnode *pVnode = pFsm->data;
vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%" PRId64
", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
if (cbMeta.code == 0) {
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index;
rpcMsg.info.conn.applyTerm = cbMeta.term;
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else {
SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
vError("vgId:%d, sync pre-commit error, msgtype:%d,%s, error:0x%X, errmsg:%s", syncGetVgId(pVnode->sync),
pMsg->msgType, TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code));
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
}
}
}
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
......
......@@ -359,6 +359,8 @@ typedef struct STableMergeScanInfo {
// window to check if current data block needs to be loaded.
SInterval interval;
SSampleExecInfo sample; // sample execution info
SSortExecInfo sortExecInfo;
} STableMergeScanInfo;
typedef struct STagScanInfo {
......
......@@ -2831,6 +2831,13 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
size_t numReaders = taosArrayGetSize(pInfo->dataReaders);
SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer;
pInfo->sortExecInfo.loops += sortExecInfo.loops;
pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;
for (int32_t i = 0; i < numReaders; ++i) {
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
blockDataDestroy(param->inputBlock);
......@@ -2955,7 +2962,7 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla
STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
STableMergeScanInfo* pInfo = pOptr->info;
execInfo->blockRecorder = pInfo->readRecorder;
execInfo->sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
execInfo->sortExecInfo = pInfo->sortExecInfo;
*pOptrExplain = execInfo;
*len = sizeof(STableMergeScanExecInfo);
......
......@@ -1616,6 +1616,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdated);
}
pOperator->status = OP_RES_TO_RETURN;
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdated,
pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
......@@ -1628,6 +1629,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
if (pInfo->pDelRes->info.rows > 0) {
return pInfo->pDelRes;
}
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
printDataBlock(pInfo->binfo.pRes, "single interval");
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
......
......@@ -2259,6 +2259,7 @@ int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t currentRow = pBlock->info.rows;
if (0 == pInfo->num) {
colDataAppendNULL(pCol, currentRow);
return 0;
}
......
......@@ -238,6 +238,7 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncInd
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag);
int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code);
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
......
......@@ -244,22 +244,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
// pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta = {0};
cbMeta.index = pAppendEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pAppendEntry->isWeak;
cbMeta.code = 2;
cbMeta.state = ths->state;
cbMeta.seqNum = pAppendEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
rpcFreeCont(rpcMsg.pCont);
syncNodePreCommit(ths, pAppendEntry, 0);
}
// free memory
......@@ -280,22 +265,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
// pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta = {0};
cbMeta.index = pAppendEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pAppendEntry->isWeak;
cbMeta.code = 3;
cbMeta.state = ths->state;
cbMeta.seqNum = pAppendEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
rpcFreeCont(rpcMsg.pCont);
syncNodePreCommit(ths, pAppendEntry, 0);
// free memory
syncEntryDestory(pAppendEntry);
......@@ -440,7 +410,7 @@ static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) {
return code;
}
static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) {
int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
......@@ -456,7 +426,7 @@ static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) {
cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 2;
cbMeta.code = code;
cbMeta.state = ths->state;
cbMeta.seqNum = pEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
......@@ -594,7 +564,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
return -1;
}
code = syncNodePreCommit(ths, pAppendEntry);
code = syncNodePreCommit(ths, pAppendEntry, 0);
ASSERT(code == 0);
// syncEntryDestory(pAppendEntry);
......@@ -715,7 +685,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
return -1;
}
code = syncNodePreCommit(ths, pAppendEntry);
code = syncNodePreCommit(ths, pAppendEntry, 0);
ASSERT(code == 0);
// syncEntryDestory(pAppendEntry);
......@@ -919,7 +889,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
}
// pre commit
code = syncNodePreCommit(ths, pAppendEntry);
code = syncNodePreCommit(ths, pAppendEntry, 0);
ASSERT(code == 0);
// update match index
......@@ -1032,7 +1002,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
}
// pre commit
code = syncNodePreCommit(ths, pAppendEntry);
code = syncNodePreCommit(ths, pAppendEntry, 0);
ASSERT(code == 0);
syncEntryDestory(pAppendEntry);
......
......@@ -67,11 +67,6 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
for (SyncIndex index = syncNodeGetLastIndex(pSyncNode); index > pSyncNode->commitIndex; --index) {
bool agree = syncAgree(pSyncNode, index);
if (gRaftDetailLog) {
sTrace("syncMaybeAdvanceCommitIndex syncAgree:%d, index:%" PRId64 ", pSyncNode->commitIndex:%" PRId64, agree,
index, pSyncNode->commitIndex);
}
if (agree) {
// term
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index);
......@@ -82,20 +77,15 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
// update commit index
newCommitIndex = index;
if (gRaftDetailLog) {
sTrace("syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%" PRId64
" commit, pSyncNode->commitIndex:%" PRId64,
newCommitIndex, pSyncNode->commitIndex);
}
syncEntryDestory(pEntry);
break;
} else {
if (gRaftDetailLog) {
sTrace("syncMaybeAdvanceCommitIndex can not commit due to term not equal, pEntry->term:%" PRIu64
", pSyncNode->pRaftStore->currentTerm:%" PRIu64,
pEntry->term, pSyncNode->pRaftStore->currentTerm);
}
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "can not commit due to term not equal, index:%ld, term:%lu", pEntry->index,
pEntry->term);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
}
syncEntryDestory(pEntry);
......@@ -107,10 +97,6 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SyncIndex beginIndex = pSyncNode->commitIndex + 1;
SyncIndex endIndex = newCommitIndex;
if (gRaftDetailLog) {
sTrace("syncMaybeAdvanceCommitIndex sync commit %" PRId64, newCommitIndex);
}
// update commit index
pSyncNode->commitIndex = newCommitIndex;
......
......@@ -2504,22 +2504,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI
}
// pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta = {0};
cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 0;
cbMeta.state = ths->state;
cbMeta.seqNum = pEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
rpcFreeCont(rpcMsg.pCont);
syncNodePreCommit(ths, pEntry, 0);
// if only myself, maybe commit right now
if (ths->replicaNum == 1) {
......@@ -2528,22 +2513,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI
} else {
// pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta = {0};
cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 1;
cbMeta.state = ths->state;
cbMeta.seqNum = pEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
rpcFreeCont(rpcMsg.pCont);
syncNodePreCommit(ths, pEntry, 0);
}
if (pRetIndex != NULL) {
......
......@@ -101,8 +101,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
// open meta
walResetVer(&pWal->vers);
pWal->pWriteLogTFile = NULL;
pWal->pWriteIdxTFile = NULL;
pWal->pLogFile = NULL;
pWal->pIdxFile = NULL;
pWal->writeCur = -1;
pWal->fileInfoSet = taosArrayInit(8, sizeof(SWalFileInfo));
if (pWal->fileInfoSet == NULL) {
......@@ -179,10 +179,10 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
void walClose(SWal *pWal) {
taosThreadMutexLock(&pWal->mutex);
taosCloseFile(&pWal->pWriteLogTFile);
pWal->pWriteLogTFile = NULL;
taosCloseFile(&pWal->pWriteIdxTFile);
pWal->pWriteIdxTFile = NULL;
taosCloseFile(&pWal->pLogFile);
pWal->pLogFile = NULL;
taosCloseFile(&pWal->pIdxFile);
pWal->pIdxFile = NULL;
walSaveMeta(pWal);
taosArrayDestroy(pWal->fileInfoSet);
pWal->fileInfoSet = NULL;
......@@ -223,7 +223,7 @@ static void walFsyncAll() {
if (walNeedFsync(pWal)) {
wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->cfg.vgId, pWal->cfg.level, pWal->fsyncSeq,
atomic_load_32(&tsWal.seq));
int32_t code = taosFsyncFile(pWal->pWriteLogTFile);
int32_t code = taosFsyncFile(pWal->pLogFile);
if (code != 0) {
wError("vgId:%d, file:%" PRId64 ".log, failed to fsync since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(code));
......
......@@ -22,8 +22,8 @@
static int64_t walSeekWritePos(SWal* pWal, int64_t ver) {
int64_t code = 0;
TdFilePtr pIdxTFile = pWal->pWriteIdxTFile;
TdFilePtr pLogTFile = pWal->pWriteLogTFile;
TdFilePtr pIdxTFile = pWal->pIdxFile;
TdFilePtr pLogTFile = pWal->pLogFile;
// seek position
int64_t idxOff = walGetVerIdxOffset(pWal, ver);
......@@ -68,8 +68,8 @@ int walInitWriteFile(SWal* pWal) {
return -1;
}
// switch file
pWal->pWriteIdxTFile = pIdxTFile;
pWal->pWriteLogTFile = pLogTFile;
pWal->pIdxFile = pIdxTFile;
pWal->pLogFile = pLogTFile;
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
return 0;
}
......@@ -78,15 +78,15 @@ int walChangeWrite(SWal* pWal, int64_t ver) {
int code;
TdFilePtr pIdxTFile, pLogTFile;
char fnameStr[WAL_FILE_LEN];
if (pWal->pWriteLogTFile != NULL) {
code = taosCloseFile(&pWal->pWriteLogTFile);
if (pWal->pLogFile != NULL) {
code = taosCloseFile(&pWal->pLogFile);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
}
if (pWal->pWriteIdxTFile != NULL) {
code = taosCloseFile(&pWal->pWriteIdxTFile);
if (pWal->pIdxFile != NULL) {
code = taosCloseFile(&pWal->pIdxFile);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
......@@ -106,7 +106,7 @@ int walChangeWrite(SWal* pWal, int64_t ver) {
pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pIdxTFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
pWal->pWriteIdxTFile = NULL;
pWal->pIdxFile = NULL;
return -1;
}
walBuildLogName(pWal, fileFirstVer, fnameStr);
......@@ -114,12 +114,12 @@ int walChangeWrite(SWal* pWal, int64_t ver) {
if (pLogTFile == NULL) {
taosCloseFile(&pIdxTFile);
terrno = TAOS_SYSTEM_ERROR(errno);
pWal->pWriteLogTFile = NULL;
pWal->pLogFile = NULL;
return -1;
}
pWal->pWriteLogTFile = pLogTFile;
pWal->pWriteIdxTFile = pIdxTFile;
pWal->pLogFile = pLogTFile;
pWal->pIdxFile = pIdxTFile;
pWal->writeCur = idx;
return fileFirstVer;
}
......
......@@ -32,8 +32,8 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
}
}
taosCloseFile(&pWal->pWriteLogTFile);
taosCloseFile(&pWal->pWriteIdxTFile);
taosCloseFile(&pWal->pLogFile);
taosCloseFile(&pWal->pIdxFile);
if (pWal->vers.firstVer != -1) {
int32_t fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
......@@ -261,14 +261,13 @@ int32_t walEndSnapshot(SWal *pWal) {
pWal->vers.snapshotVer = ver;
int ts = taosGetTimestampSec();
int64_t minVerToDelete = ver;
void *pIter = NULL;
void *pIter = NULL;
while (1) {
pIter = taosHashIterate(pWal->pRefHash, pIter);
if (pIter == NULL) break;
SWalRef *pRef = *(SWalRef **)pIter;
if (pRef->refVer == -1) continue;
minVerToDelete = TMIN(minVerToDelete, pRef->refVer);
ver = TMIN(ver, pRef->refVer);
}
int deleteCnt = 0;
......@@ -277,35 +276,37 @@ int32_t walEndSnapshot(SWal *pWal) {
tmp.firstVer = ver;
// find files safe to delete
SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);
if (ver >= pInfo->lastVer) {
pInfo++;
}
// iterate files, until the searched result
for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
if ((pWal->cfg.retentionSize != -1 && newTotSize > pWal->cfg.retentionSize) ||
(pWal->cfg.retentionPeriod != -1 && iter->closeTs + pWal->cfg.retentionPeriod > ts)) {
// delete according to file size or close time
deleteCnt++;
newTotSize -= iter->fileSize;
if (pInfo) {
if (ver >= pInfo->lastVer) {
pInfo++;
}
// iterate files, until the searched result
for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
if ((pWal->cfg.retentionSize != -1 && newTotSize > pWal->cfg.retentionSize) ||
(pWal->cfg.retentionPeriod != -1 && iter->closeTs + pWal->cfg.retentionPeriod > ts)) {
// delete according to file size or close time
deleteCnt++;
newTotSize -= iter->fileSize;
}
}
char fnameStr[WAL_FILE_LEN];
// remove file
for (int i = 0; i < deleteCnt; i++) {
pInfo = taosArrayGet(pWal->fileInfoSet, i);
walBuildLogName(pWal, pInfo->firstVer, fnameStr);
taosRemoveFile(fnameStr);
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
taosRemoveFile(fnameStr);
}
}
char fnameStr[WAL_FILE_LEN];
// remove file
for (int i = 0; i < deleteCnt; i++) {
pInfo = taosArrayGet(pWal->fileInfoSet, i);
walBuildLogName(pWal, pInfo->firstVer, fnameStr);
taosRemoveFile(fnameStr);
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
taosRemoveFile(fnameStr);
}
// make new array, remove files
taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
pWal->writeCur = -1;
pWal->vers.firstVer = -1;
} else {
pWal->vers.firstVer = ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
// make new array, remove files
taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
pWal->writeCur = -1;
pWal->vers.firstVer = -1;
} else {
pWal->vers.firstVer = ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
}
}
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
pWal->totSize = newTotSize;
......@@ -324,34 +325,34 @@ END:
int32_t walRollImpl(SWal *pWal) {
int32_t code = 0;
if (pWal->pWriteIdxTFile != NULL) {
code = taosCloseFile(&pWal->pWriteIdxTFile);
if (pWal->pIdxFile != NULL) {
code = taosCloseFile(&pWal->pIdxFile);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
goto END;
}
}
if (pWal->pWriteLogTFile != NULL) {
code = taosCloseFile(&pWal->pWriteLogTFile);
if (pWal->pLogFile != NULL) {
code = taosCloseFile(&pWal->pLogFile);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
goto END;
}
}
TdFilePtr pIdxTFile, pLogTFile;
TdFilePtr pIdxFile, pLogFile;
// create new file
int64_t newFileFirstVersion = pWal->vers.lastVer + 1;
int64_t newFileFirstVer = pWal->vers.lastVer + 1;
char fnameStr[WAL_FILE_LEN];
walBuildIdxName(pWal, newFileFirstVersion, fnameStr);
pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pIdxTFile == NULL) {
walBuildIdxName(pWal, newFileFirstVer, fnameStr);
pIdxFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pIdxFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
code = -1;
goto END;
}
walBuildLogName(pWal, newFileFirstVersion, fnameStr);
pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pLogTFile == NULL) {
walBuildLogName(pWal, newFileFirstVer, fnameStr);
pLogFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pLogFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
code = -1;
goto END;
......@@ -363,8 +364,8 @@ int32_t walRollImpl(SWal *pWal) {
}
// switch file
pWal->pWriteIdxTFile = pIdxTFile;
pWal->pWriteLogTFile = pLogTFile;
pWal->pIdxFile = pIdxFile;
pWal->pLogFile = pLogFile;
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
ASSERT(pWal->writeCur >= 0);
......@@ -378,10 +379,10 @@ END:
static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
SWalIdxEntry entry = {.ver = ver, .offset = offset};
int64_t idxOffset = taosLSeekFile(pWal->pWriteIdxTFile, 0, SEEK_END);
int64_t idxOffset = taosLSeekFile(pWal->pIdxFile, 0, SEEK_END);
wDebug("vgId:%d, write index, index:%" PRId64 ", offset:%" PRId64 ", at %" PRId64, pWal->cfg.vgId, ver, offset,
idxOffset);
int64_t size = taosWriteFile(pWal->pWriteIdxTFile, &entry, sizeof(SWalIdxEntry));
int64_t size = taosWriteFile(pWal->pIdxFile, &entry, sizeof(SWalIdxEntry));
if (size != sizeof(SWalIdxEntry)) {
terrno = TAOS_SYSTEM_ERROR(errno);
// TODO truncate
......@@ -407,7 +408,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen);
if (taosWriteFile(pWal->pWriteLogTFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
if (taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
// TODO ftruncate
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
......@@ -416,7 +417,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
goto END;
}
if (taosWriteFile(pWal->pWriteLogTFile, (char *)body, bodyLen) != bodyLen) {
if (taosWriteFile(pWal->pLogFile, (char *)body, bodyLen) != bodyLen) {
// TODO ftruncate
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
......@@ -456,14 +457,14 @@ int64_t walAppendLog(SWal *pWal, tmsg_t msgType, SWalSyncInfo syncMeta, const vo
return -1;
}
if (pWal->pWriteIdxTFile == NULL || pWal->pWriteIdxTFile == NULL || pWal->writeCur < 0) {
if (pWal->pIdxFile == NULL || pWal->pIdxFile == NULL || pWal->writeCur < 0) {
if (walInitWriteFile(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
}
ASSERT(pWal->pWriteIdxTFile != NULL && pWal->pWriteLogTFile != NULL && pWal->writeCur >= 0);
ASSERT(pWal->pIdxFile != NULL && pWal->pLogFile != NULL && pWal->writeCur >= 0);
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
......@@ -494,14 +495,14 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SWalSync
return -1;
}
if (pWal->pWriteIdxTFile == NULL || pWal->pWriteIdxTFile == NULL || pWal->writeCur < 0) {
if (pWal->pIdxFile == NULL || pWal->pIdxFile == NULL || pWal->writeCur < 0) {
if (walInitWriteFile(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
}
ASSERT(pWal->pWriteIdxTFile != NULL && pWal->pWriteLogTFile != NULL && pWal->writeCur >= 0);
ASSERT(pWal->pIdxFile != NULL && pWal->pLogFile != NULL && pWal->writeCur >= 0);
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
......@@ -524,7 +525,7 @@ int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in
void walFsync(SWal *pWal, bool forceFsync) {
if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
if (taosFsyncFile(pWal->pWriteLogTFile) < 0) {
if (taosFsyncFile(pWal->pLogFile) < 0) {
wError("vgId:%d, file:%" PRId64 ".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal),
strerror(errno));
}
......
......@@ -183,7 +183,7 @@ if $rows != 12800 then
return -1
endi
sql select _rowts, top(c1, 80), tbname, t1, t2 from select_tags_mt0;
sql select ts, top(c1, 80), tbname, t1, t2 from select_tags_mt0 order by ts;
if $rows != 80 then
return -1
endi
......@@ -212,7 +212,7 @@ if $data04 != @abc12@ then
return -1
endi
sql select top(c1, 80), tbname, t1, t2 from select_tags_mt0;
sql select ts, top(c1, 80), tbname, t1, t2 from select_tags_mt0 order by ts;
if $rows != 80 then
return -1
endi
......@@ -241,7 +241,7 @@ if $data04 != @abc12@ then
return -1
endi
sql select bottom(c1, 72), tbname, t1, t2 from select_tags_mt0;
sql select ts, bottom(c1, 72), tbname, t1, t2 from select_tags_mt0 order by ts;
if $rows != 72 then
return -1
endi
......@@ -293,7 +293,7 @@ if $data03 != 15 then
endi
print ====== selectivity+tags+group by tags=======================
sql select first(c1), tbname, t1, t2 from select_tags_mt0 group by tbname;
sql select first(c1), tbname, t1, t2, tbname from select_tags_mt0 group by tbname order by t1;
if $rows != 16 then
return -1
endi
......@@ -327,7 +327,7 @@ if $data04 != @select_tags_tb0@ then
return -1
endi
sql select last_row(ts,c1), tbname, t1, t2 from select_tags_mt0 group by tbname;
sql select last_row(ts,c1), tbname, t1, t2, tbname from select_tags_mt0 group by tbname order by t1;
if $rows != 16 then
return -1
endi
......@@ -361,7 +361,7 @@ if $data04 != @abc0@ then
return -1
endi
sql select tbname,t1,t2 from select_tags_mt0;
sql select distinct tbname,t1,t2 from select_tags_mt0;
if $row != 16 then
return -1
endi
......@@ -411,7 +411,7 @@ if $data11 != @70-01-01 08:01:40.001@ then
return -1
endi
sql select top(c1, 100), tbname, t1, t2 from select_tags_mt0 where tbname in ('select_tags_tb0', 'select_tags_tb1') group by tbname;
sql select ts, top(c1, 100), tbname, t1, t2 from select_tags_mt0 where tbname in ('select_tags_tb0', 'select_tags_tb1') group by tbname order by ts;
if $row != 200 then
return -1
endi
......@@ -448,7 +448,7 @@ if $data04 != @abc0@ then
return -1
endi
sql select top(c1, 2), t2 from select_tags_mt0 where tbname in ('select_tags_tb0', 'select_tags_tb1') group by tbname,t2;
sql select ts, top(c1, 2), t2, tbname, t2 from select_tags_mt0 where tbname in ('select_tags_tb0', 'select_tags_tb1') group by tbname,t2 order by ts;
if $row != 4 then
return -1
endi
......@@ -535,33 +535,13 @@ endi
# slimit /limit
sql select top(c1, 2), t2 from select_tags_mt0 where tbname in ('select_tags_tb0', 'select_tags_tb1') group by tbname,t2 limit 2 offset 1;
sql select ts, top(c1, 2), t2 from select_tags_mt0 where tbname in ('select_tags_tb0', 'select_tags_tb1') group by tbname,t2 limit 2 offset 1;
if $row != 2 then
return -1
endi
if $data00 != @70-01-01 08:01:40.199@ then
return -1
endi
if $data01 != 99 then
return -1
endi
if $data02 != @abc0@ then
return -1
endi
if $data03 != @select_tags_tb0@ then
return -1
endi
if $data04 != @abc0@ then
return -1
endi
print ======= selectivity + tags + group by + tags + filter ===========================
sql select first(c1), t1 from select_tags_mt0 where c1<=2 group by tbname;
sql select first(c1), t1, tbname from select_tags_mt0 where c1<=2 group by tbname order by t1;
if $row != 3 then
return -1
endi
......@@ -602,7 +582,7 @@ if $data22 != @select_tags_tb2@ then
return -1
endi
sql select first(c1), tbname from select_tags_mt0 where c1<=2 interval(1s);
sql select _wstart, first(c1), tbname from select_tags_mt0 where c1<=2 interval(1s);
if $row != 3 then
return -1
endi
......@@ -671,7 +651,7 @@ if $data01 != @70-01-01 08:01:50.001@ then
endi
print ======= selectivity + tags + group by + tags + filter + interval ================
sql select first(c1), t2, t1, tbname from select_tags_mt0 where c1<=2 interval(1d) group by tbname;
sql select _wstart,first(c1), t2, t1, tbname, tbname from select_tags_mt0 where c1<=2 partition by tbname interval(1d) order by t1;
if $row != 3 then
return -1
endi
......@@ -708,7 +688,7 @@ if $data25 != @select_tags_tb2@ then
return -1
endi
sql select top(c1, 5), t2 from select_tags_mt0 where c1<=2 interval(1d) group by tbname;
sql select ts, top(c1, 5), t2, tbname from select_tags_mt0 where c1<=2 partition by tbname interval(1d) order by ts, t2;
if $row != 15 then
return -1
endi
......@@ -746,7 +726,7 @@ if $data93 != @select_tags_tb1@ then
endi
#if data
sql select top(c1, 50), t2, t1, tbname from select_tags_mt0 where c1<=2 interval(1d) group by tbname;
sql select ts, top(c1, 50), t2, t1, tbname, tbname from select_tags_mt0 where c1<=2 partition by tbname interval(1d) order by ts, t2;
if $row != 48 then
return -1
endi
......@@ -831,7 +811,7 @@ endi
print TODO ======= selectivity + tags+ group by + tags + filter + interval + join===========
print ==========================mix tag columns and group by columns======================
sql select top(c1, 100), tbname from select_tags_mt0 where tbname in ('select_tags_tb0', 'select_tags_tb1') group by t3
sql select ts, top(c1, 100), tbname, t3 from select_tags_mt0 where tbname in ('select_tags_tb0', 'select_tags_tb1') group by t3 order by ts, tbname;
if $rows != 100 then
return -1
endi
......@@ -887,9 +867,9 @@ sql_error select twa(c2), tbname from select_tags_mt0;
sql_error select interp(c2), tbname from select_tags_mt0 where ts=100001;
sql_error select t1,t2,tbname from select_tags_mt0 group by tbname;
sql_error select count(tbname) from select_tags_mt0 interval(1d);
sql_error select count(tbname) from select_tags_mt0 group by t1;
sql_error select count(tbname),SUM(T1) from select_tags_mt0 interval(1d);
sql select count(tbname) from select_tags_mt0 interval(1d);
sql select count(tbname) from select_tags_mt0 group by t1;
sql select count(tbname),SUM(T1) from select_tags_mt0 interval(1d);
sql_error select first(c1), count(*), t2, t1, tbname from select_tags_mt0 where c1<=2 interval(1d) group by tbname;
sql_error select ts from select_tags_mt0 interval(1y);
sql_error select count(*), tbname from select_tags_mt0 interval(1y);
......@@ -902,8 +882,8 @@ sql_error select tbname, t1 from select_tags_mt0 interval(1y);
#valid sql: select first(c1), tbname, t1 from select_tags_mt0 group by t2;
print ==================================>TD-4231
sql_error select t1,tbname from select_tags_mt0 where c1<0
sql_error select t1,tbname from select_tags_mt0 where c1<0 and tbname in ('select_tags_tb12')
sql select t1,tbname from select_tags_mt0 where c1<0
sql select t1,tbname from select_tags_mt0 where c1<0 and tbname in ('select_tags_tb12')
sql select tbname from select_tags_mt0 where tbname in ('select_tags_tb12');
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start -v
system sh/cfg.sh -n dnode1 -c debugflag -v 131
system sh/exec.sh -n dnode1 -s start
sql connect
print =============== step1: create drop show dnodes
......@@ -21,88 +22,73 @@ if $data(1)[4] != ready then
goto step1
endi
print =============== step2: create db
sql create database db
sql use db
sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 float, t3 binary(16)) comment "abd"
sql create table db.c1 using db.stb tags(101, 102, "103")
print =============== step3: alter stb
sql_error alter table db.stb add column ts int
sql alter table db.stb add column c3 int
sql alter table db.stb add column c4 bigint
sql alter table db.stb add column c5 binary(12)
sql alter table db.stb drop column c1
sql alter table db.stb drop column c4
sql alter table db.stb MODIFY column c2 binary(32)
sql alter table db.stb add tag t4 bigint
sql alter table db.stb add tag c1 int
sql alter table db.stb add tag t5 binary(12)
sql alter table db.stb drop tag c1
sql alter table db.stb drop tag t5
sql alter table db.stb MODIFY tag t3 binary(32)
sql alter table db.stb rename tag t1 tx
sql alter table db.stb comment 'abcde' ;
sql drop table db.stb
print =============== step4: alter tb
sql create table tb (ts timestamp, a int)
sql insert into tb values(now-28d, -28)
sql select count(a) from tb
sql alter table tb add column b smallint
sql insert into tb values(now-25d, -25, 0)
sql select count(b) from tb
sql alter table tb add column c tinyint
sql insert into tb values(now-22d, -22, 3, 0)
sql select count(c) from tb
sql alter table tb add column d int
sql insert into tb values(now-19d, -19, 6, 0, 0)
sql select count(d) from tb
sql alter table tb add column e bigint
sql alter table tb add column f float
sql alter table tb add column g double
sql alter table tb add column h binary(10)
sql select count(a), count(b), count(c), count(d), count(e), count(f), count(g), count(h) from tb
sql select * from tb order by ts desc
$tbPrefix = tb
$tbNum = 5
$rowNum = 10
print =============== step5: alter stb and insert data
sql create table stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 float, t3 binary(16)) comment "abd"
sql show db.stables
sql describe stb
sql_error alter table stb add column ts int
sql create table db.ctb using db.stb tags(101, 102, "103")
sql insert into db.ctb values(now, 1, "2")
sql show db.tables
sql select * from db.stb
sql select * from tb
print =============== step2: prepare data
sql create database db vgroups 2
sql use db
sql create table if not exists stb (ts timestamp, tbcol int, tbcol2 float, tbcol3 double) tags (tgcol int unsigned)
sql alter table stb add column c3 int
sql describe stb
sql select * from db.stb
sql select * from tb
sql insert into db.ctb values(now+1s, 1, 2, 3)
sql select * from db.stb
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
sql create table $tb using stb tags( $i )
$x = 0
while $x < $rowNum
$cc = $x * 60000
$ms = 1601481600000 + $cc
sql insert into $tb values ($ms , $x , $x , $x )
$x = $x + 1
endw
sql alter table db.stb add column c4 bigint
sql select * from db.stb
sql insert into db.ctb values(now+2s, 1, 2, 3, 4)
$cc = $x * 60000
$ms = 1601481600000 + $cc
sql insert into $tb values ($ms , NULL , NULL , NULL )
$i = $i + 1
endw
sql alter table db.stb drop column c1
sql reset query cache
sql select * from tb
sql insert into db.ctb values(now+3s, 2, 3, 4)
sql select * from db.stb
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start -v
sql alter table db.stb add tag t4 bigint
sql select * from db.stb
sql select * from db.stb
sql_error create table db.ctb2 using db.stb tags(101, "102")
sql create table db.ctb2 using db.stb tags(101, 102, "103", 104)
sql insert into db.ctb2 values(now, 1, 2, 3)
print =============== step3: tb
sql select avg(tbcol) from tb1
sql select avg(tbcol) from tb1 where ts <= 1601481840000
sql select avg(tbcol) as b from tb1
sql select avg(tbcol) as b from tb1 interval(1d)
sql select avg(tbcol) as b from tb1 where ts <= 1601481840000 interval(1m)
sql select bottom(tbcol, 2) from tb1 where ts <= 1601481840000
sql select top(tbcol, 2) from tb1 where ts <= 1601481840000
sql select percentile(tbcol, 2) from tb1 where ts <= 1601481840000
sql select leastsquares(tbcol, 1, 1) as b from tb1 where ts <= 1601481840000
sql show table distributed tb1
sql select count(tbcol) as b from tb1 where ts <= 1601481840000 interval(1m)
sql select diff(tbcol) from tb1 where ts <= 1601481840000
sql select diff(tbcol) from tb1 where tbcol > 5 and tbcol < 20
sql select first(tbcol), last(tbcol) as b from tb1 where ts <= 1601481840000 interval(1m)
sql select count(tbcol), avg(tbcol), max(tbcol), min(tbcol), sum(tbcol), stddev(tbcol) from tb1 where ts <= 1601481840000 partition by tgcol interval(1m)
#sql select count(tbcol), avg(tbcol), max(tbcol), min(tbcol), count(tbcol) from tb1 where ts <= 1601481840000 and ts >= 1601481800000 partition by tgcol interval(1m) fill(value, 0)
sql select last_row(*) from tb1 where tbcol > 5 and tbcol < 20
print =============== step6: query data
sql select * from db.stb where tbname = 'ctb2';
print =============== step4: stb
sql select avg(tbcol) as c from stb
sql select avg(tbcol) as c from stb where ts <= 1601481840000
sql select avg(tbcol) as c from stb where tgcol < 5 and ts <= 1601481840000
sql select avg(tbcol) as c from stb interval(1m)
sql select avg(tbcol) as c from stb interval(1d)
sql select avg(tbcol) as b from stb where ts <= 1601481840000 interval(1m)
sql select avg(tbcol) as c from stb group by tgcol
sql select avg(tbcol) as b from stb where ts <= 1601481840000 partition by tgcol interval(1m)
sql show table distributed stb
sql select count(tbcol) as b from stb where ts <= 1601481840000 partition by tgcol interval(1m)
sql select diff(tbcol) from stb where ts <= 1601481840000
sql select first(tbcol), last(tbcol) as c from stb group by tgcol
sql select first(tbcol), last(tbcol) as b from stb where ts <= 1601481840000 and tbcol2 is null partition by tgcol interval(1m)
sql select first(tbcol), last(tbcol) as b from stb where ts <= 1601481840000 partition by tgcol interval(1m)
sql select count(tbcol), avg(tbcol), max(tbcol), min(tbcol), sum(tbcol), stddev(tbcol) from stb where ts <= 1601481840000 partition by tgcol interval(1m)
#sql select count(tbcol), avg(tbcol), max(tbcol), min(tbcol), count(tbcol) from stb where ts <= 1601481840000 and ts >= 1601481800000 partition by tgcol interval(1m) fill(value, 0)
sql select last_row(tbcol), stddev(tbcol) from stb where tbcol > 5 and tbcol < 20 group by tgcol
_OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c debugflag -v 131
system sh/exec.sh -n dnode1 -s start -v
sql connect
print =============== step1: create drop show dnodes
$x = 0
step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ---> dnode not ready!
return -1
endi
sql show dnodes
print ---> $data00 $data01 $data02 $data03 $data04 $data05
if $rows != 1 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
print =============== step2: create db
sql create database d1 vgroups 2 buffer 3
sql show databases
sql use d1
sql show vgroups
print =============== step3: create show stable
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned)
sql show stables
if $rows != 1 then
return -1
endi
print =============== step4: create show table
sql create table ct1 using stb tags(1000)
sql create table ct2 using stb tags(2000)
sql create table ct3 using stb tags(3000)
sql show tables
if $rows != 3 then
return -1
endi
print =============== step5: insert data (null / update)
sql insert into ct1 values(now+0s, 10, 2.0, 3.0)
sql insert into ct1 values(now+1s, 11, 2.1, NULL)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3)
sql insert into ct2 values(now+0s, 10, 2.0, 3.0)
sql insert into ct2 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3)
sql insert into ct3 values('2021-01-01 00:00:00.000', NULL, NULL, 3.0)
sql insert into ct3 values('2022-03-02 16:59:00.010', 3 , 4, 5), ('2022-03-02 16:59:00.010', 33 , 4, 5), ('2022-04-01 16:59:00.011', 4, 4, 5), ('2022-04-01 16:59:00.011', 6, 4, 5), ('2022-03-06 16:59:00.013', 8, 4, 5);
sql insert into ct3 values('2022-03-02 16:59:00.010', 103, 1, 2), ('2022-03-02 16:59:00.010', 303, 3, 4), ('2022-04-01 16:59:00.011', 40, 5, 6), ('2022-04-01 16:59:00.011', 60, 4, 5), ('2022-03-06 16:59:00.013', 80, 4, 5);
print =============== step6: query data=
sql select * from stb where t1 between 1000 and 2500
_OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT
print =============== check
$null=
system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ]
if $system_content > 0 then
return -1
endi
if $system_content == $null then
return -1
endi
......@@ -37,6 +37,7 @@ sql show stables
if $rows != 4 then
return -1
endi
sql show stables like 'stb'
print =============== step4: ccreate child table
sql create table c1 using stb tags(true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
......
......@@ -105,6 +105,39 @@ sql insert into db.ctb2 values(now, 1, 2, 3)
print =============== step6: query data
sql select * from db.stb where tbname = 'ctb2';
print =============== step7: normal table
sql create database d1 replica 1 duration 7 keep 50
sql use d1
sql create table tb (ts timestamp, a int)
sql insert into tb values(now-28d, -28)
sql alter table tb add column b smallint
sql insert into tb values(now-25d, -25, 0)
sql alter table tb add column c tinyint
sql insert into tb values(now-22d, -22, 3, 0)
sql alter table tb add column d int
sql insert into tb values(now-19d, -19, 6, 0, 0)
sql alter table tb add column e bigint
sql insert into tb values(now-16d, -16, 9, 0, 0, 0)
sql alter table tb add column f float
sql insert into tb values(now-13d, -13, 12, 0, 0, 0, 0)
sql alter table tb add column g double
sql insert into tb values(now-10d, -10, 15, 0, 0, 0, 0, 0)
sql alter table tb add column h binary(10)
sql insert into tb values(now-7d, -7, 18, 0, 0, 0, 0, 0, '0')
sql select count(a), count(b), count(c), count(d), count(e), count(f), count(g), count(h) from d1.tb;
sql alter table tb drop column a
sql insert into tb values(now-4d, 1, 1, 1, 1, 1, 1, '1')
sql alter table tb drop column b
sql insert into tb values(now-3d, 1, 1, 1, 1, 1, '1')
sql alter table tb drop column c
sql insert into tb values(now-2d, 1, 1, 1, 1, '1')
sql alter table tb drop column d
sql insert into tb values(now-1d, 1, 1, 1, '1')
sql alter table tb drop column e
sql insert into tb values(now, 1, 1, '1')
sql select count(h) from tb
_OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT
print =============== check
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start -v
sql connect
print ======================== create stable
sql create database d1
sql use d1
$x = 0
while $x < 128
$tb = d1.s . $x
sql create table $tb (ts timestamp, i int) tags (j int)
$x = $x + 1
endw
print ======================== describe stables
# TODO : create stable error
$m = 0
while $m < 128
$tb = s . $m
$filter = ' . $tb
$filter = $filter . '
sql show stables like $filter
print sql : show stables like $filter
if $rows != 1 then
print expect 1, actual: $rows
return -1
endi
$m = $m + 1
endw
print ======================== show stables
sql show d1.stables
print num of stables is $rows
if $rows != 128 then
return -1
endi
print ======================== create table
$x = 0
while $x < 424
$tb = d1.t . $x
sql create table $tb using d1.s0 tags( $x )
$x = $x + 1
endw
print ======================== show stables
sql show d1.tables
print num of tables is $rows
if $rows != 424 then
return -1
endi
_OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT
print =============== check
$null=
system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ]
if $system_content > 2 then
return -1
endi
if $system_content == $null then
return -1
endi
......@@ -30,7 +30,7 @@ class TDTestCase:
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def csum_query_form(self, col="c1", alias="", table_expr="t1", condition=""):
def csum_query_form(self, col="c1", alias="", table_expr="db.t1", condition=""):
'''
csum function:
......@@ -44,7 +44,7 @@ class TDTestCase:
return f"select csum({col}) {alias} from {table_expr} {condition}"
def checkcsum(self,col="c1", alias="", table_expr="t1", condition="" ):
def checkcsum(self,col="c1", alias="", table_expr="db.t1", condition="" ):
line = sys._getframe().f_back.f_lineno
pre_sql = self.csum_query_form(
col=col, table_expr=table_expr, condition=condition
......@@ -59,11 +59,11 @@ class TDTestCase:
tdSql.checkRows(0)
return
if "order by tbname" in condition:
tdSql.error(self.csum_query_form(
col=col, alias=alias, table_expr=table_expr, condition=condition
))
return
# if "order by tbname" in condition:
# tdSql.error(self.csum_query_form(
# col=col, alias=alias, table_expr=table_expr, condition=condition
# ))
# return
if "group" in condition:
......@@ -123,7 +123,8 @@ class TDTestCase:
return
else:
tdSql.query(f"select {col} from {table_expr} {re.sub('limit [0-9]*|offset [0-9]*','',condition)}")
tdSql.query(f"select {col} from {table_expr} {re.sub('limit [0-9]*|offset [0-9]*','',condition)} " )
offset_val = condition.split("offset")[1].split(" ")[1] if "offset" in condition else 0
pre_result = np.array(tdSql.queryResult)[np.array(tdSql.queryResult) != None]
if (platform.system().lower() == 'windows' and pre_result.dtype == 'int32'):
......@@ -161,12 +162,12 @@ class TDTestCase:
self.checkcsum(**case6)
# case7~8: nested query
# case7 = {"table_expr": "(select c1 from stb1)"}
# self.checkcsum(**case7)
# case8 = {"table_expr": "(select csum(c1) c1 from stb1 group by tbname)"}
# self.checkcsum(**case8)
case7 = {"table_expr": "(select c1 from db.stb1 order by tbname ,ts )"}
self.checkcsum(**case7)
case8 = {"table_expr": "(select csum(c1) c1 from db.t1 partition by tbname)"}
self.checkcsum(**case8)
# case9~10: mix with tbname/ts/tag/col
# case9~10: mix with tbname/ts/tag/col not support , must partition by alias ,such as select tbname ,csum(c1) partition by tbname
# case9 = {"alias": ", tbname"}
# self.checkcsum(**case9)
# case10 = {"alias": ", _c0"}
......@@ -196,37 +197,37 @@ class TDTestCase:
}
self.checkcsum(**case17)
# case18~19: with group by
# case18 = {
# "table_expr": "t1",
# "condition": "group by c6"
# }
# self.checkcsum(**case18)
# case19 = {
# "table_expr": "stb1",
# "condition": "partition by tbname" # partition by tbname
# }
# self.checkcsum(**case19)
# # case20~21: with order by
# case20 = {"condition": "order by ts"}
# self.checkcsum(**case20)
# # case22: with union
# case22 = {
# "condition": "union all select csum(c1) from t2"
# }
# self.checkcsum(**case22)
case18 = {
"table_expr": "db.t1",
"condition": "where c6 <0 partition by c6 order by c6"
}
self.checkcsum(**case18)
case19 = {
"table_expr": "db.t1",
"condition": " " # partition by tbname
}
self.checkcsum(**case19)
# case20~21: with order by
case20 = {"condition": "partition by tbname order by tbname "}
# self.checkcsum(**case20) # order by without order by tbname ,because check will random failed
# case22: with union
case22 = {
"condition": "union all select csum(c1) from db.t2"
}
# self.checkcsum(**case22) union all without check result becasue ,it will random return table_records
# case23: with limit/slimit
case23 = {
"condition": "limit 1"
}
self.checkcsum(**case23)
# case24 = {
# "table_expr": "stb1",
# "condition": "group by tbname slimit 1 soffset 1"
# }
# self.checkcsum(**case24)
case24 = {
"table_expr": "db.t1",
"condition": "partition by tbname "
}
self.checkcsum(**case24)
pass
......@@ -291,17 +292,17 @@ class TDTestCase:
}
tdSql.error(self.csum_query_form(**interval_sql)) # interval
group_normal_col = {
"table_expr": "t1",
"table_expr": "db.t1",
"condition": "group by c6"
}
tdSql.error(self.csum_query_form(**group_normal_col)) # group by normal col
slimit_soffset_sql = {
"table_expr": "stb1",
"table_expr": "db.stb1",
"condition": "group by tbname slimit 1 soffset 1"
}
# tdSql.error(self.csum_query_form(**slimit_soffset_sql))
order_by_tbname_sql = {
"table_expr": "stb1",
"table_expr": "db.stb1",
"condition": "group by tbname order by tbname"
}
tdSql.error(self.csum_query_form(**order_by_tbname_sql))
......@@ -346,8 +347,8 @@ class TDTestCase:
"create stable db.stb2 (ts timestamp, c1 int) tags(st2 int)"
)
for i in range(tbnum):
tdSql.execute(f"create table t{i} using stb1 tags({i})")
tdSql.execute(f"create table tt{i} using stb2 tags({i})")
tdSql.execute(f"create table db.t{i} using db.stb1 tags({i})")
tdSql.execute(f"create table db.tt{i} using db.stb2 tags({i})")
pass
......@@ -364,25 +365,25 @@ class TDTestCase:
tdLog.printNoPrefix("######## insert only NULL test:")
for i in range(tbnum):
tdSql.execute(f"insert into t{i}(ts) values ({nowtime - 5})")
tdSql.execute(f"insert into t{i}(ts) values ({nowtime + 5})")
tdSql.execute(f"insert into db.t{i}(ts) values ({nowtime - 5})")
tdSql.execute(f"insert into db.t{i}(ts) values ({nowtime + 5})")
self.csum_current_query()
self.csum_error_query()
tdLog.printNoPrefix("######## insert data in the range near the max(bigint/double):")
self.csum_test_table(tbnum)
tdSql.execute(f"insert into t1(ts, c1,c2,c5,c7) values "
tdSql.execute(f"insert into db.t1(ts, c1,c2,c5,c7) values "
f"({nowtime - (per_table_rows + 1) * 10}, {2**31-1}, {3.4*10**38}, {1.7*10**308}, {2**63-1})")
tdSql.execute(f"insert into t1(ts, c1,c2,c5,c7) values "
tdSql.execute(f"insert into db.t1(ts, c1,c2,c5,c7) values "
f"({nowtime - (per_table_rows + 2) * 10}, {2**31-1}, {3.4*10**38}, {1.7*10**308}, {2**63-1})")
self.csum_current_query()
self.csum_error_query()
tdLog.printNoPrefix("######## insert data in the range near the min(bigint/double):")
self.csum_test_table(tbnum)
tdSql.execute(f"insert into t1(ts, c1,c2,c5,c7) values "
tdSql.execute(f"insert into db.t1(ts, c1,c2,c5,c7) values "
f"({nowtime - (per_table_rows + 1) * 10}, {1-2**31}, {-3.4*10**38}, {-1.7*10**308}, {1-2**63})")
tdSql.execute(f"insert into t1(ts, c1,c2,c5,c7) values "
tdSql.execute(f"insert into db.t1(ts, c1,c2,c5,c7) values "
f"({nowtime - (per_table_rows + 2) * 10}, {1-2**31}, {-3.4*10**38}, {-1.7*10**308}, {512-2**63})")
self.csum_current_query()
self.csum_error_query()
......@@ -396,9 +397,9 @@ class TDTestCase:
tdLog.printNoPrefix("######## insert data mix with NULL test:")
for i in range(tbnum):
tdSql.execute(f"insert into t{i}(ts) values ({nowtime})")
tdSql.execute(f"insert into t{i}(ts) values ({nowtime-(per_table_rows+3)*10})")
tdSql.execute(f"insert into t{i}(ts) values ({nowtime+(per_table_rows+3)*10})")
tdSql.execute(f"insert into db.t{i}(ts) values ({nowtime})")
tdSql.execute(f"insert into db.t{i}(ts) values ({nowtime-(per_table_rows+3)*10})")
tdSql.execute(f"insert into db.t{i}(ts) values ({nowtime+(per_table_rows+3)*10})")
self.csum_current_query()
self.csum_error_query()
......@@ -411,65 +412,65 @@ class TDTestCase:
tdDnodes.start(index)
self.csum_current_query()
self.csum_error_query()
tdSql.query("select csum(1) from t1 ")
tdSql.query("select csum(1) from db.t1 ")
tdSql.checkRows(7)
tdSql.checkData(0,0,1)
tdSql.checkData(1,0,2)
tdSql.checkData(2,0,3)
tdSql.checkData(3,0,4)
tdSql.query("select csum(abs(c1))+2 from t1 ")
tdSql.query("select csum(abs(c1))+2 from db.t1 ")
tdSql.checkRows(4)
def csum_support_stable(self):
tdSql.query(" select csum(1) from stb1 ")
tdSql.query(" select csum(1) from db.stb1 ")
tdSql.checkRows(70)
tdSql.query("select csum(c1) from stb1 partition by tbname ")
tdSql.query("select csum(c1) from db.stb1 partition by tbname ")
tdSql.checkRows(40)
tdSql.query("select csum(st1) from stb1 partition by tbname")
tdSql.query("select csum(st1) from db.stb1 partition by tbname")
tdSql.checkRows(70)
tdSql.query("select csum(st1+c1) from stb1 partition by tbname")
tdSql.query("select csum(st1+c1) from db.stb1 partition by tbname")
tdSql.checkRows(40)
tdSql.query("select csum(st1+c1) from stb1 partition by tbname")
tdSql.query("select csum(st1+c1) from db.stb1 partition by tbname")
tdSql.checkRows(40)
tdSql.query("select csum(st1+c1) from stb1 partition by tbname")
tdSql.query("select csum(st1+c1) from db.stb1 partition by tbname")
tdSql.checkRows(40)
# # bug need fix
tdSql.query("select csum(st1+c1) from stb1 partition by tbname slimit 1 ")
tdSql.query("select csum(st1+c1) from db.stb1 partition by tbname slimit 1 ")
tdSql.checkRows(4)
# tdSql.error("select csum(st1+c1) from stb1 partition by tbname limit 1 ")
# tdSql.error("select csum(st1+c1) from db.stb1 partition by tbname limit 1 ")
# bug need fix
tdSql.query("select csum(st1+c1) from stb1 partition by tbname")
tdSql.query("select csum(st1+c1) from db.stb1 partition by tbname")
tdSql.checkRows(40)
# bug need fix
tdSql.query("select tbname , csum(c1) from stb1 partition by tbname")
tdSql.query("select tbname , csum(c1) from db.stb1 partition by tbname")
tdSql.checkRows(40)
tdSql.query("select tbname , csum(st1) from stb1 partition by tbname")
tdSql.query("select tbname , csum(st1) from db.stb1 partition by tbname")
tdSql.checkRows(70)
tdSql.query("select tbname , csum(st1) from stb1 partition by tbname slimit 1")
tdSql.query("select tbname , csum(st1) from db.stb1 partition by tbname slimit 1")
tdSql.checkRows(7)
# partition by tags
tdSql.query("select st1 , csum(c1) from stb1 partition by st1")
tdSql.query("select st1 , csum(c1) from db.stb1 partition by st1")
tdSql.checkRows(40)
tdSql.query("select csum(c1) from stb1 partition by st1")
tdSql.query("select csum(c1) from db.stb1 partition by st1")
tdSql.checkRows(40)
tdSql.query("select st1 , csum(c1) from stb1 partition by st1 slimit 1")
tdSql.query("select st1 , csum(c1) from db.stb1 partition by st1 slimit 1")
tdSql.checkRows(4)
tdSql.query("select csum(c1) from stb1 partition by st1 slimit 1")
tdSql.query("select csum(c1) from db.stb1 partition by st1 slimit 1")
tdSql.checkRows(4)
# partition by col
# tdSql.query("select c1 , csum(c1) from stb1 partition by c1")
# tdSql.query("select c1 , csum(c1) from db.stb1 partition by c1")
# tdSql.checkRows(41)
# tdSql.query("select csum(c1) from stb1 partition by c1")
# tdSql.query("select csum(c1) from db.stb1 partition by c1")
# tdSql.checkRows(41)
# tdSql.query("select c1 , csum(c1) from stb1 partition by st1 slimit 1")
# tdSql.query("select c1 , csum(c1) from db.stb1 partition by st1 slimit 1")
# tdSql.checkRows(4)
# tdSql.query("select csum(c1) from stb1 partition by st1 slimit 1")
# tdSql.query("select csum(c1) from db.stb1 partition by st1 slimit 1")
# tdSql.checkRows(4)
......
......@@ -193,20 +193,11 @@ class TDTestCase:
tdSql.query("select c1 , DERIVATIVE(c1,2,1) from stb partition by c1 order by c1")
tdSql.checkRows(90)
# bug need fix
# tdSql.checkData(0,1,None)
tdSql.checkData(0,1,None)
# bug need fix
# tdSql.query(" select tbname , max(c1) from stb partition by tbname order by tbname slimit 5 soffset 0 ")
# tdSql.checkRows(5)
# tdSql.query(" select tbname , max(c1) from stb partition by tbname order by tbname slimit 5 soffset 1 ")
# tdSql.checkRows(5)
tdSql.query(" select tbname , max(c1) from stb partition by tbname order by tbname slimit 5 soffset 0 ")
tdSql.checkRows(10)
tdSql.query(" select tbname , max(c1) from sub_stb_1 partition by tbname interval(10s) sliding(5s) ")
......
......@@ -118,7 +118,7 @@ class TDTestCase:
if dropFlag == 1:
tsql.execute("drop database if exists %s"%(dbName))
tsql.execute("create database if not exists %s vgroups %d replica %d"%(dbName, vgroups, replica))
tsql.execute("create database if not exists %s vgroups %d replica %d wal_retention_period -1 wal_retention_size -1"%(dbName, vgroups, replica))
tdLog.debug("complete to create database %s"%(dbName))
return
......
from distutils.log import error
import taos
import sys
import time
import socket
import os
import threading
import subprocess
import platform
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.snapshot = 0
self.replica = 3
self.vgroups = 3
self.ctbNum = 2
self.rowsPerTbl = 2
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 2,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=self.replica)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tmqCom.asyncInsertDataByInterlace(paraDict)
tmqCom.create_ntable(tdSql, dbname=paraDict["dbName"], tbname_prefix="ntb", tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=1)
tmqCom.insert_rows_into_ntbl(tdSql, dbname=paraDict["dbName"], tbname_prefix="ntb", tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"], startTs=paraDict["startTs"], tblNum=1, rows=2) # tdLog.info("restart taosd to ensure that the data falls into the disk")
tdSql.query("drop database %s"%paraDict["dbName"])
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
# create and start thread
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 100,
'rowsPerTbl': 1000,
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha' "%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicFromStb1
ifcheckdata = 0
ifManualCommit = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
pollDelay = 100
showMsg = 1
showRow = 1
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
tdSql.query(queryString)
totalRowsInserted = tdSql.getRows()
tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt))
if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!")
# tmqCom.checkFileContent(consumerId, queryString)
tmqCom.waitSubscriptionExit(tdSql, topicFromStb1)
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 1 end ...... ")
def run(self):
self.prepareTestEnv()
# self.tmqCase1()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
......@@ -52,7 +52,7 @@ class TDTestCase:
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1,wal_retention_size=-1, wal_retention_period=-1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
......
......@@ -52,7 +52,7 @@ class TDTestCase:
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1,wal_retention_size=-1, wal_retention_period=-1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
......
......@@ -53,7 +53,7 @@ class TDTestCase:
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1,wal_retention_size=-1, wal_retention_period=-1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
......
......@@ -52,7 +52,7 @@ class TDTestCase:
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1, wal_retention_size=-1, wal_retention_period=-1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
......
......@@ -173,7 +173,7 @@ python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 6 -M 3 -C 5
# python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5
# python3 test.py -f 6-cluster/5dnode3mnodeStopConnect.py -N 5 -M 3
python3 ./test.py -f 7-tmq/dropDbR3ConflictTransaction.py -N 3
python3 ./test.py -f 7-tmq/basic5.py
python3 ./test.py -f 7-tmq/subscribeDb.py
python3 ./test.py -f 7-tmq/subscribeDb0.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册