06-rust.mdx 15.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
---
toc_max_heading_level: 4
sidebar_label: Rust
title: TDengine Rust Connector
---

import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

import Preparition from "./_preparition.mdx"
G
gccgdb1234 已提交
11 12 13
import RustInsert from "../07-develop/03-insert-data/_rust_sql.mdx"
import RustBind from "../07-develop/03-insert-data/_rust_stmt.mdx"
import RustQuery from "../07-develop/04-query-data/_rust.mdx"
14

15
[![Crates.io](https://img.shields.io/crates/v/taos)](https://crates.io/crates/taos) ![Crates.io](https://img.shields.io/crates/d/taos) [![docs.rs](https://img.shields.io/docsrs/taos)](https://docs.rs/taos)
16

17
`taos` 是 TDengine 的官方 Rust 语言连接器。Rust 开发人员可以通过它开发存取 TDengine 数据库的应用软件。
18

19
`taos` 提供两种建立连接的方式。一种是**原生连接**,它通过 TDengine 客户端驱动程序(taosc)连接 TDengine 运行实例。另外一种是 **Websocket 连接**,它通过 taosAdapter 的 Websocket 接口连接 TDengine 运行实例。你可以通过不同的 “特性(即 Cargo 关键字 `features`)” 来指定使用哪种连接器(默认同时支持)。Websocket 连接支持任何平台,原生连接支持所有 TDengine 客户端能运行的平台。
20

21
该 Rust 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-connector-rust)。
22 23 24 25

## 支持的平台

原生连接支持的平台和 TDengine 客户端驱动支持的平台一致。
26
Websocket 连接支持所有能运行 Rust 的平台。
27 28 29

## 版本支持

G
gccgdb1234 已提交
30
请参考[版本支持列表](../#版本支持)
31

32
Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine,以避免已知问题。
33 34 35 36

## 安装

### 安装前准备
37

38
* 安装 Rust 开发工具链
G
gccgdb1234 已提交
39
* 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动)
40

41
### 添加 taos 依赖
42

43
根据选择的连接方式,按照如下说明在 [Rust](https://rust-lang.org) 项目中添加 [taos][taos] 依赖:
44

45 46
<Tabs defaultValue="default">
<TabItem value="default" label="同时支持">
47

48
在 `Cargo.toml` 文件中添加 [taos][taos]:
49 50 51 52

```toml
[dependencies]
# use default feature
53
taos = "*"
54 55
```

L
Linhe Huo 已提交
56 57
</TabItem>

58
<TabItem value="native" label="仅原生连接">
59

L
Linhe Huo 已提交
60
在 `Cargo.toml` 文件中添加 [taos][taos],并启用 `native` 特性:
61 62 63

```toml
[dependencies]
64
taos = { version = "*", default-features = false, features = ["native"] }
65 66 67
```

</TabItem>
68
<TabItem value="rest" label="仅 Websocket">
69

70
在 `Cargo.toml` 文件中添加 [taos][taos],并启用 `ws` 特性。
71 72 73

```toml
[dependencies]
74
taos = { version = "*", default-features = false, features = ["ws"] }
75 76
```

77 78 79
</TabItem>
</Tabs>

80 81
## 建立连接

82
[TaosBuilder] 通过 DSN 连接描述字符串创建一个连接构造器。
83 84

```rust
85
let builder = TaosBuilder::from_dsn("taos://")?;
86 87 88 89 90
```

现在您可以使用该对象创建连接:

```rust
91
let conn = builder.build()?;
92 93 94 95 96
```

连接对象可以创建多个:

```rust
97 98
let conn1 = builder.build()?;
let conn2 = builder.build()?;
99 100
```

101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
DSN 描述字符串基本结构如下:

```text
<driver>[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|------|------------|---|-----------|-----------|------|------|------------|-----------------------|
|driver|   protocol |   | username  | password  | host | port |  database  |  params               |
```

各部分意义见下表:

- **driver**: 必须指定驱动名以便连接器选择何种方式创建连接,支持如下驱动名:
  - **taos**: 表名使用 TDengine 连接器驱动。
  - **tmq**: 使用 TMQ 订阅数据。
  - **http/ws**: 使用 Websocket 创建连接。
  - **https/wss**: 在 Websocket 连接方式下显示启用 SSL/TLS 连接。
- **protocol**: 显示指定以何种方式建立连接,例如:`taos+ws://localhost:6041` 指定以 Websocket 方式建立连接。
- **username/password**: 用于创建连接的用户名及密码。
- **host/port**: 指定创建连接的服务器及端口,当不指定服务器地址及端口时(`taos://`),原生连接默认为 `localhost:6030`,Websocket 连接默认为 `localhost:6041` 。
- **database**: 指定默认连接的数据库名。
- **params**:其他可选参数。

一个完整的 DSN 描述字符串示例如下:

```text
taos+ws://localhost:6041/test
```

表示使用 Websocket(`ws`)方式通过 `6041` 端口连接服务器 `localhost`,并指定默认数据库为 `test`。

这使得用户可以通过 DSN 指定连接方式:
131 132

```rust
133 134 135 136 137
use taos::*;

// use native protocol.
let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
let conn1 = builder.build();
138

139 140
//  use websocket protocol.
let conn2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
141 142
```

143
建立连接后,您可以进行相关数据库操作:
144 145

```rust
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
async fn demo(taos: &Taos, db: &str) -> Result<(), Error> {
    // prepare database
    taos.exec_many([
        format!("DROP DATABASE IF EXISTS `{db}`"),
        format!("CREATE DATABASE `{db}`"),
        format!("USE `{db}`"),
    ])
    .await?;

    let inserted = taos.exec_many([
        // create super table
        "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
         TAGS (`groupid` INT, `location` BINARY(16))",
        // create child table
        "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')",
        // insert into child table
        "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
        // insert with NULL values
        "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
        // insert and automatically create table with tags if not exists
        "INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)",
        // insert many records in a single sql
        "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
    ]).await?;

    assert_eq!(inserted, 6);
    let mut result = taos.query("select * from `meters`").await?;

    for field in result.fields() {
        println!("got field: {}", field.name());
176
    }
177 178

    let values = result.
179 180 181
}
```

182
查询数据可以通过两种方式:使用内建类型或 [serde](https://serde.rs) 序列化框架。
183

184 185 186 187 188 189 190 191
```rust
    // Query option 1, use rows stream.
    let mut rows = result.rows();
    while let Some(row) = rows.try_next().await? {
        for (name, value) in row {
            println!("got value of {}: {}", name, value);
        }
    }
192

193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
    // Query options 2, use deserialization with serde.
    #[derive(Debug, serde::Deserialize)]
    #[allow(dead_code)]
    struct Record {
        // deserialize timestamp to chrono::DateTime<Local>
        ts: DateTime<Local>,
        // float to f32
        current: Option<f32>,
        // int to i32
        voltage: Option<i32>,
        phase: Option<f32>,
        groupid: i32,
        // binary/varchar to String
        location: String,
    }
208

209 210 211 212 213 214 215 216 217 218
    let records: Vec<Record> = taos
        .query("select * from `meters`")
        .await?
        .deserialize()
        .try_collect()
        .await?;

    dbg!(records);
    Ok(())
```
219

220
## 使用示例
221

222
### 写入数据
223

224
#### SQL 写入
225

226
<RustInsert />
227

228
#### STMT 写入
229

230
<RustBind />
231 232 233 234 235 236 237

### 查询数据

<RustQuery />

## API 参考

238
### 连接构造器
239

240
通过 DSN 来构建一个连接器构造器。
241 242

```rust
243
let cfg = TaosBuilder::default().build()?;
244 245
```

246
使用 `builder` 对象创建多个连接:
247 248

```rust
249
let conn: Taos = cfg.build();
250 251 252 253
```

### 连接池

254
在复杂应用中,建议启用连接池。[taos] 的连接池使用 [r2d2] 实现。
255 256 257 258

如下,可以生成一个默认参数的连接池。

```rust
259
let pool = TaosBuilder::from_dsn(dsn)?.pool()?;
260 261 262 263 264
```

同样可以使用连接池的构造器,对连接池参数进行设置:

```rust
265 266 267 268 269 270 271 272 273
let dsn = "taos://localhost:6030";

let opts = PoolBuilder::new()
    .max_size(5000) // max connections
    .max_lifetime(Some(Duration::from_secs(60 * 60))) // lifetime of each connection
    .min_idle(Some(1000)) // minimal idle connections
    .connection_timeout(Duration::from_secs(2));

let pool = TaosBuilder::from_dsn(dsn)?.with_pool_builder(opts)?;
274 275 276 277 278 279 280 281 282 283
```

在应用代码中,使用 `pool.get()?` 来获取一个连接对象 [Taos]。

```rust
let taos = pool.get()?;
```

### 连接

284
[Taos][struct.Taos] 对象提供了多个数据库操作的 API:
285 286 287 288

1. `exec`: 执行某个非查询类 SQL 语句,例如 `CREATE`,`ALTER`,`INSERT` 等。

    ```rust
289 290 291 292 293 294 295 296 297 298 299
    let affected_rows = taos.exec("INSERT INTO tb1 VALUES(now, NULL)").await?;
    ```

2. `exec_many`: 同时(顺序)执行多个 SQL 语句。

    ```rust
    taos.exec_many([
        "CREATE DATABASE test",
        "USE test",
        "CREATE TABLE `tb1` (`ts` TIMESTAMP, `val` INT)",
    ]).await?;
300 301
    ```

302
3. `query`:执行查询语句,返回 [ResultSet] 对象。
303 304

    ```rust
305
    let mut q = taos.query("select * from log.logs").await?;
306 307
    ```

308
    [ResultSet] 对象存储了查询结果数据和返回的列的基本信息(列名,类型,长度):
309

310
    列信息使用 [.fields()] 方法获取:
311 312

    ```rust
313
    let cols = q.fields();
314
    for col in cols {
315
        println!("name: {}, type: {:?} , bytes: {}", col.name(), col.ty(), col.bytes());
316 317 318 319 320 321
    }
    ```

    逐行获取数据:

    ```rust
322 323 324 325 326 327 328 329
    let mut rows = result.rows();
    let mut nrows = 0;
    while let Some(row) = rows.try_next().await? {
        for (col, (name, value)) in row.enumerate() {
            println!(
                "[{}] got value in col {} (named `{:>8}`): {}",
                nrows, col, name, value
            );
330
        }
331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349
        nrows += 1;
    }
    ```

    或使用 [serde](https://serde.rs) 序列化框架。

    ```rust
    #[derive(Debug, Deserialize)]
    struct Record {
        // deserialize timestamp to chrono::DateTime<Local>
        ts: DateTime<Local>,
        // float to f32
        current: Option<f32>,
        // int to i32
        voltage: Option<i32>,
        phase: Option<f32>,
        groupid: i32,
        // binary/varchar to String
        location: String,
350
    }
351 352 353 354 355 356 357

    let records: Vec<Record> = taos
        .query("select * from `meters`")
        .await?
        .deserialize()
        .try_collect()
        .await?;
358 359 360 361
    ```

需要注意的是,需要使用 Rust 异步函数和异步运行时。

362
[Taos][struct.Taos] 提供部分 SQL 的 Rust 方法化以减少 `format!` 代码块的频率:
363 364 365 366 367 368 369 370 371

- `.describe(table: &str)`: 执行 `DESCRIBE` 并返回一个 Rust 数据结构。
- `.create_database(database: &str)`: 执行 `CREATE DATABASE` 语句。
- `.use_database(database: &str)`: 执行 `USE` 语句。

除此之外,该结构也是 [参数绑定](#参数绑定接口) 和 [行协议接口](#行协议接口) 的入口,使用方法请参考具体的 API 说明。

### 参数绑定接口

372
与 C 接口类似,Rust 提供参数绑定接口。首先,通过 [Taos][struct.Taos] 对象创建一个 SQL 语句的参数绑定对象 [Stmt]:
373 374

```rust
375 376
let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
377 378 379 380
```

参数绑定对象提供了一组接口用于实现参数绑定:

381
#### `.set_tbname(name)`
382 383 384

用于绑定表名。

385 386 387 388 389 390
```rust
let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
stmt.set_tbname("d0")?;
```

#### `.set_tags(&[tag])`
391 392 393 394

当 SQL 语句使用超级表时,用于绑定子表表名和标签值:

```rust
395 396 397
let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)")?;
stmt.set_tbname("d0")?;
stmt.set_tags(&[Value::VarChar("涛思".to_string())])?;
398 399
```

400
#### `.bind(&[column])`
401

402
用于绑定值类型。使用 [ColumnView] 结构体构建需要的类型并绑定:
403 404

```rust
405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421
let params = vec![
    ColumnView::from_millis_timestamp(vec![164000000000]),
    ColumnView::from_bools(vec![true]),
    ColumnView::from_tiny_ints(vec![i8::MAX]),
    ColumnView::from_small_ints(vec![i16::MAX]),
    ColumnView::from_ints(vec![i32::MAX]),
    ColumnView::from_big_ints(vec![i64::MAX]),
    ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]),
    ColumnView::from_unsigned_small_ints(vec![u16::MAX]),
    ColumnView::from_unsigned_ints(vec![u32::MAX]),
    ColumnView::from_unsigned_big_ints(vec![u64::MAX]),
    ColumnView::from_floats(vec![f32::MAX]),
    ColumnView::from_doubles(vec![f64::MAX]),
    ColumnView::from_varchar(vec!["ABC"]),
    ColumnView::from_nchar(vec!["涛思数据"]),
];
let rows = stmt.bind(&params)?.add_batch()?.execute()?;
422 423
```

424
#### `.execute()`
425

426
执行 SQL。[Stmt] 对象可以复用,在执行后可以重新绑定并执行。执行前请确保所有数据已通过 `.add_batch` 加入到执行队列中。
427 428 429 430 431 432 433 434 435 436

```rust
stmt.execute()?;

// next bind cycle.
//stmt.set_tbname()?;
//stmt.bind()?;
//stmt.execute()?;
```

437 438 439
一个可运行的示例请见 [GitHub 上的示例](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs)。

### 订阅
440

441 442 443
TDengine 通过消息队列 [TMQ](../../../taos-sql/tmq/) 启动一个订阅。

从 DSN 开始,构建一个 TMQ 连接器。
444 445

```rust
446
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
447 448
```

449
创建消费者:
450

451 452 453
```rust
let mut consumer = tmq.build()?;
```
454

455
消费者可订阅一个或多个 `TOPIC`。
456

457 458 459
```rust
consumer.subscribe(["tmq_meters"]).await?;
```
460

461
TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) 类型,可以使用相应 API 对每个消息进行消费,并通过 `.commit` 进行已消费标记。
462

463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486
```rust
{
    let mut stream = consumer.stream();

    while let Some((offset, message)) = stream.try_next().await? {
        // get information from offset

        // the topic
        let topic = offset.topic();
        // the vgroup id, like partition id in kafka.
        let vgroup_id = offset.vgroup_id();
        println!("* in vgroup id {vgroup_id} of topic {topic}\n");

        if let Some(data) = message.into_data() {
            while let Some(block) = data.fetch_raw_block().await? {
                // one block for one table, get table name if needed
                let name = block.table_name();
                let records: Vec<Record> = block.deserialize().try_collect()?;
                println!(
                    "** table: {}, got {} records: {:#?}\n",
                    name.unwrap(),
                    records.len(),
                    records
                );
487
            }
488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508
        }
        consumer.commit(offset).await?;
    }
}
```

停止订阅:

```rust
consumer.unsubscribe().await;
```

对于 TMQ DSN, 有以下配置项可以进行设置,需要注意的是,`group.id` 是必须的。

- `group.id`: 同一个消费者组,将以至少消费一次的方式进行消息负载均衡。
- `client.id`: 可选的订阅客户端识别项。
- `auto.offset.reset`: 可选初始化订阅起点, *earliest* 为从头开始订阅, *latest* 为仅从最新数据开始订阅,默认为从头订阅。注意,此选项在同一个 `group.id` 中仅生效一次。
- `enable.auto.commit`: 当设置为 `true` 时,将启用自动标记模式,当对数据一致性不敏感时,可以启用此方式。
- `auto.commit.interval.ms`: 自动标记的时间间隔。
  
完整订阅示例参见 [GitHub 示例文件](https://github.com/taosdata/taos-connector-rust/blob/main/examples/subscribe.rs).
509

510
其他相关结构体 API 使用说明请移步 Rust 文档托管网页:<https://docs.rs/taos>。
511

512
[taos]: https://github.com/taosdata/rust-connector-taos
513
[r2d2]: https://crates.io/crates/r2d2
514 515 516 517
[TaosBuilder]: https://docs.rs/taos/latest/taos/struct.TaosBuilder.html
[TaosCfg]: https://docs.rs/taos/latest/taos/struct.TaosCfg.html
[struct.Taos]: https://docs.rs/taos/latest/taos/struct.Taos.html
[Stmt]: https://docs.rs/taos/latest/taos/struct.Stmt.html