提交 3cbaaa5a 编写于 作者: M Minghao Li

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/3.0_mhli

--- ---
sidebar_label: Interval sidebar_label: Distinguished
title: Aggregate by Time Window title: Distinguished Query for Time Series Database
--- ---
Aggregation by time window is supported in TDengine. For example, in the case where temperature sensors report the temperature every seconds, the average temperature for every 10 minutes can be retrieved by performing a query with a time window. Aggregation by time window is supported in TDengine. For example, in the case where temperature sensors report the temperature every seconds, the average temperature for every 10 minutes can be retrieved by performing a query with a time window.
......
---
sidebar_label: 消息队列
title: 消息队列
---
TDengine 3.0.0.0 开始对消息队列做了大幅的优化和增强以简化用户的解决方案。
## 创建订阅主题
```sql
CREATE TOPIC [IF NOT EXISTS] topic_name AS {subquery | DATABASE db_name | STABLE stb_name };
```
订阅主题包括三种:列订阅、超级表订阅和数据库订阅。
**列订阅是**用 subquery 描述,支持过滤和标量函数和 UDF 标量函数,不支持 JOIN、GROUP BY、窗口切分子句、聚合函数和 UDF 聚合函数。列订阅规则如下:
1. TOPIC 一旦创建则返回结果的字段确定
2. 被订阅或用于计算的列不可被删除、修改
3. 列可以新增,但新增的列不出现在订阅结果字段中
4. 对于 select \*,则订阅展开为创建时所有的列(子表、普通表为数据列,超级表为数据列加标签列)
**超级表订阅和数据库订阅**规则如下:
1. 被订阅主体的 schema 变更不受限
2. 返回消息中 schema 是块级别的,每块的 schema 可能不一样
3. 列变更后写入的数据若未落盘,将以写入时的 schema 返回
4. 列变更后写入的数据若未已落盘,将以落盘时的 schema 返回
## 删除订阅主题
```sql
DROP TOPIC [IF EXISTS] topic_name;
```
此时如果该订阅主题上存在 consumer,则此 consumer 会收到一个错误。
## 查看订阅主题
## SHOW TOPICS
```sql
SHOW TOPICS;
```
显示当前数据库下的所有主题的信息。
## 创建消费组
消费组的创建只能通过 TDengine 客户端驱动或者连接器所提供的 API 创建。
## 删除消费组
```sql
DROP CONSUMER GROUP [IF EXISTS] cgroup_name ON topic_name;
```
删除主题 topic_name 上的消费组 cgroup_name。
## 查看消费组
```sql
SHOW CONSUMERS;
```
显示当前数据库下所有活跃的消费者的信息。
---
title: Naming & Restrictions
---
## Naming Rules
1. Only characters from the English alphabet, digits and underscore are allowed
2. Names cannot start with a digit
3. Case insensitive without escape character "\`"
4. Identifier with escape character "\`"
To support more flexible table or column names, a new escape character "\`" is introduced. For more details please refer to [escape](/taos-sql/escape).
## Password Rule
The legal character set is `[a-zA-Z0-9!?$%^&*()_–+={[}]:;@~#|<,>.?/]`.
## General Limits
- Maximum length of database name is 32 bytes, and it can't include "." or special characters.
- Maximum length of table name is 192 bytes, excluding the database name prefix and the separator.
- Maximum length of each data row is 48K bytes. Please note that the upper limit includes the extra 2 bytes consumed by each column of BINARY/NCHAR type.
- Maximum length of column name is 64.
- Maximum number of columns is 4096. There must be at least 2 columns, and the first column must be timestamp.
- Maximum length of tag name is 64.
- Maximum number of tags is 128. There must be at least 1 tag. The total length of tag values should not exceed 16K bytes.
- Maximum length of singe SQL statement is 1048576, i.e. 1 MB. It can be configured in the parameter `maxSQLLength` in the client side, the applicable range is [65480, 1048576].
- At most 4096 columns can be returned by `SELECT`. Functions in the query statement constitute columns. An error is returned if the limit is exceeded.
- Maximum numbers of databases, STables, tables are dependent only on the system resources.
- Maximum number of replicas for a database is 3.
- Maximum length of user name is 23 bytes.
- Maximum length of password is 15 bytes.
- Maximum number of rows depends only on the storage space.
- Maximum number of vnodes for a single database is 1024.
## Restrictions of Table/Column Names
### Name Restrictions of Table/Column
The name of a table or column can only be composed of ASCII characters, digits and underscore and it cannot start with a digit. The maximum length is 192 bytes. Names are case insensitive. The name mentioned in this rule doesn't include the database name prefix and the separator.
### Name Restrictions After Escaping
To support more flexible table or column names, new escape character "\`" is introduced in TDengine to avoid the conflict between table name and keywords and break the above restrictions for table names. The escape character is not counted in the length of table name.
With escaping, the string inside escape characters are case sensitive, i.e. will not be converted to lower case internally.
For example:
\`aBc\` and \`abc\` are different table or column names, but "abc" and "aBc" are same names because internally they are all "abc".
:::note
The characters inside escape characters must be printable characters.
:::
---
sidebar_label: 流式计算
title: 流式计算
---
在时序数据的处理中,经常要对原始数据进行清洗、预处理,再使用时序数据库进行长久的储存。用户通常需要在时序数据库之外再搭建 Kafka、Flink、Spark 等流计算处理引擎,增加了用户的开发成本和维护成本。
使用 TDengine 3.0 的流式计算引擎能够最大限度的减少对这些额外中间件的依赖,真正将数据的写入、预处理、长期存储、复杂分析、实时计算、实时报警触发等功能融为一体,并且,所有这些任务只需要使用 SQL 完成,极大降低了用户的学习成本、使用成本。
## 创建流式计算
```sql
CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name AS subquery
stream_options: {
TRIGGER [AT_ONCE | WINDOW_CLOSE | MAX_DELAY time]
WATERMARK time
}
```
其中 subquery 是 select 普通查询语法的子集:
```sql
subquery: SELECT [DISTINCT] select_list
from_clause
[WHERE condition]
[PARTITION BY tag_list]
[window_clause]
[group_by_clause]
```
不支持 order_by,limit,slimit,fill 语句
例如,如下语句创建流式计算,同时自动创建名为 avg_vol 的超级表,此流计算以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会分别创建子表并写入不同子表。
```sql
CREATE STREAM avg_vol_s INTO avg_vol AS
SELECT _wstartts, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s);
```
## 删除流式计算
```sql
DROP STREAM [IF NOT EXISTS] stream_name
```
仅删除流式计算任务,由流式计算写入的数据不会被删除。
## 展示流式计算
```sql
SHOW STREAMS;
```
## 流式计算的触发模式
在创建流时,可以通过 TRIGGER 指令指定流式计算的触发模式。
对于非窗口计算,流式计算的触发是实时的;对于窗口计算,目前提供 3 种触发模式:
1. AT_ONCE:写入立即触发
2. WINDOW_CLOSE:窗口关闭时触发(窗口关闭由事件时间决定,可配合 watermark 使用,详见《流式计算的乱序数据容忍策略》)
3. MAX_DELAY time:若窗口关闭,则触发计算。若窗口未关闭,且未关闭时长超过 max delay 指定的时间,则触发计算。
由于窗口关闭是由事件时间决定的,如事件流中断、或持续延迟,则事件时间无法更新,可能导致无法得到最新的计算结果。
因此,流式计算提供了以事件时间结合处理时间计算的 MAX_DELAY 触发模式。
MAX_DELAY 模式在窗口关闭时会立即触发计算。此外,当数据写入后,计算触发的时间超过 max delay 指定的时间,则立即触发计算
## 流式计算的乱序数据容忍策略
在创建流时,可以在 stream_option 中指定 watermark。
流式计算通过 watermark 来度量对乱序数据的容忍程度,watermark 默认为 0。
T = 最新事件时间 - watermark
每批到来的数据都会以上述公式更新窗口关闭时间,并将窗口结束时间 < T 的所有打开的窗口关闭,若触发模式为 WINDOW_CLOSE 或 MAX_DELAY,则推送窗口聚合结果。
流式计算的过期数据处理策略
对于已关闭的窗口,再次落入该窗口中的数据被标记为过期数据,对于过期数据,流式计算提供两种处理方式:
1. 直接丢弃:这是常见流式计算引擎提供的默认(甚至是唯一)计算模式
2. 重新计算:从 TSDB 中重新查找对应窗口的所有数据并重新计算得到最新结果
无论在哪种模式下,watermark 都应该被妥善设置,来得到正确结果(直接丢弃模式)或避免频繁触发重算带来的性能开销(重新计算模式)。
## 流式计算的数据填充策略
TODO
## 流式计算与会话窗口(session window)
```sql
window_clause: {
SESSION(ts_col, tol_val)
| STATE_WINDOW(col)
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [FILL(fill_mod_and_val)]
}
```
其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。
## 流式计算的监控与流任务分布查询
TODO
## 流式计算的内存控制与存算分离
TODO
## 流式计算的暂停与恢复
```sql
STOP STREAM stream_name;
RESUME STREAM stream_name;
```
---
sidebar_label: 命名与边界限制
title: 命名与边界限制
---
## 名称命名规则
1. 合法字符:英文字符、数字和下划线
2. 允许英文字符或下划线开头,不允许以数字开头
3. 不区分大小写
4. 转义后表(列)名规则:
为了兼容支持更多形式的表(列)名,TDengine 引入新的转义符 "`"。可用让表名与关键词不冲突,同时不受限于上述表名称合法性约束检查
转义后的表(列)名同样受到长度限制要求,且长度计算的时候不计算转义符。使用转义字符以后,不再对转义字符中的内容进行大小写统一
例如:\`aBc\` 和 \`abc\` 是不同的表(列)名,但是 abc 和 aBc 是相同的表(列)名。
需要注意的是转义字符中的内容必须是可打印字符。
## 密码合法字符集
`[a-zA-Z0-9!?$%^&*()_–+={[}]:;@~#|<,>.?/]`
去掉了 `` ‘“`\ `` (单双引号、撇号、反斜杠、空格)
## 一般限制
- 数据库名最大长度为 32
- 表名最大长度为 192,不包括数据库名前缀和分隔符
- 每行数据最大长度 48KB (注意:数据行内每个 BINARY/NCHAR 类型的列还会额外占用 2 个字节的存储位置)
- 列名最大长度为 64
- 最多允许 4096 列,最少需要 2 列,第一列必须是时间戳。
- 标签名最大长度为 64
- 最多允许 128 个,至少要有 1 个标签,一个表中标签值的总长度不超过 16KB
- SQL 语句最大长度 1048576 个字符,也可通过客户端配置参数 maxSQLLength 修改,取值范围 65480 ~ 1048576
- SELECT 语句的查询结果,最多允许返回 4096 列(语句中的函数调用可能也会占用一些列空间),超限时需要显式指定较少的返回数据列,以避免语句执行报错
- 库的数目,超级表的数目、表的数目,系统不做限制,仅受系统资源限制
- 数据库的副本数只能设置为 1 或 3
- 用户名的最大长度是 23 个字节
- 用户密码的最大长度是 15 个字节
- 总数据行数取决于可用资源
- 单个数据库的虚拟结点数上限为 1024
## 表(列)名合法性说明
### TDengine 中的表(列)名命名规则如下:
只能由字母、数字、下划线构成,数字不能在首位,长度不能超过 192 字节,不区分大小写。这里表名称不包括数据库名的前缀和分隔符。
### 转义后表(列)名规则:
为了兼容支持更多形式的表(列)名,TDengine 引入新的转义符 "`",可以避免表名与关键词的冲突,同时不受限于上述表名合法性约束检查,转义符不计入表名的长度。
转义后的表(列)名同样受到长度限制要求,且长度计算的时候不计算转义符。使用转义字符以后,不再对转义字符中的内容进行大小写统一。
例如:
\`aBc\`\`abc\` 是不同的表(列)名,但是 abc 和 aBc 是相同的表(列)名。
:::note
转义字符中的内容必须是可打印字符。
:::
---
sidebar_label: 集群管理
title: 集群管理
---
组成 TDengine 集群的物理实体是 dnode (data node 的缩写),它是一个运行在操作系统之上的进程。在 dnode 中可以建立负责时序数据存储的 vnode (virtual node),在多节点集群环境下当某个数据库的 replica 为 3 时,该数据库中的每个 vgroup 由 3 个 vnode 组成;当数据库的 replica 为 1 时,该数据库中的每个 vgroup 由 1 个 vnode 组成。如果要想配置某个数据库为多副本,则集群中的 dnode 数量至少为 3。在 dnode 还可以创建 mnode (management node),单个集群中最多可以创建三个 mnode。在 TDengine 3.0.0.0 中为了支持存算分离,引入了一种新的逻辑节点 qnode (query node),qnode 和 vnode 既可以共存在一个 dnode 中,也可以完全分离在不同的 dnode 上。
## 创建数据节点
```sql
CREATE DNODE {dnode_endpoint | dnode_host_name PORT port_val}
```
其中 `dnode_endpoint` 是形成 `hostname:port`的格式。也可以分开指定 hostname 和 port。
实际操作中推荐先创建 dnode,再启动相应的 dnode 进程,这样该 dnode 就可以立即根据其配置文件中的 firstEP 加入集群。每个 dnode 在加入成功后都会被分配一个 ID。
## 查看数据节点
```sql
SHOW DNODES;
```
可以列出集群中所有的数据节点,所列出的字段有 dnode 的 ID, endpoint, status。
## 删除数据节点
```sql
DROP DNODE {dnode_id | dnode_endpoint}
```
可以用 dnoe_id 或 endpoint 两种方式从集群中删除一个 dnode。注意删除 dnode 不等于停止相应的进程。实际中推荐先将一个 dnode 删除之后再停止其所对应的进程。
## 修改数据节点配置
```sql
ALTER DNODE dnode_id dnode_option
ALTER ALL DNODES dnode_option
dnode_option: {
'resetLog'
| 'balance' value
| 'monitor' value
| 'debugFlag' value
| 'monDebugFlag' value
| 'vDebugFlag' value
| 'mDebugFlag' value
| 'cDebugFlag' value
| 'httpDebugFlag' value
| 'qDebugflag' value
| 'sdbDebugFlag' value
| 'uDebugFlag' value
| 'tsdbDebugFlag' value
| 'sDebugflag' value
| 'rpcDebugFlag' value
| 'dDebugFlag' value
| 'mqttDebugFlag' value
| 'wDebugFlag' value
| 'tmrDebugFlag' value
| 'cqDebugFlag' value
}
```
上面语法中的这些可修改配置项其配置方式与 dnode 配置文件中的配置方式相同,区别是修改是动态的立即生效,且不需要重启 dnode。
## 添加管理节点
```sql
CREATE MNODE ON DNODE dnode_id
```
系统启动默认在 firstEP 节点上创建一个 MNODE,用户可以使用此语句创建更多的 MNODE 来提高系统可用性。一个集群最多存在三个 MNODE,一个 DNODE 上只能创建一个 MNODE。
## 查看管理节点
```sql
SHOW MNODES;
```
列出集群中所有的管理节点,包括其 ID,所在 DNODE 以及状态。
## 删除管理节点
```sql
DROP MNODE ON DNODE dnode_id;
```
删除 dnode_id 所指定的 DNODE 上的 MNODE。
## 创建查询节点
```sql
CREATE QNODE ON DNODE dnode_id;
```
系统启动默认没有 QNODE,用户可以创建 QNODE 来实现计算和存储的分离。一个 DNODE 上只能创建一个 QNODE。一个 DNODE 的 `supportVnodes` 参数如果不为 0,同时又在其上创建上 QNODE,则在该 dnode 中既有负责存储管理的 vnode 又有负责查询计算的 qnode,如果还在该 dnode 上创建了 mnode,则一个 dnode 上最多三种逻辑节点都可以存在。但通过配置也可以使其彻底分离。将一个 dnode 的`supportVnodes`配置为 0,可以选择在其上创建 mnode 或者 qnode 中的一种,这样可以实现三种逻辑节点在物理上的彻底分离。
## 查看查询节点
```sql
SHOW QNODES;
```
列出集群中所有查询节点,包括 ID,及所在 DNODE。
## 删除查询节点
```sql
DROP QNODE ON DNODE dnode_id;
```
删除 ID 为 dnode_id 的 DNODE 上的 QNODE,但并不会影响该 dnode 的状态。
## 修改客户端配置
如果将客户端也看作广义的集群的一部分,可以通过如下命令动态修改客户端配置参数。
```sql
ALTER LOCAL local_option
local_option: {
'resetLog'
| 'rpcDebugFlag' value
| 'tmrDebugFlag' value
| 'cDebugFlag' value
| 'uDebugFlag' value
| 'debugFlag' value
}
```
上面语法中的参数与在配置文件中配置客户端的用法相同,但不需要重启客户端,修改后立即生效。
## 查看客户端配置
```sql
SHOW LOCAL VARIABLES;
```
## 合并 vgroup
```sql
MERGE VGROUP vgroup_no1 vgroup_no2;
```
如果在系统实际运行一段时间后,因为不同时间线的数据特征不同导致在 vgroups 之间的数据和负载分布不均衡,可以通过合并或拆分 vgroups 的方式逐步实现负载均衡。
## 拆分 vgroup
```sql
SPLIT VGROUP vgroup_no;
```
会创建一个新的 vgroup,并将指定 vgroup 中的数据按照一致性 HASH 迁移一部分到新的 vgroup 中。此过程中,原 vgroup 可以正常提供读写服务。
---
sidebar_label: 元数据库
title: 元数据库
---
TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数据库元数据、数据库系统信息和状态的访问,例如数据库或表的名称,当前执行的 SQL 语句等。该数据库存储有关 TDengine 维护的所有其他数据库的信息。它包含多个只读表。实际上,这些表都是视图,而不是基表,因此没有与它们关联的文件。所以对这些表只能查询,不能进行 INSERT 等写入操作。`INFORMATION_SCHEMA` 数据库旨在以一种更一致的方式来提供对 TDengine 支持的各种 SHOW 语句(如 SHOW TABLES、SHOW DATABASES)所提供的信息的访问。与 SHOW 语句相比,使用 SELECT ... FROM INFORMATION_SCHEMA.tablename 具有以下优点:
1. 可以使用 USE 语句将 INFORMATION_SCHEMA 设为默认数据库
2. 可以使用 SELECT 语句熟悉的语法,只需要学习一些表名和列名
3. 可以对查询结果进行筛选、排序等操作。事实上,可以使用任意 TDengine 支持的 SELECT 语句对 INFORMATION_SCHEMA 中的表进行查询
4. TDengine 在后续演进中可以灵活的添加已有 INFORMATION_SCHEMA 中表的列,而不用担心对既有业务系统造成影响
5. 与其他数据库系统更具互操作性。例如,Oracle 数据库用户熟悉查询 Oracle 数据字典中的表
Note: 由于 SHOW 语句已经被开发者熟悉和广泛使用,所以它们仍然被保留。
本章将详细介绍 `INFORMATION_SCHEMA` 这个内置元数据库中的表和表结构。
## INS_DNODES
提供 dnode 的相关信息。也可以使用 SHOW DNODES 来查询这些信息。
| # | **列名** | **数据类型** | **说明** |
| --- | :------------: | ------------ | ------------------------- |
| 1 | vnodes | SMALLINT | dnode 中的实际 vnode 个数 |
| 2 | support_vnodes | SMALLINT | 最多支持的 vnode 个数 |
| 3 | status | BINARY(10) | 当前状态 |
| 4 | note | BINARY(256) | 离线原因等信息 |
| 5 | id | SMALLINT | dnode id |
| 6 | endpoint | BINARY(134) | dnode 的地址 |
| 7 | create | TIMESTAMP | 创建时间 |
## INS_MNODES
提供 mnode 的相关信息。也可以使用 SHOW MNODES 来查询这些信息。
| # | **列名** | **数据类型** | **说明** |
| --- | :---------: | ------------ | ------------------ |
| 1 | id | SMALLINT | mnode id |
| 2 | endpoint | BINARY(134) | mnode 的地址 |
| 3 | role | BINARY(10) | 当前角色 |
| 4 | role_time | TIMESTAMP | 成为当前角色的时间 |
| 5 | create_time | TIMESTAMP | 创建时间 |
## INS_MODULES
提供组件的相关信息。也可以使用 SHOW MODULES 来查询这些信息
| # | **列名** | **数据类型** | **说明** |
| --- | :------: | ------------ | ---------- |
| 1 | id | SMALLINT | module id |
| 2 | endpoint | BINARY(134) | 组件的地址 |
| 3 | module | BINARY(10) | 组件状态 |
## INS_QNODES
当前系统中 QNODE 的信息。也可以使用 SHOW QNODES 来查询这些信息。
| # | **列名** | **数据类型** | **说明** |
| --- | :---------: | ------------ | ------------ |
| 1 | id | SMALLINT | qnode id |
| 2 | endpoint | BINARY(134) | qnode 的地址 |
| 3 | create_time | TIMESTAMP | 创建时间 |
## INS_CLUSTER
存储集群相关信息。
| # | **列名** | **数据类型** | **说明** |
| --- | :---------: | ------------ | ---------- |
| 1 | id | BIGINT | cluster id |
| 2 | name | BINARY(134) | 集群名称 |
| 3 | create_time | TIMESTAMP | 创建时间 |
## INS_DATABASES
提供用户创建的数据库对象的相关信息。也可以使用 SHOW DATABASES 来查询这些信息。
| # | **列名** | **数据类型** | **说明** |
| --- | :------------------: | ---------------- | ------------------------------------------------ |
| 1 | name | BINARY(32) | 数据库名 |
| 2 | create_time | TIMESTAMP | 创建时间 |
| 3 | ntables | INT | 数据库中表的数量,包含子表和普通表但不包含超级表 |
| 4 | vgroups | INT | 数据库中有多少个 vgroup |
| 6 | replica | INT | 副本数 |
| 7 | quorum | BINARY(3) | 强一致性 |
| 8 | duration | INT | 单文件存储数据的时间跨度 |
| 9 | keep | INT | 数据保留时长 |
| 10 | buffer | INT | 每个 vnode 写缓存的内存块大小,单位 MB |
| 11 | pagesize | INT | 每个 VNODE 中元数据存储引擎的页大小,单位为 KB |
| 12 | pages | INT | 每个 vnode 元数据存储引擎的缓存页个数 |
| 13 | minrows | INT | 文件块中记录的最大条数 |
| 14 | maxrows | INT | 文件块中记录的最小条数 |
| 15 | comp | INT | 数据压缩方式 |
| 16 | precision | BINARY(2) | 时间分辨率 |
| 17 | status | BINARY(10) | 数据库状态 |
| 18 | retention | BINARY (60) | 数据的聚合周期和保存时长 |
| 19 | single_stable | BOOL | 表示此数据库中是否只可以创建一个超级表 |
| 20 | cachemodel | BINARY(60) | 表示是否在内存中缓存子表的最近数据 |
| 21 | cachesize | INT | 表示每个 vnode 中用于缓存子表最近数据的内存大小 |
| 22 | wal_level | INT | WAL 级别 |
| 23 | wal_fsync_period | INT | 数据落盘周期 |
| 24 | wal_retention_period | INT | WAL 的保存时长 |
| 25 | wal_retention_size | INT | WAL 的保存上限 |
| 26 | wal_roll_period | INT | wal 文件切换时长 |
| 27 | wal_segment_size | wal 单个文件大小 |
## INS_FUNCTIONS
用户创建的自定义函数的信息。
| # | **列名** | **数据类型** | **说明** |
| --- | :---------: | ------------ | -------------- |
| 1 | name | BINARY(64) | 函数名 |
| 2 | comment | BINARY(255) | 补充说明 |
| 3 | aggregate | INT | 是否为聚合函数 |
| 4 | output_type | BINARY(31) | 输出类型 |
| 5 | create_time | TIMESTAMP | 创建时间 |
| 6 | code_len | INT | 代码长度 |
| 7 | bufsize | INT | buffer 大小 |
## INS_INDEXES
提供用户创建的索引的相关信息。也可以使用 SHOW INDEX 来查询这些信息。
| # | **列名** | **数据类型** | **说明** |
| --- | :--------------: | ------------ | ---------------------------------------------------------------------------------- |
| 1 | db_name | BINARY(32) | 包含此索引的表所在的数据库名 |
| 2 | table_name | BINARY(192) | 包含此索引的表的名称 |
| 3 | index_name | BINARY(192) | 索引名 |
| 4 | column_name | BINARY(64) | 建索引的列的列名 |
| 5 | index_type | BINARY(10) | 目前有 SMA 和 FULLTEXT |
| 6 | index_extensions | BINARY(256) | 索引的额外信息。对 SMA 类型的索引,是函数名的列表。对 FULLTEXT 类型的索引为 NULL。 |
## INS_STABLES
提供用户创建的超级表的相关信息。
| # | **列名** | **数据类型** | **说明** |
| --- | :-----------: | ------------ | ------------------------ |
| 1 | stable_name | BINARY(192) | 超级表表名 |
| 2 | db_name | BINARY(64) | 超级表所在的数据库的名称 |
| 3 | create_time | TIMESTAMP | 创建时间 |
| 4 | columns | INT | 列数目 |
| 5 | tags | INT | 标签数目 |
| 6 | last_update | TIMESTAMP | 最后更新时间 |
| 7 | table_comment | BINARY(1024) | 表注释 |
| 8 | watermark | BINARY(64) | 窗口的关闭时间 |
| 9 | max_delay | BINARY(64) | 推送计算结果的最大延迟 |
| 10 | rollup | BINARY(128) | rollup 聚合函数 |
## INS_TABLES
提供用户创建的普通表和子表的相关信息
| # | **列名** | **数据类型** | **说明** |
| --- | :-----------: | ------------ | ---------------- |
| 1 | table_name | BINARY(192) | 表名 |
| 2 | db_name | BINARY(64) | 数据库名 |
| 3 | create_time | TIMESTAMP | 创建时间 |
| 4 | columns | INT | 列数目 |
| 5 | stable_name | BINARY(192) | 所属的超级表表名 |
| 6 | uid | BIGINT | 表 id |
| 7 | vgroup_id | INT | vgroup id |
| 8 | ttl | INT | 表的生命周期 |
| 9 | table_comment | BINARY(1024) | 表注释 |
| 10 | type | BINARY(20) | 表类型 |
## INS_TAGS
| # | **列名** | **数据类型** | **说明** |
| --- | :---------: | ------------- | ---------------------- |
| 1 | table_name | BINARY(192) | 表名 |
| 2 | db_name | BINARY(64) | 该表所在的数据库的名称 |
| 3 | stable_name | BINARY(192) | 所属的超级表表名 |
| 4 | tag_name | BINARY(64) | tag 的名称 |
| 5 | tag_type | BINARY(64) | tag 的类型 |
| 6 | tag_value | BINARY(16384) | tag 的值 |
## INS_USERS
提供系统中创建的用户的相关信息。
| # | **列名** | **数据类型** | **说明** |
| --- | :---------: | ------------ | -------- |
| 1 | user_name | BINARY(23) | 用户名 |
| 2 | privilege | BINARY(256) | 权限 |
| 3 | create_time | TIMESTAMP | 创建时间 |
## INS_GRANTS
提供企业版授权的相关信息。
| # | **列名** | **数据类型** | **说明** |
| --- | :---------: | ------------ | -------------------------------------------------- |
| 1 | version | BINARY(9) | 企业版授权说明:official(官方授权的)/trial(试用的) |
| 2 | cpu_cores | BINARY(9) | 授权使用的 CPU 核心数量 |
| 3 | dnodes | BINARY(10) | 授权使用的 dnode 节点数量 |
| 4 | streams | BINARY(10) | 授权创建的流数量 |
| 5 | users | BINARY(10) | 授权创建的用户数量 |
| 6 | accounts | BINARY(10) | 授权创建的帐户数量 |
| 7 | storage | BINARY(21) | 授权使用的存储空间大小 |
| 8 | connections | BINARY(21) | 授权使用的客户端连接数量 |
| 9 | databases | BINARY(11) | 授权使用的数据库数量 |
| 10 | speed | BINARY(9) | 授权使用的数据点每秒写入数量 |
| 11 | querytime | BINARY(9) | 授权使用的查询总时长 |
| 12 | timeseries | BINARY(21) | 授权使用的测点数量 |
| 13 | expired | BINARY(5) | 是否到期,true:到期,false:未到期 |
| 14 | expire_time | BINARY(19) | 试用期到期时间 |
## INS_VGROUPS
系统中所有 vgroups 的信息。
| # | **列名** | **数据类型** | **说明** |
| --- | :-------: | ------------ | ------------------------------------------------------ |
| 1 | vgroup_id | INT | vgroup id |
| 2 | db_name | BINARY(32) | 数据库名 |
| 3 | tables | INT | 此 vgroup 内有多少表 |
| 4 | status | BINARY(10) | 此 vgroup 的状态 |
| 5 | v1_dnode | INT | 第一个成员所在的 dnode 的 id |
| 6 | v1_status | BINARY(10) | 第一个成员的状态 |
| 7 | v2_dnode | INT | 第二个成员所在的 dnode 的 id |
| 8 | v2_status | BINARY(10) | 第二个成员的状态 |
| 9 | v3_dnode | INT | 第三个成员所在的 dnode 的 id |
| 10 | v3_status | BINARY(10) | 第三个成员的状态 |
| 11 | nfiles | INT | 此 vgroup 中数据/元数据文件的数量 |
| 12 | file_size | INT | 此 vgroup 中数据/元数据文件的大小 |
| 13 | tsma | TINYINT | 此 vgroup 是否专用于 Time-range-wise SMA,1: 是, 0: 否 |
## INS_CONFIGS
系统配置参数。
| # | **列名** | **数据类型** | **说明** |
| --- | :------: | ------------ | ------------ |
| 1 | name | BINARY(32) | 配置项名称 |
| 2 | value | BINARY(64) | 该配置项的值 |
## INS_DNODE_VARIABLES
系统中每个 dnode 的配置参数。
| # | **列名** | **数据类型** | **说明** |
| --- | :------: | ------------ | ------------ |
| 1 | dnode_id | INT | dnode 的 ID |
| 2 | name | BINARY(32) | 配置项名称 |
| 3 | value | BINARY(64) | 该配置项的值 |
---
sidebar_label: 权限管理
title: 权限管理
---
本节讲述如何在 TDengine 中进行权限管理的相关操作。
## 创建用户
```sql
CREATE USER use_name PASS password;
```
创建用户。
use_name最长为23字节。
password最长为128字节,合法字符包括"a-zA-Z0-9!?$%^&*()_–+={[}]:;@~#|<,>.?/",不可以出现单双引号、撇号、反斜杠和空格,且不可以为空。
## 删除用户
```sql
DROP USER user_name;
```
## 修改用户信息
```sql
ALTER USER user_name alter_user_clause
alter_user_clause: {
PASS 'literal'
| ENABLE value
| SYSINFO value
}
```
- PASS:修改用户密码。
- ENABLE:修改用户是否启用。1表示启用此用户,0表示禁用此用户。
- SYSINFO:修改用户是否可查看系统信息。1表示可以查看系统信息,0表示不可以查看系统信息。
## 授权
```sql
GRANT privileges ON priv_level TO user_name
privileges : {
ALL
| priv_type [, priv_type] ...
}
priv_type : {
READ
| WRITE
}
priv_level : {
dbname.*
| *.*
}
```
对用户授权。
授权级别支持到DATABASE,权限有READ和WRITE两种。
TDengine 有超级用户和普通用户两类用户。超级用户缺省创建为root,拥有所有权限。使用超级用户创建出来的用户为普通用户。在未授权的情况下,普通用户可以创建DATABASE,并拥有自己创建的DATABASE的所有权限,包括删除数据库、修改数据库、查询时序数据和写入时序数据。超级用户可以给普通用户授予其他DATABASE的读写权限,使其可以在此DATABASE上读写数据,但不能对其进行删除和修改数据库的操作。
对于非DATABASE的对象,如USER、DNODE、UDF、QNODE等,普通用户只有读权限(一般为SHOW命令),不能创建和修改。
## 撤销授权
```sql
REVOKE privileges ON priv_level FROM user_name
privileges : {
ALL
| priv_type [, priv_type] ...
}
priv_type : {
READ
| WRITE
}
priv_level : {
dbname.*
| *.*
}
```
收回对用户的授权。
\ No newline at end of file
---
sidebar_label: 自定义函数
title: 用户自定义函数
---
除了 TDengine 的内置函数以外,用户还可以编写自己的函数逻辑并加入TDengine系统中。
## 创建函数
```sql
CREATE [AGGREGATE] FUNCTION func_name AS library_path OUTPUTTYPE type_name [BUFSIZE value]
```
语法说明:
AGGREGATE:标识此函数是标量函数还是聚集函数。
func_name:函数名,必须与函数实现中udfNormalFunc的实际名称一致。
library_path:包含UDF函数实现的动态链接库的绝对路径,是在客户端侧主机上的绝对路径。
OUTPUTTYPE:标识此函数的返回类型。
BUFSIZE:中间结果的缓冲区大小,单位是字节。不设置则默认为0。最大不可超过512字节。
关于如何开发自定义函数,请参考 [UDF使用说明](../../develop/udf)
## 删除自定义函数
```sql
DROP FUNCTION func_name
```
\ No newline at end of file
---
sidebar_label: 索引
title: 使用索引
---
TDengine 从 3.0.0.0 版本开始引入了索引功能,支持 SMA 索引和 FULLTEXT 索引。
## 创建索引
```sql
CREATE FULLTEXT INDEX index_name ON tb_name (col_name [, col_name] ...)
CREATE SMA INDEX index_name ON tb_name index_option
index_option:
FUNCTION(functions) INTERVAL(interval_val [, interval_offset]) [SLIDING(sliding_val)] [WATERMARK(watermark_val)] [MAX_DELAY(max_delay_val)]
functions:
function [, function] ...
```
### SMA 索引
对指定列按 INTERVAL 子句定义的时间窗口创建进行预聚合计算,预聚合计算类型由 functions_string 指定。SMA 索引能提升指定时间段的聚合查询的性能。目前,限制一个超级表只能创建一个 SMA INDEX。
- 支持的函数包括 MAX、MIN 和 SUM。
- WATERMARK: 最小单位毫秒,取值范围 [0ms, 900000ms],默认值为 5 秒,只可用于超级表。
- MAX_DELAY: 最小单位毫秒,取值范围 [1ms, 900000ms],默认值为 interval 的值(但不能超过最大值),只可用于超级表。注:不建议 MAX_DELAY 设置太小,否则会过于频繁的推送结果,影响存储和查询性能,如无特殊需求,取默认值即可。
### FULLTEXT 索引
对指定列建立文本索引,可以提升含有文本过滤的查询的性能。FULLTEXT 索引不支持 index_option 语法。现阶段只支持对 JSON 类型的标签列创建 FULLTEXT 索引。不支持多列联合索引,但可以为每个列分布创建 FULLTEXT 索引。
## 删除索引
```sql
DROP INDEX index_name;
```
## 查看索引
````sql
```sql
SHOW INDEXES FROM tbl_name [FROM db_name];
````
显示在所指定的数据库或表上已创建的索引。
---
sidebar_label: 异常恢复
title: 异常恢复
---
在一个复杂的应用场景中,连接和查询任务等有可能进入一种错误状态或者耗时过长迟迟无法结束,此时需要有能够终止这些连接或任务的方法。
## 终止连接
```sql
KILL CONNECTION conn_id;
```
conn_id 可以通过 `SHOW CONNECTIONS` 获取。
## 终止查询
```sql
SHOW QUERY query_id;
```
query_id 可以通过 `SHOW QUERIES` 获取。
## 终止事务
```sql
KILL TRANSACTION trans_id
```
trans_id 可以通过 `SHOW TRANSACTIONS` 获取。
## 重置客户端缓存
```sql
RESET QUERY CACHE;
```
如果在多客户端情况下出现元数据不同步的情况,可以用这条命令强制清空客户端缓存,随后客户端会从服务端拉取最新的元数据。
此差异已折叠。
---
sidebar_label: SHOW 命令
title: 使用 SHOW 命令查看系统元数据
---
除了使用 `select` 语句查询 `INFORMATION_SCHEMA` 数据库中的表获得系统中的各种元数据、系统信息和状态之外,也可以用 `SHOW` 命令来实现同样的目的。
## SHOW ACCOUNTS
```sql
SHOW ACCOUNTS;
```
显示当前系统中所有租户的信息。
注:企业版独有
## SHOW APPS
```sql
SHOW APPS;
```
显示接入集群的应用(客户端)信息。
## SHOW BNODES
```sql
SHOW BNODES;
```
显示当前系统中存在的 BNODE (backup node, 即备份节点)的信息。
## SHOW CLUSTER
```sql
SHOW CLUSTER;
```
显示当前集群的信息
## SHOW CONNECTIONS
```sql
SHOW CONNECTIONS;
```
显示当前系统中存在的连接的信息。
## SHOW CONSUMERS
```sql
SHOW CONSUMERS;
```
显示当前数据库下所有活跃的消费者的信息。
## SHOW CREATE DATABASE
```sql
SHOW CREATE DATABASE db_name;
```
显示 db_name 指定的数据库的创建语句。
## SHOW CREATE STABLE
```sql
SHOW CREATE STABLE [db_name.]stb_name;
```
显示 tb_name 指定的超级表的创建语句
## SHOW CREATE TABLE
```sql
SHOW CREATE TABLE [db_name.]tb_name
```
显示 tb_name 指定的表的创建语句。支持普通表、超级表和子表。
## SHOW DATABASES
```sql
SHOW DATABASES;
```
显示用户定义的所有数据库。
## SHOW DNODES
```sql
SHOW DNODES;
```
显示当前系统中 DNODE 的信息。
## SHOW FUNCTIONS
```sql
SHOW FUNCTIONS;
```
显示用户定义的自定义函数。
## SHOW LICENSE
```sql
SHOW LICENSE;
SHOW GRANTS;
```
显示企业版许可授权的信息。
注:企业版独有
## SHOW INDEXES
```sql
SHOW INDEXES FROM tbl_name [FROM db_name];
```
显示已创建的索引。
## SHOW LOCAL VARIABLES
```sql
SHOW LOCAL VARIABLES;
```
显示当前客户端配置参数的运行值。
## SHOW MNODES
```sql
SHOW MNODES;
```
显示当前系统中 MNODE 的信息。
## SHOW MODULES
```sql
SHOW MODULES;
```
显示当前系统中所安装的组件的信息。
## SHOW QNODES
```sql
SHOW QNODES;
```
显示当前系统中 QNODE (查询节点)的信息。
## SHOW SCORES
```sql
SHOW SCORES;
```
显示系统被许可授权的容量的信息。
注:企业版独有
## SHOW SNODES
```sql
SHOW SNODES;
```
显示当前系统中 SNODE (流计算节点)的信息。
## SHOW STABLES
```sql
SHOW [db_name.]STABLES [LIKE 'pattern'];
```
显示当前数据库下的所有超级表的信息。可以使用 LIKE 对表名进行模糊匹配。
## SHOW STREAMS
```sql
SHOW STREAMS;
```
显示当前系统内所有流计算的信息。
## SHOW SUBSCRIPTIONS
```sql
SHOW SUBSCRIPTIONS;
```
显示当前数据库下的所有的订阅关系
## SHOW TABLES
```sql
SHOW [db_name.]TABLES [LIKE 'pattern'];
```
显示当前数据库下的所有普通表和子表的信息。可以使用 LIKE 对表名进行模糊匹配。
## SHOW TABLE DISTRIBUTED
```sql
SHOW TABLE DISTRIBUTED table_name;
```
显示表的数据分布信息。
## SHOW TAGS
```sql
SHOW TAGS FROM child_table_name [FROM db_name];
```
显示子表的标签信息。
## SHOW TOPICS
```sql
SHOW TOPICS;
```
显示当前数据库下的所有主题的信息。
## SHOW TRANSACTIONS
```sql
SHOW TRANSACTIONS;
```
显示当前系统中正在执行的事务的信息
## SHOW USERS
```sql
SHOW USERS;
```
显示当前系统中所有用户的信息。包括用户自定义的用户和系统默认用户。
## SHOW VARIABLES
```sql
SHOW VARIABLES;
SHOW DNODE dnode_id VARIABLES;
```
显示当前系统中各节点需要相同的配置参数的运行值,也可以指定 DNODE 来查看其的配置参数。
## SHOW VGROUPS
```sql
SHOW [db_name.]VGROUPS;
```
显示当前系统中所有 VGROUP 或某个 db 的 VGROUPS 的信息。
## SHOW VNODES
```sql
SHOW VNODES [dnode_name];
```
显示当前系统中所有 VNODE 或某个 DNODE 的 VNODE 的信息。
...@@ -21,269 +21,133 @@ ...@@ -21,269 +21,133 @@
#include "taos.h" #include "taos.h"
static int running = 1; static int running = 1;
static void msg_process(TAOS_RES* msg) { static char dbName[64] = "tmqdb";
static char stbName[64] = "stb";
static char topicName[64] = "topicname";
static int32_t msg_process(TAOS_RES* msg) {
char buf[1024]; char buf[1024];
/*memset(buf, 0, 1024);*/ int32_t rows = 0;
printf("topic: %s\n", tmq_get_topic_name(msg));
printf("db: %s\n", tmq_get_db_name(msg));
printf("vg: %d\n", tmq_get_vgroup_id(msg));
if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) {
tmq_raw_data raw = {0};
int32_t code = tmq_get_raw(msg, &raw);
if (code == 0) {
TAOS* pConn = taos_connect("192.168.1.86", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return;
}
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 5"); const char* topicName = tmq_get_topic_name(msg);
if (taos_errno(pRes) != 0) { const char* dbName = tmq_get_db_name(msg);
printf("error in create db, reason:%s\n", taos_errstr(pRes)); int32_t vgroupId = tmq_get_vgroup_id(msg);
return;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "use abc1"); printf("topic: %s\n", topicName);
if (taos_errno(pRes) != 0) { printf("db: %s\n", dbName);
printf("error in use db, reason:%s\n", taos_errstr(pRes)); printf("vgroup id: %d\n", vgroupId);
return;
}
taos_free_result(pRes);
int32_t ret = tmq_write_raw(pConn, raw);
printf("write raw data: %s\n", tmq_err2str(ret));
taos_close(pConn);
}
char* result = tmq_get_json_meta(msg);
if (result) {
printf("meta result: %s\n", result);
}
tmq_free_json_meta(result);
return;
}
while (1) { while (1) {
TAOS_ROW row = taos_fetch_row(msg); TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break; if (row == NULL) break;
TAOS_FIELD* fields = taos_fetch_fields(msg); TAOS_FIELD* fields = taos_fetch_fields(msg);
int32_t numOfFields = taos_field_count(msg); int32_t numOfFields = taos_field_count(msg);
taos_print_row(buf, row, fields, numOfFields); int32_t* length = taos_fetch_lengths(msg);
printf("%s\n", buf); int32_t precision = taos_result_precision(msg);
const char* tbName = tmq_get_table_name(msg); const char* tbName = tmq_get_table_name(msg);
if (tbName) { rows++;
printf("from tb: %s\n", tbName); taos_print_row(buf, row, fields, numOfFields);
} printf("row content from %s: %s\n", (tbName != NULL ? tbName : "null table"), buf);
} }
return rows;
} }
int32_t init_env() { static int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) { if (pConn == NULL) {
return -1; return -1;
} }
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 5"); TAOS_RES* pRes;
if (taos_errno(pRes) != 0) { // drop database if exists
printf("error in create db, reason:%s\n", taos_errstr(pRes)); printf("create database\n");
return -1; pRes = taos_query(pConn, "drop database if exists tmqdb");
}
taos_free_result(pRes);
pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists ct0 using st1 tags(1000, \"ttt\", true)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct0 values(now, 1, 2, 'a')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists ct1 using st1(t1) tags(2000)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table ct1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists ct2 using st1(t1) tags(NULL)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table ct2, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct1 values(now, 3, 4, 'b')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists ct3 using st1(t1) tags(3000)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct3 values(now, 5, 6, 'c')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
#if 0
pRes = taos_query(pConn, "alter table st1 add column c4 bigint");
if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table st1 modify column c3 binary(64)");
if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table st1 add tag t2 binary(64)");
if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table ct3 set tag t1=5000");
if (taos_errno(pRes) != 0) {
printf("failed to slter child table ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop table ct3 ct1");
if (taos_errno(pRes) != 0) {
printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop table st1");
if (taos_errno(pRes) != 0) {
printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))");
if (taos_errno(pRes) != 0) {
printf("failed to create normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 add column c3 bigint");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 modify column c2 nchar(8)"); // create database
pRes = taos_query(pConn, "create database tmqdb");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 rename column c3 cc3"); // create super table
printf("create super table\n");
pRes = taos_query(pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 comment 'hello'"); // create sub tables
printf("create sub tables\n");
pRes = taos_query(pConn, "create table tmqdb.ctb0 using tmqdb.stb tags(0, 'subtable0')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); printf("failed to create super table ctb0, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 drop column c1"); pRes = taos_query(pConn, "create table tmqdb.ctb1 using tmqdb.stb tags(1, 'subtable1')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); printf("failed to create super table ctb1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "drop table n1"); pRes = taos_query(pConn, "create table tmqdb.ctb2 using tmqdb.stb tags(2, 'subtable2')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes)); printf("failed to create super table ctb2, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)"); pRes = taos_query(pConn, "create table tmqdb.ctb3 using tmqdb.stb tags(3, 'subtable3')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes)); printf("failed to create super table ctb3, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create table jt1 using jt tags('{\"k1\":1, \"k2\":\"hello\"}')"); // insert data
printf("insert data into sub tables\n");
pRes = taos_query(pConn, "insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes)); printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create table jt2 using jt tags('')"); pRes = taos_query(pConn, "insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create super table jt2, reason:%s\n", taos_errstr(pRes)); printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, pRes = taos_query(pConn, "insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22')");
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "drop table st1"); pRes = taos_query(pConn, "insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
#endif
taos_close(pConn);
return 0; return 0;
} }
...@@ -295,179 +159,129 @@ int32_t create_topic() { ...@@ -295,179 +159,129 @@ int32_t create_topic() {
return -1; return -1;
} }
pRes = taos_query(pConn, "use abc1"); pRes = taos_query(pConn, "use tmqdb");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes)); printf("error in use tmqdb, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
// pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1"); // pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topic2 as select ts, c1, c2, c3 from st1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
#if 0
pRes = taos_query(pConn, "insert into tu1 values(now, 1, 1.0, 'bi1')");
if (taos_errno(pRes) != 0) {
printf("failed to insert, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tu1 values(now+1d, 1, 1.0, 'bi1')");
if (taos_errno(pRes) != 0) {
printf("failed to insert, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tu2 values(now, 2, 2.0, 'bi2')");
if (taos_errno(pRes) != 0) {
printf("failed to insert, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tu2 values(now+1d, 2, 2.0, 'bi2')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to insert, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
#endif
taos_close(pConn); taos_close(pConn);
return 0; return 0;
} }
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
printf("commit %d tmq %p param %p\n", code, tmq, param); printf("tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param);
} }
tmq_t* build_consumer() { tmq_t* build_consumer() {
#if 0 tmq_conf_res_t code;
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
#endif
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2"); code = tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "client.id", "my app 1"); if (TMQ_CONF_OK != code) return NULL;
tmq_conf_set(conf, "td.connect.user", "root"); code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "td.connect.pass", "taosdata"); if (TMQ_CONF_OK != code) return NULL;
tmq_conf_set(conf, "msg.with.table.name", "true"); code = tmq_conf_set(conf, "group.id", "cgrpName");
tmq_conf_set(conf, "enable.auto.commit", "true"); if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "td.connect.user", "root");
/*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/ if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "td.connect.pass", "taosdata");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "experimental.snapshot.enable", "true");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "msg.with.table.name", "true");
if (TMQ_CONF_OK != code) return NULL;
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq);
tmq_conf_destroy(conf); tmq_conf_destroy(conf);
return tmq; return tmq;
} }
tmq_list_t* build_topic_list() { tmq_list_t* build_topic_list() {
tmq_list_t* topic_list = tmq_list_new(); tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topic_list, "topic_ctb_column"); int32_t code = tmq_list_append(topicList, "topicname");
/*tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");*/ if (code) {
return topic_list; return NULL;
}
return topicList;
} }
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { void basic_consume_loop(tmq_t* tmq, tmq_list_t* topicList) {
int32_t code; int32_t code;
if ((code = tmq_subscribe(tmq, topics))) { if ((code = tmq_subscribe(tmq, topicList))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code)); fprintf(stderr, "%% Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
printf("subscribe err\n");
return; return;
} }
int32_t cnt = 0;
int32_t totalRows = 0;
int32_t msgCnt = 0;
int32_t consumeDelay = 5000;
while (running) { while (running) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, -1); TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, consumeDelay);
if (tmqmessage) { if (tmqmsg) {
cnt++; msgCnt++;
msg_process(tmqmessage); totalRows += msg_process(tmqmsg);
/*if (cnt >= 2) break;*/ taos_free_result(tmqmsg);
/*printf("get data\n");*/ } else {
taos_free_result(tmqmessage); break;
/*} else {*/
/*break;*/
/*tmq_commit_sync(tmq, NULL);*/
} }
} }
code = tmq_consumer_close(tmq); fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
if (code)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
else
fprintf(stderr, "%% Consumer closed\n");
} }
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { int main(int argc, char* argv[]) {
static const int MIN_COMMIT_COUNT = 1;
int msg_count = 0;
int32_t code; int32_t code;
if ((code = tmq_subscribe(tmq, topics))) { if (init_env() < 0) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code)); return -1;
return;
} }
tmq_list_t* subList = NULL; if (create_topic() < 0) {
tmq_subscription(tmq, &subList); return -1;
char** subTopics = tmq_list_to_c_array(subList);
int32_t sz = tmq_list_get_size(subList);
printf("subscribed topics: ");
for (int32_t i = 0; i < sz; i++) {
printf("%s, ", subTopics[i]);
} }
printf("\n");
tmq_list_destroy(subList);
while (running) { tmq_t* tmq = build_consumer();
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000); if (NULL == tmq) {
if (tmqmessage) { fprintf(stderr, "%% build_consumer() fail!\n");
msg_process(tmqmessage); return -1;
taos_free_result(tmqmessage); }
/*tmq_commit_sync(tmq, NULL);*/ tmq_list_t* topic_list = build_topic_list();
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/ if (NULL == topic_list) {
return -1;
} }
basic_consume_loop(tmq, topic_list);
code = tmq_unsubscribe(tmq);
if (code) {
fprintf(stderr, "%% Failed to unsubscribe: %s\n", tmq_err2str(code));
}
else {
fprintf(stderr, "%% unsubscribe\n");
} }
code = tmq_consumer_close(tmq); code = tmq_consumer_close(tmq);
if (code) if (code) {
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code)); fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
else
fprintf(stderr, "%% Consumer closed\n");
}
int main(int argc, char* argv[]) {
if (argc > 1) {
printf("env init\n");
if (init_env() < 0) {
return -1;
} }
create_topic(); else {
fprintf(stderr, "%% Consumer closed\n");
} }
tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list(); return 0;
basic_consume_loop(tmq, topic_list);
/*sync_consume_loop(tmq, topic_list);*/
} }
...@@ -202,11 +202,6 @@ typedef struct { ...@@ -202,11 +202,6 @@ typedef struct {
int8_t reserved; int8_t reserved;
} STaskSinkFetch; } STaskSinkFetch;
enum {
TASK_SOURCE__SCAN = 1,
TASK_SOURCE__PIPE,
};
enum { enum {
TASK_EXEC__NONE = 1, TASK_EXEC__NONE = 1,
TASK_EXEC__PIPE, TASK_EXEC__PIPE,
...@@ -225,11 +220,6 @@ enum { ...@@ -225,11 +220,6 @@ enum {
TASK_SINK__FETCH, TASK_SINK__FETCH,
}; };
enum {
TASK_INPUT_TYPE__SUMBIT_BLOCK = 1,
TASK_INPUT_TYPE__DATA_BLOCK,
};
enum { enum {
TASK_TRIGGER_STATUS__IN_ACTIVE = 1, TASK_TRIGGER_STATUS__IN_ACTIVE = 1,
TASK_TRIGGER_STATUS__ACTIVE, TASK_TRIGGER_STATUS__ACTIVE,
......
...@@ -389,7 +389,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -389,7 +389,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4); tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, 0) != 0) return -1;
tsNumOfVnodeStreamThreads = tsNumOfCores; tsNumOfVnodeStreamThreads = tsNumOfCores / 4;
tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4); tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1;
......
...@@ -812,12 +812,14 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p ...@@ -812,12 +812,14 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr()); mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
ASSERT(0);
goto _OVER; goto _OVER;
} }
// drop stream // drop stream
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) { if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
ASSERT(0);
goto _OVER; goto _OVER;
} }
} }
...@@ -832,6 +834,9 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p ...@@ -832,6 +834,9 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
code = 0; code = 0;
_OVER: _OVER:
if(code != 0) {
ASSERT(0);
}
mndTransDrop(pTrans); mndTransDrop(pTrans);
mndReleaseVgroup(pMnode, pVgroup); mndReleaseVgroup(pMnode, pVgroup);
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
...@@ -930,6 +935,7 @@ static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) { ...@@ -930,6 +935,7 @@ static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} else { } else {
terrno = TSDB_CODE_MND_SMA_NOT_EXIST; terrno = TSDB_CODE_MND_SMA_NOT_EXIST;
ASSERT(0);
goto _OVER; goto _OVER;
} }
} }
......
...@@ -599,9 +599,11 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) { ...@@ -599,9 +599,11 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
for (int i = 0; i < pSW->number; ++i) { for (int i = 0; i < pSW->number; ++i) {
smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i); smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i);
if (metaGetTableEntryByUid(&mr, smaId) < 0) { if (metaGetTableEntryByUid(&mr, smaId) < 0) {
tDecoderClear(&mr.coder);
metaWarn("vgId:%d, no entry for tbId:%" PRIi64 ", smaId:%" PRIi64, TD_VID(pMeta->pVnode), uid, smaId); metaWarn("vgId:%d, no entry for tbId:%" PRIi64 ", smaId:%" PRIi64, TD_VID(pMeta->pVnode), uid, smaId);
continue; continue;
} }
tDecoderClear(&mr.coder);
pTSma = pSW->tSma + smaIdx; pTSma = pSW->tSma + smaIdx;
memcpy(pTSma, mr.me.smaEntry.tsma, sizeof(STSma)); memcpy(pTSma, mr.me.smaEntry.tsma, sizeof(STSma));
if (deepCopy) { if (deepCopy) {
......
...@@ -848,6 +848,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) { ...@@ -848,6 +848,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
terrstr()); terrstr());
goto _err; goto _err;
} }
tDecoderClear(&mr.coder);
ASSERT(mr.me.type == TSDB_SUPER_TABLE); ASSERT(mr.me.type == TSDB_SUPER_TABLE);
ASSERT(mr.me.uid == suid); ASSERT(mr.me.uid == suid);
if (TABLE_IS_ROLLUP(mr.me.flags)) { if (TABLE_IS_ROLLUP(mr.me.flags)) {
......
...@@ -83,7 +83,6 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -83,7 +83,6 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
taosArrayClear(p->pDataBlock); taosArrayClear(p->pDataBlock);
taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock); taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
taosArrayPush(pInfo->pBlockLists, &p); taosArrayPush(pInfo->pBlockLists, &p);
} }
pInfo->blockType = STREAM_INPUT__DATA_BLOCK; pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
} else { } else {
...@@ -217,6 +216,8 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S ...@@ -217,6 +216,8 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
continue; continue;
} }
tDecoderClear(&mr.coder);
// TODO handle ntb case // TODO handle ntb case
if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pScanInfo->tableUid) { if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pScanInfo->tableUid) {
continue; continue;
......
...@@ -758,6 +758,7 @@ static int32_t doGetTableRowSize(void* pMeta, uint64_t uid, int32_t* rowLen, con ...@@ -758,6 +758,7 @@ static int32_t doGetTableRowSize(void* pMeta, uint64_t uid, int32_t* rowLen, con
} }
} else if (mr.me.type == TSDB_CHILD_TABLE) { } else if (mr.me.type == TSDB_CHILD_TABLE) {
uint64_t suid = mr.me.ctbEntry.suid; uint64_t suid = mr.me.ctbEntry.suid;
tDecoderClear(&mr.coder);
code = metaGetTableEntryByUid(&mr, suid); code = metaGetTableEntryByUid(&mr, suid);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno), idstr); qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno), idstr);
...@@ -2510,6 +2511,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { ...@@ -2510,6 +2511,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) { while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
STableKeyInfo* item = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos); STableKeyInfo* item = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos);
int32_t code = metaGetTableEntryByUid(&mr, item->uid); int32_t code = metaGetTableEntryByUid(&mr, item->uid);
tDecoderClear(&mr.coder);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno), qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno),
GET_TASKID(pTaskInfo)); GET_TASKID(pTaskInfo));
......
...@@ -151,7 +151,7 @@ int walMetaDeserialize(SWal* pWal, const char* bytes); ...@@ -151,7 +151,7 @@ int walMetaDeserialize(SWal* pWal, const char* bytes);
// meta section end // meta section end
// seek section // seek section
int walChangeWrite(SWal* pWal, int64_t ver); int64_t walChangeWrite(SWal* pWal, int64_t ver);
int walInitWriteFile(SWal* pWal); int walInitWriteFile(SWal* pWal);
// seek section end // seek section end
......
...@@ -74,7 +74,7 @@ int walInitWriteFile(SWal* pWal) { ...@@ -74,7 +74,7 @@ int walInitWriteFile(SWal* pWal) {
return 0; return 0;
} }
int walChangeWrite(SWal* pWal, int64_t ver) { int64_t walChangeWrite(SWal* pWal, int64_t ver) {
int code; int code;
TdFilePtr pIdxTFile, pLogTFile; TdFilePtr pIdxTFile, pLogTFile;
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
......
...@@ -208,6 +208,9 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -208,6 +208,9 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
taosCloseFile(&pIdxFile); taosCloseFile(&pIdxFile);
taosCloseFile(&pLogFile); taosCloseFile(&pLogFile);
taosFsyncFile(pWal->pLogFile);
taosFsyncFile(pWal->pIdxFile);
walSaveMeta(pWal); walSaveMeta(pWal);
// unlock // unlock
......
...@@ -832,8 +832,9 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) { ...@@ -832,8 +832,9 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) {
if (pNode) { if (pNode) {
SHashEntry *pe = pHashObj->hashList[slot]; SHashEntry *pe = pHashObj->hashList[slot];
uint16_t prevRef = atomic_load_16(&pNode->refCount); /*uint16_t prevRef = atomic_load_16(&pNode->refCount);*/
uint16_t afterRef = atomic_add_fetch_16(&pNode->refCount, 1); uint16_t afterRef = atomic_add_fetch_16(&pNode->refCount, 1);
#if 0
ASSERT(prevRef < afterRef); ASSERT(prevRef < afterRef);
// the reference count value is overflow, which will cause the delete node operation immediately. // the reference count value is overflow, which will cause the delete node operation immediately.
...@@ -845,6 +846,8 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) { ...@@ -845,6 +846,8 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) {
} else { } else {
data = GET_HASH_NODE_DATA(pNode); data = GET_HASH_NODE_DATA(pNode);
} }
#endif
data = GET_HASH_NODE_DATA(pNode);
if (afterRef >= MAX_WARNING_REF_COUNT) { if (afterRef >= MAX_WARNING_REF_COUNT) {
uWarn("hash entry ref count is abnormally high: %d", afterRef); uWarn("hash entry ref count is abnormally high: %d", afterRef);
......
...@@ -199,25 +199,25 @@ python3 ./test.py -f 6-cluster/5dnode3mnodeStop2Follower.py -N 5 -M 3 ...@@ -199,25 +199,25 @@ python3 ./test.py -f 6-cluster/5dnode3mnodeStop2Follower.py -N 5 -M 3
python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_createDb_replica1.py -N 4 -M 1 python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_createDb_replica1.py -N 4 -M 1
python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica1_insertdatas.py -N 4 -M 1 python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica1_insertdatas.py -N 4 -M 1
python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica1_insertdatas_querys.py -N 4 -M 1 python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica1_insertdatas_querys.py -N 4 -M 1
python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_force_stop_all_dnodes.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_force_stop_all_dnodes.py -N 4 -M 1
python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas.py -N 4 -M 1 python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas.py -N 4 -M 1
python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_querys_loop_restart_all_vnode.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_querys_loop_restart_all_vnode.py -N 4 -M 1
python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_querys_loop_restart_follower.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_querys_loop_restart_follower.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_querys_loop_restart_leader.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_querys_loop_restart_leader.py -N 4 -M 1
python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_querys.py -N 4 -M 1 python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_querys.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_all_dnodes.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_all_dnodes.py -N 4 -M 1
python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_follower_sync.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_follower_sync.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_follower_unsync_force_stop.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_follower_unsync_force_stop.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_follower_unsync.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_follower_unsync.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_leader_forece_stop.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_leader_forece_stop.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_leader.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_leader.py -N 4 -M 1
python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_mnode3_insertdatas_querys.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_mnode3_insertdatas_querys.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_querydatas_stop_follower_force_stop.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_querydatas_stop_follower_force_stop.py -N 4 -M 1
python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_querydatas_stop_follower.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_querydatas_stop_follower.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_querydatas_stop_leader_force_stop.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_querydatas_stop_leader_force_stop.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_querydatas_stop_leader.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_querydatas_stop_leader.py -N 4 -M 1
python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_vgroups.py -N 4 -M 1 python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_vgroups.py -N 4 -M 1
python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_vgroups_stopOne.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_vgroups_stopOne.py -N 4 -M 1
python3 ./test.py -f 7-tmq/dropDbR3ConflictTransaction.py -N 3 python3 ./test.py -f 7-tmq/dropDbR3ConflictTransaction.py -N 3
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册