The SQL statement below is used to insert one row into table "d1001":

```sql
INSERT INTO d1001 VALUES (ts1, 10.3, 219, 0.31);
```
`ts1` is a Unix timestamp. Only timestamps later than the current time minus the KEEP configuration are accepted. For further details, refer to the [TDengine SQL insert timestamp section](/taos-sql/insert).
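For instance, a minimal sketch with an explicit timestamp literal (assuming `d1001` keeps the four-column schema implied above and the timestamp falls within the KEEP window):

```sql
INSERT INTO d1001 VALUES ('2024-05-01 10:00:00.000', 10.3, 219, 0.31);
```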
### Insert Multiple Rows
...
...
Multiple rows can be inserted in a single SQL statement.
`ts1` and `ts2` are Unix timestamps. Only timestamps later than the current time minus the KEEP configuration are accepted. For further details, refer to the [TDengine SQL insert timestamp section](/taos-sql/insert).
### Insert into Multiple Tables
...
...
Data can be inserted into multiple tables in the same SQL statement.
`ts1`, `ts2` and `ts3` are Unix timestamps. Only timestamps later than the current time minus the KEEP configuration are accepted. For further details, refer to the [TDengine SQL insert timestamp section](/taos-sql/insert).
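A combined sketch (assuming subtables `d1001` and `d1002` exist and share the same schema) that writes several rows into several tables in one statement:

```sql
INSERT INTO d1001 VALUES ('2024-05-01 10:00:00.000', 10.3, 219, 0.31)
                         ('2024-05-01 10:00:05.000', 12.6, 218, 0.33)
            d1002 VALUES ('2024-05-01 10:00:00.000', 10.2, 220, 0.23);
```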
For more details about `INSERT` please refer to [INSERT](/taos-sql/insert).
The following SQL statement creates a topic in TDengine:

```sql
CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
```
- There is an upper limit to the number of topics that can be created, controlled by the parameter `tmqMaxTopicNum`, with a default of 20.
Multiple subscription types are supported.
#### Subscribe to a Column
...
...
You can subscribe to a topic through a SELECT statement.
Syntax:
```sql
CREATE TOPIC topic_name [with meta] AS STABLE stb_name [where_condition]
```
Creating a topic in this manner differs from a `SELECT * from stbName` statement as follows:
- The table schema can be modified.
- Unstructured data is returned. The format of the data returned changes based on the supertable schema.
- A different table schema may exist for every data block to be processed.
- The `with meta` parameter is optional. When specified, the statements that create the supertable and its subtables are also returned; this is mainly used by taosX for supertable migration.
- The `where_condition` parameter is optional and filters which subtables are subscribed to. The condition cannot reference ordinary columns, only tags or `tbname`. Functions can be used in the condition to filter tags, but aggregate functions are not allowed, because subtable tag values cannot be aggregated. The condition can also be a constant expression, such as `2 > 1` (subscribe to all subtables) or `false` (subscribe to no subtables); see the example after this list.
- The data returned does not include tags.
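For example, a hedged sketch that subscribes to a filtered set of subtables of a hypothetical supertable `meters` with a `location` tag, returning meta statements as well:

```sql
CREATE TOPIC topic_meters_sf with meta AS STABLE meters WHERE location = 'California.SanFrancisco';
```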
### Subscribe to a Database
...
...
Syntax:
```sql
CREATE TOPIC topic_name [with meta] AS DATABASE db_name;
```
This SQL statement creates a subscription to all tables in the database.
- The `with meta` parameter is optional. When specified, the statements that create all supertables and subtables in the database are also returned; this is mainly used by taosX for database migration.
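For example, using the `tmqdb` database from the topic example above (the `with meta` clause is optional and only needed when schema statements should be included):

```sql
CREATE TOPIC topic_tmqdb with meta AS DATABASE tmqdb;
```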
## Create a Consumer
...
...
You configure the following parameters when creating a consumer:

| Parameter | Type | Description | Remarks |
| :--- | :--- | :--- | :--- |
| `td.connect.user` | string | User Name | |
| `td.connect.pass` | string | Password | |
| `td.connect.port` | string | Port of the server side | |
| `group.id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192. Each topic can create up to 100 consumer groups. |
| `client.id` | string | Client ID | Maximum length: 192. |
| `auto.offset.reset` | enum | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none` (default) |
| `enable.auto.commit` | boolean | Commit automatically; true: user application doesn't need to explicitly commit; false: user application needs to handle commit by itself | Default value is true |
When you create a user-defined function, you must implement standard interface functions:
- For aggregate functions, implement the `aggfn_start`, `aggfn`, and `aggfn_finish` interface functions.
- To initialize your function, implement the `udf_init` function. To terminate your function, implement the `udf_destroy` function.
There are strict naming conventions for these interface functions. The names of the start, finish, init, and destroy interfaces must be `<udf-name>_start`, `<udf-name>_finish`, `<udf-name>_init`, and `<udf-name>_destroy`, respectively. Replace `scalarfn`, `aggfn`, and `udf` with the name of your user-defined function.
### Implementing a Scalar Function in C
The implementation of a scalar function is described as follows:
...
...
The implementation of a scalar UDF is described as follows:
Description: this function processes a datablock, which is the input; you can use datablock.data(row, col) to access the Python object at location (row, col); the output is a tuple object consisting of objects of type outputtype.
Note: an aggregate UDF requires init(), destroy(), start(), reduce() and finish() to be implemented. start() generates the initial result in buffer; then the input data is divided into multiple row data blocks, and reduce() is invoked for each data block `inputs` and intermediate `buf`; finally, finish() is invoked to generate the final result from the intermediate result `buf`.
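Once a Python file implements these interfaces, it can be registered in TDengine. A hedged sketch, assuming an aggregate UDF saved as /root/udf/myspread.py (as in the later sample) that outputs a DOUBLE and needs a small intermediate buffer:

```sql
CREATE AGGREGATE FUNCTION myspread AS '/root/udf/myspread.py' OUTPUTTYPE DOUBLE BUFSIZE 128 LANGUAGE 'Python';
```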
### Data Mapping between TDengine SQL and Python UDF
...
...
Note: Prior to TDengine 3.0.5.0 (excluding), updating a UDF requires restarting the `taosd` service.
#### Sample 3: UDF with n arguments
A UDF which accepts n integers, like (x1, x2, ..., xn), and outputs the sum of the product of each value and its sequence number: 1 * x1 + 2 * x2 + ... + n * xn. If there is `null` in the input, then the result is `null`. The difference from sample 1 is that it can accept any number of columns as input and process each column. Assume the program is written in /root/udf/nsum.py:
```python
def init():
...
...
#### Sample 4: Utilize 3rd party package
A UDF which accepts a timestamp and outputs the next closest Sunday. This sample requires the third-party package `moment`; you need to install it first.
```shell
pip3 install moment
...
...
#### Sample 5: Aggregate Function
An aggregate function which calculates the difference between the maximum and the minimum in a column. An aggregate function takes multiple rows as input and outputs only one piece of data. The execution process of an aggregate UDF is like map-reduce: the framework divides the input into multiple parts, each mapper processes one block, and the reducer aggregates the results of the mappers. The reduce() of a Python UDF has the functionality of both map() and reduce(). The reduce() takes two arguments: the data to be processed and the result of other tasks executing reduce(). For example, assume the code is in `/root/udf/myspread.py`.
```python
import io
...
...
In this example, we implemented an aggregate function, and added some logging.
2. log() is the function for logging; it converts the input object to a string and outputs it with a newline.
3. destroy() closes the log file.
4. start() returns the initial buffer for storing the intermediate result.
5. reduce() processes each data block and aggregates the result.
6. finish() converts the final buffer to the final result.
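Assuming the aggregate function has been registered under the name `myspread` and a table `t` has a numeric column `v1`, it can then be called like a built-in aggregate:

```sql
SELECT myspread(v1) FROM t;
```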
If you input a specific column, the number of non-null values in the column is returned.

### ELAPSED

```sql
ELAPSED(ts_primary_key[,time_unit])
```
**Description**: The `elapsed` function can be used to calculate the continuous time length in which there is valid data. If it's used with an `INTERVAL` clause, the returned result is the calculated time length within each time window. If it's used without an `INTERVAL` clause, the returned result is the calculated time length within the specified time range. Please note that the return value of `elapsed` is the number of `time_unit` units in the calculated time length.
**Return value type**: Double if the input value is not NULL;
...
...
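A usage sketch for `elapsed` (assuming table `d1001` from the insert examples, with `ts` as its primary timestamp key; the `1m` time unit expresses the result in minutes):

```sql
SELECT ELAPSED(ts, 1m) FROM d1001 WHERE ts >= '2024-05-01 00:00:00' AND ts < '2024-05-02 00:00:00';
```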
### SAMPLE

```sql
SAMPLE(expr, k)
```
**Description**: _k_ sampling values of a specific column. The applicable range of _k_ is [1,1000].
**Return value type**: Same as the column being operated upon

**Applicable data types**: Any data type
**Applicable nested query**: Inner query and Outer query
**Applicable table types**: standard tables and supertables
**More explanations**:
- This function cannot be used in expression calculation.
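A usage sketch (assuming a `meters` supertable with a `current` column; three rows are sampled):

```sql
SELECT SAMPLE(current, 3) FROM meters;
```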
### TAIL
...
...
### UNIQUE

```sql
UNIQUE(expr)
```
**Description**: The values that occur for the first time in the specified column. The effect is similar to the `distinct` keyword.

**Return value type**: Same as the data type of the column being operated upon
**Applicable column types**: Any data types except for timestamp
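A usage sketch (assuming table `d1001` with a `voltage` column; each distinct voltage is returned once, based on its first occurrence):

```sql
SELECT UNIQUE(voltage) FROM d1001;
```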
part_list can be any scalar expression, such as a column, constant, scalar function, or a combination of the above.
A PARTITION BY clause is processed as follows:
- The PARTITION BY clause must occur after the WHERE clause
- The PARTITION BY clause partitions the data according to the specified dimensions, then performs computation on each partition. The performed computation is determined by the rest of the statement - a window clause, GROUP BY clause, or SELECT clause.
- The PARTITION BY clause can be used together with a window clause or GROUP BY clause. In this case, the window or GROUP BY clause takes effect on every partition. For example, the following statement partitions the table by the location tag, performs downsampling over a 10 minute window, and returns the maximum value:
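A sketch consistent with that description (assuming the `meters` supertable has a `location` tag and a `current` column):

```sql
SELECT _wstart, location, MAX(current)
FROM meters
PARTITION BY location
INTERVAL(10m);
```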
The configuration parameters in the URL are as follows:
- batchfetch: true: pulls result sets in batches when executing queries; false: pulls result sets row by row. The default value is false. Row-by-row fetching uses HTTP for data transfer; with batch fetching, taos-jdbcdriver and TDengine transfer data over a WebSocket connection, which, compared with HTTP, lets the JDBC REST connection support large query result volumes and improves query performance.
- charset: specifies the charset used to parse strings; this parameter is valid only when batchfetch is set to true.
- batchErrorIgnore: true: when executeBatch of Statement is executed and one SQL statement fails in the middle, the remaining statements continue to be executed; false: no statements are executed after the failed SQL. The default value is false.
- httpConnectTimeout: REST connection timeout in milliseconds, the default value is 60000 ms.
- httpSocketTimeout: socket timeout in milliseconds, the default value is 60000 ms. It only takes effect when batchfetch is false.
- messageWaitTimeout: message transmission timeout in milliseconds, the default value is 60000 ms. It only takes effect when batchfetch is true.
- useSSL: connect securely using SSL. true: use SSL connection; false: do not use SSL connection.
- httpPoolSize: size of REST concurrent requests. The default value is 20.
...
...
The configuration parameters in properties are as follows.
- TSDBDriver.PROPERTY_KEY_CHARSET: the character set used by the client; the default value is the system character set.
- TSDBDriver.PROPERTY_KEY_LOCALE: this only takes effect when using JDBC native connection. The client language environment; the default value is the system's current locale.
- TSDBDriver.PROPERTY_KEY_TIME_ZONE: only takes effect when using JDBC native connection. The time zone used by the client; the default value is the system's current time zone.
- TSDBDriver.HTTP_CONNECT_TIMEOUT: REST connection timeout in milliseconds, the default value is 60000 ms. It only takes effect when using JDBC REST connection.
- TSDBDriver.HTTP_SOCKET_TIMEOUT: socket timeout in milliseconds, the default value is 60000 ms. It only takes effect when using JDBC REST connection and batchfetch is false.
- TSDBDriver.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: message transmission timeout in milliseconds, the default value is 60000 ms. It only takes effect when using JDBC REST connection and batchfetch is true.
- TSDBDriver.PROPERTY_KEY_USE_SSL: connect securely using SSL. true: use SSL connection; false: do not use SSL connection. It only takes effect when using JDBC REST connection.
- TSDBDriver.HTTP_POOL_SIZE: size of REST concurrent requests. The default value is 20.
For JDBC native connections, you can specify other parameters, such as log level, SQL length, etc., by specifying URL and Properties. For more detailed configuration, please refer to [Client Configuration](/reference/config/#Client-Only).
| connector-rust version | TDengine version | major features |
| :--------------------- | :--------------- | :------------------------------------------------------ |
| v0.8.12 | 3.0.5.0 or later | TMQ: Get consuming progress and seek offset to consume. |
| v0.8.0 | 3.0.4.0 | Support schemaless insert. |
| v0.7.6 | 3.0.3.0 | Support req_id in query. |
| v0.6.0 | 3.0.0.0 | Base features. |
The Rust Connector is still under rapid development and is not guaranteed to be backward compatible before 1.0. We recommend using TDengine version 3.0 or higher to avoid known issues.
## Handling exceptions
After an error is reported, the specific error information can be obtained:
```rust
match conn.exec(sql) {
Ok(_) => {
Ok(())
}
Err(e) => {
eprintln!("ERROR: {:?}", e);
Err(e)
}
}
```
## TDengine DataType vs. Rust DataType
TDengine currently supports timestamp, numeric, character, and Boolean types; the corresponding type conversions with Rust are as follows:
| TDengine DataType | Rust DataType |
| ----------------- | ----------------- |
| TIMESTAMP | Timestamp |
| INT | i32 |
| BIGINT | i64 |
| FLOAT | f32 |
| DOUBLE | f64 |
| SMALLINT | i16 |
| TINYINT | i8 |
| BOOL | bool |
| BINARY | Vec<u8\> |
| NCHAR | String |
| JSON | serde_json::Value |
Note: Only tags support the JSON type.
## Installation Steps
### Pre-installation preparation
* Install the Rust development toolchain
* If using the native connection, please install the TDengine client driver. Please refer to [install client driver](/reference/connector#install-client-driver)
### Install the connectors
Depending on the connection method, add the [taos][taos] dependency in your Rust project as follows:
...
...
```rust
let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
let conn1 = builder.build();
// use websocket protocol.
let builder2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
let conn2 = builder2.build();
```
After the connection is established, you can perform operations on your database.
...
...
There are two ways to query data: using built-in types or the serde deserialization framework.
> The query is consistent with operating a relational database. When using subscripts to get the contents of the returned fields, you have to start from 1. However, we recommend using the field names to get the values of the fields in the result set.
### Insert data
<RustInsert />
### Query data
<RustQuery />
### execute SQL with req_id
This req_id can be used to request link tracing.
```rust
let rs = taos.query_with_req_id("select * from stable where tag1 is null", 1)?;
```
### Writing data via parameter binding
TDengine has significantly improved the bind APIs to support data writing (INSERT) scenarios. Writing data in this way avoids the resource consumption of SQL syntax parsing, resulting in significant write performance improvements in many cases.
For details on parameter binding, see the [API Reference](#stmt-api).
<RustBind />
### Schemaless Writing
TDengine supports schemaless writing. It is compatible with InfluxDB's Line Protocol, OpenTSDB's telnet line protocol, and OpenTSDB's JSON format protocol. For more information, see [Schemaless Writing](../../schemaless).
<RustSml />
### Schemaless with req_id
This req_id can be used to request link tracing.
```rust
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Line)
.data(data)
.req_id(100u64)
.build()?;
client.put(&sml_data)?
```
### Data Subscription
TDengine starts subscriptions through [TMQ](../../../taos-sql/tmq/).
#### Create a Topic
```rust
taos.exec_many([
// create topic for subscription
format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}")
])
.await?;
```
#### Create a Consumer
You create a TMQ connector by using a DSN.
```rust
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
```
Create a consumer:
```rust
let mut consumer = tmq.build()?;
```
#### Subscribe to consume data
A single consumer can subscribe to one or more topics.
```rust
consumer.subscribe(["tmq_meters"]).await?;
```
The TMQ is of [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) type. You can use the corresponding API to consume each message in the queue and then use `.commit` to mark them as consumed.
```rust
{
let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? {
// get information from offset
// the topic
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
println!(
"** table: {}, got {} records: {:#?}\n",
name.unwrap(),
records.len(),
records
);
}
}
consumer.commit(offset).await?;
}
}
```
#### Assignment subscription Offset

Get assignments:
Version requirements connector-rust >= v0.8.8, TDengine >= 3.0.5.0
```rust
let assignments = consumer.assignments().await.unwrap();
```
Seek offset:

Version requirements connector-rust >= v0.8.8, TDengine >= 3.0.5.0
#### Close subscriptions
```rust
consumer.unsubscribe().await;
```
The following parameters can be configured for the TMQ DSN. Only `group.id` is mandatory.
- `group.id`: Within a consumer group, load balancing is implemented by consuming messages on an at-least-once basis.
- `client.id`: Subscriber client ID.
- `auto.offset.reset`: Initial point of subscription. *earliest* subscribes from the beginning, and *latest* subscribes from the newest message. The default is earliest. Note: This parameter is set per consumer group.
- `enable.auto.commit`: Automatically commits. This can be enabled when data consistency is not essential.
- `auto.commit.interval.ms`: Interval for automatic commits.
#### Full Sample Code
For more information, see [GitHub sample file](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/rust/nativeexample/examples/subscribe_demo.rs).
### Use with connection pool
In complex applications, we recommend enabling connection pools. [taos] implements connection pools based on [r2d2].
...
...
In the application code, use `pool.get()?` to get a connection object [Taos].

```rust
let taos = pool.get()?;
```
### More sample programs
The source code of the sample application is under `TDengine/examples/rust`:
For additional troubleshooting, see [FAQ](../../../train-faq/faq).
## API Reference
The [Taos][struct.Taos] object provides an API to perform operations on multiple databases.
...
...
Note that Rust asynchronous functions and an asynchronous runtime are required.
- `.create_database(database: &str)`: Executes the `CREATE DATABASE` statement.
- `.use_database(database: &str)`: Executes the `USE` statement.
In addition, this structure is also the entry point for Parameter Binding and Line Protocol Interface. Please refer to the specific API descriptions for usage.
### Bind Interface
<p>
<a id="stmt-api" style={{color:'#141414'}}>
Bind Interface
</a>
</p>
Similar to the C interface, Rust provides the bind interface's wrapping. First, the [Taos][struct.taos] object creates a parameter binding object [Stmt] for an SQL statement.
...
...
```rust
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
```
The bind object provides a set of interfaces for implementing parameter binding.
#### `.set_tbname(name)`
To bind table names.
...
...
```rust
let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
stmt.set_tbname("d0")?;
```
#### `.set_tags(&[tag])`

Bind subtable names and tag values when the SQL statement uses a supertable.
Bind value types. Use the [ColumnView] structure to create and bind the required types.
...
...
```rust
let rows = stmt.bind(&params)?.add_batch()?.execute()?;
```
#### `.execute()`

Execute SQL. [Stmt] objects can be reused, re-bound, and executed again after execution. Before execution, ensure that all data has been added to the queue with `.add_batch`.
...
...
For a working example, see [GitHub](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs).
For information about other structure APIs, see the [Rust documentation](https://docs.rs/taos).