提交 dda98164 编写于 作者: S slzhou

Merge branch '3.0' of github.com:taosdata/TDengine into szhou/fixbugs

......@@ -68,6 +68,7 @@ public class SubscribeDemo {
System.out.println(meter);
}
}
consumer.unsubscribe();
}
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
......
......@@ -28,7 +28,8 @@ function runConsumer() {
console.log(msg.topicPartition);
console.log(msg.block);
console.log(msg.fields)
consumer.commit(msg);
// fixme(@xiaolei): commented temp, should be fixed.
//consumer.commit(msg);
console.log(`=======consumer ${i} done`)
}
......@@ -48,4 +49,4 @@ try {
cursor.close();
conn.close();
}, 2000);
}
\ No newline at end of file
}
---
sidebar_label: 数据订阅
description: "数据订阅与推送服务。写入到 TDengine 中的时序数据能够被自动推送到订阅客户端。"
title: 数据订阅
---
import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
import Java from "./_sub_java.mdx";
import Python from "./_sub_python.mdx";
import Go from "./_sub_go.mdx";
import Rust from "./_sub_rust.mdx";
import Node from "./_sub_node.mdx";
import CSharp from "./_sub_cs.mdx";
import CDemo from "./_sub_c.mdx";
为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。
与 kafka 一样,你需要定义 topic, 但 TDengine 的 topic 是基于一个已经存在的超级表、子表或普通表的查询条件,即一个 SELECT 语句。你可以使用 SQL 对标签、表名、列、表达式等条件进行过滤,以及对数据进行标量函数与 UDF 计算(不包括数据聚合)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度。
消费者订阅 topic 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程、分布式地消费数据,提高消费速度。但不同消费者组中的消费者即使消费同一个topic, 并不共享消费进度。一个消费者可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的 vnode 上,也就是多个 shard 上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的ACK机制,在宕机、重启等复杂环境下确保 at least once 消费。
为了实现上述功能,TDengine 会为 WAL (Write-Ahead-Log) 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制:用户可以按需指定 WAL 文件保留的时间以及大小(详见 create database 语句)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎(但由于 TSDB 具有远比 WAL 更高的压缩率,我们不推荐保留太长时间,一般来说,不超过几天)。 对于以 topic 形式创建的查询,TDengine 将对接 WAL 而不是 TSDB 作为其存储引擎。在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。
本文档不对消息队列本身的基础知识做介绍,如果需要了解,请自行搜索。
## 主要数据结构和API
TMQ 的 API 中,与订阅相关的主要数据结构和API如下:
```c
typedef struct tmq_t tmq_t;
typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t;
typedef void(tmq_commit_cb(tmq_t *, int32_t code, void *param));
DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT const char *tmq_err2str(int32_t code);
DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
enum tmq_conf_res_t {
TMQ_CONF_UNKNOWN = -2,
TMQ_CONF_INVALID = -1,
TMQ_CONF_OK = 0,
};
typedef enum tmq_conf_res_t tmq_conf_res_t;
DLL_EXPORT tmq_conf_t *tmq_conf_new();
DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
```
这些 API 的文档请见 [C/C++ Connector](/reference/connector/cpp),下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面C语言的示例代码。
## 写入数据
首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如:
```sql
drop database if exists tmqdb;
create database tmqdb;
create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16) tags(t1 int, t3 varchar(16));
create table tmqdb.ctb0 using tmqdb.stb tags(0, "subtable0");
create table tmqdb.ctb1 using tmqdb.stb tags(1, "subtable1");
insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
```
## 创建topic:
```sql
create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
```
TMQ支持多种订阅类型:
### 列订阅
语法:CREATE TOPIC topic_name as subquery
通过select语句订阅(包括select *,或select ts, c1等指定列描述订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合)
- TOPIC一旦创建则schema确定
- 被订阅或用于计算的column和tag不可被删除、修改
- 若发生schema变更,新增的column不出现在结果中
### 超级表订阅
语法:CREATE TOPIC topic_name AS STABLE stbName
与select * from stbName订阅的区别是:
- 不会限制用户的schema变更
- 返回的是非结构化的数据:返回数据的schema会随之超级表的schema变化而变化
- 用户对于要处理的每一个数据块都可能有不同的schema,因此,必须重新获取schema
- 返回数据不带有tag
## 创建 consumer 以及consumer group
对于consumer, 目前支持的config包括:
| 参数名称 | 参数值 | 备注 |
| ---------------------------- | ------------------------------ | ------------------------------------------------------ |
| group.id | 最大长度:192 | |
| enable.auto.commit | 合法值:true, false | |
| auto.commit.interval.ms | | |
| auto.offset.reset | 合法值:earliest, latest, none | |
| td.connect.ip | 用于连接,同taos_connect的参数 | |
| td.connect.user | 用于连接,同taos_connect的参数 | |
| td.connect.pass | 用于连接,同taos_connect的参数 | |
| td.connect.port | 用于连接,同taos_connect的参数 | |
| enable.heartbeat.background | 合法值:true, false | 开启后台心跳,即consumer不会因为长时间不poll而认为离线 |
| experimental.snapshot.enable | 合法值:true, false | 从wal开始消费,还是从tsbs开始消费 |
| msg.with.table.name | 合法值:true, false | 从消息中能否解析表名 |
```sql
/* 根据需要,设置消费组(group.id)、自动提交(enable.auto.commit)、自动提交时间间隔(auto.commit.interval.ms)、用户名(td.connect.user)、密码(td.connect.pass)等参数 */
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "cgrpName");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
```
上述配置中包括consumer group ID,如果多个 consumer 指定的 consumer group ID一样,则自动形成一个consumer group,共享消费进度。
## 创建 topic 列表
单个consumer支持同时订阅多个topic。
```sql
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topicName");
```
## 启动订阅并开始消费
```
/* 启动订阅 */
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
/* 循环poll消息 */
while (running) {
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeOut);
msg_process(tmqmsg);
}
```
这里是一个 **while** 循环,每调用一次tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析API完成消息内容的解析。
## 结束消费
```sql
/* 取消订阅 */
tmq_unsubscribe(tmq);
/* 关闭消费 */
tmq_consumer_close(tmq);
```
## 删除topic
如果不再需要,可以删除创建topic,但注意:只有没有被订阅的topic才能别删除。
```sql
/* 删除topic */
drop topic topicName;
```
## 状态查看
1、topics:查询已经创建的topic
```sql
show topics;
```
2、consumers:查询consumer的状态及其订阅的topic
```sql
show consumers;
```
3、subscriptions:查询consumer与vgroup之间的分配关系
```sql
show subscriptions;
```
## 示例代码
本节展示各种语言的示例代码。
<Tabs>
<TabItem label="C" value="c">
```c
{{#include examples/c/tmq.c}}
```
</TabItem>
<TabItem label="Java" value="java">
<Java />
</TabItem>
<TabItem label="Go" value="Go">
<Go/>
</TabItem>
<TabItem label="Rust" value="Rust">
<Rust />
</TabItem>
<TabItem label="Python" value="Python">
```python
{{#include docs/examples/python/tmq_example.py}}
```
</TabItem>
<TabItem label="Node.JS" value="Node.JS">
<Node/>
</TabItem>
<TabItem label="C#" value="C#">
<CSharp/>
</TabItem>
</Tabs>
此差异已折叠。
......@@ -103,7 +103,7 @@ SELECT d1001.* FROM d1001,d1003 WHERE d1001.ts = d1003.ts;
在超级表和子表的查询中可以指定 _标签列_,且标签列的值会与普通列的数据一起返回。
```sql
ELECT location, groupid, current FROM d1001 LIMIT 2;
SELECT location, groupid, current FROM d1001 LIMIT 2;
```
### 结果去重
......
---
sidebar_label: 3.0 版本语法变更
title: 3.0 版本语法变更
description: "TDengine 3.0 版本的语法变更说明"
---
## SQL 基本元素变更
| # | **元素** | **差异性** | **说明** |
| - | :------- | :--------: | :------- |
| 1 | VARCHAR | <div style="width: 40pt">新增</div> | BINARY类型的别名。
| 2 | TIMESTAMP字面量 | 新增 | 新增支持 TIMESTAMP 'timestamp format' 语法。
| 3 | _ROWTS伪列 | 新增 | 表示时间戳主键。是_C0伪列的别名。
| 4 | INFORMATION_SCHEMA | 新增 | 包含各种SCHEMA定义的系统数据库。
| 5 | PERFORMANCE_SCHEMA | 新增 | 包含运行信息的系统数据库。
| 6 | 连续查询 | 废除 | 不再支持连续查询。相关的各种语法和接口废除。
| 7 | 混合运算 | 增强 | 查询中的混合运算(标量运算和矢量运算混合)全面增强,SELECT的各个子句均全面支持符合语法语义的混合运算。
| 8 | 标签运算 | 新增 |在查询中,标签列可以像普通列一样参与各种运算,用于各种子句。
| 9 | 时间线子句和时间函数用于超级表查询 | 增强 |没有PARTITION BY时,超级表的数据会被合并成一条时间线。
## SQL 语句变更
在 TDengine 中,普通表的数据模型中可使用以下数据类型。
| # | **语句** | **差异性** | **说明** |
| - | :------- | :--------: | :------- |
| 1 | ALTER ACCOUNT | <div style="width: 40pt">废除</div> | 2.x中为企业版功能,3.0不再支持。语法暂时保留了,执行报“This statement is no longer supported”错误。
| 2 | ALTER ALL DNODES | 新增 | 修改所有DNODE的参数。
| 3 | ALTER DATABASE | 调整 | 废除<ul><li>QUORUM:写入需要的副本确认数。3.0版本使用STRICT来指定强一致还是弱一致。3.0.0版本STRICT暂不支持修改。</li><li>BLOCKS:VNODE使用的内存块数。3.0版本使用BUFFER来表示VNODE写入内存池的大小。</li><li>UPDATE:更新操作的支持模式。3.0版本所有数据库都支持部分列更新。</li><li>CACHELAST:缓存最新一行数据的模式。3.0版本用CACHEMODEL代替。</li><li>COMP:3.0版本暂不支持修改。<br/>新增</li><li>CACHEMODEL:表示是否在内存中缓存子表的最近数据。</li><li>CACHESIZE:表示缓存子表最近数据的内存大小。</li><li>WAL_FSYNC_PERIOD:代替原FSYNC参数。</li><li>WAL_LEVEL:代替原WAL参数。<br/>调整</li><li>REPLICA:3.0.0版本暂不支持修改。</li><li>KEEP:3.0版本新增支持带单位的设置方式。</li></ul>
| 4 | ALTER STABLE | 调整 | 废除<ul><li>CHANGE TAG:修改标签列的名称。3.0版本使用RENAME TAG代替。<br/>新增</li><li>RENAME TAG:代替原CHANGE TAG子句。</li><li>COMMENT:修改超级表的注释。</li></ul>
| 5 | ALTER TABLE | 调整 | 废除<ul><li>CHANGE TAG:修改标签列的名称。3.0版本使用RENAME TAG代替。<br/>新增</li><li>RENAME TAG:代替原CHANGE TAG子句。</li><li>COMMENT:修改表的注释。</li><li>TTL:修改表的生命周期。</li></ul>
| 6 | ALTER USER | 调整 | 废除<ul><li>PRIVILEGE:修改用户权限。3.0版本使用GRANT和REVOKE来授予和回收权限。<br/>新增</li><li>ENABLE:启用或停用此用户。</li><li>SYSINFO:修改用户是否可查看系统信息。</li></ul>
| 7 | COMPACT VNODES | 暂不支持 | 整理指定VNODE的数据。3.0.0版本暂不支持。
| 8 | CREATE ACCOUNT | 废除 | 2.x中为企业版功能,3.0不再支持。语法暂时保留了,执行报“This statement is no longer supported”错误。
| 9 | CREATE DATABASE | 调整 | 废除<ul><li>BLOCKS:VNODE使用的内存块数。3.0版本使用BUFFER来表示VNODE写入内存池的大小。</li><li>CACHE:VNODE使用的内存块的大小。3.0版本使用BUFFER来表示VNODE写入内存池的大小。</li><li>CACHELAST:缓存最新一行数据的模式。3.0版本用CACHEMODEL代替。</li><li>DAYS:数据文件存储数据的时间跨度。3.0版本使用DURATION代替。</li><li>FSYNC:当 WAL 设置为 2 时,执行 fsync 的周期。3.0版本使用WAL_FSYNC_PERIOD代替。</li><li>QUORUM:写入需要的副本确认数。3.0版本使用STRICT来指定强一致还是弱一致。</li><li>UPDATE:更新操作的支持模式。3.0版本所有数据库都支持部分列更新。</li><li>WAL:WAL 级别。3.0版本使用WAL_LEVEL代替。<br/>新增</li><li>BUFFER:一个 VNODE 写入内存池大小。</li><li>CACHEMODEL:表示是否在内存中缓存子表的最近数据。</li><li>CACHESIZE:表示缓存子表最近数据的内存大小。</li><li>DURATION:代替原DAYS参数。新增支持带单位的设置方式。</li><li>PAGES:一个 VNODE 中元数据存储引擎的缓存页个数。</li><li>PAGESIZE:一个 VNODE 中元数据存储引擎的页大小。</li><li>RETENTIONS:表示数据的聚合周期和保存时长。</li><li>STRICT:表示数据同步的一致性要求。</li><li>SINGLE_STABLE:表示此数据库中是否只可以创建一个超级表。</li><li>VGROUPS:数据库中初始VGROUP的数目。</li><li>WAL_FSYNC_PERIOD:代替原FSYNC参数。</li><li>WAL_LEVEL:代替原WAL参数。</li><li>WAL_RETENTION_PERIOD:wal文件的额外保留策略,用于数据订阅。</li><li>WAL_RETENTION_SIZE:wal文件的额外保留策略,用于数据订阅。</li><li>WAL_ROLL_PERIOD:wal文件切换时长。</li><li>WAL_SEGMENT_SIZE:wal单个文件大小。<br/>调整</li><li>KEEP:3.0版本新增支持带单位的设置方式。</li></ul>
| 10 | CREATE DNODE | 调整 | 新增主机名和端口号分开指定语法<ul><li>CREATE DNODE dnode_host_name PORT port_val</li></ul>
| 11 | CREATE INDEX | 新增 | 创建SMA索引。
| 12 | CREATE MNODE | 新增 | 创建管理节点。
| 13 | CREATE QNODE | 新增 | 创建查询节点。
| 14 | CREATE STABLE | 调整 | 新增表参数语法<li>COMMENT:表注释。</li>
| 15 | CREATE STREAM | 新增 | 创建流。
| 16 | CREATE TABLE | 调整 | 新增表参数语法<ul><li>COMMENT:表注释。</li><li>WATERMARK:指定窗口的关闭时间。</li><li>MAX_DELAY:用于控制推送计算结果的最大延迟。</li><li>ROLLUP:指定的聚合函数,提供基于多层级的降采样聚合结果。</li><li>SMA:提供基于数据块的自定义预计算功能。</li><li>TTL:用来指定表的生命周期的参数。</li></ul>
| 17 | CREATE TOPIC | 新增 | 创建订阅主题。
| 18 | DROP ACCOUNT | 废除 | 2.x中为企业版功能,3.0不再支持。语法暂时保留了,执行报“This statement is no longer supported”错误。
| 19 | DROP CONSUMER GROUP | 新增 | 删除消费组。
| 20 | DROP INDEX | 新增 | 删除索引。
| 21 | DROP MNODE | 新增 | 创建管理节点。
| 22 | DROP QNODE | 新增 | 创建查询节点。
| 23 | DROP STREAM | 新增 | 删除流。
| 24 | DROP TABLE | 调整 | 新增批量删除语法
| 25 | DROP TOPIC | 新增 | 删除订阅主题。
| 26 | EXPLAIN | 新增 | 查看查询语句的执行计划。
| 27 | GRANT | 新增 | 授予用户权限。
| 28 | KILL TRANSACTION | 新增 | 终止管理节点的事务。
| 29 | KILL STREAM | 废除 | 终止连续查询。3.0版本不再支持连续查询,而是用更通用的流计算来代替。
| 30 | MERGE VGROUP | 新增 | 合并VGROUP。
| 31 | REVOKE | 新增 | 回收用户权限。
| 32 | SELECT | 调整 | <ul><li>SELECT关闭隐式结果列,输出列均需要由SELECT子句来指定。</li><li>DISTINCT功能全面支持。2.x版本只支持对标签列去重,并且不可以和JOIN、GROUP BY等子句混用。</li><li>JOIN功能增强。增加支持:JOIN后WHERE条件中有OR条件;JOIN后的多表运算;JOIN后的多表GROUP BY。</li><li>FROM后子查询功能大幅增强。不限制子查询嵌套层数;支持子查询和UNION ALL混合使用;移除其他一些之前版本的语法限制。</li><li>WHERE后可以使用任意的标量表达式。</li><li>GROUP BY功能增强。支持任意标量表达式及其组合的分组。</li><li>SESSION可以用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。</li><li>STATE_WINDOW可以用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。</li><li>ORDER BY功能大幅增强。不再必须和GROUP BY子句一起使用;不再有排序表达式个数的限制;增加支持NULLS FIRST/LAST语法功能;支持符合语法语义的任意表达式。</li><li>新增PARTITION BY语法。替代原来的GROUP BY tags。</li></ul>
| 33 | SHOW ACCOUNTS | 废除 | 2.x中为企业版功能,3.0不再支持。语法暂时保留了,执行报“This statement is no longer supported”错误。
| 34 | SHOW APPS |新增 | 显示接入集群的应用(客户端)信息。
| 35 | SHOW CONSUMERS | 新增 | 显示当前数据库下所有活跃的消费者的信息。
| 36 | SHOW DATABASES | 调整 | 3.0版本只显示数据库名。
| 37 | SHOW FUNCTIONS | 调整 | 3.0版本只显示自定义函数名。
| 38 | SHOW LICENCE | 新增 | 和SHOW GRANTS 命令等效。
| 39 | SHOW INDEXES | 新增 | 显示已创建的索引。
| 40 | SHOW LOCAL VARIABLES | 新增 | 显示当前客户端配置参数的运行值。
| 41 | SHOW MODULES | 废除 | 显示当前系统中所安装的组件的信息。
| 42 | SHOW QNODES | 新增 | 显示当前系统中QNODE的信息。
| 43 | SHOW STABLES | 调整 | 3.0版本只显示超级表名。
| 44 | SHOW STREAMS | 调整 | 2.x版本此命令显示系统中已创建的连续查询的信息。3.0版本废除了连续查询,用流代替。此命令显示已创建的流。
| 45 | SHOW SUBSCRIPTIONS | 新增 | 显示当前数据库下的所有的订阅关系
| 46 | SHOW TABLES | 调整 | 3.0版本只显示表名。
| 47 | SHOW TABLE DISTRIBUTED | 新增 | 显示表的数据分布信息。代替2.x版本中的SELECT _block_dist() FROM { tb_name | stb_name }方式。
| 48 | SHOW TOPICS | 新增 | 显示当前数据库下的所有订阅主题。
| 49 | SHOW TRANSACTIONS | 新增 | 显示当前系统中正在执行的事务的信息。
| 50 | SHOW DNODE VARIABLES | 新增 |显示指定DNODE的配置参数。
| 51 | SHOW VNODES | 暂不支持 | 显示当前系统中VNODE的信息。3.0.0版本暂不支持。
| 52 | SPLIT VGROUP | 新增 | 拆分VGROUP。
| 53 | TRIM DATABASE | 新增 | 删除过期数据,并根据多级存储的配置归整数据。
## SQL 函数变更
| # | **函数** | **差异性** | **说明** |
| - | :------- | :--------: | :------- |
| 1 | TWA | <div style="width: 40pt">增强</div> | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
| 2 | IRATE | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
| 3 | LEASTSQUARES | 增强 | 可以用于超级表了。
| 4 | ELAPSED | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
| 5 | DIFF | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
| 6 | DERIVATIVE | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
| 7 | CSUM | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
| 8 | MAVG | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
| 9 | SAMPLE | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
| 10 | STATECOUNT | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
| 11 | STATEDURATION | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
......@@ -3,7 +3,7 @@ title: TAOS SQL
description: "TAOS SQL 支持的语法规则、主要查询功能、支持的 SQL 查询函数,以及常用技巧等内容"
---
本文档说明 TAOS SQL 支持的语法规则、主要查询功能、支持的 SQL 查询函数,以及常用技巧等内容。阅读本文档需要读者具有基本的 SQL 语言的基础。
本文档说明 TAOS SQL 支持的语法规则、主要查询功能、支持的 SQL 查询函数,以及常用技巧等内容。阅读本文档需要读者具有基本的 SQL 语言的基础。TDengine 3.0 版本相比 2.x 版本做了大量改进和优化,特别是查询引擎进行了彻底的重构,因此 SQL 语法相比 2.x 版本有很多变更。详细的变更内容请见 [3.0 版本语法变更](/taos-sql/changes) 章节
TAOS SQL 是用户对 TDengine 进行数据写入和查询的主要工具。TAOS SQL 提供标准的 SQL 语法,并针对时序数据和业务的特点优化和新增了许多语法和功能。TAOS SQL 语句的最大长度为 1M。TAOS SQL 不支持关键字的缩写,例如 DELETE 不能缩写为 DEL。
......
......@@ -765,11 +765,11 @@ public abstract class ConsumerLoop {
shutdownLatch.await();
}
static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
}
static class ResultBean {
public static class ResultBean {
private Timestamp ts;
private int speed;
......
......@@ -647,3 +647,173 @@ charset 的有效值是 UTF-8。
| 含义 | 是否启动 udf 服务 |
| 取值范围 | 0: 不启动;1:启动 |
| 缺省值 | 1 |
## 2.X 与 3.0 配置参数对比
| # | **参数** | **适用于 2.X 版本** | **适用于 3.0 版本** |
| --- | :-----------------: | --------------- | --------------- |
| 1 | firstEp | 是 | 是 |
| 2 | secondEp | 是 | 是 |
| 3 | fqdn | 是 | 是 |
| 4 | serverPort | 是 | 是 |
| 5 | maxShellConns | 是 | 是 |
| 6 | monitor | 是 | 是 |
| 7 | monitorFqdn | 否 | 是 |
| 8 | monitorPort | 否 | 是 |
| 9 | monitorInterval | 是 | 是 |
| 10 | monitorMaxLogs | 否 | 是 |
| 11 | monitorComp | 否 | 是 |
| 12 | telemetryReporting | 是 | 是 |
| 13 | telemetryInterval | 否 | 是 |
| 14 | telemetryServer | 否 | 是 |
| 15 | telemetryPort | 否 | 是 |
| 16 | queryPolicy | 否 | 是 |
| 17 | querySmaOptimize | 否 | 是 |
| 18 | queryBufferSize | 是 | 是 |
| 19 | maxNumOfDistinctRes | 是 | 是 |
| 20 | minSlidingTime | 是 | 是 |
| 21 | minIntervalTime | 是 | 是 |
| 22 | countAlwaysReturnValue | 是 | 是 |
| 23 | dataDir | 是 | 是 |
| 24 | minimalDataDirGB | 是 | 是 |
| 25 | supportVnodes | 否 | 是 |
| 26 | tempDir | 是 | 是 |
| 27 | minimalTmpDirGB | 是 | 是 |
| 28 | compressMsgSize | 是 | 是 |
| 29 | compressColData | 是 | 是 |
| 30 | smlChildTableName | 是 | 是 |
| 31 | smlTagName | 是 | 是 |
| 32 | smlDataFormat | 否 | 是 |
| 33 | statusInterval | 是 | 是 |
| 34 | shellActivityTimer | 是 | 是 |
| 35 | transPullupInterval | 否 | 是 |
| 36 | mqRebalanceInterval | 否 | 是 |
| 37 | ttlUnit | 否 | 是 |
| 38 | ttlPushInterval | 否 | 是 |
| 39 | numOfTaskQueueThreads | 否 | 是 |
| 40 | numOfRpcThreads | 否 | 是 |
| 41 | numOfCommitThreads | 是 | 是 |
| 42 | numOfMnodeReadThreads | 否 | 是 |
| 43 | numOfVnodeQueryThreads | 否 | 是 |
| 44 | numOfVnodeStreamThreads | 否 | 是 |
| 45 | numOfVnodeFetchThreads | 否 | 是 |
| 46 | numOfVnodeWriteThreads | 否 | 是 |
| 47 | numOfVnodeSyncThreads | 否 | 是 |
| 48 | numOfQnodeQueryThreads | 否 | 是 |
| 49 | numOfQnodeFetchThreads | 否 | 是 |
| 50 | numOfSnodeSharedThreads | 否 | 是 |
| 51 | numOfSnodeUniqueThreads | 否 | 是 |
| 52 | rpcQueueMemoryAllowed | 否 | 是 |
| 53 | logDir | 是 | 是 |
| 54 | minimalLogDirGB | 是 | 是 |
| 55 | numOfLogLines | 是 | 是 |
| 56 | asyncLog | 是 | 是 |
| 57 | logKeepDays | 是 | 是 |
| 58 | debugFlag | 是 | 是 |
| 59 | tmrDebugFlag | 是 | 是 |
| 60 | uDebugFlag | 是 | 是 |
| 61 | rpcDebugFlag | 是 | 是 |
| 62 | jniDebugFlag | 是 | 是 |
| 63 | qDebugFlag | 是 | 是 |
| 64 | cDebugFlag | 是 | 是 |
| 65 | dDebugFlag | 是 | 是 |
| 66 | vDebugFlag | 是 | 是 |
| 67 | mDebugFlag | 是 | 是 |
| 68 | wDebugFlag | 是 | 是 |
| 69 | sDebugFlag | 是 | 是 |
| 70 | tsdbDebugFlag | 是 | 是 |
| 71 | tqDebugFlag | 否 | 是 |
| 72 | fsDebugFlag | 是 | 是 |
| 73 | udfDebugFlag | 否 | 是 |
| 74 | smaDebugFlag | 否 | 是 |
| 75 | idxDebugFlag | 否 | 是 |
| 76 | tdbDebugFlag | 否 | 是 |
| 77 | metaDebugFlag | 否 | 是 |
| 78 | timezone | 是 | 是 |
| 79 | locale | 是 | 是 |
| 80 | charset | 是 | 是 |
| 81 | udf | 是 | 是 |
| 82 | enableCoreFile | 是 | 是 |
| 83 | arbitrator | 是 | 否 |
| 84 | numOfThreadsPerCore | 是 | 否 |
| 85 | numOfMnodes | 是 | 否 |
| 86 | vnodeBak | 是 | 否 |
| 87 | balance | 是 | 否 |
| 88 | balanceInterval | 是 | 否 |
| 89 | offlineThreshold | 是 | 否 |
| 90 | role | 是 | 否 |
| 91 | dnodeNopLoop | 是 | 否 |
| 92 | keepTimeOffset | 是 | 否 |
| 93 | rpcTimer | 是 | 否 |
| 94 | rpcMaxTime | 是 | 否 |
| 95 | rpcForceTcp | 是 | 否 |
| 96 | tcpConnTimeout | 是 | 否 |
| 97 | syncCheckInterval | 是 | 否 |
| 98 | maxTmrCtrl | 是 | 否 |
| 99 | monitorReplica | 是 | 否 |
| 100 | smlTagNullName | 是 | 否 |
| 101 | keepColumnName | 是 | 否 |
| 102 | ratioOfQueryCores | 是 | 否 |
| 103 | maxStreamCompDelay | 是 | 否 |
| 104 | maxFirstStreamCompDelay | 是 | 否 |
| 105 | retryStreamCompDelay | 是 | 否 |
| 106 | streamCompDelayRatio | 是 | 否 |
| 107 | maxVgroupsPerDb | 是 | 否 |
| 108 | maxTablesPerVnode | 是 | 否 |
| 109 | minTablesPerVnode | 是 | 否 |
| 110 | tableIncStepPerVnode | 是 | 否 |
| 111 | cache | 是 | 否 |
| 112 | blocks | 是 | 否 |
| 113 | days | 是 | 否 |
| 114 | keep | 是 | 否 |
| 115 | minRows | 是 | 否 |
| 116 | maxRows | 是 | 否 |
| 117 | quorum | 是 | 否 |
| 118 | comp | 是 | 否 |
| 119 | walLevel | 是 | 否 |
| 120 | fsync | 是 | 否 |
| 121 | replica | 是 | 否 |
| 122 | partitions | 是 | 否 |
| 123 | quorum | 是 | 否 |
| 124 | update | 是 | 否 |
| 125 | cachelast | 是 | 否 |
| 126 | maxSQLLength | 是 | 否 |
| 127 | maxWildCardsLength | 是 | 否 |
| 128 | maxRegexStringLen | 是 | 否 |
| 129 | maxNumOfOrderedRes | 是 | 否 |
| 130 | maxConnections | 是 | 否 |
| 131 | mnodeEqualVnodeNum | 是 | 否 |
| 132 | http | 是 | 否 |
| 133 | httpEnableRecordSql | 是 | 否 |
| 134 | httpMaxThreads | 是 | 否 |
| 135 | restfulRowLimit | 是 | 否 |
| 136 | httpDbNameMandatory | 是 | 否 |
| 137 | httpKeepAlive | 是 | 否 |
| 138 | enableRecordSql | 是 | 否 |
| 139 | maxBinaryDisplayWidth | 是 | 否 |
| 140 | stream | 是 | 否 |
| 141 | retrieveBlockingModel | 是 | 否 |
| 142 | tsdbMetaCompactRatio | 是 | 否 |
| 143 | defaultJSONStrType | 是 | 否 |
| 144 | walFlushSize | 是 | 否 |
| 145 | keepTimeOffset | 是 | 否 |
| 146 | flowctrl | 是 | 否 |
| 147 | slaveQuery | 是 | 否 |
| 148 | adjustMaster | 是 | 否 |
| 149 | topicBinaryLen | 是 | 否 |
| 150 | telegrafUseFieldNum | 是 | 否 |
| 151 | deadLockKillQuery | 是 | 否 |
| 152 | clientMerge | 是 | 否 |
| 153 | sdbDebugFlag | 是 | 否 |
| 154 | odbcDebugFlag | 是 | 否 |
| 155 | httpDebugFlag | 是 | 否 |
| 156 | monDebugFlag | 是 | 否 |
| 157 | cqDebugFlag | 是 | 否 |
| 158 | shortcutFlag | 是 | 否 |
| 159 | probeSeconds | 是 | 否 |
| 160 | probeKillSeconds | 是 | 否 |
| 161 | probeInterval | 是 | 否 |
| 162 | lossyColumns | 是 | 否 |
| 163 | fPrecision | 是 | 否 |
| 164 | dPrecision | 是 | 否 |
| 165 | maxRange | 是 | 否 |
| 166 | range | 是 | 否 |
......@@ -3,7 +3,7 @@ sidebar_label: 发布历史
title: 发布历史
---
import Release from "/components/Release";
import Release from "/components/ReleaseV3";
<Release versionPrefix="3.0" />
......@@ -45,10 +45,9 @@ static int32_t msg_process(TAOS_RES* msg) {
int32_t numOfFields = taos_field_count(msg);
int32_t* length = taos_fetch_lengths(msg);
int32_t precision = taos_result_precision(msg);
const char* tbName = tmq_get_table_name(msg);
rows++;
taos_print_row(buf, row, fields, numOfFields);
printf("row content from %s: %s\n", (tbName != NULL ? tbName : "table null"), buf);
printf("row content: %s\n", buf);
}
return rows;
......@@ -167,7 +166,7 @@ int32_t create_topic() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1");
pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3, tbname from tmqdb.stb where c1 > 1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes));
return -1;
......@@ -199,9 +198,7 @@ tmq_t* build_consumer() {
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "experimental.snapshot.enable", "true");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "msg.with.table.name", "true");
code = tmq_conf_set(conf, "experimental.snapshot.enable", "false");
if (TMQ_CONF_OK != code) return NULL;
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
......@@ -220,14 +217,7 @@ tmq_list_t* build_topic_list() {
return topicList;
}
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topicList) {
int32_t code;
if ((code = tmq_subscribe(tmq, topicList))) {
fprintf(stderr, "%% Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
return;
}
void basic_consume_loop(tmq_t* tmq) {
int32_t totalRows = 0;
int32_t msgCnt = 0;
int32_t timeout = 5000;
......@@ -237,8 +227,8 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topicList) {
msgCnt++;
totalRows += msg_process(tmqmsg);
taos_free_result(tmqmsg);
/*} else {*/
/*break;*/
} else {
break;
}
}
......@@ -267,14 +257,12 @@ int main(int argc, char* argv[]) {
return -1;
}
basic_consume_loop(tmq, topic_list);
code = tmq_unsubscribe(tmq);
if (code) {
fprintf(stderr, "%% Failed to unsubscribe: %s\n", tmq_err2str(code));
} else {
fprintf(stderr, "%% unsubscribe\n");
if ((code = tmq_subscribe(tmq, topic_list))) {
fprintf(stderr, "%% Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
}
tmq_list_destroy(topic_list);
basic_consume_loop(tmq);
code = tmq_consumer_close(tmq);
if (code) {
......
......@@ -131,10 +131,10 @@ DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
DLL_EXPORT setConfRet taos_set_config(const char *config);
DLL_EXPORT int taos_init(void);
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
DLL_EXPORT void taos_close(TAOS *taos);
DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
DLL_EXPORT void taos_close(TAOS *taos);
const char *taos_data_type(int type);
const char *taos_data_type(int type);
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
......@@ -244,33 +244,37 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
/* ------------------------------ TAOSX -----------------------------------*/
// note: following apis are unstable
enum tmq_res_t {
TMQ_RES_INVALID = -1,
TMQ_RES_DATA = 1,
TMQ_RES_TABLE_META = 2,
};
typedef struct tmq_raw_data{
void* raw;
typedef struct tmq_raw_data {
void *raw;
uint32_t raw_len;
uint16_t raw_type;
} tmq_raw_data;
typedef enum tmq_res_t tmq_res_t;
DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_raw(TAOS_RES *res, tmq_raw_data *raw);
DLL_EXPORT int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw);
DLL_EXPORT int taos_write_raw_block(TAOS *taos, int numOfRows, char *pData, const char* tbname);
DLL_EXPORT void tmq_free_raw(tmq_raw_data raw);
DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed by tmq_free_json_meta
DLL_EXPORT void tmq_free_json_meta(char* jsonMeta);
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
/* ------------------------------ TMQ END -------------------------------- */
DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_raw(TAOS_RES *res, tmq_raw_data *raw);
DLL_EXPORT int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw);
DLL_EXPORT int taos_write_raw_block(TAOS *taos, int numOfRows, char *pData, const char *tbname);
DLL_EXPORT void tmq_free_raw(tmq_raw_data raw);
// Returning null means error. Returned result need to be freed by tmq_free_json_meta
DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res);
DLL_EXPORT void tmq_free_json_meta(char *jsonMeta);
/* ---------------------------- TAOSX END -------------------------------- */
typedef enum {
TSDB_SRV_STATUS_UNAVAILABLE = 0,
......
......@@ -53,6 +53,8 @@ typedef struct SParseContext {
int8_t schemalessType;
const char* svrVer;
bool nodeOffline;
SArray* pTableMetaPos; // sql table pos => catalog data pos
SArray* pTableVgroupPos; // sql table pos => catalog data pos
} SParseContext;
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);
......@@ -84,8 +86,8 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
int32_t rowNum);
int32_t qBuildStmtColFields(void* pDataBlock, int32_t* fieldNum, TAOS_FIELD_E** fields);
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields);
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName, TAOS_MULTI_BIND* bind,
char* msgBuf, int32_t msgBufLen);
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName,
TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen);
void destroyBoundColumnInfo(void* pBoundInfo);
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
int32_t msgBufLen);
......
......@@ -622,6 +622,7 @@ int32_t* taosGetErrno();
//tmq
#define TSDB_CODE_TMQ_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x4000)
#define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001)
#define TSDB_CODE_TMQ_CONSUMER_CLOSED TAOS_DEF_ERROR_CODE(0, 0x4002)
#ifdef __cplusplus
}
......
......@@ -29,11 +29,11 @@ int32_t taosOpenRef(int32_t max, void (*fp)(void *));
// close the reference set, refId is the return value by taosOpenRef
// return 0 if success. On error, -1 is returned, and terrno is set appropriately
int32_t taosCloseRef(int32_t refId);
int32_t taosCloseRef(int32_t rsetId);
// add ref, p is the pointer to resource or pointer ID
// return Reference ID(rid) allocated. On error, -1 is returned, and terrno is set appropriately
int64_t taosAddRef(int32_t refId, void *p);
int64_t taosAddRef(int32_t rsetId, void *p);
// remove ref, rid is the reference ID returned by taosAddRef
// return 0 if success. On error, -1 is returned, and terrno is set appropriately
......
......@@ -689,11 +689,11 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
TDMT_VND_CREATE_TABLE == pRequest->type) {
pRequest->body.resInfo.numOfRows = res.numOfRows;
if (TDMT_VND_SUBMIT == pRequest->type) {
STscObj *pTscObj = pRequest->pTscObj;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertRows, res.numOfRows);
STscObj* pTscObj = pRequest->pTscObj;
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, res.numOfRows);
}
schedulerFreeJob(&pRequest->body.queryJob, 0);
}
......@@ -800,8 +800,8 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
break;
}
case TDMT_VND_SUBMIT: {
atomic_add_fetch_64((int64_t *)&pAppInfo->summary.insertBytes, pRes->numOfBytes);
atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes);
code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
break;
}
......@@ -832,9 +832,9 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
if (pResult) {
pRequest->body.resInfo.numOfRows = pResult->numOfRows;
if (TDMT_VND_SUBMIT == pRequest->type) {
STscObj *pTscObj = pRequest->pTscObj;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertRows, pResult->numOfRows);
STscObj* pTscObj = pRequest->pTscObj;
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows);
}
}
......@@ -877,14 +877,14 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
if (pQuery->pRoot) {
pRequest->stmtType = pQuery->pRoot->type;
}
if (pQuery->pRoot && !pRequest->inRetry) {
STscObj *pTscObj = pRequest->pTscObj;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
STscObj* pTscObj = pRequest->pTscObj;
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
if (QUERY_NODE_VNODE_MODIF_STMT == pQuery->pRoot->type) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
} else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfQueryReq, 1);
atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
}
}
......@@ -1467,9 +1467,9 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
STscObj *pTscObj = pRequest->pTscObj;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
STscObj* pTscObj = pRequest->pTscObj;
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
if (pResultInfo->numOfRows == 0) {
return NULL;
......@@ -2006,7 +2006,7 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName,
bool inEscape = false;
int32_t code = 0;
void *pIter = NULL;
void* pIter = NULL;
int32_t vIdx = 0;
int32_t vPos[2];
......
......@@ -192,6 +192,7 @@ void taos_free_result(TAOS_RES *res) {
if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
pRsp->resInfo.pRspMsg = NULL;
doFreeReqResultInfo(&pRsp->resInfo);
taosMemoryFree(pRsp);
} else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj *pRspObj = (SMqMetaRspObj *)res;
taosMemoryFree(pRspObj->metaRsp.metaRsp);
......
此差异已折叠。
此差异已折叠。
......@@ -22,6 +22,7 @@ extern "C" {
#include "catalog.h"
#include "os.h"
#include "parser.h"
#include "query.h"
#define parserFatal(param, ...) qFatal("PARSER: " param, ##__VA_ARGS__)
......@@ -44,18 +45,37 @@ typedef struct SParseTablesMetaReq {
SHashObj* pTables;
} SParseTablesMetaReq;
typedef enum ECatalogReqType {
CATALOG_REQ_TYPE_META = 1,
CATALOG_REQ_TYPE_VGROUP,
CATALOG_REQ_TYPE_BOTH
} ECatalogReqType;
typedef struct SInsertTablesMetaReq {
char dbFName[TSDB_DB_FNAME_LEN];
SArray* pTableMetaPos;
SArray* pTableMetaReq; // element is SName
SArray* pTableVgroupPos;
SArray* pTableVgroupReq; // element is SName
} SInsertTablesMetaReq;
typedef struct SParseMetaCache {
SHashObj* pTableMeta; // key is tbFName, element is STableMeta*
SHashObj* pDbVgroup; // key is dbFName, element is SArray<SVgroupInfo>*
SHashObj* pTableVgroup; // key is tbFName, element is SVgroupInfo*
SHashObj* pDbCfg; // key is tbFName, element is SDbCfgInfo*
SHashObj* pDbInfo; // key is tbFName, element is SDbInfo*
SHashObj* pUserAuth; // key is SUserAuthInfo serialized string, element is bool indicating whether or not to pass
SHashObj* pUdf; // key is funcName, element is SFuncInfo*
SHashObj* pTableIndex; // key is tbFName, element is SArray<STableIndexInfo>*
SHashObj* pTableCfg; // key is tbFName, element is STableCfg*
SArray* pDnodes; // element is SEpSet
bool dnodeRequired;
SHashObj* pTableMeta; // key is tbFName, element is STableMeta*
SHashObj* pDbVgroup; // key is dbFName, element is SArray<SVgroupInfo>*
SHashObj* pTableVgroup; // key is tbFName, element is SVgroupInfo*
SHashObj* pDbCfg; // key is tbFName, element is SDbCfgInfo*
SHashObj* pDbInfo; // key is tbFName, element is SDbInfo*
SHashObj* pUserAuth; // key is SUserAuthInfo serialized string, element is bool indicating whether or not to pass
SHashObj* pUdf; // key is funcName, element is SFuncInfo*
SHashObj* pTableIndex; // key is tbFName, element is SArray<STableIndexInfo>*
SHashObj* pTableCfg; // key is tbFName, element is STableCfg*
SArray* pDnodes; // element is SEpSet
bool dnodeRequired;
SHashObj* pInsertTables; // key is dbName, element is SInsertTablesMetaReq*, for insert
const char* pUser;
const SArray* pTableMetaData; // pRes = STableMeta*
const SArray* pTableVgroupData; // pRes = SVgroupInfo*
int32_t sqlTableNum;
} SParseMetaCache;
int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...);
......@@ -72,8 +92,9 @@ STableMeta* tableMetaDup(const STableMeta* pTableMeta);
int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen);
int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq);
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache);
int32_t buildCatalogReq(SParseContext* pCxt, const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq);
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache,
bool insertValuesStmt);
int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache);
int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache);
int32_t reserveDbVgInfoInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache);
......@@ -100,6 +121,12 @@ int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFun
int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes);
int32_t getTableCfgFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableCfg** pOutput);
int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes);
int32_t reserveTableMetaInCacheForInsert(const SName* pName, ECatalogReqType reqType, int32_t tableNo,
SParseMetaCache* pMetaCache);
int32_t getTableMetaFromCacheForInsert(SArray* pTableMetaPos, SParseMetaCache* pMetaCache, int32_t tableNo,
STableMeta** pMeta);
int32_t getTableVgroupFromCacheForInsert(SArray* pTableVgroupPos, SParseMetaCache* pMetaCache, int32_t tableNo,
SVgroupInfo* pVgroup);
void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request);
#ifdef __cplusplus
......
......@@ -73,6 +73,9 @@ typedef struct SInsertParseContext {
SStmtCallback* pStmtCb;
SParseMetaCache* pMetaCache;
char sTableName[TSDB_TABLE_NAME_LEN];
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW];
int64_t memElapsed;
int64_t parRowElapsed;
} SInsertParseContext;
typedef struct SInsertParseSyntaxCxt {
......@@ -203,10 +206,11 @@ static int32_t checkAuth(SInsertParseContext* pCxt, char* pDbFname, bool* pPass)
return catalogChkAuth(pBasicCtx->pCatalog, &conn, pBasicCtx->pUser, pDbFname, AUTH_TYPE_WRITE, pPass);
}
static int32_t getTableSchema(SInsertParseContext* pCxt, SName* pTbName, bool isStb, STableMeta** pTableMeta) {
static int32_t getTableSchema(SInsertParseContext* pCxt, int32_t tbNo, SName* pTbName, bool isStb,
STableMeta** pTableMeta) {
SParseContext* pBasicCtx = pCxt->pComCxt;
if (pBasicCtx->async) {
return getTableMetaFromCache(pCxt->pMetaCache, pTbName, pTableMeta);
return getTableMetaFromCacheForInsert(pBasicCtx->pTableMetaPos, pCxt->pMetaCache, tbNo, pTableMeta);
}
SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter,
.requestId = pBasicCtx->requestId,
......@@ -219,10 +223,10 @@ static int32_t getTableSchema(SInsertParseContext* pCxt, SName* pTbName, bool is
return catalogGetTableMeta(pBasicCtx->pCatalog, &conn, pTbName, pTableMeta);
}
static int32_t getTableVgroup(SInsertParseContext* pCxt, SName* pTbName, SVgroupInfo* pVg) {
static int32_t getTableVgroup(SInsertParseContext* pCxt, int32_t tbNo, SName* pTbName, SVgroupInfo* pVg) {
SParseContext* pBasicCtx = pCxt->pComCxt;
if (pBasicCtx->async) {
return getTableVgroupFromCache(pCxt->pMetaCache, pTbName, pVg);
return getTableVgroupFromCacheForInsert(pBasicCtx->pTableVgroupPos, pCxt->pMetaCache, tbNo, pVg);
}
SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter,
.requestId = pBasicCtx->requestId,
......@@ -231,28 +235,22 @@ static int32_t getTableVgroup(SInsertParseContext* pCxt, SName* pTbName, SVgroup
return catalogGetTableHashVgroup(pBasicCtx->pCatalog, &conn, pTbName, pVg);
}
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char* dbFname, bool isStb) {
bool pass = false;
CHECK_CODE(checkAuth(pCxt, dbFname, &pass));
if (!pass) {
return TSDB_CODE_PAR_PERMISSION_DENIED;
}
CHECK_CODE(getTableSchema(pCxt, name, isStb, &pCxt->pTableMeta));
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, int32_t tbNo, SName* name, char* dbFname, bool isStb) {
CHECK_CODE(getTableSchema(pCxt, tbNo, name, isStb, &pCxt->pTableMeta));
if (!isStb) {
SVgroupInfo vg;
CHECK_CODE(getTableVgroup(pCxt, name, &vg));
CHECK_CODE(getTableVgroup(pCxt, tbNo, name, &vg));
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
}
return TSDB_CODE_SUCCESS;
}
static int32_t getTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) {
return getTableMetaImpl(pCxt, name, dbFname, false);
static int32_t getTableMeta(SInsertParseContext* pCxt, int32_t tbNo, SName* name, char* dbFname) {
return getTableMetaImpl(pCxt, tbNo, name, dbFname, false);
}
static int32_t getSTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) {
return getTableMetaImpl(pCxt, name, dbFname, true);
static int32_t getSTableMeta(SInsertParseContext* pCxt, int32_t tbNo, SName* name, char* dbFname) {
return getTableMetaImpl(pCxt, tbNo, name, dbFname, true);
}
static int32_t getDBCfg(SInsertParseContext* pCxt, const char* pDbFName, SDbCfgInfo* pInfo) {
......@@ -1028,13 +1026,13 @@ end:
return code;
}
static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName* pTableName, const char* pName,
int32_t len, STableMeta* pMeta) {
static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, int32_t tbNo, SName* pTableName,
const char* pName, int32_t len, STableMeta* pMeta) {
SVgroupInfo vg;
CHECK_CODE(getTableVgroup(pCxt, pTableName, &vg));
CHECK_CODE(getTableVgroup(pCxt, tbNo, pTableName, &vg));
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
pMeta->uid = 0;
pMeta->uid = tbNo;
pMeta->vgId = vg.vgId;
pMeta->tableType = TSDB_CHILD_TABLE;
......@@ -1084,7 +1082,7 @@ static int32_t ignoreAutoCreateTableClause(SInsertParseContext* pCxt) {
}
// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tbFName) {
static int32_t parseUsingClause(SInsertParseContext* pCxt, int32_t tbNo, SName* name, char* tbFName) {
int32_t len = strlen(tbFName);
STableMeta** pMeta = taosHashGet(pCxt->pSubTableHashObj, tbFName, len);
if (NULL != pMeta) {
......@@ -1102,11 +1100,11 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tb
tNameGetFullDbName(&sname, dbFName);
strcpy(pCxt->sTableName, sname.tname);
CHECK_CODE(getSTableMeta(pCxt, &sname, dbFName));
CHECK_CODE(getSTableMeta(pCxt, tbNo, &sname, dbFName));
if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) {
return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
}
CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, name, tbFName, len, pCxt->pTableMeta));
CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, tbNo, name, tbFName, len, pCxt->pTableMeta));
SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta);
setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta));
......@@ -1195,7 +1193,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks,
tdSRowEnd(pBuilder);
*gotRow = true;
#ifdef TD_DEBUG_PRINT_ROW
STSchema* pSTSchema = tdGetSTSChemaFromSSChema(schema, spd->numOfCols, 1);
tdSRowPrint(row, pSTSchema, __func__);
......@@ -1214,7 +1212,7 @@ static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlo
CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
(*numOfRows) = 0;
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
// char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
SToken sToken;
while (1) {
int32_t index = 0;
......@@ -1232,7 +1230,7 @@ static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlo
}
bool gotRow = false;
CHECK_CODE(parseOneRow(pCxt, pDataBlock, tinfo.precision, &gotRow, tmpTokenBuf));
CHECK_CODE(parseOneRow(pCxt, pDataBlock, tinfo.precision, &gotRow, pCxt->tmpTokenBuf));
if (gotRow) {
pDataBlock->size += extendedRowSize; // len;
}
......@@ -1347,7 +1345,9 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SToken filePath, STa
}
static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) {
taosMemoryFreeClear(pCxt->pTableMeta);
if (!pCxt->pComCxt->async) {
taosMemoryFreeClear(pCxt->pTableMeta);
}
destroyBoundColumnInfo(&pCxt->tags);
tdDestroySVCreateTbReq(&pCxt->createTblReq);
}
......@@ -1365,6 +1365,20 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) {
destroyBlockArrayList(pCxt->pVgDataBlocks);
}
static int32_t parseTableName(SInsertParseContext* pCxt, SToken* pTbnameToken, SName* pName, char* pDbFName,
char* pTbFName) {
int32_t code = createSName(pName, pTbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
if (TSDB_CODE_SUCCESS == code) {
tNameExtractFullName(pName, pTbFName);
code = taosHashPut(pCxt->pTableNameHashObj, pTbFName, strlen(pTbFName), pName, sizeof(SName));
}
if (TSDB_CODE_SUCCESS == code) {
tNameGetFullDbName(pName, pDbFName);
code = taosHashPut(pCxt->pDbFNameHashObj, pDbFName, strlen(pDbFName), pDbFName, TSDB_DB_FNAME_LEN);
}
return code;
}
// tb_name
// [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
// [(field1_name, ...)]
......@@ -1372,7 +1386,9 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) {
// [...];
static int32_t parseInsertBody(SInsertParseContext* pCxt) {
int32_t tbNum = 0;
SName name;
char tbFName[TSDB_TABLE_FNAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
bool autoCreateTbl = false;
// for each table
......@@ -1415,20 +1431,15 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
SToken tbnameToken = sToken;
NEXT_TOKEN(pCxt->pSql, sToken);
SName name;
CHECK_CODE(createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
tNameExtractFullName(&name, tbFName);
CHECK_CODE(taosHashPut(pCxt->pTableNameHashObj, tbFName, strlen(tbFName), &name, sizeof(SName)));
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(&name, dbFName);
CHECK_CODE(taosHashPut(pCxt->pDbFNameHashObj, dbFName, strlen(dbFName), dbFName, sizeof(dbFName)));
if (!pCxt->pComCxt->async || TK_USING == sToken.type) {
CHECK_CODE(parseTableName(pCxt, &tbnameToken, &name, dbFName, tbFName));
}
bool existedUsing = false;
// USING clause
if (TK_USING == sToken.type) {
existedUsing = true;
CHECK_CODE(parseUsingClause(pCxt, &name, tbFName));
CHECK_CODE(parseUsingClause(pCxt, tbNum, &name, tbFName));
NEXT_TOKEN(pCxt->pSql, sToken);
autoCreateTbl = true;
}
......@@ -1438,22 +1449,31 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
// pSql -> field1_name, ...)
pBoundColsStart = pCxt->pSql;
CHECK_CODE(ignoreBoundColumns(pCxt));
// CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta)));
NEXT_TOKEN(pCxt->pSql, sToken);
}
if (TK_USING == sToken.type) {
CHECK_CODE(parseUsingClause(pCxt, &name, tbFName));
if (pCxt->pComCxt->async) {
CHECK_CODE(parseTableName(pCxt, &tbnameToken, &name, dbFName, tbFName));
}
CHECK_CODE(parseUsingClause(pCxt, tbNum, &name, tbFName));
NEXT_TOKEN(pCxt->pSql, sToken);
autoCreateTbl = true;
} else if (!existedUsing) {
CHECK_CODE(getTableMeta(pCxt, &name, dbFName));
CHECK_CODE(getTableMeta(pCxt, tbNum, &name, dbFName));
}
STableDataBlocks* dataBuf = NULL;
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, tbFName, strlen(tbFName), TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta,
&dataBuf, NULL, &pCxt->createTblReq));
if (pCxt->pComCxt->async) {
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, &pCxt->pTableMeta->uid, sizeof(pCxt->pTableMeta->uid),
TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk),
getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL,
&pCxt->createTblReq));
} else {
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, tbFName, strlen(tbFName), TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta,
&dataBuf, NULL, &pCxt->createTblReq));
}
if (NULL != pBoundColsStart) {
char* pCurrPos = pCxt->pSql;
......@@ -1532,7 +1552,9 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache
.totalNum = 0,
.pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT),
.pStmtCb = pContext->pStmtCb,
.pMetaCache = pMetaCache};
.pMetaCache = pMetaCache,
.memElapsed = 0,
.parRowElapsed = 0};
if (pContext->pStmtCb && *pQuery) {
(*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj,
......@@ -1547,7 +1569,7 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache
} else {
context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
context.pTableBlockHashObj =
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
}
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pSubTableHashObj ||
......@@ -1656,24 +1678,24 @@ static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt) {
return TSDB_CODE_SUCCESS;
}
static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) {
static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, bool isStable, int32_t tableNo, SToken* pTbToken) {
SName name;
CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
CHECK_CODE(reserveUserAuthInCacheExt(pCxt->pComCxt->pUser, &name, AUTH_TYPE_WRITE, pCxt->pMetaCache));
CHECK_CODE(reserveTableMetaInCacheExt(&name, pCxt->pMetaCache));
CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache));
CHECK_CODE(reserveTableMetaInCacheForInsert(&name, isStable ? CATALOG_REQ_TYPE_META : CATALOG_REQ_TYPE_BOTH, tableNo,
pCxt->pMetaCache));
return TSDB_CODE_SUCCESS;
}
static int32_t collectAutoCreateTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) {
static int32_t collectAutoCreateTableMetaKey(SInsertParseSyntaxCxt* pCxt, int32_t tableNo, SToken* pTbToken) {
SName name;
CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache));
CHECK_CODE(reserveTableMetaInCacheForInsert(&name, CATALOG_REQ_TYPE_VGROUP, tableNo, pCxt->pMetaCache));
return TSDB_CODE_SUCCESS;
}
static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) {
bool hasData = false;
bool hasData = false;
int32_t tableNo = 0;
// for each table
while (1) {
SToken sToken;
......@@ -1702,9 +1724,9 @@ static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) {
// USING clause
if (TK_USING == sToken.type) {
existedUsing = true;
CHECK_CODE(collectAutoCreateTableMetaKey(pCxt, &tbnameToken));
CHECK_CODE(collectAutoCreateTableMetaKey(pCxt, tableNo, &tbnameToken));
NEXT_TOKEN(pCxt->pSql, sToken);
CHECK_CODE(collectTableMetaKey(pCxt, &sToken));
CHECK_CODE(collectTableMetaKey(pCxt, true, tableNo, &sToken));
CHECK_CODE(skipUsingClause(pCxt));
NEXT_TOKEN(pCxt->pSql, sToken);
}
......@@ -1717,15 +1739,17 @@ static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) {
if (TK_USING == sToken.type && !existedUsing) {
existedUsing = true;
CHECK_CODE(collectAutoCreateTableMetaKey(pCxt, &tbnameToken));
CHECK_CODE(collectAutoCreateTableMetaKey(pCxt, tableNo, &tbnameToken));
NEXT_TOKEN(pCxt->pSql, sToken);
CHECK_CODE(collectTableMetaKey(pCxt, &sToken));
CHECK_CODE(collectTableMetaKey(pCxt, true, tableNo, &sToken));
CHECK_CODE(skipUsingClause(pCxt));
NEXT_TOKEN(pCxt->pSql, sToken);
} else {
CHECK_CODE(collectTableMetaKey(pCxt, &tbnameToken));
} else if (!existedUsing) {
CHECK_CODE(collectTableMetaKey(pCxt, false, tableNo, &tbnameToken));
}
++tableNo;
if (TK_VALUES == sToken.type) {
// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
CHECK_CODE(skipValuesClause(pCxt));
......
......@@ -476,9 +476,11 @@ static int32_t buildDbReq(SHashObj* pDbsHash, SArray** pDbs) {
static int32_t buildTableReqFromDb(SHashObj* pDbsHash, SArray** pDbs) {
if (NULL != pDbsHash) {
*pDbs = taosArrayInit(taosHashGetSize(pDbsHash), sizeof(STablesReq));
if (NULL == *pDbs) {
return TSDB_CODE_OUT_OF_MEMORY;
*pDbs = taosArrayInit(taosHashGetSize(pDbsHash), sizeof(STablesReq));
if (NULL == *pDbs) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
SParseTablesMetaReq* p = taosHashIterate(pDbsHash, NULL);
while (NULL != p) {
......@@ -530,7 +532,62 @@ static int32_t buildUdfReq(SHashObj* pUdfHash, SArray** pUdf) {
return TSDB_CODE_SUCCESS;
}
int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
static int32_t buildCatalogReqForInsert(SParseContext* pCxt, const SParseMetaCache* pMetaCache,
SCatalogReq* pCatalogReq) {
int32_t ndbs = taosHashGetSize(pMetaCache->pInsertTables);
pCatalogReq->pTableMeta = taosArrayInit(ndbs, sizeof(STablesReq));
if (NULL == pCatalogReq->pTableMeta) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCatalogReq->pTableHash = taosArrayInit(ndbs, sizeof(STablesReq));
if (NULL == pCatalogReq->pTableHash) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCatalogReq->pUser = taosArrayInit(ndbs, sizeof(SUserAuthInfo));
if (NULL == pCatalogReq->pUser) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCxt->pTableMetaPos = taosArrayInit(pMetaCache->sqlTableNum, sizeof(int32_t));
pCxt->pTableVgroupPos = taosArrayInit(pMetaCache->sqlTableNum, sizeof(int32_t));
int32_t metaReqNo = 0;
int32_t vgroupReqNo = 0;
SInsertTablesMetaReq* p = taosHashIterate(pMetaCache->pInsertTables, NULL);
while (NULL != p) {
STablesReq req = {0};
strcpy(req.dbFName, p->dbFName);
TSWAP(req.pTables, p->pTableMetaReq);
taosArrayPush(pCatalogReq->pTableMeta, &req);
req.pTables = NULL;
TSWAP(req.pTables, p->pTableVgroupReq);
taosArrayPush(pCatalogReq->pTableHash, &req);
int32_t ntables = taosArrayGetSize(p->pTableMetaPos);
for (int32_t i = 0; i < ntables; ++i) {
taosArrayInsert(pCxt->pTableMetaPos, *(int32_t*)taosArrayGet(p->pTableMetaPos, i), &metaReqNo);
++metaReqNo;
}
ntables = taosArrayGetSize(p->pTableVgroupPos);
for (int32_t i = 0; i < ntables; ++i) {
taosArrayInsert(pCxt->pTableVgroupPos, *(int32_t*)taosArrayGet(p->pTableVgroupPos, i), &vgroupReqNo);
++vgroupReqNo;
}
SUserAuthInfo auth = {0};
strcpy(auth.user, pCxt->pUser);
strcpy(auth.dbFName, p->dbFName);
auth.type = AUTH_TYPE_WRITE;
taosArrayPush(pCatalogReq->pUser, &auth);
p = taosHashIterate(pMetaCache->pInsertTables, p);
}
return TSDB_CODE_SUCCESS;
}
int32_t buildCatalogReqForQuery(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
int32_t code = buildTableReqFromDb(pMetaCache->pTableMeta, &pCatalogReq->pTableMeta);
if (TSDB_CODE_SUCCESS == code) {
code = buildDbReq(pMetaCache->pDbVgroup, &pCatalogReq->pDbVgroup);
......@@ -560,6 +617,13 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog
return code;
}
int32_t buildCatalogReq(SParseContext* pCxt, const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
if (NULL != pMetaCache->pInsertTables) {
return buildCatalogReqForInsert(pCxt, pMetaCache, pCatalogReq);
}
return buildCatalogReqForQuery(pMetaCache, pCatalogReq);
}
static int32_t putMetaDataToHash(const char* pKey, int32_t len, const SArray* pData, int32_t index, SHashObj** pHash) {
if (NULL == *pHash) {
*pHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
......@@ -647,7 +711,8 @@ static int32_t putUdfToCache(const SArray* pUdfReq, const SArray* pUdfData, SHas
return TSDB_CODE_SUCCESS;
}
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
int32_t putMetaDataToCacheForQuery(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
SParseMetaCache* pMetaCache) {
int32_t code = putDbTableDataToCache(pCatalogReq->pTableMeta, pMetaData->pTableMeta, &pMetaCache->pTableMeta);
if (TSDB_CODE_SUCCESS == code) {
code = putDbDataToCache(pCatalogReq->pDbVgroup, pMetaData->pDbVgroup, &pMetaCache->pDbVgroup);
......@@ -677,6 +742,30 @@ int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMet
return code;
}
int32_t putMetaDataToCacheForInsert(const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
int32_t ndbs = taosArrayGetSize(pMetaData->pUser);
for (int32_t i = 0; i < ndbs; ++i) {
SMetaRes* pRes = taosArrayGet(pMetaData->pUser, i);
if (TSDB_CODE_SUCCESS != pRes->code) {
return pRes->code;
}
if (!(*(bool*)pRes->pRes)) {
return TSDB_CODE_PAR_PERMISSION_DENIED;
}
}
pMetaCache->pTableMetaData = pMetaData->pTableMeta;
pMetaCache->pTableVgroupData = pMetaData->pTableHash;
return TSDB_CODE_SUCCESS;
}
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache,
bool insertValuesStmt) {
if (insertValuesStmt) {
return putMetaDataToCacheForInsert(pMetaData, pMetaCache);
}
return putMetaDataToCacheForQuery(pCatalogReq, pMetaData, pMetaCache);
}
static int32_t reserveTableReqInCacheImpl(const char* pTbFName, int32_t len, SHashObj** pTables) {
if (NULL == *pTables) {
*pTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
......@@ -977,6 +1066,82 @@ int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes) {
return TSDB_CODE_SUCCESS;
}
static int32_t reserveTableReqInCacheForInsert(const SName* pName, ECatalogReqType reqType, int32_t tableNo,
SInsertTablesMetaReq* pReq) {
switch (reqType) {
case CATALOG_REQ_TYPE_META:
taosArrayPush(pReq->pTableMetaReq, pName);
taosArrayPush(pReq->pTableMetaPos, &tableNo);
break;
case CATALOG_REQ_TYPE_VGROUP:
taosArrayPush(pReq->pTableVgroupReq, pName);
taosArrayPush(pReq->pTableVgroupPos, &tableNo);
break;
case CATALOG_REQ_TYPE_BOTH:
taosArrayPush(pReq->pTableMetaReq, pName);
taosArrayPush(pReq->pTableMetaPos, &tableNo);
taosArrayPush(pReq->pTableVgroupReq, pName);
taosArrayPush(pReq->pTableVgroupPos, &tableNo);
break;
default:
break;
}
return TSDB_CODE_SUCCESS;
}
static int32_t reserveTableReqInDbCacheForInsert(const SName* pName, ECatalogReqType reqType, int32_t tableNo,
SHashObj* pDbs) {
SInsertTablesMetaReq req = {.pTableMetaReq = taosArrayInit(4, sizeof(SName)),
.pTableMetaPos = taosArrayInit(4, sizeof(int32_t)),
.pTableVgroupReq = taosArrayInit(4, sizeof(SName)),
.pTableVgroupPos = taosArrayInit(4, sizeof(int32_t))};
tNameGetFullDbName(pName, req.dbFName);
int32_t code = reserveTableReqInCacheForInsert(pName, reqType, tableNo, &req);
if (TSDB_CODE_SUCCESS == code) {
code = taosHashPut(pDbs, pName->dbname, strlen(pName->dbname), &req, sizeof(SInsertTablesMetaReq));
}
return code;
}
int32_t reserveTableMetaInCacheForInsert(const SName* pName, ECatalogReqType reqType, int32_t tableNo,
SParseMetaCache* pMetaCache) {
if (NULL == pMetaCache->pInsertTables) {
pMetaCache->pInsertTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == pMetaCache->pInsertTables) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
pMetaCache->sqlTableNum = tableNo;
SInsertTablesMetaReq* pReq = taosHashGet(pMetaCache->pInsertTables, pName->dbname, strlen(pName->dbname));
if (NULL == pReq) {
return reserveTableReqInDbCacheForInsert(pName, reqType, tableNo, pMetaCache->pInsertTables);
}
return reserveTableReqInCacheForInsert(pName, reqType, tableNo, pReq);
}
int32_t getTableMetaFromCacheForInsert(SArray* pTableMetaPos, SParseMetaCache* pMetaCache, int32_t tableNo,
STableMeta** pMeta) {
int32_t reqIndex = *(int32_t*)taosArrayGet(pTableMetaPos, tableNo);
SMetaRes* pRes = taosArrayGet(pMetaCache->pTableMetaData, reqIndex);
if (TSDB_CODE_SUCCESS == pRes->code) {
*pMeta = pRes->pRes;
if (NULL == *pMeta) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return pRes->code;
}
int32_t getTableVgroupFromCacheForInsert(SArray* pTableVgroupPos, SParseMetaCache* pMetaCache, int32_t tableNo,
SVgroupInfo* pVgroup) {
int32_t reqIndex = *(int32_t*)taosArrayGet(pTableVgroupPos, tableNo);
SMetaRes* pRes = taosArrayGet(pMetaCache->pTableVgroupData, reqIndex);
if (TSDB_CODE_SUCCESS == pRes->code) {
memcpy(pVgroup, pRes->pRes, sizeof(SVgroupInfo));
}
return pRes->code;
}
void destoryParseTablesMetaReqHash(SHashObj* pHash) {
SParseTablesMetaReq* p = taosHashIterate(pHash, NULL);
while (NULL != p) {
......
......@@ -185,7 +185,7 @@ int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq
code = parseSqlSyntax(pCxt, pQuery, &metaCache);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildCatalogReq(&metaCache, pCatalogReq);
code = buildCatalogReq(pCxt, &metaCache, pCatalogReq);
}
destoryParseMetaCache(&metaCache, true);
terrno = code;
......@@ -195,7 +195,7 @@ int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq
int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq,
const struct SMetaData* pMetaData, SQuery* pQuery) {
SParseMetaCache metaCache = {0};
int32_t code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache);
int32_t code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache, NULL == pQuery->pRoot);
if (TSDB_CODE_SUCCESS == code) {
if (NULL == pQuery->pRoot) {
code = parseInsertSql(pCxt, &pQuery, &metaCache);
......
......@@ -13,21 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <functional>
#include <gtest/gtest.h>
#include "mockCatalogService.h"
#include "os.h"
#include "parInt.h"
#include "parTestUtil.h"
using namespace std;
using namespace std::placeholders;
using namespace testing;
namespace {
string toString(int32_t code) { return tstrerror(code); }
} // namespace
namespace ParserTest {
// syntax:
// INSERT INTO
......@@ -36,259 +28,60 @@ string toString(int32_t code) { return tstrerror(code); }
// [(field1_name, ...)]
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
// [...];
class InsertTest : public Test {
protected:
InsertTest() : res_(nullptr) {}
~InsertTest() { reset(); }
void setDatabase(const string& acctId, const string& db) {
acctId_ = acctId;
db_ = db;
}
void bind(const char* sql) {
reset();
cxt_.acctId = atoi(acctId_.c_str());
cxt_.db = (char*)db_.c_str();
strcpy(sqlBuf_, sql);
cxt_.sqlLen = strlen(sql);
sqlBuf_[cxt_.sqlLen] = '\0';
cxt_.pSql = sqlBuf_;
}
int32_t run() {
code_ = parseInsertSql(&cxt_, &res_, nullptr);
if (code_ != TSDB_CODE_SUCCESS) {
cout << "code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
}
return code_;
}
int32_t runAsync() {
cxt_.async = true;
bool request = true;
unique_ptr<SParseMetaCache, function<void(SParseMetaCache*)> > metaCache(
new SParseMetaCache(), std::bind(_destoryParseMetaCache, _1, cref(request)));
code_ = parseInsertSyntax(&cxt_, &res_, metaCache.get());
if (code_ != TSDB_CODE_SUCCESS) {
cout << "parseInsertSyntax code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
return code_;
}
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
MockCatalogService::destoryCatalogReq);
code_ = buildCatalogReq(metaCache.get(), catalogReq.get());
if (code_ != TSDB_CODE_SUCCESS) {
cout << "buildCatalogReq code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
return code_;
}
unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData);
g_mockCatalogService->catalogGetAllMeta(catalogReq.get(), metaData.get());
metaCache.reset(new SParseMetaCache());
request = false;
code_ = putMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get());
if (code_ != TSDB_CODE_SUCCESS) {
cout << "putMetaDataToCache code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
return code_;
}
code_ = parseInsertSql(&cxt_, &res_, metaCache.get());
if (code_ != TSDB_CODE_SUCCESS) {
cout << "parseInsertSql code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
return code_;
}
return code_;
}
void dumpReslut() {
SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_);
size_t num = taosArrayGetSize(pStmt->pDataBlocks);
cout << "payloadType:" << (int32_t)pStmt->payloadType << ", insertType:" << pStmt->insertType
<< ", numOfVgs:" << num << endl;
for (size_t i = 0; i < num; ++i) {
SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(pStmt->pDataBlocks, i);
cout << "vgId:" << vg->vg.vgId << ", numOfTables:" << vg->numOfTables << ", dataSize:" << vg->size << endl;
SSubmitReq* submit = (SSubmitReq*)vg->pData;
cout << "length:" << ntohl(submit->length) << ", numOfBlocks:" << ntohl(submit->numOfBlocks) << endl;
int32_t numOfBlocks = ntohl(submit->numOfBlocks);
SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
for (int32_t i = 0; i < numOfBlocks; ++i) {
cout << "Block:" << i << endl;
cout << "\tuid:" << be64toh(blk->uid) << ", tid:" << be64toh(blk->suid) << ", sversion:" << ntohl(blk->sversion)
<< ", dataLen:" << ntohl(blk->dataLen) << ", schemaLen:" << ntohl(blk->schemaLen)
<< ", numOfRows:" << ntohl(blk->numOfRows) << endl;
blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen));
}
}
}
void checkReslut(int32_t numOfTables, int32_t numOfRows1, int32_t numOfRows2 = -1) {
SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_);
ASSERT_EQ(pStmt->payloadType, PAYLOAD_TYPE_KV);
ASSERT_EQ(pStmt->insertType, TSDB_QUERY_TYPE_INSERT);
size_t num = taosArrayGetSize(pStmt->pDataBlocks);
ASSERT_GE(num, 0);
for (size_t i = 0; i < num; ++i) {
SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(pStmt->pDataBlocks, i);
ASSERT_EQ(vg->numOfTables, numOfTables);
ASSERT_GE(vg->size, 0);
SSubmitReq* submit = (SSubmitReq*)vg->pData;
ASSERT_GE(ntohl(submit->length), 0);
ASSERT_GE(ntohl(submit->numOfBlocks), 0);
int32_t numOfBlocks = ntohl(submit->numOfBlocks);
SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
for (int32_t i = 0; i < numOfBlocks; ++i) {
ASSERT_EQ(ntohl(blk->numOfRows), (0 == i ? numOfRows1 : (numOfRows2 > 0 ? numOfRows2 : numOfRows1)));
blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen));
}
}
}
private:
static const int max_err_len = 1024;
static const int max_sql_len = 1024 * 1024;
static void _destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) {
destoryParseMetaCache(pMetaCache, request);
delete pMetaCache;
}
void reset() {
memset(&cxt_, 0, sizeof(cxt_));
memset(errMagBuf_, 0, max_err_len);
cxt_.pMsg = errMagBuf_;
cxt_.msgLen = max_err_len;
code_ = TSDB_CODE_SUCCESS;
qDestroyQuery(res_);
res_ = nullptr;
}
SVnodeModifOpStmt* getVnodeModifStmt(SQuery* pQuery) { return (SVnodeModifOpStmt*)pQuery->pRoot; }
string acctId_;
string db_;
char errMagBuf_[max_err_len];
char sqlBuf_[max_sql_len];
SParseContext cxt_;
int32_t code_;
SQuery* res_;
};
class ParserInsertTest : public ParserTestBase {};
// INSERT INTO tb_name [(field1_name, ...)] VALUES (field1_value, ...)
TEST_F(InsertTest, singleTableSingleRowTest) {
setDatabase("root", "test");
bind("insert into t1 values (now, 1, 'beijing', 3, 4, 5)");
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
dumpReslut();
checkReslut(1, 1);
TEST_F(ParserInsertTest, singleTableSingleRowTest) {
useDb("root", "test");
bind("insert into t1 (ts, c1, c2, c3, c4, c5) values (now, 1, 'beijing', 3, 4, 5)");
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
run("INSERT INTO t1 VALUES (now, 1, 'beijing', 3, 4, 5)");
bind("insert into t1 values (now, 1, 'beijing', 3, 4, 5)");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
dumpReslut();
checkReslut(1, 1);
bind("insert into t1 (ts, c1, c2, c3, c4, c5) values (now, 1, 'beijing', 3, 4, 5)");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
run("INSERT INTO t1 (ts, c1, c2, c3, c4, c5) VALUES (now, 1, 'beijing', 3, 4, 5)");
}
// INSERT INTO tb_name VALUES (field1_value, ...)(field1_value, ...)
TEST_F(InsertTest, singleTableMultiRowTest) {
setDatabase("root", "test");
bind(
"insert into t1 values (now, 1, 'beijing', 3, 4, 5)(now+1s, 2, 'shanghai', 6, 7, 8)"
"(now+2s, 3, 'guangzhou', 9, 10, 11)");
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
dumpReslut();
checkReslut(1, 3);
TEST_F(ParserInsertTest, singleTableMultiRowTest) {
useDb("root", "test");
bind(
"insert into t1 values (now, 1, 'beijing', 3, 4, 5)(now+1s, 2, 'shanghai', 6, 7, 8)"
run("INSERT INTO t1 VALUES (now, 1, 'beijing', 3, 4, 5)"
"(now+1s, 2, 'shanghai', 6, 7, 8)"
"(now+2s, 3, 'guangzhou', 9, 10, 11)");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
}
// INSERT INTO tb1_name VALUES (field1_value, ...) tb2_name VALUES (field1_value, ...)
TEST_F(InsertTest, multiTableSingleRowTest) {
setDatabase("root", "test");
TEST_F(ParserInsertTest, multiTableSingleRowTest) {
useDb("root", "test");
bind("insert into st1s1 values (now, 1, \"beijing\") st1s2 values (now, 10, \"131028\")");
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
dumpReslut();
checkReslut(2, 1);
bind("insert into st1s1 values (now, 1, \"beijing\") st1s2 values (now, 10, \"131028\")");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
run("INSERT INTO st1s1 VALUES (now, 1, 'beijing') st1s2 VALUES (now, 10, '131028')");
}
// INSERT INTO tb1_name VALUES (field1_value, ...) tb2_name VALUES (field1_value, ...)
TEST_F(InsertTest, multiTableMultiRowTest) {
setDatabase("root", "test");
bind(
"insert into st1s1 values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")"
" st1s2 values (now, 10, \"131028\")(now+1s, 20, \"132028\")");
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
dumpReslut();
checkReslut(2, 3, 2);
TEST_F(ParserInsertTest, multiTableMultiRowTest) {
useDb("root", "test");
bind(
"insert into st1s1 values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")"
" st1s2 values (now, 10, \"131028\")(now+1s, 20, \"132028\")");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
run("INSERT INTO "
"st1s1 VALUES (now, 1, 'beijing')(now+1s, 2, 'shanghai')(now+2s, 3, 'guangzhou') "
"st1s2 VALUES (now, 10, '131028')(now+1s, 20, '132028')");
}
// INSERT INTO
// tb1_name USING st1_name [(tag1_name, ...)] TAGS (tag1_value, ...) VALUES (field1_value, ...)
// tb2_name USING st2_name [(tag1_name, ...)] TAGS (tag1_value, ...) VALUES (field1_value, ...)
TEST_F(InsertTest, autoCreateTableTest) {
setDatabase("root", "test");
bind(
"insert into st1s1 using st1 tags(1, 'wxy', now) "
"values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")");
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
dumpReslut();
checkReslut(1, 3);
TEST_F(ParserInsertTest, autoCreateTableTest) {
useDb("root", "test");
bind(
"insert into st1s1 using st1 (tag1, tag2) tags(1, 'wxy') values (now, 1, \"beijing\")"
"(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")");
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
run("INSERT INTO st1s1 USING st1 TAGS(1, 'wxy', now) "
"VALUES (now, 1, 'beijing')(now+1s, 2, 'shanghai')(now+2s, 3, 'guangzhou')");
bind(
"insert into st1s1 using st1 tags(1, 'wxy', now) "
"values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
run("INSERT INTO st1s1 USING st1 (tag1, tag2) TAGS(1, 'wxy') (ts, c1, c2) "
"VALUES (now, 1, 'beijing')(now+1s, 2, 'shanghai')(now+2s, 3, 'guangzhou')");
bind(
"insert into st1s1 using st1 (tag1, tag2) tags(1, 'wxy') values (now, 1, \"beijing\")"
"(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
run("INSERT INTO st1s1 (ts, c1, c2) USING st1 (tag1, tag2) TAGS(1, 'wxy') "
"VALUES (now, 1, 'beijing')(now+1s, 2, 'shanghai')(now+2s, 3, 'guangzhou')");
bind(
"insert into st1s1 using st1 tags(1, 'wxy', now) values (now, 1, \"beijing\")"
"st1s1 using st1 tags(1, 'wxy', now) values (now+1s, 2, \"shanghai\")");
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
run("INSERT INTO "
"st1s1 USING st1 (tag1, tag2) TAGS(1, 'wxy') (ts, c1, c2) VALUES (now, 1, 'beijing') "
"st1s2 (ts, c1, c2) USING st1 TAGS(2, 'abc', now) VALUES (now+1s, 2, 'shanghai')");
}
TEST_F(InsertTest, toleranceTest) {
setDatabase("root", "test");
bind("insert into");
ASSERT_NE(run(), TSDB_CODE_SUCCESS);
bind("insert into t");
ASSERT_NE(run(), TSDB_CODE_SUCCESS);
bind("insert into");
ASSERT_NE(runAsync(), TSDB_CODE_SUCCESS);
bind("insert into t");
ASSERT_NE(runAsync(), TSDB_CODE_SUCCESS);
}
} // namespace ParserTest
......@@ -225,16 +225,17 @@ class ParserTestBaseImpl {
DO_WITH_THROW(collectMetaKey, pCxt, pQuery, pMetaCache);
}
void doBuildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
DO_WITH_THROW(buildCatalogReq, pMetaCache, pCatalogReq);
void doBuildCatalogReq(SParseContext* pCxt, const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
DO_WITH_THROW(buildCatalogReq, pCxt, pMetaCache, pCatalogReq);
}
void doGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) {
DO_WITH_THROW(g_mockCatalogService->catalogGetAllMeta, pCatalogReq, pMetaData);
}
void doPutMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
DO_WITH_THROW(putMetaDataToCache, pCatalogReq, pMetaData, pMetaCache);
void doPutMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache,
bool isInsertValues) {
DO_WITH_THROW(putMetaDataToCache, pCatalogReq, pMetaData, pMetaCache, isInsertValues);
}
void doAuthenticate(SParseContext* pCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) {
......@@ -261,7 +262,9 @@ class ParserTestBaseImpl {
void doParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatalogReq) {
DO_WITH_THROW(qParseSqlSyntax, pCxt, pQuery, pCatalogReq);
ASSERT_NE(*pQuery, nullptr);
res_.parsedAst_ = toString((*pQuery)->pRoot);
if (nullptr != (*pQuery)->pRoot) {
res_.parsedAst_ = toString((*pQuery)->pRoot);
}
}
void doAnalyseSqlSemantic(SParseContext* pCxt, const SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
......@@ -270,6 +273,17 @@ class ParserTestBaseImpl {
res_.calcConstAst_ = toString(pQuery->pRoot);
}
void doParseInsertSql(SParseContext* pCxt, SQuery** pQuery, SParseMetaCache* pMetaCache) {
DO_WITH_THROW(parseInsertSql, pCxt, pQuery, pMetaCache);
ASSERT_NE(*pQuery, nullptr);
res_.parsedAst_ = toString((*pQuery)->pRoot);
}
void doParseInsertSyntax(SParseContext* pCxt, SQuery** pQuery, SParseMetaCache* pMetaCache) {
DO_WITH_THROW(parseInsertSyntax, pCxt, pQuery, pMetaCache);
ASSERT_NE(*pQuery, nullptr);
}
string toString(const SNode* pRoot) {
char* pStr = NULL;
int32_t len = 0;
......@@ -287,15 +301,20 @@ class ParserTestBaseImpl {
SParseContext cxt = {0};
setParseContext(sql, &cxt);
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery);
doParse(&cxt, query.get());
SQuery* pQuery = *(query.get());
if (qIsInsertValuesSql(cxt.pSql, cxt.sqlLen)) {
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery);
doParseInsertSql(&cxt, query.get(), nullptr);
} else {
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery);
doParse(&cxt, query.get());
SQuery* pQuery = *(query.get());
doAuthenticate(&cxt, pQuery, nullptr);
doAuthenticate(&cxt, pQuery, nullptr);
doTranslate(&cxt, pQuery, nullptr);
doTranslate(&cxt, pQuery, nullptr);
doCalculateConstant(&cxt, pQuery);
doCalculateConstant(&cxt, pQuery);
}
if (g_dump) {
dump();
......@@ -338,17 +357,22 @@ class ParserTestBaseImpl {
setParseContext(sql, &cxt, true);
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery);
doParse(&cxt, query.get());
SQuery* pQuery = *(query.get());
bool request = true;
bool request = true;
unique_ptr<SParseMetaCache, function<void(SParseMetaCache*)> > metaCache(
new SParseMetaCache(), bind(_destoryParseMetaCache, _1, cref(request)));
doCollectMetaKey(&cxt, pQuery, metaCache.get());
bool isInsertValues = qIsInsertValuesSql(cxt.pSql, cxt.sqlLen);
if (isInsertValues) {
doParseInsertSyntax(&cxt, query.get(), metaCache.get());
} else {
doParse(&cxt, query.get());
doCollectMetaKey(&cxt, *(query.get()), metaCache.get());
}
SQuery* pQuery = *(query.get());
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
MockCatalogService::destoryCatalogReq);
doBuildCatalogReq(metaCache.get(), catalogReq.get());
doBuildCatalogReq(&cxt, metaCache.get(), catalogReq.get());
string err;
thread t1([&]() {
......@@ -358,13 +382,17 @@ class ParserTestBaseImpl {
metaCache.reset(new SParseMetaCache());
request = false;
doPutMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get());
doPutMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get(), isInsertValues);
doAuthenticate(&cxt, pQuery, metaCache.get());
if (isInsertValues) {
doParseInsertSql(&cxt, query.get(), metaCache.get());
} else {
doAuthenticate(&cxt, pQuery, metaCache.get());
doTranslate(&cxt, pQuery, metaCache.get());
doTranslate(&cxt, pQuery, metaCache.get());
doCalculateConstant(&cxt, pQuery);
doCalculateConstant(&cxt, pQuery);
}
} catch (const TerminateFlag& e) {
// success and terminate
} catch (const runtime_error& e) {
......
......@@ -624,6 +624,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Invalid index file"
//tmq
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed")
#ifdef TAOS_ERROR_C
};
......
......@@ -1120,7 +1120,7 @@ class Database:
@classmethod
def setupLastTick(cls):
# start time will be auto generated , start at 10 years ago local time
local_time = datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-16]
local_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-16]
local_epoch_time = [int(i) for i in local_time.split("-")]
#local_epoch_time will be such as : [2022, 7, 18]
......
......@@ -46,7 +46,7 @@ class Logging:
@classmethod
def _get_datetime(cls):
return datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-1]
return datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-1]
@classmethod
def getLogger(cls):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册