提交 915d3c42 编写于 作者: wmmhello's avatar wmmhello

fix:conflicts from 3.0

......@@ -22,6 +22,7 @@ mac/
.mypy_cache
*.tmp
*.swp
*.swo
*.orig
src/connector/nodejs/node_modules/
src/connector/nodejs/out/
......
此差异已折叠。
......@@ -3,6 +3,7 @@ title: 立即开始
description: '快速设置 TDengine 环境并体验其高效写入和查询'
---
TDengine 完整的软件包包括服务端(taosd)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动(taosc)、命令行程序 (CLI,taos) 和一些工具软件。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](/reference/taosadapter) 提供 [RESTful 接口](/reference/rest-api)
本章主要介绍如何利用 Docker 或者安装包快速设置 TDengine 环境并体验其高效写入和查询。
......
......@@ -6,53 +6,85 @@ description: "创建、删除数据库,查看、修改数据库参数"
## 创建数据库
```
CREATE DATABASE [IF NOT EXISTS] db_name [KEEP keep] [DAYS days] [UPDATE 1];
```
:::info
1. KEEP 是该数据库的数据保留多长天数,缺省是 3650 天(10 年),数据库会自动删除超过时限的数据;<!-- REPLACE_OPEN_TO_ENTERPRISE__KEEP_PARAM_DESCRIPTION -->
2. UPDATE 标志数据库支持更新相同时间戳数据;(从 2.1.7.0 版本开始此参数支持设为 2,表示允许部分列更新,也即更新数据行时未被设置的列会保留原值。)(从 2.0.8.0 版本开始支持此参数。注意此参数不能通过 `ALTER DATABASE` 指令进行修改。)
1. UPDATE 设为 0 时,表示不允许更新数据,后发送的相同时间戳的数据会被直接丢弃;
2. UPDATE 设为 1 时,表示更新全部列数据,即如果更新一个数据行,其中某些列没有提供取值,那么这些列会被设为 NULL;
3. UPDATE 设为 2 时,表示支持更新部分列数据,即如果更新一个数据行,其中某些列没有提供取值,那么这些列会保持原有数据行中的对应值;
4. 更多关于 UPDATE 参数的用法,请参考[FAQ](/train-faq/faq)
3. 数据库名最大长度为 33;
4. 一条 SQL 语句的最大长度为 65480 个字符;
5. 创建数据库时可用的参数有:
- cache: [详细说明](/reference/config/#cache)
- blocks: [详细说明](/reference/config/#blocks)
- days: [详细说明](/reference/config/#days)
- keep: [详细说明](/reference/config/#keep)
- minRows: [详细说明](/reference/config/#minrows)
- maxRows: [详细说明](/reference/config/#maxrows)
- wal: [详细说明](/reference/config/#wallevel)
- fsync: [详细说明](/reference/config/#fsync)
- update: [详细说明](/reference/config/#update)
- cacheLast: [详细说明](/reference/config/#cachelast)
- replica: [详细说明](/reference/config/#replica)
- quorum: [详细说明](/reference/config/#quorum)
- comp: [详细说明](/reference/config/#comp)
- precision: [详细说明](/reference/config/#precision)
6. 请注意上面列出的所有参数都可以配置在配置文件 `taosd.cfg` 中作为创建数据库时使用的默认配置, `create database` 的参数中明确指定的会覆盖配置文件中的设置。
:::
```sql
CREATE DATABASE [IF NOT EXISTS] db_name [database_options]
database_options:
database_option ...
database_option: {
BUFFER value
| CACHEMODEL {'none' | 'last_row' | 'last_value' | 'both'}
| CACHESIZE value
| COMP {0 | 1 | 2}
| DURATION value
| FSYNC value
| MAXROWS value
| MINROWS value
| KEEP value
| PAGES value
| PAGESIZE value
| PRECISION {'ms' | 'us' | 'ns'}
| REPLICA value
| RETENTIONS ingestion_duration:keep_duration ...
| STRICT {'off' | 'on'}
| WAL {1 | 2}
| VGROUPS value
| SINGLE_STABLE {0 | 1}
| WAL_RETENTION_PERIOD value
| WAL_ROLL_PERIOD value
| WAL_RETENTION_SIZE value
| WAL_SEGMENT_SIZE value
}
```
### 参数说明
- buffer: 一个 VNODE 写入内存池大小,单位为MB,默认为96,最小为3,最大为16384。
- CACHEMODEL:表示是否在内存中缓存子表的最近数据。默认为none。
- none:表示不缓存。
- last_row:表示缓存子表最近一行数据。这将显著改善 LAST_ROW 函数的性能表现。
- last_value:表示缓存子表每一列的最近的非 NULL 值。这将显著改善无特殊影响(WHERE、ORDER BY、GROUP BY、INTERVAL)下的 LAST 函数的性能表现。
- both:表示同时打开缓存最近行和列功能。
- CACHESIZE:表示缓存子表最近数据的内存大小。默认为 1 ,范围是[1, 65536],单位是 MB。
- COMP:表示数据库文件压缩标志位,缺省值为 2,取值范围为 [0, 2]。
- 0:表示不压缩。
- 1:表示一阶段压缩。
- 2:表示两阶段压缩。
- DURATION:数据文件存储数据的时间跨度。可以使用加单位的表示形式,如 DURATION 100h、DURATION 10d等,支持 m(分钟)、h(小时)和 d(天)三个单位。不加时间单位时默认单位为天,如 DURATION 50 表示 50 天。
- FSYNC:当 WAL 参数设置为2时,落盘的周期。默认为3000,单位毫秒。最小为0,表示每次写入立即落盘;最大为180000,即三分钟。
- MAXROWS:文件块中记录的最大条数,默认为4096条。
- MINROWS:文件块中记录的最小条数,默认为100条。
- KEEP:表示数据文件保存的天数,缺省值为 3650,取值范围 [1, 365000],且必须大于或等于 DURATION 参数值。数据库会自动删除保存时间超过KEEP值的数据。KEEP 可以使用加单位的表示形式,如 KEEP 100h、KEEP 10d 等,支持m(分钟)、h(小时)和 d(天)三个单位。也可以不写单位,如 KEEP 50,此时默认单位为天。
- PAGES:一个 VNODE 中元数据存储引擎的缓存页个数,默认为256,最小64。一个 VNODE 元数据存储占用 PAGESIZE * PAGES,默认情况下为1MB内存。
- PAGESIZE:一个 VNODE 中元数据存储引擎的页大小,单位为KB,默认为4 KB。范围为1到16384,即1 KB到16 MB。
- PRECISION:数据库的时间戳精度。ms表示毫秒,us表示微秒,ns表示纳秒,默认ms毫秒。
- REPLICA:表示数据库副本数,取值为1或3,默认为1。在集群中使用,副本数必须小于或等于 DNODE 的数目。
- RETENTIONS:表示数据的聚合周期和保存时长,如RETENTIONS 15s:7d,1m:21d,15m:50d表示数据原始采集周期为15秒,原始数据保存7天;按1分钟聚合的数据保存21天;按15分钟聚合的数据保存50天。目前支持且只支持三级存储周期。
- STRICT:表示数据同步的一致性要求,默认为off。
- on 表示强一致,即运行标准的 raft 协议,半数提交返回成功。
- off表示弱一致,本地提交即返回成功。
- WAL:WAL级别,默认为1。
- 1:写WAL,但不执行fsync。
- 2:写WAL,而且执行fsync。
- VGROUPS:数据库中初始vgroup的数目。
- SINGLE_STABLE:表示此数据库中是否只可以创建一个超级表,用于超级表列非常多的情况。
- 0:表示可以创建多张超级表。
- 1:表示只可以创建一张超级表。
- WAL_RETENTION_PERIOD:wal文件的额外保留策略,用于数据订阅。wal的保存时长,单位为s。默认为0,即落盘后立即删除。-1表示不删除。
- WAL_RETENTION_SIZE:wal文件的额外保留策略,用于数据订阅。wal的保存的最大上限,单位为KB。默认为0,即落盘后立即删除。-1表示不删除。
- WAL_ROLL_PERIOD:wal文件切换时长,单位为s。当wal文件创建并写入后,经过该时间,会自动创建一个新的wal文件。默认为0,即仅在落盘时创建新文件。
- WAL_SEGMENT_SIZE:wal单个文件大小,单位为KB。当前写入文件大小超过上限后会自动创建一个新的wal文件。默认为0,即仅在落盘时创建新文件。
### 创建数据库示例
创建时间精度为纳秒的数据库, 保留 1 年数据:
```sql
CREATE DATABASE test PRECISION 'ns' KEEP 365;
```
## 显示系统当前参数
create database if not exists db vgroups 10 buffer 10
```
SHOW VARIABLES;
```
## 使用数据库
以上示例创建了一个有 10 个 vgroup 名为 db 的数据库, 其中每个 vnode 分配也 10MB 的写入缓存
### 使用数据库
```
USE db_name;
......@@ -63,61 +95,42 @@ USE db_name;
## 删除数据库
```
DROP DATABASE [IF EXISTS] db_name;
DROP DATABASE [IF EXISTS] db_name
```
删除数据库。指定 Database 所包含的全部数据表将被删除,谨慎使用!
删除数据库。指定 Database 所包含的全部数据表将被删除,该数据库的所有 vgroups 也会被全部销毁,请谨慎使用!
## 修改数据库参数
```
ALTER DATABASE db_name COMP 2;
```
COMP 参数是指修改数据库文件压缩标志位,缺省值为 2,取值范围为 [0, 2]。0 表示不压缩,1 表示一阶段压缩,2 表示两阶段压缩。
```
ALTER DATABASE db_name REPLICA 2;
```
REPLICA 参数是指修改数据库副本数,取值范围 [1, 3]。在集群中使用,副本数必须小于或等于 DNODE 的数目。
```
ALTER DATABASE db_name KEEP 365;
```
KEEP 参数是指修改数据文件保存的天数,缺省值为 3650,取值范围 [days, 365000],必须大于或等于 days 参数值。
```
ALTER DATABASE db_name QUORUM 2;
```
QUORUM 参数是指数据写入成功所需要的确认数,取值范围 [1, 2]。对于异步复制,quorum 设为 1,具有 master 角色的虚拟节点自己确认即可。对于同步复制,quorum 设为 2。原则上,Quorum >= 1 并且 Quorum <= replica(副本数),这个参数在启动一个同步模块实例时需要提供。
```
ALTER DATABASE db_name BLOCKS 100;
```
BLOCKS 参数是每个 VNODE (TSDB) 中有多少 cache 大小的内存块,因此一个 VNODE 的用的内存大小粗略为(cache \* blocks)。取值范围 [3, 1000]。
```
ALTER DATABASE db_name CACHELAST 0;
```
CACHELAST 参数控制是否在内存中缓存子表的最近数据。缺省值为 0,取值范围 [0, 1, 2, 3]。其中 0 表示不缓存,1 表示缓存子表最近一行数据,2 表示缓存子表每一列的最近的非 NULL 值,3 表示同时打开缓存最近行和列功能。(从 2.0.11.0 版本开始支持参数值 [0, 1],从 2.1.2.0 版本开始支持参数值 [0, 1, 2, 3]。)
说明:缓存最近行,将显著改善 LAST_ROW 函数的性能表现;缓存每列的最近非 NULL 值,将显著改善无特殊影响(WHERE、ORDER BY、GROUP BY、INTERVAL)下的 LAST 函数的性能表现。
```sql
ALTER DATABASE db_name [alter_database_options]
alter_database_options:
alter_database_option ...
alter_database_option: {
CACHEMODEL {'none' | 'last_row' | 'last_value' | 'both'}
| CACHESIZE value
| FSYNC value
| KEEP value
| WAL value
}
```
:::note
其它参数在3.0.0.0中暂不支持修改
:::tip
以上所有参数修改后都可以用 show databases 来确认是否修改成功。另外,从 2.1.3.0 版本开始,修改这些参数后无需重启服务器即可生效。
:::
## 显示系统所有数据库
## 查看数据库
### 查看系统中的所有数据库
```
SHOW DATABASES;
```
## 显示一个数据库的创建语句
### 显示一个数据库的创建语句
```
SHOW CREATE DATABASE db_name;
......@@ -125,3 +138,4 @@ SHOW CREATE DATABASE db_name;
常用于数据库迁移。对一个已经存在的数据库,返回其创建语句;在另一个集群中执行该语句,就能得到一个设置完全相同的 Database。
### 查看数据库参数
......@@ -2,13 +2,45 @@
title: 表管理
---
## 创建数据表
```
CREATE TABLE [IF NOT EXISTS] tb_name (timestamp_field_name TIMESTAMP, field1_name data_type1 [, field2_name data_type2 ...]);
```
:::info 说明
## 创建表
`CREATE TABLE` 语句用于创建普通表和以超级表为模板创建子表。
```sql
CREATE TABLE [IF NOT EXISTS] [db_name.]tb_name (create_definition [, create_definitionn] ...) [table_options]
CREATE TABLE create_subtable_clause
CREATE TABLE [IF NOT EXISTS] [db_name.]tb_name (create_definition [, create_definitionn] ...)
[TAGS (create_definition [, create_definitionn] ...)]
[table_options]
create_subtable_clause: {
create_subtable_clause [create_subtable_clause] ...
| [IF NOT EXISTS] [db_name.]tb_name USING [db_name.]stb_name [(tag_name [, tag_name] ...)] TAGS (tag_value [, tag_value] ...)
}
create_definition:
col_name column_definition
column_definition:
type_name [comment 'string_value']
table_options:
table_option ...
table_option: {
COMMENT 'string_value'
| WATERMARK duration[,duration]
| MAX_DELAY duration[,duration]
| ROLLUP(func_name [, func_name] ...)
| SMA(col_name [, col_name] ...)
| TTL value
}
```
**使用说明**
1. 表的第一个字段必须是 TIMESTAMP,并且系统自动将其设为主键;
2. 表名最大长度为 192;
......@@ -18,106 +50,147 @@ CREATE TABLE [IF NOT EXISTS] tb_name (timestamp_field_name TIMESTAMP, field1_nam
6. 为了兼容支持更多形式的表名,TDengine 引入新的转义符 "\`",可以让表名与关键词不冲突,同时不受限于上述表名称合法性约束检查。但是同样具有长度限制要求。使用转义字符以后,不再对转义字符中的内容进行大小写统一。
例如:\`aBc\`\`abc\` 是不同的表名,但是 abc 和 aBc 是相同的表名。
需要注意的是转义字符中的内容必须是可打印字符。
上述的操作逻辑和约束要求与 MySQL 数据的操作一致。
从 2.3.0.0 版本开始支持这种方式。
:::
**参数说明**
1. COMMENT:表注释。可用于超级表、子表和普通表。
2. WATERMARK:指定窗口的关闭时间,默认值为 5 秒,最小单位毫秒,范围为0到15分钟,多个以逗号分隔。只可用于超级表,且只有当数据库使用了RETENTIONS参数时,才可以使用此表参数。
3. MAX_DELAY:用于控制推送计算结果的最大延迟,默认值为 interval 的值(但不能超过最大值),最小单位毫秒,范围为1毫秒到15分钟,多个以逗号分隔。注:不建议 MAX_DELAY 设置太小,否则会过于频繁的推送结果,影响存储和查询性能,如无特殊需求,取默认值即可。只可用于超级表,且只有当数据库使用了RETENTIONS参数时,才可以使用此表参数。
4. ROLLUP:Rollup 指定的聚合函数,提供基于多层级的降采样聚合结果。只可用于超级表。只有当数据库使用了RETENTIONS参数时,才可以使用此表参数。作用于超级表除TS列外的其它所有列,但是只能定义一个聚合函数。 聚合函数支持 avg, sum, min, max, last, first。
5. SMA:Small Materialized Aggregates,提供基于数据块的自定义预计算功能。预计算类型包括MAX、MIN和SUM。可用于超级表/普通表。
6. TTL:Time to Live,是用户用来指定表的生命周期的参数。如果在持续的TTL时间内,都没有数据写入该表,则TDengine系统会自动删除该表。这个TTL的时间只是一个大概时间,我们系统不保证到了时间一定会将其删除,而只保证存在这样一个机制。TTL单位是天,默认为0,表示不限制。用户需要注意,TTL优先级高于KEEP,即TTL时间满足删除机制时,即使当前数据的存在时间小于KEEP,此表也会被删除。只可用于子表和普通表。
### 以超级表为模板创建数据
## 创建子
```
### 创建子表
```sql
CREATE TABLE [IF NOT EXISTS] tb_name USING stb_name TAGS (tag_value1, ...);
```
以指定的超级表为模板,指定 TAGS 的值来创建数据表。
### 创建子表并指定标签的值
### 以超级表为模板创建数据表,并指定具体的 TAGS 列
```
```sql
CREATE TABLE [IF NOT EXISTS] tb_name USING stb_name (tag_name1, ...) TAGS (tag_value1, ...);
```
以指定的超级表为模板,指定一部分 TAGS 列的值来创建数据表(没被指定的 TAGS 列会设为空值)。
说明:从 2.0.17.0 版本开始支持这种方式。在之前的版本中,不允许指定 TAGS 列,而必须显式给出所有 TAGS 列的取值。
以指定的超级表为模板,也可以指定一部分 TAGS 列的值来创建数据表(没被指定的 TAGS 列会设为空值)。
### 批量创建数据
### 批量创建
```
```sql
CREATE TABLE [IF NOT EXISTS] tb_name1 USING stb_name TAGS (tag_value1, ...) [IF NOT EXISTS] tb_name2 USING stb_name TAGS (tag_value2, ...) ...;
```
以更快的速度批量创建大量数据表(服务器端 2.0.14 及以上版本)
批量建表方式要求数据表必须以超级表为模板。 在不超出 SQL 语句长度限制的前提下,单条语句中的建表数量建议控制在 1000 ~ 3000 之间,将会获得比较理想的建表速度
:::info
## 修改普通表
1.批量建表方式要求数据表必须以超级表为模板。 2.在不超出 SQL 语句长度限制的前提下,单条语句中的建表数量建议控制在 1000 ~ 3000 之间,将会获得比较理想的建表速度。
```sql
ALTER TABLE [db_name.]tb_name alter_table_clause
alter_table_clause: {
alter_table_options
| ADD COLUMN col_name column_type
| DROP COLUMN col_name
| MODIFY COLUMN col_name column_type
| RENAME COLUMN old_col_name new_col_name
}
alter_table_options:
alter_table_option ...
alter_table_option: {
TTL value
| COMMENT 'string_value'
}
:::
```
## 删除数据表
**使用说明**
对普通表可以进行如下修改操作
1. ADD COLUMN:添加列。
2. DROP COLUMN:删除列。
3. ODIFY COLUMN:修改列定义,如果数据列的类型是可变长类型,那么可以使用此指令修改其宽度,只能改大,不能改小。
4. RENAME COLUMN:修改列名称。
```
DROP TABLE [IF EXISTS] tb_name;
### 增加列
```sql
ALTER TABLE tb_name ADD COLUMN field_name data_type;
```
## 显示当前数据库下的所有数据表信息
### 删除列
```
SHOW TABLES [LIKE tb_name_wildchar];
```sql
ALTER TABLE tb_name DROP COLUMN field_name;
```
显示当前数据库下的所有数据表信息。
## 显示一个数据表的创建语句
### 修改列宽
```sql
ALTER TABLE tb_name MODIFY COLUMN field_name data_type(length);
```
SHOW CREATE TABLE tb_name;
### 修改列名
```sql
ALTER TABLE tb_name RENAME COLUMN old_col_name new_col_name
```
常用于数据库迁移。对一个已经存在的数据表,返回其创建语句;在另一个集群中执行该语句,就能得到一个结构完全相同的数据表。
## 修改子表
ALTER TABLE [db_name.]tb_name alter_table_clause
alter_table_clause: {
alter_table_options
| SET TAG tag_name = new_tag_value
}
alter_table_options:
alter_table_option ...
alter_table_option: {
TTL value
| COMMENT 'string_value'
}
**使用说明**
1. 对子表的列和标签的修改,除了更改标签值以外,都要通过超级表才能进行。
## 获取表的结构信息
### 修改子表标签值
```
DESCRIBE tb_name;
ALTER TABLE tb_name SET TAG tag_name=new_tag_value;
```
## 修改表定义
## 删除表
### 表增加列
可以在一条SQL语句中删除一个或多个普通表或子表。
```sql
DROP TABLE [IF EXISTS] [db_name.]tb_name [, [IF EXISTS] [db_name.]tb_name] ...
```
ALTER TABLE tb_name ADD COLUMN field_name data_type;
```
:::info
1. 列的最大个数为 1024,最小个数为 2;(从 2.1.7.0 版本开始,改为最多允许 4096 列)
2. 列名最大长度为 64。
## 查看表的信息
:::
### 显示所有表
### 表删除列
如下SQL语句可以列出当前数据库中的所有表名。
```sql
SHOW TABLES [LIKE tb_name_wildchar];
```
ALTER TABLE tb_name DROP COLUMN field_name;
```
如果表是通过超级表创建,更改表结构的操作只能对超级表进行。同时针对超级表的结构更改对所有通过该结构创建的表生效。对于不是通过超级表创建的表,可以直接修改表结构。
### 表修改列宽
### 显示表创建语句
```
ALTER TABLE tb_name MODIFY COLUMN field_name data_type(length);
SHOW CREATE TABLE tb_name;
```
如果数据列的类型是可变长格式(BINARY 或 NCHAR),那么可以使用此指令修改其宽度(只能改大,不能改小)。(2.1.3.0 版本新增)
如果表是通过超级表创建,更改表结构的操作只能对超级表进行。同时针对超级表的结构更改对所有通过该结构创建的表生效。对于不是通过超级表创建的表,可以直接修改表结构。
常用于数据库迁移。对一个已经存在的数据表,返回其创建语句;在另一个集群中执行该语句,就能得到一个结构完全相同的数据表。
### 修改子表标签值
### 获取表结构信息
```
ALTER TABLE tb_name SET TAG tag_name=new_tag_value;
```
如果表是通过超级表创建,可以使用此指令修改其标签值
DESCRIBE tb_name;
```
\ No newline at end of file
......@@ -3,38 +3,31 @@ sidebar_label: 超级表管理
title: 超级表 STable 管理
---
:::note
在 2.0.15.0 及以后的版本中开始支持 STABLE 保留字。也即,在本节后文的指令说明中,CREATE、DROP、ALTER 三个指令在 2.0.15.0 之前的版本中 STABLE 保留字需写作 TABLE。
:::
## 创建超级表
```sql
CREATE STABLE [IF NOT EXISTS] stb_name (create_definition [, create_definitionn] ...) TAGS (create_definition [, create_definition] ...) [table_options]
create_definition:
col_name column_definition
column_definition:
type_name [COMMENT 'string_value']
```
CREATE STABLE [IF NOT EXISTS] stb_name (timestamp_field_name TIMESTAMP, field1_name data_type1 [, field2_name data_type2 ...]) TAGS (tag1_name tag_type1, tag2_name tag_type2 [, tag3_name tag_type3]);
```
创建 STable,与创建表的 SQL 语法相似,但需要指定 TAGS 字段的名称和类型。
:::info
1. TAGS 列的数据类型不能是 timestamp 类型;(从 2.1.3.0 版本开始,TAGS 列中支持使用 timestamp 类型,但需注意在 TAGS 中的 timestamp 列写入数据时需要提供给定值,而暂不支持四则运算,例如 `NOW + 10s` 这类表达式)
2. TAGS 列名不能与其他列名相同;
3. TAGS 列名不能为预留关键字(参见:[参数限制与保留关键字](/taos-sql/keywords/) 章节);
4. TAGS 最多允许 128 个,至少 1 个,总长度不超过 16 KB。
:::
## 删除超级表
```
DROP STABLE [IF EXISTS] stb_name;
```
**使用说明**
- 超级表中列的最大个数为 4096,需要注意,这里的 4096 是包含 TAG 列在内的,最小个数为 3,包含一个时间戳主键、一个 TAG 列和一个数据列。
- 建表时可以给列或标签附加注释。
- TAGS语法指定超级表的标签列,标签列需要遵循以下约定:
- TAGS 中的 TIMESTAMP 列写入数据时需要提供给定值,而暂不支持四则运算,例如 NOW + 10s 这类表达式。
- TAGS 列名不能与其他列名相同。
- TAGS 列名不能为预留关键字。
- TAGS 最多允许 128 个,至少 1 个,总长度不超过 16 KB。
- 关于表参数的详细说明,参见 CREATE TABLE 中的介绍。
删除 STable 会自动删除通过 STable 创建的子表。
## 查看超级表
## 显示当前数据库下的所有超级表信息
### 显示当前数据库下的所有超级表信息
```
SHOW STABLES [LIKE tb_name_wildcard];
......@@ -42,7 +35,7 @@ SHOW STABLES [LIKE tb_name_wildcard];
查看数据库内全部 STable,及其相关信息,包括 STable 的名称、创建时间、列数量、标签(TAG)数量、通过该 STable 建表的数量。
## 显示一个超级表的创建语句
### 显示一个超级表的创建语句
```
SHOW CREATE STABLE stb_name;
......@@ -50,40 +43,81 @@ SHOW CREATE STABLE stb_name;
常用于数据库迁移。对一个已经存在的超级表,返回其创建语句;在另一个集群中执行该语句,就能得到一个结构完全相同的超级表。
## 获取超级表的结构信息
### 获取超级表的结构信息
```
DESCRIBE stb_name;
```
## 修改超级表普通列
### 超级表增加列
## 删除超级表
```
ALTER STABLE stb_name ADD COLUMN field_name data_type;
DROP STABLE [IF EXISTS] [db_name.]stb_name
```
删除 STable 会自动删除通过 STable 创建的子表以及子表中的所有数据。
## 修改超级表
```sql
ALTER STABLE [db_name.]tb_name alter_table_clause
alter_table_clause: {
alter_table_options
| ADD COLUMN col_name column_type
| DROP COLUMN col_name
| MODIFY COLUMN col_name column_type
| ADD TAG tag_name tag_type
| DROP TAG tag_name
| MODIFY TAG tag_name tag_type
| RENAME TAG old_tag_name new_tag_name
}
alter_table_options:
alter_table_option ...
alter_table_option: {
COMMENT 'string_value'
}
```
### 超级表删除列
**使用说明**
修改超级表的结构会对其下的所有子表生效。无法针对某个特定子表修改表结构。标签结构的修改需要对超级表下发,TDengine 会自动作用于此超级表的所有子表。
- ADD COLUMN:添加列。
- DROP COLUMN:删除列。
- MODIFY COLUMN:修改列定义,如果数据列的类型是可变长类型,那么可以使用此指令修改其宽度,只能改大,不能改小。
- ADD TAG:给超级表添加一个标签。
- DROP TAG:删除超级表的一个标签。从超级表删除某个标签后,该超级表下的所有子表也会自动删除该标签。
- MODIFY TAG:修改超级表的一个标签的定义。如果标签的类型是可变长类型,那么可以使用此指令修改其宽度,只能改大,不能改小。
- RENAME TAG:修改超级表的一个标签的名称。从超级表修改某个标签名后,该超级表下的所有子表也会自动更新该标签名。
### 增加列
```
ALTER STABLE stb_name DROP COLUMN field_name;
ALTER STABLE stb_name ADD COLUMN col_name column_type;
```
### 超级表修改列宽
### 删除列
```
ALTER STABLE stb_name MODIFY COLUMN field_name data_type(length);
ALTER STABLE stb_name DROP COLUMN col_name;
```
如果数据列的类型是可变长格式(BINARY 或 NCHAR),那么可以使用此指令修改其宽度(只能改大,不能改小)。(2.1.3.0 版本新增)
### 修改列宽
```
ALTER STABLE stb_name MODIFY COLUMN col_name data_type(length);
```
## 修改超级表标签列
如果数据列的类型是可变长格式(BINARY 或 NCHAR),那么可以使用此指令修改其宽度(只能改大,不能改小)。
### 添加标签
```
ALTER STABLE stb_name ADD TAG new_tag_name tag_type;
ALTER STABLE stb_name ADD TAG tag_name tag_type;
```
为 STable 增加一个新的标签,并指定新标签的类型。标签总数不能超过 128 个,总长度不超过 16KB 。
......@@ -99,7 +133,7 @@ ALTER STABLE stb_name DROP TAG tag_name;
### 修改标签名
```
ALTER STABLE stb_name CHANGE TAG old_tag_name new_tag_name;
ALTER STABLE stb_name RENAME TAG old_tag_name new_tag_name;
```
修改超级表的标签名,从超级表修改某个标签名后,该超级表下的所有子表也会自动更新该标签名。
......
......@@ -154,11 +154,10 @@ typedef struct SQueryTableDataCond {
int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols;
SColumnInfo* colList;
int32_t type; // data block load type:
// int32_t numOfTWindows;
STimeWindow twindows;
int64_t startVersion;
int64_t endVersion;
int32_t type; // data block load type:
STimeWindow twindows;
int64_t startVersion;
int64_t endVersion;
} SQueryTableDataCond;
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock);
......
......@@ -152,7 +152,10 @@ void taosCfgDynamicOptions(const char *option, const char *value);
void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary);
struct SConfig *taosGetCfg();
int32_t taosSetCfg(SConfig *pCfg, char* name);
void taosSetAllDebugFlag(int32_t flag);
void taosSetDebugFlag(int32_t *pFlagPtr, const char *flagName, int32_t flagVal);
int32_t taosSetCfg(SConfig *pCfg, char *name);
#ifdef __cplusplus
}
......
......@@ -21,6 +21,7 @@ extern "C" {
#endif
#include "os.h"
#include "taoserror.h"
typedef enum {
TSDB_GRANT_ALL,
......
......@@ -748,6 +748,10 @@ typedef struct {
int8_t ignoreExist;
int32_t numOfRetensions;
SArray* pRetensions; // SRetention
int32_t walRetentionPeriod;
int32_t walRetentionSize;
int32_t walRollPeriod;
int32_t walSegmentSize;
} SCreateDbReq;
int32_t tSerializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq);
......@@ -1150,6 +1154,10 @@ typedef struct {
int32_t numOfRetensions;
SArray* pRetensions; // SRetention
void* pTsma;
int32_t walRetentionPeriod;
int64_t walRetentionSize;
int32_t walRollPeriod;
int64_t walSegmentSize;
} SCreateVnodeReq;
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
......@@ -1977,7 +1985,7 @@ typedef struct SVCreateTbReq {
union {
struct {
char* name; // super table name
uint8_t tagNum;
uint8_t tagNum;
tb_uid_t suid;
SArray* tagName;
uint8_t* pTag;
......
......@@ -16,261 +16,265 @@
#ifndef _TD_COMMON_TOKEN_H_
#define _TD_COMMON_TOKEN_H_
#define TK_OR 1
#define TK_AND 2
#define TK_UNION 3
#define TK_ALL 4
#define TK_MINUS 5
#define TK_EXCEPT 6
#define TK_INTERSECT 7
#define TK_NK_BITAND 8
#define TK_NK_BITOR 9
#define TK_NK_LSHIFT 10
#define TK_NK_RSHIFT 11
#define TK_NK_PLUS 12
#define TK_NK_MINUS 13
#define TK_NK_STAR 14
#define TK_NK_SLASH 15
#define TK_NK_REM 16
#define TK_NK_CONCAT 17
#define TK_CREATE 18
#define TK_ACCOUNT 19
#define TK_NK_ID 20
#define TK_PASS 21
#define TK_NK_STRING 22
#define TK_ALTER 23
#define TK_PPS 24
#define TK_TSERIES 25
#define TK_STORAGE 26
#define TK_STREAMS 27
#define TK_QTIME 28
#define TK_DBS 29
#define TK_USERS 30
#define TK_CONNS 31
#define TK_STATE 32
#define TK_USER 33
#define TK_ENABLE 34
#define TK_NK_INTEGER 35
#define TK_SYSINFO 36
#define TK_DROP 37
#define TK_GRANT 38
#define TK_ON 39
#define TK_TO 40
#define TK_REVOKE 41
#define TK_FROM 42
#define TK_NK_COMMA 43
#define TK_READ 44
#define TK_WRITE 45
#define TK_NK_DOT 46
#define TK_DNODE 47
#define TK_PORT 48
#define TK_DNODES 49
#define TK_NK_IPTOKEN 50
#define TK_LOCAL 51
#define TK_QNODE 52
#define TK_BNODE 53
#define TK_SNODE 54
#define TK_MNODE 55
#define TK_DATABASE 56
#define TK_USE 57
#define TK_FLUSH 58
#define TK_TRIM 59
#define TK_IF 60
#define TK_NOT 61
#define TK_EXISTS 62
#define TK_BUFFER 63
#define TK_CACHEMODEL 64
#define TK_CACHESIZE 65
#define TK_COMP 66
#define TK_DURATION 67
#define TK_NK_VARIABLE 68
#define TK_FSYNC 69
#define TK_MAXROWS 70
#define TK_MINROWS 71
#define TK_KEEP 72
#define TK_PAGES 73
#define TK_PAGESIZE 74
#define TK_PRECISION 75
#define TK_REPLICA 76
#define TK_STRICT 77
#define TK_WAL 78
#define TK_VGROUPS 79
#define TK_SINGLE_STABLE 80
#define TK_RETENTIONS 81
#define TK_SCHEMALESS 82
#define TK_NK_COLON 83
#define TK_TABLE 84
#define TK_NK_LP 85
#define TK_NK_RP 86
#define TK_STABLE 87
#define TK_ADD 88
#define TK_COLUMN 89
#define TK_MODIFY 90
#define TK_RENAME 91
#define TK_TAG 92
#define TK_SET 93
#define TK_NK_EQ 94
#define TK_USING 95
#define TK_TAGS 96
#define TK_COMMENT 97
#define TK_BOOL 98
#define TK_TINYINT 99
#define TK_SMALLINT 100
#define TK_INT 101
#define TK_INTEGER 102
#define TK_BIGINT 103
#define TK_FLOAT 104
#define TK_DOUBLE 105
#define TK_BINARY 106
#define TK_TIMESTAMP 107
#define TK_NCHAR 108
#define TK_UNSIGNED 109
#define TK_JSON 110
#define TK_VARCHAR 111
#define TK_MEDIUMBLOB 112
#define TK_BLOB 113
#define TK_VARBINARY 114
#define TK_DECIMAL 115
#define TK_MAX_DELAY 116
#define TK_WATERMARK 117
#define TK_ROLLUP 118
#define TK_TTL 119
#define TK_SMA 120
#define TK_FIRST 121
#define TK_LAST 122
#define TK_SHOW 123
#define TK_DATABASES 124
#define TK_TABLES 125
#define TK_STABLES 126
#define TK_MNODES 127
#define TK_MODULES 128
#define TK_QNODES 129
#define TK_FUNCTIONS 130
#define TK_INDEXES 131
#define TK_ACCOUNTS 132
#define TK_APPS 133
#define TK_CONNECTIONS 134
#define TK_LICENCE 135
#define TK_GRANTS 136
#define TK_QUERIES 137
#define TK_SCORES 138
#define TK_TOPICS 139
#define TK_VARIABLES 140
#define TK_BNODES 141
#define TK_SNODES 142
#define TK_CLUSTER 143
#define TK_TRANSACTIONS 144
#define TK_DISTRIBUTED 145
#define TK_CONSUMERS 146
#define TK_SUBSCRIPTIONS 147
#define TK_LIKE 148
#define TK_INDEX 149
#define TK_FUNCTION 150
#define TK_INTERVAL 151
#define TK_TOPIC 152
#define TK_AS 153
#define TK_WITH 154
#define TK_META 155
#define TK_CONSUMER 156
#define TK_GROUP 157
#define TK_DESC 158
#define TK_DESCRIBE 159
#define TK_RESET 160
#define TK_QUERY 161
#define TK_CACHE 162
#define TK_EXPLAIN 163
#define TK_ANALYZE 164
#define TK_VERBOSE 165
#define TK_NK_BOOL 166
#define TK_RATIO 167
#define TK_NK_FLOAT 168
#define TK_COMPACT 169
#define TK_VNODES 170
#define TK_IN 171
#define TK_OUTPUTTYPE 172
#define TK_AGGREGATE 173
#define TK_BUFSIZE 174
#define TK_STREAM 175
#define TK_INTO 176
#define TK_TRIGGER 177
#define TK_AT_ONCE 178
#define TK_WINDOW_CLOSE 179
#define TK_IGNORE 180
#define TK_EXPIRED 181
#define TK_KILL 182
#define TK_CONNECTION 183
#define TK_TRANSACTION 184
#define TK_BALANCE 185
#define TK_VGROUP 186
#define TK_MERGE 187
#define TK_REDISTRIBUTE 188
#define TK_SPLIT 189
#define TK_SYNCDB 190
#define TK_DELETE 191
#define TK_INSERT 192
#define TK_NULL 193
#define TK_NK_QUESTION 194
#define TK_NK_ARROW 195
#define TK_ROWTS 196
#define TK_TBNAME 197
#define TK_QSTART 198
#define TK_QEND 199
#define TK_QDURATION 200
#define TK_WSTART 201
#define TK_WEND 202
#define TK_WDURATION 203
#define TK_CAST 204
#define TK_NOW 205
#define TK_TODAY 206
#define TK_TIMEZONE 207
#define TK_CLIENT_VERSION 208
#define TK_SERVER_VERSION 209
#define TK_SERVER_STATUS 210
#define TK_CURRENT_USER 211
#define TK_COUNT 212
#define TK_LAST_ROW 213
#define TK_BETWEEN 214
#define TK_IS 215
#define TK_NK_LT 216
#define TK_NK_GT 217
#define TK_NK_LE 218
#define TK_NK_GE 219
#define TK_NK_NE 220
#define TK_MATCH 221
#define TK_NMATCH 222
#define TK_CONTAINS 223
#define TK_JOIN 224
#define TK_INNER 225
#define TK_SELECT 226
#define TK_DISTINCT 227
#define TK_WHERE 228
#define TK_PARTITION 229
#define TK_BY 230
#define TK_SESSION 231
#define TK_STATE_WINDOW 232
#define TK_SLIDING 233
#define TK_FILL 234
#define TK_VALUE 235
#define TK_NONE 236
#define TK_PREV 237
#define TK_LINEAR 238
#define TK_NEXT 239
#define TK_HAVING 240
#define TK_RANGE 241
#define TK_EVERY 242
#define TK_ORDER 243
#define TK_SLIMIT 244
#define TK_SOFFSET 245
#define TK_LIMIT 246
#define TK_OFFSET 247
#define TK_ASC 248
#define TK_NULLS 249
#define TK_ID 250
#define TK_NK_BITNOT 251
#define TK_VALUES 252
#define TK_IMPORT 253
#define TK_NK_SEMI 254
#define TK_FILE 255
#define TK_OR 1
#define TK_AND 2
#define TK_UNION 3
#define TK_ALL 4
#define TK_MINUS 5
#define TK_EXCEPT 6
#define TK_INTERSECT 7
#define TK_NK_BITAND 8
#define TK_NK_BITOR 9
#define TK_NK_LSHIFT 10
#define TK_NK_RSHIFT 11
#define TK_NK_PLUS 12
#define TK_NK_MINUS 13
#define TK_NK_STAR 14
#define TK_NK_SLASH 15
#define TK_NK_REM 16
#define TK_NK_CONCAT 17
#define TK_CREATE 18
#define TK_ACCOUNT 19
#define TK_NK_ID 20
#define TK_PASS 21
#define TK_NK_STRING 22
#define TK_ALTER 23
#define TK_PPS 24
#define TK_TSERIES 25
#define TK_STORAGE 26
#define TK_STREAMS 27
#define TK_QTIME 28
#define TK_DBS 29
#define TK_USERS 30
#define TK_CONNS 31
#define TK_STATE 32
#define TK_USER 33
#define TK_ENABLE 34
#define TK_NK_INTEGER 35
#define TK_SYSINFO 36
#define TK_DROP 37
#define TK_GRANT 38
#define TK_ON 39
#define TK_TO 40
#define TK_REVOKE 41
#define TK_FROM 42
#define TK_NK_COMMA 43
#define TK_READ 44
#define TK_WRITE 45
#define TK_NK_DOT 46
#define TK_DNODE 47
#define TK_PORT 48
#define TK_DNODES 49
#define TK_NK_IPTOKEN 50
#define TK_LOCAL 51
#define TK_QNODE 52
#define TK_BNODE 53
#define TK_SNODE 54
#define TK_MNODE 55
#define TK_DATABASE 56
#define TK_USE 57
#define TK_FLUSH 58
#define TK_TRIM 59
#define TK_IF 60
#define TK_NOT 61
#define TK_EXISTS 62
#define TK_BUFFER 63
#define TK_CACHEMODEL 64
#define TK_CACHESIZE 65
#define TK_COMP 66
#define TK_DURATION 67
#define TK_NK_VARIABLE 68
#define TK_FSYNC 69
#define TK_MAXROWS 70
#define TK_MINROWS 71
#define TK_KEEP 72
#define TK_PAGES 73
#define TK_PAGESIZE 74
#define TK_PRECISION 75
#define TK_REPLICA 76
#define TK_STRICT 77
#define TK_WAL 78
#define TK_VGROUPS 79
#define TK_SINGLE_STABLE 80
#define TK_RETENTIONS 81
#define TK_SCHEMALESS 82
#define TK_WAL_RETENTION_PERIOD 83
#define TK_WAL_RETENTION_SIZE 84
#define TK_WAL_ROLL_PERIOD 85
#define TK_WAL_SEGMENT_SIZE 86
#define TK_NK_COLON 87
#define TK_TABLE 88
#define TK_NK_LP 89
#define TK_NK_RP 90
#define TK_STABLE 91
#define TK_ADD 92
#define TK_COLUMN 93
#define TK_MODIFY 94
#define TK_RENAME 95
#define TK_TAG 96
#define TK_SET 97
#define TK_NK_EQ 98
#define TK_USING 99
#define TK_TAGS 100
#define TK_COMMENT 101
#define TK_BOOL 102
#define TK_TINYINT 103
#define TK_SMALLINT 104
#define TK_INT 105
#define TK_INTEGER 106
#define TK_BIGINT 107
#define TK_FLOAT 108
#define TK_DOUBLE 109
#define TK_BINARY 110
#define TK_TIMESTAMP 111
#define TK_NCHAR 112
#define TK_UNSIGNED 113
#define TK_JSON 114
#define TK_VARCHAR 115
#define TK_MEDIUMBLOB 116
#define TK_BLOB 117
#define TK_VARBINARY 118
#define TK_DECIMAL 119
#define TK_MAX_DELAY 120
#define TK_WATERMARK 121
#define TK_ROLLUP 122
#define TK_TTL 123
#define TK_SMA 124
#define TK_FIRST 125
#define TK_LAST 126
#define TK_SHOW 127
#define TK_DATABASES 128
#define TK_TABLES 129
#define TK_STABLES 130
#define TK_MNODES 131
#define TK_MODULES 132
#define TK_QNODES 133
#define TK_FUNCTIONS 134
#define TK_INDEXES 135
#define TK_ACCOUNTS 136
#define TK_APPS 137
#define TK_CONNECTIONS 138
#define TK_LICENCE 139
#define TK_GRANTS 140
#define TK_QUERIES 141
#define TK_SCORES 142
#define TK_TOPICS 143
#define TK_VARIABLES 144
#define TK_BNODES 145
#define TK_SNODES 146
#define TK_CLUSTER 147
#define TK_TRANSACTIONS 148
#define TK_DISTRIBUTED 149
#define TK_CONSUMERS 150
#define TK_SUBSCRIPTIONS 151
#define TK_LIKE 152
#define TK_INDEX 153
#define TK_FUNCTION 154
#define TK_INTERVAL 155
#define TK_TOPIC 156
#define TK_AS 157
#define TK_WITH 158
#define TK_META 159
#define TK_CONSUMER 160
#define TK_GROUP 161
#define TK_DESC 162
#define TK_DESCRIBE 163
#define TK_RESET 164
#define TK_QUERY 165
#define TK_CACHE 166
#define TK_EXPLAIN 167
#define TK_ANALYZE 168
#define TK_VERBOSE 169
#define TK_NK_BOOL 170
#define TK_RATIO 171
#define TK_NK_FLOAT 172
#define TK_COMPACT 173
#define TK_VNODES 174
#define TK_IN 175
#define TK_OUTPUTTYPE 176
#define TK_AGGREGATE 177
#define TK_BUFSIZE 178
#define TK_STREAM 179
#define TK_INTO 180
#define TK_TRIGGER 181
#define TK_AT_ONCE 182
#define TK_WINDOW_CLOSE 183
#define TK_IGNORE 184
#define TK_EXPIRED 185
#define TK_KILL 186
#define TK_CONNECTION 187
#define TK_TRANSACTION 188
#define TK_BALANCE 189
#define TK_VGROUP 190
#define TK_MERGE 191
#define TK_REDISTRIBUTE 192
#define TK_SPLIT 193
#define TK_SYNCDB 194
#define TK_DELETE 195
#define TK_INSERT 196
#define TK_NULL 197
#define TK_NK_QUESTION 198
#define TK_NK_ARROW 199
#define TK_ROWTS 200
#define TK_TBNAME 201
#define TK_QSTART 202
#define TK_QEND 203
#define TK_QDURATION 204
#define TK_WSTART 205
#define TK_WEND 206
#define TK_WDURATION 207
#define TK_CAST 208
#define TK_NOW 209
#define TK_TODAY 210
#define TK_TIMEZONE 211
#define TK_CLIENT_VERSION 212
#define TK_SERVER_VERSION 213
#define TK_SERVER_STATUS 214
#define TK_CURRENT_USER 215
#define TK_COUNT 216
#define TK_LAST_ROW 217
#define TK_BETWEEN 218
#define TK_IS 219
#define TK_NK_LT 220
#define TK_NK_GT 221
#define TK_NK_LE 222
#define TK_NK_GE 223
#define TK_NK_NE 224
#define TK_MATCH 225
#define TK_NMATCH 226
#define TK_CONTAINS 227
#define TK_JOIN 228
#define TK_INNER 229
#define TK_SELECT 230
#define TK_DISTINCT 231
#define TK_WHERE 232
#define TK_PARTITION 233
#define TK_BY 234
#define TK_SESSION 235
#define TK_STATE_WINDOW 236
#define TK_SLIDING 237
#define TK_FILL 238
#define TK_VALUE 239
#define TK_NONE 240
#define TK_PREV 241
#define TK_LINEAR 242
#define TK_NEXT 243
#define TK_HAVING 244
#define TK_RANGE 245
#define TK_EVERY 246
#define TK_ORDER 247
#define TK_SLIMIT 248
#define TK_SOFFSET 249
#define TK_LIMIT 250
#define TK_OFFSET 251
#define TK_ASC 252
#define TK_NULLS 253
#define TK_ID 254
#define TK_NK_BITNOT 255
#define TK_VALUES 256
#define TK_IMPORT 257
#define TK_NK_SEMI 258
#define TK_FILE 259
#define TK_NK_SPACE 300
#define TK_NK_COMMENT 301
......
......@@ -143,6 +143,7 @@ typedef struct SqlFunctionCtx {
struct SExprInfo *pExpr;
struct SDiskbasedBuf *pBuf;
struct SSDataBlock *pSrcBlock;
struct SSDataBlock *pDstBlock; // used by indifinite rows function to set selectivity
int32_t curBufPage;
bool increase;
......
......@@ -74,6 +74,10 @@ typedef struct SDatabaseOptions {
int8_t singleStable;
SNodeList* pRetentions;
int8_t schemaless;
int32_t walRetentionPeriod;
int32_t walRetentionSize;
int32_t walRollPeriod;
int32_t walSegmentSize;
} SDatabaseOptions;
typedef struct SCreateDatabaseStmt {
......
......@@ -104,6 +104,7 @@ typedef struct SJoinLogicNode {
SNode* pMergeCondition;
SNode* pOnConditions;
bool isSingleTableJoin;
EOrder inputTsOrder;
} SJoinLogicNode;
typedef struct SAggLogicNode {
......@@ -201,6 +202,7 @@ typedef struct SWindowLogicNode {
int64_t watermark;
int8_t igExpired;
EWindowAlgorithm windowAlgo;
EOrder inputTsOrder;
} SWindowLogicNode;
typedef struct SFillLogicNode {
......@@ -356,15 +358,14 @@ typedef struct SInterpFuncPhysiNode {
SNode* pTimeSeries; // SColumnNode
} SInterpFuncPhysiNode;
typedef struct SJoinPhysiNode {
typedef struct SSortMergeJoinPhysiNode {
SPhysiNode node;
EJoinType joinType;
SNode* pMergeCondition;
SNode* pOnConditions;
SNodeList* pTargets;
} SJoinPhysiNode;
typedef SJoinPhysiNode SSortMergeJoinPhysiNode;
EOrder inputTsOrder;
} SSortMergeJoinPhysiNode;
typedef struct SAggPhysiNode {
SPhysiNode node;
......
......@@ -255,6 +255,7 @@ typedef struct SSelectStmt {
int32_t selectFuncNum;
bool isEmptyResult;
bool isTimeLineResult;
bool isSubquery;
bool hasAggFuncs;
bool hasRepeatScanFuncs;
bool hasIndefiniteRowsFunc;
......
......@@ -26,7 +26,7 @@ extern "C" {
extern bool gRaftDetailLog;
#define SYNC_RESP_TTL_MS 10000
#define SYNC_RESP_TTL_MS 10000000
#define SYNC_MAX_BATCH_SIZE 500
#define SYNC_INDEX_BEGIN 0
......
......@@ -103,8 +103,8 @@ typedef struct SWal {
int32_t fsyncSeq;
// meta
SWalVer vers;
TdFilePtr pWriteLogTFile;
TdFilePtr pWriteIdxTFile;
TdFilePtr pLogFile;
TdFilePtr pIdxFile;
int32_t writeCur;
SArray *fileInfoSet; // SArray<SWalFileInfo>
// status
......@@ -114,30 +114,41 @@ typedef struct SWal {
int64_t refId;
TdThreadMutex mutex;
// ref
SHashObj *pRefHash; // ref -> SWalRef
SHashObj *pRefHash; // refId -> SWalRef
// path
char path[WAL_PATH_LEN];
// reusable write head
SWalCkHead writeHead;
} SWal; // WAL HANDLE
} SWal;
typedef struct {
int64_t refId;
int64_t refVer;
int64_t refFile;
SWal *pWal;
} SWalRef;
typedef struct {
int8_t scanUncommited;
int8_t scanNotApplied;
int8_t scanMeta;
int8_t enableRef;
} SWalFilterCond;
typedef struct {
SWal *pWal;
int64_t readerId;
TdFilePtr pLogFile;
TdFilePtr pIdxFile;
int64_t curFileFirstVer;
int64_t curVersion;
int64_t capacity;
int8_t curInvalid;
int8_t curStopped;
TdThreadMutex mutex;
SWalFilterCond cond;
SWalCkHead *pHead;
// TODO remove it
SWalCkHead *pHead;
} SWalReader;
// module initialization
......@@ -156,11 +167,7 @@ int32_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_
int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body,
int32_t bodyLen);
// This interface assign version automatically and return to caller.
// When using this interface with concurrent writes,
// wal will write all logs atomically,
// but not sure which one will be actually write first,
// and then the unique index of successful writen is returned.
// Assign version automatically and return to caller,
// -1 will be returned for failed writes
int64_t walAppendLog(SWal *, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen);
......@@ -190,17 +197,15 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
typedef struct {
int64_t refId;
int64_t ver;
} SWalRef;
SWalRef *walRefCommittedVer(SWal *);
SWalRef *walOpenRef(SWal *);
void walCloseRef(SWalRef *);
void walCloseRef(SWal *pWal, int64_t refId);
int32_t walRefVer(SWalRef *, int64_t ver);
int32_t walUnrefVer(SWal *);
void walUnrefVer(SWalRef *);
// help function for raft
// helper function for raft
bool walLogExist(SWal *, int64_t ver);
bool walIsEmpty(SWal *);
......
此差异已折叠。
......@@ -358,6 +358,15 @@ typedef enum ELogicConditionType {
#define TSDB_DB_SCHEMALESS_OFF 0
#define TSDB_DEFAULT_DB_SCHEMALESS TSDB_DB_SCHEMALESS_OFF
#define TSDB_DB_MIN_WAL_RETENTION_PERIOD -1
#define TSDB_DEFAULT_DB_WAL_RETENTION_PERIOD 0
#define TSDB_DB_MIN_WAL_RETENTION_SIZE -1
#define TSDB_DEFAULT_DB_WAL_RETENTION_SIZE 0
#define TSDB_DB_MIN_WAL_ROLL_PERIOD 0
#define TSDB_DEFAULT_DB_WAL_ROLL_PERIOD 0
#define TSDB_DB_MIN_WAL_SEGMENT_SIZE 0
#define TSDB_DEFAULT_DB_WAL_SEGMENT_SIZE 0
#define TSDB_MIN_ROLLUP_MAX_DELAY 1 // unit millisecond
#define TSDB_MAX_ROLLUP_MAX_DELAY (15 * 60 * 1000)
#define TSDB_MIN_ROLLUP_WATERMARK 0 // unit millisecond
......
......@@ -67,7 +67,6 @@ extern int32_t idxDebugFlag;
int32_t taosInitLog(const char *logName, int32_t maxFiles);
void taosCloseLog();
void taosResetLog();
void taosSetAllDebugFlag(int32_t flag);
void taosDumpData(uint8_t *msg, int32_t len);
void taosPrintLog(const char *flags, ELogLevel level, int32_t dflag, const char *format, ...)
......
......@@ -1750,7 +1750,10 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
char* pStart = p;
for (int32_t i = 0; i < numOfCols; ++i) {
colLength[i] = htonl(colLength[i]);
ASSERT(colLength[i] < dataLen);
if (colLength[i] >= dataLen) {
tscError("invalid colLength %d, dataLen %d", colLength[i], dataLen);
ASSERT(0);
}
if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
pResultInfo->pCol[i].offset = (int32_t*)pStart;
......@@ -2016,7 +2019,7 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName,
}
if (('a' <= *(tbList + i) && 'z' >= *(tbList + i)) || ('A' <= *(tbList + i) && 'Z' >= *(tbList + i)) ||
('0' <= *(tbList + i) && '9' >= *(tbList + i))) {
('0' <= *(tbList + i) && '9' >= *(tbList + i)) || ('_' == *(tbList + i))) {
if (vLen[vIdx] > 0) {
goto _return;
}
......
......@@ -973,7 +973,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
code = catalogAsyncGetAllMeta(pCtg, &conn, &catalogReq, syncCatalogFn, NULL, NULL);
code = catalogAsyncGetAllMeta(pCtg, &conn, &catalogReq, syncCatalogFn, pRequest->body.param, NULL);
if (code) {
goto _return;
}
......
......@@ -499,7 +499,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_INVALID_STB) {
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
SSchemaAction schemaAction;
schemaAction.action = SCHEMA_ACTION_CREATE_STABLE;
memset(&schemaAction.createSTable, 0, sizeof(SCreateSTableActionInfo));
......
......@@ -158,7 +158,7 @@ static const SSysDbTableSchema userTagsSchema[] = {
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "stable_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "tag_name", .bytes = TSDB_COL_NAME_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "tag_type", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
{.name = "tag_type", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "tag_value", .bytes = TSDB_MAX_TAGS_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
};
......
......@@ -1763,9 +1763,9 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows;
int32_t len = 0;
len += snprintf(dumpBuf + len, size - len, "===stream===%s |block type %d |child id %d|group id:%" PRIu64 "| uid:%ld|\n", flag,
len += snprintf(dumpBuf + len, size - len, "===stream===%s |block type %d|child id %d|group id:%" PRIu64 "|uid:%ld|rows:%d\n", flag,
(int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId,
pDataBlock->info.uid);
pDataBlock->info.uid, pDataBlock->info.rows);
if (len >= size - 1) return dumpBuf;
for (int32_t j = 0; j < rows; j++) {
......@@ -1878,7 +1878,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
msgLen += sizeof(SSubmitBlk);
int32_t dataLen = 0;
for (int32_t j = 0; j < rows; ++j) { // iterate by row
tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen)); // set row buf
tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen + dataLen)); // set row buf
bool isStartKey = false;
int32_t offset = 0;
for (int32_t k = 0; k < colNum; ++k) { // iterate by column
......
......@@ -213,10 +213,6 @@ static int32_t taosSetTfsCfg(SConfig *pCfg) {
memcpy(&tsDiskCfg[index], pCfg, sizeof(SDiskCfg));
if (pCfg->level == 0 && pCfg->primary == 1) {
tstrncpy(tsDataDir, pCfg->dir, PATH_MAX);
if (taosMulMkDir(tsDataDir) != 0) {
uError("failed to create dataDir:%s since %s", tsDataDir, terrstr());
return -1;
}
}
if (taosMulMkDir(pCfg->dir) != 0) {
uError("failed to create tfsDir:%s since %s", tsDataDir, terrstr());
......@@ -227,12 +223,13 @@ static int32_t taosSetTfsCfg(SConfig *pCfg) {
if (tsDataDir[0] == 0) {
if (pItem->str != NULL) {
taosAddDataDir(0, pItem->str, 0, 1);
taosAddDataDir(tsDiskCfgNum, pItem->str, 0, 1);
tstrncpy(tsDataDir, pItem->str, PATH_MAX);
if (taosMulMkDir(tsDataDir) != 0) {
uError("failed to create dataDir:%s since %s", tsDataDir, terrstr());
uError("failed to create tfsDir:%s since %s", tsDataDir, terrstr());
return -1;
}
tsDiskCfgNum++;
} else {
uError("datadir not set");
return -1;
......@@ -1146,6 +1143,10 @@ void taosCfgDynamicOptions(const char *option, const char *value) {
int32_t monitor = atoi(value);
uInfo("monitor set from %d to %d", tsEnableMonitor, monitor);
tsEnableMonitor = monitor;
SConfigItem *pItem = cfgGetItem(tsCfg, "monitor");
if (pItem != NULL) {
pItem->bval = tsEnableMonitor;
}
return;
}
......@@ -1169,8 +1170,39 @@ void taosCfgDynamicOptions(const char *option, const char *value) {
int32_t flag = atoi(value);
uInfo("%s set from %d to %d", optName, *optionVars[d], flag);
*optionVars[d] = flag;
taosSetDebugFlag(optionVars[d], optName, flag);
return;
}
uError("failed to cfg dynamic option:%s value:%s", option, value);
}
void taosSetDebugFlag(int32_t *pFlagPtr, const char *flagName, int32_t flagVal) {
SConfigItem *pItem = cfgGetItem(tsCfg, flagName);
if (pItem != NULL) {
pItem->i32 = flagVal;
}
*pFlagPtr = flagVal;
}
void taosSetAllDebugFlag(int32_t flag) {
if (flag <= 0) return;
taosSetDebugFlag(&uDebugFlag, "uDebugFlag", flag);
taosSetDebugFlag(&rpcDebugFlag, "rpcDebugFlag", flag);
taosSetDebugFlag(&jniDebugFlag, "jniDebugFlag", flag);
taosSetDebugFlag(&qDebugFlag, "qDebugFlag", flag);
taosSetDebugFlag(&cDebugFlag, "cDebugFlag", flag);
taosSetDebugFlag(&dDebugFlag, "dDebugFlag", flag);
taosSetDebugFlag(&vDebugFlag, "vDebugFlag", flag);
taosSetDebugFlag(&mDebugFlag, "mDebugFlag", flag);
taosSetDebugFlag(&wDebugFlag, "wDebugFlag", flag);
taosSetDebugFlag(&sDebugFlag, "sDebugFlag", flag);
taosSetDebugFlag(&tsdbDebugFlag, "tsdbDebugFlag", flag);
taosSetDebugFlag(&tqDebugFlag, "tqDebugFlag", flag);
taosSetDebugFlag(&fsDebugFlag, "fsDebugFlag", flag);
taosSetDebugFlag(&udfDebugFlag, "udfDebugFlag", flag);
taosSetDebugFlag(&smaDebugFlag, "smaDebugFlag", flag);
taosSetDebugFlag(&idxDebugFlag, "idxDebugFlag", flag);
uInfo("all debug flag are set to %d", flag);
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "tgrant.h"
#ifndef _GRANT
int32_t grantCheck(EGrantType grant) { return TSDB_CODE_SUCCESS; }
#endif
\ No newline at end of file
......@@ -2018,6 +2018,10 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1;
if (tEncodeI8(&encoder, pReq->schemaless) < 0) return -1;
if (tEncodeI32(&encoder, pReq->walRetentionPeriod) < 0) return -1;
if (tEncodeI32(&encoder, pReq->walRetentionSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->walRollPeriod) < 0) return -1;
if (tEncodeI32(&encoder, pReq->walSegmentSize) < 0) return -1;
if (tEncodeI8(&encoder, pReq->ignoreExist) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfRetensions) < 0) return -1;
for (int32_t i = 0; i < pReq->numOfRetensions; ++i) {
......@@ -2060,6 +2064,10 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->schemaless) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->walRetentionPeriod) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->walRetentionSize) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->walRollPeriod) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->walSegmentSize) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->ignoreExist) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfRetensions) < 0) return -1;
pReq->pRetensions = taosArrayInit(pReq->numOfRetensions, sizeof(SRetention));
......@@ -3742,6 +3750,10 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
uint32_t tsmaLen = (uint32_t)(htonl(((SMsgHead *)pReq->pTsma)->contLen));
if (tEncodeBinary(&encoder, (const uint8_t *)pReq->pTsma, tsmaLen) < 0) return -1;
}
if (tEncodeI32(&encoder, pReq->walRetentionPeriod) < 0) return -1;
if (tEncodeI64(&encoder, pReq->walRetentionSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->walRollPeriod) < 0) return -1;
if (tEncodeI64(&encoder, pReq->walSegmentSize) < 0) return -1;
tEndEncode(&encoder);
......@@ -3810,6 +3822,11 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if (tDecodeBinary(&decoder, (uint8_t **)&pReq->pTsma, NULL) < 0) return -1;
}
if (tDecodeI32(&decoder, &pReq->walRetentionPeriod) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->walRetentionSize) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->walRollPeriod) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->walSegmentSize) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
......
......@@ -160,6 +160,13 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
}
pCfg->walCfg.vgId = pCreate->vgId;
pCfg->walCfg.fsyncPeriod = pCreate->fsyncPeriod;
pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
pCfg->walCfg.rollPeriod = pCreate->walRollPeriod;
pCfg->walCfg.retentionSize = pCreate->walRetentionSize;
pCfg->walCfg.segSize = pCreate->walSegmentSize;
pCfg->walCfg.level = pCreate->walLevel;
pCfg->hashBegin = pCreate->hashBegin;
pCfg->hashEnd = pCreate->hashEnd;
pCfg->hashMethod = pCreate->hashMethod;
......
......@@ -164,8 +164,8 @@ typedef struct {
int32_t lastErrorNo;
tmsg_t lastMsgType;
SEpSet lastEpset;
char dbname1[TSDB_DB_FNAME_LEN];
char dbname2[TSDB_DB_FNAME_LEN];
char dbname1[TSDB_TABLE_FNAME_LEN];
char dbname2[TSDB_TABLE_FNAME_LEN];
int32_t startFunc;
int32_t stopFunc;
int32_t paramLen;
......@@ -302,9 +302,13 @@ typedef struct {
int8_t strict;
int8_t hashMethod; // default is 1
int8_t cacheLast;
int8_t schemaless;
int32_t numOfRetensions;
SArray* pRetensions;
int8_t schemaless;
int32_t walRetentionPeriod;
int64_t walRetentionSize;
int32_t walRollPeriod;
int64_t walSegmentSize;
} SDbCfg;
typedef struct {
......
......@@ -120,6 +120,10 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
SDB_SET_INT8(pRaw, dataPos, pRetension->keepUnit, _OVER)
}
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.schemaless, _OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.walRetentionPeriod, _OVER)
SDB_SET_INT64(pRaw, dataPos, pDb->cfg.walRetentionSize, _OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.walRollPeriod, _OVER)
SDB_SET_INT64(pRaw, dataPos, pDb->cfg.walSegmentSize, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
......@@ -199,6 +203,10 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
}
}
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.schemaless, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.walRetentionPeriod, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pDb->cfg.walRetentionSize, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.walRollPeriod, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pDb->cfg.walSegmentSize, _OVER)
SDB_GET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
taosInitRWLatch(&pDb->lock);
......@@ -318,6 +326,10 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
return -1;
}
if (pCfg->walRetentionPeriod < TSDB_DB_MIN_WAL_RETENTION_PERIOD) return -1;
if (pCfg->walRetentionSize < TSDB_DB_MIN_WAL_RETENTION_SIZE) return -1;
if (pCfg->walRollPeriod < TSDB_DB_MIN_WAL_ROLL_PERIOD) return -1;
if (pCfg->walSegmentSize < TSDB_DB_MIN_WAL_SEGMENT_SIZE) return -1;
terrno = 0;
return terrno;
......@@ -345,6 +357,12 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->cacheLastSize <= 0) pCfg->cacheLastSize = TSDB_DEFAULT_CACHE_SIZE;
if (pCfg->numOfRetensions < 0) pCfg->numOfRetensions = 0;
if (pCfg->schemaless < 0) pCfg->schemaless = TSDB_DB_SCHEMALESS_OFF;
if (pCfg->walRetentionPeriod < 0 && pCfg->walRetentionPeriod != -1)
pCfg->walRetentionPeriod = TSDB_DEFAULT_DB_WAL_RETENTION_PERIOD;
if (pCfg->walRetentionSize < 0 && pCfg->walRetentionSize != -1)
pCfg->walRetentionSize = TSDB_DEFAULT_DB_WAL_RETENTION_SIZE;
if (pCfg->walRollPeriod < 0) pCfg->walRollPeriod = TSDB_DEFAULT_DB_WAL_ROLL_PERIOD;
if (pCfg->walSegmentSize < 0) pCfg->walSegmentSize = TSDB_DEFAULT_DB_WAL_SEGMENT_SIZE;
}
static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
......@@ -457,6 +475,10 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
.cacheLast = pCreate->cacheLast,
.hashMethod = 1,
.schemaless = pCreate->schemaless,
.walRetentionPeriod = pCreate->walRetentionPeriod,
.walRetentionSize = pCreate->walRetentionSize,
.walRollPeriod = pCreate->walRollPeriod,
.walSegmentSize = pCreate->walSegmentSize,
};
dbObj.cfg.numOfRetensions = pCreate->numOfRetensions;
......
......@@ -874,7 +874,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
}
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) {
mInfo("config rsp from dnode, app:%p", pRsp->info.ahandle);
mInfo("config rsp from dnode");
return 0;
}
......
......@@ -190,6 +190,10 @@ static int32_t mndCreateFunc(SMnode *pMnode, SRpcMsg *pReq, SCreateFuncReq *pCre
int32_t code = -1;
STrans *pTrans = NULL;
if ((terrno = grantCheck(TSDB_GRANT_USER)) < 0) {
return code;
}
SFuncObj func = {0};
memcpy(func.name, pCreate->name, TSDB_FUNC_NAME_LEN);
func.createdTime = taosGetTimestampMs();
......
......@@ -128,7 +128,6 @@ int32_t mndInitGrant(SMnode *pMnode) {
void mndCleanupGrant() {}
void grantParseParameter() { mError("can't parsed parameter k"); }
int32_t grantCheck(EGrantType grant) { return TSDB_CODE_SUCCESS; }
void grantReset(SMnode *pMnode, EGrantType grant, uint64_t value) {}
void grantAdd(EGrantType grant, uint64_t value) {}
void grantRestore(EGrantType grant, uint64_t value) {}
......
......@@ -66,7 +66,7 @@ static int32_t mndInsInitMeta(SHashObj *hash) {
int32_t mndBuildInsTableSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) {
if (NULL == pMnode->infosMeta) {
terrno = TSDB_CODE_MND_NOT_READY;
terrno = TSDB_CODE_APP_NOT_READY;
return -1;
}
......@@ -92,7 +92,7 @@ int32_t mndBuildInsTableSchema(SMnode *pMnode, const char *dbFName, const char *
int32_t mndBuildInsTableCfg(SMnode *pMnode, const char *dbFName, const char *tbName, STableCfgRsp *pRsp) {
if (NULL == pMnode->infosMeta) {
terrno = TSDB_CODE_MND_NOT_READY;
terrno = TSDB_CODE_APP_NOT_READY;
return -1;
}
......
......@@ -281,7 +281,7 @@ static int32_t mndSetDropOffsetRedoLogs(SMnode *pMnode, STrans *pTrans, SMqOffse
}
int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
int32_t code = -1;
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
......@@ -297,15 +297,15 @@ int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) {
sdbRelease(pSdb, pOffset);
goto END;
sdbCancelFetch(pSdb, pIter);
code = -1;
break;
}
sdbRelease(pSdb, pOffset);
}
code = 0;
END:
return code;
return code;
}
int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic) {
......
......@@ -68,7 +68,7 @@ int32_t mndPerfsInitMeta(SHashObj *hash) {
int32_t mndBuildPerfsTableSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) {
if (NULL == pMnode->perfsMeta) {
terrno = TSDB_CODE_MND_NOT_READY;
terrno = TSDB_CODE_APP_NOT_READY;
return -1;
}
......@@ -94,7 +94,7 @@ int32_t mndBuildPerfsTableSchema(SMnode *pMnode, const char *dbFName, const char
int32_t mndBuildPerfsTableCfg(SMnode *pMnode, const char *dbFName, const char *tbName, STableCfgRsp *pRsp) {
if (NULL == pMnode->perfsMeta) {
terrno = TSDB_CODE_MND_NOT_READY;
terrno = TSDB_CODE_APP_NOT_READY;
return -1;
}
......
......@@ -312,7 +312,7 @@ static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0;
pBasic->queryDesc = NULL;
mDebug("queries updated in conn %d, num:%d", pConn->id, pConn->numOfQueries);
mDebug("queries updated in conn %u, num:%d", pConn->id, pConn->numOfQueries);
taosWUnLockLatch(&pConn->queryLock);
......
......@@ -641,6 +641,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
action.contLen = contLen;
action.msgType = TDMT_VND_CREATE_STB;
action.acceptableCode = TSDB_CODE_TDB_STB_ALREADY_EXIST;
action.retryCode = TSDB_CODE_TDB_STB_NOT_EXIST;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
sdbCancelFetch(pSdb, pIter);
......@@ -805,7 +806,7 @@ _OVER:
}
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
mndTransSetDbName(pTrans, pDb->name, NULL);
mndTransSetDbName(pTrans, pDb->name, pStb->name);
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
......@@ -1612,7 +1613,7 @@ static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbOb
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to alter stb:%s", pTrans->id, pStb->name);
mndTransSetDbName(pTrans, pDb->name, NULL);
mndTransSetDbName(pTrans, pDb->name, pStb->name);
if (needRsp) {
void *pCont = NULL;
......@@ -1811,7 +1812,7 @@ static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *p
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
mndTransSetDbName(pTrans, pDb->name, NULL);
mndTransSetDbName(pTrans, pDb->name, pStb->name);
if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
......
......@@ -824,7 +824,7 @@ int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj
}
int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
int32_t code = -1;
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
......@@ -840,12 +840,14 @@ int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
sdbRelease(pSdb, pSub);
goto END;
sdbCancelFetch(pSdb, pIter);
code = -1;
break;
}
sdbRelease(pSdb, pSub);
}
code = 0;
END:
return code;
}
......
......@@ -833,7 +833,7 @@ static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
}
int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
int32_t code = -1;
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
......@@ -848,11 +848,14 @@ int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
}
if (mndSetDropTopicCommitLogs(pMnode, pTrans, pTopic) < 0) {
goto END;
sdbRelease(pSdb, pTopic);
sdbCancelFetch(pSdb, pIter);
code = -1;
break;
}
sdbRelease(pSdb, pTopic);
}
code = 0;
END:
return code;
}
......@@ -127,8 +127,8 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_DB_FNAME_LEN, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_DB_FNAME_LEN, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER)
int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
......@@ -290,8 +290,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
pTrans->exec = exec;
pTrans->oper = oper;
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_DB_FNAME_LEN, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_DB_FNAME_LEN, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER)
SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER)
SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER)
......@@ -727,10 +727,10 @@ int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, c
void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2) {
if (dbname1 != NULL) {
memcpy(pTrans->dbname1, dbname1, TSDB_DB_FNAME_LEN);
tstrncpy(pTrans->dbname1, dbname1, TSDB_TABLE_FNAME_LEN);
}
if (dbname2 != NULL) {
memcpy(pTrans->dbname2, dbname2, TSDB_DB_FNAME_LEN);
tstrncpy(pTrans->dbname2, dbname2, TSDB_TABLE_FNAME_LEN);
}
}
......@@ -1289,6 +1289,19 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
} else {
pTrans->code = terrno;
if (pTrans->policy == TRN_POLICY_ROLLBACK) {
if (pTrans->lastAction != 0) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->lastAction);
if (pAction->retryCode != 0 && pAction->retryCode != pAction->errCode) {
if (pTrans->failedTimes < 6) {
mError("trans:%d, stage keep on redoAction since action:%d code:0x%x not 0x%x, failedTimes:%d", pTrans->id,
pTrans->lastAction, pTrans->code, pAction->retryCode, pTrans->failedTimes);
taosMsleep(1000);
continueExec = true;
return true;
}
}
}
pTrans->stage = TRN_STAGE_ROLLBACK;
mError("trans:%d, stage from redoAction to rollback since %s", pTrans->id, terrstr());
continueExec = true;
......
......@@ -230,6 +230,10 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
createReq.standby = standby;
createReq.isTsma = pVgroup->isTsma;
createReq.pTsma = pVgroup->pTsma;
createReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
createReq.walRetentionSize = pDb->cfg.walRetentionSize;
createReq.walRollPeriod = pDb->cfg.walRollPeriod;
createReq.walSegmentSize = pDb->cfg.walSegmentSize;
for (int32_t v = 0; v < pVgroup->replica; ++v) {
SReplica *pReplica = &createReq.replicas[v];
......
......@@ -27,8 +27,8 @@
#include "wal.h"
#include "tcommon.h"
#include "tgrant.h"
#include "tfs.h"
#include "tgrant.h"
#include "tmsg.h"
#include "trow.h"
......@@ -101,7 +101,7 @@ typedef struct SMetaFltParam {
} SMetaFltParam;
int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *results);
int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *results);
#if 1 // refact APIs below (TODO)
typedef SVCreateTbReq STbCfg;
......@@ -118,11 +118,10 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur);
// typedef struct STsdb STsdb;
typedef struct STsdbReader STsdbReader;
#define BLOCK_LOAD_OFFSET_ORDER 1
#define BLOCK_LOAD_TABLESEQ_ORDER 2
#define BLOCK_LOAD_EXTERN_ORDER 3
#define TIMEWINDOW_RANGE_CONTAINED 1
#define TIMEWINDOW_RANGE_EXTERNAL 2
#define LASTROW_RETRIEVE_TYPE_ALL 0x1
#define LASTROW_RETRIEVE_TYPE_ALL 0x1
#define LASTROW_RETRIEVE_TYPE_SINGLE 0x2
int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid);
......@@ -238,8 +237,8 @@ typedef struct {
uint64_t groupId;
} STableKeyInfo;
#define TABLE_ROLLUP_ON ((int8_t)0x1)
#define TABLE_IS_ROLLUP(FLG) (((FLG) & (TABLE_ROLLUP_ON)) != 0)
#define TABLE_ROLLUP_ON ((int8_t)0x1)
#define TABLE_IS_ROLLUP(FLG) (((FLG) & (TABLE_ROLLUP_ON)) != 0)
#define TABLE_SET_ROLLUP(FLG) ((FLG) |= TABLE_ROLLUP_ON)
struct SMetaEntry {
int64_t version;
......
......@@ -104,6 +104,8 @@ typedef struct {
// TODO remove
SWalReader* pWalReader;
SWalRef* pRef;
// push
STqPushHandle pushHandle;
......
......@@ -371,8 +371,8 @@ struct SBlockIdx {
struct SMapData {
int32_t nItem;
int32_t *aOffset;
int32_t nData;
int32_t *aOffset;
uint8_t *pData;
};
......
......@@ -268,6 +268,7 @@ struct SVnode {
tsem_t canCommit;
int64_t sync;
int32_t blockCount;
bool restored;
tsem_t syncSem;
SQHandle* pQuery;
};
......
......@@ -180,11 +180,41 @@ int metaClose(SMeta *pMeta) {
return 0;
}
int32_t metaRLock(SMeta *pMeta) { return taosThreadRwlockRdlock(&pMeta->lock); }
int32_t metaRLock(SMeta *pMeta) {
int32_t ret = 0;
int32_t metaWLock(SMeta *pMeta) { return taosThreadRwlockWrlock(&pMeta->lock); }
metaDebug("meta rlock %p B", &pMeta->lock);
int32_t metaULock(SMeta *pMeta) { return taosThreadRwlockUnlock(&pMeta->lock); }
ret = taosThreadRwlockRdlock(&pMeta->lock);
metaDebug("meta rlock %p E", &pMeta->lock);
return ret;
}
int32_t metaWLock(SMeta *pMeta) {
int32_t ret = 0;
metaDebug("meta wlock %p B", &pMeta->lock);
ret = taosThreadRwlockWrlock(&pMeta->lock);
metaDebug("meta wlock %p E", &pMeta->lock);
return ret;
}
int32_t metaULock(SMeta *pMeta) {
int32_t ret = 0;
metaDebug("meta ulock %p B", &pMeta->lock);
ret = taosThreadRwlockUnlock(&pMeta->lock);
metaDebug("meta ulock %p E", &pMeta->lock);
return ret;
}
static int tbDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
STbDbKey *pTbDbKey1 = (STbDbKey *)pKey1;
......@@ -259,7 +289,7 @@ static int ctbIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL
static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
STagIdxKey *pTagIdxKey1 = (STagIdxKey *)pKey1;
STagIdxKey *pTagIdxKey2 = (STagIdxKey *)pKey2;
tb_uid_t uid1, uid2;
tb_uid_t uid1 = 0, uid2 = 0;
int c;
// compare suid
......@@ -287,14 +317,15 @@ static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL
// all not NULL, compr tag vals
c = doCompare(pTagIdxKey1->data, pTagIdxKey2->data, pTagIdxKey1->type, 0);
if (c) return c;
}
if (IS_VAR_DATA_TYPE(pTagIdxKey1->type)) {
uid1 = *(tb_uid_t *)(pTagIdxKey1->data + varDataTLen(pTagIdxKey1->data));
uid2 = *(tb_uid_t *)(pTagIdxKey2->data + varDataTLen(pTagIdxKey2->data));
} else {
uid1 = *(tb_uid_t *)(pTagIdxKey1->data + tDataTypes[pTagIdxKey1->type].bytes);
uid2 = *(tb_uid_t *)(pTagIdxKey2->data + tDataTypes[pTagIdxKey2->type].bytes);
}
// both null or tag values are equal, then continue to compare uids
if (IS_VAR_DATA_TYPE(pTagIdxKey1->type)) {
uid1 = *(tb_uid_t *)(pTagIdxKey1->data + varDataTLen(pTagIdxKey1->data));
uid2 = *(tb_uid_t *)(pTagIdxKey2->data + varDataTLen(pTagIdxKey2->data));
} else {
uid1 = *(tb_uid_t *)(pTagIdxKey1->data + tDataTypes[pTagIdxKey1->type].bytes);
uid2 = *(tb_uid_t *)(pTagIdxKey2->data + tDataTypes[pTagIdxKey2->type].bytes);
}
// compare uid
......
......@@ -765,12 +765,14 @@ typedef struct {
int32_t vLen;
} SIdxCursor;
int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
SIdxCursor *pCursor = NULL;
char *buf = NULL;
int32_t maxSize = 0;
int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
int32_t ret = 0;
char *buf = NULL;
int32_t ret = 0, valid = 0;
STagIdxKey *pKey = NULL;
int32_t nKey = 0;
SIdxCursor *pCursor = NULL;
pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
pCursor->pMeta = pMeta;
pCursor->suid = param->suid;
......@@ -782,9 +784,8 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
if (ret < 0) {
goto END;
}
STagIdxKey *pKey = NULL;
int32_t nKey = 0;
int32_t maxSize = 0;
int32_t nTagData = 0;
void *tagData = NULL;
......@@ -822,10 +823,12 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
goto END;
}
void *entryKey = NULL, *entryVal = NULL;
int32_t nEntryKey, nEntryVal;
bool first = true;
int32_t valid = 0;
while (1) {
void *entryKey = NULL, *entryVal = NULL;
int32_t nEntryKey, nEntryVal;
valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, (const void **)&entryVal, &nEntryVal);
if (valid < 0) {
break;
......@@ -864,10 +867,12 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
break;
}
}
END:
if (pCursor->pMeta) metaULock(pCursor->pMeta);
if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
taosMemoryFree(buf);
taosMemoryFree(pKey);
taosMemoryFree(pCursor);
......
......@@ -178,7 +178,7 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
if (metaGetTableEntryByName(&mr, pReq->name) == 0) {
// TODO: just for pass case
#if 0
terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
terrno = TSDB_CODE_TDB_STB_ALREADY_EXIST;
metaReaderClear(&mr);
return -1;
#else
......@@ -223,7 +223,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb
// check if super table exists
rc = tdbTbGet(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pData, &nData);
if (rc < 0 || *(tb_uid_t *)pData != pReq->suid) {
terrno = TSDB_CODE_VND_TABLE_NOT_EXIST;
terrno = TSDB_CODE_TDB_STB_NOT_EXIST;
return -1;
}
......@@ -358,6 +358,14 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
goto _err;
}
if (pReq->type == TSDB_CHILD_TABLE) {
tb_uid_t suid = metaGetTableEntryUidByName(pMeta, pReq->ctb.name);
if (suid != pReq->ctb.suid) {
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
return -1;
}
}
// validate req
metaReaderInit(&mr, pMeta, 0);
if (metaGetTableEntryByName(&mr, pReq->name) == 0) {
......@@ -371,13 +379,6 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
}
metaReaderClear(&mr);
if (pReq->type == TSDB_CHILD_TABLE) {
tb_uid_t suid = metaGetTableEntryUidByName(pMeta, pReq->ctb.name);
if (suid == 0) {
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
return -1;
}
}
// build SMetaEntry
me.version = version;
me.type = pReq->type;
......@@ -442,7 +443,7 @@ int metaTtlDropTable(SMeta *pMeta, int64_t ttl, SArray *tbUids) {
if (ret != 0) {
return ret;
}
if (taosArrayGetSize(tbUids) == 0){
if (taosArrayGetSize(tbUids) == 0) {
return 0;
}
......@@ -811,6 +812,9 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
for (int32_t i = 0; i < pTagSchema->nCols; i++) {
SSchema *pCol = &pTagSchema->pSchema[i];
if (iCol == i) {
if (pAlterTbReq->isNull) {
continue;
}
STagVal val = {0};
val.type = pCol->type;
val.cid = pCol->colId;
......
......@@ -138,6 +138,14 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0);
}
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
if (pRsp->blockNum > 0) {
ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);
} else {
ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version);
}
}
int32_t len = 0;
int32_t code = 0;
tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
......@@ -205,6 +213,15 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
ASSERT(0);
return -1;
}
if (offset.val.type == TMQ_OFFSET__LOG) {
STqHandle* pHandle = taosHashGet(pTq->handles, offset.subKey, strlen(offset.subKey));
if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
ASSERT(0);
return -1;
}
}
/*}*/
/*}*/
......@@ -369,8 +386,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
}
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) {
int64_t fetchVer = fetchOffsetNew.version + 1;
SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
int64_t fetchVer = fetchOffsetNew.version + 1;
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) {
code = -1;
goto OVER;
......@@ -527,11 +544,14 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->execHandle.subType = req.subType;
pHandle->fetchMeta = req.withMeta;
// TODO version should be assigned and refed during preprocess
SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal);
if (pRef == NULL) {
ASSERT(0);
}
int64_t ver = pRef->refVer;
pHandle->pRef = pRef;
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
// TODO version should be assigned in preprocess
int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
pHandle->execHandle.execCol.qmsg = req.qmsg;
pHandle->snapshotVer = ver;
......@@ -553,14 +573,18 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
ASSERT(pHandle->execHandle.pExecReader);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
pHandle->execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
pHandle->execHandle.execTb.suid = req.suid;
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
tqDebug("vgId:%d, tq try get suid:%" PRId64, pTq->pVnode->config.vgId, req.suid);
tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, req.suid);
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid);
......
......@@ -65,12 +65,14 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
qTaskInfo_t task = pExec->execCol.task;
if (qStreamPrepareScan(task, pOffset) < 0) {
tqDebug("prepare scan failed, return");
if (pOffset->type == TMQ_OFFSET__LOG) {
pRsp->rspOffset = *pOffset;
return 0;
} else {
tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
if (qStreamPrepareScan(task, pOffset) < 0) {
tqDebug("prepare scan failed, return");
pRsp->rspOffset = *pOffset;
return 0;
}
......@@ -126,9 +128,16 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
ASSERT(pRsp->rspOffset.type != 0);
#if 0
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
ASSERT(pRsp->rspOffset.version + 1 >= pRsp->reqOffset.version);
if (pRsp->blockNum > 0) {
ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);
} else {
ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version);
}
}
#endif
tqDebug("task exec exited");
break;
}
......
......@@ -43,20 +43,16 @@ static int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
return 0;
}
int tqExecKeyCompare(const void* pKey1, int32_t kLen1, const void* pKey2, int32_t kLen2) {
return strcmp(pKey1, pKey2);
}
int32_t tqMetaOpen(STQ* pTq) {
if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaStore) < 0) {
ASSERT(0);
}
if (tdbTbOpen("handles", -1, -1, tqExecKeyCompare, pTq->pMetaStore, &pTq->pExecStore) < 0) {
if (tdbTbOpen("handles", -1, -1, 0, pTq->pMetaStore, &pTq->pExecStore) < 0) {
ASSERT(0);
}
TXN txn;
TXN txn = {0};
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
ASSERT(0);
......@@ -79,7 +75,13 @@ int32_t tqMetaOpen(STQ* pTq) {
STqHandle handle;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSTqHandle(&decoder, &handle);
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
handle.pRef = walOpenRef(pTq->pVnode->pWal);
if (handle.pRef == NULL) {
ASSERT(0);
}
walRefVer(handle.pRef, handle.snapshotVer);
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
SReadHandle reader = {
.meta = pTq->pVnode->pMeta,
......@@ -98,10 +100,11 @@ int32_t tqMetaOpen(STQ* pTq) {
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
ASSERT(handle.execHandle.pExecReader);
} else {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
handle.execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
}
tqDebug("tq restore %s consumer %ld", handle.subKey, handle.consumerId);
tqDebug("tq restore %s consumer %ld vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode));
taosHashPut(pTq->handles, pKey, kLen, &handle, sizeof(STqHandle));
}
......@@ -126,6 +129,9 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
ASSERT(code == 0);
tqDebug("tq save %s(%d) consumer %ld vgId:%d", pHandle->subKey, strlen(pHandle->subKey), pHandle->consumerId,
TD_VID(pTq->pVnode));
void* buf = taosMemoryCalloc(1, vlen);
if (buf == NULL) {
ASSERT(0);
......
......@@ -132,10 +132,12 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
while (1) {
if (!fromProcessedMsg) {
if (walNextValidMsg(pReader->pWalReader) < 0) {
pReader->ver = pReader->pWalReader->curVersion - pReader->pWalReader->curInvalid;
pReader->ver =
pReader->pWalReader->curVersion - (pReader->pWalReader->curInvalid | pReader->pWalReader->curStopped);
ret->offset.type = TMQ_OFFSET__LOG;
ret->offset.version = pReader->ver;
ret->fetchType = FETCH_TYPE__NONE;
tqDebug("return offset %ld, no more valid", ret->offset.version);
ASSERT(ret->offset.version >= 0);
return -1;
}
......@@ -167,6 +169,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
ret->offset.version = pReader->ver;
ASSERT(pReader->ver >= 0);
ret->fetchType = FETCH_TYPE__NONE;
tqDebug("return offset %ld, processed finish", ret->offset.version);
return 0;
}
}
......
......@@ -24,6 +24,8 @@ void tMapDataReset(SMapData *pMapData) {
void tMapDataClear(SMapData *pMapData) {
tFree((uint8_t *)pMapData->aOffset);
tFree(pMapData->pData);
pMapData->pData = NULL;
pMapData->aOffset = NULL;
}
int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)) {
......@@ -1395,10 +1397,26 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
break;
case TSDB_DATA_TYPE_BOOL:
break;
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_TINYINT:{
pColAgg->sum += colVal.value.i8;
if (pColAgg->min > colVal.value.i8) {
pColAgg->min = colVal.value.i8;
}
if (pColAgg->max < colVal.value.i8) {
pColAgg->max = colVal.value.i8;
}
break;
case TSDB_DATA_TYPE_SMALLINT:
}
case TSDB_DATA_TYPE_SMALLINT:{
pColAgg->sum += colVal.value.i16;
if (pColAgg->min > colVal.value.i16) {
pColAgg->min = colVal.value.i16;
}
if (pColAgg->max < colVal.value.i16) {
pColAgg->max = colVal.value.i16;
}
break;
}
case TSDB_DATA_TYPE_INT: {
pColAgg->sum += colVal.value.i32;
if (pColAgg->min > colVal.value.i32) {
......@@ -1419,24 +1437,79 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
}
break;
}
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_FLOAT:{
pColAgg->sum += colVal.value.f;
if (pColAgg->min > colVal.value.f) {
pColAgg->min = colVal.value.f;
}
if (pColAgg->max < colVal.value.f) {
pColAgg->max = colVal.value.f;
}
break;
case TSDB_DATA_TYPE_DOUBLE:
}
case TSDB_DATA_TYPE_DOUBLE:{
pColAgg->sum += colVal.value.d;
if (pColAgg->min > colVal.value.d) {
pColAgg->min = colVal.value.d;
}
if (pColAgg->max < colVal.value.d) {
pColAgg->max = colVal.value.d;
}
break;
}
case TSDB_DATA_TYPE_VARCHAR:
break;
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_TIMESTAMP:{
if (pColAgg->min > colVal.value.i64) {
pColAgg->min = colVal.value.i64;
}
if (pColAgg->max < colVal.value.i64) {
pColAgg->max = colVal.value.i64;
}
break;
}
case TSDB_DATA_TYPE_NCHAR:
break;
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_UTINYINT:{
pColAgg->sum += colVal.value.u8;
if (pColAgg->min > colVal.value.u8) {
pColAgg->min = colVal.value.u8;
}
if (pColAgg->max < colVal.value.u8) {
pColAgg->max = colVal.value.u8;
}
break;
case TSDB_DATA_TYPE_USMALLINT:
}
case TSDB_DATA_TYPE_USMALLINT:{
pColAgg->sum += colVal.value.u16;
if (pColAgg->min > colVal.value.u16) {
pColAgg->min = colVal.value.u16;
}
if (pColAgg->max < colVal.value.u16) {
pColAgg->max = colVal.value.u16;
}
break;
case TSDB_DATA_TYPE_UINT:
}
case TSDB_DATA_TYPE_UINT:{
pColAgg->sum += colVal.value.u32;
if (pColAgg->min > colVal.value.u32) {
pColAgg->min = colVal.value.u32;
}
if (pColAgg->max < colVal.value.u32) {
pColAgg->max = colVal.value.u32;
}
break;
case TSDB_DATA_TYPE_UBIGINT:
}
case TSDB_DATA_TYPE_UBIGINT:{
pColAgg->sum += colVal.value.u64;
if (pColAgg->min > colVal.value.u64) {
pColAgg->min = colVal.value.u64;
}
if (pColAgg->max < colVal.value.u64) {
pColAgg->max = colVal.value.u64;
}
break;
}
case TSDB_DATA_TYPE_JSON:
break;
case TSDB_DATA_TYPE_VARBINARY:
......
......@@ -40,8 +40,8 @@ const SVnodeCfg vnodeCfgDefault = {.vgId = -1,
.vgId = -1,
.fsyncPeriod = 0,
.retentionPeriod = -1,
.rollPeriod = -1,
.segSize = -1,
.rollPeriod = 0,
.segSize = 0,
.retentionSize = -1,
.level = TAOS_WAL_WRITE,
},
......
......@@ -479,6 +479,11 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) {
rcode = -1;
goto _exit;
}
// validate hash
sprintf(tbName, "%s.%s", pVnode->config.dbname, pCreateReq->name);
if (vnodeValidateTableHash(pVnode, tbName) < 0) {
......@@ -823,6 +828,13 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
goto _exit;
}
if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) {
pRsp->code = terrno;
tDecoderClear(&decoder);
taosArrayDestroy(createTbReq.ctb.tagName);
goto _exit;
}
if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
submitBlkRsp.code = terrno;
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册