提交 8cf3b3b2 编写于 作者: A Adam Ji

docs: rust connector docs improvement

上级 f9b97f7c
...@@ -31,21 +31,57 @@ Websocket connections are supported on all platforms that can run Go. ...@@ -31,21 +31,57 @@ Websocket connections are supported on all platforms that can run Go.
| connector-rust version | TDengine version | major features | | connector-rust version | TDengine version | major features |
| :----------------: | :--------------: | :--------------------------------------------------: | | :----------------: | :--------------: | :--------------------------------------------------: |
| v0.8.10 | 3.0.5.0 or later | TMQ: Get consuming progress and seek offset to consume. | | v0.8.12 | 3.0.5.0 or later | TMQ: Get consuming progress and seek offset to consume. |
| v0.8.0 | 3.0.4.0 | Support schemaless insert. | | v0.8.0 | 3.0.4.0 | Support schemaless insert. |
| v0.7.6 | 3.0.3.0 | Support req_id in query. | | v0.7.6 | 3.0.3.0 | Support req_id in query. |
| v0.6.0 | 3.0.0.0 | Base features. | | v0.6.0 | 3.0.0.0 | Base features. |
The Rust Connector is still under rapid development and is not guaranteed to be backward compatible before 1.0. We recommend using TDengine version 3.0 or higher to avoid known issues. The Rust Connector is still under rapid development and is not guaranteed to be backward compatible before 1.0. We recommend using TDengine version 3.0 or higher to avoid known issues.
## Installation ## Handling exceptions
After the error is reported, the specific information of the error can be obtained:
```rust
match conn.exec(sql) {
Ok(_) => {
Ok(())
}
Err(e) => {
eprintln!("ERROR: {:?}", e);
Err(e)
}
}
```
## TDengine DataType vs. Rust DataType
TDengine currently supports timestamp, number, character, Boolean type, and the corresponding type conversion with Rust is as follows:
| TDengine DataType | Rust DataType |
| ----------------- | ----------------- |
| TIMESTAMP | Timestamp |
| INT | i32 |
| BIGINT | i64 |
| FLOAT | f32 |
| DOUBLE | f64 |
| SMALLINT | i16 |
| TINYINT | i8 |
| BOOL | bool |
| BINARY | Vec<u8\> |
| NCHAR | String |
| JSON | serde_json::Value |
Note: Only TAG supports JSON types
## Installation Steps
### Pre-installation preparation ### Pre-installation preparation
* Install the Rust development toolchain * Install the Rust development toolchain
* If using the native connection, please install the TDengine client driver. Please refer to [install client driver](/reference/connector#install-client-driver) * If using the native connection, please install the TDengine client driver. Please refer to [install client driver](/reference/connector#install-client-driver)
### Add taos dependency ### Install the connectors
Depending on the connection method, add the [taos][taos] dependency in your Rust project as follows: Depending on the connection method, add the [taos][taos] dependency in your Rust project as follows:
...@@ -146,7 +182,8 @@ let builder = TaosBuilder::from_dsn("taos://localhost:6030")?; ...@@ -146,7 +182,8 @@ let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
let conn1 = builder.build(); let conn1 = builder.build();
// use websocket protocol. // use websocket protocol.
let conn2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?; let builder2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
let conn2 = builder2.build();
``` ```
After the connection is established, you can perform operations on your database. After the connection is established, you can perform operations on your database.
...@@ -228,41 +265,183 @@ There are two ways to query data: Using built-in types or the [serde](https://se ...@@ -228,41 +265,183 @@ There are two ways to query data: Using built-in types or the [serde](https://se
## Usage examples ## Usage examples
### Write data ### Create database and tables
```rust
use taos::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dsn = "taos://localhost:6030";
let builder = TaosBuilder::from_dsn(dsn)?;
let taos = builder.build()?;
let db = "query";
// create database
taos.exec_many([
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}`"),
format!("USE `{db}`"),
])
.await?;
// create table
taos.exec_many([
// create super table
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
TAGS (`groupid` INT, `location` BINARY(16))",
// create child table
"CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')",
]).await?;
}
```
#### SQL Write ### Insert data
<RustInsert /> <RustInsert />
#### STMT Write ### Query data
<RustQuery />
### execute SQL with req_id
This req_id can be used to request link tracing.
```rust
let rs = taos.query_with_req_id("select * from stable where tag1 is null", 1)?;
```
### Writing data via parameter binding
<RustBind /> <RustBind />
#### Schemaless Write ### Schemaless Writing
<RustSml /> <RustSml />
### Query data ### Schemaless with req_id
<RustQuery /> This req_id can be used to request link tracing.
## API Reference ```rust
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Line)
.data(data)
.req_id(100u64)
.build()?;
client.put(&sml_data)?
```
### Data Subscription
TDengine starts subscriptions through [TMQ](../../../taos-sql/tmq/).
#### Create a Topic
```rust
taos.exec_many([
// create topic for subscription
format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}")
])
.await?;
```
#### Create a Consumer
You create a TMQ connector by using a DSN.
```rust
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
```
Create a consumer:
```rust
let mut consumer = tmq.build()?;
```
#### Subscribe to consume data
A single consumer can subscribe to one or more topics.
```rust
consumer.subscribe(["tmq_meters"]).await?;
```
The TMQ is of [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) type. You can use the corresponding API to consume each message in the queue and then use `.commit` to mark them as consumed.
```rust
{
let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? {
// get information from offset
// the topic
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
println!(
"** table: {}, got {} records: {:#?}\n",
name.unwrap(),
records.len(),
records
);
}
}
consumer.commit(offset).await?;
}
}
```
Get assignments:
Version requirements connector-rust >= v0.8.8, TDengine >= 3.0.5.0
### Connector Constructor ```rust
let assignments = consumer.assignments().await.unwrap();
```
#### Assignment subscription Offset
Seek offset:
You create a connector constructor by using a DSN. Version requirements connector-rust >= v0.8.8, TDengine >= 3.0.5.0
```rust ```rust
let cfg = TaosBuilder::default().build()?; consumer.offset_seek(topic, vgroup_id, offset).await;
``` ```
You use the builder object to create multiple connections. #### Close subscriptions
```rust ```rust
let conn: Taos = cfg.build(); consumer.unsubscribe().await;
``` ```
### Connection pooling The following parameters can be configured for the TMQ DSN. Only `group.id` is mandatory.
- `group.id`: Within a consumer group, load balancing is implemented by consuming messages on an at-least-once basis.
- `client.id`: Subscriber client ID.
- `auto.offset.reset`: Initial point of subscription. *earliest* subscribes from the beginning, and *latest* subscribes from the newest message. The default is earliest. Note: This parameter is set per consumer group.
- `enable.auto.commit`: Automatically commits. This can be enabled when data consistency is not essential.
- `auto.commit.interval.ms`: Interval for automatic commits.
#### Full Sample Code
For more information, see [GitHub sample file](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/rust/nativeexample/examples/subscribe_demo.rs).
### Use with connection pool
In complex applications, we recommend enabling connection pools. [taos] implements connection pools based on [r2d2]. In complex applications, we recommend enabling connection pools. [taos] implements connection pools based on [r2d2].
...@@ -292,7 +471,17 @@ In the application code, use `pool.get()? ` to get a connection object [Taos]. ...@@ -292,7 +471,17 @@ In the application code, use `pool.get()? ` to get a connection object [Taos].
let taos = pool.get()?; let taos = pool.get()?;
``` ```
### Connectors ### More sample programs
The source code of the sample application is under `TDengine/examples/rust` :
[rust example](https://github.com/taosdata/TDengine/tree/3.0/examples/rust)
## Frequently Asked Questions
For additional troubleshooting, see [FAQ](../../../train-faq/faq).
## API Reference
The [Taos][struct.Taos] object provides an API to perform operations on multiple databases. The [Taos][struct.Taos] object provides an API to perform operations on multiple databases.
...@@ -380,7 +569,7 @@ Note that Rust asynchronous functions and an asynchronous runtime are required. ...@@ -380,7 +569,7 @@ Note that Rust asynchronous functions and an asynchronous runtime are required.
In addition, this structure is also the entry point for [Parameter Binding](#Parameter Binding Interface) and [Line Protocol Interface](#Line Protocol Interface). Please refer to the specific API descriptions for usage. In addition, this structure is also the entry point for [Parameter Binding](#Parameter Binding Interface) and [Line Protocol Interface](#Line Protocol Interface). Please refer to the specific API descriptions for usage.
### Bind Interface Bind Interface
Similar to the C interface, Rust provides the bind interface's wrapping. First, the [Taos][struct.taos] object creates a parameter binding object [Stmt] for an SQL statement. Similar to the C interface, Rust provides the bind interface's wrapping. First, the [Taos][struct.taos] object creates a parameter binding object [Stmt] for an SQL statement.
...@@ -391,7 +580,7 @@ stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?; ...@@ -391,7 +580,7 @@ stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
The bind object provides a set of interfaces for implementing parameter binding. The bind object provides a set of interfaces for implementing parameter binding.
#### `.set_tbname(name)` `.set_tbname(name)`
To bind table names. To bind table names.
...@@ -400,7 +589,7 @@ let mut stmt = taos.stmt("insert into ? values(? ,?)")?; ...@@ -400,7 +589,7 @@ let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
stmt.set_tbname("d0")?; stmt.set_tbname("d0")?;
``` ```
#### `.set_tags(&[tag])` `.set_tags(&[tag])`
Bind sub-table table names and tag values when the SQL statement uses a super table. Bind sub-table table names and tag values when the SQL statement uses a super table.
...@@ -410,7 +599,7 @@ stmt.set_tbname("d0")?; ...@@ -410,7 +599,7 @@ stmt.set_tbname("d0")?;
stmt.set_tags(&[Value::VarChar("taos".to_string())])?; stmt.set_tags(&[Value::VarChar("taos".to_string())])?;
``` ```
#### `.bind(&[column])` `.bind(&[column])`
Bind value types. Use the [ColumnView] structure to create and bind the required types. Bind value types. Use the [ColumnView] structure to create and bind the required types.
...@@ -434,7 +623,7 @@ let params = vec![ ...@@ -434,7 +623,7 @@ let params = vec![
let rows = stmt.bind(&params)?.add_batch()?.execute()?; let rows = stmt.bind(&params)?.add_batch()?.execute()?;
``` ```
#### `.execute()` `.execute()`
Execute SQL. [Stmt] objects can be reused, re-binded, and executed after execution. Before execution, ensure that all data has been added to the queue with `.add_batch`. Execute SQL. [Stmt] objects can be reused, re-binded, and executed after execution. Before execution, ensure that all data has been added to the queue with `.add_batch`.
...@@ -449,92 +638,6 @@ stmt.execute()?; ...@@ -449,92 +638,6 @@ stmt.execute()?;
For a working example, see [GitHub](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs). For a working example, see [GitHub](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs).
### Subscriptions
TDengine starts subscriptions through [TMQ](../../../taos-sql/tmq/).
You create a TMQ connector by using a DSN.
```rust
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
```
Create a consumer:
```rust
let mut consumer = tmq.build()?;
```
A single consumer can subscribe to one or more topics.
```rust
consumer.subscribe(["tmq_meters"]).await?;
```
The TMQ is of [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) type. You can use the corresponding API to consume each message in the queue and then use `.commit` to mark them as consumed.
```rust
{
let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? {
// get information from offset
// the topic
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
println!(
"** table: {}, got {} records: {:#?}\n",
name.unwrap(),
records.len(),
records
);
}
}
consumer.commit(offset).await?;
}
}
```
Get assignments:
Version requirements connector-rust >= v0.8.8, TDengine >= 3.0.5.0
```rust
let assignments = consumer.assignments().await.unwrap();
```
Seek offset:
Version requirements connector-rust >= v0.8.8, TDengine >= 3.0.5.0
```rust
consumer.offset_seek(topic, vgroup_id, offset).await;
```
Unsubscribe:
```rust
consumer.unsubscribe().await;
```
The following parameters can be configured for the TMQ DSN. Only `group.id` is mandatory.
- `group.id`: Within a consumer group, load balancing is implemented by consuming messages on an at-least-once basis.
- `client.id`: Subscriber client ID.
- `auto.offset.reset`: Initial point of subscription. *earliest* subscribes from the beginning, and *latest* subscribes from the newest message. The default is earliest. Note: This parameter is set per consumer group.
- `enable.auto.commit`: Automatically commits. This can be enabled when data consistency is not essential.
- `auto.commit.interval.ms`: Interval for automatic commits.
For more information, see [GitHub sample file](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/rust/nativeexample/examples/subscribe_demo.rs).
For information about other structure APIs, see the [Rust documentation](https://docs.rs/taos). For information about other structure APIs, see the [Rust documentation](https://docs.rs/taos).
......
...@@ -30,21 +30,57 @@ Websocket 连接支持所有能运行 Rust 的平台。 ...@@ -30,21 +30,57 @@ Websocket 连接支持所有能运行 Rust 的平台。
| Rust 连接器版本 | TDengine 版本 | 主要功能 | | Rust 连接器版本 | TDengine 版本 | 主要功能 |
| :----------------: | :--------------: | :--------------------------------------------------: | | :----------------: | :--------------: | :--------------------------------------------------: |
| v0.8.10 | 3.0.5.0 or later | 消息订阅:获取消费进度及按照指定进度开始消费。 | | v0.8.12 | 3.0.5.0 or later | 消息订阅:获取消费进度及按照指定进度开始消费。 |
| v0.8.0 | 3.0.4.0 | 支持无模式写入。 | | v0.8.0 | 3.0.4.0 | 支持无模式写入。 |
| v0.7.6 | 3.0.3.0 | 支持在请求中使用 req_id。 | | v0.7.6 | 3.0.3.0 | 支持在请求中使用 req_id。 |
| v0.6.0 | 3.0.0.0 | 基础功能。 | | v0.6.0 | 3.0.0.0 | 基础功能。 |
Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine,以避免已知问题。 Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine,以避免已知问题。
## 安装 ## 处理错误
在报错后,可以获取到错误的具体信息:
```rust
match conn.exec(sql) {
Ok(_) => {
Ok(())
}
Err(e) => {
eprintln!("ERROR: {:?}", e);
Err(e)
}
}
```
## TDengine DataType 和 Rust DataType
TDengine 目前支持时间戳、数字、字符、布尔类型,与 Rust 对应类型转换如下:
| TDengine DataType | Rust DataType |
| ----------------- | ----------------- |
| TIMESTAMP | Timestamp |
| INT | i32 |
| BIGINT | i64 |
| FLOAT | f32 |
| DOUBLE | f64 |
| SMALLINT | i16 |
| TINYINT | i8 |
| BOOL | bool |
| BINARY | Vec<u8\> |
| NCHAR | String |
| JSON | serde_json::Value |
**注意**:JSON 类型仅在 tag 中支持。
## 安装步骤
### 安装前准备 ### 安装前准备
* 安装 Rust 开发工具链 * 安装 Rust 开发工具链
* 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动) * 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动)
### 添加 taos 依赖 ### 安装连接器
根据选择的连接方式,按照如下说明在 [Rust](https://rust-lang.org) 项目中添加 [taos][taos] 依赖: 根据选择的连接方式,按照如下说明在 [Rust](https://rust-lang.org) 项目中添加 [taos][taos] 依赖:
...@@ -151,7 +187,8 @@ let builder = TaosBuilder::from_dsn("taos://localhost:6030")?; ...@@ -151,7 +187,8 @@ let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
let conn1 = builder.build(); let conn1 = builder.build();
// use websocket protocol. // use websocket protocol.
let conn2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?; let builder2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
let conn2 = builder2.build();
``` ```
建立连接后,您可以进行相关数据库操作: 建立连接后,您可以进行相关数据库操作:
...@@ -233,41 +270,183 @@ async fn demo(taos: &Taos, db: &str) -> Result<(), Error> { ...@@ -233,41 +270,183 @@ async fn demo(taos: &Taos, db: &str) -> Result<(), Error> {
## 使用示例 ## 使用示例
### 写入数据 ### 创建数据库和表
```rust
use taos::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dsn = "taos://localhost:6030";
let builder = TaosBuilder::from_dsn(dsn)?;
let taos = builder.build()?;
let db = "query";
// create database
taos.exec_many([
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}`"),
format!("USE `{db}`"),
])
.await?;
// create table
taos.exec_many([
// create super table
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
TAGS (`groupid` INT, `location` BINARY(16))",
// create child table
"CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')",
]).await?;
}
```
#### SQL 写入 ### 插入数据
<RustInsert /> <RustInsert />
#### STMT 写入 ### 查询数据
<RustQuery />
### 执行带有 req_id 的 SQL
此 req_id 可用于请求链路追踪。
```rust
let rs = taos.query_with_req_id("select * from stable where tag1 is null", 1)?;
```
### 通过参数绑定写入数据
<RustBind /> <RustBind />
#### Schemaless 写入 ### 无模式写入
<RustSml /> <RustSml />
### 查询数据 ### 执行带有 req_id 的无模式写入
<RustQuery /> 此 req_id 可用于请求链路追踪。
## API 参考 ```rust
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Line)
.data(data)
.req_id(100u64)
.build()?;
client.put(&sml_data)?
```
### 数据订阅
### 连接构造器 TDengine 通过消息队列 [TMQ](../../../taos-sql/tmq/) 启动一个订阅。
通过 DSN 来构建一个连接器构造器。 #### 创建 Topic
```rust ```rust
let cfg = TaosBuilder::default().build()?; taos.exec_many([
// create topic for subscription
format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}")
])
.await?;
``` ```
使用 `builder` 对象创建多个连接: #### 创建 Consumer
从 DSN 开始,构建一个 TMQ 连接器。
```rust ```rust
let conn: Taos = cfg.build(); let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
``` ```
### 连接池 创建消费者:
```rust
let mut consumer = tmq.build()?;
```
#### 订阅消费数据
消费者可订阅一个或多个 `TOPIC`。
```rust
consumer.subscribe(["tmq_meters"]).await?;
```
TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) 类型,可以使用相应 API 对每个消息进行消费,并通过 `.commit` 进行已消费标记。
```rust
{
let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? {
// get information from offset
// the topic
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
println!(
"** table: {}, got {} records: {:#?}\n",
name.unwrap(),
records.len(),
records
);
}
}
consumer.commit(offset).await?;
}
}
```
获取消费进度:
版本要求 connector-rust >= v0.8.8, TDengine >= 3.0.5.0
```rust
let assignments = consumer.assignments().await.unwrap();
```
#### 指定订阅 Offset
按照指定的进度消费:
版本要求 connector-rust >= v0.8.8, TDengine >= 3.0.5.0
```rust
consumer.offset_seek(topic, vgroup_id, offset).await;
```
#### 关闭订阅
```rust
consumer.unsubscribe().await;
```
对于 TMQ DSN, 有以下配置项可以进行设置,需要注意的是,`group.id` 是必须的。
- `group.id`: 同一个消费者组,将以至少消费一次的方式进行消息负载均衡。
- `client.id`: 可选的订阅客户端识别项。
- `auto.offset.reset`: 可选初始化订阅起点, *earliest* 为从头开始订阅, *latest* 为仅从最新数据开始订阅,默认为从头订阅。注意,此选项在同一个 `group.id` 中仅生效一次。
- `enable.auto.commit`: 当设置为 `true` 时,将启用自动标记模式,当对数据一致性不敏感时,可以启用此方式。
- `auto.commit.interval.ms`: 自动标记的时间间隔。
#### 完整示例
完整订阅示例参见 [GitHub 示例文件](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/rust/nativeexample/examples/subscribe_demo.rs).
### 与连接池使用
在复杂应用中,建议启用连接池。[taos] 的连接池默认(异步模式)使用 [deadpool] 实现。 在复杂应用中,建议启用连接池。[taos] 的连接池默认(异步模式)使用 [deadpool] 实现。
...@@ -295,7 +474,17 @@ let pool: Pool<TaosBuilder> = Pool::builder(Manager::from_dsn(self.dsn.clone()). ...@@ -295,7 +474,17 @@ let pool: Pool<TaosBuilder> = Pool::builder(Manager::from_dsn(self.dsn.clone()).
let taos = pool.get()?; let taos = pool.get()?;
``` ```
### 连接 ### 更多示例程序
示例程序源码位于 `TDengine/examples/rust` 下:
请参考:[rust example](https://github.com/taosdata/TDengine/tree/3.0/examples/rust)
## 常见问题
请参考 [FAQ](../../../train-faq/faq)
## API 参考
[Taos][struct.Taos] 对象提供了多个数据库操作的 API: [Taos][struct.Taos] 对象提供了多个数据库操作的 API:
...@@ -383,7 +572,7 @@ let taos = pool.get()?; ...@@ -383,7 +572,7 @@ let taos = pool.get()?;
除此之外,该结构也是 [参数绑定](#参数绑定接口) 和 [行协议接口](#行协议接口) 的入口,使用方法请参考具体的 API 说明。 除此之外,该结构也是 [参数绑定](#参数绑定接口) 和 [行协议接口](#行协议接口) 的入口,使用方法请参考具体的 API 说明。
### 参数绑定接口 参数绑定接口
与 C 接口类似,Rust 提供参数绑定接口。首先,通过 [Taos][struct.Taos] 对象创建一个 SQL 语句的参数绑定对象 [Stmt]: 与 C 接口类似,Rust 提供参数绑定接口。首先,通过 [Taos][struct.Taos] 对象创建一个 SQL 语句的参数绑定对象 [Stmt]:
...@@ -394,7 +583,7 @@ stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?; ...@@ -394,7 +583,7 @@ stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
参数绑定对象提供了一组接口用于实现参数绑定: 参数绑定对象提供了一组接口用于实现参数绑定:
#### `.set_tbname(name)` `.set_tbname(name)`
用于绑定表名。 用于绑定表名。
...@@ -403,7 +592,7 @@ let mut stmt = taos.stmt("insert into ? values(? ,?)")?; ...@@ -403,7 +592,7 @@ let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
stmt.set_tbname("d0")?; stmt.set_tbname("d0")?;
``` ```
#### `.set_tags(&[tag])` `.set_tags(&[tag])`
当 SQL 语句使用超级表时,用于绑定子表表名和标签值: 当 SQL 语句使用超级表时,用于绑定子表表名和标签值:
...@@ -413,7 +602,7 @@ stmt.set_tbname("d0")?; ...@@ -413,7 +602,7 @@ stmt.set_tbname("d0")?;
stmt.set_tags(&[Value::VarChar("涛思".to_string())])?; stmt.set_tags(&[Value::VarChar("涛思".to_string())])?;
``` ```
#### `.bind(&[column])` `.bind(&[column])`
用于绑定值类型。使用 [ColumnView] 结构体构建需要的类型并绑定: 用于绑定值类型。使用 [ColumnView] 结构体构建需要的类型并绑定:
...@@ -437,7 +626,7 @@ let params = vec![ ...@@ -437,7 +626,7 @@ let params = vec![
let rows = stmt.bind(&params)?.add_batch()?.execute()?; let rows = stmt.bind(&params)?.add_batch()?.execute()?;
``` ```
#### `.execute()` `.execute()`
执行 SQL。[Stmt] 对象可以复用,在执行后可以重新绑定并执行。执行前请确保所有数据已通过 `.add_batch` 加入到执行队列中。 执行 SQL。[Stmt] 对象可以复用,在执行后可以重新绑定并执行。执行前请确保所有数据已通过 `.add_batch` 加入到执行队列中。
...@@ -452,92 +641,6 @@ stmt.execute()?; ...@@ -452,92 +641,6 @@ stmt.execute()?;
一个可运行的示例请见 [GitHub 上的示例](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs)。 一个可运行的示例请见 [GitHub 上的示例](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs)。
### 订阅
TDengine 通过消息队列 [TMQ](../../../taos-sql/tmq/) 启动一个订阅。
从 DSN 开始,构建一个 TMQ 连接器。
```rust
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
```
创建消费者:
```rust
let mut consumer = tmq.build()?;
```
消费者可订阅一个或多个 `TOPIC`。
```rust
consumer.subscribe(["tmq_meters"]).await?;
```
TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) 类型,可以使用相应 API 对每个消息进行消费,并通过 `.commit` 进行已消费标记。
```rust
{
let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? {
// get information from offset
// the topic
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
println!(
"** table: {}, got {} records: {:#?}\n",
name.unwrap(),
records.len(),
records
);
}
}
consumer.commit(offset).await?;
}
}
```
获取消费进度:
版本要求 connector-rust >= v0.8.8, TDengine >= 3.0.5.0
```rust
let assignments = consumer.assignments().await.unwrap();
```
按照指定的进度消费:
版本要求 connector-rust >= v0.8.8, TDengine >= 3.0.5.0
```rust
consumer.offset_seek(topic, vgroup_id, offset).await;
```
停止订阅:
```rust
consumer.unsubscribe().await;
```
对于 TMQ DSN, 有以下配置项可以进行设置,需要注意的是,`group.id` 是必须的。
- `group.id`: 同一个消费者组,将以至少消费一次的方式进行消息负载均衡。
- `client.id`: 可选的订阅客户端识别项。
- `auto.offset.reset`: 可选初始化订阅起点, *earliest* 为从头开始订阅, *latest* 为仅从最新数据开始订阅,默认为从头订阅。注意,此选项在同一个 `group.id` 中仅生效一次。
- `enable.auto.commit`: 当设置为 `true` 时,将启用自动标记模式,当对数据一致性不敏感时,可以启用此方式。
- `auto.commit.interval.ms`: 自动标记的时间间隔。
完整订阅示例参见 [GitHub 示例文件](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/rust/nativeexample/examples/subscribe_demo.rs).
其他相关结构体 API 使用说明请移步 Rust 文档托管网页:<https://docs.rs/taos>。 其他相关结构体 API 使用说明请移步 Rust 文档托管网页:<https://docs.rs/taos>。
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册