提交 393b36d8 编写于 作者: H Haojun Liao

Merge branch 'refact/fillhistory' of github.com:taosdata/tdengine into refact/fillhistory

- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.3.0
- id: check-yaml
- id: check-json
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/psf/black
rev: stable
- id: black
- repo: https://github.com/pocc/pre-commit-hooks
rev: master
- id: cppcheck
args: ["--error-exitcode=0"]
- repo: https://github.com/crate-ci/typos
rev: v1.15.7
- id: typos
......@@ -2,7 +2,7 @@
......@@ -33,7 +33,7 @@ The below SQL statement is used to insert one row into table "d1001".
INSERT INTO d1001 VALUES (ts1, 10.3, 219, 0.31);
`ts1` is Unix timestamp, the timestamps which is larger than the difference between current time and KEEP in config is only allowed. For further detial, refer to [TDengine SQL insert timestamp section](/taos-sql/insert).
`ts1` is Unix timestamp, the timestamps which is larger than the difference between current time and KEEP in config is only allowed. For further detail, refer to [TDengine SQL insert timestamp section](/taos-sql/insert).
### Insert Multiple Rows
......@@ -43,7 +43,7 @@ Multiple rows can be inserted in a single SQL statement. The example below inser
INSERT INTO d1001 VALUES (ts2, 10.2, 220, 0.23) (ts2, 10.3, 218, 0.25);
`ts1` and `ts2` is Unix timestamp, the timestamps which is larger than the difference between current time and KEEP in config is only allowed. For further detial, refer to [TDengine SQL insert timestamp section](/taos-sql/insert).
`ts1` and `ts2` is Unix timestamp, the timestamps which is larger than the difference between current time and KEEP in config is only allowed. For further detail, refer to [TDengine SQL insert timestamp section](/taos-sql/insert).
### Insert into Multiple Tables
......@@ -53,7 +53,7 @@ Data can be inserted into multiple tables in the same SQL statement. The example
INSERT INTO d1001 VALUES (ts1, 10.3, 219, 0.31) (ts2, 12.6, 218, 0.33) d1002 VALUES (ts3, 12.3, 221, 0.31);
`ts1`, `ts2` and `ts3` is Unix timestamp, the timestamps which is larger than the difference between current time and KEEP in config is only allowed. For further detial, refer to [TDengine SQL insert timestamp section](/taos-sql/insert).
`ts1`, `ts2` and `ts3` is Unix timestamp, the timestamps which is larger than the difference between current time and KEEP in config is only allowed. For further detail, refer to [TDengine SQL insert timestamp section](/taos-sql/insert).
For more details about `INSERT` please refer to [INSERT](/taos-sql/insert).
......@@ -244,6 +244,8 @@ The following SQL statement creates a topic in TDengine:
CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
- There is an upper limit to the number of topics created, controlled by the parameter tmqMaxTopicNum, with a default of 20
Multiple subscription types are supported.
#### Subscribe to a Column
......@@ -265,14 +267,15 @@ You can subscribe to a topic through a SELECT statement. Statements that specify
CREATE TOPIC topic_name AS STABLE stb_name
CREATE TOPIC topic_name [with meta] AS STABLE stb_name [where_condition]
Creating a topic in this manner differs from a `SELECT * from stbName` statement as follows:
- The table schema can be modified.
- Unstructured data is returned. The format of the data returned changes based on the supertable schema.
- A different table schema may exist for every data block to be processed.
- The 'with meta' parameter is optional. When selected, statements such as creating super tables and sub tables will be returned, mainly used for Taosx to perform super table migration
- The 'where_condition' parameter is optional and will be used to filter and subscribe to sub tables that meet the criteria. Where conditions cannot have ordinary columns, only tags or tbnames. Functions can be used in where conditions to filter tags, but cannot be aggregate functions because sub table tag values cannot be aggregated. It can also be a constant expression, such as 2>1 (subscribing to all child tables), Or false (subscribe to 0 sub tables)
- The data returned does not include tags.
### Subscribe to a Database
......@@ -280,10 +283,12 @@ Creating a topic in this manner differs from a `SELECT * from stbName` statement
CREATE TOPIC topic_name [with meta] AS DATABASE db_name;
This SQL statement creates a subscription to all tables in the database. You can add the `WITH META` parameter to include schema changes in the subscription, including creating and deleting supertables; adding, deleting, and modifying columns; and creating, deleting, and modifying the tags of subtables. Consumers can determine the message type from the API. Note that this differs from Kafka.
This SQL statement creates a subscription to all tables in the database.
- The 'with meta' parameter is optional. When selected, it will return statements for creating all super tables and sub tables in the database, mainly used for Taosx database migration
## Create a Consumer
......@@ -295,7 +300,7 @@ You configure the following parameters when creating a consumer:
| `td.connect.user` | string | User Name | |
| `td.connect.pass` | string | Password | |
| `td.connect.port` | string | Port of the server side | |
| `group.id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192. |
| `group.id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192. Each topic can create up to 100 consumer groups. |
| `client.id` | string | Client ID | Maximum length: 192. |
| `auto.offset.reset` | enum | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) |
| `enable.auto.commit` | boolean | Commit automatically; true: user application doesn't need to explicitly commit; false: user application need to handle commit by itself | Default value is true |
......@@ -17,7 +17,7 @@ When you create a user-defined function, you must implement standard interface f
- For aggregate functions, implement the `aggfn_start`, `aggfn`, and `aggfn_finish` interface functions.
- To initialize your function, implement the `udf_init` function. To terminate your function, implement the `udf_destroy` function.
There are strict naming conventions for these interface functions. The names of the start, finish, init, and destroy interfaces must be <udf-name\>_start, <udf-name\>_finish, <udf-name\>_init, and <udf-name\>_destroy, respectively. Replace `scalarfn`, `aggfn`, and `udf` with the name of your user-defined function.
There are strict naming conventions for these interface functions. The names of the start, finish, init, and destroy interfaces must be `_start`, `_finish`, `_init`, and `_destroy`, respectively. Replace `scalarfn`, `aggfn`, and `udf` with the name of your user-defined function.
### Implementing a Scalar Function in C
The implementation of a scalar function is described as follows:
......@@ -318,7 +318,7 @@ The implementation of a scalar UDF is described as follows:
def process(input: datablock) -> tuple[output_type]:
Description: this function prcesses datablock, which is the input; you can use datablock.data(row, col) to access the python object at location(row,col); the output is a tuple object consisted of objects of type outputtype
Description: this function processes datablock, which is the input; you can use datablock.data(row, col) to access the python object at location(row,col); the output is a tuple object consisted of objects of type outputtype
#### Aggregate UDF Interface
......@@ -356,7 +356,7 @@ def process(input: datablock) -> tuple[output_type]:
# return tuple object consisted of object of type outputtype
Note:process() must be implemeted, init() and destroy() must be defined too but they can do nothing.
Note:process() must be implemented, init() and destroy() must be defined too but they can do nothing.
#### Aggregate Template
......@@ -377,7 +377,7 @@ def finish(buf: bytes) -> output_type:
#return obj of type outputtype
Note: aggregate UDF requires init(), destroy(), start(), reduce() and finish() to be impemented. start() generates the initial result in buffer, then the input data is divided into multiple row data blocks, reduce() is invoked for each data block `inputs` and intermediate `buf`, finally finish() is invoked to generate final result from the intermediate result `buf`.
Note: aggregate UDF requires init(), destroy(), start(), reduce() and finish() to be implemented. start() generates the initial result in buffer, then the input data is divided into multiple row data blocks, reduce() is invoked for each data block `inputs` and intermediate `buf`, finally finish() is invoked to generate final result from the intermediate result `buf`.
### Data Mapping between TDengine SQL and Python UDF
......@@ -559,7 +559,7 @@ Note: Prior to TDengine (excluding), updating a UDF requires to restart
#### Sample 3: UDF with n arguments
A UDF which accepts n intergers, likee (x1, x2, ..., xn) and output the sum of the product of each value and its sequence number: 1 * x1 + 2 * x2 + ... + n * xn. If there is `null` in the input, then the result is `null`. The difference from sample 1 is that it can accept any number of columns as input and process each column. Assume the program is written in /root/udf/nsum.py:
A UDF which accepts n integers, likee (x1, x2, ..., xn) and output the sum of the product of each value and its sequence number: 1 * x1 + 2 * x2 + ... + n * xn. If there is `null` in the input, then the result is `null`. The difference from sample 1 is that it can accept any number of columns as input and process each column. Assume the program is written in /root/udf/nsum.py:
def init():
......@@ -607,7 +607,7 @@ Query OK, 4 row(s) in set (0.010653s)
#### Sample 4: Utilize 3rd party package
A UDF which accepts a timestamp and output the next closed Sunday. This sample requires to use third party package `moment`, you need to install it firslty.
A UDF which accepts a timestamp and output the next closed Sunday. This sample requires to use third party package `moment`, you need to install it firstly.
pip3 install moment
......@@ -701,7 +701,7 @@ Query OK, 4 row(s) in set (1.011474s)
#### Sample 5: Aggregate Function
An aggregate function which calculates the difference of the maximum and the minimum in a column. An aggregate funnction takes multiple rows as input and output only one data. The execution process of an aggregate UDF is like map-reduce, the framework divides the input into multiple parts, each mapper processes one block and the reducer aggregates the result of the mappers. The reduce() of Python UDF has the functionality of both map() and reduce(). The reduce() takes two arguments: the data to be processed; and the result of other tasks executing reduce(). For exmaple, assume the code is in `/root/udf/myspread.py`.
An aggregate function which calculates the difference of the maximum and the minimum in a column. An aggregate funnction takes multiple rows as input and output only one data. The execution process of an aggregate UDF is like map-reduce, the framework divides the input into multiple parts, each mapper processes one block and the reducer aggregates the result of the mappers. The reduce() of Python UDF has the functionality of both map() and reduce(). The reduce() takes two arguments: the data to be processed; and the result of other tasks executing reduce(). For example, assume the code is in `/root/udf/myspread.py`.
import io
......@@ -755,7 +755,7 @@ In this example, we implemented an aggregate function, and added some logging.
2. log() is the function for logging, it converts the input object to string and output with an end of line
3. destroy() closes the log file \
4. start() returns the initial buffer for storing the intermediate result
5. reduce() processes each daa block and aggregates the result
5. reduce() processes each data block and aggregates the result
6. finish() converts the final buffer() to final result\
Create the UDF.
......@@ -672,7 +672,7 @@ If you input a specific column, the number of non-null values in the column is r
ELAPSED(ts_primary_key [, time_unit])
**Description**: `elapsed` function can be used to calculate the continuous time length in which there is valid data. If it's used with `INTERVAL` clause, the returned result is the calculated time length within each time window. If it's used without `INTERVAL` caluse, the returned result is the calculated time length within the specified time range. Please be noted that the return value of `elapsed` is the number of `time_unit` in the calculated time length.
**Description**: `elapsed` function can be used to calculate the continuous time length in which there is valid data. If it's used with `INTERVAL` clause, the returned result is the calculated time length within each time window. If it's used without `INTERVAL` clause, the returned result is the calculated time length within the specified time range. Please be noted that the return value of `elapsed` is the number of `time_unit` in the calculated time length.
**Return value type**: Double if the input value is not NULL;
......@@ -999,18 +999,14 @@ SAMPLE(expr, k)
**Description**: _k_ sampling values of a specific column. The applicable range of _k_ is [1,1000].
**Return value type**: Same as the column being operated plus the associated timestamp
**Return value type**: Same as the column being operated
**Applicable data types**: Any data type except for tags of STable
**Applicable data types**: Any data type
**Applicable nested query**: Inner query and Outer query
**Applicable table types**: standard tables and supertables
**More explanations**:
- This function cannot be used in expression calculation.
### TAIL
......@@ -1055,11 +1051,11 @@ TOP(expr, k)
**Description**: The values that occur the first time in the specified column. The effect is similar to `distinct` keyword, but it can also be used to match tags or timestamp. The first occurrence of a timestamp or tag is used.
**Description**: The values that occur the first time in the specified column. The effect is similar to `distinct` keyword.
**Return value type**:Same as the data type of the column being operated upon
**Applicable column types**: Any data types except for timestamp
**Applicable column types**: Any data types
**Applicable table types**: table, STable
......@@ -21,7 +21,7 @@ part_list can be any scalar expression, such as a column, constant, scalar funct
A PARTITION BY clause is processed as follows:
- The PARTITION BY clause must occur after the WHERE clause
- The PARTITION BY caluse partitions the data according to the specified dimensions, then perform computation on each partition. The performed computation is determined by the rest of the statement - a window clause, GROUP BY clause, or SELECT clause.
- The PARTITION BY clause partitions the data according to the specified dimensions, then perform computation on each partition. The performed computation is determined by the rest of the statement - a window clause, GROUP BY clause, or SELECT clause.
- The PARTITION BY clause can be used together with a window clause or GROUP BY clause. In this case, the window or GROUP BY clause takes effect on every partition. For example, the following statement partitions the table by the location tag, performs downsampling over a 10 minute window, and returns the maximum value:
......@@ -36,7 +36,7 @@ Shows information about connections to the system.
Shows information about all active consumers in the system.
Shows information about all consumers in the system.
......@@ -36,7 +36,8 @@ REST connection supports all platforms that can run Java.
| taos-jdbcdriver version | major changes | TDengine version |
| :---------------------: | :------------------------------------------------------------------------------------------------------------------------------------------------: | :--------------: |
| 3.2.1 | subscription add seek function | or later |
| 3.2.3 | Fixed resultSet data parsing failure in some cases | or later |
| 3.2.2 | subscription add seek function | or later |
| 3.2.1 | JDBC REST connection supports schemaless/prepareStatement over WebSocket | or later |
| 3.2.0 | This version has been deprecated | - |
| 3.1.0 | JDBC REST connection supports subscription over WebSocket | - |
......@@ -284,9 +285,9 @@ The configuration parameters in the URL are as follows:
- batchfetch: true: pulls result sets in batches when executing queries; false: pulls result sets row by row. The default value is: false. batchfetch uses HTTP for data transfer. JDBC REST supports batch pulls. taos-jdbcdriver and TDengine transfer data via WebSocket connection. Compared with HTTP, WebSocket enables JDBC REST connection to support large data volume querying and improve query performance.
- charset: specify the charset to parse the string, this parameter is valid only when set batchfetch to true.
- batchErrorIgnore: true: when executing executeBatch of Statement, if one SQL execution fails in the middle, continue to execute the following SQL. false: no longer execute any statement after the failed SQL. The default value is: false.
- httpConnectTimeout: REST connection timeout in milliseconds, the default value is 5000 ms.
- httpSocketTimeout: socket timeout in milliseconds, the default value is 5000 ms. It only takes effect when batchfetch is false.
- messageWaitTimeout: message transmission timeout in milliseconds, the default value is 3000 ms. It only takes effect when batchfetch is true.
- httpConnectTimeout: REST connection timeout in milliseconds, the default value is 60000 ms.
- httpSocketTimeout: socket timeout in milliseconds, the default value is 60000 ms. It only takes effect when batchfetch is false.
- messageWaitTimeout: message transmission timeout in milliseconds, the default value is 60000 ms. It only takes effect when batchfetch is true.
- useSSL: connecting Securely Using SSL. true: using SSL connection, false: not using SSL connection.
- httpPoolSize: size of REST concurrent requests. The default value is 20.
......@@ -352,9 +353,9 @@ The configuration parameters in properties are as follows.
- TSDBDriver.PROPERTY_KEY_CHARSET: In the character set used by the client, the default value is the system character set.
- TSDBDriver.PROPERTY_KEY_LOCALE: this only takes effect when using JDBC native connection. Client language environment, the default value is system current locale.
- TSDBDriver.PROPERTY_KEY_TIME_ZONE: only takes effect when using JDBC native connection. In the time zone used by the client, the default value is the system's current time zone.
- TSDBDriver.HTTP_CONNECT_TIMEOUT: REST connection timeout in milliseconds, the default value is 5000 ms. It only takes effect when using JDBC REST connection.
- TSDBDriver.HTTP_SOCKET_TIMEOUT: socket timeout in milliseconds, the default value is 5000 ms. It only takes effect when using JDBC REST connection and batchfetch is false.
- TSDBDriver.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: message transmission timeout in milliseconds, the default value is 3000 ms. It only takes effect when using JDBC REST connection and batchfetch is true.
- TSDBDriver.HTTP_CONNECT_TIMEOUT: REST connection timeout in milliseconds, the default value is 60000 ms. It only takes effect when using JDBC REST connection.
- TSDBDriver.HTTP_SOCKET_TIMEOUT: socket timeout in milliseconds, the default value is 60000 ms. It only takes effect when using JDBC REST connection and batchfetch is false.
- TSDBDriver.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: message transmission timeout in milliseconds, the default value is 60000 ms. It only takes effect when using JDBC REST connection and batchfetch is true.
- TSDBDriver.PROPERTY_KEY_USE_SSL: connecting Securely Using SSL. true: using SSL connection, false: not using SSL connection. It only takes effect when using JDBC REST connection.
- TSDBDriver.HTTP_POOL_SIZE: size of REST concurrent requests. The default value is 20.
For JDBC native connections, you can specify other parameters, such as log level, SQL length, etc., by specifying URL and Properties. For more detailed configuration, please refer to [Client Configuration](/reference/config/#Client-Only).
......@@ -31,21 +31,57 @@ Websocket connections are supported on all platforms that can run Go.
| connector-rust version | TDengine version | major features |
| :----------------: | :--------------: | :--------------------------------------------------: |
| v0.8.10 | or later | TMQ: Get consuming progress and seek offset to consume. |
| v0.8.12 | or later | TMQ: Get consuming progress and seek offset to consume. |
| v0.8.0 | | Support schemaless insert. |
| v0.7.6 | | Support req_id in query. |
| v0.6.0 | | Base features. |
The Rust Connector is still under rapid development and is not guaranteed to be backward compatible before 1.0. We recommend using TDengine version 3.0 or higher to avoid known issues.
## Installation
## Handling exceptions
After the error is reported, the specific information of the error can be obtained:
match conn.exec(sql) {
Ok(_) => {
Err(e) => {
eprintln!("ERROR: {:?}", e);
## TDengine DataType vs. Rust DataType
TDengine currently supports timestamp, number, character, Boolean type, and the corresponding type conversion with Rust is as follows:
| TDengine DataType | Rust DataType |
| ----------------- | ----------------- |
| TIMESTAMP | Timestamp |
| INT | i32 |
| BIGINT | i64 |
| FLOAT | f32 |
| DOUBLE | f64 |
| SMALLINT | i16 |
| TINYINT | i8 |
| BOOL | bool |
| BINARY | Vec<u8\> |
| NCHAR | String |
| JSON | serde_json::Value |
Note: Only TAG supports JSON types
## Installation Steps
### Pre-installation preparation
* Install the Rust development toolchain
* If using the native connection, please install the TDengine client driver. Please refer to [install client driver](/reference/connector#install-client-driver)
### Add taos dependency
### Install the connectors
Depending on the connection method, add the [taos][taos] dependency in your Rust project as follows:
......@@ -146,7 +182,8 @@ let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
let conn1 = builder.build();
// use websocket protocol.
let conn2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
let builder2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
let conn2 = builder2.build();
After the connection is established, you can perform operations on your database.
......@@ -228,41 +265,191 @@ There are two ways to query data: Using built-in types or the [serde](https://se
## Usage examples
### Write data
### Create database and tables
use taos::*;
#### SQL Write
async fn main() -> anyhow::Result<()> {
let dsn = "taos://localhost:6030";
let builder = TaosBuilder::from_dsn(dsn)?;
let taos = builder.build()?;
let db = "query";
// create database
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}`"),
format!("USE `{db}`"),
// create table
// create super table
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
TAGS (`groupid` INT, `location` BINARY(16))",
// create child table
"CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')",
> The query is consistent with operating a relational database. When using subscripts to get the contents of the returned fields, you have to start from 1. However, we recommend using the field names to get the values of the fields in the result set.
### Insert data
<RustInsert />
#### STMT Write
### Query data
<RustQuery />
### execute SQL with req_id
This req_id can be used to request link tracing.
let rs = taos.query_with_req_id("select * from stable where tag1 is null", 1)?;
### Writing data via parameter binding
TDengine has significantly improved the bind APIs to support data writing (INSERT) scenarios. Writing data in this way avoids the resource consumption of SQL syntax parsing, resulting in significant write performance improvements in many cases.
Parameter binding details see [API Reference](#stmt-api)
<RustBind />
#### Schemaless Write
### Schemaless Writing
TDengine supports schemaless writing. It is compatible with InfluxDB's Line Protocol, OpenTSDB's telnet line protocol, and OpenTSDB's JSON format protocol. For more information, see [Schemaless Writing](../../schemaless).
<RustSml />
### Query data
### Schemaless with req_id
<RustQuery />
This req_id can be used to request link tracing.
## API Reference
let sml_data = SmlDataBuilder::default()
### Data Subscription
TDengine starts subscriptions through [TMQ](../../../taos-sql/tmq/).
#### Create a Topic
// create topic for subscription
format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}")
#### Create a Consumer
You create a TMQ connector by using a DSN.
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
Create a consumer:
let mut consumer = tmq.build()?;
#### Subscribe to consume data
A single consumer can subscribe to one or more topics.
The TMQ is of [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) type. You can use the corresponding API to consume each message in the queue and then use `.commit` to mark them as consumed.
let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? {
// get information from offset
// the topic
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
"** table: {}, got {} records: {:#?}\n",
Get assignments:
Version requirements connector-rust >= v0.8.8, TDengine >=
let assignments = consumer.assignments().await.unwrap();
### Connector Constructor
#### Assignment subscription Offset
You create a connector constructor by using a DSN.
Seek offset:
Version requirements connector-rust >= v0.8.8, TDengine >=
let cfg = TaosBuilder::default().build()?;
consumer.offset_seek(topic, vgroup_id, offset).await;
You use the builder object to create multiple connections.
#### Close subscriptions
let conn: Taos = cfg.build();
### Connection pooling
The following parameters can be configured for the TMQ DSN. Only `group.id` is mandatory.
- `group.id`: Within a consumer group, load balancing is implemented by consuming messages on an at-least-once basis.
- `client.id`: Subscriber client ID.
- `auto.offset.reset`: Initial point of subscription. *earliest* subscribes from the beginning, and *latest* subscribes from the newest message. The default is earliest. Note: This parameter is set per consumer group.
- `enable.auto.commit`: Automatically commits. This can be enabled when data consistency is not essential.
- `auto.commit.interval.ms`: Interval for automatic commits.
#### Full Sample Code
For more information, see [GitHub sample file](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/rust/nativeexample/examples/subscribe_demo.rs).
### Use with connection pool
In complex applications, we recommend enabling connection pools. [taos] implements connection pools based on [r2d2].
......@@ -292,7 +479,17 @@ In the application code, use `pool.get()? ` to get a connection object [Taos].
let taos = pool.get()?;
### Connectors
### More sample programs
The source code of the sample application is under `TDengine/examples/rust` :
[rust example](https://github.com/taosdata/TDengine/tree/3.0/examples/rust)
## Frequently Asked Questions
For additional troubleshooting, see [FAQ](../../../train-faq/faq).
## API Reference
The [Taos][struct.Taos] object provides an API to perform operations on multiple databases.
......@@ -378,9 +575,13 @@ Note that Rust asynchronous functions and an asynchronous runtime are required.
- `.create_database(database: &str)`: Executes the `CREATE DATABASE` statement.
- `.use_database(database: &str)`: Executes the `USE` statement.
In addition, this structure is also the entry point for [Parameter Binding](#Parameter Binding Interface) and [Line Protocol Interface](#Line Protocol Interface). Please refer to the specific API descriptions for usage.
In addition, this structure is also the entry point for Parameter Binding and Line Protocol Interface. Please refer to the specific API descriptions for usage.
### Bind Interface
<a id="stmt-api" style={{color:'#141414'}}>
Bind Interface
Similar to the C interface, Rust provides the bind interface's wrapping. First, the [Taos][struct.taos] object creates a parameter binding object [Stmt] for an SQL statement.
......@@ -391,7 +592,7 @@ stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
The bind object provides a set of interfaces for implementing parameter binding.
#### `.set_tbname(name)`
To bind table names.
......@@ -400,7 +601,7 @@ let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
#### `.set_tags(&[tag])`
Bind sub-table table names and tag values when the SQL statement uses a super table.
......@@ -410,7 +611,7 @@ stmt.set_tbname("d0")?;
#### `.bind(&[column])`
Bind value types. Use the [ColumnView] structure to create and bind the required types.
......@@ -434,7 +635,7 @@ let params = vec![
let rows = stmt.bind(&params)?.add_batch()?.execute()?;
#### `.execute()`
Execute SQL. [Stmt] objects can be reused, re-binded, and executed after execution. Before execution, ensure that all data has been added to the queue with `.add_batch`.
......@@ -449,92 +650,6 @@ stmt.execute()?;
For a working example, see [GitHub](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs).
### Subscriptions
TDengine starts subscriptions through [TMQ](../../../taos-sql/tmq/).
You create a TMQ connector by using a DSN.
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
Create a consumer:
let mut consumer = tmq.build()?;
A single consumer can subscribe to one or more topics.
The TMQ is of [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) type. You can use the corresponding API to consume each message in the queue and then use `.commit` to mark them as consumed.
let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? {
// get information from offset
// the topic
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
"** table: {}, got {} records: {:#?}\n",
Get assignments:
Version requirements connector-rust >= v0.8.8, TDengine >=
let assignments = consumer.assignments().await.unwrap();
Seek offset:
Version requirements connector-rust >= v0.8.8, TDengine >=
consumer.offset_seek(topic, vgroup_id, offset).await;
The following parameters can be configured for the TMQ DSN. Only `group.id` is mandatory.
- `group.id`: Within a consumer group, load balancing is implemented by consuming messages on an at-least-once basis.
- `client.id`: Subscriber client ID.
- `auto.offset.reset`: Initial point of subscription. *earliest* subscribes from the beginning, and *latest* subscribes from the newest message. The default is earliest. Note: This parameter is set per consumer group.
- `enable.auto.commit`: Automatically commits. This can be enabled when data consistency is not essential.
- `auto.commit.interval.ms`: Interval for automatic commits.
For more information, see [GitHub sample file](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/rust/nativeexample/examples/subscribe_demo.rs).
For information about other structure APIs, see the [Rust documentation](https://docs.rs/taos).
......@@ -20,10 +20,25 @@ The source code for the Python connector is hosted on [GitHub](https://github.co
- The [supported platforms](/reference/connector/#supported-platforms) for the native connection are the same as the ones supported by the TDengine client.
- REST connections are supported on all platforms that can run Python.
### Supported features
- Native connections support all the core features of TDengine, including connection management, SQL execution, bind interface, subscriptions, and schemaless writing.
- REST connections support features such as connection management and SQL execution. (SQL execution allows you to: manage databases, tables, and supertables, write data, query data, create continuous queries, etc.).
## Version selection
We recommend using the latest version of `taospy`, regardless of the version of TDengine.
|Python Connector Version|major changes|
|2.7.9|support for getting assignment and seek function on subscription|
|2.7.8|add `execute_many` method|
|Python Websocket Connector Version|major changes|
|0.2.5|1. support for getting assignment and seek function on subscription <br/> 2. support schemaless <br/> 3. support STMT|
|0.2.4|support `unsubscribe` on subscription|
## Handling Exceptions
There are 4 types of exception in python connector.
......@@ -54,10 +69,23 @@ All exceptions from the Python Connector are thrown directly. Applications shoul
{{#include docs/examples/python/handle_exception.py}}
## Supported features
## TDengine DataType vs. Python DataType
- Native connections support all the core features of TDengine, including connection management, SQL execution, bind interface, subscriptions, and schemaless writing.
- REST connections support features such as connection management and SQL execution. (SQL execution allows you to: manage databases, tables, and supertables, write data, query data, create continuous queries, etc.).
TDengine currently supports timestamp, number, character, Boolean type, and the corresponding type conversion with Python is as follows:
|TDengine DataType|Python DataType|
## Installation
......@@ -534,7 +562,7 @@ Connector support data subscription. For more information about subscroption, pl
The `consumer` in the connector contains the subscription api.
#### Create Consumer
##### Create Consumer
The syntax for creating a consumer is `consumer = Consumer(configs)`. For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/).
......@@ -544,7 +572,7 @@ from taos.tmq import Consumer
consumer = Consumer({"group.id": "local", "td.connect.ip": ""})
#### Subscribe topics
##### Subscribe topics
The `subscribe` function is used to subscribe to a list of topics.
......@@ -552,7 +580,7 @@ The `subscribe` function is used to subscribe to a list of topics.
consumer.subscribe(['topic1', 'topic2'])
#### Consume
##### Consume
The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data.
......@@ -570,7 +598,7 @@ while True:
#### assignment
##### assignment
The `assignment` function is used to get the assignment of the topic.
......@@ -578,7 +606,7 @@ The `assignment` function is used to get the assignment of the topic.
assignments = consumer.assignment()
#### Seek
##### Seek
The `seek` function is used to reset the assignment of the topic.
......@@ -587,7 +615,7 @@ tp = TopicPartition(topic='topic1', partition=0, offset=0)
#### After consuming data
##### After consuming data
You should unsubscribe to the topics and close the consumer after consuming.
......@@ -596,13 +624,13 @@ consumer.unsubscribe()
#### Tmq subscription example
##### Tmq subscription example
{{#include docs/examples/python/tmq_example.py}}
#### assignment and seek example
##### assignment and seek example
{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}}
......@@ -614,7 +642,7 @@ consumer.close()
In addition to native connections, the connector also supports subscriptions via websockets.
#### Create Consumer
##### Create Consumer
The syntax for creating a consumer is "consumer = consumer = Consumer(conf=configs)". You need to specify that the `td.connect.websocket.scheme` parameter is set to "ws" in the configuration. For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/#create-a-consumer).
......@@ -624,7 +652,7 @@ import taosws
consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
#### subscribe topics
##### subscribe topics
The `subscribe` function is used to subscribe to a list of topics.
......@@ -632,7 +660,7 @@ The `subscribe` function is used to subscribe to a list of topics.
consumer.subscribe(['topic1', 'topic2'])
#### Consume
##### Consume
The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data.
......@@ -649,7 +677,7 @@ while True:
#### assignment
##### assignment
The `assignment` function is used to get the assignment of the topic.
......@@ -657,7 +685,7 @@ The `assignment` function is used to get the assignment of the topic.
assignments = consumer.assignment()
#### Seek
##### Seek
The `seek` function is used to reset the assignment of the topic.
......@@ -665,7 +693,7 @@ The `seek` function is used to reset the assignment of the topic.
consumer.seek(topic='topic1', partition=0, offset=0)
#### After consuming data
##### After consuming data
You should unsubscribe to the topics and close the consumer after consuming.
......@@ -674,13 +702,13 @@ consumer.unsubscribe()
#### Subscription example
##### Subscription example
{{#include docs/examples/python/tmq_websocket_example.py}}
#### Assignment and seek example
##### Assignment and seek example
{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}}
......@@ -696,19 +724,19 @@ Connector support schemaless insert.
<Tabs defaultValue="list">
<TabItem value="list" label="List Insert">
Simple insert
##### Simple insert
{{#include docs/examples/python/schemaless_insert.py}}
Insert with ttl argument
##### Insert with ttl argument
{{#include docs/examples/python/schemaless_insert_ttl.py}}
Insert with req_id argument
##### Insert with req_id argument
{{#include docs/examples/python/schemaless_insert_req_id.py}}
......@@ -718,19 +746,19 @@ Insert with req_id argument
<TabItem value="raw" label="Raw Insert">
Simple insert
##### Simple insert
{{#include docs/examples/python/schemaless_insert_raw.py}}
Insert with ttl argument
##### Insert with ttl argument
{{#include docs/examples/python/schemaless_insert_raw_ttl.py}}
Insert with req_id argument
##### Insert with req_id argument
{{#include docs/examples/python/schemaless_insert_raw_req_id.py}}
......@@ -746,7 +774,7 @@ The Python connector provides a parameter binding api for inserting data. Simila
<TabItem value="native" label="native connection">
#### Create Stmt
##### Create Stmt
Call the `statement` method in `Connection` to create the `stmt` for parameter binding.
......@@ -757,7 +785,7 @@ conn = taos.connect()
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
#### parameter binding
##### parameter binding
Call the `new_multi_binds` function to create the parameter list for parameter bindings.
......@@ -787,7 +815,7 @@ Call the `bind_param` (for a single row) method or the `bind_param_batch` (for m
#### execute sql
##### execute sql
Call `execute` method to execute sql.
......@@ -795,13 +823,13 @@ Call `execute` method to execute sql.
#### Close Stmt
##### Close Stmt
#### Example
##### Example
{{#include docs/examples/python/stmt_example.py}}
......@@ -810,7 +838,7 @@ stmt.close()
<TabItem value="websocket" label="WebSocket connection">
#### Create Stmt
##### Create Stmt
Call the `statement` method in `Connection` to create the `stmt` for parameter binding.
......@@ -821,7 +849,7 @@ conn = taosws.connect('taosws://localhost:6041/test')
stmt = conn.statement()
#### Prepare sql
##### Prepare sql
Call `prepare` method in stmt to prepare sql.
......@@ -829,7 +857,7 @@ Call `prepare` method in stmt to prepare sql.
stmt.prepare("insert into t1 values (?, ?, ?, ?)")
#### parameter binding
##### parameter binding
Call the `bind_param` method to bind parameters.
......@@ -848,7 +876,7 @@ Call the `add_batch` method to add parameters to the batch.
#### execute sql
##### execute sql
Call `execute` method to execute sql.
......@@ -856,13 +884,13 @@ Call `execute` method to execute sql.
#### Close Stmt
##### Close Stmt
#### Example
##### Example
{{#include docs/examples/python/stmt_websocket_example.py}}
......@@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://w
import Release from "/components/ReleaseV3";
<Release type="tdengine" version="" />
<Release type="tdengine" version="" />
......@@ -243,6 +243,7 @@ TDengine 使用 SQL 创建一个 topic:
CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
- topic创建个数有上限,通过参数 tmqMaxTopicNum 控制,默认 20 个
TMQ 支持多种订阅类型:
......@@ -265,14 +266,15 @@ CREATE TOPIC topic_name as subquery
CREATE TOPIC topic_name AS STABLE stb_name
CREATE TOPIC topic_name [with meta] AS STABLE stb_name [where_condition]
与 `SELECT * from stbName` 订阅的区别是:
- 不会限制用户的表结构变更。
- 返回的是非结构化的数据:返回数据的结构会随之超级表的表结构变化而变化。
- 用户对于要处理的每一个数据块都可能有不同的表结构。
- with meta 参数可选,选择时将返回创建超级表,子表等语句,主要用于taosx做超级表迁移
- where_condition 参数可选,选择时将用来过滤符合条件的子表,订阅这些子表。where 条件里不能有普通列,只能是tag或tbname,where条件里可以用函数,用来过滤tag,但是不能是聚合函数,因为子表tag值无法做聚合。也可以是常量表达式,比如 2 > 1(订阅全部子表),或者 false(订阅0个子表)
- 返回数据不包含标签。
### 数据库订阅
......@@ -280,11 +282,13 @@ CREATE TOPIC topic_name AS STABLE stb_name
CREATE TOPIC topic_name AS DATABASE db_name;
CREATE TOPIC topic_name [with meta] AS DATABASE db_name;
- with meta 参数可选,选择时将返回创建数据库里所有超级表,子表的语句,主要用于taosx做数据库迁移
## 创建消费者 *consumer*
......@@ -295,7 +299,7 @@ CREATE TOPIC topic_name AS DATABASE db_name;
| `td.connect.user` | string | 用户名 | |
| `td.connect.pass` | string | 密码 | |
| `td.connect.port` | integer | 服务端的端口号 | |
| `group.id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 |
| `group.id` | string | 消费组 ID,同一消费组共享消费进度 | <br />**必填项**。最大长度:192。<br />每个topic最多可建立100个 consumer group |
| `client.id` | string | 客户端 ID | 最大长度:192。 |
| `auto.offset.reset` | enum | 消费组订阅的初始位置 | <br />`earliest`: default;从头开始订阅; <br/>`latest`: 仅从最新数据开始订阅; <br/>`none`: 没有提交的 offset 无法订阅 |
| `enable.auto.commit` | boolean | 是否启用消费位点自动提交,true: 自动提交,客户端应用无需commit;false:客户端应用需要自行commit | 默认值为 true |
......@@ -17,7 +17,7 @@ TDengine 支持通过 C/Python 语言进行 UDF 定义。接下来结合示例
- 聚合函数需要实现聚合接口函数 aggfn_start , aggfn , aggfn_finish。
- 如果需要初始化,实现 udf_init;如果需要清理工作,实现udf_destroy。
接口函数的名称是 UDF 名称,或者是 UDF 名称和特定后缀(_start, _finish, _init, _destroy)的连接。列表中的scalarfn,aggfn, udf需要替换成udf函数名。
接口函数的名称是 UDF 名称,或者是 UDF 名称和特定后缀(`_start`, `_finish`, `_init`, `_destroy`)的连接。列表中的scalarfn,aggfn, udf需要替换成udf函数名。
### 用 C 语言实现标量函数
......@@ -36,14 +36,15 @@ REST 连接支持所有能运行 Java 的平台。
| taos-jdbcdriver 版本 | 主要变化 | TDengine 版本 |
| :------------------: | :----------------------------------------------------------------------------------------------------------------------------------------------------: | :----------------: |
| 3.2.3 | 修复 ResultSet 在一些情况数据解析失败 | - |
| 3.2.2 | 新增功能:数据订阅支持 seek 功能。 | 及更高版本 |
| 3.2.1 | 新增功能:WebSocket 连接支持 schemaless 与 prepareStatement 写入。变更:consumer poll 返回结果集为 ConsumerRecord,可通过 value() 获取指定结果集数据。 | 及更高版本 |
| 3.2.0 | 存在连接问题,不推荐使用 | - |
| 3.1.0 | WebSocket 连接支持订阅功能 | - |
| 3.0.1 - 3.0.4 | 修复一些情况下结果集数据解析错误的问题。3.0.1 在 JDK 11 环境编译,JDK 8 环境下建议使用其他版本 | - |
| 3.0.0 | 支持 TDengine 3.0 | 及更高版本 |
| 2.0.42 | 修 WebSocket 连接中 wasNull 接口返回值 | - |
| 2.0.41 | 修 REST 连接中用户名和密码转码方式 | - |
| 2.0.42 | 修 WebSocket 连接中 wasNull 接口返回值 | - |
| 2.0.41 | 修 REST 连接中用户名和密码转码方式 | - |
| 2.0.39 - 2.0.40 | 增加 REST 连接/请求 超时设置 | - |
| 2.0.38 | JDBC REST 连接增加批量拉取功能 | - |
| 2.0.37 | 增加对 json tag 支持 | - |
......@@ -287,9 +288,9 @@ url 中的配置参数如下:
- batchfetch: true:在执行查询时批量拉取结果集;false:逐行拉取结果集。默认值为:false。逐行拉取结果集使用 HTTP 方式进行数据传输。JDBC REST 连接支持批量拉取数据功能。taos-jdbcdriver 与 TDengine 之间通过 WebSocket 连接进行数据传输。相较于 HTTP,WebSocket 可以使 JDBC REST 连接支持大数据量查询,并提升查询性能。
- charset: 当开启批量拉取数据时,指定解析字符串数据的字符集。
- batchErrorIgnore:true:在执行 Statement 的 executeBatch 时,如果中间有一条 SQL 执行失败,继续执行下面的 SQL 了。false:不再执行失败 SQL 后的任何语句。默认值为:false。
- httpConnectTimeout: 连接超时时间,单位 ms, 默认值为 5000。
- httpSocketTimeout: socket 超时时间,单位 ms,默认值为 5000。仅在 batchfetch 设置为 false 时生效。
- messageWaitTimeout: 消息超时时间, 单位 ms, 默认值为 3000。 仅在 batchfetch 设置为 true 时生效。
- httpConnectTimeout: 连接超时时间,单位 ms, 默认值为 60000。
- httpSocketTimeout: socket 超时时间,单位 ms,默认值为 60000。仅在 batchfetch 设置为 false 时生效。
- messageWaitTimeout: 消息超时时间, 单位 ms, 默认值为 60000。 仅在 batchfetch 设置为 true 时生效。
- useSSL: 连接中是否使用 SSL。
- httpPoolSize: REST 并发请求大小,默认 20。
......@@ -355,9 +356,9 @@ properties 中的配置参数如下:
- TSDBDriver.PROPERTY_KEY_CHARSET:客户端使用的字符集,默认值为系统字符集。
- TSDBDriver.PROPERTY_KEY_LOCALE:仅在使用 JDBC 原生连接时生效。 客户端语言环境,默认值系统当前 locale。
- TSDBDriver.PROPERTY_KEY_TIME_ZONE:仅在使用 JDBC 原生连接时生效。 客户端使用的时区,默认值为系统当前时区。
- TSDBDriver.HTTP_CONNECT_TIMEOUT: 连接超时时间,单位 ms, 默认值为 5000。仅在 REST 连接时生效。
- TSDBDriver.HTTP_SOCKET_TIMEOUT: socket 超时时间,单位 ms,默认值为 5000。仅在 REST 连接且 batchfetch 设置为 false 时生效。
- TSDBDriver.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: 消息超时时间, 单位 ms, 默认值为 3000。 仅在 REST 连接且 batchfetch 设置为 true 时生效。
- TSDBDriver.HTTP_CONNECT_TIMEOUT: 连接超时时间,单位 ms, 默认值为 60000。仅在 REST 连接时生效。
- TSDBDriver.HTTP_SOCKET_TIMEOUT: socket 超时时间,单位 ms,默认值为 60000。仅在 REST 连接且 batchfetch 设置为 false 时生效。
- TSDBDriver.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: 消息超时时间, 单位 ms, 默认值为 60000。 仅在 REST 连接且 batchfetch 设置为 true 时生效。
- TSDBDriver.PROPERTY_KEY_USE_SSL: 连接中是否使用 SSL。仅在 REST 连接时生效。
- TSDBDriver.HTTP_POOL_SIZE: REST 并发请求大小,默认 20。
此外对 JDBC 原生连接,通过指定 URL 和 Properties 还可以指定其他参数,比如日志级别、SQL 长度等。更多详细配置请参考[客户端配置](/reference/config/#仅客户端适用)。
......@@ -30,21 +30,57 @@ Websocket 连接支持所有能运行 Rust 的平台。
| Rust 连接器版本 | TDengine 版本 | 主要功能 |
| :----------------: | :--------------: | :--------------------------------------------------: |
| v0.8.10 | or later | 消息订阅:获取消费进度及按照指定进度开始消费。 |
| v0.8.12 | or later | 消息订阅:获取消费进度及按照指定进度开始消费。 |
| v0.8.0 | | 支持无模式写入。 |
| v0.7.6 | | 支持在请求中使用 req_id。 |
| v0.6.0 | | 基础功能。 |
Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine,以避免已知问题。
## 安装
## 处理错误
match conn.exec(sql) {
Ok(_) => {
Err(e) => {
eprintln!("ERROR: {:?}", e);
## TDengine DataType 和 Rust DataType
TDengine 目前支持时间戳、数字、字符、布尔类型,与 Rust 对应类型转换如下:
| TDengine DataType | Rust DataType |
| ----------------- | ----------------- |
| TIMESTAMP | Timestamp |
| INT | i32 |
| BIGINT | i64 |
| FLOAT | f32 |
| DOUBLE | f64 |
| SMALLINT | i16 |
| TINYINT | i8 |
| BOOL | bool |
| BINARY | Vec<u8\> |
| NCHAR | String |
| JSON | serde_json::Value |
**注意**:JSON 类型仅在 tag 中支持。
## 安装步骤
### 安装前准备
* 安装 Rust 开发工具链
* 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动)
### 添加 taos 依赖
### 安装连接器
根据选择的连接方式,按照如下说明在 [Rust](https://rust-lang.org) 项目中添加 [taos][taos] 依赖:
......@@ -151,7 +187,8 @@ let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
let conn1 = builder.build();
// use websocket protocol.
let conn2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
let builder2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
let conn2 = builder2.build();
......@@ -233,41 +270,191 @@ async fn demo(taos: &Taos, db: &str) -> Result<(), Error> {
## 使用示例
### 写入数据
### 创建数据库和表
use taos::*;
#### SQL 写入
async fn main() -> anyhow::Result<()> {
let dsn = "taos://localhost:6030";
let builder = TaosBuilder::from_dsn(dsn)?;
let taos = builder.build()?;
let db = "query";
// create database
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}`"),
format!("USE `{db}`"),
// create table
// create super table
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
TAGS (`groupid` INT, `location` BINARY(16))",
// create child table
"CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')",
> **注意**:如果不使用 `use db` 指定数据库,则后续对表的操作都需要增加数据库名称作为前缀,如 db.tb。
### 插入数据
<RustInsert />
#### STMT 写入
### 查询数据
<RustQuery />
### 执行带有 req_id 的 SQL
此 req_id 可用于请求链路追踪。
let rs = taos.query_with_req_id("select * from stable where tag1 is null", 1)?;
### 通过参数绑定写入数据
TDengine 的 Rust 连接器实现了参数绑定方式对数据写入(INSERT)场景的支持。采用这种方式写入数据时,能避免 SQL 语法解析的资源消耗,从而在很多情况下显著提升写入性能。
<RustBind />
#### Schemaless 写入
### 无模式写入
TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协议(Line Protocol)、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见[无模式写入](../../reference/schemaless/)。
<RustSml />
### 查询数据
### 执行带有 req_id 的无模式写入
<RustQuery />
此 req_id 可用于请求链路追踪。
## API 参考
let sml_data = SmlDataBuilder::default()
### 连接构造器
### 数据订阅
TDengine 通过消息队列 [TMQ](../../../taos-sql/tmq/) 启动一个订阅。
通过 DSN 来构建一个连接器构造器。
#### 创建 Topic
let cfg = TaosBuilder::default().build()?;
// create topic for subscription
format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}")
使用 `builder` 对象创建多个连接:
#### 创建 Consumer
从 DSN 开始,构建一个 TMQ 连接器。
let conn: Taos = cfg.build();
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
### 连接池
let mut consumer = tmq.build()?;
#### 订阅消费数据
消费者可订阅一个或多个 `TOPIC`。
TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) 类型,可以使用相应 API 对每个消息进行消费,并通过 `.commit` 进行已消费标记。
let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? {
// get information from offset
// the topic
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
"** table: {}, got {} records: {:#?}\n",
版本要求 connector-rust >= v0.8.8, TDengine >=
let assignments = consumer.assignments().await.unwrap();
#### 指定订阅 Offset
版本要求 connector-rust >= v0.8.8, TDengine >=
consumer.offset_seek(topic, vgroup_id, offset).await;
#### 关闭订阅
对于 TMQ DSN, 有以下配置项可以进行设置,需要注意的是,`group.id` 是必须的。
- `group.id`: 同一个消费者组,将以至少消费一次的方式进行消息负载均衡。
- `client.id`: 可选的订阅客户端识别项。
- `auto.offset.reset`: 可选初始化订阅起点, *earliest* 为从头开始订阅, *latest* 为仅从最新数据开始订阅,默认为从头订阅。注意,此选项在同一个 `group.id` 中仅生效一次。
- `enable.auto.commit`: 当设置为 `true` 时,将启用自动标记模式,当对数据一致性不敏感时,可以启用此方式。
- `auto.commit.interval.ms`: 自动标记的时间间隔。
#### 完整示例
完整订阅示例参见 [GitHub 示例文件](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/rust/nativeexample/examples/subscribe_demo.rs).
### 与连接池使用
在复杂应用中,建议启用连接池。[taos] 的连接池默认(异步模式)使用 [deadpool] 实现。
......@@ -295,7 +482,17 @@ let pool: Pool<TaosBuilder> = Pool::builder(Manager::from_dsn(self.dsn.clone()).
let taos = pool.get()?;
### 连接
### 更多示例程序
示例程序源码位于 `TDengine/examples/rust` 下:
请参考:[rust example](https://github.com/taosdata/TDengine/tree/3.0/examples/rust)
## 常见问题
请参考 [FAQ](../../../train-faq/faq)
## API 参考
[Taos][struct.Taos] 对象提供了多个数据库操作的 API:
......@@ -381,9 +578,13 @@ let taos = pool.get()?;
- `.create_database(database: &str)`: 执行 `CREATE DATABASE` 语句。
- `.use_database(database: &str)`: 执行 `USE` 语句。
除此之外,该结构也是 [参数绑定](#参数绑定接口) 和 [行协议接口](#行协议接口) 的入口,使用方法请参考具体的 API 说明。
除此之外,该结构也是参数绑定和行协议接口的入口,使用方法请参考具体的 API 说明。
### 参数绑定接口
<a id="stmt-api" style={{color:'#141414'}}>
与 C 接口类似,Rust 提供参数绑定接口。首先,通过 [Taos][struct.Taos] 对象创建一个 SQL 语句的参数绑定对象 [Stmt]:
......@@ -394,7 +595,7 @@ stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
#### `.set_tbname(name)`
......@@ -403,7 +604,7 @@ let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
#### `.set_tags(&[tag])`
当 SQL 语句使用超级表时,用于绑定子表表名和标签值:
......@@ -413,7 +614,7 @@ stmt.set_tbname("d0")?;
#### `.bind(&[column])`
用于绑定值类型。使用 [ColumnView] 结构体构建需要的类型并绑定:
......@@ -437,7 +638,7 @@ let params = vec![
let rows = stmt.bind(&params)?.add_batch()?.execute()?;
#### `.execute()`
执行 SQL。[Stmt] 对象可以复用,在执行后可以重新绑定并执行。执行前请确保所有数据已通过 `.add_batch` 加入到执行队列中。
......@@ -452,92 +653,6 @@ stmt.execute()?;
一个可运行的示例请见 [GitHub 上的示例](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs)。
### 订阅
TDengine 通过消息队列 [TMQ](../../../taos-sql/tmq/) 启动一个订阅。
从 DSN 开始,构建一个 TMQ 连接器。
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
let mut consumer = tmq.build()?;
消费者可订阅一个或多个 `TOPIC`。
TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) 类型,可以使用相应 API 对每个消息进行消费,并通过 `.commit` 进行已消费标记。
let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? {
// get information from offset
// the topic
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
"** table: {}, got {} records: {:#?}\n",
版本要求 connector-rust >= v0.8.8, TDengine >=
let assignments = consumer.assignments().await.unwrap();
版本要求 connector-rust >= v0.8.8, TDengine >=
consumer.offset_seek(topic, vgroup_id, offset).await;
对于 TMQ DSN, 有以下配置项可以进行设置,需要注意的是,`group.id` 是必须的。
- `group.id`: 同一个消费者组,将以至少消费一次的方式进行消息负载均衡。
- `client.id`: 可选的订阅客户端识别项。
- `auto.offset.reset`: 可选初始化订阅起点, *earliest* 为从头开始订阅, *latest* 为仅从最新数据开始订阅,默认为从头订阅。注意,此选项在同一个 `group.id` 中仅生效一次。
- `enable.auto.commit`: 当设置为 `true` 时,将启用自动标记模式,当对数据一致性不敏感时,可以启用此方式。
- `auto.commit.interval.ms`: 自动标记的时间间隔。
完整订阅示例参见 [GitHub 示例文件](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/rust/nativeexample/examples/subscribe_demo.rs).
其他相关结构体 API 使用说明请移步 Rust 文档托管网页:<https://docs.rs/taos>。
......@@ -21,10 +21,25 @@ Python 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-con
- 原生连接[支持的平台](../#支持的平台)和 TDengine 客户端支持的平台一致。
- REST 连接支持所有能运行 Python 的平台。
## 版本选择
### 支持的功能
- 原生连接支持 TDengine 的所有核心功能, 包括: 连接管理、执行 SQL、参数绑定、订阅、无模式写入(schemaless)。
- REST 连接支持的功能包括:连接管理、执行 SQL。 (通过执行 SQL 可以: 管理数据库、管理表和超级表、写入数据、查询数据、创建连续查询等)。
## 历史版本
无论使用什么版本的 TDengine 都建议使用最新版本的 `taospy`。
|Python Connector 版本|主要变化|
|2.7.8|新增 `execute_many`|
|Python Websocket Connector 版本|主要变化|
|0.2.5|1. 数据订阅支持获取消费进度和重置消费进度 <br/> 2. 支持 schemaless <br/> 3. 支持 STMT|
## 处理异常
Python 连接器可能会产生 4 种异常:
......@@ -55,12 +70,25 @@ Python Connector 的所有数据库操作如果出现异常,都会直接抛出
{{#include docs/examples/python/handle_exception.py}}
## 支持的功能
TDengine DataType 和 Python DataType
- 原生连接支持 TDengine 的所有核心功能, 包括: 连接管理、执行 SQL、参数绑定、订阅、无模式写入(schemaless)。
- REST 连接支持的功能包括:连接管理、执行 SQL。 (通过执行 SQL 可以: 管理数据库、管理表和超级表、写入数据、查询数据、创建连续查询等)。
TDengine 目前支持时间戳、数字、字符、布尔类型,与 Python 对应类型转换如下:
|TDengine DataType|Python DataType|
## 安装
## 安装步骤
### 安装前准备
......@@ -373,7 +401,7 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
<TabItem value="websocket" label="WebSocket 连接">
#### Connection 类的使用
##### Connection 类的使用
`Connection` 类既包含对 PEP249 Connection 接口的实现(如:cursor方法和 close 方法),也包含很多扩展功能(如: execute、 query、schemaless_insert 和 subscribe 方法。
......@@ -537,7 +565,7 @@ RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方
`Consumer` 提供了 Python 连接器订阅 TMQ 数据的 API。
#### 创建 Consumer
##### 创建 Consumer
创建 Consumer 语法为 `consumer = Consumer(configs)`,参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。
......@@ -547,15 +575,15 @@ from taos.tmq import Consumer
consumer = Consumer({"group.id": "local", "td.connect.ip": ""})
#### 订阅 topics
##### 订阅 topics
Comsumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。
Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。
consumer.subscribe(['topic1', 'topic2'])
#### 消费数据
##### 消费数据
Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。
......@@ -573,7 +601,7 @@ while True:
#### 获取消费进度
##### 获取消费进度
Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。
......@@ -581,7 +609,7 @@ Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic
assignments = consumer.assignment()
#### 重置消费进度
##### 指定订阅 Offset
Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置,方法参数类型为 TopicPartition。
......@@ -590,7 +618,7 @@ tp = TopicPartition(topic='topic1', partition=0, offset=0)
#### 结束消费
##### 关闭订阅
消费结束后,应当取消订阅,并关闭 Consumer。
......@@ -599,13 +627,13 @@ consumer.unsubscribe()
#### tmq 订阅示例代码
##### 完整示例
{{#include docs/examples/python/tmq_example.py}}
#### 获取和重置消费进度示例代码
##### 获取和重置消费进度示例代码
{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}}
......@@ -619,7 +647,7 @@ consumer.close()
taosws `Consumer` API 提供了基于 Websocket 订阅 TMQ 数据的 API。
#### 创建 Consumer
##### 创建 Consumer
创建 Consumer 语法为 `consumer = Consumer(conf=configs)`,使用时需要指定 `td.connect.websocket.scheme` 参数值为 "ws",参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。
......@@ -629,15 +657,15 @@ import taosws
consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
#### 订阅 topics
##### 订阅 topics
Comsumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。
Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。
consumer.subscribe(['topic1', 'topic2'])
#### 消费数据
##### 消费数据
Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。
......@@ -654,7 +682,7 @@ while True:
#### 获取消费进度
##### 获取消费进度
Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。
......@@ -662,7 +690,7 @@ Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic
assignments = consumer.assignment()
#### 重置消费进度
##### 重置消费进度
Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置。
......@@ -670,7 +698,7 @@ Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位
consumer.seek(topic='topic1', partition=0, offset=0)
#### 结束消费
##### 结束消费
消费结束后,应当取消订阅,并关闭 Consumer。
......@@ -679,7 +707,7 @@ consumer.unsubscribe()
#### tmq 订阅示例代码
##### tmq 订阅示例代码
{{#include docs/examples/python/tmq_websocket_example.py}}
......@@ -687,7 +715,7 @@ consumer.close()
连接器提供了 `assignment` 接口,用于获取 topic assignment 的功能,可以查询订阅的 topic 的消费进度,并提供 `seek` 接口,用于重置 topic 的消费进度。
#### 获取和重置消费进度示例代码
##### 获取和重置消费进度示例代码
{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}}
......@@ -703,19 +731,19 @@ consumer.close()
<Tabs defaultValue="list">
<TabItem value="list" label="List 写入">
##### 简单写入
{{#include docs/examples/python/schemaless_insert.py}}
带有 ttl 参数的写入
##### 带有 ttl 参数的写入
{{#include docs/examples/python/schemaless_insert_ttl.py}}
带有 req_id 参数的写入
##### 带有 req_id 参数的写入
{{#include docs/examples/python/schemaless_insert_req_id.py}}
......@@ -725,19 +753,19 @@ consumer.close()
<TabItem value="raw" label="Raw 写入">
##### 简单写入
{{#include docs/examples/python/schemaless_insert_raw.py}}
带有 ttl 参数的写入
##### 带有 ttl 参数的写入
{{#include docs/examples/python/schemaless_insert_raw_ttl.py}}
带有 req_id 参数的写入
##### 带有 req_id 参数的写入
{{#include docs/examples/python/schemaless_insert_raw_req_id.py}}
......@@ -753,7 +781,7 @@ TDengine 的 Python 连接器支持参数绑定风格的 Prepare API 方式写
<TabItem value="native" label="原生连接">
#### 创建 stmt
##### 创建 stmt
Python 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数。
......@@ -764,7 +792,7 @@ conn = taos.connect()
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
#### 参数绑定
##### 参数绑定
调用 `new_multi_binds` 函数创建 params 列表,用于参数绑定。
......@@ -794,7 +822,7 @@ params[15].timestamp([None, None, 1626861392591])
#### 执行 sql
##### 执行 sql
调用 stmt 的 `execute` 方法执行 sql
......@@ -802,7 +830,7 @@ stmt.bind_param_batch(params)
#### 关闭 stmt
##### 关闭 stmt
最后需要关闭 stmt。
......@@ -810,7 +838,7 @@ stmt.execute()
#### 示例代码
##### 示例代码
{{#include docs/examples/python/stmt_example.py}}
......@@ -819,7 +847,7 @@ stmt.close()
<TabItem value="websocket" label="WebSocket 连接">
#### 创建 stmt
##### 创建 stmt
Python WebSocket 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数。
......@@ -830,7 +858,7 @@ conn = taosws.connect('taosws://localhost:6041/test')
stmt = conn.statement()
#### 解析 sql
##### 解析 sql
调用 stmt 的 `prepare` 方法来解析 insert 语句。
......@@ -838,7 +866,7 @@ stmt = conn.statement()
stmt.prepare("insert into t1 values (?, ?, ?, ?)")
#### 参数绑定
##### 参数绑定
调用 stmt 的 `bind_param` 方法绑定参数。
......@@ -857,7 +885,7 @@ stmt.bind_param([
#### 执行 sql
##### 执行 sql
调用 stmt 的 `execute` 方法执行 sql
......@@ -865,7 +893,7 @@ stmt.add_batch()
#### 关闭 stmt
##### 关闭 stmt
最后需要关闭 stmt。
......@@ -873,7 +901,7 @@ stmt.execute()
#### 示例代码
##### 示例代码
{{#include docs/examples/python/stmt_websocket_example.py}}
......@@ -991,18 +991,14 @@ SAMPLE(expr, k)
**功能说明**: 获取数据的 k 个采样值。参数 k 的合法输入范围是 1≤ k ≤ 1000。
**返回结果类型**: 同原始数据类型, 返回结果中带有该行记录的时间戳
**返回结果类型**: 同原始数据类型。
**嵌套子查询支持**: 适用于内层查询和外层查询。
- 不能参与表达式计算;该函数可以应用在普通表和超级表上;
### TAIL
......@@ -1047,11 +1043,11 @@ TOP(expr, k)
**功能说明**:返回该列的数值首次出现的值。该函数功能与 distinct 相似,但是可以匹配标签和时间戳信息。可以针对除时间列以外的字段进行查询,可以匹配标签和时间戳,其中的标签和时间戳是第一次出现时刻的标签和时间戳
**功能说明**:返回该列数据首次出现的值。该函数功能与 distinct 相似
**适用于**: 表和超级表。
......@@ -36,7 +36,7 @@ SHOW CONNECTIONS;
......@@ -10,6 +10,10 @@ TDengine 2.x 各版本安装包请访问[这里](https://www.taosdata.com/all-do
import Release from "/components/ReleaseV3";
<Release type="tdengine" version="" />
<Release type="tdengine" version="" />
......@@ -164,6 +164,8 @@ extern char tsSmlTagName[];
// extern bool tsSmlDataFormat;
// extern int32_t tsSmlBatchSize;
extern int32_t tmqMaxTopicNum;
// wal
extern int64_t tsWalFsyncDataSizeLimit;
......@@ -145,7 +145,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CONSUMER_LOST, "consumer-lost", SMqConsumerLostMsg, NULL)
// TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CONSUMER_LOST, "consumer-lost", SMqConsumerLostMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CONSUMER_RECOVER, "consumer-recover", SMqConsumerRecoverMsg, NULL)
......@@ -56,7 +56,8 @@ typedef struct {
void* pStateBackend;
struct SStorageAPI api;
int8_t fillHistory;
int8_t fillHistory;
STimeWindow winRange;
} SReadHandle;
// in queue mode, data streams are seperated by msg
......@@ -111,6 +111,12 @@ int32_t udfStartUdfd(int32_t startDnodeId);
int32_t udfStopUdfd();
* get udfd pid
int32_t udfGetUdfdPid(int32_t* pUdfdPid);
#ifdef __cplusplus
......@@ -66,8 +66,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED TAOS_DEF_ERROR_CODE(0, 0x0020) // "Vgroup could not be connected"
......@@ -277,7 +277,7 @@ int32_t* taosGetErrno();
// mnode-db
......@@ -288,9 +288,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_INVALID_DB_ACCT TAOS_DEF_ERROR_CODE(0, 0x0389) // internal
......@@ -516,6 +516,7 @@ int32_t* taosGetErrno();
// grant
......@@ -768,6 +769,8 @@ int32_t* taosGetErrno();
// stream
......@@ -778,7 +781,7 @@ int32_t* taosGetErrno();
#ifdef __cplusplus
......@@ -79,6 +79,7 @@ int32_t compareDoubleVal(const void *pLeft, const void *pRight);
int32_t compareLenPrefixedStr(const void *pLeft, const void *pRight);
int32_t compareLenPrefixedWStr(const void *pLeft, const void *pRight);
int32_t compareLenBinaryVal(const void *pLeft, const void *pRight);
int32_t comparestrRegexMatch(const void *pLeft, const void *pRight);
int32_t comparestrRegexNMatch(const void *pLeft, const void *pRight);
......@@ -152,7 +152,7 @@ function wgetFile {
if [ -f ${file} ];then
echoColor YD "${file} already exists ,it will delete it and download it again "
......@@ -749,6 +749,9 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
pReq.suid = pTableMeta->uid;
pReq.source = TD_REQ_FROM_TAOX;
pSql = (action == SCHEMA_ACTION_ADD_COLUMN) ? "sml_add_column" : "sml_modify_column_size";
} else{
uError("SML:0x%" PRIx64 " invalid action:%d", info->id, action);
goto end;
code = buildRequest(info->taos->id, pSql, strlen(pSql), NULL, false, &pRequest, 0);
......@@ -939,8 +939,6 @@ int stmtClose(TAOS_STMT* stmt) {
STMT_DLOG_E("stmt freed");
......@@ -652,7 +652,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm
int32_t j = 0;
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
for (j = 0; j < numOfVgroups; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
SMqClientVg* pVg = (SMqClientVg*)taosArrayGet(pTopic->vgs, j);
if (pVg->vgId == vgId) {
......@@ -666,7 +666,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
SMqClientVg* pVg = (SMqClientVg*)taosArrayGet(pTopic->vgs, j);
if (pVg->offsetInfo.currentOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset)) {
code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups, type);
......@@ -742,13 +742,15 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
static void generateTimedTask(int64_t refId, int32_t type) {
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq != NULL) {
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
*pTaskType = type;
taosWriteQitem(tmq->delayedTask, pTaskType);
taosReleaseRef(tmqMgmt.rsetId, refId);
if(tmq == NULL) return;
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
if(pTaskType == NULL) return;
*pTaskType = type;
taosWriteQitem(tmq->delayedTask, pTaskType);
taosReleaseRef(tmqMgmt.rsetId, refId);
void tmqAssignAskEpTask(void* param, void* tmrId) {
......@@ -763,19 +765,19 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
void tmqAssignDelayedReportTask(void* param, void* tmrId) {
int64_t refId = *(int64_t*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq != NULL) {
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
taosWriteQitem(tmq->delayedTask, pTaskType);
taosReleaseRef(tmqMgmt.rsetId, refId);
//void tmqAssignDelayedReportTask(void* param, void* tmrId) {
// int64_t refId = *(int64_t*)param;
// tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
// if (tmq != NULL) {
// int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
// taosWriteQitem(tmq->delayedTask, pTaskType);
// tsem_post(&tmq->rspSem);
// }
// taosReleaseRef(tmqMgmt.rsetId, refId);
// taosMemoryFree(param);
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
if (pMsg) {
......@@ -813,7 +815,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
offRows->offset = pVg->offsetInfo.currentOffset;
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
tscInfo("report offset: vgId:%d, offset:%s, rows:%"PRId64, offRows->vgId, buf, offRows->rows);
tscInfo("consumer:0x%" PRIx64 ",report offset: vgId:%d, offset:%s, rows:%"PRId64, tmq->consumerId, offRows->vgId, buf, offRows->rows);
// tmq->needReportOffsetRows = false;
......@@ -1489,7 +1491,8 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
makeTopicVgroupKey(vgKey, pTopic->topicName, pVgEp->vgId);
SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));
STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg};
STqOffsetVal offsetNew = {0};
offsetNew.type = tmq->resetOffsetCfg;
SMqClientVg clientVg = {
.pollCnt = 0,
......@@ -162,7 +162,7 @@ static const SSysDbTableSchema streamTaskSchema[] = {
{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "task_id", .bytes = 32, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "node_type", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "node_id", .bytes = 8, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "node_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "level", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
......@@ -290,7 +290,7 @@ static const SSysDbTableSchema subscriptionSchema[] = {
{.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "consumer_id", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "offset", .bytes = TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "rows", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
......@@ -352,7 +352,7 @@ static const SSysDbTableSchema connectionsSchema[] = {
static const SSysDbTableSchema consumerSchema[] = {
{.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "consumer_id", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "consumer_group", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
......@@ -105,11 +105,13 @@ char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table
// bool tsSmlDataFormat = false;
// int32_t tsSmlBatchSize = 10000;
// tmq
int32_t tmqMaxTopicNum = 20;
// query
int32_t tsQueryPolicy = 1;
int32_t tsQueryRspPolicy = 0;
int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT
bool tsEnableQueryHb = false;
bool tsEnableQueryHb = true;
bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true
bool tsTtlChangeOnWrite = false; // ttl delete time changes on last write if true
int32_t tsQuerySmaOptimize = 0;
......@@ -511,6 +513,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddString(pCfg, "telemetryServer", tsTelemServer, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "telemetryPort", tsTelemPort, 1, 65056, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "tmqMaxTopicNum", tmqMaxTopicNum, 1, 10000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, 1) != 0) return -1;
......@@ -882,6 +886,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN);
tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32;
tmqMaxTopicNum= cfgGetItem(pCfg, "tmqMaxTopicNum")->i32;
tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32;
tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32;
tsTtlUnit = cfgGetItem(pCfg, "ttlUnit")->i32;
......@@ -6982,8 +6982,11 @@ int32_t tDecodeSVAlterTbReqSetCtime(SDecoder* pDecoder, SVAlterTbReq* pReq, int6
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeSVAlterTbReqCommon(pDecoder, pReq) < 0) return -1;
*(int64_t *)(pDecoder->data + pDecoder->pos) = ctimeMs;
if (tDecodeI64(pDecoder, &pReq->ctimeMs) < 0) return -1;
pReq->ctimeMs = 0;
if (!tDecodeIsEnd(pDecoder)) {
*(int64_t *)(pDecoder->data + pDecoder->pos) = ctimeMs;
if (tDecodeI64(pDecoder, &pReq->ctimeMs) < 0) return -1;
return 0;
......@@ -7541,8 +7544,11 @@ int32_t tDecodeSBatchDeleteReq(SDecoder *pDecoder, SBatchDeleteReq *pReq) {
int32_t tDecodeSBatchDeleteReqSetCtime(SDecoder *pDecoder, SBatchDeleteReq *pReq, int64_t ctimeMs) {
if (tDecodeSBatchDeleteReqCommon(pDecoder, pReq)) return -1;
*(int64_t *)(pDecoder->data + pDecoder->pos) = ctimeMs;
if (tDecodeI64(pDecoder, &pReq->ctimeMs) < 0) return -1;
pReq->ctimeMs = 0;
if (!tDecodeIsEnd(pDecoder)) {
*(int64_t *)(pDecoder->data + pDecoder->pos) = ctimeMs;
if (tDecodeI64(pDecoder, &pReq->ctimeMs) < 0) return -1;
return 0;
......@@ -969,7 +969,7 @@ void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t t, int32_t precision)
fractionLen = 0;
if (taosLocalTime(&quot, &ptm, buf) == NULL) {
......@@ -25,14 +25,15 @@ extern "C" {
enum {
// MQ_CONSUMER_STATUS__MODIFY_IN_REB, // this value is not used anymore
// MQ_CONSUMER_STATUS__LOST_IN_REB, // this value is not used anymore
int32_t mndInitConsumer(SMnode *pMnode);
void mndCleanupConsumer(SMnode *pMnode);
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId);
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId);
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer);
......@@ -137,12 +137,12 @@ typedef enum {
} EDndReason;
typedef enum {
CONSUMER_UPDATE__TOUCH = 1, // rebalance req do not need change consume topic
CONSUMER_UPDATE__REBALANCE, // subscribe req need change consume topic
CONSUMER_UPDATE_REB_MODIFY_NOTOPIC = 1, // topic do not need modified after rebalance
CONSUMER_UPDATE_REB_MODIFY_TOPIC, // topic need modified after rebalance
CONSUMER_UPDATE_REB_MODIFY_REMOVE, // topic need removed after rebalance
CONSUMER_UPDATE_SUB_MODIFY, // modify after subscribe req
} ECsmUpdateType;
typedef struct {
......@@ -549,7 +549,7 @@ typedef struct {
// data for display
int32_t pid;
SEpSet ep;
int64_t upTime;
int64_t createTime;
int64_t subscribeTime;
int64_t rebalanceTime;
......@@ -560,7 +560,7 @@ typedef struct {
} SMqConsumerObj;
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer);
void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool delete);
int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer);
void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver);
......@@ -25,6 +25,7 @@ extern "C" {
int32_t mndInitSubscribe(SMnode *pMnode);
void mndCleanupSubscribe(SMnode *pMnode);
int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName);
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *CGroup, const char *topicName);
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key);
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
......@@ -223,7 +223,7 @@ void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
return (void *)buf;
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]) {
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char* cgroup) {
SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
if (pConsumer == NULL) {
......@@ -254,16 +254,20 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_L
return NULL;
pConsumer->upTime = taosGetTimestampMs();
pConsumer->createTime = taosGetTimestampMs();
return pConsumer;
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer, bool delete) {
if(pConsumer == NULL) return;
taosArrayDestroyP(pConsumer->currentTopics, (FDelete)taosMemoryFree);
taosArrayDestroyP(pConsumer->rebNewTopics, (FDelete)taosMemoryFree);
taosArrayDestroyP(pConsumer->rebRemovedTopics, (FDelete)taosMemoryFree);
taosArrayDestroyP(pConsumer->assignedTopics, (FDelete)taosMemoryFree);
int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
......@@ -278,7 +282,7 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
tlen += taosEncodeFixedI32(buf, pConsumer->pid);
tlen += taosEncodeSEpSet(buf, &pConsumer->ep);
tlen += taosEncodeFixedI64(buf, pConsumer->upTime);
tlen += taosEncodeFixedI64(buf, pConsumer->createTime);
tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime);
tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime);
......@@ -348,7 +352,7 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
buf = taosDecodeFixedI32(buf, &pConsumer->pid);
buf = taosDecodeSEpSet(buf, &pConsumer->ep);
buf = taosDecodeFixedI64(buf, &pConsumer->upTime);
buf = taosDecodeFixedI64(buf, &pConsumer->createTime);
buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime);
buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime);
......@@ -233,7 +233,6 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
code = -1;
taosIp2String(pReq->info.conn.clientIp, ip);
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONNECT) != 0) {
mGError("user:%s, failed to login from %s since %s", pReq->info.conn.user, ip, terrstr());
......@@ -271,6 +270,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
pConn = mndCreateConn(pMnode, pReq->info.conn.user, connReq.connType, pReq->info.conn.clientIp,
pReq->info.conn.clientPort, connReq.pid, connReq.app, connReq.startTime);
if (pConn == NULL) {
......@@ -842,7 +842,7 @@ static int32_t packQueriesIntoBlock(SShowObj* pShow, SConnObj* pConn, SSDataBloc
varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, curRowIndex, subStatus, false);
colDataSetVal(pColInfo, curRowIndex, subStatus, (varDataLen(subStatus) == 0) ? true : false);
STR_TO_VARSTR(sql, pQuery->sql);
......@@ -1303,9 +1303,29 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
// status
char status[20 + VARSTR_HEADER_SIZE] = {0};
char status2[20] = {0};
strcpy(status, "normal");
STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status));
int8_t taskStatus = atomic_load_8(&pTask->status.taskStatus);
if (taskStatus == TASK_STATUS__NORMAL) {
memcpy(varDataVal(status), "normal", 6);
varDataSetLen(status, 6);
} else if (taskStatus == TASK_STATUS__DROPPING) {
memcpy(varDataVal(status), "dropping", 8);
varDataSetLen(status, 8);
} else if (taskStatus == TASK_STATUS__FAIL) {
memcpy(varDataVal(status), "fail", 4);
varDataSetLen(status, 4);
} else if (taskStatus == TASK_STATUS__STOP) {
memcpy(varDataVal(status), "stop", 4);
varDataSetLen(status, 4);
} else if (taskStatus == TASK_STATUS__SCAN_HISTORY) {
memcpy(varDataVal(status), "history", 7);
varDataSetLen(status, 7);
} else if (taskStatus == TASK_STATUS__HALT) {
memcpy(varDataVal(status), "halt", 4);
varDataSetLen(status, 4);
} else if (taskStatus == TASK_STATUS__PAUSE) {
memcpy(varDataVal(status), "pause", 5);
varDataSetLen(status, 5);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&status, false);
......@@ -1358,6 +1378,11 @@ int32_t mndPauseAllStreamTaskImpl(STrans *pTrans, SArray* tasks) {
if (pTask->info.taskLevel != TASK_LEVEL__SINK && mndPauseStreamTask(pTrans, pTask) < 0) {
return -1;
if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) {
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
return 0;
......@@ -1412,6 +1437,10 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
if (pStream->status == STREAM_STATUS__PAUSE) {
return 0;
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
sdbRelease(pMnode->pSdb, pStream);
return -1;
......@@ -1492,6 +1521,10 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn
if (pTask->info.taskLevel != TASK_LEVEL__SINK && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
return -1;
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) {
atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);
// pStream->pHTasksList is null
......@@ -1521,6 +1554,10 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
if (pStream->status != STREAM_STATUS__PAUSE) {
return 0;
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
sdbRelease(pMnode->pSdb, pStream);
return -1;
......@@ -569,6 +569,11 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
SMqTopicObj *pTopic = NULL;
SDbObj *pDb = NULL;
SCMCreateTopicReq createTopicReq = {0};
if (sdbGetSize(pMnode->pSdb, SDB_TOPIC) >= tmqMaxTopicNum){
mError("topic num out of range");
return code;
if (tDeserializeSCMCreateTopicReq(pReq->pCont, pReq->contLen, &createTopicReq) != 0) {
......@@ -681,7 +686,11 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) continue;
if (pConsumer->status == MQ_CONSUMER_STATUS_LOST){
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
mndReleaseConsumer(pMnode, pConsumer);
int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
for (int32_t i = 0; i < sz; i++) {
......@@ -1980,6 +1980,11 @@ static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME) {
int metaUpdateChangeTime(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs) {
if (!tsTtlChangeOnWrite) return 0;
if (changeTimeMs <= 0) {
metaWarn("Skip to change ttl deletetion time on write, uid: %" PRId64, uid);
STtlUpdCtimeCtx ctx = {.uid = uid, .changeTimeMs = changeTimeMs};
return ttlMgrUpdateChangeTime(pMeta->pTtlMgr, &ctx);
......@@ -358,7 +358,8 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
STtlCacheEntry *cacheEntry = taosHashGet(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
if (cacheEntry == NULL) {
metaError("ttlMgr flush failed to get ttl cache since %s", tstrerror(terrno));
metaError("ttlMgr flush failed to get ttl cache since %s, uid: %" PRId64 ", type: %d", tstrerror(terrno), *pUid,
goto _out;
......@@ -826,7 +826,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
return -1;
SReadHandle handle = {.vnode = pTq->pVnode, .initTqReader = 1, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory};
SReadHandle handle = {.vnode = pTq->pVnode,
.initTqReader = 1,
.pStateBackend = pTask->pState,
.fillHistory = pTask->info.fillHistory,
.winRange = pTask->dataRange.window};
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
......@@ -849,7 +853,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamEpInfoList);
SReadHandle handle = {.vnode = NULL, .numOfVgroups = numOfVgroups, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory};
SReadHandle handle = {.vnode = NULL,
.numOfVgroups = numOfVgroups,
.pStateBackend = pTask->pState,
.fillHistory = pTask->info.fillHistory,
.winRange = pTask->dataRange.window};
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
......@@ -335,6 +335,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
tagArray = taosArrayInit(1, sizeof(STagVal));
if (!tagArray) {
goto _end;
STagVal tagVal = {
......@@ -350,6 +351,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
tagArray = taosArrayDestroy(tagArray);
if (pTag == NULL) {
goto _end;
......@@ -285,6 +285,8 @@ typedef struct SStreamAggSupporter {
int16_t stateKeyType;
SDiskbasedBuf* pResultBuf;
SStateStore stateStore;
STimeWindow winRange;
SStorageAPI* pSessionAPI;
} SStreamAggSupporter;
typedef struct SWindowSupporter {
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册