提交 9b953707 编写于 作者: D dapan1121

Merge remote-tracking branch 'origin/main' into fix/mainto3.0

...@@ -35,11 +35,12 @@ meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0 ...@@ -35,11 +35,12 @@ meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0
:::note :::note
- All the data in `tag_set` will be converted to NCHAR type automatically . - All the data in `tag_set` will be converted to NCHAR type automatically
- Each data in `field_set` must be self-descriptive for its data type. For example 1.2f32 means a value 1.2 of float type. Without the "f" type suffix, it will be treated as type double. - Each data in `field_set` must be self-descriptive for its data type. For example 1.2f32 means a value 1.2 of float type. Without the "f" type suffix, it will be treated as type double
- Multiple kinds of precision can be used for the `timestamp` field. Time precision can be from nanosecond (ns) to hour (h). - Multiple kinds of precision can be used for the `timestamp` field. Time precision can be from nanosecond (ns) to hour (h)
- The child table name is created automatically in a rule to guarantee its uniqueness. But you can configure `smlChildTableName` in taos.cfg to specify a tag value as the table names if the tag value is unique globally. For example, if a tag is called `tname` and you set `smlChildTableName=tname` in taos.cfg, when you insert `st,tname=cpu1,t1=4 c1=3 1626006833639000000`, the child table `cpu1` will be created automatically. Note that if multiple rows have the same tname but different tag_set values, the tag_set of the first row is used to create the table and the others are ignored. - The child table name is created automatically in a rule to guarantee its uniqueness. But you can configure `smlChildTableName` in taos.cfg to specify a tag value as the table names if the tag value is unique globally. For example, if a tag is called `tname` and you set `smlChildTableName=tname` in taos.cfg, when you insert `st,tname=cpu1,t1=4 c1=3 1626006833639000000`, the child table `cpu1` will be created automatically. Note that if multiple rows have the same tname but different tag_set values, the tag_set of the first row is used to create the table and the others are ignored
- It is assumed that the order of field_set in a supertable is consistent, meaning that the first record contains all fields and subsequent records store fields in the same order. If the order is not consistent, set smlDataFormat in taos.cfg to false. Otherwise, data will be written out of order and a database error will occur.(smlDataFormat in taos.cfg default to false after version of 3.0.1.3, smlDataFormat is discarded since 3.0.3.0) - It is assumed that the order of field_set in a supertable is consistent, meaning that the first record contains all fields and subsequent records store fields in the same order. If the order is not consistent, set smlDataFormat in taos.cfg to false. Otherwise, data will be written out of order and a database error will occur.(smlDataFormat in taos.cfg default to false after version of 3.0.1.3, smlDataFormat is discarded since 3.0.3.0)
::: :::
For more details please refer to [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and [TDengine Schemaless](/reference/schemaless/#Schemaless-Line-Protocol) For more details please refer to [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and [TDengine Schemaless](/reference/schemaless/#Schemaless-Line-Protocol)
......
...@@ -34,6 +34,7 @@ meters.current 1648432611250 11.3 location=California.LosAngeles groupid=3 ...@@ -34,6 +34,7 @@ meters.current 1648432611250 11.3 location=California.LosAngeles groupid=3
``` ```
- The child table name is created automatically in a rule to guarantee its uniqueness. But you can configure `smlChildTableName` in taos.cfg to specify a tag value as the table names if the tag value is unique globally. For example, if a tag is called `tname` and you set `smlChildTableName=tname` in taos.cfg, when you insert `st,tname=cpu1,t1=4 c1=3 1626006833639000000`, the child table `cpu1` will be automatically created. Note that if multiple rows have the same tname but different tag_set values, the tag_set of the first row is used to create the table and the others are ignored. - The child table name is created automatically in a rule to guarantee its uniqueness. But you can configure `smlChildTableName` in taos.cfg to specify a tag value as the table names if the tag value is unique globally. For example, if a tag is called `tname` and you set `smlChildTableName=tname` in taos.cfg, when you insert `st,tname=cpu1,t1=4 c1=3 1626006833639000000`, the child table `cpu1` will be automatically created. Note that if multiple rows have the same tname but different tag_set values, the tag_set of the first row is used to create the table and the others are ignored.
Please refer to [OpenTSDB Telnet API](http://opentsdb.net/docs/build/html/api_telnet/put.html) for more details. Please refer to [OpenTSDB Telnet API](http://opentsdb.net/docs/build/html/api_telnet/put.html) for more details.
## Examples ## Examples
...@@ -68,11 +69,11 @@ Database changed. ...@@ -68,11 +69,11 @@ Database changed.
taos> show stables; taos> show stables;
name | name |
================================= =================================
meters.current | meters_current |
meters.voltage | meters_voltage |
Query OK, 2 row(s) in set (0.002544s) Query OK, 2 row(s) in set (0.002544s)
taos> select tbname, * from `meters.current`; taos> select tbname, * from `meters_current`;
tbname | _ts | _value | groupid | location | tbname | _ts | _value | groupid | location |
================================================================================================================================== ==================================================================================================================================
t_0e7bcfa21a02331c06764f275... | 2022-03-28 09:56:51.249 | 10.800000000 | 3 | California.LosAngeles | t_0e7bcfa21a02331c06764f275... | 2022-03-28 09:56:51.249 | 10.800000000 | 3 | California.LosAngeles |
...@@ -87,5 +88,5 @@ Query OK, 4 row(s) in set (0.005399s) ...@@ -87,5 +88,5 @@ Query OK, 4 row(s) in set (0.005399s)
If you want query the data of `location=California.LosAngeles groupid=3`, here is the query SQL: If you want query the data of `location=California.LosAngeles groupid=3`, here is the query SQL:
```sql ```sql
SELECT * FROM `meters.current` WHERE location = "California.LosAngeles" AND groupid = 3; SELECT * FROM `meters_current` WHERE location = "California.LosAngeles" AND groupid = 3;
``` ```
...@@ -49,6 +49,7 @@ Please refer to [OpenTSDB HTTP API](http://opentsdb.net/docs/build/html/api_http ...@@ -49,6 +49,7 @@ Please refer to [OpenTSDB HTTP API](http://opentsdb.net/docs/build/html/api_http
- In JSON protocol, strings will be converted to NCHAR type and numeric values will be converted to double type. - In JSON protocol, strings will be converted to NCHAR type and numeric values will be converted to double type.
- The child table name is created automatically in a rule to guarantee its uniqueness. But you can configure `smlChildTableName` in taos.cfg to specify a tag value as the table names if the tag value is unique globally. For example, if a tag is called `tname` and you set `smlChildTableName=tname` in taos.cfg, when you insert `st,tname=cpu1,t1=4 c1=3 1626006833639000000`, the child table `cpu1` will be automatically created. Note that if multiple rows have the same tname but different tag_set values, the tag_set of the first row is used to create the table and the others are ignored. - The child table name is created automatically in a rule to guarantee its uniqueness. But you can configure `smlChildTableName` in taos.cfg to specify a tag value as the table names if the tag value is unique globally. For example, if a tag is called `tname` and you set `smlChildTableName=tname` in taos.cfg, when you insert `st,tname=cpu1,t1=4 c1=3 1626006833639000000`, the child table `cpu1` will be automatically created. Note that if multiple rows have the same tname but different tag_set values, the tag_set of the first row is used to create the table and the others are ignored.
::: :::
## Examples ## Examples
...@@ -83,11 +84,11 @@ Database changed. ...@@ -83,11 +84,11 @@ Database changed.
taos> show stables; taos> show stables;
name | name |
================================= =================================
meters.current | meters_current |
meters.voltage | meters_voltage |
Query OK, 2 row(s) in set (0.001954s) Query OK, 2 row(s) in set (0.001954s)
taos> select * from `meters.current`; taos> select * from `meters_current`;
_ts | _value | groupid | location | _ts | _value | groupid | location |
=================================================================================================================== ===================================================================================================================
2022-03-28 09:56:51.249 | 10.300000000 | 2.000000000 | California.SanFrancisco | 2022-03-28 09:56:51.249 | 10.300000000 | 2.000000000 | California.SanFrancisco |
...@@ -100,5 +101,5 @@ Query OK, 2 row(s) in set (0.004076s) ...@@ -100,5 +101,5 @@ Query OK, 2 row(s) in set (0.004076s)
If you want query the data of "tags": {"location": "California.LosAngeles", "groupid": 1}, here is the query SQL: If you want query the data of "tags": {"location": "California.LosAngeles", "groupid": 1}, here is the query SQL:
```sql ```sql
SELECT * FROM `meters.current` WHERE location = "California.LosAngeles" AND groupid = 3; SELECT * FROM `meters_current` WHERE location = "California.LosAngeles" AND groupid = 3;
``` ```
...@@ -135,7 +135,7 @@ The following describes the basic API, synchronous API, asynchronous API, subscr ...@@ -135,7 +135,7 @@ The following describes the basic API, synchronous API, asynchronous API, subscr
The base API is used to do things like create database connections and provide a runtime environment for the execution of other APIs. The base API is used to do things like create database connections and provide a runtime environment for the execution of other APIs.
- `void taos_init()` - `int taos_init()`
Initializes the runtime environment. If the API is not actively called, the driver will automatically call the API when `taos_connect()` is called, so the program generally does not need to call it manually. Initializes the runtime environment. If the API is not actively called, the driver will automatically call the API when `taos_connect()` is called, so the program generally does not need to call it manually.
...@@ -454,6 +454,7 @@ In addition to writing data using the SQL method or the parameter binding API, w ...@@ -454,6 +454,7 @@ In addition to writing data using the SQL method or the parameter binding API, w
- zero success,none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)` - zero success,none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- `int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId)` - `int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId)`
**Function description** **Function description**
- get the committed offset - get the committed offset
...@@ -467,9 +468,9 @@ In addition to writing data using the SQL method or the parameter binding API, w ...@@ -467,9 +468,9 @@ In addition to writing data using the SQL method or the parameter binding API, w
**Function description** **Function description**
The commit interface is divided into two types, each with synchronous and asynchronous interfaces: - The commit interface is divided into two types, each with synchronous and asynchronous interfaces:
- The first type: based on message submission, submit the progress in the message. If the message passes NULL, submit the current progress of all vgroups consumed by the current consumer: tmq_commit_sync/tmq_commit_async - The first type: based on message submission, submit the progress in the message. If the message passes NULL, submit the current progress of all vgroups consumed by the current consumer: tmq_commit_sync/tmq_commit_async
- The second type: submit based on the offset of a Vgroup in a topic: tmq_commit_offset_sync/tmq_commit_offset_async - The second type: submit based on the offset of a Vgroup in a topic: tmq_commit_offset_sync/tmq_commit_offset_async
**Parameter description** **Parameter description**
- msg:Message consumed, If the message passes NULL, submit the current progress of all vgroups consumed by the current consumer - msg:Message consumed, If the message passes NULL, submit the current progress of all vgroups consumed by the current consumer
...@@ -513,4 +514,4 @@ In addition to writing data using the SQL method or the parameter binding API, w ...@@ -513,4 +514,4 @@ In addition to writing data using the SQL method or the parameter binding API, w
- topics: a list of topics subscribed by consumers,need to be freed by tmq_list_destroy - topics: a list of topics subscribed by consumers,need to be freed by tmq_list_destroy
**Return value** **Return value**
- zero success,none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)` - zero success,none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
\ No newline at end of file
...@@ -32,8 +32,8 @@ All data in tag_set is automatically converted to the NCHAR data type and does n ...@@ -32,8 +32,8 @@ All data in tag_set is automatically converted to the NCHAR data type and does n
In the schemaless writing data line protocol, each data item in the field_set needs to be described with its data type. Let's explain in detail: In the schemaless writing data line protocol, each data item in the field_set needs to be described with its data type. Let's explain in detail:
- If there are English double quotes on both sides, it indicates the BINARY(32) type. For example, `"abc"`. - If there are English double quotes on both sides, it indicates the VARCHAR(N) type. For example, `"abc"`.
- If there are double quotes on both sides and an L prefix, it means NCHAR(32) type. For example, `L"error message"`. - If there are double quotes on both sides and an L prefix, it means NCHAR(N) type. For example, `L"error message"`.
- Spaces, equals sign (=), comma (,), double quote ("), and backslash (\\) need to be escaped with a backslash (\\) in front. (All refer to the ASCII character). The rules are as follows: - Spaces, equals sign (=), comma (,), double quote ("), and backslash (\\) need to be escaped with a backslash (\\) in front. (All refer to the ASCII character). The rules are as follows:
| **Serial number** | **Element** | **Escape characters** | | **Serial number** | **Element** | **Escape characters** |
......
...@@ -34,11 +34,12 @@ meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0 ...@@ -34,11 +34,12 @@ meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0
:::note :::note
- tag_set 中的所有的数据自动转化为 NCHAR 数据类型; - tag_set 中的所有的数据自动转化为 NCHAR 数据类型
- field_set 中的每个数据项都需要对自身的数据类型进行描述, 比如 1.2f32 代表 FLOAT 类型的数值 1.2, 如果不带类型后缀会被当作 DOUBLE 处理; - field_set 中的每个数据项都需要对自身的数据类型进行描述, 比如 1.2f32 代表 FLOAT 类型的数值 1.2, 如果不带类型后缀会被当作 DOUBLE 处理
- timestamp 支持多种时间精度。写入数据的时候需要用参数指定时间精度,支持从小时到纳秒的 6 种时间精度 - timestamp 支持多种时间精度。写入数据的时候需要用参数指定时间精度,支持从小时到纳秒的 6 种时间精度
- 为了提高写入的效率,默认假设同一个超级表中 field_set 的顺序是一样的(第一条数据包含所有的 field,后面的数据按照这个顺序),如果顺序不一样,需要配置参数 smlDataFormat 为 false,否则,数据写入按照相同顺序写入,库中数据会异常。(3.0.1.3 之后的版本 smlDataFormat 默认为 false,从3.0.3.0开始,该配置废弃) [TDengine 无模式写入参考指南](/reference/schemaless/#无模式写入行协议) - 为了提高写入的效率,默认假设同一个超级表中 field_set 的顺序是一样的(第一条数据包含所有的 field,后面的数据按照这个顺序),如果顺序不一样,需要配置参数 smlDataFormat 为 false,否则,数据写入按照相同顺序写入,库中数据会异常。(3.0.1.3 之后的版本 smlDataFormat 默认为 false,从3.0.3.0开始,该配置废弃) [TDengine 无模式写入参考指南](/reference/schemaless/#无模式写入行协议)
- 默认产生的子表名是根据规则生成的唯一 ID 值。用户也可以通过在client端的 taos.cfg 里配置 smlChildTableName 参数来指定某个标签值作为子表名。该标签值应该具有全局唯一性。举例如下:假设有个标签名为tname, 配置 smlChildTableName=tname, 插入数据为 st,tname=cpu1,t1=4 c1=3 1626006833639000000 则创建的子表名为 cpu1。注意如果多行数据 tname 相同,但是后面的 tag_set 不同,则使用第一行自动建表时指定的 tag_set,其他的行会忽略)。[TDengine 无模式写入参考指南](/reference/schemaless/#无模式写入行协议) - 默认产生的子表名是根据规则生成的唯一 ID 值。用户也可以通过在client端的 taos.cfg 里配置 smlChildTableName 参数来指定某个标签值作为子表名。该标签值应该具有全局唯一性。举例如下:假设有个标签名为tname, 配置 smlChildTableName=tname, 插入数据为 st,tname=cpu1,t1=4 c1=3 1626006833639000000 则创建的子表名为 cpu1。注意如果多行数据 tname 相同,但是后面的 tag_set 不同,则使用第一行自动建表时指定的 tag_set,其他的行会忽略)。[TDengine 无模式写入参考指南](/reference/schemaless/#无模式写入行协议)
::: :::
要了解更多可参考:[InfluxDB Line 协议官方文档](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) 和 [TDengine 无模式写入参考指南](/reference/schemaless/#无模式写入行协议) 要了解更多可参考:[InfluxDB Line 协议官方文档](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) 和 [TDengine 无模式写入参考指南](/reference/schemaless/#无模式写入行协议)
......
...@@ -67,11 +67,11 @@ Database changed. ...@@ -67,11 +67,11 @@ Database changed.
taos> SHOW STABLES; taos> SHOW STABLES;
name | name |
================================= =================================
meters.current | meters_current |
meters.voltage | meters_voltage |
Query OK, 2 row(s) in set (0.002544s) Query OK, 2 row(s) in set (0.002544s)
taos> SELECT TBNAME, * FROM `meters.current`; taos> SELECT TBNAME, * FROM `meters_current`;
tbname | _ts | _value | groupid | location | tbname | _ts | _value | groupid | location |
================================================================================================================================== ==================================================================================================================================
t_0e7bcfa21a02331c06764f275... | 2022-03-28 09:56:51.249 | 10.800000000 | 3 | California.LosAngeles | t_0e7bcfa21a02331c06764f275... | 2022-03-28 09:56:51.249 | 10.800000000 | 3 | California.LosAngeles |
...@@ -83,10 +83,10 @@ Query OK, 4 row(s) in set (0.005399s) ...@@ -83,10 +83,10 @@ Query OK, 4 row(s) in set (0.005399s)
## SQL 查询示例 ## SQL 查询示例
`meters.current` 是插入数据的超级表名。 `meters_current` 是插入数据的超级表名。
可以通过超级表的 TAG 来过滤数据,比如查询 `location=California.LosAngeles groupid=3` 可以通过如下 SQL: 可以通过超级表的 TAG 来过滤数据,比如查询 `location=California.LosAngeles groupid=3` 可以通过如下 SQL:
```sql ```sql
SELECT * FROM `meters.current` WHERE location = "California.LosAngeles" AND groupid = 3; SELECT * FROM `meters_current` WHERE location = "California.LosAngeles" AND groupid = 3;
``` ```
...@@ -48,6 +48,7 @@ OpenTSDB JSON 格式协议采用一个 JSON 字符串表示一行或多行数据 ...@@ -48,6 +48,7 @@ OpenTSDB JSON 格式协议采用一个 JSON 字符串表示一行或多行数据
- 对于 JSON 格式协议,TDengine 并不会自动把所有标签转成 NCHAR 类型, 字符串将将转为 NCHAR 类型, 数值将同样转换为 DOUBLE 类型。 - 对于 JSON 格式协议,TDengine 并不会自动把所有标签转成 NCHAR 类型, 字符串将将转为 NCHAR 类型, 数值将同样转换为 DOUBLE 类型。
- 默认生成的子表名是根据规则生成的唯一 ID 值。用户也可以通过在client端的 taos.cfg 里配置 smlChildTableName 参数来指定某个标签值作为子表名。该标签值应该具有全局唯一性。举例如下:假设有个标签名为tname, 配置 smlChildTableName=tname, 插入数据为 `"tags": { "host": "web02","dc": "lga","tname":"cpu1"}` 则创建的子表名为 cpu1。注意如果多行数据 tname 相同,但是后面的 tag_set 不同,则使用第一行自动建表时指定的 tag_set,其他的行会忽略)。 - 默认生成的子表名是根据规则生成的唯一 ID 值。用户也可以通过在client端的 taos.cfg 里配置 smlChildTableName 参数来指定某个标签值作为子表名。该标签值应该具有全局唯一性。举例如下:假设有个标签名为tname, 配置 smlChildTableName=tname, 插入数据为 `"tags": { "host": "web02","dc": "lga","tname":"cpu1"}` 则创建的子表名为 cpu1。注意如果多行数据 tname 相同,但是后面的 tag_set 不同,则使用第一行自动建表时指定的 tag_set,其他的行会忽略)。
::: :::
## 示例代码 ## 示例代码
...@@ -82,8 +83,8 @@ Database changed. ...@@ -82,8 +83,8 @@ Database changed.
taos> SHOW STABLES; taos> SHOW STABLES;
name | name |
================================= =================================
meters.current | meters_current |
meters.voltage | meters_voltage |
Query OK, 2 row(s) in set (0.001954s) Query OK, 2 row(s) in set (0.001954s)
taos> SELECT * FROM `meters.current`; taos> SELECT * FROM `meters.current`;
...@@ -96,10 +97,10 @@ Query OK, 2 row(s) in set (0.004076s) ...@@ -96,10 +97,10 @@ Query OK, 2 row(s) in set (0.004076s)
## SQL 查询示例 ## SQL 查询示例
`meters.voltage` 是插入数据的超级表名。 `meters_voltage` 是插入数据的超级表名。
可以通过超级表的 TAG 来过滤数据,比如查询 `location=California.LosAngeles groupid=1` 可以通过如下 SQL: 可以通过超级表的 TAG 来过滤数据,比如查询 `location=California.LosAngeles groupid=1` 可以通过如下 SQL:
```sql ```sql
SELECT * FROM `meters.current` WHERE location = "California.LosAngeles" AND groupid = 3; SELECT * FROM `meters_current` WHERE location = "California.LosAngeles" AND groupid = 3;
``` ```
...@@ -223,7 +223,7 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) ...@@ -223,7 +223,7 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
基础 API 用于完成创建数据库连接等工作,为其它 API 的执行提供运行时环境。 基础 API 用于完成创建数据库连接等工作,为其它 API 的执行提供运行时环境。
- `void taos_init()` - `int taos_init()`
初始化运行环境。如果没有主动调用该 API,那么调用 `taos_connect()` 时驱动将自动调用该 API,故程序一般无需手动调用。 初始化运行环境。如果没有主动调用该 API,那么调用 `taos_connect()` 时驱动将自动调用该 API,故程序一般无需手动调用。
...@@ -542,6 +542,7 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多 ...@@ -542,6 +542,7 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
- 错误码,0成功,非0失败,可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息。 - 错误码,0成功,非0失败,可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息。
- `int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId)` - `int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId)`
**功能说明** **功能说明**
- 获取当前 consumer 在某个 topic 和 vgroup上的 commit 位置。 - 获取当前 consumer 在某个 topic 和 vgroup上的 commit 位置。
...@@ -555,9 +556,9 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多 ...@@ -555,9 +556,9 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
**功能说明** **功能说明**
commit接口分为两种类型,每种类型有同步和异步接口: - commit接口分为两种类型,每种类型有同步和异步接口:
- 第一种类型:根据消息提交,提交消息里的进度,如果消息传NULL,提交当前consumer所有消费的vgroup的当前进度 : tmq_commit_sync/tmq_commit_async - 第一种类型:根据消息提交,提交消息里的进度,如果消息传NULL,提交当前consumer所有消费的vgroup的当前进度 : tmq_commit_sync/tmq_commit_async
- 第二种类型:根据某个topic的某个vgroup的offset提交 : tmq_commit_offset_sync/tmq_commit_offset_async - 第二种类型:根据某个topic的某个vgroup的offset提交 : tmq_commit_offset_sync/tmq_commit_offset_async
**参数说明** **参数说明**
- msg:消费到的消息结构,如果msg传NULL,提交当前consumer所有消费的vgroup的当前进度 - msg:消费到的消息结构,如果msg传NULL,提交当前consumer所有消费的vgroup的当前进度
...@@ -584,8 +585,7 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多 ...@@ -584,8 +585,7 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
- `int32_t int64_t tmq_get_vgroup_offset(TAOS_RES* res)` - `int32_t int64_t tmq_get_vgroup_offset(TAOS_RES* res)`
**功能说明** **功能说明**
- 获取 poll 消费到的数据的起始offset
获取 poll 消费到的数据的起始offset
**参数说明** **参数说明**
- msg:消费到的消息结构 - msg:消费到的消息结构
...@@ -596,10 +596,10 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多 ...@@ -596,10 +596,10 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
- `int32_t int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics)` - `int32_t int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics)`
**功能说明** **功能说明**
- 获取消费者订阅的 topic 列表
获取消费者订阅的 topic 列表
**参数说明** **参数说明**
- topics: 获取的 topic 列表存储在这个结构中,接口内分配内存,需调用tmq_list_destroy释放 - topics: 获取的 topic 列表存储在这个结构中,接口内分配内存,需调用tmq_list_destroy释放
**返回值** **返回值**
- 错误码,0成功,非0失败,可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息 - 错误码,0成功,非0失败,可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息
\ No newline at end of file
...@@ -33,8 +33,8 @@ tag_set 中的所有的数据自动转化为 nchar 数据类型,并不需要 ...@@ -33,8 +33,8 @@ tag_set 中的所有的数据自动转化为 nchar 数据类型,并不需要
在无模式写入数据行协议中,field_set 中的每个数据项都需要对自身的数据类型进行描述。具体来说: 在无模式写入数据行协议中,field_set 中的每个数据项都需要对自身的数据类型进行描述。具体来说:
- 如果两边有英文双引号,表示 BINARY(32) 类型。例如 `"abc"` - 如果两边有英文双引号,表示 VARCHAR(N) 类型。例如 `"abc"`
- 如果两边有英文双引号而且带有 L 前缀,表示 NCHAR(32) 类型。例如 `L"报错信息"` - 如果两边有英文双引号而且带有 L 前缀,表示 NCHAR(N) 类型。例如 `L"报错信息"`
- 对空格、等号(=)、逗号(,)、双引号(")、反斜杠(\),前面需要使用反斜杠(\)进行转义。(都指的是英文半角符号)。具体转义规则如下: - 对空格、等号(=)、逗号(,)、双引号(")、反斜杠(\),前面需要使用反斜杠(\)进行转义。(都指的是英文半角符号)。具体转义规则如下:
| **序号** | **域** | **需转义字符** | | **序号** | **域** | **需转义字符** |
......
...@@ -32,12 +32,12 @@ static void queryDB(TAOS *taos, char *command) { ...@@ -32,12 +32,12 @@ static void queryDB(TAOS *taos, char *command) {
taos_free_result(pSql); taos_free_result(pSql);
pSql = NULL; pSql = NULL;
} }
pSql = taos_query(taos, command); pSql = taos_query(taos, command);
code = taos_errno(pSql); code = taos_errno(pSql);
if (0 == code) { if (0 == code) {
break; break;
} }
} }
if (code != 0) { if (code != 0) {
...@@ -63,7 +63,7 @@ int main(int argc, char *argv[]) { ...@@ -63,7 +63,7 @@ int main(int argc, char *argv[]) {
TAOS *taos = taos_connect(argv[1], "root", "taosdata", NULL, 0); TAOS *taos = taos_connect(argv[1], "root", "taosdata", NULL, 0);
if (taos == NULL) { if (taos == NULL) {
printf("failed to connect to server, reason:%s\n", "null taos"/*taos_errstr(taos)*/); printf("failed to connect to server, reason:%s\n", taos_errstr(NULL));
exit(1); exit(1);
} }
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
...@@ -86,14 +86,14 @@ void Test(TAOS *taos, char *qstr, int index) { ...@@ -86,14 +86,14 @@ void Test(TAOS *taos, char *qstr, int index) {
for (i = 0; i < 10; ++i) { for (i = 0; i < 10; ++i) {
sprintf(qstr, "insert into m1 values (%" PRId64 ", %d, %d, %d, %d, %f, %lf, '%s')", (uint64_t)(1546300800000 + i * 1000), i, i, i, i*10000000, i*1.0, i*2.0, "hello"); sprintf(qstr, "insert into m1 values (%" PRId64 ", %d, %d, %d, %d, %f, %lf, '%s')", (uint64_t)(1546300800000 + i * 1000), i, i, i, i*10000000, i*1.0, i*2.0, "hello");
printf("qstr: %s\n", qstr); printf("qstr: %s\n", qstr);
// note: how do you wanna do if taos_query returns non-NULL // note: how do you wanna do if taos_query returns non-NULL
// if (taos_query(taos, qstr)) { // if (taos_query(taos, qstr)) {
// printf("insert row: %i, reason:%s\n", i, taos_errstr(taos)); // printf("insert row: %i, reason:%s\n", i, taos_errstr(taos));
// } // }
TAOS_RES *result1 = taos_query(taos, qstr); TAOS_RES *result1 = taos_query(taos, qstr);
if (result1 == NULL || taos_errno(result1) != 0) { if (result1 == NULL || taos_errno(result1) != 0) {
printf("failed to insert row, reason:%s\n", taos_errstr(result1)); printf("failed to insert row, reason:%s\n", taos_errstr(result1));
taos_free_result(result1); taos_free_result(result1);
exit(1); exit(1);
} else { } else {
...@@ -107,7 +107,7 @@ void Test(TAOS *taos, char *qstr, int index) { ...@@ -107,7 +107,7 @@ void Test(TAOS *taos, char *qstr, int index) {
sprintf(qstr, "SELECT * FROM m1"); sprintf(qstr, "SELECT * FROM m1");
result = taos_query(taos, qstr); result = taos_query(taos, qstr);
if (result == NULL || taos_errno(result) != 0) { if (result == NULL || taos_errno(result) != 0) {
printf("failed to select, reason:%s\n", taos_errstr(result)); printf("failed to select, reason:%s\n", taos_errstr(result));
taos_free_result(result); taos_free_result(result);
exit(1); exit(1);
} }
......
...@@ -1610,9 +1610,11 @@ static int32_t mndAlterStbColumnBytes(SMnode *pMnode, const SStbObj *pOld, SStbO ...@@ -1610,9 +1610,11 @@ static int32_t mndAlterStbColumnBytes(SMnode *pMnode, const SStbObj *pOld, SStbO
return -1; return -1;
} }
col_id_t colId = pOld->pColumns[col].colId;
uint32_t nLen = 0; uint32_t nLen = 0;
for (int32_t i = 0; i < pOld->numOfColumns; ++i) { for (int32_t i = 0; i < pOld->numOfColumns; ++i) {
nLen += (pOld->pColumns[i].colId == col) ? pField->bytes : pOld->pColumns[i].bytes; nLen += (pOld->pColumns[i].colId == colId) ? pField->bytes : pOld->pColumns[i].bytes;
} }
if (nLen > TSDB_MAX_BYTES_PER_ROW) { if (nLen > TSDB_MAX_BYTES_PER_ROW) {
...@@ -1620,7 +1622,6 @@ static int32_t mndAlterStbColumnBytes(SMnode *pMnode, const SStbObj *pOld, SStbO ...@@ -1620,7 +1622,6 @@ static int32_t mndAlterStbColumnBytes(SMnode *pMnode, const SStbObj *pOld, SStbO
return -1; return -1;
} }
col_id_t colId = pOld->pColumns[col].colId;
if (mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId) != 0) { if (mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId) != 0) {
return -1; return -1;
} }
......
...@@ -708,14 +708,21 @@ typedef struct { ...@@ -708,14 +708,21 @@ typedef struct {
TSDBROW row; TSDBROW row;
} SRowInfo; } SRowInfo;
typedef struct SSttBlockLoadCostInfo {
int64_t loadBlocks;
int64_t loadStatisBlocks;
double blockElapsedTime;
double statisElapsedTime;
} SSttBlockLoadCostInfo;
typedef struct SSttBlockLoadInfo { typedef struct SSttBlockLoadInfo {
SBlockData blockData[2]; SBlockData blockData[2]; // buffered block data
int32_t statisBlockIndex; // buffered statistics block index
void *statisBlock; // buffered statistics block data
void *pSttStatisBlkArray; void *pSttStatisBlkArray;
SArray *aSttBlk; SArray *aSttBlk;
int32_t blockIndex[2]; // to denote the loaded block in the corresponding position. int32_t blockIndex[2]; // to denote the loaded block in the corresponding position.
int32_t currentLoadBlockIndex; int32_t currentLoadBlockIndex;
int32_t loadBlocks;
double elapsedTime;
STSchema *pSchema; STSchema *pSchema;
int16_t *colIds; int16_t *colIds;
int32_t numOfCols; int32_t numOfCols;
...@@ -723,12 +730,7 @@ typedef struct SSttBlockLoadInfo { ...@@ -723,12 +730,7 @@ typedef struct SSttBlockLoadInfo {
bool isLast; bool isLast;
bool sttBlockLoaded; bool sttBlockLoaded;
// keep the last access position, this position may be used to reduce the binary times for SSttBlockLoadCostInfo cost;
// starting last block data for a new table
struct {
int32_t blockIndex;
int32_t rowIndex;
} prevEndPos;
} SSttBlockLoadInfo; } SSttBlockLoadInfo;
typedef struct SMergeTree { typedef struct SMergeTree {
...@@ -831,9 +833,9 @@ void tMergeTreeClose(SMergeTree *pMTree); ...@@ -831,9 +833,9 @@ void tMergeTreeClose(SMergeTree *pMTree);
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, int32_t numOfStt); SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, int32_t numOfStt);
SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols); SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols);
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el); void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo *pLoadCost);
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
void *destroySttBlockReader(SArray *pLDataIterArray, int64_t *blocks, double *el); void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost);
// tsdbCache ============================================================================================== // tsdbCache ==============================================================================================
typedef enum { typedef enum {
......
...@@ -1760,10 +1760,7 @@ typedef struct { ...@@ -1760,10 +1760,7 @@ typedef struct {
static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid, static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, int16_t *aCols, int nCols) { tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, int16_t *aCols, int nCols) {
int32_t code = 0; int32_t code = 0;
pr->pLDataIterArray = destroySttBlockReader(pr->pLDataIterArray, NULL);
int64_t loadBlocks = 0;
double elapse = 0;
pr->pLDataIterArray = destroySttBlockReader(pr->pLDataIterArray, &loadBlocks, &elapse);
pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES); pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
SMergeTreeConf conf = { SMergeTreeConf conf = {
......
...@@ -125,10 +125,7 @@ int32_t tsdbReuseCacherowsReader(void* reader, void* pTableIdList, int32_t numOf ...@@ -125,10 +125,7 @@ int32_t tsdbReuseCacherowsReader(void* reader, void* pTableIdList, int32_t numOf
pReader->pTableList = pTableIdList; pReader->pTableList = pTableIdList;
pReader->numOfTables = numOfTables; pReader->numOfTables = numOfTables;
pReader->lastTs = INT64_MIN; pReader->lastTs = INT64_MIN;
pReader->pLDataIterArray = destroySttBlockReader(pReader->pLDataIterArray, NULL);
int64_t blocks;
double elapse;
pReader->pLDataIterArray = destroySttBlockReader(pReader->pLDataIterArray, &blocks, &elapse);
pReader->pLDataIterArray = taosArrayInit(4, POINTER_BYTES); pReader->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -208,9 +205,7 @@ void* tsdbCacherowsReaderClose(void* pReader) { ...@@ -208,9 +205,7 @@ void* tsdbCacherowsReaderClose(void* pReader) {
taosMemoryFree(p->pCurrSchema); taosMemoryFree(p->pCurrSchema);
if (p->pLDataIterArray) { if (p->pLDataIterArray) {
int64_t loadBlocks = 0; destroySttBlockReader(p->pLDataIterArray, NULL);
double elapse = 0;
destroySttBlockReader(p->pLDataIterArray, &loadBlocks, &elapse);
} }
if (p->pFileReader) { if (p->pFileReader) {
......
...@@ -530,7 +530,9 @@ static int32_t tsdbCloseCommitter(SCommitter2 *committer, int32_t eno) { ...@@ -530,7 +530,9 @@ static int32_t tsdbCloseCommitter(SCommitter2 *committer, int32_t eno) {
ASSERT(committer->tombIterMerger == NULL); ASSERT(committer->tombIterMerger == NULL);
TARRAY2_DESTROY(committer->dataIterArray, NULL); TARRAY2_DESTROY(committer->dataIterArray, NULL);
TARRAY2_DESTROY(committer->tombIterArray, NULL); TARRAY2_DESTROY(committer->tombIterArray, NULL);
TARRAY2_DESTROY(committer->sttReaderArray, NULL);
TARRAY2_DESTROY(committer->fopArray, NULL); TARRAY2_DESTROY(committer->fopArray, NULL);
TARRAY2_DESTROY(committer->sttReaderArray, NULL);
tsdbFSDestroyCopySnapshot(&committer->fsetArr); tsdbFSDestroyCopySnapshot(&committer->fsetArr);
_exit: _exit:
......
...@@ -86,7 +86,7 @@ static int32_t tsdbSttLvlApplyEdit(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl *l ...@@ -86,7 +86,7 @@ static int32_t tsdbSttLvlApplyEdit(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl *l
// create a file obj // create a file obj
code = tsdbTFileObjInit(pTsdb, fobj1->f, &fobj2); code = tsdbTFileObjInit(pTsdb, fobj1->f, &fobj2);
if (code) return code; if (code) return code;
code = TARRAY2_APPEND(lvl2->fobjArr, fobj2); code = TARRAY2_INSERT_PTR(lvl2->fobjArr, i2, &fobj2);
if (code) return code; if (code) return code;
i1++; i1++;
i2++; i2++;
...@@ -112,7 +112,7 @@ static int32_t tsdbSttLvlApplyEdit(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl *l ...@@ -112,7 +112,7 @@ static int32_t tsdbSttLvlApplyEdit(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl *l
// create a file obj // create a file obj
code = tsdbTFileObjInit(pTsdb, fobj1->f, &fobj2); code = tsdbTFileObjInit(pTsdb, fobj1->f, &fobj2);
if (code) return code; if (code) return code;
code = TARRAY2_APPEND(lvl2->fobjArr, fobj2); code = TARRAY2_INSERT_PTR(lvl2->fobjArr, i2, &fobj2);
if (code) return code; if (code) return code;
i1++; i1++;
i2++; i2++;
......
...@@ -91,41 +91,39 @@ void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { ...@@ -91,41 +91,39 @@ void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
taosArrayClear(pLoadInfo[i].aSttBlk); taosArrayClear(pLoadInfo[i].aSttBlk);
pLoadInfo[i].elapsedTime = 0; pLoadInfo[i].cost.loadBlocks = 0;
pLoadInfo[i].loadBlocks = 0; pLoadInfo[i].cost.blockElapsedTime = 0;
pLoadInfo[i].cost.statisElapsedTime = 0;
pLoadInfo[i].cost.loadStatisBlocks = 0;
pLoadInfo[i].statisBlockIndex = -1;
tStatisBlockDestroy(pLoadInfo[i].statisBlock);
pLoadInfo[i].sttBlockLoaded = false; pLoadInfo[i].sttBlockLoaded = false;
} }
} }
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el) { void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo* pLoadCost) {
for (int32_t i = 0; i < 1; ++i) { for (int32_t i = 0; i < 1; ++i) {
*el += pLoadInfo[i].elapsedTime; pLoadCost->blockElapsedTime += pLoadInfo[i].cost.blockElapsedTime;
*blocks += pLoadInfo[i].loadBlocks; pLoadCost->loadBlocks += pLoadInfo[i].cost.loadBlocks;
pLoadCost->loadStatisBlocks += pLoadInfo[i].cost.loadStatisBlocks;
pLoadCost->statisElapsedTime += pLoadInfo[i].cost.statisElapsedTime;
} }
} }
static void freeTombBlock(void *param) {
STombBlock **pTombBlock = (STombBlock **)param;
tTombBlockDestroy(*pTombBlock);
taosMemoryFree(*pTombBlock);
}
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
if (pLoadInfo == NULL) { if (pLoadInfo == NULL) {
return NULL; return NULL;
} }
for (int32_t i = 0; i < 1; ++i) { pLoadInfo->currentLoadBlockIndex = 1;
pLoadInfo[i].currentLoadBlockIndex = 1; pLoadInfo->blockIndex[0] = -1;
pLoadInfo[i].blockIndex[0] = -1; pLoadInfo->blockIndex[1] = -1;
pLoadInfo[i].blockIndex[1] = -1;
tBlockDataDestroy(&pLoadInfo[i].blockData[0]);
tBlockDataDestroy(&pLoadInfo[i].blockData[1]);
taosArrayDestroy(pLoadInfo[i].aSttBlk); tBlockDataDestroy(&pLoadInfo->blockData[0]);
} tBlockDataDestroy(&pLoadInfo->blockData[1]);
taosArrayDestroy(pLoadInfo->aSttBlk);
taosMemoryFree(pLoadInfo); taosMemoryFree(pLoadInfo);
return NULL; return NULL;
} }
...@@ -136,7 +134,7 @@ static void destroyLDataIter(SLDataIter *pIter) { ...@@ -136,7 +134,7 @@ static void destroyLDataIter(SLDataIter *pIter) {
taosMemoryFree(pIter); taosMemoryFree(pIter);
} }
void *destroySttBlockReader(SArray *pLDataIterArray, int64_t *blocks, double *el) { void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo* pLoadCost) {
if (pLDataIterArray == NULL) { if (pLDataIterArray == NULL) {
return NULL; return NULL;
} }
...@@ -146,8 +144,13 @@ void *destroySttBlockReader(SArray *pLDataIterArray, int64_t *blocks, double *el ...@@ -146,8 +144,13 @@ void *destroySttBlockReader(SArray *pLDataIterArray, int64_t *blocks, double *el
SArray *pList = taosArrayGetP(pLDataIterArray, i); SArray *pList = taosArrayGetP(pLDataIterArray, i);
for (int32_t j = 0; j < taosArrayGetSize(pList); ++j) { for (int32_t j = 0; j < taosArrayGetSize(pList); ++j) {
SLDataIter *pIter = taosArrayGetP(pList, j); SLDataIter *pIter = taosArrayGetP(pList, j);
*el += pIter->pBlockLoadInfo->elapsedTime; if (pLoadCost != NULL) {
*blocks += pIter->pBlockLoadInfo->loadBlocks; pLoadCost->loadBlocks += pIter->pBlockLoadInfo->cost.loadBlocks;
pLoadCost->loadStatisBlocks += pIter->pBlockLoadInfo->cost.loadStatisBlocks;
pLoadCost->blockElapsedTime += pIter->pBlockLoadInfo->cost.blockElapsedTime;
pLoadCost->statisElapsedTime += pIter->pBlockLoadInfo->cost.statisElapsedTime;
}
destroyLDataIter(pIter); destroyLDataIter(pIter);
} }
taosArrayDestroy(pList); taosArrayDestroy(pList);
...@@ -195,12 +198,12 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) { ...@@ -195,12 +198,12 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
} }
double el = (taosGetTimestampUs() - st) / 1000.0; double el = (taosGetTimestampUs() - st) / 1000.0;
pInfo->elapsedTime += el; pInfo->cost.blockElapsedTime += el;
pInfo->loadBlocks += 1; pInfo->cost.loadBlocks += 1;
tsdbDebug("read last block, total load:%d, trigger by uid:%" PRIu64 tsdbDebug("read last block, total load:%"PRId64", trigger by uid:%" PRIu64
", last file index:%d, last block index:%d, entry:%d, rows:%d, %p, elapsed time:%.2f ms, %s", ", last file index:%d, last block index:%d, entry:%d, rows:%d, %p, elapsed time:%.2f ms, %s",
pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock->nRow, pInfo->cost.loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock->nRow,
pBlock, el, idStr); pBlock, el, idStr);
pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk; pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
...@@ -363,8 +366,9 @@ static int32_t suidComparFn(const void *target, const void *p2) { ...@@ -363,8 +366,9 @@ static int32_t suidComparFn(const void *target, const void *p2) {
} }
} }
static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint64_t suid, uint64_t uid, static bool existsFromSttBlkStatis(SSttBlockLoadInfo *pBlockLoadInfo, uint64_t suid, uint64_t uid,
SSttFileReader *pReader) { SSttFileReader *pReader) {
const TStatisBlkArray *pStatisBlkArray = pBlockLoadInfo->pSttStatisBlkArray;
if (TARRAY2_SIZE(pStatisBlkArray) <= 0) { if (TARRAY2_SIZE(pStatisBlkArray) <= 0) {
return true; return true;
} }
...@@ -387,23 +391,40 @@ static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint6 ...@@ -387,23 +391,40 @@ static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint6
return false; return false;
} }
STbStatisBlock block = {0}; // if (pBlockLoadInfo->statisBlock == NULL) {
tsdbSttFileReadStatisBlock(pReader, p, &block); // pBlockLoadInfo->statisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock));
//
int32_t index = tarray2SearchIdx(block.suid, &suid, sizeof(int64_t), suidComparFn, TD_EQ); // int64_t st = taosGetTimestampMs();
// tsdbSttFileReadStatisBlock(pReader, p, pBlockLoadInfo->statisBlock);
// pBlockLoadInfo->statisBlockIndex = i;
//
// double el = (taosGetTimestampMs() - st) / 1000.0;
// pBlockLoadInfo->cost.loadStatisBlocks += 1;
// pBlockLoadInfo->cost.statisElapsedTime += el;
// } else if (pBlockLoadInfo->statisBlockIndex != i) {
// tStatisBlockDestroy(pBlockLoadInfo->statisBlock);
//
// int64_t st = taosGetTimestampMs();
// tsdbSttFileReadStatisBlock(pReader, p, pBlockLoadInfo->statisBlock);
// pBlockLoadInfo->statisBlockIndex = i;
//
// double el = (taosGetTimestampMs() - st) / 1000.0;
// pBlockLoadInfo->cost.loadStatisBlocks += 1;
// pBlockLoadInfo->cost.statisElapsedTime += el;
// }
STbStatisBlock* pBlock = pBlockLoadInfo->statisBlock;
int32_t index = tarray2SearchIdx(pBlock->suid, &suid, sizeof(int64_t), suidComparFn, TD_EQ);
if (index == -1) { if (index == -1) {
tStatisBlockDestroy(&block);
return false; return false;
} }
int32_t j = index; int32_t j = index;
if (block.uid->data[j] == uid) { if (pBlock->uid->data[j] == uid) {
tStatisBlockDestroy(&block);
return true; return true;
} else if (block.uid->data[j] > uid) { } else if (pBlock->uid->data[j] > uid) {
while (j >= 0 && block.suid->data[j] == suid) { while (j >= 0 && pBlock->suid->data[j] == suid) {
if (block.uid->data[j] == uid) { if (pBlock->uid->data[j] == uid) {
tStatisBlockDestroy(&block);
return true; return true;
} else { } else {
j -= 1; j -= 1;
...@@ -411,9 +432,8 @@ static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint6 ...@@ -411,9 +432,8 @@ static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint6
} }
} else { } else {
j = index + 1; j = index + 1;
while (j < block.suid->size && block.suid->data[j] == suid) { while (j < pBlock->suid->size && pBlock->suid->data[j] == suid) {
if (block.uid->data[j] == uid) { if (pBlock->uid->data[j] == uid) {
tStatisBlockDestroy(&block);
return true; return true;
} else { } else {
j += 1; j += 1;
...@@ -421,14 +441,47 @@ static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint6 ...@@ -421,14 +441,47 @@ static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint6
} }
} }
tStatisBlockDestroy(&block);
i += 1; i += 1;
} }
return false; return false;
} }
int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t iStt, int8_t backward, static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *pIter, int64_t suid,
_load_tomb_fn loadTombFn, void *pReader1, const char *idStr) {
int64_t st = taosGetTimestampUs();
const TSttBlkArray *pSttBlkArray = NULL;
pBlockLoadInfo->sttBlockLoaded = true;
// load the stt block info for each stt-block
int32_t code = tsdbSttFileReadSttBlk(pIter->pReader, &pSttBlkArray);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("load stt blk failed, code:%s, %s", tstrerror(code), idStr);
return code;
}
code = extractSttBlockInfo(pIter, pSttBlkArray, pBlockLoadInfo, suid);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("load stt block info failed, code:%s, %s", tstrerror(code), idStr);
return code;
}
// load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file
code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pBlockLoadInfo->pSttStatisBlkArray);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), idStr);
return code;
}
code = loadTombFn(pReader1, pIter->pReader, pIter->pBlockLoadInfo);
double el = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("load the stt file info completed, elapsed time:%.2fms, %s", el, idStr);
return code;
}
int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t iStt, int8_t backward,
uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange,
SSttBlockLoadInfo *pBlockLoadInfo, const char *idStr, bool strictTimeRange, SSttBlockLoadInfo *pBlockLoadInfo, const char *idStr, bool strictTimeRange,
_load_tomb_fn loadTombFn, void *pReader1) { _load_tomb_fn loadTombFn, void *pReader1) {
...@@ -444,6 +497,7 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader ...@@ -444,6 +497,7 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader
pIter->pReader = pSttFileReader; pIter->pReader = pSttFileReader;
pIter->pBlockLoadInfo = pBlockLoadInfo; pIter->pBlockLoadInfo = pBlockLoadInfo;
// open stt file failed, ignore and continue
if (pIter->pReader == NULL) { if (pIter->pReader == NULL) {
tsdbError("stt file reader is null, %s", idStr); tsdbError("stt file reader is null, %s", idStr);
pIter->pSttBlk = NULL; pIter->pSttBlk = NULL;
...@@ -452,43 +506,18 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader ...@@ -452,43 +506,18 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader
} }
if (!pBlockLoadInfo->sttBlockLoaded) { if (!pBlockLoadInfo->sttBlockLoaded) {
int64_t st = taosGetTimestampUs(); code = doLoadSttFilesBlk(pBlockLoadInfo, pIter, suid, loadTombFn, pReader1, idStr);
const TSttBlkArray *pSttBlkArray = NULL;
pBlockLoadInfo->sttBlockLoaded = true;
// load the stt block info for each stt-block
code = tsdbSttFileReadSttBlk(pIter->pReader, &pSttBlkArray);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("load stt blk failed, code:%s, %s", tstrerror(code), idStr);
return code;
}
code = extractSttBlockInfo(pIter, pSttBlkArray, pBlockLoadInfo, suid);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tsdbError("load stt block info failed, code:%s, %s", tstrerror(code), idStr);
return code; return code;
} }
// load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file
code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pBlockLoadInfo->pSttStatisBlkArray);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), idStr);
return code;
}
code = loadTombFn(pReader1, pIter->pReader, pIter->pBlockLoadInfo);
double el = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("load the stt file info completed, elapsed time:%.2fms, %s", el, idStr);
} }
// bool exists = existsFromSttBlkStatis(pBlockLoadInfo->pSttStatisBlkArray, suid, uid, pIter->pReader); // bool exists = existsFromSttBlkStatis(pBlockLoadInfo, suid, uid, pIter->pReader);
// if (!exists) { // if (!exists) {
// pIter->iSttBlk = -1; // pIter->iSttBlk = -1;
// pIter->pSttBlk = NULL; // pIter->pSttBlk = NULL;
// return TSDB_CODE_SUCCESS; // return TSDB_CODE_SUCCESS;
// } // }
// find the start block, actually we could load the position to avoid repeatly searching for the start position when // find the start block, actually we could load the position to avoid repeatly searching for the start position when
// the skey is updated. // the skey is updated.
......
...@@ -63,7 +63,7 @@ typedef struct STableBlockScanInfo { ...@@ -63,7 +63,7 @@ typedef struct STableBlockScanInfo {
SIterInfo iiter; // imem buffer skip list iterator SIterInfo iiter; // imem buffer skip list iterator
SArray* delSkyline; // delete info for this table SArray* delSkyline; // delete info for this table
int32_t fileDelIndex; // file block delete index int32_t fileDelIndex; // file block delete index
int32_t lastBlockDelIndex; // delete index for last block int32_t sttBlockDelIndex; // delete index for last block
bool iterInit; // whether to initialize the in-memory skip list iterator or not bool iterInit; // whether to initialize the in-memory skip list iterator or not
} STableBlockScanInfo; } STableBlockScanInfo;
...@@ -87,13 +87,13 @@ typedef struct SIOCostSummary { ...@@ -87,13 +87,13 @@ typedef struct SIOCostSummary {
double headFileLoadTime; double headFileLoadTime;
int64_t smaDataLoad; int64_t smaDataLoad;
double smaLoadTime; double smaLoadTime;
int64_t lastBlockLoad; int64_t sttStatisBlockLoad;
double lastBlockLoadTime; int64_t sttBlockLoad;
double sttBlockLoadTime;
int64_t composedBlocks; int64_t composedBlocks;
double buildComposedBlockTime; double buildComposedBlockTime;
double createScanInfoList; double createScanInfoList;
// double getTbFromMemTime; SSttBlockLoadCostInfo sttCost;
// double getTbFromIMemTime;
double initDelSkylineIterTime; double initDelSkylineIterTime;
} SIOCostSummary; } SIOCostSummary;
...@@ -586,8 +586,8 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo ...@@ -586,8 +586,8 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SIOCostSummary* pSum = &pReader->cost; SIOCostSummary* pCost = &pReader->cost;
getLastBlockLoadInfo(pIter->pLastBlockReader->pInfo, &pSum->lastBlockLoad, &pReader->cost.lastBlockLoadTime); getSttBlockLoadInfo(pIter->pLastBlockReader->pInfo, &pCost->sttCost);
pIter->pLastBlockReader->uid = 0; pIter->pLastBlockReader->uid = 0;
tMergeTreeClose(&pIter->pLastBlockReader->mergeTree); tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
...@@ -1976,7 +1976,7 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc ...@@ -1976,7 +1976,7 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
pLastBlockReader->currentKey = key; pLastBlockReader->currentKey = key;
pScanInfo->lastKeyInStt = key; pScanInfo->lastKeyInStt = key;
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, key, ver, pLastBlockReader->order, if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, pLastBlockReader->order,
pVerRange)) { pVerRange)) {
return true; return true;
} }
...@@ -3018,7 +3018,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* ...@@ -3018,7 +3018,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
pBlockScanInfo->iter.index = index; pBlockScanInfo->iter.index = index;
pBlockScanInfo->iiter.index = index; pBlockScanInfo->iiter.index = index;
pBlockScanInfo->fileDelIndex = index; pBlockScanInfo->fileDelIndex = index;
pBlockScanInfo->lastBlockDelIndex = index; pBlockScanInfo->sttBlockDelIndex = index;
return code; return code;
...@@ -4029,7 +4029,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc ...@@ -4029,7 +4029,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc
tsdbRowMergerAdd(pMerger, pRow1, NULL); tsdbRowMergerAdd(pMerger, pRow1, NULL);
} else { } else {
tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid, tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid,
pScanInfo->lastBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt, pScanInfo->sttBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt,
idStr); idStr);
break; break;
} }
...@@ -4697,7 +4697,7 @@ void tsdbReaderClose(STsdbReader* pReader) { ...@@ -4697,7 +4697,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader; SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
tMergeTreeClose(&pLReader->mergeTree); tMergeTreeClose(&pLReader->mergeTree);
getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime); getSttBlockLoadInfo(pLReader->pInfo, &pCost->sttCost);
pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo); pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo);
taosMemoryFree(pLReader); taosMemoryFree(pLReader);
...@@ -4711,7 +4711,7 @@ void tsdbReaderClose(STsdbReader* pReader) { ...@@ -4711,7 +4711,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, createTime:%.2f ms,initDelSkylineIterTime:%.2f " ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, createTime:%.2f ms,initDelSkylineIterTime:%.2f "
"ms, %s", "ms, %s",
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks, pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks,
pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad, pCost->lastBlockLoadTime, pCost->composedBlocks, pCost->blockLoadTime, pCost->buildmemBlock, pCost->sttBlockLoad, pCost->sttBlockLoadTime, pCost->composedBlocks,
pCost->buildComposedBlockTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList, pCost->buildComposedBlockTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList,
pCost->initDelSkylineIterTime, pReader->idStr); pCost->initDelSkylineIterTime, pReader->idStr);
......
...@@ -168,13 +168,11 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo ...@@ -168,13 +168,11 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SCostSummary* pSum = &pReader->cost; SCostSummary* pCost = &pReader->cost;
pIter->pLastBlockReader->uid = 0; pIter->pLastBlockReader->uid = 0;
tMergeTreeClose(&pIter->pLastBlockReader->mergeTree); tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost);
pReader->status.pLDataIterArray =
destroySttBlockReader(pReader->status.pLDataIterArray, &pSum->lastBlockLoad, &pSum->lastBlockLoadTime);
pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES); pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
// check file the time range of coverage // check file the time range of coverage
...@@ -1389,7 +1387,7 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc ...@@ -1389,7 +1387,7 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
pLastBlockReader->currentKey = key; pLastBlockReader->currentKey = key;
pScanInfo->lastKeyInStt = key; pScanInfo->lastKeyInStt = key;
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, key, ver, pLastBlockReader->order, if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, pLastBlockReader->order,
pVerRange)) { pVerRange)) {
return true; return true;
} }
...@@ -2408,7 +2406,7 @@ int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order) { ...@@ -2408,7 +2406,7 @@ int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order) {
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost) { int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost) {
int32_t code = 0; int32_t code = 0;
int32_t newDelDataInFile = taosArrayGetSize(pBlockScanInfo->pfileDelData); int32_t newDelDataInFile = taosArrayGetSize(pBlockScanInfo->pFileDelData);
if (newDelDataInFile == 0 && if (newDelDataInFile == 0 &&
((pBlockScanInfo->delSkyline != NULL) || (TARRAY_SIZE(pBlockScanInfo->pMemDelData) == 0))) { ((pBlockScanInfo->delSkyline != NULL) || (TARRAY_SIZE(pBlockScanInfo->pMemDelData) == 0))) {
return code; return code;
...@@ -2422,7 +2420,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t orde ...@@ -2422,7 +2420,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t orde
pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY)); pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
} }
SArray* pSource = pBlockScanInfo->pfileDelData; SArray* pSource = pBlockScanInfo->pFileDelData;
if (pSource == NULL) { if (pSource == NULL) {
pSource = pBlockScanInfo->pMemDelData; pSource = pBlockScanInfo->pMemDelData;
} else { } else {
...@@ -2431,13 +2429,13 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t orde ...@@ -2431,13 +2429,13 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t orde
code = tsdbBuildDeleteSkyline(pSource, 0, taosArrayGetSize(pSource) - 1, pBlockScanInfo->delSkyline); code = tsdbBuildDeleteSkyline(pSource, 0, taosArrayGetSize(pSource) - 1, pBlockScanInfo->delSkyline);
taosArrayClear(pBlockScanInfo->pfileDelData); taosArrayClear(pBlockScanInfo->pFileDelData);
int32_t index = getInitialDelIndex(pBlockScanInfo->delSkyline, order); int32_t index = getInitialDelIndex(pBlockScanInfo->delSkyline, order);
pBlockScanInfo->iter.index = index; pBlockScanInfo->iter.index = index;
pBlockScanInfo->iiter.index = index; pBlockScanInfo->iiter.index = index;
pBlockScanInfo->fileDelIndex = index; pBlockScanInfo->fileDelIndex = index;
pBlockScanInfo->lastBlockDelIndex = index; pBlockScanInfo->sttBlockDelIndex = index;
double el = taosGetTimestampUs() - st; double el = taosGetTimestampUs() - st;
pCost->createSkylineIterTime = el / 1000.0; pCost->createSkylineIterTime = el / 1000.0;
...@@ -3411,7 +3409,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc ...@@ -3411,7 +3409,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc
tsdbRowMergerAdd(pMerger, pRow1, NULL); tsdbRowMergerAdd(pMerger, pRow1, NULL);
} else { } else {
tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid, tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid,
pScanInfo->lastBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt, pScanInfo->sttBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt,
idStr); idStr);
break; break;
} }
...@@ -4067,18 +4065,20 @@ void tsdbReaderClose2(STsdbReader* pReader) { ...@@ -4067,18 +4065,20 @@ void tsdbReaderClose2(STsdbReader* pReader) {
taosMemoryFree(pLReader); taosMemoryFree(pLReader);
} }
destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime); destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost);
taosMemoryFreeClear(pReader->status.uidList.tableUidList); taosMemoryFreeClear(pReader->status.uidList.tableUidList);
tsdbDebug( tsdbDebug(
"%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64 "%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
" SMA-time:%.2f ms, fileBlocks:%" PRId64 " SMA-time:%.2f ms, fileBlocks:%" PRId64
", fileBlocks-load-time:%.2f ms, " ", fileBlocks-load-time:%.2f ms, "
"build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64 "build in-memory-block-time:%.2f ms, sttBlocks:%" PRId64 ", sttBlocks-time:%.2f ms, sttStatisBlock:%" PRId64
", stt-statis-Block-time:%.2f ms, composed-blocks:%" PRId64
", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, createTime:%.2f ms,createSkylineIterTime:%.2f " ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, createTime:%.2f ms,createSkylineIterTime:%.2f "
"ms, initLastBlockReader:%.2fms, %s", "ms, initLastBlockReader:%.2fms, %s",
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks, pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks,
pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad, pCost->lastBlockLoadTime, pCost->composedBlocks, pCost->blockLoadTime, pCost->buildmemBlock, pCost->sttCost.loadBlocks, pCost->sttCost.blockElapsedTime,
pCost->sttCost.loadStatisBlocks, pCost->sttCost.statisElapsedTime, pCost->composedBlocks,
pCost->buildComposedBlockTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList, pCost->buildComposedBlockTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList,
pCost->createSkylineIterTime, pCost->initLastBlockReader, pReader->idStr); pCost->createSkylineIterTime, pCost->initLastBlockReader, pReader->idStr);
...@@ -4092,9 +4092,8 @@ void tsdbReaderClose2(STsdbReader* pReader) { ...@@ -4092,9 +4092,8 @@ void tsdbReaderClose2(STsdbReader* pReader) {
} }
int32_t tsdbReaderSuspend2(STsdbReader* pReader) { int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
int32_t code = 0;
// save reader's base state & reset top state to be reconstructed from base state // save reader's base state & reset top state to be reconstructed from base state
int32_t code = 0;
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
STableBlockScanInfo* pBlockScanInfo = NULL; STableBlockScanInfo* pBlockScanInfo = NULL;
...@@ -4110,9 +4109,9 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { ...@@ -4110,9 +4109,9 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
} }
tsdbDataFileReaderClose(&pReader->pFileReader); tsdbDataFileReaderClose(&pReader->pFileReader);
int64_t loadBlocks = 0;
double elapse = 0; SCostSummary* pCost = &pReader->cost;
pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &loadBlocks, &elapse); pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost);
pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES); pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
// resetDataBlockScanInfo excluding lastKey // resetDataBlockScanInfo excluding lastKey
STableBlockScanInfo** p = NULL; STableBlockScanInfo** p = NULL;
...@@ -4134,7 +4133,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { ...@@ -4134,7 +4133,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
} }
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline); pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
pInfo->pfileDelData = taosArrayDestroy(pInfo->pfileDelData); pInfo->pFileDelData = taosArrayDestroy(pInfo->pFileDelData);
} }
} else { } else {
// resetDataBlockScanInfo excluding lastKey // resetDataBlockScanInfo excluding lastKey
...@@ -4209,7 +4208,6 @@ static int32_t tsdbSetQueryReseek(void* pQHandle) { ...@@ -4209,7 +4208,6 @@ static int32_t tsdbSetQueryReseek(void* pQHandle) {
} }
tsdbReaderSuspend2(pReader); tsdbReaderSuspend2(pReader);
tsdbReleaseReader(pReader); tsdbReleaseReader(pReader);
return code; return code;
...@@ -4222,8 +4220,7 @@ static int32_t tsdbSetQueryReseek(void* pQHandle) { ...@@ -4222,8 +4220,7 @@ static int32_t tsdbSetQueryReseek(void* pQHandle) {
} }
int32_t tsdbReaderResume2(STsdbReader* pReader) { int32_t tsdbReaderResume2(STsdbReader* pReader) {
int32_t code = 0; int32_t code = 0;
STableBlockScanInfo** pBlockScanInfo = pReader->status.pTableIter; STableBlockScanInfo** pBlockScanInfo = pReader->status.pTableIter;
// restore reader's state // restore reader's state
...@@ -4290,7 +4287,6 @@ static bool tsdbReadRowsCountOnly(STsdbReader* pReader) { ...@@ -4290,7 +4287,6 @@ static bool tsdbReadRowsCountOnly(STsdbReader* pReader) {
pBlock->info.rows = pReader->rowsNum; pBlock->info.rows = pReader->rowsNum;
pBlock->info.id.uid = 0; pBlock->info.id.uid = 0;
pBlock->info.dataLoad = 0; pBlock->info.dataLoad = 0;
pReader->rowsNum = 0; pReader->rowsNum = 0;
return pBlock->info.rows > 0; return pBlock->info.rows > 0;
......
...@@ -221,7 +221,7 @@ void clearBlockScanInfo(STableBlockScanInfo* p) { ...@@ -221,7 +221,7 @@ void clearBlockScanInfo(STableBlockScanInfo* p) {
p->delSkyline = taosArrayDestroy(p->delSkyline); p->delSkyline = taosArrayDestroy(p->delSkyline);
p->pBlockList = taosArrayDestroy(p->pBlockList); p->pBlockList = taosArrayDestroy(p->pBlockList);
p->pMemDelData = taosArrayDestroy(p->pMemDelData); p->pMemDelData = taosArrayDestroy(p->pMemDelData);
p->pfileDelData = taosArrayDestroy(p->pfileDelData); p->pFileDelData = taosArrayDestroy(p->pFileDelData);
} }
void destroyAllBlockScanInfo(SSHashObj* pTableMap) { void destroyAllBlockScanInfo(SSHashObj* pTableMap) {
...@@ -238,7 +238,7 @@ void destroyAllBlockScanInfo(SSHashObj* pTableMap) { ...@@ -238,7 +238,7 @@ void destroyAllBlockScanInfo(SSHashObj* pTableMap) {
static void doCleanupInfoForNextFileset(STableBlockScanInfo* pScanInfo) { static void doCleanupInfoForNextFileset(STableBlockScanInfo* pScanInfo) {
// reset the index in last block when handing a new file // reset the index in last block when handing a new file
taosArrayClear(pScanInfo->pBlockList); taosArrayClear(pScanInfo->pBlockList);
taosArrayClear(pScanInfo->pfileDelData); // del data from each file set taosArrayClear(pScanInfo->pFileDelData); // del data from each file set
} }
void cleanupInfoFoxNextFileset(SSHashObj* pTableMap) { void cleanupInfoFoxNextFileset(SSHashObj* pTableMap) {
...@@ -502,14 +502,14 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_ ...@@ -502,14 +502,14 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_
if (newTable) { if (newTable) {
(*pScanInfo) = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); (*pScanInfo) = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
if ((*pScanInfo)->pfileDelData == NULL) { if ((*pScanInfo)->pFileDelData == NULL) {
(*pScanInfo)->pfileDelData = taosArrayInit(4, sizeof(SDelData)); (*pScanInfo)->pFileDelData = taosArrayInit(4, sizeof(SDelData));
} }
} }
if (record.version <= pReader->info.verRange.maxVer) { if (record.version <= pReader->info.verRange.maxVer) {
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey}; SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
taosArrayPush((*pScanInfo)->pfileDelData, &delData); taosArrayPush((*pScanInfo)->pFileDelData, &delData);
} }
} }
...@@ -556,8 +556,8 @@ static int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STs ...@@ -556,8 +556,8 @@ static int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STs
uint64_t uid = pReader->status.uidList.tableUidList[j]; uint64_t uid = pReader->status.uidList.tableUidList[j];
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
if (pScanInfo->pfileDelData == NULL) { if (pScanInfo->pFileDelData == NULL) {
pScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData)); pScanInfo->pFileDelData = taosArrayInit(4, sizeof(SDelData));
} }
ETombBlkCheckEnum ret = 0; ETombBlkCheckEnum ret = 0;
......
...@@ -65,12 +65,12 @@ typedef struct STableBlockScanInfo { ...@@ -65,12 +65,12 @@ typedef struct STableBlockScanInfo {
TSKEY lastKeyInStt; // last accessed key in stt TSKEY lastKeyInStt; // last accessed key in stt
SArray* pBlockList; // block data index list, SArray<SBrinRecord> SArray* pBlockList; // block data index list, SArray<SBrinRecord>
SArray* pMemDelData; // SArray<SDelData> SArray* pMemDelData; // SArray<SDelData>
SArray* pfileDelData; // SArray<SDelData> from each file set SArray* pFileDelData; // SArray<SDelData> from each file set
SIterInfo iter; // mem buffer skip list iterator SIterInfo iter; // mem buffer skip list iterator
SIterInfo iiter; // imem buffer skip list iterator SIterInfo iiter; // imem buffer skip list iterator
SArray* delSkyline; // delete info for this table SArray* delSkyline; // delete info for this table
int32_t fileDelIndex; // file block delete index int32_t fileDelIndex; // file block delete index
int32_t lastBlockDelIndex; // delete index for last block int32_t sttBlockDelIndex; // delete index for last block
bool iterInit; // whether to initialize the in-memory skip list iterator or not bool iterInit; // whether to initialize the in-memory skip list iterator or not
} STableBlockScanInfo; } STableBlockScanInfo;
...@@ -88,8 +88,7 @@ typedef struct SCostSummary { ...@@ -88,8 +88,7 @@ typedef struct SCostSummary {
double headFileLoadTime; double headFileLoadTime;
int64_t smaDataLoad; int64_t smaDataLoad;
double smaLoadTime; double smaLoadTime;
int64_t lastBlockLoad; SSttBlockLoadCostInfo sttCost;
double lastBlockLoadTime;
int64_t composedBlocks; int64_t composedBlocks;
double buildComposedBlockTime; double buildComposedBlockTime;
double createScanInfoList; double createScanInfoList;
......
...@@ -551,8 +551,8 @@ struct STsdbSnapWriter { ...@@ -551,8 +551,8 @@ struct STsdbSnapWriter {
int32_t fid; int32_t fid;
STFileSet* fset; STFileSet* fset;
SDiskID did; SDiskID did;
bool hasData; bool hasData; // if have time series data
bool hasTomb; bool hasTomb; // if have tomb data
// reader // reader
SDataFileReader* dataReader; SDataFileReader* dataReader;
...@@ -630,6 +630,15 @@ static int32_t tsdbSnapWriteFileSetOpenReader(STsdbSnapWriter* writer) { ...@@ -630,6 +630,15 @@ static int32_t tsdbSnapWriteFileSetOpenReader(STsdbSnapWriter* writer) {
dataFileReaderConfig.files[ftype].exist = true; dataFileReaderConfig.files[ftype].exist = true;
dataFileReaderConfig.files[ftype].file = writer->ctx->fset->farr[ftype]->f[0]; dataFileReaderConfig.files[ftype].file = writer->ctx->fset->farr[ftype]->f[0];
STFileOp fileOp = {
.optype = TSDB_FOP_REMOVE,
.fid = writer->ctx->fset->fid,
.of = writer->ctx->fset->farr[ftype]->f[0],
};
code = TARRAY2_APPEND(writer->fopArr, fileOp);
TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tsdbDataFileReaderOpen(NULL, &dataFileReaderConfig, &writer->ctx->dataReader); code = tsdbDataFileReaderOpen(NULL, &dataFileReaderConfig, &writer->ctx->dataReader);
...@@ -653,6 +662,15 @@ static int32_t tsdbSnapWriteFileSetOpenReader(STsdbSnapWriter* writer) { ...@@ -653,6 +662,15 @@ static int32_t tsdbSnapWriteFileSetOpenReader(STsdbSnapWriter* writer) {
code = TARRAY2_APPEND(writer->ctx->sttReaderArr, reader); code = TARRAY2_APPEND(writer->ctx->sttReaderArr, reader);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
STFileOp fileOp = {
.optype = TSDB_FOP_REMOVE,
.fid = fobj->f->fid,
.of = fobj->f[0],
};
code = TARRAY2_APPEND(writer->fopArr, fileOp);
TSDB_CHECK_CODE(code, lino, _exit);
} }
} }
} }
...@@ -862,6 +880,7 @@ static int32_t tsdbSnapWriteFileSetEnd(STsdbSnapWriter* writer) { ...@@ -862,6 +880,7 @@ static int32_t tsdbSnapWriteFileSetEnd(STsdbSnapWriter* writer) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
// end timeseries data write
SRowInfo row = { SRowInfo row = {
.suid = INT64_MAX, .suid = INT64_MAX,
.uid = INT64_MAX, .uid = INT64_MAX,
...@@ -870,6 +889,7 @@ static int32_t tsdbSnapWriteFileSetEnd(STsdbSnapWriter* writer) { ...@@ -870,6 +889,7 @@ static int32_t tsdbSnapWriteFileSetEnd(STsdbSnapWriter* writer) {
code = tsdbSnapWriteTimeSeriesRow(writer, &row); code = tsdbSnapWriteTimeSeriesRow(writer, &row);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
// end tombstone data write
STombRecord record = { STombRecord record = {
.suid = INT64_MAX, .suid = INT64_MAX,
.uid = INT64_MAX, .uid = INT64_MAX,
...@@ -1008,6 +1028,10 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr ...@@ -1008,6 +1028,10 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
// disable background tasks
tsdbFSDisableBgTask(pTsdb->pFS);
// start to write
writer[0] = taosMemoryCalloc(1, sizeof(*writer[0])); writer[0] = taosMemoryCalloc(1, sizeof(*writer[0]));
if (writer[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; if (writer[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
...@@ -1026,8 +1050,6 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr ...@@ -1026,8 +1050,6 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
code = tsdbFSCreateCopySnapshot(pTsdb->pFS, &writer[0]->fsetArr); code = tsdbFSCreateCopySnapshot(pTsdb->pFS, &writer[0]->fsetArr);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
tsdbFSDisableBgTask(pTsdb->pFS);
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
......
...@@ -671,6 +671,12 @@ sql alter table tb2023 add column v nchar(16379); ...@@ -671,6 +671,12 @@ sql alter table tb2023 add column v nchar(16379);
sql_error alter table tb2023 modify column v nchar(16380); sql_error alter table tb2023 modify column v nchar(16380);
sql desc tb2023 sql desc tb2023
print =============== modify column for normal table
sql create table ntb_ts3841(ts timestamp, c0 varchar(64000));
sql alter table ntb_ts3841 modify column c0 varchar(64001);
sql create table ntb1_ts3841(ts timestamp, c0 nchar(15000));
sql alter table ntb1_ts3841 modify column c0 nchar(15001);
print =============== error for super table print =============== error for super table
sql create table stb2023(ts timestamp, f int) tags(t1 int); sql create table stb2023(ts timestamp, f int) tags(t1 int);
sql_error alter table stb2023 add column v varchar(65518); sql_error alter table stb2023 add column v varchar(65518);
...@@ -685,6 +691,20 @@ sql alter table stb2023 add column v nchar(16379); ...@@ -685,6 +691,20 @@ sql alter table stb2023 add column v nchar(16379);
sql_error alter table stb2023 modify column v nchar(16380); sql_error alter table stb2023 modify column v nchar(16380);
sql desc stb2023 sql desc stb2023
print =============== modify column/tag for super table
sql create table stb_ts3841(ts timestamp, c0 varchar(64000)) tags(t1 binary(16380));
sql alter table stb_ts3841 modify column c0 varchar(64001);
sql alter table stb_ts3841 modify tag t1 binary(16381);
sql alter table stb_ts3841 modify tag t1 binary(16382);
sql_error alter table stb_ts3841 modify tag t1 binary(16383);
sql create table stb1_ts3841(ts timestamp, c0 nchar(15000)) tags(t1 nchar(4093));
sql alter table stb1_ts3841 modify column c0 nchar(15001);
sql alter table stb1_ts3841 modify tag t1 nchar(4094);
sql alter table stb1_ts3841 modify tag t1 nchar(4095);
sql_error alter table stb1_ts3841 modify tag t1 nchar(4096);
sql_error alter table stb1_ts3841 modify tag t1 binary(16382);
print ======= over print ======= over
sql drop database d1 sql drop database d1
sql select * from information_schema.ins_databases sql select * from information_schema.ins_databases
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册