提交 30fa0e29 编写于 作者: W wenzhouwww@live.cn

Merge branch '3.0' of github.com:taosdata/TDengine into 3.0

......@@ -119,7 +119,6 @@ TDengine的主要功能如下:
- [用 InfluxDB 开源的性能测试工具对比 InfluxDB 和 TDengine](https://www.taosdata.com/blog/2020/01/13/1105.html)
- [TDengine 与 OpenTSDB 对比测试](https://www.taosdata.com/blog/2019/08/21/621.html)
- [TDengine 与 Cassandra 对比测试](https://www.taosdata.com/blog/2019/08/14/573.html)
- [TDengine 与 InfluxDB 对比测试](https://www.taosdata.com/blog/2019/07/19/419.html)
- [TDengine VS InfluxDB ,写入性能大 PK !](https://www.taosdata.com/2021/11/05/3248.html)
- [TDengine 和 InfluxDB 查询性能对比测试报告](https://www.taosdata.com/2022/02/22/5969.html)
- [TDengine 与 InfluxDB、OpenTSDB、Cassandra、MySQL、ClickHouse 等数据库的对比测试报告](https://www.taosdata.com/downloads/TDengine_Testing_Report_cn.pdf)
---
title: 性能优化
---
因数据行 [update](/train-faq/faq/#update)、表删除、数据过期等原因,TDengine 的磁盘存储文件有可能出现数据碎片,影响查询操作的性能表现。从 2.1.3.0 版本开始,新增 SQL 指令 COMPACT 来启动碎片重整过程:
```sql
COMPACT VNODES IN (vg_id1, vg_id2, ...)
```
COMPACT 命令对指定的一个或多个 VGroup 启动碎片重整,系统会通过任务队列尽快安排重整操作的具体执行。COMPACT 指令所需的 VGroup id,可以通过 `SHOW VGROUPS;` 指令的输出结果获取;而且在 `SHOW VGROUPS;` 中会有一个 compacting 列,值为 2 时表示对应的 VGroup 处于排队等待进行重整的状态,值为 1 时表示正在进行碎片重整,为 0 时则表示并没有处于重整状态(未要求进行重整或已经完成重整)。
需要注意的是,碎片重整操作会大幅消耗磁盘 I/O。因此在重整进行期间,有可能会影响节点的写入和查询性能,甚至在极端情况下导致短时间的阻写。
## 存储参数优化
不同应用场景的数据往往具有不同的数据特征,比如保留天数、副本数、采集频次、记录大小、采集点的数量、压缩等都可完全不同。为获得在存储上的最高效率,TDengine 提供如下存储相关的系统配置参数(既可以作为 create database 指令的参数,也可以写在 taos.cfg 配置文件中用来设定创建新数据库时所采用的默认值):
| # | 配置参数名称 | 单位 | 含义 | **取值范围** | **缺省值** |
| --- | ------------ | ---- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------------------------- | ---------- |
| 1 | days | 天 | 一个数据文件存储数据的时间跨度 | 1-3650 | 10 |
| 2 | keep | 天 | (可通过 alter database 修改)数据库中数据保留的天数。 | 1-36500 | 3650 |
| 3 | cache | MB | 内存块的大小 | 1-128 | 16 |
| 4 | blocks | | (可通过 alter database 修改)每个 VNODE(TSDB)中有多少个 cache 大小的内存块。因此一个 VNODE 使用的内存大小粗略为(cache \* blocks)。 | 3-10000 | 6 |
| 5 | quorum | | (可通过 alter database 修改)多副本环境下指令执行的确认数要求 | 1-2 | 1 |
| 6 | minRows | | 文件块中记录的最小条数 | 10-1000 | 100 |
| 7 | maxRows | | 文件块中记录的最大条数 | 200-10000 | 4096 |
| 8 | comp | | (可通过 alter database 修改)文件压缩标志位 | 0:关闭,1:一阶段压缩,2:两阶段压缩 | 2 |
| 9 | walLevel | | (作为 database 的参数时名为 wal;在 taos.cfg 中作为参数时需要写作 walLevel)WAL 级别 | 1:写 WAL,但不执行 fsync;2:写 WAL, 而且执行 fsync | 1 |
| 10 | fsync | 毫秒 | 当 wal 设置为 2 时,执行 fsync 的周期。设置为 0,表示每次写入,立即执行 fsync。 | | 3000 |
| 11 | replica | | (可通过 alter database 修改)副本个数 | 1-3 | 1 |
| 12 | precision | | 时间戳精度标识(2.1.2.0 版本之前、2.0.20.7 版本之前在 taos.cfg 文件中不支持此参数。)(从 2.1.5.0 版本开始,新增对纳秒时间精度的支持) | ms 表示毫秒,us 表示微秒,ns 表示纳秒 | ms |
| 13 | update | | 是否允许数据更新(从 2.1.7.0 版本开始此参数支持 0 ~ 2 的取值范围,在此之前取值只能是 [0, 1];而 2.0.8.0 之前的版本在 SQL 指令中不支持此参数。) | 0:不允许;1:允许更新整行;2:允许部分列更新。 | 0 |
| 14 | cacheLast | | (可通过 alter database 修改)是否在内存中缓存子表的最近数据(从 2.1.2.0 版本开始此参数支持 0 ~ 3 的取值范围,在此之前取值只能是 [0, 1];而 2.0.11.0 之前的版本在 SQL 指令中不支持此参数。)(2.1.2.0 版本之前、2.0.20.7 版本之前在 taos.cfg 文件中不支持此参数。) | 0:关闭;1:缓存子表最近一行数据;2:缓存子表每一列的最近的非 NULL 值;3:同时打开缓存最近行和列功能 | 0 |
对于一个应用场景,可能有多种数据特征的数据并存,最佳的设计是将具有相同数据特征的表放在一个库里,这样一个应用有多个库,而每个库可以配置不同的存储参数,从而保证系统有最优的性能。TDengine 允许应用在创建库时指定上述存储参数,如果指定,该参数就将覆盖对应的系统配置参数。举例,有下述 SQL:
```sql
CREATE DATABASE demo DAYS 10 CACHE 32 BLOCKS 8 REPLICA 3 UPDATE 1;
```
该 SQL 创建了一个库 demo, 每个数据文件存储 10 天数据,内存块为 32 兆字节,每个 VNODE 占用 8 个内存块,副本数为 3,允许更新,而其他参数与系统配置完全一致。
一个数据库创建成功后,仅部分参数可以修改并实时生效,其余参数不能修改:
| **参数名** | **能否修改** | **范围** | **修改语法示例** |
| ----------- | ------------ | ---------------------------------------------------------- | -------------------------------------- |
| name | | | |
| create time | | | |
| ntables | | | |
| vgroups | | | |
| replica | **YES** | 在线 dnode 数目为:<br/>1:1-1;<br/>2:1-2;<br/>\>=3:1-3 | ALTER DATABASE <dbname\> REPLICA _n_ |
| quorum | **YES** | 1-2 | ALTER DATABASE <dbname\> QUORUM _n_ |
| days | | | |
| keep | **YES** | days-365000 | ALTER DATABASE <dbname\> KEEP _n_ |
| cache | | | |
| blocks | **YES** | 3-1000 | ALTER DATABASE <dbname\> BLOCKS _n_ |
| minrows | | | |
| maxrows | | | |
| wal | | | |
| fsync | | | |
| comp | **YES** | 0-2 | ALTER DATABASE <dbname\> COMP _n_ |
| precision | | | |
| status | | | |
| update | | | |
| cachelast | **YES** | 0 \| 1 \| 2 \| 3 | ALTER DATABASE <dbname\> CACHELAST _n_ |
**说明:**在 2.1.3.0 版本之前,通过 ALTER DATABASE 语句修改这些参数后,需要重启服务器才能生效。
TDengine 集群中加入一个新的 dnode 时,涉及集群相关的一些参数必须与已有集群的配置相同,否则不能成功加入到集群中。会进行校验的参数如下:
- numOfMnodes:系统中管理节点个数。默认值:3。(2.0 版本从 2.0.20.11 开始、2.1 及以上版本从 2.1.6.0 开始,numOfMnodes 默认值改为 1。)
- mnodeEqualVnodeNum: 一个 mnode 等同于 vnode 消耗的个数。默认值:4。
- offlineThreshold: dnode 离线阈值,超过该时间将导致该 dnode 从集群中删除。单位为秒,默认值:86400\*10(即 10 天)。
- statusInterval: dnode 向 mnode 报告状态时长。单位为秒,默认值:1。
- maxTablesPerVnode: 每个 vnode 中能够创建的最大表个数。默认值:1000000。
- maxVgroupsPerDb: 每个数据库中能够使用的最大 vgroup 个数。
- arbitrator: 系统中裁决器的 endpoint,缺省为空。
- timezone、locale、charset 的配置见客户端配置。(2.0.20.0 及以上的版本里,集群中加入新节点已不要求 locale 和 charset 参数取值一致)
- balance:是否启用负载均衡。0:否,1:是。默认值:1。
- flowctrl:是否启用非阻塞流控。0:否,1:是。默认值:1。
- slaveQuery:是否启用 slave vnode 参与查询。0:否,1:是。默认值:1。
- adjustMaster:是否启用 vnode master 负载均衡。0:否,1:是。默认值:1。
为方便调试,可通过 SQL 语句临时调整每个 dnode 的日志配置,系统重启后会失效:
```sql
ALTER DNODE <dnode_id> <config>
```
- dnode_id: 可以通过 SQL 语句"SHOW DNODES"命令获取
- config: 要调整的日志参数,在如下列表中取值
> resetlog 截断旧日志文件,创建一个新日志文件
> debugFlag < 131 | 135 | 143 > 设置 debugFlag 为 131、135 或者 143
例如:
```
alter dnode 1 debugFlag 135;
```
此差异已折叠。
---
sidebar_label: taosd 的设计
title: taosd的设计
---
逻辑上,TDengine 系统包含 dnode,taosc 和 App,dnode 是服务器侧执行代码 taosd 的一个运行实例,因此 taosd 是 TDengine 的核心,本文对 taosd 的设计做一简单的介绍,模块内的实现细节请见其他文档。
## 系统模块图
taosd 包含 rpc,dnode,vnode,tsdb,query,cq,sync,wal,mnode,http,monitor 等模块,具体如下图:
![modules.png](/img/architecture/modules.png)
taosd 的启动入口是 dnode 模块,dnode 然后启动其他模块,包括可选配置的 http,monitor 模块。taosc 或 dnode 之间交互的消息都是通过 rpc 模块进行,dnode 模块根据接收到的消息类型,将消息分发到 vnode 或 mnode 的消息队列,或由 dnode 模块自己消费。dnode 的工作线程(worker)消费消息队列里的消息,交给 mnode 或 vnode 进行处理。下面对各个模块做简要说明。
## RPC 模块
该模块负责 taosd 与 taosc,以及其他数据节点之间的通讯。TDengine 没有采取标准的 HTTP 或 gRPC 等第三方工具,而是实现了自己的通讯模块 RPC。
考虑到物联网场景下,数据写入的包一般不大,因此除支持 TCP 连接之外,RPC 还支持 UDP 连接。当数据包小于 15K 时,RPC 将采用 UDP 方式进行连接,否则将采用 TCP 连接。对于查询类的消息,RPC 不管包的大小,总是采取 TCP 连接。对于 UDP 连接,RPC 实现了自己的超时、重传、顺序检查等机制,以保证数据可靠传输。
RPC 模块还提供数据压缩功能,如果数据包的字节数超过系统配置参数 compressMsgSize,RPC 在传输中将自动压缩数据,以节省带宽。
为保证数据的安全和数据的 integrity,RPC 模块采用 MD5 做数字签名,对数据的真实性和完整性进行认证。
## DNODE 模块
该模块是整个 taosd 的入口,它具体负责如下任务:
- 系统的初始化,包括
- 从文件 taos.cfg 读取系统配置参数,从文件 dnodeCfg.json 读取数据节点的配置参数;
- 启动 RPC 模块,并建立起与 taosc 通讯的 server 连接,与其他数据节点通讯的 server 连接;
- 启动并初始化 dnode 的内部管理,该模块将扫描该数据节点已有的 vnode ,并打开它们;
- 初始化可配置的模块,如 mnode,http,monitor 等。
- 数据节点的管理,包括
- 定时的向 mnode 发送 status 消息,报告自己的状态;
- 根据 mnode 的指示,创建、改变、删除 vnode;
- 根据 mnode 的指示,修改自己的配置参数;
- 消息的分发、消费,包括
- 为每一个 vnode 和 mnode 的创建并维护一个读队列、一个写队列;
- 将从 taosc 或其他数据节点来的消息,根据消息类型,将其直接分发到不同的消息队列,或由自己的管理模块直接消费;
- 维护一个读的线程池,消费读队列的消息,交给 vnode 或 mnode 处理。为支持高并发,一个读线程(worker)可以消费多个队列的消息,一个读队列可以由多个 worker 消费;
- 维护一个写的线程池,消费写队列的消息,交给 vnode 或 mnode 处理。为保证写操作的序列化,一个写队列只能由一个写线程负责,但一个写线程可以负责多个写队列。
taosd 的消息消费由 dnode 通过读写线程池进行控制,是系统的中枢。该模块内的结构体图如下:
![dnode.png](/img/architecture/dnode.png)
## VNODE 模块
vnode 是一独立的数据存储查询逻辑单元,但因为一个 vnode 只能容许一个 DB ,因此 vnode 内部没有 account,DB,user 等概念。为实现更好的模块化、封装以及未来的扩展,它有很多子模块,包括负责存储的 TSDB,负责查询的 query,负责数据复制的 sync,负责数据库日志的的 WAL,负责连续查询的 cq(continuous query),负责事件触发的流计算的 event 等模块,这些子模块只与 vnode 模块发生关系,与其他模块没有任何调用关系。模块图如下:
![vnode.png](/img/architecture/vnode.png)
vnode 模块向下,与 dnodeVRead,dnodeVWrite 发生互动,向上,与子模块发生互动。它主要的功能有:
- 协调各个子模块的互动。各个子模块之间都不直接调用,都需要通过 vnode 模块进行;
- 对于来自 taosc 或 mnode 的写操作,vnode 模块将其分解为写日志(WAL),转发(sync),本地存储(TSDB)子模块的操作;
- 对于查询操作,分发到 query 模块进行。
一个数据节点里有多个 vnode,因此 vnode 模块是有多个运行实例的。每个运行实例是完全独立的。
vnode 与其子模块是通过 API 直接调用,而不是通过消息队列传递。而且各个子模块只与 vnode 模块有交互,不与 dnode,rpc 等模块发生任何直接关联。
## MNODE 模块
mnode 是整个系统的大脑,负责整个系统的资源调度,负责 meta data 的管理与存储。
一个运行的系统里,只有一个 mnode,但它有多个副本(由系统配置参数 numOfMnodes 控制)。这些副本分布在不同的 dnode 里,目的是保证系统的高可靠运行。副本之间的数据复制是采用同步而非异步的方式,以确保数据的一致性,确保数据不会丢失。这些副本会自动选举一个 Master,其他副本是 slave。所有数据更新类的操作,都只能在 master 上进行,而查询类的可以在 slave 节点上进行。代码实现上,同步模块与 vnode 共享,但 mnode 被分配一个特殊的 vgroup ID: 1,而且 quorum 大于 1。整个集群系统是由多个 dnode 组成的,运行的 mnode 的副本数不可能超过 dnode 的个数,但不会超过配置的副本数。如果某个 mnode 副本宕机一段时间,只要超过半数的 mnode 副本仍在运行,运行的 mnode 会自动根据整个系统的资源情况,在其他 dnode 里再启动一个 mnode,以保证运行的副本数。
各个 dnode 通过信息交换,保存有 mnode 各个副本的 End Point 列表,并向其中的 master 节点定时(间隔由系统配置参数 statusInterval 控制)发送 status 消息,消息体里包含该 dnode 的 CPU、内存、剩余存储空间、vnode 个数,以及各个 vnode 的状态(存储空间、原始数据大小、记录条数、角色等)。这样 mnode 就了解整个系统的资源情况,如果用户创建新的表,就可以决定需要在哪个 dnode 创建;如果增加或删除 dnode,或者监测到某 dnode 数据过热、或离线太长,就可以决定需要挪动那些 vnode,以实现负载均衡。
mnode 里还负责 account,user,DB,stable,table,vgroup,dnode 的创建、删除与更新。mnode 不仅把这些 entity 的 meta data 保存在内存,还做持久化存储。但为节省内存,各个表的标签值不保存在 mnode(保存在 vnode),而且子表不维护自己的 schema,而是与 stable 共享。为减小 mnode 的查询压力,taosc 会缓存 table、stable 的 schema。对于查询类的操作,各个 slave mnode 也可以提供,以减轻 master 压力。
## TSDB 模块
TSDB 模块是 vnode 中的负责快速高并发地存储和读取属于该 vnode 的表的元数据及采集的时序数据的引擎。除此之外,TSDB 还提供了表结构的修改、表标签值的修改等功能。TSDB 提供 API 供 vnode 和 query 等模块调用。TSDB 中存储了两类数据,1:元数据信息;2:时序数据
### 元数据信息
TSDB 中存储的元数据包含属于其所在的 vnode 中表的类型,schema 的定义等。对于超级表和超级表下的子表而言,又包含了 tag 的 schema 定义以及子表的 tag 值等。对于元数据信息而言,TSDB 就相当于一个全内存的 KV 型数据库,属于该 vnode 的表对象全部在内存中,方便快速查询表的信息。除此之外,TSDB 还对其中的子表,按照 tag 的第一列取值做了全内存的索引,大大加快了对于标签的过滤查询。TSDB 中的元数据的最新状态在落盘时,会以追加(append-only)的形式,写入到 meta 文件中。meta 文件只进行追加操作,即便是元数据的删除,也会以一条记录的形式写入到文件末尾。TSDB 也提供了对于元数据的修改操作,如表 schema 的修改,tag schema 的修改以及 tag 值的修改等。
### 时序数据
每个 TSDB 在创建时,都会事先分配一定量的内存缓冲区,且内存缓冲区的大小可配可修改。表采集的时序数据,在写入 TSDB 时,首先以追加的方式写入到分配的内存缓冲区中,同时建立基于时间戳的内存索引,方便快速查询。当内存缓冲区的数据积累到一定的程度时(达到内存缓冲区总大小的 1/3),则会触发落盘操作,将缓冲区中的数据持久化到硬盘文件上。时序数据在内存缓冲区中是以行(row)的形式存储的。
而时序数据在写入到 TSDB 的数据文件时,是以列(column)的形式存储的。TSDB 中的数据文件包含多个数据文件组,每个数据文件组中又包含 .head、.data 和 .last 三个文件,如(v2f1801.head、v2f1801.data、v2f1801.last)数据文件组。TSDB 中的数据文件组是按照时间跨度进行分片的,默认是 10 天一个文件组,且可通过配置文件及建库选项进行配置。分片的数据文件组又按照编号递增排列,方便快速定位某一时间段的时序数据,高效定位数据文件组。时序数据在 TSDB 的数据文件中是以块的形式进行列式存储的,每个块中只包含一张表的数据,且数据在一个块中是按照时间顺序递增排列的。在一个数据文件组中,.head 文件负责存储数据块的索引及统计信息,如每个块的位置,压缩算法,时间戳范围等。存储在 .head 文件中一张表的索引信息是按照数据块中存储的数据的时间递增排列的,方便进行折半查找等工作。.head 和 .last 文件是存储真实数据块的文件,若数据块中的数据累计到一定程度,则会写入 .data 文件中,否则,会写入 .last 文件中,等待下次落盘时合并数据写入 .data 文件中,从而大大减少文件中块的个数,避免数据的过度碎片化。
## Query 模块
该模块负责整体系统的查询处理。客户端调用该该模块进行 SQL 语法解析,并将查询或写入请求发送到 vnode ,同时负责针对超级表的查询进行二阶段的聚合操作。在 vnode 端,该模块调用 TSDB 模块读取系统中存储的数据进行查询处理。query 模块还定义了系统能够支持的全部查询函数,查询函数的实现机制与查询框架无耦合,可以在不修改查询流程的情况下动态增加查询函数。详细的设计请参见《TDengine 2.0 查询模块设计》。
## SYNC 模块
该模块实现数据的多副本复制,包括 vnode 与 mnode 的数据复制,支持异步和同步两种复制方式,以满足 meta data 与时序数据不同复制的需求。因为它为 mnode 与 vnode 共享,系统为 mnode 副本预留了一个特殊的 vgroup ID:1。因此 vnode group 的 ID 是从 2 开始的。
每个 vnode/mnode 模块实例会有一对应的 sync 模块实例,他们是一一对应的。详细设计请见[TDengine 2.0 数据复制模块设计](/tdinternal/replica/)
## WAL 模块
该模块负责将新插入的数据写入 write ahead log(WAL),为 vnode,mnode 共享。以保证服务器 crash 或其他故障,能从 WAL 中恢复数据。
每个 vnode/mnode 模块实例会有一对应的 WAL 模块实例,是完全一一对应的。WAL 的落盘操作由两个参数 walLevel,fsync 控制。看具体场景,如果要 100% 保证数据不会丢失,需要将 walLevel 配置为 2,fsync 设置为 0,每条数据插入请求,都会实时落盘后,才会给应用确认
## HTTP 模块
该模块负责处理系统对外的 RESTful 接口,可以通过配置,由 dnode 启动或停止 。(仅 2.2 及之前的版本中存在)
该模块将接收到的 RESTful 请求,做了各种合法性检查后,将其变成标准的 SQL 语句,通过 taosc 的异步接口,将请求发往整个系统中的任一 dnode 。收到处理后的结果后,再翻译成 HTTP 协议,返回给应用。
如果 HTTP 模块启动,就意味着启动了一个 taosc 的实例。任一一个 dnode 都可以启动该模块,以实现对 RESTful 请求的分布式处理。
## Monitor 模块
该模块负责检测一个 dnode 的运行状态,可以通过配置,由 dnode 启动或停止。原则上,每个 dnode 都应该启动一个 monitor 实例。
Monitor 采集 TDengine 里的关键操作,比如创建、删除、更新账号、表、库等,而且周期性的收集 CPU、内存、网络等资源的使用情况(采集周期由系统配置参数 monitorInterval 控制)。获得这些数据后,monitor 模块将采集的数据写入系统的日志库(DB 名字由系统配置参数 monitorDbName 控制)。
Monitor 模块使用 taosc 来将采集的数据写入系统,因此每个 monitor 实例,都有一个 taosc 运行实例。
---
title: TSZ 压缩算法
---
TSZ 压缩算法是 TDengine 为浮点数据类型提供更加丰富的压缩功能,可以实现浮点数的有损至无损全状态压缩,相比原来在 TDengine 中原有压缩算法,TSZ 压缩算法压缩选项更丰富,压缩率更高,即使切到无损状态下对浮点数压缩,压缩率也会比原来的压缩算法高一倍。
## 适合场景
TSZ 压缩算法压缩率比原来的要高,但压缩时间会更长,即开启 TSZ 压缩算法写入速度会有一些下降,通常情况下会有 20% 左右的下降。影响写入速度是因为需要更多的 CPU 计算,所以从原始数据到压缩好数据的交付时间变长,导致写入速度变慢。如果您的服务器 CPU 配置很高的话,这个影响会变小甚至没有。
另外如果设备产生了大量的高精度浮点数,存储占用的空间非常庞大,但实际使用并不需要那么高的精度时,可以通过 TSZ 压缩的有损压缩功能,把精度压缩至指定的长度,节约存储空间。
总结:采集到了大量浮点数,存储时占用空间过大或出有存储空间不足,需要超高压缩率的场景。
## 使用步骤
- 检查版本支持,2.4.0.10 及之后 TDengine 的版本都支持此功能
- 配置选项开启功能,在 TDengine 的配置文件 taos.cfg 增加一行以下内容,打开 TSZ 功能
```TSZ
lossyColumns float|double
```
- 根据自己需要配置其它选项,如果不配置都会按默认值处理。
- 重启服务,配置生效。
- 确认功能已开启,在服务启动过程中输出的信息如果有前面配置的内容,表明功能已生效:
```TSZ Test
02/22 10:49:27.607990 00002933 UTL lossyColumns float|double
```
## 注意事项
- 确认版本是否支持
- 除了服务器启动时的输出的配置成功信息外,不再会有其它的信息输出是使用的哪种压缩算法,可以通过配置前后数据库文件大小来比较效果
- 如果浮点数类型列较少,看整体数据文件大小效果会不太明显
- 此压缩产生的数据文件中浮点数据部分将不能被 2.4.0.10 以下的版本解析,即不向下兼容,使用时避免更换回旧版本,以免数据不能被读取出来。
- 在使用过程中允许反复开启和关闭 TSZ 压缩选项的操作,前后两种压缩算法产生的数据都能正常读取。
---
title: 物联网大数据
description: "物联网、工业互联网大数据的特点;物联网大数据平台应具备的功能和特点;通用大数据架构为什么不适合处理物联网数据;物联网、车联网、工业互联网大数据平台,为什么推荐使用 TDengine"
---
- [物联网、工业互联网大数据的特点](https://www.taosdata.com/blog/2019/07/09/105.html)
- [物联网大数据平台应具备的功能和特点](https://www.taosdata.com/blog/2019/07/29/542.html)
- [通用大数据架构为什么不适合处理物联网数据?](https://www.taosdata.com/blog/2019/07/09/107.html)
- [物联网、车联网、工业互联网大数据平台,为什么推荐使用 TDengine?](https://www.taosdata.com/blog/2019/07/09/109.html)
---
title: 视频教程
---
## 技术公开课
- [技术公开课:开源、高效的物联网大数据平台,TDengine 内核技术剖析](https://www.taosdata.com/blog/2020/12/25/2126.html)
## 视频教程
- [TDengine 视频教程 - 快速上手](https://www.taosdata.com/blog/2020/11/11/1941.html)
- [TDengine 视频教程 - 数据建模](https://www.taosdata.com/blog/2020/11/11/1945.html)
- [TDengine 视频教程 - 集群搭建](https://www.taosdata.com/blog/2020/11/11/1961.html)
- [TDengine 视频教程 - Go Connector](https://www.taosdata.com/blog/2020/11/11/1951.html)
- [TDengine 视频教程 - JDBC Connector](https://www.taosdata.com/blog/2020/11/11/1955.html)
- [TDengine 视频教程 - Node.js Connector](https://www.taosdata.com/blog/2020/11/11/1957.html)
- [TDengine 视频教程 - Python Connector](https://www.taosdata.com/blog/2020/11/11/1963.html)
- [TDengine 视频教程 - RESTful Connector](https://www.taosdata.com/blog/2020/11/11/1965.html)
- [TDengine 视频教程 - “零”代码运维监控](https://www.taosdata.com/blog/2020/11/11/1959.html)
## 微课堂
关注 TDengine 视频号, 有精心制作的微课堂。
<img src="/img/shi-pin-hao.png" width={350} />
......@@ -19,25 +19,24 @@ import InstallOnLinux from "../../14-reference/03-connector/\_windows_install.md
import VerifyLinux from "../../14-reference/03-connector/\_verify_linux.mdx";
import VerifyWindows from "../../14-reference/03-connector/\_verify_windows.mdx";
Any application programs running on any kind of platforms can access TDengine through the REST API provided by TDengine. For the details, please refer to [REST API](/reference/rest-api/). Besides, application programs can use the connectors of multiple programming languages to access TDengine, including C/C++, Java, Python, Go, Node.js, C#, and Rust. This chapter describes how to establish connection to TDengine and briefly introduce how to install and use connectors. For details about the connectors, please refer to [Connectors](/reference/connector/)
Any application programs running on any kind of platforms can access TDengine through the REST API provided by TDengine. For the details, please refer to [REST API](/reference/rest-api/). Besides, application programs can use the connectors of multiple programming languages to access TDengine, including C/C++, Java, Python, Go, Node.js, C#, and Rust. This chapter describes how to establish connection to TDengine and briefly introduces how to install and use connectors. For details about the connectors, please refer to [Connectors](/reference/connector/)
## Establish Connection
There are two ways for a connector to establish connections to TDengine:
1. Connection through the REST API provided by taosAdapter component, this way is called "REST connection" hereinafter.
1. Connection through the REST API provided by the taosAdapter component, this way is called "REST connection" hereinafter.
2. Connection through the TDengine client driver (taosc), this way is called "Native connection" hereinafter.
Either way, same or similar APIs are provided by connectors to access database or execute SQL statements, no obvious difference can be observed.
Key differences:
1. With REST connection, it's not necessary to install TDengine client driver (taosc), it's more friendly for cross-platform with the cost of 30% performance downgrade. When taosc has an upgrade, application does not need to make changes.
2. With native connection, full compatibility of TDengine can be utilized, like [Parameter Binding](/reference/connector/cpp#parameter-binding-api), [Subscription](/reference/connector/cpp#subscription-and-consumption-api), etc. But taosc has to be installed, some platforms may not be supported.
1. The TDengine client driver (taosc) has the highest performance with all the features of TDengine like [Parameter Binding](/reference/connector/cpp#parameter-binding-api), [Subscription](/reference/connector/cpp#subscription-and-consumption-api), etc.
2. The TDengine client driver (taosc) is not supported across all platforms, and applications built on taosc may need to be modified when updating taosc to newere versions.
3. The REST connection is more accessible with cross-platform support, however it results in a 30% performance downgrade.
## Install Client Driver taosc
If choosing to use native connection and the application is not on the same host as TDengine server, TDengine client driver taosc needs to be installed on the host where the application is. If choosing to use REST connection or the application is on the same host as server side, this step can be skipped. It's better to use same version of taosc as the server.
If you are choosing to use native connection and the application is not on the same host as TDengine server, the TDengine client driver taosc needs to be installed on the application host. If choosing to use the REST connection or the application is on the same host as TDengine server, this step can be skipped. It's better to use same version of taosc as the server.
### Install
......
......@@ -2,11 +2,11 @@
title: Data Model
---
The data model employed by TDengine is similar to relational database, you need to create databases and tables. For a specific application, the design of databases, STables (abbreviated for super table), and tables need to be considered. This chapter will explain the big picture without syntax details.
The data model employed by TDengine is similar to a relational database, you need to create databases and tables. Design the data model based on your own application scenarios and you should design the STable (abbreviation for super table) schema to fit your data. This chapter will explain the big picture without getting into syntax details.
## Create Database
The characteristics of data from different data collection points may be different, such as collection frequency, days to keep, number of replicas, data block size, whether it's allowed to update data, etc. For TDengine to operate with the best performance, it's strongly suggested to put the data with different characteristics into different databases because different storage policy can be set for each database. When creating a database, there are a lot of parameters that can be configured, such as the days to keep data, the number of replicas, the number of memory blocks, time precision, the minimum and maximum number of rows in each data block, compress or not, the time range of the data in single data file, etc. Below is an example of the SQL statement for creating a database.
The characteristics of data from different data collection points may be different, such as collection frequency, days to keep, number of replicas, data block size, whether it's allowed to update data, etc. For TDengine to operate with the best performance, it's strongly suggested to put the data with different characteristics into different databases because different storage policies can be set for each database. When creating a database, there are a lot of parameters that can be configured, such as the days to keep data, the number of replicas, the number of memory blocks, time precision, the minimum and maximum number of rows in each data block, compress or not, the time range of the data in single data file, etc. Below is an example of the SQL statement for creating a database.
```sql
CREATE DATABASE power KEEP 365 DAYS 10 BLOCKS 6 UPDATE 1;
......@@ -14,7 +14,7 @@ CREATE DATABASE power KEEP 365 DAYS 10 BLOCKS 6 UPDATE 1;
In the above SQL statement, a database named "power" will be created, the data in it will be kept for 365 days, which means the data older than 365 days will be deleted automatically, a new data file will be created every 10 days, the number of memory blocks is 6, data is allowed to be updated. For more details please refer to [Database](/taos-sql/database).
After creating a database, the current database in use can be switched using SQL command `USE`, for example below SQL statement switches the current database to `power`. Without current database specified, table name must be preceded with the corresponding database name.
After creating a database, the current database in use can be switched using SQL command `USE`, for example below SQL statement switches the current database to `power`. Without the current database specified, table name must be preceded with the corresponding database name.
```sql
USE power;
......@@ -23,14 +23,14 @@ USE power;
:::note
- Any table or STable must belong to a database. To create a table or STable, the database it belongs to must be ready.
- JOIN operation can't be performed tables from two different databases.
- JOIN operations can't be performed on tables from two different databases.
- Timestamp needs to be specified when inserting rows or querying historical rows.
:::
## Create STable
In a time-series application, there may be multiple kinds of data collection points. For example, in the electrical power system there are meters, transformers, bus bars, switches, etc. For easy and efficient aggregation of multiple tables, one STable needs to be created for each kind of data collection point. For example, for the meters in [table 1](/tdinternal/arch#model_table1), below SQL statement can be used to create the super table.
In a time-series application, there may be multiple kinds of data collection points. For example, in the electrical power system there are meters, transformers, bus bars, switches, etc. For easy and efficient aggregation of multiple tables, one STable needs to be created for each kind of data collection point. For example, for the meters in [table 1](/tdinternal/arch#model_table1), the below SQL statement can be used to create the super table.
```sql
CREATE STable meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);
......@@ -41,11 +41,11 @@ If you are using versions prior to 2.0.15, the `STable` keyword needs to be repl
:::
Similar to creating a regular table, when creating a STable, name and schema need to be provided too. In the STable schema, the first column must be timestamp (like ts in the example), and other columns (like current, voltage and phase in the example) are the data collected. The type of a column can be integer, float, double, string ,etc. Besides, the schema for tags need to be provided, like location and groupId in the example. The type of a tag can be integer, float, string, etc. The static properties of a data collection point can be defined as tags, like the location, device type, device group ID, manager ID, etc. Tags in the schema can be added, removed or updated. Please refer to [STable](/taos-sql/stable) for more details.
Similar to creating a regular table, when creating a STable, the name and schema need to be provided. In the STable schema, the first column must be timestamp (like ts in the example), and the other columns (like current, voltage and phase in the example) are the data collected. The column type can be integer, float, double, string ,etc. Besides, the schema for tags need to be provided, like location and groupId in the example. The tag type can be integer, float, string, etc. The static properties of a data collection point can be defined as tags, like the location, device type, device group ID, manager ID, etc. Tags in the schema can be added, removed or updated. Please refer to [STable](/taos-sql/stable) for more details.
For each kind of data collection points, a corresponding STable must be created. There may be many STables in an application. For electrical power system, we need to create a STable respectively for meters, transformers, busbars, switches. There may be multiple kinds of data collection points on a single device, for example there may be one data collection point for electrical data like current and voltage and another point for environmental data like temperature, humidity and wind direction, multiple STables are required for such kind of device.
For each kind of data collection point, a corresponding STable must be created. There may be many STables in an application. For electrical power system, we need to create a STable respectively for meters, transformers, busbars, switches. There may be multiple kinds of data collection points on a single device, for example there may be one data collection point for electrical data like current and voltage and another point for environmental data like temperature, humidity and wind direction, multiple STables are required for such kind of device.
At most 4096 (or 1024 prior to version 2.1.7.0) columns are allowed in a STable. If there are more than 4096 of metrics to bo collected for a data collection point, multiple STables are required for such kind of data collection point. There can be multiple databases in system, while one or more STables can exist in a database.
At most 4096 (or 1024 prior to version 2.1.7.0) columns are allowed in a STable. If there are more than 4096 of metrics to be collected for a data collection point, multiple STables are required. There can be multiple databases in a system, while one or more STables can exist in a database.
## Create Table
......@@ -57,7 +57,7 @@ CREATE TABLE d1001 USING meters TAGS ("Beijing.Chaoyang", 2);
In the above SQL statement, "d1001" is the table name, "meters" is the STable name, followed by the value of tag "Location" and the value of tag "groupId", which are "Beijing.Chaoyang" and "2" respectively in the example. The tag values can be updated after the table is created. Please refer to [Tables](/taos-sql/table) for details.
In TDengine system, it's recommended to create a table for a data collection point via STable. Table created via STable is called subtable in some parts of TDengine document. All SQL commands applied on regular table can be applied on subtable.
In TDengine system, it's recommended to create a table for a data collection point via STable. A table created via STable is called subtable in some parts of the TDengine documentation. All SQL commands applied on regular tables can be applied on subtables.
:::warning
It's not recommended to create a table in a database while using a STable from another database as template.
......@@ -67,7 +67,7 @@ It's suggested to use the global unique ID of a data collection point as the tab
## Create Table Automatically
In some circumstances, it's not sure whether the table already exists when inserting rows. The table can be created automatically using the SQL statement below, and nothing will happen if the table already exist.
In some circumstances, it's unknown whether the table already exists when inserting rows. The table can be created automatically using the SQL statement below, and nothing will happen if the table already exist.
```sql
INSERT INTO d1001 USING meters TAGS ("Beijng.Chaoyang", 2) VALUES (now, 10.2, 219, 0.32);
......@@ -79,6 +79,6 @@ For more details please refer to [Create Table Automatically](/taos-sql/insert#a
## Single Column vs Multiple Column
Multiple columns data model is supported in TDengine. As long as multiple metrics are collected by same data collection point at same time, i.e. the timestamp are identical, these metrics can be put in single stable as columns. However, there is another kind of design, i.e. single column data model, a table is created for each metric, which means a STable is required for each kind of metric. For example, 3 STables are required for current, voltage and phase.
A multiple columns data model is supported in TDengine. As long as multiple metrics are collected by the same data collection point at the same time, i.e. the timestamp are identical, these metrics can be put in a single STable as columns. However, there is another kind of design, i.e. single column data model, a table is created for each metric, which means a STable is required for each kind of metric. For example, 3 STables are required for current, voltage and phase.
It's recommended to use multiple column data model as much as possible because it's better in the performance of inserting or querying rows. In some cases, however, the metrics to be collected vary frequently and correspondingly the STable schema needs to be changed frequently too. In such case, it's more convenient to use single column data model.
It's recommended to use a multiple column data model as much as possible because it's better in the performance of inserting or querying rows. In some cases, however, the metrics to be collected vary frequently and correspondingly the STable schema needs to be changed frequently too. In such case, it's more convenient to use single column data model.
---
title: Performance Optimization
---
After a TDengine cluster has been running for long enough time, because of updating data, deleting tables and deleting expired data, there may be fragments in data files and query performance may be impacted. To resolve the problem of fragments, from version 2.1.3.0 a new SQL command `COMPACT` can be used to defragment the data files.
```sql
COMPACT VNODES IN (vg_id1, vg_id2, ...)
```
`COMPACT` can be used to defragment one or more vgroups. The defragmentation work will be put in task queue for scheduling execution by TDengine. `SHOW VGROUPS` command can be used to get the vgroup ids to be used in `COMPACT` command. There is a column `compacting` in the output of `SHOW GROUPS` to indicate the compacting status of the vgroup: 2 means the vgroup is waiting in task queue for compacting, 1 means compacting is in progress, and 0 means the vgroup has nothing to do with compacting.
Please be noted that a lot of disk I/O is required for defragementation operation, during which the performance may be impacted significantly for data insertion and query, data insertion may be blocked shortly in extreme cases.
## Optimize Storage Parameters
The data in different use cases may have different characteristics, such as the days to keep, number of replicas, collection interval, record size, number of collection points, compression or not, etc. To achieve best efficiency in storage, the parameters in below table can be used, all of them can be either configured in `taos.cfg` as default configuration or in the command `create database`. For detailed definition of these parameters please refer to [Configuration Parameters](/reference/config/).
| # | Parameter | Unit | Definition | **Value Range** | **Default Value** |
| --- | --------- | ---- | ------------------------------------------------------------------------------ | ----------------------------------------------------------------------------------------------- | ----------------- |
| 1 | days | Day | The time range of the data stored in a single data file | 1-3650 | 10 |
| 2 | keep | Day | The number of days the data is kept in the database | 1-36500 | 3650 |
| 3 | cache | MB | The size of each memory block | 1-128 | 16 |
| 4 | blocks | None | The number of memory blocks used by each vnode | 3-10000 | 6 |
| 5 | quorum | None | The number of required confirmation in case of multiple replicas | 1-2 | 1 |
| 6 | minRows | None | The minimum number of rows in a data file | 10-1000 | 100 |
| 7 | maxRows | None | The maximum number of rows in a daa file | 200-10000 | 4096 |
| 8 | comp | None | Whether to compress the data | 0:uncompressed; 1: One Phase compression; 2: Two Phase compression | 2 |
| 9 | walLevel | None | wal sync level (named as "wal" in create database ) | 1:wal enabled without fsync; 2:wal enabled with fsync | 1 |
| 10 | fsync | ms | The time to wait for invoking fsync when walLevel is set to 2; 0 means no wait | 3000 |
| 11 | replica | none | The number of replications | 1-3 | 1 |
| 12 | precision | none | Time precision | ms: millisecond; us: microsecond;ns: nanosecond | ms |
| 13 | update | none | Whether to allow updating data | 0: not allowed; 1: a row must be updated as whole; 2: a part of columns in a row can be updated | 0 |
| 14 | cacheLast | none | Whether the latest data of a table is cached in memory | 0: not cached; 1: the last row is cached; 2: the latest non-NULL value of each column is cached | 0 |
For a specific use case, there may be multiple kinds of data with different characteristics, it's best to put data with same characteristics in same database. So there may be multiple databases in a system while each database can be configured with different storage parameters to achieve best performance. The above parameters can be used when creating a database to override the default setting in configuration file.
```sql
CREATE DATABASE demo DAYS 10 CACHE 32 BLOCKS 8 REPLICA 3 UPDATE 1;
```
The above SQL statement creates a database named as `demo`, in which each data file stores data across 10 days, the size of each memory block is 32 MB and each vnode is allocated with 8 blocks, the replica is set to 3, update operation is allowed, and all other parameters not specified in the command follow the default configuration in `taos.cfg`.
Once a database is created, only some parameters can be changed and be effective immediately while others are can't.
| **Parameter** | **Alterable** | **Value Range** | **Syntax** |
| ------------- | ------------- | ---------------- | -------------------------------------- |
| name | | | |
| create time | | | |
| ntables | | | |
| vgroups | | | |
| replica | **YES** | 1-3 | ALTER DATABASE <dbname\> REPLICA _n_ |
| quorum | **YES** | 1-2 | ALTER DATABASE <dbname\> QUORUM _n_ |
| days | | | |
| keep | **YES** | days-365000 | ALTER DATABASE <dbname\> KEEP _n_ |
| cache | | | |
| blocks | **YES** | 3-1000 | ALTER DATABASE <dbname\> BLOCKS _n_ |
| minrows | | | |
| maxrows | | | |
| wal | | | |
| fsync | | | |
| comp | **YES** | 0-2 | ALTER DATABASE <dbname\> COMP _n_ |
| precision | | | |
| status | | | |
| update | | | |
| cachelast | **YES** | 0 \| 1 \| 2 \| 3 | ALTER DATABASE <dbname\> CACHELAST _n_ |
**Explanation:** Prior to version 2.1.3.0, `taosd` server process needs to be restarted for these parameters to take in effect if they are changed using `ALTER DATABASE`.
When trying to join a new dnode into a running TDengine cluster, all the parameters related to cluster in the new dnode configuration must be consistent with the cluster, otherwise it can't join the cluster. The parameters that are checked when joining a dnode are as below. For detailed definition of these parameters please refer to [Configuration Parameters](/reference/config/).
- numOfMnodes
- mnodeEqualVnodeNum
- offlineThreshold
- statusInterval
- maxTablesPerVnode
- maxVgroupsPerDb
- arbitrator
- timezone
- balance
- flowctrl
- slaveQuery
- adjustMaster
For the convenience of debugging, the log setting of a dnode can be changed temporarily. The temporary change will be lost once the server is restarted.
```sql
ALTER DNODE <dnode_id> <config>
```
- dnode_id: from output of "SHOW DNODES"
- config: the parameter to be changed, as below
- resetlog: close the old log file and create the new on
- debugFlag: 131 (INFO/ERROR/WARNING), 135 (DEBUG), 143 (TRACE)
For example
```
alter dnode 1 debugFlag 135;
```
---
title: IoT Big Data
description: "Characteristics of IoT Big Data, why general big data platform does not work well for IoT? The required features for an IoT Big Data Platform"
---
- [Characteristics of IoT Big Data](https://tdengine.com/2019/07/09/86.html)
- [Why don’t General Big Data Platforms Fit IoT Scenarios?](https://tdengine.com/2019/07/09/92.html)
- [Why TDengine is the Best Choice for IoT Big Data Processing?](https://tdengine.com/2019/07/09/94.html)
- [Why Redis, Kafka, Spark aren’t Needed if TDengine is Used in the IoT Platform?](https://tdengine.com/2019/07/09/96.html)
......@@ -1210,9 +1210,10 @@ typedef struct {
} SRetrieveMetaTableRsp;
typedef struct SExplainExecInfo {
uint64_t startupCost;
uint64_t totalCost;
double startupCost;
double totalCost;
uint64_t numOfRows;
uint32_t verboseLen;
void* verboseInfo;
} SExplainExecInfo;
......@@ -1221,6 +1222,18 @@ typedef struct {
SExplainExecInfo* subplanInfo;
} SExplainRsp;
typedef struct STableScanAnalyzeInfo {
uint64_t totalRows;
uint64_t totalCheckedRows;
uint32_t totalBlocks;
uint32_t loadBlocks;
uint32_t loadBlockStatis;
uint32_t skipBlocks;
uint32_t filterOutBlocks;
double elapsedTime;
uint64_t filterTime;
} STableScanAnalyzeInfo;
int32_t tSerializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp);
int32_t tDeserializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp);
......
......@@ -24,7 +24,7 @@ int32_t qExecCommand(SNode* pStmt, SRetrieveTableRsp** pRsp);
int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp);
int32_t qExecExplainBegin(SQueryPlan *pDag, SExplainCtx **pCtx, int64_t startTs);
int32_t qExecExplainEnd(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp);
int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t groupId, SRetrieveTableRsp **pRsp);
int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t groupId, SRetrieveTableRsp **pRsp);
void qExplainFreeCtx(SExplainCtx *pCtx);
......@@ -17,6 +17,7 @@ serverName="taosd"
clientName="taos"
uninstallScript="rmtaos"
configFile="taos.cfg"
tarName="taos.tar.gz"
osType=Linux
pagMode=full
......@@ -242,6 +243,11 @@ function install_examples() {
function update_TDengine() {
# Start to update
if [ ! -e ${tarName} ]; then
echo "File ${tarName} does not exist"
exit 1
fi
tar -zxf ${tarName}
echo -e "${GREEN}Start to update ${productName} client...${NC}"
# Stop the client shell if running
if pidof ${clientName} &> /dev/null; then
......@@ -264,42 +270,49 @@ function update_TDengine() {
echo
echo -e "\033[44;32;1m${productName} client is updated successfully!${NC}"
rm -rf $(tar -tf ${tarName})
}
function install_TDengine() {
# Start to install
echo -e "${GREEN}Start to install ${productName} client...${NC}"
install_main_path
install_log
install_header
install_lib
install_jemalloc
if [ "$verMode" == "cluster" ]; then
install_connector
fi
install_examples
install_bin
install_config
# Start to install
if [ ! -e ${tarName} ]; then
echo "File ${tarName} does not exist"
exit 1
fi
tar -zxf ${tarName}
echo -e "${GREEN}Start to install ${productName} client...${NC}"
echo
echo -e "\033[44;32;1m${productName} client is installed successfully!${NC}"
install_main_path
install_log
install_header
install_lib
install_jemalloc
if [ "$verMode" == "cluster" ]; then
install_connector
fi
install_examples
install_bin
install_config
echo
echo -e "\033[44;32;1m${productName} client is installed successfully!${NC}"
rm -rf $(tar -tf ${tarName})
rm -rf $(tar -tf ${tarName})
}
## ==============================Main program starts from here============================
# Install or updata client and client
# if server is already install, don't install client
if [ -e ${bin_dir}/${serverName} ]; then
echo -e "\033[44;32;1mThere are already installed ${productName} server, so don't need install client!${NC}"
exit 0
fi
if [ -e ${bin_dir}/${serverName} ]; then
echo -e "\033[44;32;1mThere are already installed ${productName} server, so don't need install client!${NC}"
exit 0
fi
if [ -x ${bin_dir}/${clientName} ]; then
update_flag=1
update_TDengine
else
install_TDengine
fi
if [ -x ${bin_dir}/${clientName} ]; then
update_flag=1
update_TDengine
else
install_TDengine
fi
......@@ -567,7 +567,6 @@ TEST(testCase, insert_test) {
taos_free_result(pRes);
taos_close(pConn);
}
#endif
TEST(testCase, projection_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
......@@ -606,7 +605,7 @@ TEST(testCase, projection_query_tables) {
}
taos_free_result(pRes);
for(int32_t i = 0; i < 100000; i += 20) {
for(int32_t i = 0; i < 1000000; i += 20) {
char sql[1024] = {0};
sprintf(sql,
"insert into tu values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
......@@ -626,7 +625,7 @@ TEST(testCase, projection_query_tables) {
printf("start to insert next table\n");
for(int32_t i = 0; i < 100000; i += 20) {
for(int32_t i = 0; i < 1000000; i += 20) {
char sql[1024] = {0};
sprintf(sql,
"insert into tu2 values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
......@@ -693,6 +692,8 @@ TEST(testCase, projection_query_stables) {
taos_close(pConn);
}
#endif
TEST(testCase, agg_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
......@@ -705,7 +706,7 @@ TEST(testCase, agg_query_tables) {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "select tbname from st1");
pRes = taos_query(pConn, "explain analyze select count(*) from tu interval(1s)");
if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
......
......@@ -3318,9 +3318,11 @@ int32_t tSerializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) {
if (tEncodeI32(&encoder, pRsp->numOfPlans) < 0) return -1;
for (int32_t i = 0; i < pRsp->numOfPlans; ++i) {
SExplainExecInfo *info = &pRsp->subplanInfo[i];
if (tEncodeU64(&encoder, info->startupCost) < 0) return -1;
if (tEncodeU64(&encoder, info->totalCost) < 0) return -1;
if (tEncodeDouble(&encoder, info->startupCost) < 0) return -1;
if (tEncodeDouble(&encoder, info->totalCost) < 0) return -1;
if (tEncodeU64(&encoder, info->numOfRows) < 0) return -1;
if (tEncodeU32(&encoder, info->verboseLen) < 0) return -1;
if (tEncodeBinary(&encoder, info->verboseInfo, info->verboseLen) < 0) return -1;
}
tEndEncode(&encoder);
......@@ -3341,9 +3343,11 @@ int32_t tDeserializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) {
if (pRsp->subplanInfo == NULL) return -1;
}
for (int32_t i = 0; i < pRsp->numOfPlans; ++i) {
if (tDecodeU64(&decoder, &pRsp->subplanInfo[i].startupCost) < 0) return -1;
if (tDecodeU64(&decoder, &pRsp->subplanInfo[i].totalCost) < 0) return -1;
if (tDecodeDouble(&decoder, &pRsp->subplanInfo[i].startupCost) < 0) return -1;
if (tDecodeDouble(&decoder, &pRsp->subplanInfo[i].totalCost) < 0) return -1;
if (tDecodeU64(&decoder, &pRsp->subplanInfo[i].numOfRows) < 0) return -1;
if (tDecodeU32(&decoder, &pRsp->subplanInfo[i].verboseLen) < 0) return -1;
if (tDecodeBinary(&decoder, (uint8_t**) &pRsp->subplanInfo[i].verboseInfo, &pRsp->subplanInfo[i].verboseLen) < 0) return -1;
}
tEndDecode(&decoder);
......
......@@ -58,8 +58,14 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, get from mnode-sync queue", pMsg);
pMsg->info.node = pMgmt->pMnode;
mndProcessSyncMsg(pMsg);
int32_t code = mndProcessSyncMsg(pMsg);
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) {
......
......@@ -138,7 +138,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->dbId = pCreate->dbUid;
pCfg->szPage = pCreate->pageSize * 1024;
pCfg->szCache = pCreate->pages;
pCfg->szBuf = pCreate->buffer * 1024 * 1024;
pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
pCfg->isWeak = true;
pCfg->tsdbCfg.compression = pCreate->compression;
pCfg->tsdbCfg.precision = pCreate->precision;
......
......@@ -419,6 +419,8 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
SMqTopicObj topicObj = {0};
memcpy(&topicObj, pTopic, sizeof(SMqTopicObj));
topicObj.refConsumerCnt = pTopic->refConsumerCnt + 1;
mInfo("subscribe topic %s by consumer %ld cgroup %s, refcnt %d", pTopic->name, consumerId, cgroup,
topicObj.refConsumerCnt);
if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto SUBSCRIBE_OVER;
mndReleaseTopic(pMnode, pTopic);
......
......@@ -417,7 +417,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
// 2. redo log: subscribe and vg assignment
// subscribe
if (mndSetSubRedoLogs(pMnode, pTrans, pOutput->pSub) != 0) {
if (mndSetSubCommitLogs(pMnode, pTrans, pOutput->pSub) != 0) {
goto REB_FAIL;
}
......@@ -479,6 +479,10 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
SMqTopicObj topicObj = {0};
memcpy(&topicObj, pTopic, sizeof(SMqTopicObj));
topicObj.refConsumerCnt = pTopic->refConsumerCnt - consumerNum;
// TODO is that correct?
pTopic->refConsumerCnt = topicObj.refConsumerCnt;
mInfo("subscribe topic %s unref %d consumer cgroup %s, refcnt %d", pTopic->name, consumerNum, cgroup,
topicObj.refConsumerCnt);
if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto REB_FAIL;
}
}
......
......@@ -141,6 +141,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
terrno = TSDB_CODE_APP_ERROR;
}
rpcFreeCont(rsp.pCont);
if (code != 0) return code;
return pMgmt->errCode;
}
......
......@@ -681,6 +681,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
return -1;
}
sdbFreeRaw(pRaw);
mDebug("trans:%d, sync finished", pTrans->id);
return 0;
}
......
......@@ -346,95 +346,65 @@ void mndStop(SMnode *pMnode) {
}
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
void *ahandle = pMsg->info.ahandle;
int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
if (syncEnvIsStart()) {
SSyncNode *pSyncNode = syncNodeAcquire(pMnode->syncMgmt.sync);
assert(pSyncNode != NULL);
ESyncState state = syncGetMyRole(pMnode->syncMgmt.sync);
SyncTerm currentTerm = syncGetMyTerm(pMnode->syncMgmt.sync);
SMsgHead *pHead = pMsg->pCont;
char logBuf[512];
char *syncNodeStr = sync2SimpleStr(pMnode->syncMgmt.sync);
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
syncRpcMsgLog2(logBuf, pMsg);
taosMemoryFree(syncNodeStr);
SRpcMsg *pRpcMsg = pMsg;
if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) {
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
SMnode *pMnode = pMsg->info.node;
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
int32_t code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
syncClientRequestDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
if (!syncEnvIsStart()) {
mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
return TAOS_SYNC_PROPOSE_OTHER_ERROR;
}
} else {
mError("==mndProcessSyncMsg== error msg type:%d", pRpcMsg->msgType);
ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
}
SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync);
if (pSyncNode == NULL) {
mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType));
return TAOS_SYNC_PROPOSE_OTHER_ERROR;
}
syncNodeRelease(pSyncNode);
char logBuf[512];
char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
syncRpcMsgLog2(logBuf, pMsg);
taosMemoryFree(syncNodeStr);
if (pMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_VND_SYNC_PING) {
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
syncClientRequestDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
} else {
mError("==mndProcessSyncMsg== error syncEnv stop");
ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
}
return ret;
return code;
}
int32_t mndProcessMsg(SRpcMsg *pMsg) {
......
......@@ -60,7 +60,7 @@ extern "C" {
#define EXPLAIN_GROUPS_FORMAT "groups=%d"
#define EXPLAIN_WIDTH_FORMAT "width=%d"
#define EXPLAIN_FUNCTIONS_FORMAT "functions=%d"
#define EXPLAIN_EXECINFO_FORMAT "cost=%" PRIu64 "..%" PRIu64 " rows=%" PRIu64
#define EXPLAIN_EXECINFO_FORMAT "cost=%.3f..%.3f rows=%" PRIu64
typedef struct SExplainGroup {
int32_t nodeNum;
......
......@@ -381,6 +381,35 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
// basic analyze output
if (EXPLAIN_MODE_ANALYZE == ctx->mode) {
EXPLAIN_ROW_NEW(level + 1, "I/O: ");
int32_t nodeNum = taosArrayGetSize(pResNode->pExecInfo);
for (int32_t i = 0; i < nodeNum; ++i) {
SExplainExecInfo * execInfo = taosArrayGet(pResNode->pExecInfo, i);
STableScanAnalyzeInfo *pScanInfo = (STableScanAnalyzeInfo *)execInfo->verboseInfo;
EXPLAIN_ROW_APPEND("total_blocks=%d", pScanInfo->totalBlocks);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND("load_blocks=%d", pScanInfo->loadBlocks);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND("load_block_SMAs=%d", pScanInfo->loadBlockStatis);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND("total_rows=%" PRIu64, pScanInfo->totalRows);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND("check_rows=%" PRIu64, pScanInfo->totalCheckedRows);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
......@@ -390,8 +419,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIMERANGE_FORMAT, pTblScanNode->scanRange.skey,
pTblScanNode->scanRange.ekey);
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIMERANGE_FORMAT, pTblScanNode->scanRange.skey, pTblScanNode->scanRange.ekey);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
......@@ -637,6 +665,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pIntNode->window.pFuncs->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pIntNode->window.node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
......
......@@ -86,43 +86,12 @@ typedef struct STableQueryInfo {
// SVariant tag;
} STableQueryInfo;
typedef enum {
QUERY_PROF_BEFORE_OPERATOR_EXEC = 0,
QUERY_PROF_AFTER_OPERATOR_EXEC,
QUERY_PROF_QUERY_ABORT
} EQueryProfEventType;
typedef struct {
EQueryProfEventType eventType;
int64_t eventTime;
union {
uint8_t operatorType; // for operator event
int32_t abortCode; // for query abort event
};
} SQueryProfEvent;
typedef struct {
uint8_t operatorType;
int64_t sumSelfTime;
int64_t sumRunTimes;
} SOperatorProfResult;
typedef struct SLimit {
int64_t limit;
int64_t offset;
} SLimit;
typedef struct SFileBlockLoadRecorder {
uint64_t totalRows;
uint64_t totalCheckedRows;
uint32_t totalBlocks;
uint32_t loadBlocks;
uint32_t loadBlockStatis;
uint32_t skipBlocks;
uint32_t filterOutBlocks;
uint64_t elapsedTime;
} SFileBlockLoadRecorder;
typedef struct STableScanAnalyzeInfo SFileBlockLoadRecorder;
typedef struct STaskCostInfo {
int64_t created;
......@@ -152,8 +121,8 @@ typedef struct STaskCostInfo {
} STaskCostInfo;
typedef struct SOperatorCostInfo {
uint64_t openCost;
uint64_t totalCost;
double openCost;
double totalCost;
} SOperatorCostInfo;
// The basic query information extracted from the SQueryInfo tree to support the
......@@ -200,7 +169,7 @@ typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, struct SAggS
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr);
typedef void (*__optr_close_fn_t)(void* param, int32_t num);
typedef int32_t (*__optr_get_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain);
typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
typedef struct STaskIdInfo {
uint64_t queryId; // this is also a request id
......@@ -264,14 +233,14 @@ enum {
};
typedef struct SOperatorFpSet {
__optr_open_fn_t _openFn; // DO NOT invoke this function directly
__optr_fn_t getNextFn;
__optr_fn_t getStreamResFn; // execute the aggregate in the stream model, todo remove it
__optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP
__optr_close_fn_t closeFn;
__optr_encode_fn_t encodeResultRow;
__optr_decode_fn_t decodeResultRow;
__optr_get_explain_fn_t getExplainFn;
__optr_open_fn_t _openFn; // DO NOT invoke this function directly
__optr_fn_t getNextFn;
__optr_fn_t getStreamResFn; // execute the aggregate in the stream model, todo remove it
__optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP
__optr_close_fn_t closeFn;
__optr_encode_fn_t encodeResultRow;
__optr_decode_fn_t decodeResultRow;
__optr_explain_fn_t getExplainFn;
} SOperatorFpSet;
typedef struct SOperatorInfo {
......@@ -656,7 +625,7 @@ typedef struct SJoinOperatorInfo {
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
__optr_decode_fn_t decode, __optr_get_explain_fn_t explain);
__optr_decode_fn_t decode, __optr_explain_fn_t explain);
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
void operatorDummyCloseFn(void* param, int32_t numOfCols);
......@@ -775,10 +744,6 @@ bool isTaskKilled(SExecTaskInfo* pTaskInfo);
int32_t checkForQueryBuf(size_t numOfTables);
void setTaskKilled(SExecTaskInfo* pTaskInfo);
void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType);
void publishQueryAbortEvent(SExecTaskInfo* pTaskInfo, int32_t code);
void queryCostStatis(SExecTaskInfo* pTaskInfo);
void doDestroyTask(SExecTaskInfo* pTaskInfo);
......
......@@ -30,13 +30,6 @@
#include "tlosertree.h"
#include "ttypes.h"
typedef struct STaskMgmt {
TdThreadMutex lock;
SCacheObj *qinfoPool; // query handle pool
int32_t vgId;
bool closed;
} STaskMgmt;
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) {
assert(readHandle != NULL && pSubplan != NULL);
......@@ -131,7 +124,6 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
// error occurs, record the error code and return to client
int32_t ret = setjmp(pTaskInfo->env);
if (ret != TSDB_CODE_SUCCESS) {
publishQueryAbortEvent(pTaskInfo, ret);
pTaskInfo->code = ret;
cleanUpUdfs();
qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo),
......@@ -141,16 +133,11 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
int64_t st = taosGetTimestampUs();
*pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
uint64_t el = (taosGetTimestampUs() - st);
pTaskInfo->cost.elapsedTime += el;
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (NULL == *pRes) {
*useconds = pTaskInfo->cost.elapsedTime;
}
......
......@@ -124,6 +124,8 @@ static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput)
void doSetOperatorCompleted(SOperatorInfo* pOperator) {
pOperator->status = OP_EXEC_DONE;
pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000)/1000.0;
if (pOperator->pTaskInfo != NULL) {
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
}
......@@ -137,7 +139,7 @@ int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
__optr_decode_fn_t decode, __optr_get_explain_fn_t explain) {
__optr_decode_fn_t decode, __optr_explain_fn_t explain) {
SOperatorFpSet fpSet = {
._openFn = openFn,
.getNextFn = nextFn,
......@@ -2140,102 +2142,6 @@ int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock
return pBlock->info.rows;
}
void publishOperatorProfEvent(SOperatorInfo* pOperator, EQueryProfEventType eventType) {
SQueryProfEvent event = {0};
event.eventType = eventType;
event.eventTime = taosGetTimestampUs();
event.operatorType = pOperator->operatorType;
// if (pQInfo->summary.queryProfEvents) {
// taosArrayPush(pQInfo->summary.queryProfEvents, &event);
// }
}
void publishQueryAbortEvent(SExecTaskInfo* pTaskInfo, int32_t code) {
SQueryProfEvent event;
event.eventType = QUERY_PROF_QUERY_ABORT;
event.eventTime = taosGetTimestampUs();
event.abortCode = code;
if (pTaskInfo->cost.queryProfEvents) {
taosArrayPush(pTaskInfo->cost.queryProfEvents, &event);
}
}
typedef struct {
uint8_t operatorType;
int64_t beginTime;
int64_t endTime;
int64_t selfTime;
int64_t descendantsTime;
} SOperatorStackItem;
static void doOperatorExecProfOnce(SOperatorStackItem* item, SQueryProfEvent* event, SArray* opStack,
SHashObj* profResults) {
item->endTime = event->eventTime;
item->selfTime = (item->endTime - item->beginTime) - (item->descendantsTime);
for (int32_t j = 0; j < taosArrayGetSize(opStack); ++j) {
SOperatorStackItem* ancestor = taosArrayGet(opStack, j);
ancestor->descendantsTime += item->selfTime;
}
uint8_t operatorType = item->operatorType;
SOperatorProfResult* result = taosHashGet(profResults, &operatorType, sizeof(operatorType));
if (result != NULL) {
result->sumRunTimes++;
result->sumSelfTime += item->selfTime;
} else {
SOperatorProfResult opResult;
opResult.operatorType = operatorType;
opResult.sumSelfTime = item->selfTime;
opResult.sumRunTimes = 1;
taosHashPut(profResults, &(operatorType), sizeof(operatorType), &opResult, sizeof(opResult));
}
}
void calculateOperatorProfResults(void) {
// if (pQInfo->summary.queryProfEvents == NULL) {
// // qDebug("QInfo:0x%"PRIx64" query prof events array is null", pQInfo->qId);
// return;
// }
//
// if (pQInfo->summary.operatorProfResults == NULL) {
// // qDebug("QInfo:0x%"PRIx64" operator prof results hash is null", pQInfo->qId);
// return;
// }
SArray* opStack = taosArrayInit(32, sizeof(SOperatorStackItem));
if (opStack == NULL) {
return;
}
#if 0
size_t size = taosArrayGetSize(pQInfo->summary.queryProfEvents);
SHashObj* profResults = pQInfo->summary.operatorProfResults;
for (int i = 0; i < size; ++i) {
SQueryProfEvent* event = taosArrayGet(pQInfo->summary.queryProfEvents, i);
if (event->eventType == QUERY_PROF_BEFORE_OPERATOR_EXEC) {
SOperatorStackItem opItem;
opItem.operatorType = event->operatorType;
opItem.beginTime = event->eventTime;
opItem.descendantsTime = 0;
taosArrayPush(opStack, &opItem);
} else if (event->eventType == QUERY_PROF_AFTER_OPERATOR_EXEC) {
SOperatorStackItem* item = taosArrayPop(opStack);
assert(item->operatorType == event->operatorType);
doOperatorExecProfOnce(item, event, opStack, profResults);
} else if (event->eventType == QUERY_PROF_QUERY_ABORT) {
SOperatorStackItem* item;
while ((item = taosArrayPop(opStack)) != NULL) {
doOperatorExecProfOnce(item, event, opStack, profResults);
}
}
}
#endif
taosArrayDestroy(opStack);
}
void queryCostStatis(SExecTaskInfo* pTaskInfo) {
STaskCostInfo* pSummary = &pTaskInfo->cost;
......@@ -2268,15 +2174,6 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) {
// qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb,
// hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0,
// pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0);
if (pSummary->operatorProfResults) {
SOperatorProfResult* opRes = taosHashIterate(pSummary->operatorProfResults, NULL);
while (opRes != NULL) {
// qDebug("QInfo:0x%" PRIx64 " :cost summary: operator : %d, exec times: %" PRId64 ", self time: %" PRId64,
// pQInfo->qId, opRes->operatorType, opRes->sumRunTimes, opRes->sumSelfTime);
opRes = taosHashIterate(pSummary->operatorProfResults, opRes);
}
}
}
// static void updateOffsetVal(STaskRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
......@@ -3528,14 +3425,13 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
SOperatorInfo* downstream = pOperator->pDownstream[0];
int64_t st = taosGetTimestampUs();
int32_t order = TSDB_ORDER_ASC;
int32_t scanFlag = MAIN_SCAN;
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
......@@ -3581,6 +3477,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
closeAllResultRows(&pAggInfo->binfo.resultRowInfo);
initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
OPTR_SET_OPENED(pOperator);
pOperator->cost.openCost = (taosGetTimestampUs() - st)/1000.0;
return TSDB_CODE_SUCCESS;
}
......@@ -3595,6 +3493,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
doSetOperatorCompleted(pOperator);
return NULL;
}
......@@ -3604,7 +3503,10 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
doSetOperatorCompleted(pOperator);
}
return (blockDataGetNumOfRows(pInfo->pRes) != 0) ? pInfo->pRes : NULL;
size_t rows = blockDataGetNumOfRows(pInfo->pRes);//pInfo->pRes : NULL;
pOperator->resultInfo.totalRows += rows;
return (rows == 0)? NULL:pInfo->pRes;
}
void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char** result,
......@@ -3830,22 +3732,25 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
}
#endif
int64_t st = 0;
int32_t order = 0;
int32_t scanFlag = 0;
if (pOperator->cost.openCost == 0) {
st = taosGetTimestampUs();
}
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
// The downstream exec may change the value of the newgroup, so use a local variable instead.
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
doSetOperatorCompleted(pOperator);
break;
}
#if 0
// Return result of the previous group in the firstly.
if (false) {
if (pRes->info.rows > 0) {
......@@ -3855,6 +3760,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
initCtxOutputBuffer(pInfo->pCtx, pOperator->numOfExprs);
}
}
#endif
// the pDataBlock are always the same one, no need to call this again
int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag);
......@@ -3881,8 +3787,14 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
pProjectInfo->curOutput += pInfo->pRes->info.rows;
// copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfExprs);
return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL;
size_t rows = pInfo->pRes->info.rows;
pOperator->resultInfo.totalRows += rows;
if (pOperator->cost.openCost == 0) {
pOperator->cost.openCost = (taosGetTimestampUs() - st)/ 1000.0;
}
return (rows > 0)? pInfo->pRes:NULL;
}
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup,
......@@ -3939,10 +3851,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream);
publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (*newgroup) {
assert(pBlock != NULL);
}
......@@ -5222,16 +5131,21 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
}
}
(*pRes)[*resNum].numOfRows = operatorInfo->resultInfo.totalRows;
(*pRes)[*resNum].startupCost = operatorInfo->cost.openCost;
(*pRes)[*resNum].totalCost = operatorInfo->cost.totalCost;
SExplainExecInfo* pInfo = &(*pRes)[*resNum];
pInfo->numOfRows = operatorInfo->resultInfo.totalRows;
pInfo->startupCost = operatorInfo->cost.openCost;
pInfo->totalCost = operatorInfo->cost.totalCost;
if (operatorInfo->fpSet.getExplainFn) {
int32_t code = (*operatorInfo->fpSet.getExplainFn)(operatorInfo, &(*pRes)->verboseInfo);
int32_t code = operatorInfo->fpSet.getExplainFn(operatorInfo, &pInfo->verboseInfo, &pInfo->verboseLen);
if (code) {
qError("operator getExplainFn failed, error:%s", tstrerror(code));
qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
return code;
}
} else {
pInfo->verboseLen = 0;
pInfo->verboseInfo = NULL;
}
++(*resNum);
......
......@@ -270,24 +270,29 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
doSetOperatorCompleted(pOperator);
}
return (pRes->info.rows == 0)? NULL:pRes;
}
int32_t order = TSDB_ORDER_ASC;
int32_t order = TSDB_ORDER_ASC;
int32_t scanFlag = MAIN_SCAN;
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true);
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, scanFlag, true);
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->pScalarExprInfo != NULL) {
......@@ -297,7 +302,6 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
}
}
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs);
doHashGroupbyAgg(pOperator, pBlock);
}
......@@ -319,7 +323,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
bool hasRemain = hashRemainDataInGroupInfo(&pInfo->groupResInfo);
if (!hasRemain) {
pOperator->status = OP_EXEC_DONE;
doSetOperatorCompleted(pOperator);
break;
}
......@@ -328,7 +332,10 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
}
}
return (pRes->info.rows == 0)? NULL:pRes;
size_t rows = pRes->info.rows;
pOperator->resultInfo.totalRows += rows;
return (rows == 0)? NULL:pRes;
}
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList,
......@@ -574,9 +581,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
......
......@@ -98,9 +98,7 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
// todo extract method
if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
SOperatorInfo* ds1 = pOperator->pDownstream[0];
publishOperatorProfEvent(ds1, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1);
publishOperatorProfEvent(ds1, QUERY_PROF_AFTER_OPERATOR_EXEC);
pJoinInfo->leftPos = 0;
if (pJoinInfo->pLeft == NULL) {
......@@ -111,9 +109,7 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
SOperatorInfo* ds2 = pOperator->pDownstream[1];
publishOperatorProfEvent(ds2, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pJoinInfo->pRight = ds2->fpSet.getNextFn(ds2);
publishOperatorProfEvent(ds2, QUERY_PROF_AFTER_OPERATOR_EXEC);
pJoinInfo->rightPos = 0;
if (pJoinInfo->pRight == NULL) {
......
......@@ -253,9 +253,12 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
addTagPseudoColumnData(pTableScanInfo, pBlock);
}
// todo record the filter time cost
int64_t st = taosGetTimestampMs();
doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo);
int64_t et = taosGetTimestampMs();
pTableScanInfo->readRecorder.filterTime += (et - st);
if (pBlock->info.rows == 0) {
pCost->filterOutBlocks += 1;
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
......@@ -347,6 +350,8 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
STableScanInfo* pTableScanInfo = pOperator->info;
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
int64_t st = taosGetTimestampUs();
while (tsdbNextDataBlock(pTableScanInfo->dataReader)) {
if (isTaskKilled(pOperator->pTaskInfo)) {
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
......@@ -366,6 +371,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
continue;
}
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st)/1000.0;
pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime;
return pBlock;
}
......@@ -452,6 +461,15 @@ SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
return interval;
}
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
STableScanInfo* pTableScanInfo = pOptr->info;
*pRecorder = pTableScanInfo->readRecorder;
*pOptrExplain = pRecorder;
*len = sizeof(SFileBlockLoadRecorder);
return 0;
}
static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
taosMemoryFree(pTableScanInfo->pResBlock);
......@@ -509,14 +527,10 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pOperator->numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo, NULL, NULL, NULL);
static int32_t cost = 0;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo, NULL, NULL, getTableScannerExecInfo);
// for non-blocking operator, the open cost is always 0
pOperator->cost.openCost = 0;
pOperator->cost.totalCost = ++cost;
pOperator->resultInfo.totalRows = ++cost;
return pOperator;
}
......@@ -1604,18 +1618,20 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
STR_TO_VARSTR(str, mr.me.name);
colDataAppend(pDst, count, str, false);
} else { // it is a tag value
if(pDst->info.type == TSDB_DATA_TYPE_JSON){
const uint8_t *tmp = mr.me.ctbEntry.pTags;
char *data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1);
if(data == NULL){
qError("doTagScan calloc error:%d", kvRowLen(tmp) + 1);
return NULL;
if (pDst->info.type == TSDB_DATA_TYPE_JSON) {
const uint8_t* tmp = mr.me.ctbEntry.pTags;
// TODO opt perf by realloc memory
char* data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1);
if (data == NULL) {
qError("%s failed to malloc memory, size:%d", GET_TASKID(pTaskInfo), kvRowLen(tmp) + 1);
longjmp(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
}
*data = TSDB_DATA_TYPE_JSON;
memcpy(data+1, tmp, kvRowLen(tmp));
memcpy(data + 1, tmp, kvRowLen(tmp));
colDataAppend(pDst, count, data, false);
taosMemoryFree(data);
}else{
} else {
const char* p = metaGetTableTagVal(&mr.me, pExprInfo[j].base.pParam[0].pCol->colId);
colDataAppend(pDst, count, p, (p == NULL));
}
......
......@@ -782,13 +782,11 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
int32_t scanFlag = MAIN_SCAN;
int64_t st = taosGetTimestampUs();
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
......@@ -821,6 +819,8 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
closeAllResultRows(&pInfo->binfo.resultRowInfo);
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->order);
OPTR_SET_OPENED(pOperator);
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
return TSDB_CODE_SUCCESS;
}
......@@ -946,10 +946,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
......@@ -998,7 +995,10 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
doSetOperatorCompleted(pOperator);
}
return pBlock->info.rows == 0 ? NULL : pBlock;
size_t rows = pBlock->info.rows;
pOperator->resultInfo.totalRows += rows;
return (rows == 0)? NULL:pBlock;
}
}
......@@ -1092,10 +1092,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SArray* pUpdated = NULL;
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
......@@ -1426,9 +1423,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
......@@ -1473,9 +1468,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
......@@ -1703,12 +1696,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
if (pBlock->info.type == STREAM_REPROCESS) {
doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval,
......
......@@ -318,11 +318,8 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
sTrace("syncPropose msgType:%d ", pMsg->msgType);
int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS;
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
rpcFreeCont(pMsg->pCont);
return TAOS_SYNC_PROPOSE_OTHER_ERROR;
}
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) return TAOS_SYNC_PROPOSE_OTHER_ERROR;
assert(rid == pSyncNode->rid);
......@@ -335,14 +332,13 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak, pSyncNode->vgId);
SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
if (pSyncNode->FpEqMsg != NULL) {
pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) {
ret = TAOS_SYNC_PROPOSE_SUCCESS;
} else {
sTrace("syncPropose pSyncNode->FpEqMsg is NULL");
}
syncClientRequestDestroy(pSyncMsg);
ret = TAOS_SYNC_PROPOSE_SUCCESS;
} else {
sTrace("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state));
ret = TAOS_SYNC_PROPOSE_NOT_LEADER;
......
......@@ -210,6 +210,7 @@ void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg) {
SyncTimeout* syncTimeoutFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncTimeout* pMsg = syncTimeoutDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
assert(pMsg != NULL);
return pMsg;
}
......@@ -436,6 +437,7 @@ void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg) {
SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncPing* pMsg = syncPingDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
assert(pMsg != NULL);
return pMsg;
}
......@@ -695,6 +697,7 @@ void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg) {
SyncPingReply* syncPingReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncPingReply* pMsg = syncPingReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
assert(pMsg != NULL);
return pMsg;
}
......@@ -861,6 +864,7 @@ void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg
// step 3. RpcMsg => SyncClientRequest, from queue
SyncClientRequest* syncClientRequestFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncClientRequest* pMsg = syncClientRequestDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
assert(pMsg != NULL);
return pMsg;
}
......@@ -986,6 +990,7 @@ void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg) {
SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncRequestVote* pMsg = syncRequestVoteDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
assert(pMsg != NULL);
return pMsg;
}
......@@ -1134,6 +1139,7 @@ void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply
SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncRequestVoteReply* pMsg = syncRequestVoteReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
assert(pMsg != NULL);
return pMsg;
}
......@@ -1281,6 +1287,7 @@ void syncAppendEntriesFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntries* pMsg
SyncAppendEntries* syncAppendEntriesFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncAppendEntries* pMsg = syncAppendEntriesDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
assert(pMsg != NULL);
return pMsg;
}
......@@ -1444,6 +1451,7 @@ void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesR
SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncAppendEntriesReply* pMsg = syncAppendEntriesReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
assert(pMsg != NULL);
return pMsg;
}
......
......@@ -68,6 +68,7 @@
./test.sh -f tsim/stream/basic1.sim
# ---- transaction
./test.sh -f tsim/trans/lossdata1.sim
./test.sh -f tsim/trans/create_db.sim
# ---- tmq
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c transPullupInterval -v 1
system sh/exec.sh -n dnode1 -s start
sql connect
print ======= backup sdbdata
system sh/exec.sh -n dnode1 -s stop
system cp ../../../../sim/dnode1/data/mnode/data/sdb.data ../../../../sim/dnode1/data/mnode/data/sdb.data.bak1
system sh/exec.sh -n dnode1 -s start
sql connect
print =============== create user1
sql create user user1 PASS 'user1'
sql create user user2 PASS 'user2'
sql show users
if $rows != 3 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop
print ======= restore backup data
system cp ../../../../sim/dnode1/data/mnode/data/sdb.data.bak1 ../../../../sim/dnode1/data/mnode/data/sdb.data
system sh/exec.sh -n dnode1 -s start
sql connect
sql show users
if $rows != 3 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop
\ No newline at end of file
......@@ -93,7 +93,7 @@ class TDTestCase:
tdLog.info(shellCmd)
os.system(shellCmd)
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum):
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
tsql.execute("use %s" %dbName)
tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
......@@ -147,8 +147,7 @@ class TDTestCase:
parameterDict["dbName"],\
parameterDict["vgroups"],\
parameterDict["stbName"],\
parameterDict["ctbNum"],\
parameterDict["rowsPerTbl"])
parameterDict["ctbNum"])
self.insert_data(tsql,\
parameterDict["dbName"],\
......@@ -322,6 +321,75 @@ class TDTestCase:
tdLog.printNoPrefix("======== test case 2 end ...... ")
def tmqCase2a(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 2a: Produce while two consumers to subscribe one db, inclue 1 stb")
tdLog.info("step 1: create database, stb, ctb and insert data")
# create and start thread
parameterDict = {'cfg': '', \
'dbName': 'db2a', \
'vgroups': 4, \
'stbName': 'stb1', \
'ctbNum': 10, \
'rowsPerTbl': 10000, \
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
self.initConsumerTable()
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
tdSql.execute("create table if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(parameterDict['dbName'], parameterDict['stbName']))
tdLog.info("create topics from db")
topicName1 = 'topic_db1'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
topicList = topicName1
ifcheckdata = 0
ifManualCommit = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
consumerId = 1
keyList = 'group.id:cgrp2,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
pollDelay = 10
showMsg = 1
showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
prepareEnvThread.start()
# wait for data ready
prepareEnvThread.join()
tdLog.info("insert process end, and start to check consume result")
expectRows = 2
resultList = self.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt * 2:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*2))
tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicName1)
tdLog.printNoPrefix("======== test case 2a end ...... ")
def tmqCase3(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 3: Produce while one consumers to subscribe one db, include 2 stb")
tdLog.info("step 1: create database, stb, ctb and insert data")
......@@ -745,6 +813,7 @@ class TDTestCase:
self.tmqCase1(cfgPath, buildPath)
self.tmqCase2(cfgPath, buildPath)
self.tmqCase2a(cfgPath, buildPath)
self.tmqCase3(cfgPath, buildPath)
self.tmqCase4(cfgPath, buildPath)
self.tmqCase5(cfgPath, buildPath)
......
......@@ -360,7 +360,7 @@ class TDTestCase:
'replica': 1, \
'stbName': 'stb1', \
'ctbNum': 10, \
'rowsPerTbl': 10000, \
'rowsPerTbl': 50000, \
'batchNum': 13, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
......@@ -391,13 +391,13 @@ class TDTestCase:
showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
time.sleep(2)
time.sleep(5)
tdLog.info("drop som child table of stb1")
dropTblNum = 4
tdSql.query("drop table if exists %s.%s_9"%(parameterDict["dbName"], parameterDict["stbName"]))
tdSql.query("drop table if exists %s.%s_8"%(parameterDict["dbName"], parameterDict["stbName"]))
tdSql.query("drop table if exists %s.%s_7"%(parameterDict["dbName"], parameterDict["stbName"]))
tdSql.query("drop table if exists %s.%s_1"%(parameterDict["dbName"], parameterDict["stbName"]))
tdSql.query("drop table if exists %s.%s_2"%(parameterDict["dbName"], parameterDict["stbName"]))
tdSql.query("drop table if exists %s.%s_3"%(parameterDict["dbName"], parameterDict["stbName"]))
tdSql.query("drop table if exists %s.%s_4"%(parameterDict["dbName"], parameterDict["stbName"]))
tdLog.info("drop some child tables, then start to check consume result")
expectRows = 1
......@@ -1380,14 +1380,6 @@ class TDTestCase:
self.tmqCase3(cfgPath, buildPath)
self.tmqCase4(cfgPath, buildPath)
self.tmqCase5(cfgPath, buildPath)
#self.tmqCase6(cfgPath, buildPath)
#self.tmqCase7(cfgPath, buildPath)
#self.tmqCase8(cfgPath, buildPath)
#self.tmqCase9(cfgPath, buildPath)
#self.tmqCase10(cfgPath, buildPath)
#self.tmqCase11(cfgPath, buildPath)
#self.tmqCase12(cfgPath, buildPath)
#self.tmqCase13(cfgPath, buildPath)
def stop(self):
tdSql.close()
......
import taos
import sys
import time
import socket
import os
import threading
from enum import Enum
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
class actionType(Enum):
CREATE_DATABASE = 0
CREATE_STABLE = 1
CREATE_CTABLE = 2
INSERT_DATA = 3
class TDTestCase:
hostname = socket.gethostname()
#rpcDebugFlagVal = '143'
#clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
#clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
#updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#print ("===================: ", updatecfgDict)
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
#tdSql.init(conn.cursor())
tdSql.init(conn.cursor(), logSql) # output sql.txt file
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def newcur(self,cfg,host,port):
user = "root"
password = "taosdata"
con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port)
cur=con.cursor()
print(cur)
return cur
def initConsumerTable(self,cdbName='cdb'):
tdLog.info("create consume database, and consume info table, and consume result table")
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
tdSql.query("drop table if exists %s.consumeresult "%(cdbName))
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
def initConsumerInfoTable(self,cdbName='cdb'):
tdLog.info("drop consumeinfo table")
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
sql = "insert into %s.consumeinfo values "%cdbName
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
tdLog.info("consume info sql: %s"%sql)
tdSql.query(sql)
def selectConsumeResult(self,expectRows,cdbName='cdb'):
resultList=[]
while 1:
tdSql.query("select * from %s.consumeresult"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == expectRows:
break
else:
time.sleep(5)
for i in range(expectRows):
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
resultList.append(tdSql.getData(i , 3))
return resultList
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
shellCmd = 'nohup '
if valgrind == 1:
logFile = cfgPath + '/../log/valgrind-tmq.log'
shellCmd = 'nohup valgrind --log-file=' + logFile
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd)
os.system(shellCmd)
def create_database(self,tsql, dbName,dropFlag=1,vgroups=4,replica=1):
if dropFlag == 1:
tsql.execute("drop database if exists %s"%(dbName))
tsql.execute("create database if not exists %s vgroups %d replica %d"%(dbName, vgroups, replica))
tdLog.debug("complete to create database %s"%(dbName))
return
def create_stable(self,tsql, dbName,stbName):
tsql.execute("create table if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(dbName, stbName))
tdLog.debug("complete to create %s.%s" %(dbName, stbName))
return
def create_ctables(self,tsql, dbName,stbName,ctbNum):
tsql.execute("use %s" %dbName)
pre_create = "create table"
sql = pre_create
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
for i in range(ctbNum):
sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1)
if (i > 0) and (i%100 == 0):
tsql.execute(sql)
sql = pre_create
if sql != pre_create:
tsql.execute(sql)
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
return
def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=0):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
if startTs == 0:
t = time.time()
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
rowsOfSql = 0
for i in range(ctbNum):
sql += " %s_%d values "%(stbName,i)
for j in range(rowsPerTbl):
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
rowsOfSql += 1
if (j > 0) and ((rowsOfSql == batchNum) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
rowsOfSql = 0
if j < rowsPerTbl - 1:
sql = "insert into %s_%d values " %(stbName,i)
else:
sql = "insert into "
#end sql
if sql != pre_insert:
#print("insert sql:%s"%sql)
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def prepareEnv(self, **parameterDict):
# create new connector for my thread
tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030)
if parameterDict["actionType"] == actionType.CREATE_DATABASE:
self.create_database(tsql, parameterDict["dbName"])
elif parameterDict["actionType"] == actionType.CREATE_STABLE:
self.create_stable(tsql, parameterDict["dbName"], parameterDict["stbName"])
elif parameterDict["actionType"] == actionType.CREATE_CTABLE:
self.create_ctables(tsql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
elif parameterDict["actionType"] == actionType.INSERT_DATA:
self.insert_data(tsql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"],\
parameterDict["rowsPerTbl"],parameterDict["batchNum"])
else:
tdLog.exit("not support's action: ", parameterDict["actionType"])
return
def tmqCase1(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 1: ")
self.initConsumerTable()
auotCtbNum = 5
auotCtbPrefix = 'autoCtb'
# create and start thread
parameterDict = {'cfg': '', \
'actionType': 0, \
'dbName': 'db1', \
'dropFlag': 1, \
'vgroups': 4, \
'replica': 1, \
'stbName': 'stb1', \
'ctbNum': 10, \
'rowsPerTbl': 10000, \
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
self.create_database(tdSql, parameterDict["dbName"])
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
self.insert_data(tdSql,parameterDict["dbName"],parameterDict["stbName"],parameterDict["ctbNum"],parameterDict["rowsPerTbl"],parameterDict["batchNum"])
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * (auotCtbNum + parameterDict["ctbNum"])
topicList = topicFromStb1
ifcheckdata = 0
ifManualCommit = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
pollDelay = 100
showMsg = 1
showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
# add some new child tables using auto ctreating mode
time.sleep(1)
for index in range(auotCtbNum):
tdSql.query("create table %s.%s_%d using %s.%s tags(%d)"%(parameterDict["dbName"], auotCtbPrefix, index, parameterDict["dbName"], parameterDict["stbName"], index))
self.insert_data(tdSql,parameterDict["dbName"],auotCtbPrefix,auotCtbNum,parameterDict["rowsPerTbl"],parameterDict["batchNum"])
tdLog.info("insert process end, and start to check consume result")
expectRows = 1
resultList = self.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 2: ")
self.initConsumerTable()
auotCtbNum = 10
auotCtbPrefix = 'autoCtb'
# create and start thread
parameterDict = {'cfg': '', \
'actionType': 0, \
'dbName': 'db2', \
'dropFlag': 1, \
'vgroups': 4, \
'replica': 1, \
'stbName': 'stb1', \
'ctbNum': 10, \
'rowsPerTbl': 10000, \
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
self.create_database(tdSql, parameterDict["dbName"])
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
self.insert_data(tdSql,parameterDict["dbName"],parameterDict["stbName"],parameterDict["ctbNum"],parameterDict["rowsPerTbl"],parameterDict["batchNum"])
self.create_stable(tdSql, parameterDict["dbName"], 'stb2')
tdLog.info("create topics from stb0/stb1")
topicFromStb1 = 'topic_stb1'
topicFromStb2 = 'topic_stb2'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb2, parameterDict['dbName'], 'stb2'))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * (auotCtbNum + parameterDict["ctbNum"])
topicList = '%s, %s'%(topicFromStb1,topicFromStb2)
ifcheckdata = 0
ifManualCommit = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
pollDelay = 100
showMsg = 1
showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
# add some new child tables using auto ctreating mode
time.sleep(1)
for index in range(auotCtbNum):
tdSql.query("create table %s.%s_%d using %s.%s tags(%d)"%(parameterDict["dbName"], auotCtbPrefix, index, parameterDict["dbName"], 'stb2', index))
self.insert_data(tdSql,parameterDict["dbName"],auotCtbPrefix,auotCtbNum,parameterDict["rowsPerTbl"],parameterDict["batchNum"])
tdLog.info("insert process end, and start to check consume result")
expectRows = 1
resultList = self.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self):
tdSql.prepare()
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
cfgPath = buildPath + "/../sim/psim/cfg"
tdLog.info("cfgPath: %s" % cfgPath)
self.tmqCase1(cfgPath, buildPath)
self.tmqCase2(cfgPath, buildPath)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
......@@ -66,3 +66,5 @@ python3 ./test.py -f 7-tmq/subscribeDb.py
python3 ./test.py -f 7-tmq/subscribeDb1.py
python3 ./test.py -f 7-tmq/subscribeStb.py
python3 ./test.py -f 7-tmq/subscribeStb1.py
python3 ./test.py -f 7-tmq/subscribeStb2.py
Subproject commit 2c4a1c83322b983881aea93ec2b51e7df826125a
Subproject commit 0aad27d725f4ee6b18daf1db0c07d933aed16eea
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册