From 8cf3b3b2586da9903a206a7bc683235279dc84e5 Mon Sep 17 00:00:00 2001 From: Adam Ji Date: Tue, 27 Jun 2023 11:09:11 +0800 Subject: [PATCH] docs: rust connector docs improvement --- docs/en/14-reference/03-connector/06-rust.mdx | 321 ++++++++++++------ docs/zh/08-connector/26-rust.mdx | 321 ++++++++++++------ 2 files changed, 424 insertions(+), 218 deletions(-) diff --git a/docs/en/14-reference/03-connector/06-rust.mdx b/docs/en/14-reference/03-connector/06-rust.mdx index 344bd3590e..402eba844c 100644 --- a/docs/en/14-reference/03-connector/06-rust.mdx +++ b/docs/en/14-reference/03-connector/06-rust.mdx @@ -31,21 +31,57 @@ Websocket connections are supported on all platforms that can run Go. | connector-rust version | TDengine version | major features | | :----------------: | :--------------: | :--------------------------------------------------: | -| v0.8.10 | 3.0.5.0 or later | TMQ: Get consuming progress and seek offset to consume. | +| v0.8.12 | 3.0.5.0 or later | TMQ: Get consuming progress and seek offset to consume. | | v0.8.0 | 3.0.4.0 | Support schemaless insert. | | v0.7.6 | 3.0.3.0 | Support req_id in query. | | v0.6.0 | 3.0.0.0 | Base features. | The Rust Connector is still under rapid development and is not guaranteed to be backward compatible before 1.0. We recommend using TDengine version 3.0 or higher to avoid known issues. -## Installation +## Handling exceptions + +After the error is reported, the specific information of the error can be obtained: + +```rust +match conn.exec(sql) { + Ok(_) => { + Ok(()) + } + Err(e) => { + eprintln!("ERROR: {:?}", e); + Err(e) + } +} +``` + +## TDengine DataType vs. Rust DataType + +TDengine currently supports timestamp, number, character, Boolean type, and the corresponding type conversion with Rust is as follows: + +| TDengine DataType | Rust DataType | +| ----------------- | ----------------- | +| TIMESTAMP | Timestamp | +| INT | i32 | +| BIGINT | i64 | +| FLOAT | f32 | +| DOUBLE | f64 | +| SMALLINT | i16 | +| TINYINT | i8 | +| BOOL | bool | +| BINARY | Vec | +| NCHAR | String | +| JSON | serde_json::Value | + +Note: Only TAG supports JSON types + +## Installation Steps ### Pre-installation preparation * Install the Rust development toolchain * If using the native connection, please install the TDengine client driver. Please refer to [install client driver](/reference/connector#install-client-driver) -### Add taos dependency +### Install the connectors Depending on the connection method, add the [taos][taos] dependency in your Rust project as follows: @@ -146,7 +182,8 @@ let builder = TaosBuilder::from_dsn("taos://localhost:6030")?; let conn1 = builder.build(); // use websocket protocol. -let conn2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?; +let builder2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?; +let conn2 = builder2.build(); ``` After the connection is established, you can perform operations on your database. @@ -228,41 +265,183 @@ There are two ways to query data: Using built-in types or the [serde](https://se ## Usage examples -### Write data +### Create database and tables + +```rust +use taos::*; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let dsn = "taos://localhost:6030"; + let builder = TaosBuilder::from_dsn(dsn)?; + + let taos = builder.build()?; + + let db = "query"; + + // create database + taos.exec_many([ + format!("DROP DATABASE IF EXISTS `{db}`"), + format!("CREATE DATABASE `{db}`"), + format!("USE `{db}`"), + ]) + .await?; + + // create table + taos.exec_many([ + // create super table + "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \ + TAGS (`groupid` INT, `location` BINARY(16))", + // create child table + "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')", + ]).await?; +} +``` -#### SQL Write +### Insert data -#### STMT Write +### Query data + + + +### execute SQL with req_id + +This req_id can be used to request link tracing. + +```rust +let rs = taos.query_with_req_id("select * from stable where tag1 is null", 1)?; +``` + +### Writing data via parameter binding -#### Schemaless Write +### Schemaless Writing -### Query data +### Schemaless with req_id - +This req_id can be used to request link tracing. -## API Reference +```rust +let sml_data = SmlDataBuilder::default() + .protocol(SchemalessProtocol::Line) + .data(data) + .req_id(100u64) + .build()?; + +client.put(&sml_data)? +``` + +### Data Subscription + +TDengine starts subscriptions through [TMQ](../../../taos-sql/tmq/). + +#### Create a Topic + +```rust +taos.exec_many([ + // create topic for subscription + format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}") +]) +.await?; +``` + +#### Create a Consumer + +You create a TMQ connector by using a DSN. + +```rust +let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?; +``` + +Create a consumer: + +```rust +let mut consumer = tmq.build()?; +``` + +#### Subscribe to consume data + +A single consumer can subscribe to one or more topics. + +```rust +consumer.subscribe(["tmq_meters"]).await?; +``` + +The TMQ is of [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) type. You can use the corresponding API to consume each message in the queue and then use `.commit` to mark them as consumed. + +```rust +{ + let mut stream = consumer.stream(); + + while let Some((offset, message)) = stream.try_next().await? { + // get information from offset + + // the topic + let topic = offset.topic(); + // the vgroup id, like partition id in kafka. + let vgroup_id = offset.vgroup_id(); + println!("* in vgroup id {vgroup_id} of topic {topic}\n"); + + if let Some(data) = message.into_data() { + while let Some(block) = data.fetch_raw_block().await? { + // one block for one table, get table name if needed + let name = block.table_name(); + let records: Vec = block.deserialize().try_collect()?; + println!( + "** table: {}, got {} records: {:#?}\n", + name.unwrap(), + records.len(), + records + ); + } + } + consumer.commit(offset).await?; + } +} +``` + +Get assignments: + +Version requirements connector-rust >= v0.8.8, TDengine >= 3.0.5.0 -### Connector Constructor +```rust +let assignments = consumer.assignments().await.unwrap(); +``` + +#### Assignment subscription Offset + +Seek offset: -You create a connector constructor by using a DSN. +Version requirements connector-rust >= v0.8.8, TDengine >= 3.0.5.0 ```rust -let cfg = TaosBuilder::default().build()?; +consumer.offset_seek(topic, vgroup_id, offset).await; ``` -You use the builder object to create multiple connections. +#### Close subscriptions ```rust -let conn: Taos = cfg.build(); +consumer.unsubscribe().await; ``` -### Connection pooling +The following parameters can be configured for the TMQ DSN. Only `group.id` is mandatory. + +- `group.id`: Within a consumer group, load balancing is implemented by consuming messages on an at-least-once basis. +- `client.id`: Subscriber client ID. +- `auto.offset.reset`: Initial point of subscription. *earliest* subscribes from the beginning, and *latest* subscribes from the newest message. The default is earliest. Note: This parameter is set per consumer group. +- `enable.auto.commit`: Automatically commits. This can be enabled when data consistency is not essential. +- `auto.commit.interval.ms`: Interval for automatic commits. + +#### Full Sample Code + +For more information, see [GitHub sample file](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/rust/nativeexample/examples/subscribe_demo.rs). + +### Use with connection pool In complex applications, we recommend enabling connection pools. [taos] implements connection pools based on [r2d2]. @@ -292,7 +471,17 @@ In the application code, use `pool.get()? ` to get a connection object [Taos]. let taos = pool.get()?; ``` -### Connectors +### More sample programs + +The source code of the sample application is under `TDengine/examples/rust` : + +[rust example](https://github.com/taosdata/TDengine/tree/3.0/examples/rust) + +## Frequently Asked Questions + +For additional troubleshooting, see [FAQ](../../../train-faq/faq). + +## API Reference The [Taos][struct.Taos] object provides an API to perform operations on multiple databases. @@ -380,7 +569,7 @@ Note that Rust asynchronous functions and an asynchronous runtime are required. In addition, this structure is also the entry point for [Parameter Binding](#Parameter Binding Interface) and [Line Protocol Interface](#Line Protocol Interface). Please refer to the specific API descriptions for usage. -### Bind Interface +Bind Interface Similar to the C interface, Rust provides the bind interface's wrapping. First, the [Taos][struct.taos] object creates a parameter binding object [Stmt] for an SQL statement. @@ -391,7 +580,7 @@ stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?; The bind object provides a set of interfaces for implementing parameter binding. -#### `.set_tbname(name)` +`.set_tbname(name)` To bind table names. @@ -400,7 +589,7 @@ let mut stmt = taos.stmt("insert into ? values(? ,?)")?; stmt.set_tbname("d0")?; ``` -#### `.set_tags(&[tag])` +`.set_tags(&[tag])` Bind sub-table table names and tag values when the SQL statement uses a super table. @@ -410,7 +599,7 @@ stmt.set_tbname("d0")?; stmt.set_tags(&[Value::VarChar("taos".to_string())])?; ``` -#### `.bind(&[column])` +`.bind(&[column])` Bind value types. Use the [ColumnView] structure to create and bind the required types. @@ -434,7 +623,7 @@ let params = vec![ let rows = stmt.bind(¶ms)?.add_batch()?.execute()?; ``` -#### `.execute()` +`.execute()` Execute SQL. [Stmt] objects can be reused, re-binded, and executed after execution. Before execution, ensure that all data has been added to the queue with `.add_batch`. @@ -449,92 +638,6 @@ stmt.execute()?; For a working example, see [GitHub](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs). -### Subscriptions - -TDengine starts subscriptions through [TMQ](../../../taos-sql/tmq/). - -You create a TMQ connector by using a DSN. - -```rust -let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?; -``` - -Create a consumer: - -```rust -let mut consumer = tmq.build()?; -``` - -A single consumer can subscribe to one or more topics. - -```rust -consumer.subscribe(["tmq_meters"]).await?; -``` - -The TMQ is of [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) type. You can use the corresponding API to consume each message in the queue and then use `.commit` to mark them as consumed. - -```rust -{ - let mut stream = consumer.stream(); - - while let Some((offset, message)) = stream.try_next().await? { - // get information from offset - - // the topic - let topic = offset.topic(); - // the vgroup id, like partition id in kafka. - let vgroup_id = offset.vgroup_id(); - println!("* in vgroup id {vgroup_id} of topic {topic}\n"); - - if let Some(data) = message.into_data() { - while let Some(block) = data.fetch_raw_block().await? { - // one block for one table, get table name if needed - let name = block.table_name(); - let records: Vec = block.deserialize().try_collect()?; - println!( - "** table: {}, got {} records: {:#?}\n", - name.unwrap(), - records.len(), - records - ); - } - } - consumer.commit(offset).await?; - } -} -``` - -Get assignments: - -Version requirements connector-rust >= v0.8.8, TDengine >= 3.0.5.0 - -```rust -let assignments = consumer.assignments().await.unwrap(); -``` - -Seek offset: - -Version requirements connector-rust >= v0.8.8, TDengine >= 3.0.5.0 - -```rust -consumer.offset_seek(topic, vgroup_id, offset).await; -``` - -Unsubscribe: - -```rust -consumer.unsubscribe().await; -``` - -The following parameters can be configured for the TMQ DSN. Only `group.id` is mandatory. - -- `group.id`: Within a consumer group, load balancing is implemented by consuming messages on an at-least-once basis. -- `client.id`: Subscriber client ID. -- `auto.offset.reset`: Initial point of subscription. *earliest* subscribes from the beginning, and *latest* subscribes from the newest message. The default is earliest. Note: This parameter is set per consumer group. -- `enable.auto.commit`: Automatically commits. This can be enabled when data consistency is not essential. -- `auto.commit.interval.ms`: Interval for automatic commits. - -For more information, see [GitHub sample file](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/rust/nativeexample/examples/subscribe_demo.rs). For information about other structure APIs, see the [Rust documentation](https://docs.rs/taos). diff --git a/docs/zh/08-connector/26-rust.mdx b/docs/zh/08-connector/26-rust.mdx index c23228c8cf..287d70bc44 100644 --- a/docs/zh/08-connector/26-rust.mdx +++ b/docs/zh/08-connector/26-rust.mdx @@ -30,21 +30,57 @@ Websocket 连接支持所有能运行 Rust 的平台。 | Rust 连接器版本 | TDengine 版本 | 主要功能 | | :----------------: | :--------------: | :--------------------------------------------------: | -| v0.8.10 | 3.0.5.0 or later | 消息订阅:获取消费进度及按照指定进度开始消费。 | +| v0.8.12 | 3.0.5.0 or later | 消息订阅:获取消费进度及按照指定进度开始消费。 | | v0.8.0 | 3.0.4.0 | 支持无模式写入。 | | v0.7.6 | 3.0.3.0 | 支持在请求中使用 req_id。 | | v0.6.0 | 3.0.0.0 | 基础功能。 | Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine,以避免已知问题。 -## 安装 +## 处理错误 + +在报错后,可以获取到错误的具体信息: + +```rust +match conn.exec(sql) { + Ok(_) => { + Ok(()) + } + Err(e) => { + eprintln!("ERROR: {:?}", e); + Err(e) + } +} +``` + +## TDengine DataType 和 Rust DataType + +TDengine 目前支持时间戳、数字、字符、布尔类型,与 Rust 对应类型转换如下: + +| TDengine DataType | Rust DataType | +| ----------------- | ----------------- | +| TIMESTAMP | Timestamp | +| INT | i32 | +| BIGINT | i64 | +| FLOAT | f32 | +| DOUBLE | f64 | +| SMALLINT | i16 | +| TINYINT | i8 | +| BOOL | bool | +| BINARY | Vec | +| NCHAR | String | +| JSON | serde_json::Value | + +**注意**:JSON 类型仅在 tag 中支持。 + +## 安装步骤 ### 安装前准备 * 安装 Rust 开发工具链 * 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动) -### 添加 taos 依赖 +### 安装连接器 根据选择的连接方式,按照如下说明在 [Rust](https://rust-lang.org) 项目中添加 [taos][taos] 依赖: @@ -151,7 +187,8 @@ let builder = TaosBuilder::from_dsn("taos://localhost:6030")?; let conn1 = builder.build(); // use websocket protocol. -let conn2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?; +let builder2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?; +let conn2 = builder2.build(); ``` 建立连接后,您可以进行相关数据库操作: @@ -233,41 +270,183 @@ async fn demo(taos: &Taos, db: &str) -> Result<(), Error> { ## 使用示例 -### 写入数据 +### 创建数据库和表 + +```rust +use taos::*; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let dsn = "taos://localhost:6030"; + let builder = TaosBuilder::from_dsn(dsn)?; + + let taos = builder.build()?; + + let db = "query"; + + // create database + taos.exec_many([ + format!("DROP DATABASE IF EXISTS `{db}`"), + format!("CREATE DATABASE `{db}`"), + format!("USE `{db}`"), + ]) + .await?; + + // create table + taos.exec_many([ + // create super table + "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \ + TAGS (`groupid` INT, `location` BINARY(16))", + // create child table + "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')", + ]).await?; +} +``` -#### SQL 写入 +### 插入数据 -#### STMT 写入 +### 查询数据 + + + +### 执行带有 req_id 的 SQL + +此 req_id 可用于请求链路追踪。 + +```rust +let rs = taos.query_with_req_id("select * from stable where tag1 is null", 1)?; +``` + +### 通过参数绑定写入数据 -#### Schemaless 写入 +### 无模式写入 -### 查询数据 +### 执行带有 req_id 的无模式写入 - +此 req_id 可用于请求链路追踪。 -## API 参考 +```rust +let sml_data = SmlDataBuilder::default() + .protocol(SchemalessProtocol::Line) + .data(data) + .req_id(100u64) + .build()?; + +client.put(&sml_data)? +``` + +### 数据订阅 -### 连接构造器 +TDengine 通过消息队列 [TMQ](../../../taos-sql/tmq/) 启动一个订阅。 -通过 DSN 来构建一个连接器构造器。 +#### 创建 Topic ```rust -let cfg = TaosBuilder::default().build()?; +taos.exec_many([ + // create topic for subscription + format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}") +]) +.await?; ``` -使用 `builder` 对象创建多个连接: +#### 创建 Consumer + +从 DSN 开始,构建一个 TMQ 连接器。 ```rust -let conn: Taos = cfg.build(); +let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?; ``` -### 连接池 +创建消费者: + +```rust +let mut consumer = tmq.build()?; +``` + +#### 订阅消费数据 + +消费者可订阅一个或多个 `TOPIC`。 + +```rust +consumer.subscribe(["tmq_meters"]).await?; +``` + +TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) 类型,可以使用相应 API 对每个消息进行消费,并通过 `.commit` 进行已消费标记。 + +```rust +{ + let mut stream = consumer.stream(); + + while let Some((offset, message)) = stream.try_next().await? { + // get information from offset + + // the topic + let topic = offset.topic(); + // the vgroup id, like partition id in kafka. + let vgroup_id = offset.vgroup_id(); + println!("* in vgroup id {vgroup_id} of topic {topic}\n"); + + if let Some(data) = message.into_data() { + while let Some(block) = data.fetch_raw_block().await? { + // one block for one table, get table name if needed + let name = block.table_name(); + let records: Vec = block.deserialize().try_collect()?; + println!( + "** table: {}, got {} records: {:#?}\n", + name.unwrap(), + records.len(), + records + ); + } + } + consumer.commit(offset).await?; + } +} +``` + +获取消费进度: + +版本要求 connector-rust >= v0.8.8, TDengine >= 3.0.5.0 + +```rust +let assignments = consumer.assignments().await.unwrap(); +``` + +#### 指定订阅 Offset + +按照指定的进度消费: + +版本要求 connector-rust >= v0.8.8, TDengine >= 3.0.5.0 + +```rust +consumer.offset_seek(topic, vgroup_id, offset).await; +``` + +#### 关闭订阅 + +```rust +consumer.unsubscribe().await; +``` + +对于 TMQ DSN, 有以下配置项可以进行设置,需要注意的是,`group.id` 是必须的。 + +- `group.id`: 同一个消费者组,将以至少消费一次的方式进行消息负载均衡。 +- `client.id`: 可选的订阅客户端识别项。 +- `auto.offset.reset`: 可选初始化订阅起点, *earliest* 为从头开始订阅, *latest* 为仅从最新数据开始订阅,默认为从头订阅。注意,此选项在同一个 `group.id` 中仅生效一次。 +- `enable.auto.commit`: 当设置为 `true` 时,将启用自动标记模式,当对数据一致性不敏感时,可以启用此方式。 +- `auto.commit.interval.ms`: 自动标记的时间间隔。 + +#### 完整示例 + +完整订阅示例参见 [GitHub 示例文件](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/rust/nativeexample/examples/subscribe_demo.rs). + +### 与连接池使用 在复杂应用中,建议启用连接池。[taos] 的连接池默认(异步模式)使用 [deadpool] 实现。 @@ -295,7 +474,17 @@ let pool: Pool = Pool::builder(Manager::from_dsn(self.dsn.clone()). let taos = pool.get()?; ``` -### 连接 +### 更多示例程序 + +示例程序源码位于 `TDengine/examples/rust` 下: + +请参考:[rust example](https://github.com/taosdata/TDengine/tree/3.0/examples/rust) + +## 常见问题 + +请参考 [FAQ](../../../train-faq/faq) + +## API 参考 [Taos][struct.Taos] 对象提供了多个数据库操作的 API: @@ -383,7 +572,7 @@ let taos = pool.get()?; 除此之外,该结构也是 [参数绑定](#参数绑定接口) 和 [行协议接口](#行协议接口) 的入口,使用方法请参考具体的 API 说明。 -### 参数绑定接口 +参数绑定接口 与 C 接口类似,Rust 提供参数绑定接口。首先,通过 [Taos][struct.Taos] 对象创建一个 SQL 语句的参数绑定对象 [Stmt]: @@ -394,7 +583,7 @@ stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?; 参数绑定对象提供了一组接口用于实现参数绑定: -#### `.set_tbname(name)` +`.set_tbname(name)` 用于绑定表名。 @@ -403,7 +592,7 @@ let mut stmt = taos.stmt("insert into ? values(? ,?)")?; stmt.set_tbname("d0")?; ``` -#### `.set_tags(&[tag])` +`.set_tags(&[tag])` 当 SQL 语句使用超级表时,用于绑定子表表名和标签值: @@ -413,7 +602,7 @@ stmt.set_tbname("d0")?; stmt.set_tags(&[Value::VarChar("涛思".to_string())])?; ``` -#### `.bind(&[column])` +`.bind(&[column])` 用于绑定值类型。使用 [ColumnView] 结构体构建需要的类型并绑定: @@ -437,7 +626,7 @@ let params = vec![ let rows = stmt.bind(¶ms)?.add_batch()?.execute()?; ``` -#### `.execute()` +`.execute()` 执行 SQL。[Stmt] 对象可以复用,在执行后可以重新绑定并执行。执行前请确保所有数据已通过 `.add_batch` 加入到执行队列中。 @@ -452,92 +641,6 @@ stmt.execute()?; 一个可运行的示例请见 [GitHub 上的示例](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs)。 -### 订阅 - -TDengine 通过消息队列 [TMQ](../../../taos-sql/tmq/) 启动一个订阅。 - -从 DSN 开始,构建一个 TMQ 连接器。 - -```rust -let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?; -``` - -创建消费者: - -```rust -let mut consumer = tmq.build()?; -``` - -消费者可订阅一个或多个 `TOPIC`。 - -```rust -consumer.subscribe(["tmq_meters"]).await?; -``` - -TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) 类型,可以使用相应 API 对每个消息进行消费,并通过 `.commit` 进行已消费标记。 - -```rust -{ - let mut stream = consumer.stream(); - - while let Some((offset, message)) = stream.try_next().await? { - // get information from offset - - // the topic - let topic = offset.topic(); - // the vgroup id, like partition id in kafka. - let vgroup_id = offset.vgroup_id(); - println!("* in vgroup id {vgroup_id} of topic {topic}\n"); - - if let Some(data) = message.into_data() { - while let Some(block) = data.fetch_raw_block().await? { - // one block for one table, get table name if needed - let name = block.table_name(); - let records: Vec = block.deserialize().try_collect()?; - println!( - "** table: {}, got {} records: {:#?}\n", - name.unwrap(), - records.len(), - records - ); - } - } - consumer.commit(offset).await?; - } -} -``` - -获取消费进度: - -版本要求 connector-rust >= v0.8.8, TDengine >= 3.0.5.0 - -```rust -let assignments = consumer.assignments().await.unwrap(); -``` - -按照指定的进度消费: - -版本要求 connector-rust >= v0.8.8, TDengine >= 3.0.5.0 - -```rust -consumer.offset_seek(topic, vgroup_id, offset).await; -``` - -停止订阅: - -```rust -consumer.unsubscribe().await; -``` - -对于 TMQ DSN, 有以下配置项可以进行设置,需要注意的是,`group.id` 是必须的。 - -- `group.id`: 同一个消费者组,将以至少消费一次的方式进行消息负载均衡。 -- `client.id`: 可选的订阅客户端识别项。 -- `auto.offset.reset`: 可选初始化订阅起点, *earliest* 为从头开始订阅, *latest* 为仅从最新数据开始订阅,默认为从头订阅。注意,此选项在同一个 `group.id` 中仅生效一次。 -- `enable.auto.commit`: 当设置为 `true` 时,将启用自动标记模式,当对数据一致性不敏感时,可以启用此方式。 -- `auto.commit.interval.ms`: 自动标记的时间间隔。 - -完整订阅示例参见 [GitHub 示例文件](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/rust/nativeexample/examples/subscribe_demo.rs). 其他相关结构体 API 使用说明请移步 Rust 文档托管网页:。 -- GitLab