提交 f03f6b8e 编写于 作者: C cpwu

Merge branch '3.0' into cpwu/3.0

......@@ -68,6 +68,7 @@ public class SubscribeDemo {
System.out.println(meter);
}
}
consumer.unsubscribe();
}
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
......
......@@ -28,7 +28,8 @@ function runConsumer() {
console.log(msg.topicPartition);
console.log(msg.block);
console.log(msg.fields)
consumer.commit(msg);
// fixme(@xiaolei): commented temp, should be fixed.
//consumer.commit(msg);
console.log(`=======consumer ${i} done`)
}
......@@ -48,4 +49,4 @@ try {
cursor.close();
conn.close();
}, 2000);
}
\ No newline at end of file
}
......@@ -13,7 +13,7 @@ title: 通过 Docker 快速体验 TDengine
如果已经安装了 docker, 只需执行下面的命令。
```shell
docker run -d -p 6030:6030 -p 6041/6041 -p 6043-6049/6043-6049 -p 6043-6049:6043-6049/udp tdengine/tdengine
docker run -d -p 6030:6030 -p 6041:6041 -p 6043-6049:6043-6049 -p 6043-6049:6043-6049/udp tdengine/tdengine
```
注意:TDengine 3.0 服务端仅使用 6030 TCP 端口。6041 为 taosAdapter 所使用提供 REST 服务端口。6043-6049 为 taosAdapter 提供第三方应用接入所使用端口,可根据需要选择是否打开。
......
......@@ -29,6 +29,7 @@ echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-stable stable main" |
如果安装 Beta 版需要安装包仓库
```bash
wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add -
echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-beta beta main" | sudo tee /etc/apt/sources.list.d/tdengine-beta.list
```
......
---
sidebar_label: 数据订阅
description: "数据订阅与推送服务。写入到 TDengine 中的时序数据能够被自动推送到订阅客户端。"
title: 数据订阅
---
import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
import Java from "./_sub_java.mdx";
import Python from "./_sub_python.mdx";
import Go from "./_sub_go.mdx";
import Rust from "./_sub_rust.mdx";
import Node from "./_sub_node.mdx";
import CSharp from "./_sub_cs.mdx";
import CDemo from "./_sub_c.mdx";
为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。
与 kafka 一样,你需要定义 topic, 但 TDengine 的 topic 是基于一个已经存在的超级表、子表或普通表的查询条件,即一个 SELECT 语句。你可以使用 SQL 对标签、表名、列、表达式等条件进行过滤,以及对数据进行标量函数与 UDF 计算(不包括数据聚合)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度。
消费者订阅 topic 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程、分布式地消费数据,提高消费速度。但不同消费者组中的消费者即使消费同一个topic, 并不共享消费进度。一个消费者可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的 vnode 上,也就是多个 shard 上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的ACK机制,在宕机、重启等复杂环境下确保 at least once 消费。
为了实现上述功能,TDengine 会为 WAL (Write-Ahead-Log) 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制:用户可以按需指定 WAL 文件保留的时间以及大小(详见 create database 语句)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎(但由于 TSDB 具有远比 WAL 更高的压缩率,我们不推荐保留太长时间,一般来说,不超过几天)。 对于以 topic 形式创建的查询,TDengine 将对接 WAL 而不是 TSDB 作为其存储引擎。在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。
本文档不对消息队列本身的基础知识做介绍,如果需要了解,请自行搜索。
## 主要数据结构和API
TMQ 的 API 中,与订阅相关的主要数据结构和API如下:
```c
typedef struct tmq_t tmq_t;
typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t;
typedef void(tmq_commit_cb(tmq_t *, int32_t code, void *param));
DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT const char *tmq_err2str(int32_t code);
DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
enum tmq_conf_res_t {
TMQ_CONF_UNKNOWN = -2,
TMQ_CONF_INVALID = -1,
TMQ_CONF_OK = 0,
};
typedef enum tmq_conf_res_t tmq_conf_res_t;
DLL_EXPORT tmq_conf_t *tmq_conf_new();
DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
```
这些 API 的文档请见 [C/C++ Connector](/reference/connector/cpp),下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面C语言的示例代码。
## 写入数据
首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如:
```sql
drop database if exists tmqdb;
create database tmqdb;
create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16) tags(t1 int, t3 varchar(16));
create table tmqdb.ctb0 using tmqdb.stb tags(0, "subtable0");
create table tmqdb.ctb1 using tmqdb.stb tags(1, "subtable1");
insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
```
## 创建topic:
```sql
create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
```
TMQ支持多种订阅类型:
### 列订阅
语法:CREATE TOPIC topic_name as subquery
通过select语句订阅(包括select *,或select ts, c1等指定列描述订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合)
- TOPIC一旦创建则schema确定
- 被订阅或用于计算的column和tag不可被删除、修改
- 若发生schema变更,新增的column不出现在结果中
### 超级表订阅
语法:CREATE TOPIC topic_name AS STABLE stbName
与select * from stbName订阅的区别是:
- 不会限制用户的schema变更
- 返回的是非结构化的数据:返回数据的schema会随之超级表的schema变化而变化
- 用户对于要处理的每一个数据块都可能有不同的schema,因此,必须重新获取schema
- 返回数据不带有tag
## 创建 consumer 以及consumer group
对于consumer, 目前支持的config包括:
| 参数名称 | 参数值 | 备注 |
| ---------------------------- | ------------------------------ | ------------------------------------------------------ |
| group.id | 最大长度:192 | |
| enable.auto.commit | 合法值:true, false | |
| auto.commit.interval.ms | | |
| auto.offset.reset | 合法值:earliest, latest, none | |
| td.connect.ip | 用于连接,同taos_connect的参数 | |
| td.connect.user | 用于连接,同taos_connect的参数 | |
| td.connect.pass | 用于连接,同taos_connect的参数 | |
| td.connect.port | 用于连接,同taos_connect的参数 | |
| enable.heartbeat.background | 合法值:true, false | 开启后台心跳,即consumer不会因为长时间不poll而认为离线 |
| experimental.snapshot.enable | 合法值:true, false | 从wal开始消费,还是从tsbs开始消费 |
| msg.with.table.name | 合法值:true, false | 从消息中能否解析表名 |
```sql
/* 根据需要,设置消费组(group.id)、自动提交(enable.auto.commit)、自动提交时间间隔(auto.commit.interval.ms)、用户名(td.connect.user)、密码(td.connect.pass)等参数 */
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "cgrpName");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
```
上述配置中包括consumer group ID,如果多个 consumer 指定的 consumer group ID一样,则自动形成一个consumer group,共享消费进度。
## 创建 topic 列表
单个consumer支持同时订阅多个topic。
```sql
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topicName");
```
## 启动订阅并开始消费
```
/* 启动订阅 */
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
/* 循环poll消息 */
while (running) {
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeOut);
msg_process(tmqmsg);
}
```
这里是一个 **while** 循环,每调用一次tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析API完成消息内容的解析。
## 结束消费
```sql
/* 取消订阅 */
tmq_unsubscribe(tmq);
/* 关闭消费 */
tmq_consumer_close(tmq);
```
## 删除topic
如果不再需要,可以删除创建topic,但注意:只有没有被订阅的topic才能别删除。
```sql
/* 删除topic */
drop topic topicName;
```
## 状态查看
1、topics:查询已经创建的topic
```sql
show topics;
```
2、consumers:查询consumer的状态及其订阅的topic
```sql
show consumers;
```
3、subscriptions:查询consumer与vgroup之间的分配关系
```sql
show subscriptions;
```
## 示例代码
本节展示各种语言的示例代码。
<Tabs>
<TabItem label="C" value="c">
```c
{{#include examples/c/tmq.c}}
```
</TabItem>
<TabItem label="Java" value="java">
<Java />
</TabItem>
<TabItem label="Go" value="Go">
<Go/>
</TabItem>
<TabItem label="Rust" value="Rust">
<Rust />
</TabItem>
<TabItem label="Python" value="Python">
```python
{{#include docs/examples/python/tmq_example.py}}
```
</TabItem>
<TabItem label="Node.JS" value="Node.JS">
<Node/>
</TabItem>
<TabItem label="C#" value="C#">
<CSharp/>
</TabItem>
</Tabs>
此差异已折叠。
......@@ -103,7 +103,7 @@ SELECT d1001.* FROM d1001,d1003 WHERE d1001.ts = d1003.ts;
在超级表和子表的查询中可以指定 _标签列_,且标签列的值会与普通列的数据一起返回。
```sql
ELECT location, groupid, current FROM d1001 LIMIT 2;
SELECT location, groupid, current FROM d1001 LIMIT 2;
```
### 结果去重
......
---
sidebar_label: 消息队列
title: 消息队列
sidebar_label: 数据订阅
title: 数据订阅
---
TDengine 3.0.0.0 开始对消息队列做了大幅的优化和增强以简化用户的解决方案。
......@@ -8,24 +8,17 @@ TDengine 3.0.0.0 开始对消息队列做了大幅的优化和增强以简化用
## 创建订阅主题
```sql
CREATE TOPIC [IF NOT EXISTS] topic_name AS {subquery | DATABASE db_name | STABLE stb_name };
CREATE TOPIC [IF NOT EXISTS] topic_name AS subquery;
```
订阅主题包括三种:列订阅、超级表订阅和数据库订阅。
**列订阅是**用 subquery 描述,支持过滤和标量函数和 UDF 标量函数,不支持 JOIN、GROUP BY、窗口切分子句、聚合函数和 UDF 聚合函数。列订阅规则如下:
TOPIC 支持过滤和标量函数和 UDF 标量函数,不支持 JOIN、GROUP BY、窗口切分子句、聚合函数和 UDF 聚合函数。列订阅规则如下:
1. TOPIC 一旦创建则返回结果的字段确定
2. 被订阅或用于计算的列不可被删除、修改
3. 列可以新增,但新增的列不出现在订阅结果字段中
4. 对于 select \*,则订阅展开为创建时所有的列(子表、普通表为数据列,超级表为数据列加标签列)
**超级表订阅和数据库订阅**规则如下:
1. 被订阅主体的 schema 变更不受限
2. 返回消息中 schema 是块级别的,每块的 schema 可能不一样
3. 列变更后写入的数据若未落盘,将以写入时的 schema 返回
4. 列变更后写入的数据若未已落盘,将以落盘时的 schema 返回
## 删除订阅主题
......
---
sidebar_label: 3.0 版本语法变更
title: 3.0 版本语法变更
description: "TDengine 3.0 版本的语法变更说明"
---
## SQL 基本元素变更
| # | **元素** | **差异性** | **说明** |
| - | :------- | :--------: | :------- |
| 1 | VARCHAR | <div style="width: 40pt">新增</div> | BINARY类型的别名。
| 2 | TIMESTAMP字面量 | 新增 | 新增支持 TIMESTAMP 'timestamp format' 语法。
| 3 | _ROWTS伪列 | 新增 | 表示时间戳主键。是_C0伪列的别名。
| 4 | INFORMATION_SCHEMA | 新增 | 包含各种SCHEMA定义的系统数据库。
| 5 | PERFORMANCE_SCHEMA | 新增 | 包含运行信息的系统数据库。
| 6 | 连续查询 | 废除 | 不再支持连续查询。相关的各种语法和接口废除。
| 7 | 混合运算 | 增强 | 查询中的混合运算(标量运算和矢量运算混合)全面增强,SELECT的各个子句均全面支持符合语法语义的混合运算。
| 8 | 标签运算 | 新增 |在查询中,标签列可以像普通列一样参与各种运算,用于各种子句。
| 9 | 时间线子句和时间函数用于超级表查询 | 增强 |没有PARTITION BY时,超级表的数据会被合并成一条时间线。
## SQL 语句变更
在 TDengine 中,普通表的数据模型中可使用以下数据类型。
| # | **语句** | **差异性** | **说明** |
| - | :------- | :--------: | :------- |
| 1 | ALTER ACCOUNT | <div style="width: 40pt">废除</div> | 2.x中为企业版功能,3.0不再支持。语法暂时保留了,执行报“This statement is no longer supported”错误。
| 2 | ALTER ALL DNODES | 新增 | 修改所有DNODE的参数。
| 3 | ALTER DATABASE | 调整 | 废除<ul><li>QUORUM:写入需要的副本确认数。3.0版本使用STRICT来指定强一致还是弱一致。3.0.0版本STRICT暂不支持修改。</li><li>BLOCKS:VNODE使用的内存块数。3.0版本使用BUFFER来表示VNODE写入内存池的大小。</li><li>UPDATE:更新操作的支持模式。3.0版本所有数据库都支持部分列更新。</li><li>CACHELAST:缓存最新一行数据的模式。3.0版本用CACHEMODEL代替。</li><li>COMP:3.0版本暂不支持修改。<br/>新增</li><li>CACHEMODEL:表示是否在内存中缓存子表的最近数据。</li><li>CACHESIZE:表示缓存子表最近数据的内存大小。</li><li>WAL_FSYNC_PERIOD:代替原FSYNC参数。</li><li>WAL_LEVEL:代替原WAL参数。<br/>调整</li><li>REPLICA:3.0.0版本暂不支持修改。</li><li>KEEP:3.0版本新增支持带单位的设置方式。</li></ul>
| 4 | ALTER STABLE | 调整 | 废除<ul><li>CHANGE TAG:修改标签列的名称。3.0版本使用RENAME TAG代替。<br/>新增</li><li>RENAME TAG:代替原CHANGE TAG子句。</li><li>COMMENT:修改超级表的注释。</li></ul>
| 5 | ALTER TABLE | 调整 | 废除<ul><li>CHANGE TAG:修改标签列的名称。3.0版本使用RENAME TAG代替。<br/>新增</li><li>RENAME TAG:代替原CHANGE TAG子句。</li><li>COMMENT:修改表的注释。</li><li>TTL:修改表的生命周期。</li></ul>
| 6 | ALTER USER | 调整 | 废除<ul><li>PRIVILEGE:修改用户权限。3.0版本使用GRANT和REVOKE来授予和回收权限。<br/>新增</li><li>ENABLE:启用或停用此用户。</li><li>SYSINFO:修改用户是否可查看系统信息。</li></ul>
| 7 | COMPACT VNODES | 暂不支持 | 整理指定VNODE的数据。3.0.0版本暂不支持。
| 8 | CREATE ACCOUNT | 废除 | 2.x中为企业版功能,3.0不再支持。语法暂时保留了,执行报“This statement is no longer supported”错误。
| 9 | CREATE DATABASE | 调整 | 废除<ul><li>BLOCKS:VNODE使用的内存块数。3.0版本使用BUFFER来表示VNODE写入内存池的大小。</li><li>CACHE:VNODE使用的内存块的大小。3.0版本使用BUFFER来表示VNODE写入内存池的大小。</li><li>CACHELAST:缓存最新一行数据的模式。3.0版本用CACHEMODEL代替。</li><li>DAYS:数据文件存储数据的时间跨度。3.0版本使用DURATION代替。</li><li>FSYNC:当 WAL 设置为 2 时,执行 fsync 的周期。3.0版本使用WAL_FSYNC_PERIOD代替。</li><li>QUORUM:写入需要的副本确认数。3.0版本使用STRICT来指定强一致还是弱一致。</li><li>UPDATE:更新操作的支持模式。3.0版本所有数据库都支持部分列更新。</li><li>WAL:WAL 级别。3.0版本使用WAL_LEVEL代替。<br/>新增</li><li>BUFFER:一个 VNODE 写入内存池大小。</li><li>CACHEMODEL:表示是否在内存中缓存子表的最近数据。</li><li>CACHESIZE:表示缓存子表最近数据的内存大小。</li><li>DURATION:代替原DAYS参数。新增支持带单位的设置方式。</li><li>PAGES:一个 VNODE 中元数据存储引擎的缓存页个数。</li><li>PAGESIZE:一个 VNODE 中元数据存储引擎的页大小。</li><li>RETENTIONS:表示数据的聚合周期和保存时长。</li><li>STRICT:表示数据同步的一致性要求。</li><li>SINGLE_STABLE:表示此数据库中是否只可以创建一个超级表。</li><li>VGROUPS:数据库中初始VGROUP的数目。</li><li>WAL_FSYNC_PERIOD:代替原FSYNC参数。</li><li>WAL_LEVEL:代替原WAL参数。</li><li>WAL_RETENTION_PERIOD:wal文件的额外保留策略,用于数据订阅。</li><li>WAL_RETENTION_SIZE:wal文件的额外保留策略,用于数据订阅。</li><li>WAL_ROLL_PERIOD:wal文件切换时长。</li><li>WAL_SEGMENT_SIZE:wal单个文件大小。<br/>调整</li><li>KEEP:3.0版本新增支持带单位的设置方式。</li></ul>
| 10 | CREATE DNODE | 调整 | 新增主机名和端口号分开指定语法<ul><li>CREATE DNODE dnode_host_name PORT port_val</li></ul>
| 11 | CREATE INDEX | 新增 | 创建SMA索引。
| 12 | CREATE MNODE | 新增 | 创建管理节点。
| 13 | CREATE QNODE | 新增 | 创建查询节点。
| 14 | CREATE STABLE | 调整 | 新增表参数语法<li>COMMENT:表注释。</li>
| 15 | CREATE STREAM | 新增 | 创建流。
| 16 | CREATE TABLE | 调整 | 新增表参数语法<ul><li>COMMENT:表注释。</li><li>WATERMARK:指定窗口的关闭时间。</li><li>MAX_DELAY:用于控制推送计算结果的最大延迟。</li><li>ROLLUP:指定的聚合函数,提供基于多层级的降采样聚合结果。</li><li>SMA:提供基于数据块的自定义预计算功能。</li><li>TTL:用来指定表的生命周期的参数。</li></ul>
| 17 | CREATE TOPIC | 新增 | 创建订阅主题。
| 18 | DROP ACCOUNT | 废除 | 2.x中为企业版功能,3.0不再支持。语法暂时保留了,执行报“This statement is no longer supported”错误。
| 19 | DROP CONSUMER GROUP | 新增 | 删除消费组。
| 20 | DROP INDEX | 新增 | 删除索引。
| 21 | DROP MNODE | 新增 | 创建管理节点。
| 22 | DROP QNODE | 新增 | 创建查询节点。
| 23 | DROP STREAM | 新增 | 删除流。
| 24 | DROP TABLE | 调整 | 新增批量删除语法
| 25 | DROP TOPIC | 新增 | 删除订阅主题。
| 26 | EXPLAIN | 新增 | 查看查询语句的执行计划。
| 27 | GRANT | 新增 | 授予用户权限。
| 28 | KILL TRANSACTION | 新增 | 终止管理节点的事务。
| 29 | KILL STREAM | 废除 | 终止连续查询。3.0版本不再支持连续查询,而是用更通用的流计算来代替。
| 30 | MERGE VGROUP | 新增 | 合并VGROUP。
| 31 | REVOKE | 新增 | 回收用户权限。
| 32 | SELECT | 调整 | <ul><li>SELECT关闭隐式结果列,输出列均需要由SELECT子句来指定。</li><li>DISTINCT功能全面支持。2.x版本只支持对标签列去重,并且不可以和JOIN、GROUP BY等子句混用。</li><li>JOIN功能增强。增加支持:JOIN后WHERE条件中有OR条件;JOIN后的多表运算;JOIN后的多表GROUP BY。</li><li>FROM后子查询功能大幅增强。不限制子查询嵌套层数;支持子查询和UNION ALL混合使用;移除其他一些之前版本的语法限制。</li><li>WHERE后可以使用任意的标量表达式。</li><li>GROUP BY功能增强。支持任意标量表达式及其组合的分组。</li><li>SESSION可以用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。</li><li>STATE_WINDOW可以用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。</li><li>ORDER BY功能大幅增强。不再必须和GROUP BY子句一起使用;不再有排序表达式个数的限制;增加支持NULLS FIRST/LAST语法功能;支持符合语法语义的任意表达式。</li><li>新增PARTITION BY语法。替代原来的GROUP BY tags。</li></ul>
| 33 | SHOW ACCOUNTS | 废除 | 2.x中为企业版功能,3.0不再支持。语法暂时保留了,执行报“This statement is no longer supported”错误。
| 34 | SHOW APPS |新增 | 显示接入集群的应用(客户端)信息。
| 35 | SHOW CONSUMERS | 新增 | 显示当前数据库下所有活跃的消费者的信息。
| 36 | SHOW DATABASES | 调整 | 3.0版本只显示数据库名。
| 37 | SHOW FUNCTIONS | 调整 | 3.0版本只显示自定义函数名。
| 38 | SHOW LICENCE | 新增 | 和SHOW GRANTS 命令等效。
| 39 | SHOW INDEXES | 新增 | 显示已创建的索引。
| 40 | SHOW LOCAL VARIABLES | 新增 | 显示当前客户端配置参数的运行值。
| 41 | SHOW MODULES | 废除 | 显示当前系统中所安装的组件的信息。
| 42 | SHOW QNODES | 新增 | 显示当前系统中QNODE的信息。
| 43 | SHOW STABLES | 调整 | 3.0版本只显示超级表名。
| 44 | SHOW STREAMS | 调整 | 2.x版本此命令显示系统中已创建的连续查询的信息。3.0版本废除了连续查询,用流代替。此命令显示已创建的流。
| 45 | SHOW SUBSCRIPTIONS | 新增 | 显示当前数据库下的所有的订阅关系
| 46 | SHOW TABLES | 调整 | 3.0版本只显示表名。
| 47 | SHOW TABLE DISTRIBUTED | 新增 | 显示表的数据分布信息。代替2.x版本中的SELECT _block_dist() FROM { tb_name | stb_name }方式。
| 48 | SHOW TOPICS | 新增 | 显示当前数据库下的所有订阅主题。
| 49 | SHOW TRANSACTIONS | 新增 | 显示当前系统中正在执行的事务的信息。
| 50 | SHOW DNODE VARIABLES | 新增 |显示指定DNODE的配置参数。
| 51 | SHOW VNODES | 暂不支持 | 显示当前系统中VNODE的信息。3.0.0版本暂不支持。
| 52 | SPLIT VGROUP | 新增 | 拆分VGROUP。
| 53 | TRIM DATABASE | 新增 | 删除过期数据,并根据多级存储的配置归整数据。
## SQL 函数变更
| # | **函数** | **差异性** | **说明** |
| - | :------- | :--------: | :------- |
| 1 | TWA | <div style="width: 40pt">增强</div> | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
| 2 | IRATE | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
| 3 | LEASTSQUARES | 增强 | 可以用于超级表了。
| 4 | ELAPSED | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
| 5 | DIFF | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
| 6 | DERIVATIVE | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
| 7 | CSUM | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
| 8 | MAVG | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
| 9 | SAMPLE | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
| 10 | STATECOUNT | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
| 11 | STATEDURATION | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
......@@ -3,7 +3,7 @@ title: TAOS SQL
description: "TAOS SQL 支持的语法规则、主要查询功能、支持的 SQL 查询函数,以及常用技巧等内容"
---
本文档说明 TAOS SQL 支持的语法规则、主要查询功能、支持的 SQL 查询函数,以及常用技巧等内容。阅读本文档需要读者具有基本的 SQL 语言的基础。
本文档说明 TAOS SQL 支持的语法规则、主要查询功能、支持的 SQL 查询函数,以及常用技巧等内容。阅读本文档需要读者具有基本的 SQL 语言的基础。TDengine 3.0 版本相比 2.x 版本做了大量改进和优化,特别是查询引擎进行了彻底的重构,因此 SQL 语法相比 2.x 版本有很多变更。详细的变更内容请见 [3.0 版本语法变更](/taos-sql/changes) 章节
TAOS SQL 是用户对 TDengine 进行数据写入和查询的主要工具。TAOS SQL 提供标准的 SQL 语法,并针对时序数据和业务的特点优化和新增了许多语法和功能。TAOS SQL 语句的最大长度为 1M。TAOS SQL 不支持关键字的缩写,例如 DELETE 不能缩写为 DEL。
......
......@@ -2,7 +2,7 @@
title: REST API
---
为支持各种不同类型平台的开发,TDengine 提供符合 REST 设计标准的 API,即 REST API。为最大程度降低学习成本,不同于其他数据库 REST API 的设计方法,TDengine 直接通过 HTTP POST 请求 BODY 中包含的 SQL 语句来操作数据库,仅需要一个 URL。REST 连接器的使用参见[视频教程](https://www.taosdata.com/blog/2020/11/11/1965.html)。
为支持各种不同类型平台的开发,TDengine 提供符合 REST 设计标准的 API,即 REST API。为最大程度降低学习成本,不同于其他数据库 REST API 的设计方法,TDengine 直接通过 HTTP POST 请求 BODY 中包含的 SQL 语句来操作数据库,仅需要一个 URL。REST 连接器的使用参见 [视频教程](https://www.taosdata.com/blog/2020/11/11/1965.html)。
:::note
与原生连接器的一个区别是,RESTful 接口是无状态的,因此 `USE db_name` 指令没有效果,所有对表名、超级表名的引用都需要指定数据库名前缀。支持在 RESTful URL 中指定 db_name,这时如果 SQL 语句中没有指定数据库名前缀的话,会使用 URL 中指定的这个 db_name。
......@@ -20,8 +20,10 @@ RESTful 接口不依赖于任何 TDengine 的库,因此客户端不需要安
下面示例是列出所有的数据库,请把 h1.taosdata.com 和 6041(缺省值)替换为实际运行的 TDengine 服务 FQDN 和端口号:
```html
curl -L -H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" -d "show databases;" h1.taosdata.com:6041/rest/sql
```bash
curl -L -H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" \
-d "select name, ntables, status from information_schema.ins_databases;" \
h1.taosdata.com:6041/rest/sql
```
返回值结果如下表示验证通过:
......@@ -35,188 +37,27 @@ curl -L -H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" -d "show databases;" h1.t
"VARCHAR",
64
],
[
"create_time",
"TIMESTAMP",
8
],
[
"vgroups",
"SMALLINT",
2
],
[
"ntables",
"BIGINT",
8
],
[
"replica",
"TINYINT",
1
],
[
"strict",
"VARCHAR",
4
],
[
"duration",
"VARCHAR",
10
],
[
"keep",
"VARCHAR",
32
],
[
"buffer",
"INT",
4
],
[
"pagesize",
"INT",
4
],
[
"pages",
"INT",
4
],
[
"minrows",
"INT",
4
],
[
"maxrows",
"INT",
4
],
[
"comp",
"TINYINT",
1
],
[
"precision",
"VARCHAR",
2
],
[
"status",
"VARCHAR",
10
],
[
"retention",
"VARCHAR",
60
],
[
"single_stable",
"BOOL",
1
],
[
"cachemodel",
"VARCHAR",
11
],
[
"cachesize",
"INT",
4
],
[
"wal_level",
"TINYINT",
1
],
[
"wal_fsync_period",
"INT",
4
],
[
"wal_retention_period",
"INT",
4
],
[
"wal_retention_size",
"BIGINT",
8
],
[
"wal_roll_period",
"INT",
4
],
[
"wal_seg_size",
"BIGINT",
8
]
],
"data": [
[
"information_schema",
null,
null,
14,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
"ready",
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
16,
"ready"
],
[
"performance_schema",
null,
null,
3,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
"ready",
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
9,
"ready"
]
],
"rows": 2
......@@ -231,21 +72,21 @@ http://<fqdn>:<port>/rest/sql/[db_name]
参数说明:
- fqnd: 集群中的任一台主机 FQDN 或 IP 地址
- port: 配置文件中 httpPort 配置项,缺省为 6041
- fqnd: 集群中的任一台主机 FQDN 或 IP 地址
- port: 配置文件中 httpPort 配置项,缺省为 6041
- db_name: 可选参数,指定本次所执行的 SQL 语句的默认数据库库名。
例如:`http://h1.taos.com:6041/rest/sql/test` 是指向地址为 `h1.taos.com:6041` 的 URL,并将默认使用的数据库库名设置为 `test`。
HTTP 请求的 Header 里需带有身份认证信息,TDengine 支持 Basic 认证与自定义认证两种机制,后续版本将提供标准安全的数字签名机制来做身份验证。
- [自定义身份认证信息](#自定义授权码)如下所示
- [自定义身份认证信息](#自定义授权码)如下所示
```text
Authorization: Taosd <TOKEN>
```
- Basic 身份认证信息如下所示
- Basic 身份认证信息如下所示
```text
Authorization: Basic <TOKEN>
......@@ -259,13 +100,13 @@ HTTP 请求的 BODY 里就是一个完整的 SQL 语句,SQL 语句中的数据
curl -L -H "Authorization: Basic <TOKEN>" -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
```
或者
或者
```bash
curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
```
其中,`TOKEN` 为 `{username}:{password}` 经过 Base64 编码之后的字符串,例如 `root:taosdata` 编码后为 `cm9vdDp0YW9zZGF0YQ==`
其中,`TOKEN` 为 `{username}:{password}` 经过 Base64 编码之后的字符串,例如 `root:taosdata` 编码后为 `cm9vdDp0YW9zZGF0YQ==`
## HTTP 返回格式
......@@ -282,27 +123,9 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
### HTTP body 结构
<table>
<tr>
<th>执行结果</th>
<th>说明</th>
<th>样例</th>
</tr>
<tr>
<td>正确执行</td>
<td>
code:(int)0 代表成功
<br/>
<br/>
column_meta:([][3]any)列信息,每个列会用三个值来说明,分别为:列名(string)、列类型(string)、类型长度(int)
<br/>
<br/>
rows:(int)数据返回行数
<br/>
<br/>
data:([][]any)具体数据内容
</td>
<td>
#### 正确执行
样例:
```json
{
......@@ -313,23 +136,16 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
}
```
</td>
</tr>
<tr>
<td>正确查询</td>
<td>
code:(int)0 代表成功
<br/>
<br/>
column_meta:([][3]any) 列信息,每个列会用三个值来说明,分别为:列名(string)、列类型(string)、类型长度(int)
<br/>
<br/>
rows:(int)数据返回行数
<br/>
<br/>
data:([][]any)具体数据内容
</td>
<td>
说明:
- code:(`int`)0 代表成功。
- column_meta:(`[1][3]any`)只返回 `[["affected_rows", "INT", 4]]`。
- rows:(`int`)只返回 `1`。
- data:(`[][]any`)返回受影响行数。
#### 正确查询
样例:
```json
{
......@@ -385,17 +201,35 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
}
```
</td>
</tr>
<tr>
<td>错误</td>
<td>
code:(int)错误码
<br/>
<br/>
desc:(string)错误描述
</td>
<td>
说明:
- code:(`int`)0 代表成功。
- column_meta:(`[][3]any`) 列信息,每个列会用三个值来说明,分别为:列名(string)、列类型(string)、类型长度(int)。
- rows:(`int`)数据返回行数。
- data:(`[][]any`)具体数据内容(时间格式仅支持 RFC3339,结果集为 0 时区)。
列类型使用如下字符串:
- "NULL"
- "BOOL"
- "TINYINT"
- "SMALLINT"
- "INT"
- "BIGINT"
- "FLOAT"
- "DOUBLE"
- "VARCHAR"
- "TIMESTAMP"
- "NCHAR"
- "TINYINT UNSIGNED"
- "SMALLINT UNSIGNED"
- "INT UNSIGNED"
- "BIGINT UNSIGNED"
- "JSON"
#### 错误
样例:
```json
{
......@@ -404,30 +238,10 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
}
```
</td>
</tr>
</table>
### 说明
- 时间格式仅支持 RFC3339,结果集为 0 时区
- 列类型使用如下字符串:
> "NULL"
> "BOOL"
> "TINYINT"
> "SMALLINT"
> "INT"
> "BIGINT"
> "FLOAT"
> "DOUBLE"
> "VARCHAR"
> "TIMESTAMP"
> "NCHAR"
> "TINYINT UNSIGNED"
> "SMALLINT UNSIGNED"
> "INT UNSIGNED"
> "BIGINT UNSIGNED"
> "JSON"
说明:
- code:(`int`)错误码。
- desc:(`string`)错误描述。
## 自定义授权码
......@@ -439,11 +253,9 @@ curl http://<fqnd>:<port>/rest/login/<username>/<password>
其中,`fqdn` 是 TDengine 数据库的 FQDN 或 IP 地址,`port` 是 TDengine 服务的端口号,`username` 为数据库用户名,`password` 为数据库密码,返回值为 JSON 格式,各字段含义如下:
- status:请求结果的标志位
- code:返回值代码
- desc:授权码
- status:请求结果的标志位。
- code:返回值代码。
- desc:授权码。
获取授权码示例:
......
......@@ -404,47 +404,3 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
**支持版本**
该功能接口从 2.3.0.0 版本开始支持。
### 订阅和消费 API
订阅 API 目前支持订阅一张或多张表,并通过定期轮询的方式不断获取写入表中的最新数据。
- `TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval)`
该函数负责启动订阅服务,成功时返回订阅对象,失败时返回 `NULL`,其参数为:
- taos:已经建立好的数据库连接
- restart:如果订阅已经存在,是重新开始,还是继续之前的订阅
- topic:订阅的主题(即名称),此参数是订阅的唯一标识
- sql:订阅的查询语句,此语句只能是 `select` 语句,只应查询原始数据,只能按时间正序查询数据
- fp:收到查询结果时的回调函数(稍后介绍函数原型),只在异步调用时使用,同步调用时此参数应该传 `NULL`
- param:调用回调函数时的附加参数,系统 API 将其原样传递到回调函数,不进行任何处理
- interval:轮询周期,单位为毫秒。异步调用时,将根据此参数周期性的调用回调函数,为避免对系统性能造成影响,不建议将此参数设置的过小;同步调用时,如两次调用 `taos_consume()` 的间隔小于此周期,API 将会阻塞,直到时间间隔超过此周期。
- `typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code)`
异步模式下,回调函数的原型,其参数为:
- tsub:订阅对象
- res:查询结果集,注意结果集中可能没有记录
- param:调用 `taos_subscribe()` 时客户程序提供的附加参数
- code:错误码
:::note
在这个回调函数里不可以做耗时过长的处理,尤其是对于返回的结果集中数据较多的情况,否则有可能导致客户端阻塞等异常状态。如果必须进行复杂计算,则建议在另外的线程中进行处理。
:::
- `TAOS_RES *taos_consume(TAOS_SUB *tsub)`
同步模式下,该函数用来获取订阅的结果。 用户应用程序将其置于一个循环之中。 如两次调用 `taos_consume()` 的间隔小于订阅的轮询周期,API 将会阻塞,直到时间间隔超过此周期。如果数据库有新记录到达,该 API 将返回该最新的记录,否则返回一个没有记录的空结果集。 如果返回值为 `NULL`,说明系统出错。 异步模式下,用户程序不应调用此 API。
:::note
在调用 `taos_consume()` 之后,用户应用应确保尽快调用 `taos_fetch_row()` 或 `taos_fetch_block()` 来处理订阅结果,否则服务端会持续缓存查询结果数据等待客户端读取,极端情况下会导致服务端内存消耗殆尽,影响服务稳定性。
:::
- `void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress)`
取消订阅。 如参数 `keepProgress` 不为 0,API 会保留订阅的进度信息,后续调用 `taos_subscribe()` 时可以基于此进度继续;否则将删除进度信息,后续只能重新开始读取数据。
......@@ -712,7 +712,7 @@ while(true) {
}
```
`poll` 每次调用获取一个消息。请按需选择合理的调用 `poll` 的频率(如例子中的 `Duration.ofMillis(100)`),否则会给服务端造成不必要的压力。
`poll` 每次调用获取一个消息。
#### 关闭订阅
......@@ -765,11 +765,11 @@ public abstract class ConsumerLoop {
shutdownLatch.await();
}
static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
}
static class ResultBean {
public static class ResultBean {
private Timestamp ts;
private int speed;
......
......@@ -647,3 +647,173 @@ charset 的有效值是 UTF-8。
| 含义 | 是否启动 udf 服务 |
| 取值范围 | 0: 不启动;1:启动 |
| 缺省值 | 1 |
## 2.X 与 3.0 配置参数对比
| # | **参数** | **适用于 2.X 版本** | **适用于 3.0 版本** |
| --- | :-----------------: | --------------- | --------------- |
| 1 | firstEp | 是 | 是 |
| 2 | secondEp | 是 | 是 |
| 3 | fqdn | 是 | 是 |
| 4 | serverPort | 是 | 是 |
| 5 | maxShellConns | 是 | 是 |
| 6 | monitor | 是 | 是 |
| 7 | monitorFqdn | 否 | 是 |
| 8 | monitorPort | 否 | 是 |
| 9 | monitorInterval | 是 | 是 |
| 10 | monitorMaxLogs | 否 | 是 |
| 11 | monitorComp | 否 | 是 |
| 12 | telemetryReporting | 是 | 是 |
| 13 | telemetryInterval | 否 | 是 |
| 14 | telemetryServer | 否 | 是 |
| 15 | telemetryPort | 否 | 是 |
| 16 | queryPolicy | 否 | 是 |
| 17 | querySmaOptimize | 否 | 是 |
| 18 | queryBufferSize | 是 | 是 |
| 19 | maxNumOfDistinctRes | 是 | 是 |
| 20 | minSlidingTime | 是 | 是 |
| 21 | minIntervalTime | 是 | 是 |
| 22 | countAlwaysReturnValue | 是 | 是 |
| 23 | dataDir | 是 | 是 |
| 24 | minimalDataDirGB | 是 | 是 |
| 25 | supportVnodes | 否 | 是 |
| 26 | tempDir | 是 | 是 |
| 27 | minimalTmpDirGB | 是 | 是 |
| 28 | compressMsgSize | 是 | 是 |
| 29 | compressColData | 是 | 是 |
| 30 | smlChildTableName | 是 | 是 |
| 31 | smlTagName | 是 | 是 |
| 32 | smlDataFormat | 否 | 是 |
| 33 | statusInterval | 是 | 是 |
| 34 | shellActivityTimer | 是 | 是 |
| 35 | transPullupInterval | 否 | 是 |
| 36 | mqRebalanceInterval | 否 | 是 |
| 37 | ttlUnit | 否 | 是 |
| 38 | ttlPushInterval | 否 | 是 |
| 39 | numOfTaskQueueThreads | 否 | 是 |
| 40 | numOfRpcThreads | 否 | 是 |
| 41 | numOfCommitThreads | 是 | 是 |
| 42 | numOfMnodeReadThreads | 否 | 是 |
| 43 | numOfVnodeQueryThreads | 否 | 是 |
| 44 | numOfVnodeStreamThreads | 否 | 是 |
| 45 | numOfVnodeFetchThreads | 否 | 是 |
| 46 | numOfVnodeWriteThreads | 否 | 是 |
| 47 | numOfVnodeSyncThreads | 否 | 是 |
| 48 | numOfQnodeQueryThreads | 否 | 是 |
| 49 | numOfQnodeFetchThreads | 否 | 是 |
| 50 | numOfSnodeSharedThreads | 否 | 是 |
| 51 | numOfSnodeUniqueThreads | 否 | 是 |
| 52 | rpcQueueMemoryAllowed | 否 | 是 |
| 53 | logDir | 是 | 是 |
| 54 | minimalLogDirGB | 是 | 是 |
| 55 | numOfLogLines | 是 | 是 |
| 56 | asyncLog | 是 | 是 |
| 57 | logKeepDays | 是 | 是 |
| 58 | debugFlag | 是 | 是 |
| 59 | tmrDebugFlag | 是 | 是 |
| 60 | uDebugFlag | 是 | 是 |
| 61 | rpcDebugFlag | 是 | 是 |
| 62 | jniDebugFlag | 是 | 是 |
| 63 | qDebugFlag | 是 | 是 |
| 64 | cDebugFlag | 是 | 是 |
| 65 | dDebugFlag | 是 | 是 |
| 66 | vDebugFlag | 是 | 是 |
| 67 | mDebugFlag | 是 | 是 |
| 68 | wDebugFlag | 是 | 是 |
| 69 | sDebugFlag | 是 | 是 |
| 70 | tsdbDebugFlag | 是 | 是 |
| 71 | tqDebugFlag | 否 | 是 |
| 72 | fsDebugFlag | 是 | 是 |
| 73 | udfDebugFlag | 否 | 是 |
| 74 | smaDebugFlag | 否 | 是 |
| 75 | idxDebugFlag | 否 | 是 |
| 76 | tdbDebugFlag | 否 | 是 |
| 77 | metaDebugFlag | 否 | 是 |
| 78 | timezone | 是 | 是 |
| 79 | locale | 是 | 是 |
| 80 | charset | 是 | 是 |
| 81 | udf | 是 | 是 |
| 82 | enableCoreFile | 是 | 是 |
| 83 | arbitrator | 是 | 否 |
| 84 | numOfThreadsPerCore | 是 | 否 |
| 85 | numOfMnodes | 是 | 否 |
| 86 | vnodeBak | 是 | 否 |
| 87 | balance | 是 | 否 |
| 88 | balanceInterval | 是 | 否 |
| 89 | offlineThreshold | 是 | 否 |
| 90 | role | 是 | 否 |
| 91 | dnodeNopLoop | 是 | 否 |
| 92 | keepTimeOffset | 是 | 否 |
| 93 | rpcTimer | 是 | 否 |
| 94 | rpcMaxTime | 是 | 否 |
| 95 | rpcForceTcp | 是 | 否 |
| 96 | tcpConnTimeout | 是 | 否 |
| 97 | syncCheckInterval | 是 | 否 |
| 98 | maxTmrCtrl | 是 | 否 |
| 99 | monitorReplica | 是 | 否 |
| 100 | smlTagNullName | 是 | 否 |
| 101 | keepColumnName | 是 | 否 |
| 102 | ratioOfQueryCores | 是 | 否 |
| 103 | maxStreamCompDelay | 是 | 否 |
| 104 | maxFirstStreamCompDelay | 是 | 否 |
| 105 | retryStreamCompDelay | 是 | 否 |
| 106 | streamCompDelayRatio | 是 | 否 |
| 107 | maxVgroupsPerDb | 是 | 否 |
| 108 | maxTablesPerVnode | 是 | 否 |
| 109 | minTablesPerVnode | 是 | 否 |
| 110 | tableIncStepPerVnode | 是 | 否 |
| 111 | cache | 是 | 否 |
| 112 | blocks | 是 | 否 |
| 113 | days | 是 | 否 |
| 114 | keep | 是 | 否 |
| 115 | minRows | 是 | 否 |
| 116 | maxRows | 是 | 否 |
| 117 | quorum | 是 | 否 |
| 118 | comp | 是 | 否 |
| 119 | walLevel | 是 | 否 |
| 120 | fsync | 是 | 否 |
| 121 | replica | 是 | 否 |
| 122 | partitions | 是 | 否 |
| 123 | quorum | 是 | 否 |
| 124 | update | 是 | 否 |
| 125 | cachelast | 是 | 否 |
| 126 | maxSQLLength | 是 | 否 |
| 127 | maxWildCardsLength | 是 | 否 |
| 128 | maxRegexStringLen | 是 | 否 |
| 129 | maxNumOfOrderedRes | 是 | 否 |
| 130 | maxConnections | 是 | 否 |
| 131 | mnodeEqualVnodeNum | 是 | 否 |
| 132 | http | 是 | 否 |
| 133 | httpEnableRecordSql | 是 | 否 |
| 134 | httpMaxThreads | 是 | 否 |
| 135 | restfulRowLimit | 是 | 否 |
| 136 | httpDbNameMandatory | 是 | 否 |
| 137 | httpKeepAlive | 是 | 否 |
| 138 | enableRecordSql | 是 | 否 |
| 139 | maxBinaryDisplayWidth | 是 | 否 |
| 140 | stream | 是 | 否 |
| 141 | retrieveBlockingModel | 是 | 否 |
| 142 | tsdbMetaCompactRatio | 是 | 否 |
| 143 | defaultJSONStrType | 是 | 否 |
| 144 | walFlushSize | 是 | 否 |
| 145 | keepTimeOffset | 是 | 否 |
| 146 | flowctrl | 是 | 否 |
| 147 | slaveQuery | 是 | 否 |
| 148 | adjustMaster | 是 | 否 |
| 149 | topicBinaryLen | 是 | 否 |
| 150 | telegrafUseFieldNum | 是 | 否 |
| 151 | deadLockKillQuery | 是 | 否 |
| 152 | clientMerge | 是 | 否 |
| 153 | sdbDebugFlag | 是 | 否 |
| 154 | odbcDebugFlag | 是 | 否 |
| 155 | httpDebugFlag | 是 | 否 |
| 156 | monDebugFlag | 是 | 否 |
| 157 | cqDebugFlag | 是 | 否 |
| 158 | shortcutFlag | 是 | 否 |
| 159 | probeSeconds | 是 | 否 |
| 160 | probeKillSeconds | 是 | 否 |
| 161 | probeInterval | 是 | 否 |
| 162 | lossyColumns | 是 | 否 |
| 163 | fPrecision | 是 | 否 |
| 164 | dPrecision | 是 | 否 |
| 165 | maxRange | 是 | 否 |
| 166 | range | 是 | 否 |
......@@ -3,7 +3,7 @@ sidebar_label: 发布历史
title: 发布历史
---
import Release from "/components/Release";
import Release from "/components/ReleaseV3";
<Release versionPrefix="3.0" />
......@@ -45,10 +45,9 @@ static int32_t msg_process(TAOS_RES* msg) {
int32_t numOfFields = taos_field_count(msg);
int32_t* length = taos_fetch_lengths(msg);
int32_t precision = taos_result_precision(msg);
const char* tbName = tmq_get_table_name(msg);
rows++;
taos_print_row(buf, row, fields, numOfFields);
printf("row content from %s: %s\n", (tbName != NULL ? tbName : "table null"), buf);
printf("row content: %s\n", buf);
}
return rows;
......@@ -167,7 +166,7 @@ int32_t create_topic() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1");
pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3, tbname from tmqdb.stb where c1 > 1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes));
return -1;
......@@ -199,9 +198,7 @@ tmq_t* build_consumer() {
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "experimental.snapshot.enable", "true");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "msg.with.table.name", "true");
code = tmq_conf_set(conf, "experimental.snapshot.enable", "false");
if (TMQ_CONF_OK != code) return NULL;
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
......@@ -220,14 +217,7 @@ tmq_list_t* build_topic_list() {
return topicList;
}
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topicList) {
int32_t code;
if ((code = tmq_subscribe(tmq, topicList))) {
fprintf(stderr, "%% Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
return;
}
void basic_consume_loop(tmq_t* tmq) {
int32_t totalRows = 0;
int32_t msgCnt = 0;
int32_t timeout = 5000;
......@@ -237,8 +227,8 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topicList) {
msgCnt++;
totalRows += msg_process(tmqmsg);
taos_free_result(tmqmsg);
/*} else {*/
/*break;*/
} else {
break;
}
}
......@@ -267,14 +257,12 @@ int main(int argc, char* argv[]) {
return -1;
}
basic_consume_loop(tmq, topic_list);
code = tmq_unsubscribe(tmq);
if (code) {
fprintf(stderr, "%% Failed to unsubscribe: %s\n", tmq_err2str(code));
} else {
fprintf(stderr, "%% unsubscribe\n");
if ((code = tmq_subscribe(tmq, topic_list))) {
fprintf(stderr, "%% Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
}
tmq_list_destroy(topic_list);
basic_consume_loop(tmq);
code = tmq_consumer_close(tmq);
if (code) {
......
......@@ -131,10 +131,10 @@ DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
DLL_EXPORT setConfRet taos_set_config(const char *config);
DLL_EXPORT int taos_init(void);
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
DLL_EXPORT void taos_close(TAOS *taos);
DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
DLL_EXPORT void taos_close(TAOS *taos);
const char *taos_data_type(int type);
const char *taos_data_type(int type);
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
......@@ -244,33 +244,37 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
/* ------------------------------ TAOSX -----------------------------------*/
// note: following apis are unstable
enum tmq_res_t {
TMQ_RES_INVALID = -1,
TMQ_RES_DATA = 1,
TMQ_RES_TABLE_META = 2,
};
typedef struct tmq_raw_data{
void* raw;
typedef struct tmq_raw_data {
void *raw;
uint32_t raw_len;
uint16_t raw_type;
} tmq_raw_data;
typedef enum tmq_res_t tmq_res_t;
DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_raw(TAOS_RES *res, tmq_raw_data *raw);
DLL_EXPORT int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw);
DLL_EXPORT int taos_write_raw_block(TAOS *taos, int numOfRows, char *pData, const char* tbname);
DLL_EXPORT void tmq_free_raw(tmq_raw_data raw);
DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed by tmq_free_json_meta
DLL_EXPORT void tmq_free_json_meta(char* jsonMeta);
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
/* ------------------------------ TMQ END -------------------------------- */
DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_raw(TAOS_RES *res, tmq_raw_data *raw);
DLL_EXPORT int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw);
DLL_EXPORT int taos_write_raw_block(TAOS *taos, int numOfRows, char *pData, const char *tbname);
DLL_EXPORT void tmq_free_raw(tmq_raw_data raw);
// Returning null means error. Returned result need to be freed by tmq_free_json_meta
DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res);
DLL_EXPORT void tmq_free_json_meta(char *jsonMeta);
/* ---------------------------- TAOSX END -------------------------------- */
typedef enum {
TSDB_SRV_STATUS_UNAVAILABLE = 0,
......
......@@ -53,6 +53,8 @@ typedef struct SParseContext {
int8_t schemalessType;
const char* svrVer;
bool nodeOffline;
SArray* pTableMetaPos; // sql table pos => catalog data pos
SArray* pTableVgroupPos; // sql table pos => catalog data pos
} SParseContext;
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);
......@@ -84,8 +86,8 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
int32_t rowNum);
int32_t qBuildStmtColFields(void* pDataBlock, int32_t* fieldNum, TAOS_FIELD_E** fields);
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields);
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName, TAOS_MULTI_BIND* bind,
char* msgBuf, int32_t msgBufLen);
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName,
TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen);
void destroyBoundColumnInfo(void* pBoundInfo);
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
int32_t msgBufLen);
......
......@@ -275,12 +275,8 @@ typedef struct SStreamTask {
int32_t nodeId;
SEpSet epSet;
// used for task source and sink,
// while task agg should have processedVer for each child
int64_t recoverSnapVer;
int64_t startVer;
int64_t checkpointVer;
int64_t processedVer;
// children info
SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
......
......@@ -622,6 +622,7 @@ int32_t* taosGetErrno();
//tmq
#define TSDB_CODE_TMQ_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x4000)
#define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001)
#define TSDB_CODE_TMQ_CONSUMER_CLOSED TAOS_DEF_ERROR_CODE(0, 0x4002)
#ifdef __cplusplus
}
......
......@@ -29,11 +29,11 @@ int32_t taosOpenRef(int32_t max, void (*fp)(void *));
// close the reference set, refId is the return value by taosOpenRef
// return 0 if success. On error, -1 is returned, and terrno is set appropriately
int32_t taosCloseRef(int32_t refId);
int32_t taosCloseRef(int32_t rsetId);
// add ref, p is the pointer to resource or pointer ID
// return Reference ID(rid) allocated. On error, -1 is returned, and terrno is set appropriately
int64_t taosAddRef(int32_t refId, void *p);
int64_t taosAddRef(int32_t rsetId, void *p);
// remove ref, rid is the reference ID returned by taosAddRef
// return 0 if success. On error, -1 is returned, and terrno is set appropriately
......
......@@ -689,11 +689,11 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
TDMT_VND_CREATE_TABLE == pRequest->type) {
pRequest->body.resInfo.numOfRows = res.numOfRows;
if (TDMT_VND_SUBMIT == pRequest->type) {
STscObj *pTscObj = pRequest->pTscObj;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertRows, res.numOfRows);
STscObj* pTscObj = pRequest->pTscObj;
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, res.numOfRows);
}
schedulerFreeJob(&pRequest->body.queryJob, 0);
}
......@@ -800,8 +800,8 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
break;
}
case TDMT_VND_SUBMIT: {
atomic_add_fetch_64((int64_t *)&pAppInfo->summary.insertBytes, pRes->numOfBytes);
atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes);
code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
break;
}
......@@ -832,9 +832,9 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
if (pResult) {
pRequest->body.resInfo.numOfRows = pResult->numOfRows;
if (TDMT_VND_SUBMIT == pRequest->type) {
STscObj *pTscObj = pRequest->pTscObj;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertRows, pResult->numOfRows);
STscObj* pTscObj = pRequest->pTscObj;
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows);
}
}
......@@ -877,14 +877,14 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
if (pQuery->pRoot) {
pRequest->stmtType = pQuery->pRoot->type;
}
if (pQuery->pRoot && !pRequest->inRetry) {
STscObj *pTscObj = pRequest->pTscObj;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
STscObj* pTscObj = pRequest->pTscObj;
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
if (QUERY_NODE_VNODE_MODIF_STMT == pQuery->pRoot->type) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
} else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfQueryReq, 1);
atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
}
}
......@@ -1467,9 +1467,9 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
STscObj *pTscObj = pRequest->pTscObj;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
STscObj* pTscObj = pRequest->pTscObj;
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
if (pResultInfo->numOfRows == 0) {
return NULL;
......@@ -2006,7 +2006,7 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName,
bool inEscape = false;
int32_t code = 0;
void *pIter = NULL;
void* pIter = NULL;
int32_t vIdx = 0;
int32_t vPos[2];
......
......@@ -192,6 +192,7 @@ void taos_free_result(TAOS_RES *res) {
if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
pRsp->resInfo.pRspMsg = NULL;
doFreeReqResultInfo(&pRsp->resInfo);
taosMemoryFree(pRsp);
} else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj *pRspObj = (SMqMetaRspObj *)res;
taosMemoryFree(pRspObj->metaRsp.metaRsp);
......
此差异已折叠。
此差异已折叠。
......@@ -128,19 +128,19 @@ typedef struct STsdbReader STsdbReader;
#define LASTROW_RETRIEVE_TYPE_ALL 0x1
#define LASTROW_RETRIEVE_TYPE_SINGLE 0x2
int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid);
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader,
const char *idstr);
void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader);
void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta);
int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid);
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader,
const char *idstr);
void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader);
void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta);
uint64_t getReaderMaxVersion(STsdbReader *pReader);
int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader);
......
......@@ -80,7 +80,7 @@ int32_t vnodeQueryOpen(SVnode* pVnode);
void vnodeQueryClose(SVnode* pVnode);
int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg);
// vnodeCommit.c
int32_t vnodeBegin(SVnode* pVnode);
......@@ -98,6 +98,7 @@ void vnodeSyncStart(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode);
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
bool vnodeIsLeader(SVnode* pVnode);
bool vnodeIsRoleLeader(SVnode* pVnode);
#ifdef __cplusplus
}
......
......@@ -144,6 +144,7 @@ int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb
STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
void* pMemRef);
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list);
// tq
int tqInit();
......@@ -169,10 +170,9 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list);
SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
const char* stbFullName, int32_t vgId, SBatchDeleteReq* pDeleteReq);
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
const char* stbFullName, SBatchDeleteReq* pDeleteReq);
// sma
int32_t smaInit();
......
......@@ -201,9 +201,8 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
}
SBatchDeleteReq deleteReq;
SSubmitReq *pSubmitReq =
tdBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, true, pTsmaStat->pTSma->dstTbUid,
pTsmaStat->pTSma->dstTbName, pTsmaStat->pTSma->dstVgId, &deleteReq);
SSubmitReq *pSubmitReq = tqBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, true,
pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq);
if (!pSubmitReq) {
smaError("vgId:%d, failed to gen submit blk while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
......
......@@ -14,6 +14,7 @@
*/
#include "tq.h"
#include "vnd.h"
#if 0
void tqTmrRspFunc(void* param, void* tmrId) {
......@@ -212,9 +213,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
#endif
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
walApplyVer(pTq->pVnode->pWal, ver);
if (msgType == TDMT_VND_SUBMIT) {
if (vnodeIsRoleLeader(pTq->pVnode) && msgType == TDMT_VND_SUBMIT) {
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0;
void* data = taosMemoryMalloc(msgLen);
......
......@@ -25,8 +25,7 @@ int32_t tdBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
for (int32_t row = 0; row < totRow; row++) {
int64_t ts = *(int64_t*)colDataGetData(pTsCol, row);
/*int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);*/
int64_t groupId = 0;
int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);
char* name = buildCtbNameByGroupId(stbFullName, groupId);
tqDebug("stream delete msg: groupId :%ld, name: %s", groupId, name);
SMetaReader mr = {0};
......@@ -49,8 +48,8 @@ int32_t tdBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
return 0;
}
SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, bool createTb,
int64_t suid, const char* stbFullName, int32_t vgId, SBatchDeleteReq* pDeleteReq) {
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, bool createTb,
int64_t suid, const char* stbFullName, SBatchDeleteReq* pDeleteReq) {
SSubmitReq* ret = NULL;
SArray* schemaReqs = NULL;
SArray* schemaReqSz = NULL;
......@@ -153,7 +152,7 @@ SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
// assign data
// TODO
ret = rpcMallocCont(cap);
ret->header.vgId = vgId;
ret->header.vgId = pVnode->config.vgId;
ret->length = sizeof(SSubmitReq);
ret->numOfBlocks = htonl(sz);
......@@ -234,8 +233,8 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
ASSERT(pTask->tbSink.pTSchema);
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
SSubmitReq* pReq = tdBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
pTask->tbSink.stbFullName, pVnode->config.vgId, &deleteReq);
SSubmitReq* pReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
pTask->tbSink.stbFullName, &deleteReq);
tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);
......
......@@ -247,6 +247,8 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
vTrace("vgId:%d, process %s request success, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
walApplyVer(pVnode->pWal, version);
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
......
......@@ -764,6 +764,8 @@ void vnodeSyncStart(SVnode *pVnode) {
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
bool vnodeIsRoleLeader(SVnode *pVnode) { return syncGetMyRole(pVnode->sync) == TAOS_SYNC_STATE_LEADER; }
bool vnodeIsLeader(SVnode *pVnode) {
if (!syncIsReady(pVnode->sync)) {
vDebug("vgId:%d, vnode not ready, state:%s, restore:%d", pVnode->config.vgId, syncGetMyRoleStr(pVnode->sync),
......
......@@ -128,7 +128,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey);
assert(w.ekey >= pBlockInfo->window.skey);
if (w.ekey < pBlockInfo->window.ekey) {
if (TMAX(w.skey, pBlockInfo->window.skey) <= TMIN(w.ekey, pBlockInfo->window.ekey)) {
return true;
}
......
......@@ -22,6 +22,7 @@ extern "C" {
#include "catalog.h"
#include "os.h"
#include "parser.h"
#include "query.h"
#define parserFatal(param, ...) qFatal("PARSER: " param, ##__VA_ARGS__)
......@@ -44,18 +45,37 @@ typedef struct SParseTablesMetaReq {
SHashObj* pTables;
} SParseTablesMetaReq;
typedef enum ECatalogReqType {
CATALOG_REQ_TYPE_META = 1,
CATALOG_REQ_TYPE_VGROUP,
CATALOG_REQ_TYPE_BOTH
} ECatalogReqType;
typedef struct SInsertTablesMetaReq {
char dbFName[TSDB_DB_FNAME_LEN];
SArray* pTableMetaPos;
SArray* pTableMetaReq; // element is SName
SArray* pTableVgroupPos;
SArray* pTableVgroupReq; // element is SName
} SInsertTablesMetaReq;
typedef struct SParseMetaCache {
SHashObj* pTableMeta; // key is tbFName, element is STableMeta*
SHashObj* pDbVgroup; // key is dbFName, element is SArray<SVgroupInfo>*
SHashObj* pTableVgroup; // key is tbFName, element is SVgroupInfo*
SHashObj* pDbCfg; // key is tbFName, element is SDbCfgInfo*
SHashObj* pDbInfo; // key is tbFName, element is SDbInfo*
SHashObj* pUserAuth; // key is SUserAuthInfo serialized string, element is bool indicating whether or not to pass
SHashObj* pUdf; // key is funcName, element is SFuncInfo*
SHashObj* pTableIndex; // key is tbFName, element is SArray<STableIndexInfo>*
SHashObj* pTableCfg; // key is tbFName, element is STableCfg*
SArray* pDnodes; // element is SEpSet
bool dnodeRequired;
SHashObj* pTableMeta; // key is tbFName, element is STableMeta*
SHashObj* pDbVgroup; // key is dbFName, element is SArray<SVgroupInfo>*
SHashObj* pTableVgroup; // key is tbFName, element is SVgroupInfo*
SHashObj* pDbCfg; // key is tbFName, element is SDbCfgInfo*
SHashObj* pDbInfo; // key is tbFName, element is SDbInfo*
SHashObj* pUserAuth; // key is SUserAuthInfo serialized string, element is bool indicating whether or not to pass
SHashObj* pUdf; // key is funcName, element is SFuncInfo*
SHashObj* pTableIndex; // key is tbFName, element is SArray<STableIndexInfo>*
SHashObj* pTableCfg; // key is tbFName, element is STableCfg*
SArray* pDnodes; // element is SEpSet
bool dnodeRequired;
SHashObj* pInsertTables; // key is dbName, element is SInsertTablesMetaReq*, for insert
const char* pUser;
const SArray* pTableMetaData; // pRes = STableMeta*
const SArray* pTableVgroupData; // pRes = SVgroupInfo*
int32_t sqlTableNum;
} SParseMetaCache;
int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...);
......@@ -72,8 +92,9 @@ STableMeta* tableMetaDup(const STableMeta* pTableMeta);
int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen);
int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq);
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache);
int32_t buildCatalogReq(SParseContext* pCxt, const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq);
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache,
bool insertValuesStmt);
int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache);
int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache);
int32_t reserveDbVgInfoInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache);
......@@ -100,6 +121,12 @@ int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFun
int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes);
int32_t getTableCfgFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableCfg** pOutput);
int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes);
int32_t reserveTableMetaInCacheForInsert(const SName* pName, ECatalogReqType reqType, int32_t tableNo,
SParseMetaCache* pMetaCache);
int32_t getTableMetaFromCacheForInsert(SArray* pTableMetaPos, SParseMetaCache* pMetaCache, int32_t tableNo,
STableMeta** pMeta);
int32_t getTableVgroupFromCacheForInsert(SArray* pTableVgroupPos, SParseMetaCache* pMetaCache, int32_t tableNo,
SVgroupInfo* pVgroup);
void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request);
#ifdef __cplusplus
......
......@@ -73,6 +73,9 @@ typedef struct SInsertParseContext {
SStmtCallback* pStmtCb;
SParseMetaCache* pMetaCache;
char sTableName[TSDB_TABLE_NAME_LEN];
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW];
int64_t memElapsed;
int64_t parRowElapsed;
} SInsertParseContext;
typedef struct SInsertParseSyntaxCxt {
......@@ -203,10 +206,11 @@ static int32_t checkAuth(SInsertParseContext* pCxt, char* pDbFname, bool* pPass)
return catalogChkAuth(pBasicCtx->pCatalog, &conn, pBasicCtx->pUser, pDbFname, AUTH_TYPE_WRITE, pPass);
}
static int32_t getTableSchema(SInsertParseContext* pCxt, SName* pTbName, bool isStb, STableMeta** pTableMeta) {
static int32_t getTableSchema(SInsertParseContext* pCxt, int32_t tbNo, SName* pTbName, bool isStb,
STableMeta** pTableMeta) {
SParseContext* pBasicCtx = pCxt->pComCxt;
if (pBasicCtx->async) {
return getTableMetaFromCache(pCxt->pMetaCache, pTbName, pTableMeta);
return getTableMetaFromCacheForInsert(pBasicCtx->pTableMetaPos, pCxt->pMetaCache, tbNo, pTableMeta);
}
SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter,
.requestId = pBasicCtx->requestId,
......@@ -219,10 +223,10 @@ static int32_t getTableSchema(SInsertParseContext* pCxt, SName* pTbName, bool is
return catalogGetTableMeta(pBasicCtx->pCatalog, &conn, pTbName, pTableMeta);
}
static int32_t getTableVgroup(SInsertParseContext* pCxt, SName* pTbName, SVgroupInfo* pVg) {
static int32_t getTableVgroup(SInsertParseContext* pCxt, int32_t tbNo, SName* pTbName, SVgroupInfo* pVg) {
SParseContext* pBasicCtx = pCxt->pComCxt;
if (pBasicCtx->async) {
return getTableVgroupFromCache(pCxt->pMetaCache, pTbName, pVg);
return getTableVgroupFromCacheForInsert(pBasicCtx->pTableVgroupPos, pCxt->pMetaCache, tbNo, pVg);
}
SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter,
.requestId = pBasicCtx->requestId,
......@@ -231,28 +235,22 @@ static int32_t getTableVgroup(SInsertParseContext* pCxt, SName* pTbName, SVgroup
return catalogGetTableHashVgroup(pBasicCtx->pCatalog, &conn, pTbName, pVg);
}
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char* dbFname, bool isStb) {
bool pass = false;
CHECK_CODE(checkAuth(pCxt, dbFname, &pass));
if (!pass) {
return TSDB_CODE_PAR_PERMISSION_DENIED;
}
CHECK_CODE(getTableSchema(pCxt, name, isStb, &pCxt->pTableMeta));
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, int32_t tbNo, SName* name, char* dbFname, bool isStb) {
CHECK_CODE(getTableSchema(pCxt, tbNo, name, isStb, &pCxt->pTableMeta));
if (!isStb) {
SVgroupInfo vg;
CHECK_CODE(getTableVgroup(pCxt, name, &vg));
CHECK_CODE(getTableVgroup(pCxt, tbNo, name, &vg));
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
}
return TSDB_CODE_SUCCESS;
}
static int32_t getTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) {
return getTableMetaImpl(pCxt, name, dbFname, false);
static int32_t getTableMeta(SInsertParseContext* pCxt, int32_t tbNo, SName* name, char* dbFname) {
return getTableMetaImpl(pCxt, tbNo, name, dbFname, false);
}
static int32_t getSTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) {
return getTableMetaImpl(pCxt, name, dbFname, true);
static int32_t getSTableMeta(SInsertParseContext* pCxt, int32_t tbNo, SName* name, char* dbFname) {
return getTableMetaImpl(pCxt, tbNo, name, dbFname, true);
}
static int32_t getDBCfg(SInsertParseContext* pCxt, const char* pDbFName, SDbCfgInfo* pInfo) {
......@@ -1028,13 +1026,13 @@ end:
return code;
}
static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName* pTableName, const char* pName,
int32_t len, STableMeta* pMeta) {
static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, int32_t tbNo, SName* pTableName,
const char* pName, int32_t len, STableMeta* pMeta) {
SVgroupInfo vg;
CHECK_CODE(getTableVgroup(pCxt, pTableName, &vg));
CHECK_CODE(getTableVgroup(pCxt, tbNo, pTableName, &vg));
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
pMeta->uid = 0;
pMeta->uid = tbNo;
pMeta->vgId = vg.vgId;
pMeta->tableType = TSDB_CHILD_TABLE;
......@@ -1084,7 +1082,7 @@ static int32_t ignoreAutoCreateTableClause(SInsertParseContext* pCxt) {
}
// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tbFName) {
static int32_t parseUsingClause(SInsertParseContext* pCxt, int32_t tbNo, SName* name, char* tbFName) {
int32_t len = strlen(tbFName);
STableMeta** pMeta = taosHashGet(pCxt->pSubTableHashObj, tbFName, len);
if (NULL != pMeta) {
......@@ -1102,11 +1100,11 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tb
tNameGetFullDbName(&sname, dbFName);
strcpy(pCxt->sTableName, sname.tname);
CHECK_CODE(getSTableMeta(pCxt, &sname, dbFName));
CHECK_CODE(getSTableMeta(pCxt, tbNo, &sname, dbFName));
if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) {
return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
}
CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, name, tbFName, len, pCxt->pTableMeta));
CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, tbNo, name, tbFName, len, pCxt->pTableMeta));
SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta);
setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta));
......@@ -1195,7 +1193,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks,
tdSRowEnd(pBuilder);
*gotRow = true;
#ifdef TD_DEBUG_PRINT_ROW
STSchema* pSTSchema = tdGetSTSChemaFromSSChema(schema, spd->numOfCols, 1);
tdSRowPrint(row, pSTSchema, __func__);
......@@ -1214,7 +1212,7 @@ static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlo
CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
(*numOfRows) = 0;
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
// char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
SToken sToken;
while (1) {
int32_t index = 0;
......@@ -1232,7 +1230,7 @@ static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlo
}
bool gotRow = false;
CHECK_CODE(parseOneRow(pCxt, pDataBlock, tinfo.precision, &gotRow, tmpTokenBuf));
CHECK_CODE(parseOneRow(pCxt, pDataBlock, tinfo.precision, &gotRow, pCxt->tmpTokenBuf));
if (gotRow) {
pDataBlock->size += extendedRowSize; // len;
}
......@@ -1347,7 +1345,9 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SToken filePath, STa
}
static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) {
taosMemoryFreeClear(pCxt->pTableMeta);
if (!pCxt->pComCxt->async) {
taosMemoryFreeClear(pCxt->pTableMeta);
}
destroyBoundColumnInfo(&pCxt->tags);
tdDestroySVCreateTbReq(&pCxt->createTblReq);
}
......@@ -1365,6 +1365,20 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) {
destroyBlockArrayList(pCxt->pVgDataBlocks);
}
static int32_t parseTableName(SInsertParseContext* pCxt, SToken* pTbnameToken, SName* pName, char* pDbFName,
char* pTbFName) {
int32_t code = createSName(pName, pTbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
if (TSDB_CODE_SUCCESS == code) {
tNameExtractFullName(pName, pTbFName);
code = taosHashPut(pCxt->pTableNameHashObj, pTbFName, strlen(pTbFName), pName, sizeof(SName));
}
if (TSDB_CODE_SUCCESS == code) {
tNameGetFullDbName(pName, pDbFName);
code = taosHashPut(pCxt->pDbFNameHashObj, pDbFName, strlen(pDbFName), pDbFName, TSDB_DB_FNAME_LEN);
}
return code;
}
// tb_name
// [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
// [(field1_name, ...)]
......@@ -1372,7 +1386,9 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) {
// [...];
static int32_t parseInsertBody(SInsertParseContext* pCxt) {
int32_t tbNum = 0;
SName name;
char tbFName[TSDB_TABLE_FNAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
bool autoCreateTbl = false;
// for each table
......@@ -1415,20 +1431,15 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
SToken tbnameToken = sToken;
NEXT_TOKEN(pCxt->pSql, sToken);
SName name;
CHECK_CODE(createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
tNameExtractFullName(&name, tbFName);
CHECK_CODE(taosHashPut(pCxt->pTableNameHashObj, tbFName, strlen(tbFName), &name, sizeof(SName)));
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(&name, dbFName);
CHECK_CODE(taosHashPut(pCxt->pDbFNameHashObj, dbFName, strlen(dbFName), dbFName, sizeof(dbFName)));
if (!pCxt->pComCxt->async || TK_USING == sToken.type) {
CHECK_CODE(parseTableName(pCxt, &tbnameToken, &name, dbFName, tbFName));
}
bool existedUsing = false;
// USING clause
if (TK_USING == sToken.type) {
existedUsing = true;
CHECK_CODE(parseUsingClause(pCxt, &name, tbFName));
CHECK_CODE(parseUsingClause(pCxt, tbNum, &name, tbFName));
NEXT_TOKEN(pCxt->pSql, sToken);
autoCreateTbl = true;
}
......@@ -1438,22 +1449,31 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
// pSql -> field1_name, ...)
pBoundColsStart = pCxt->pSql;
CHECK_CODE(ignoreBoundColumns(pCxt));
// CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta)));
NEXT_TOKEN(pCxt->pSql, sToken);
}
if (TK_USING == sToken.type) {
CHECK_CODE(parseUsingClause(pCxt, &name, tbFName));
if (pCxt->pComCxt->async) {
CHECK_CODE(parseTableName(pCxt, &tbnameToken, &name, dbFName, tbFName));
}
CHECK_CODE(parseUsingClause(pCxt, tbNum, &name, tbFName));
NEXT_TOKEN(pCxt->pSql, sToken);
autoCreateTbl = true;
} else if (!existedUsing) {
CHECK_CODE(getTableMeta(pCxt, &name, dbFName));
CHECK_CODE(getTableMeta(pCxt, tbNum, &name, dbFName));
}
STableDataBlocks* dataBuf = NULL;
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, tbFName, strlen(tbFName), TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta,
&dataBuf, NULL, &pCxt->createTblReq));
if (pCxt->pComCxt->async) {
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, &pCxt->pTableMeta->uid, sizeof(pCxt->pTableMeta->uid),
TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk),
getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL,
&pCxt->createTblReq));
} else {
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, tbFName, strlen(tbFName), TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta,
&dataBuf, NULL, &pCxt->createTblReq));
}
if (NULL != pBoundColsStart) {
char* pCurrPos = pCxt->pSql;
......@@ -1532,7 +1552,9 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache
.totalNum = 0,
.pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT),
.pStmtCb = pContext->pStmtCb,
.pMetaCache = pMetaCache};
.pMetaCache = pMetaCache,
.memElapsed = 0,
.parRowElapsed = 0};
if (pContext->pStmtCb && *pQuery) {
(*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj,
......@@ -1547,7 +1569,7 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache
} else {
context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
context.pTableBlockHashObj =
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
}
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pSubTableHashObj ||
......@@ -1656,24 +1678,24 @@ static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt) {
return TSDB_CODE_SUCCESS;
}
static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) {
static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, bool isStable, int32_t tableNo, SToken* pTbToken) {
SName name;
CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
CHECK_CODE(reserveUserAuthInCacheExt(pCxt->pComCxt->pUser, &name, AUTH_TYPE_WRITE, pCxt->pMetaCache));
CHECK_CODE(reserveTableMetaInCacheExt(&name, pCxt->pMetaCache));
CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache));
CHECK_CODE(reserveTableMetaInCacheForInsert(&name, isStable ? CATALOG_REQ_TYPE_META : CATALOG_REQ_TYPE_BOTH, tableNo,
pCxt->pMetaCache));
return TSDB_CODE_SUCCESS;
}
static int32_t collectAutoCreateTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) {
static int32_t collectAutoCreateTableMetaKey(SInsertParseSyntaxCxt* pCxt, int32_t tableNo, SToken* pTbToken) {
SName name;
CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache));
CHECK_CODE(reserveTableMetaInCacheForInsert(&name, CATALOG_REQ_TYPE_VGROUP, tableNo, pCxt->pMetaCache));
return TSDB_CODE_SUCCESS;
}
static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) {
bool hasData = false;
bool hasData = false;
int32_t tableNo = 0;
// for each table
while (1) {
SToken sToken;
......@@ -1702,9 +1724,9 @@ static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) {
// USING clause
if (TK_USING == sToken.type) {
existedUsing = true;
CHECK_CODE(collectAutoCreateTableMetaKey(pCxt, &tbnameToken));
CHECK_CODE(collectAutoCreateTableMetaKey(pCxt, tableNo, &tbnameToken));
NEXT_TOKEN(pCxt->pSql, sToken);
CHECK_CODE(collectTableMetaKey(pCxt, &sToken));
CHECK_CODE(collectTableMetaKey(pCxt, true, tableNo, &sToken));
CHECK_CODE(skipUsingClause(pCxt));
NEXT_TOKEN(pCxt->pSql, sToken);
}
......@@ -1717,15 +1739,17 @@ static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) {
if (TK_USING == sToken.type && !existedUsing) {
existedUsing = true;
CHECK_CODE(collectAutoCreateTableMetaKey(pCxt, &tbnameToken));
CHECK_CODE(collectAutoCreateTableMetaKey(pCxt, tableNo, &tbnameToken));
NEXT_TOKEN(pCxt->pSql, sToken);
CHECK_CODE(collectTableMetaKey(pCxt, &sToken));
CHECK_CODE(collectTableMetaKey(pCxt, true, tableNo, &sToken));
CHECK_CODE(skipUsingClause(pCxt));
NEXT_TOKEN(pCxt->pSql, sToken);
} else {
CHECK_CODE(collectTableMetaKey(pCxt, &tbnameToken));
} else if (!existedUsing) {
CHECK_CODE(collectTableMetaKey(pCxt, false, tableNo, &tbnameToken));
}
++tableNo;
if (TK_VALUES == sToken.type) {
// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
CHECK_CODE(skipValuesClause(pCxt));
......
......@@ -476,9 +476,11 @@ static int32_t buildDbReq(SHashObj* pDbsHash, SArray** pDbs) {
static int32_t buildTableReqFromDb(SHashObj* pDbsHash, SArray** pDbs) {
if (NULL != pDbsHash) {
*pDbs = taosArrayInit(taosHashGetSize(pDbsHash), sizeof(STablesReq));
if (NULL == *pDbs) {
return TSDB_CODE_OUT_OF_MEMORY;
*pDbs = taosArrayInit(taosHashGetSize(pDbsHash), sizeof(STablesReq));
if (NULL == *pDbs) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
SParseTablesMetaReq* p = taosHashIterate(pDbsHash, NULL);
while (NULL != p) {
......@@ -530,7 +532,62 @@ static int32_t buildUdfReq(SHashObj* pUdfHash, SArray** pUdf) {
return TSDB_CODE_SUCCESS;
}
int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
static int32_t buildCatalogReqForInsert(SParseContext* pCxt, const SParseMetaCache* pMetaCache,
SCatalogReq* pCatalogReq) {
int32_t ndbs = taosHashGetSize(pMetaCache->pInsertTables);
pCatalogReq->pTableMeta = taosArrayInit(ndbs, sizeof(STablesReq));
if (NULL == pCatalogReq->pTableMeta) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCatalogReq->pTableHash = taosArrayInit(ndbs, sizeof(STablesReq));
if (NULL == pCatalogReq->pTableHash) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCatalogReq->pUser = taosArrayInit(ndbs, sizeof(SUserAuthInfo));
if (NULL == pCatalogReq->pUser) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCxt->pTableMetaPos = taosArrayInit(pMetaCache->sqlTableNum, sizeof(int32_t));
pCxt->pTableVgroupPos = taosArrayInit(pMetaCache->sqlTableNum, sizeof(int32_t));
int32_t metaReqNo = 0;
int32_t vgroupReqNo = 0;
SInsertTablesMetaReq* p = taosHashIterate(pMetaCache->pInsertTables, NULL);
while (NULL != p) {
STablesReq req = {0};
strcpy(req.dbFName, p->dbFName);
TSWAP(req.pTables, p->pTableMetaReq);
taosArrayPush(pCatalogReq->pTableMeta, &req);
req.pTables = NULL;
TSWAP(req.pTables, p->pTableVgroupReq);
taosArrayPush(pCatalogReq->pTableHash, &req);
int32_t ntables = taosArrayGetSize(p->pTableMetaPos);
for (int32_t i = 0; i < ntables; ++i) {
taosArrayInsert(pCxt->pTableMetaPos, *(int32_t*)taosArrayGet(p->pTableMetaPos, i), &metaReqNo);
++metaReqNo;
}
ntables = taosArrayGetSize(p->pTableVgroupPos);
for (int32_t i = 0; i < ntables; ++i) {
taosArrayInsert(pCxt->pTableVgroupPos, *(int32_t*)taosArrayGet(p->pTableVgroupPos, i), &vgroupReqNo);
++vgroupReqNo;
}
SUserAuthInfo auth = {0};
strcpy(auth.user, pCxt->pUser);
strcpy(auth.dbFName, p->dbFName);
auth.type = AUTH_TYPE_WRITE;
taosArrayPush(pCatalogReq->pUser, &auth);
p = taosHashIterate(pMetaCache->pInsertTables, p);
}
return TSDB_CODE_SUCCESS;
}
int32_t buildCatalogReqForQuery(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
int32_t code = buildTableReqFromDb(pMetaCache->pTableMeta, &pCatalogReq->pTableMeta);
if (TSDB_CODE_SUCCESS == code) {
code = buildDbReq(pMetaCache->pDbVgroup, &pCatalogReq->pDbVgroup);
......@@ -560,6 +617,13 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog
return code;
}
int32_t buildCatalogReq(SParseContext* pCxt, const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
if (NULL != pMetaCache->pInsertTables) {
return buildCatalogReqForInsert(pCxt, pMetaCache, pCatalogReq);
}
return buildCatalogReqForQuery(pMetaCache, pCatalogReq);
}
static int32_t putMetaDataToHash(const char* pKey, int32_t len, const SArray* pData, int32_t index, SHashObj** pHash) {
if (NULL == *pHash) {
*pHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
......@@ -647,7 +711,8 @@ static int32_t putUdfToCache(const SArray* pUdfReq, const SArray* pUdfData, SHas
return TSDB_CODE_SUCCESS;
}
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
int32_t putMetaDataToCacheForQuery(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
SParseMetaCache* pMetaCache) {
int32_t code = putDbTableDataToCache(pCatalogReq->pTableMeta, pMetaData->pTableMeta, &pMetaCache->pTableMeta);
if (TSDB_CODE_SUCCESS == code) {
code = putDbDataToCache(pCatalogReq->pDbVgroup, pMetaData->pDbVgroup, &pMetaCache->pDbVgroup);
......@@ -677,6 +742,30 @@ int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMet
return code;
}
int32_t putMetaDataToCacheForInsert(const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
int32_t ndbs = taosArrayGetSize(pMetaData->pUser);
for (int32_t i = 0; i < ndbs; ++i) {
SMetaRes* pRes = taosArrayGet(pMetaData->pUser, i);
if (TSDB_CODE_SUCCESS != pRes->code) {
return pRes->code;
}
if (!(*(bool*)pRes->pRes)) {
return TSDB_CODE_PAR_PERMISSION_DENIED;
}
}
pMetaCache->pTableMetaData = pMetaData->pTableMeta;
pMetaCache->pTableVgroupData = pMetaData->pTableHash;
return TSDB_CODE_SUCCESS;
}
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache,
bool insertValuesStmt) {
if (insertValuesStmt) {
return putMetaDataToCacheForInsert(pMetaData, pMetaCache);
}
return putMetaDataToCacheForQuery(pCatalogReq, pMetaData, pMetaCache);
}
static int32_t reserveTableReqInCacheImpl(const char* pTbFName, int32_t len, SHashObj** pTables) {
if (NULL == *pTables) {
*pTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
......@@ -977,6 +1066,82 @@ int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes) {
return TSDB_CODE_SUCCESS;
}
static int32_t reserveTableReqInCacheForInsert(const SName* pName, ECatalogReqType reqType, int32_t tableNo,
SInsertTablesMetaReq* pReq) {
switch (reqType) {
case CATALOG_REQ_TYPE_META:
taosArrayPush(pReq->pTableMetaReq, pName);
taosArrayPush(pReq->pTableMetaPos, &tableNo);
break;
case CATALOG_REQ_TYPE_VGROUP:
taosArrayPush(pReq->pTableVgroupReq, pName);
taosArrayPush(pReq->pTableVgroupPos, &tableNo);
break;
case CATALOG_REQ_TYPE_BOTH:
taosArrayPush(pReq->pTableMetaReq, pName);
taosArrayPush(pReq->pTableMetaPos, &tableNo);
taosArrayPush(pReq->pTableVgroupReq, pName);
taosArrayPush(pReq->pTableVgroupPos, &tableNo);
break;
default:
break;
}
return TSDB_CODE_SUCCESS;
}
static int32_t reserveTableReqInDbCacheForInsert(const SName* pName, ECatalogReqType reqType, int32_t tableNo,
SHashObj* pDbs) {
SInsertTablesMetaReq req = {.pTableMetaReq = taosArrayInit(4, sizeof(SName)),
.pTableMetaPos = taosArrayInit(4, sizeof(int32_t)),
.pTableVgroupReq = taosArrayInit(4, sizeof(SName)),
.pTableVgroupPos = taosArrayInit(4, sizeof(int32_t))};
tNameGetFullDbName(pName, req.dbFName);
int32_t code = reserveTableReqInCacheForInsert(pName, reqType, tableNo, &req);
if (TSDB_CODE_SUCCESS == code) {
code = taosHashPut(pDbs, pName->dbname, strlen(pName->dbname), &req, sizeof(SInsertTablesMetaReq));
}
return code;
}
int32_t reserveTableMetaInCacheForInsert(const SName* pName, ECatalogReqType reqType, int32_t tableNo,
SParseMetaCache* pMetaCache) {
if (NULL == pMetaCache->pInsertTables) {
pMetaCache->pInsertTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == pMetaCache->pInsertTables) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
pMetaCache->sqlTableNum = tableNo;
SInsertTablesMetaReq* pReq = taosHashGet(pMetaCache->pInsertTables, pName->dbname, strlen(pName->dbname));
if (NULL == pReq) {
return reserveTableReqInDbCacheForInsert(pName, reqType, tableNo, pMetaCache->pInsertTables);
}
return reserveTableReqInCacheForInsert(pName, reqType, tableNo, pReq);
}
int32_t getTableMetaFromCacheForInsert(SArray* pTableMetaPos, SParseMetaCache* pMetaCache, int32_t tableNo,
STableMeta** pMeta) {
int32_t reqIndex = *(int32_t*)taosArrayGet(pTableMetaPos, tableNo);
SMetaRes* pRes = taosArrayGet(pMetaCache->pTableMetaData, reqIndex);
if (TSDB_CODE_SUCCESS == pRes->code) {
*pMeta = pRes->pRes;
if (NULL == *pMeta) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return pRes->code;
}
int32_t getTableVgroupFromCacheForInsert(SArray* pTableVgroupPos, SParseMetaCache* pMetaCache, int32_t tableNo,
SVgroupInfo* pVgroup) {
int32_t reqIndex = *(int32_t*)taosArrayGet(pTableVgroupPos, tableNo);
SMetaRes* pRes = taosArrayGet(pMetaCache->pTableVgroupData, reqIndex);
if (TSDB_CODE_SUCCESS == pRes->code) {
memcpy(pVgroup, pRes->pRes, sizeof(SVgroupInfo));
}
return pRes->code;
}
void destoryParseTablesMetaReqHash(SHashObj* pHash) {
SParseTablesMetaReq* p = taosHashIterate(pHash, NULL);
while (NULL != p) {
......
......@@ -185,7 +185,7 @@ int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq
code = parseSqlSyntax(pCxt, pQuery, &metaCache);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildCatalogReq(&metaCache, pCatalogReq);
code = buildCatalogReq(pCxt, &metaCache, pCatalogReq);
}
destoryParseMetaCache(&metaCache, true);
terrno = code;
......@@ -195,7 +195,7 @@ int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq
int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq,
const struct SMetaData* pMetaData, SQuery* pQuery) {
SParseMetaCache metaCache = {0};
int32_t code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache);
int32_t code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache, NULL == pQuery->pRoot);
if (TSDB_CODE_SUCCESS == code) {
if (NULL == pQuery->pRoot) {
code = parseInsertSql(pCxt, &pQuery, &metaCache);
......
......@@ -13,21 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <functional>
#include <gtest/gtest.h>
#include "mockCatalogService.h"
#include "os.h"
#include "parInt.h"
#include "parTestUtil.h"
using namespace std;
using namespace std::placeholders;
using namespace testing;
namespace {
string toString(int32_t code) { return tstrerror(code); }
} // namespace
namespace ParserTest {
// syntax:
// INSERT INTO
......@@ -36,259 +28,60 @@ string toString(int32_t code) { return tstrerror(code); }
// [(field1_name, ...)]
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
// [...];
class InsertTest : public Test {
protected:
InsertTest() : res_(nullptr) {}
~InsertTest() { reset(); }
void setDatabase(const string& acctId, const string& db) {
acctId_ = acctId;
db_ = db;
}
void bind(const char* sql) {
reset();
cxt_.acctId = atoi(acctId_.c_str());
cxt_.db = (char*)db_.c_str();
strcpy(sqlBuf_, sql);
cxt_.sqlLen = strlen(sql);
sqlBuf_[cxt_.sqlLen] = '\0';
cxt_.pSql = sqlBuf_;
}
int32_t run() {
code_ = parseInsertSql(&cxt_, &res_, nullptr);
if (code_ != TSDB_CODE_SUCCESS) {
cout << "code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
}
return code_;
}
int32_t runAsync() {
cxt_.async = true;
bool request = true;
unique_ptr<SParseMetaCache, function<void(SParseMetaCache*)> > metaCache(
new SParseMetaCache(), std::bind(_destoryParseMetaCache, _1, cref(request)));
code_ = parseInsertSyntax(&cxt_, &res_, metaCache.get());
if (code_ != TSDB_CODE_SUCCESS) {
cout << "parseInsertSyntax code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
return code_;
}
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
MockCatalogService::destoryCatalogReq);
code_ = buildCatalogReq(metaCache.get(), catalogReq.get());
if (code_ != TSDB_CODE_SUCCESS) {
cout << "buildCatalogReq code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
return code_;
}
unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData);
g_mockCatalogService->catalogGetAllMeta(catalogReq.get(), metaData.get());
metaCache.reset(new SParseMetaCache());
request = false;
code_ = putMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get());
if (code_ != TSDB_CODE_SUCCESS) {
cout << "putMetaDataToCache code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
return code_;
}
code_ = parseInsertSql(&cxt_, &res_, metaCache.get());
if (code_ != TSDB_CODE_SUCCESS) {
cout << "parseInsertSql code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
return code_;
}
return code_;
}
void dumpReslut() {
SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_);
size_t num = taosArrayGetSize(pStmt->pDataBlocks);
cout << "payloadType:" << (int32_t)pStmt->payloadType << ", insertType:" << pStmt->insertType
<< ", numOfVgs:" << num << endl;
for (size_t i = 0; i < num; ++i) {
SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(pStmt->pDataBlocks, i);
cout << "vgId:" << vg->vg.vgId << ", numOfTables:" << vg->numOfTables << ", dataSize:" << vg->size << endl;
SSubmitReq* submit = (SSubmitReq*)vg->pData;
cout << "length:" << ntohl(submit->length) << ", numOfBlocks:" << ntohl(submit->numOfBlocks) << endl;
int32_t numOfBlocks = ntohl(submit->numOfBlocks);
SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
for (int32_t i = 0; i < numOfBlocks; ++i) {
cout << "Block:" << i << endl;
cout << "\tuid:" << be64toh(blk->uid) << ", tid:" << be64toh(blk->suid) << ", sversion:" << ntohl(blk->sversion)
<< ", dataLen:" << ntohl(blk->dataLen) << ", schemaLen:" << ntohl(blk->schemaLen)
<< ", numOfRows:" << ntohl(blk->numOfRows) << endl;
blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen));
}
}
}
void checkReslut(int32_t numOfTables, int32_t numOfRows1, int32_t numOfRows2 = -1) {
SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_);
ASSERT_EQ(pStmt->payloadType, PAYLOAD_TYPE_KV);
ASSERT_EQ(pStmt->insertType, TSDB_QUERY_TYPE_INSERT);
size_t num = taosArrayGetSize(pStmt->pDataBlocks);
ASSERT_GE(num, 0);
for (size_t i = 0; i < num; ++i) {
SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(pStmt->pDataBlocks, i);
ASSERT_EQ(vg->numOfTables, numOfTables);
ASSERT_GE(vg->size, 0);
SSubmitReq* submit = (SSubmitReq*)vg->pData;
ASSERT_GE(ntohl(submit->length), 0);
ASSERT_GE(ntohl(submit->numOfBlocks), 0);
int32_t numOfBlocks = ntohl(submit->numOfBlocks);
SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
for (int32_t i = 0; i < numOfBlocks; ++i) {
ASSERT_EQ(ntohl(blk->numOfRows), (0 == i ? numOfRows1 : (numOfRows2 > 0 ? numOfRows2 : numOfRows1)));
blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen));
}
}
}
private:
static const int max_err_len = 1024;
static const int max_sql_len = 1024 * 1024;
static void _destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) {
destoryParseMetaCache(pMetaCache, request);
delete pMetaCache;
}
void reset() {
memset(&cxt_, 0, sizeof(cxt_));
memset(errMagBuf_, 0, max_err_len);
cxt_.pMsg = errMagBuf_;
cxt_.msgLen = max_err_len;
code_ = TSDB_CODE_SUCCESS;
qDestroyQuery(res_);
res_ = nullptr;
}
SVnodeModifOpStmt* getVnodeModifStmt(SQuery* pQuery) { return (SVnodeModifOpStmt*)pQuery->pRoot; }
string acctId_;
string db_;
char errMagBuf_[max_err_len];
char sqlBuf_[max_sql_len];
SParseContext cxt_;
int32_t code_;
SQuery* res_;
};
class ParserInsertTest : public ParserTestBase {};
// INSERT INTO tb_name [(field1_name, ...)] VALUES (field1_value, ...)
TEST_F(InsertTest, singleTableSingleRowTest) {
setDatabase("root", "test");
bind("insert into t1 values (now, 1, 'beijing', 3, 4, 5)");
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
dumpReslut();
checkReslut(1, 1);
TEST_F(ParserInsertTest, singleTableSingleRowTest) {
useDb("root", "test");
bind("insert into t1 (ts, c1, c2, c3, c4, c5) values (now, 1, 'beijing', 3, 4, 5)");
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
run("INSERT INTO t1 VALUES (now, 1, 'beijing', 3, 4, 5)");
bind("insert into t1 values (now, 1, 'beijing', 3, 4, 5)");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
dumpReslut();
checkReslut(1, 1);
bind("insert into t1 (ts, c1, c2, c3, c4, c5) values (now, 1, 'beijing', 3, 4, 5)");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
run("INSERT INTO t1 (ts, c1, c2, c3, c4, c5) VALUES (now, 1, 'beijing', 3, 4, 5)");
}
// INSERT INTO tb_name VALUES (field1_value, ...)(field1_value, ...)
TEST_F(InsertTest, singleTableMultiRowTest) {
setDatabase("root", "test");
bind(
"insert into t1 values (now, 1, 'beijing', 3, 4, 5)(now+1s, 2, 'shanghai', 6, 7, 8)"
"(now+2s, 3, 'guangzhou', 9, 10, 11)");
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
dumpReslut();
checkReslut(1, 3);
TEST_F(ParserInsertTest, singleTableMultiRowTest) {
useDb("root", "test");
bind(
"insert into t1 values (now, 1, 'beijing', 3, 4, 5)(now+1s, 2, 'shanghai', 6, 7, 8)"
run("INSERT INTO t1 VALUES (now, 1, 'beijing', 3, 4, 5)"
"(now+1s, 2, 'shanghai', 6, 7, 8)"
"(now+2s, 3, 'guangzhou', 9, 10, 11)");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
}
// INSERT INTO tb1_name VALUES (field1_value, ...) tb2_name VALUES (field1_value, ...)
TEST_F(InsertTest, multiTableSingleRowTest) {
setDatabase("root", "test");
TEST_F(ParserInsertTest, multiTableSingleRowTest) {
useDb("root", "test");
bind("insert into st1s1 values (now, 1, \"beijing\") st1s2 values (now, 10, \"131028\")");
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
dumpReslut();
checkReslut(2, 1);
bind("insert into st1s1 values (now, 1, \"beijing\") st1s2 values (now, 10, \"131028\")");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
run("INSERT INTO st1s1 VALUES (now, 1, 'beijing') st1s2 VALUES (now, 10, '131028')");
}
// INSERT INTO tb1_name VALUES (field1_value, ...) tb2_name VALUES (field1_value, ...)
TEST_F(InsertTest, multiTableMultiRowTest) {
setDatabase("root", "test");
bind(
"insert into st1s1 values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")"
" st1s2 values (now, 10, \"131028\")(now+1s, 20, \"132028\")");
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
dumpReslut();
checkReslut(2, 3, 2);
TEST_F(ParserInsertTest, multiTableMultiRowTest) {
useDb("root", "test");
bind(
"insert into st1s1 values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")"
" st1s2 values (now, 10, \"131028\")(now+1s, 20, \"132028\")");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
run("INSERT INTO "
"st1s1 VALUES (now, 1, 'beijing')(now+1s, 2, 'shanghai')(now+2s, 3, 'guangzhou') "
"st1s2 VALUES (now, 10, '131028')(now+1s, 20, '132028')");
}
// INSERT INTO
// tb1_name USING st1_name [(tag1_name, ...)] TAGS (tag1_value, ...) VALUES (field1_value, ...)
// tb2_name USING st2_name [(tag1_name, ...)] TAGS (tag1_value, ...) VALUES (field1_value, ...)
TEST_F(InsertTest, autoCreateTableTest) {
setDatabase("root", "test");
bind(
"insert into st1s1 using st1 tags(1, 'wxy', now) "
"values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")");
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
dumpReslut();
checkReslut(1, 3);
TEST_F(ParserInsertTest, autoCreateTableTest) {
useDb("root", "test");
bind(
"insert into st1s1 using st1 (tag1, tag2) tags(1, 'wxy') values (now, 1, \"beijing\")"
"(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")");
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
run("INSERT INTO st1s1 USING st1 TAGS(1, 'wxy', now) "
"VALUES (now, 1, 'beijing')(now+1s, 2, 'shanghai')(now+2s, 3, 'guangzhou')");
bind(
"insert into st1s1 using st1 tags(1, 'wxy', now) "
"values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
run("INSERT INTO st1s1 USING st1 (tag1, tag2) TAGS(1, 'wxy') (ts, c1, c2) "
"VALUES (now, 1, 'beijing')(now+1s, 2, 'shanghai')(now+2s, 3, 'guangzhou')");
bind(
"insert into st1s1 using st1 (tag1, tag2) tags(1, 'wxy') values (now, 1, \"beijing\")"
"(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
run("INSERT INTO st1s1 (ts, c1, c2) USING st1 (tag1, tag2) TAGS(1, 'wxy') "
"VALUES (now, 1, 'beijing')(now+1s, 2, 'shanghai')(now+2s, 3, 'guangzhou')");
bind(
"insert into st1s1 using st1 tags(1, 'wxy', now) values (now, 1, \"beijing\")"
"st1s1 using st1 tags(1, 'wxy', now) values (now+1s, 2, \"shanghai\")");
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
run("INSERT INTO "
"st1s1 USING st1 (tag1, tag2) TAGS(1, 'wxy') (ts, c1, c2) VALUES (now, 1, 'beijing') "
"st1s2 (ts, c1, c2) USING st1 TAGS(2, 'abc', now) VALUES (now+1s, 2, 'shanghai')");
}
TEST_F(InsertTest, toleranceTest) {
setDatabase("root", "test");
bind("insert into");
ASSERT_NE(run(), TSDB_CODE_SUCCESS);
bind("insert into t");
ASSERT_NE(run(), TSDB_CODE_SUCCESS);
bind("insert into");
ASSERT_NE(runAsync(), TSDB_CODE_SUCCESS);
bind("insert into t");
ASSERT_NE(runAsync(), TSDB_CODE_SUCCESS);
}
} // namespace ParserTest
......@@ -225,16 +225,17 @@ class ParserTestBaseImpl {
DO_WITH_THROW(collectMetaKey, pCxt, pQuery, pMetaCache);
}
void doBuildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
DO_WITH_THROW(buildCatalogReq, pMetaCache, pCatalogReq);
void doBuildCatalogReq(SParseContext* pCxt, const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
DO_WITH_THROW(buildCatalogReq, pCxt, pMetaCache, pCatalogReq);
}
void doGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) {
DO_WITH_THROW(g_mockCatalogService->catalogGetAllMeta, pCatalogReq, pMetaData);
}
void doPutMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
DO_WITH_THROW(putMetaDataToCache, pCatalogReq, pMetaData, pMetaCache);
void doPutMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache,
bool isInsertValues) {
DO_WITH_THROW(putMetaDataToCache, pCatalogReq, pMetaData, pMetaCache, isInsertValues);
}
void doAuthenticate(SParseContext* pCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) {
......@@ -261,7 +262,9 @@ class ParserTestBaseImpl {
void doParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatalogReq) {
DO_WITH_THROW(qParseSqlSyntax, pCxt, pQuery, pCatalogReq);
ASSERT_NE(*pQuery, nullptr);
res_.parsedAst_ = toString((*pQuery)->pRoot);
if (nullptr != (*pQuery)->pRoot) {
res_.parsedAst_ = toString((*pQuery)->pRoot);
}
}
void doAnalyseSqlSemantic(SParseContext* pCxt, const SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
......@@ -270,6 +273,17 @@ class ParserTestBaseImpl {
res_.calcConstAst_ = toString(pQuery->pRoot);
}
void doParseInsertSql(SParseContext* pCxt, SQuery** pQuery, SParseMetaCache* pMetaCache) {
DO_WITH_THROW(parseInsertSql, pCxt, pQuery, pMetaCache);
ASSERT_NE(*pQuery, nullptr);
res_.parsedAst_ = toString((*pQuery)->pRoot);
}
void doParseInsertSyntax(SParseContext* pCxt, SQuery** pQuery, SParseMetaCache* pMetaCache) {
DO_WITH_THROW(parseInsertSyntax, pCxt, pQuery, pMetaCache);
ASSERT_NE(*pQuery, nullptr);
}
string toString(const SNode* pRoot) {
char* pStr = NULL;
int32_t len = 0;
......@@ -287,15 +301,20 @@ class ParserTestBaseImpl {
SParseContext cxt = {0};
setParseContext(sql, &cxt);
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery);
doParse(&cxt, query.get());
SQuery* pQuery = *(query.get());
if (qIsInsertValuesSql(cxt.pSql, cxt.sqlLen)) {
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery);
doParseInsertSql(&cxt, query.get(), nullptr);
} else {
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery);
doParse(&cxt, query.get());
SQuery* pQuery = *(query.get());
doAuthenticate(&cxt, pQuery, nullptr);
doAuthenticate(&cxt, pQuery, nullptr);
doTranslate(&cxt, pQuery, nullptr);
doTranslate(&cxt, pQuery, nullptr);
doCalculateConstant(&cxt, pQuery);
doCalculateConstant(&cxt, pQuery);
}
if (g_dump) {
dump();
......@@ -338,17 +357,22 @@ class ParserTestBaseImpl {
setParseContext(sql, &cxt, true);
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery);
doParse(&cxt, query.get());
SQuery* pQuery = *(query.get());
bool request = true;
bool request = true;
unique_ptr<SParseMetaCache, function<void(SParseMetaCache*)> > metaCache(
new SParseMetaCache(), bind(_destoryParseMetaCache, _1, cref(request)));
doCollectMetaKey(&cxt, pQuery, metaCache.get());
bool isInsertValues = qIsInsertValuesSql(cxt.pSql, cxt.sqlLen);
if (isInsertValues) {
doParseInsertSyntax(&cxt, query.get(), metaCache.get());
} else {
doParse(&cxt, query.get());
doCollectMetaKey(&cxt, *(query.get()), metaCache.get());
}
SQuery* pQuery = *(query.get());
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
MockCatalogService::destoryCatalogReq);
doBuildCatalogReq(metaCache.get(), catalogReq.get());
doBuildCatalogReq(&cxt, metaCache.get(), catalogReq.get());
string err;
thread t1([&]() {
......@@ -358,13 +382,17 @@ class ParserTestBaseImpl {
metaCache.reset(new SParseMetaCache());
request = false;
doPutMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get());
doPutMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get(), isInsertValues);
doAuthenticate(&cxt, pQuery, metaCache.get());
if (isInsertValues) {
doParseInsertSql(&cxt, query.get(), metaCache.get());
} else {
doAuthenticate(&cxt, pQuery, metaCache.get());
doTranslate(&cxt, pQuery, metaCache.get());
doTranslate(&cxt, pQuery, metaCache.get());
doCalculateConstant(&cxt, pQuery);
doCalculateConstant(&cxt, pQuery);
}
} catch (const TerminateFlag& e) {
// success and terminate
} catch (const runtime_error& e) {
......
......@@ -1002,7 +1002,7 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS
int32_t code =
nodesCollectColumns(pSelect, SQL_CLAUSE_PARTITION_BY, NULL, COLLECT_COL_TYPE_ALL, &pPartition->node.pTargets);
if (TSDB_CODE_SUCCESS == code && NULL == pPartition->node.pTargets) {
code = nodesListMakeStrictAppend(&pPartition->node.pTargets, nodesListGetNode(pCxt->pCurrRoot->pTargets, 0));
code = nodesListMakeStrictAppend(&pPartition->node.pTargets, nodesCloneNode(nodesListGetNode(pCxt->pCurrRoot->pTargets, 0)));
}
if (TSDB_CODE_SUCCESS == code) {
......
......@@ -642,7 +642,6 @@ static int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarPar
int32_t charLen = (type == TSDB_DATA_TYPE_VARCHAR) ? len : len / TSDB_NCHAR_SIZE;
trimFn(input, output, type, charLen);
varDataSetLen(output, len);
colDataAppend(pOutputData, i, output, false);
output += varDataTLen(output);
}
......
......@@ -293,7 +293,7 @@ int transSendResponse(const STransMsg* msg);
int transRegisterMsg(const STransMsg* msg);
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
int transGetSockDebugInfo(struct sockaddr* sockname, char* dst);
int transSockInfo2Str(struct sockaddr* sockname, char* dst);
int64_t transAllocHandle();
......
......@@ -826,11 +826,11 @@ void cliConnCb(uv_connect_t* req, int status) {
int addrlen = sizeof(peername);
uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen);
transGetSockDebugInfo(&peername, pConn->dst);
transSockInfo2Str(&peername, pConn->dst);
addrlen = sizeof(sockname);
uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &sockname, &addrlen);
transGetSockDebugInfo(&sockname, pConn->src);
transSockInfo2Str(&sockname, pConn->src);
tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
assert(pConn->stream == req->handle);
......
......@@ -77,7 +77,7 @@ void transFreeMsg(void* msg) {
}
taosMemoryFree((char*)msg - sizeof(STransMsgHead));
}
int transGetSockDebugInfo(struct sockaddr* sockname, char* dst) {
int transSockInfo2Str(struct sockaddr* sockname, char* dst) {
struct sockaddr_in addr = *(struct sockaddr_in*)sockname;
char buf[20] = {0};
......
......@@ -698,7 +698,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
transUnrefSrvHandle(pConn);
return;
}
transGetSockDebugInfo(&peername, pConn->dst);
transSockInfo2Str(&peername, pConn->dst);
addrlen = sizeof(sockname);
if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&sockname, &addrlen)) {
......@@ -706,7 +706,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
transUnrefSrvHandle(pConn);
return;
}
transGetSockDebugInfo(&sockname, pConn->src);
transSockInfo2Str(&sockname, pConn->src);
struct sockaddr_in addr = *(struct sockaddr_in*)&sockname;
pConn->clientIp = addr.sin_addr.s_addr;
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册