06-rust.mdx 15.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
---
toc_max_heading_level: 4
sidebar_position: 5
sidebar_label: Rust
title: TDengine Rust Connector
---

import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

import Preparition from "./_preparition.mdx"
G
gccgdb1234 已提交
12 13 14
import RustInsert from "../07-develop/03-insert-data/_rust_sql.mdx"
import RustBind from "../07-develop/03-insert-data/_rust_stmt.mdx"
import RustQuery from "../07-develop/04-query-data/_rust.mdx"
15

16
[![Crates.io](https://img.shields.io/crates/v/taos)](https://crates.io/crates/taos) ![Crates.io](https://img.shields.io/crates/d/taos) [![docs.rs](https://img.shields.io/docsrs/taos)](https://docs.rs/taos)
17

18
`taos` 是 TDengine 的官方 Rust 语言连接器。Rust 开发人员可以通过它开发存取 TDengine 数据库的应用软件。
19

20
`taos` 提供两种建立连接的方式。一种是**原生连接**,它通过 TDengine 客户端驱动程序(taosc)连接 TDengine 运行实例。另外一种是 **Websocket 连接**,它通过 taosAdapter 的 Websocket 接口连接 TDengine 运行实例。你可以通过不同的 “特性(即 Cargo 关键字 `features`)” 来指定使用哪种连接器(默认同时支持)。Websocket 连接支持任何平台,原生连接支持所有 TDengine 客户端能运行的平台。
21

22
该 Rust 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-connector-rust)。
23 24 25 26

## 支持的平台

原生连接支持的平台和 TDengine 客户端驱动支持的平台一致。
27
Websocket 连接支持所有能运行 Rust 的平台。
28 29 30

## 版本支持

G
gccgdb1234 已提交
31
请参考[版本支持列表](../#版本支持)
32

33
Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine,以避免已知问题。
34 35 36 37

## 安装

### 安装前准备
38

39
* 安装 Rust 开发工具链
G
gccgdb1234 已提交
40
* 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动)
41

42
### 添加 taos 依赖
43

44
根据选择的连接方式,按照如下说明在 [Rust](https://rust-lang.org) 项目中添加 [taos][taos] 依赖:
45

46 47
<Tabs defaultValue="default">
<TabItem value="default" label="同时支持">
48

49
在 `Cargo.toml` 文件中添加 [taos][taos]:
50 51 52 53

```toml
[dependencies]
# use default feature
54
taos = "*"
55 56
```

L
Linhe Huo 已提交
57 58
</TabItem>

59
<TabItem value="native" label="仅原生连接">
60

L
Linhe Huo 已提交
61
在 `Cargo.toml` 文件中添加 [taos][taos],并启用 `native` 特性:
62 63 64

```toml
[dependencies]
65
taos = { version = "*", default-features = false, features = ["native"] }
66 67 68
```

</TabItem>
69
<TabItem value="rest" label="仅 Websocket">
70

71
在 `Cargo.toml` 文件中添加 [taos][taos],并启用 `ws` 特性。
72 73 74

```toml
[dependencies]
75
taos = { version = "*", default-features = false, features = ["ws"] }
76 77
```

78 79 80
</TabItem>
</Tabs>

81 82
## 建立连接

83
[TaosBuilder] 通过 DSN 连接描述字符串创建一个连接构造器。
84 85

```rust
86
let builder = TaosBuilder::from_dsn("taos://")?;
87 88 89 90 91
```

现在您可以使用该对象创建连接:

```rust
92
let conn = builder.build()?;
93 94 95 96 97
```

连接对象可以创建多个:

```rust
98 99
let conn1 = builder.build()?;
let conn2 = builder.build()?;
100 101
```

102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
DSN 描述字符串基本结构如下:

```text
<driver>[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|------|------------|---|-----------|-----------|------|------|------------|-----------------------|
|driver|   protocol |   | username  | password  | host | port |  database  |  params               |
```

各部分意义见下表:

- **driver**: 必须指定驱动名以便连接器选择何种方式创建连接,支持如下驱动名:
  - **taos**: 表名使用 TDengine 连接器驱动。
  - **tmq**: 使用 TMQ 订阅数据。
  - **http/ws**: 使用 Websocket 创建连接。
  - **https/wss**: 在 Websocket 连接方式下显示启用 SSL/TLS 连接。
- **protocol**: 显示指定以何种方式建立连接,例如:`taos+ws://localhost:6041` 指定以 Websocket 方式建立连接。
- **username/password**: 用于创建连接的用户名及密码。
- **host/port**: 指定创建连接的服务器及端口,当不指定服务器地址及端口时(`taos://`),原生连接默认为 `localhost:6030`,Websocket 连接默认为 `localhost:6041` 。
- **database**: 指定默认连接的数据库名。
- **params**:其他可选参数。

一个完整的 DSN 描述字符串示例如下:

```text
taos+ws://localhost:6041/test
```

表示使用 Websocket(`ws`)方式通过 `6041` 端口连接服务器 `localhost`,并指定默认数据库为 `test`。

这使得用户可以通过 DSN 指定连接方式:
132 133

```rust
134 135 136 137 138
use taos::*;

// use native protocol.
let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
let conn1 = builder.build();
139

140 141
//  use websocket protocol.
let conn2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
142 143
```

144
建立连接后,您可以进行相关数据库操作:
145 146

```rust
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
async fn demo(taos: &Taos, db: &str) -> Result<(), Error> {
    // prepare database
    taos.exec_many([
        format!("DROP DATABASE IF EXISTS `{db}`"),
        format!("CREATE DATABASE `{db}`"),
        format!("USE `{db}`"),
    ])
    .await?;

    let inserted = taos.exec_many([
        // create super table
        "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
         TAGS (`groupid` INT, `location` BINARY(16))",
        // create child table
        "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')",
        // insert into child table
        "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
        // insert with NULL values
        "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
        // insert and automatically create table with tags if not exists
        "INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)",
        // insert many records in a single sql
        "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
    ]).await?;

    assert_eq!(inserted, 6);
    let mut result = taos.query("select * from `meters`").await?;

    for field in result.fields() {
        println!("got field: {}", field.name());
177
    }
178 179

    let values = result.
180 181 182
}
```

183
查询数据可以通过两种方式:使用内建类型或 [serde](https://serde.rs) 序列化框架。
184

185 186 187 188 189 190 191 192
```rust
    // Query option 1, use rows stream.
    let mut rows = result.rows();
    while let Some(row) = rows.try_next().await? {
        for (name, value) in row {
            println!("got value of {}: {}", name, value);
        }
    }
193

194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
    // Query options 2, use deserialization with serde.
    #[derive(Debug, serde::Deserialize)]
    #[allow(dead_code)]
    struct Record {
        // deserialize timestamp to chrono::DateTime<Local>
        ts: DateTime<Local>,
        // float to f32
        current: Option<f32>,
        // int to i32
        voltage: Option<i32>,
        phase: Option<f32>,
        groupid: i32,
        // binary/varchar to String
        location: String,
    }
209

210 211 212 213 214 215 216 217 218 219
    let records: Vec<Record> = taos
        .query("select * from `meters`")
        .await?
        .deserialize()
        .try_collect()
        .await?;

    dbg!(records);
    Ok(())
```
220

221
## 使用示例
222

223
### 写入数据
224

225
#### SQL 写入
226

227
<RustInsert />
228

229
#### STMT 写入
230

231
<RustBind />
232 233 234 235 236 237 238

### 查询数据

<RustQuery />

## API 参考

239
### 连接构造器
240

241
通过 DSN 来构建一个连接器构造器。
242 243

```rust
244
let cfg = TaosBuilder::default().build()?;
245 246
```

247
使用 `builder` 对象创建多个连接:
248 249

```rust
250
let conn: Taos = cfg.build();
251 252 253 254
```

### 连接池

255
在复杂应用中,建议启用连接池。[taos] 的连接池使用 [r2d2] 实现。
256 257 258 259

如下,可以生成一个默认参数的连接池。

```rust
260
let pool = TaosBuilder::from_dsn(dsn)?.pool()?;
261 262 263 264 265
```

同样可以使用连接池的构造器,对连接池参数进行设置:

```rust
266 267 268 269 270 271 272 273 274
let dsn = "taos://localhost:6030";

let opts = PoolBuilder::new()
    .max_size(5000) // max connections
    .max_lifetime(Some(Duration::from_secs(60 * 60))) // lifetime of each connection
    .min_idle(Some(1000)) // minimal idle connections
    .connection_timeout(Duration::from_secs(2));

let pool = TaosBuilder::from_dsn(dsn)?.with_pool_builder(opts)?;
275 276 277 278 279 280 281 282 283 284
```

在应用代码中,使用 `pool.get()?` 来获取一个连接对象 [Taos]。

```rust
let taos = pool.get()?;
```

### 连接

285
[Taos][struct.Taos] 对象提供了多个数据库操作的 API:
286 287 288 289

1. `exec`: 执行某个非查询类 SQL 语句,例如 `CREATE`,`ALTER`,`INSERT` 等。

    ```rust
290 291 292 293 294 295 296 297 298 299 300
    let affected_rows = taos.exec("INSERT INTO tb1 VALUES(now, NULL)").await?;
    ```

2. `exec_many`: 同时(顺序)执行多个 SQL 语句。

    ```rust
    taos.exec_many([
        "CREATE DATABASE test",
        "USE test",
        "CREATE TABLE `tb1` (`ts` TIMESTAMP, `val` INT)",
    ]).await?;
301 302
    ```

303
3. `query`:执行查询语句,返回 [ResultSet] 对象。
304 305

    ```rust
306
    let mut q = taos.query("select * from log.logs").await?;
307 308
    ```

309
    [ResultSet] 对象存储了查询结果数据和返回的列的基本信息(列名,类型,长度):
310

311
    列信息使用 [.fields()] 方法获取:
312 313

    ```rust
314
    let cols = q.fields();
315
    for col in cols {
316
        println!("name: {}, type: {:?} , bytes: {}", col.name(), col.ty(), col.bytes());
317 318 319 320 321 322
    }
    ```

    逐行获取数据:

    ```rust
323 324 325 326 327 328 329 330
    let mut rows = result.rows();
    let mut nrows = 0;
    while let Some(row) = rows.try_next().await? {
        for (col, (name, value)) in row.enumerate() {
            println!(
                "[{}] got value in col {} (named `{:>8}`): {}",
                nrows, col, name, value
            );
331
        }
332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350
        nrows += 1;
    }
    ```

    或使用 [serde](https://serde.rs) 序列化框架。

    ```rust
    #[derive(Debug, Deserialize)]
    struct Record {
        // deserialize timestamp to chrono::DateTime<Local>
        ts: DateTime<Local>,
        // float to f32
        current: Option<f32>,
        // int to i32
        voltage: Option<i32>,
        phase: Option<f32>,
        groupid: i32,
        // binary/varchar to String
        location: String,
351
    }
352 353 354 355 356 357 358

    let records: Vec<Record> = taos
        .query("select * from `meters`")
        .await?
        .deserialize()
        .try_collect()
        .await?;
359 360 361 362
    ```

需要注意的是,需要使用 Rust 异步函数和异步运行时。

363
[Taos][struct.Taos] 提供部分 SQL 的 Rust 方法化以减少 `format!` 代码块的频率:
364 365 366 367 368 369 370 371 372

- `.describe(table: &str)`: 执行 `DESCRIBE` 并返回一个 Rust 数据结构。
- `.create_database(database: &str)`: 执行 `CREATE DATABASE` 语句。
- `.use_database(database: &str)`: 执行 `USE` 语句。

除此之外,该结构也是 [参数绑定](#参数绑定接口) 和 [行协议接口](#行协议接口) 的入口,使用方法请参考具体的 API 说明。

### 参数绑定接口

373
与 C 接口类似,Rust 提供参数绑定接口。首先,通过 [Taos][struct.Taos] 对象创建一个 SQL 语句的参数绑定对象 [Stmt]:
374 375

```rust
376 377
let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
378 379 380 381
```

参数绑定对象提供了一组接口用于实现参数绑定:

382
#### `.set_tbname(name)`
383 384 385

用于绑定表名。

386 387 388 389 390 391
```rust
let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
stmt.set_tbname("d0")?;
```

#### `.set_tags(&[tag])`
392 393 394 395

当 SQL 语句使用超级表时,用于绑定子表表名和标签值:

```rust
396 397 398
let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)")?;
stmt.set_tbname("d0")?;
stmt.set_tags(&[Value::VarChar("涛思".to_string())])?;
399 400
```

401
#### `.bind(&[column])`
402

403
用于绑定值类型。使用 [ColumnView] 结构体构建需要的类型并绑定:
404 405

```rust
406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422
let params = vec![
    ColumnView::from_millis_timestamp(vec![164000000000]),
    ColumnView::from_bools(vec![true]),
    ColumnView::from_tiny_ints(vec![i8::MAX]),
    ColumnView::from_small_ints(vec![i16::MAX]),
    ColumnView::from_ints(vec![i32::MAX]),
    ColumnView::from_big_ints(vec![i64::MAX]),
    ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]),
    ColumnView::from_unsigned_small_ints(vec![u16::MAX]),
    ColumnView::from_unsigned_ints(vec![u32::MAX]),
    ColumnView::from_unsigned_big_ints(vec![u64::MAX]),
    ColumnView::from_floats(vec![f32::MAX]),
    ColumnView::from_doubles(vec![f64::MAX]),
    ColumnView::from_varchar(vec!["ABC"]),
    ColumnView::from_nchar(vec!["涛思数据"]),
];
let rows = stmt.bind(&params)?.add_batch()?.execute()?;
423 424
```

425
#### `.execute()`
426

427
执行 SQL。[Stmt] 对象可以复用,在执行后可以重新绑定并执行。执行前请确保所有数据已通过 `.add_batch` 加入到执行队列中。
428 429 430 431 432 433 434 435 436 437

```rust
stmt.execute()?;

// next bind cycle.
//stmt.set_tbname()?;
//stmt.bind()?;
//stmt.execute()?;
```

438 439 440
一个可运行的示例请见 [GitHub 上的示例](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs)。

### 订阅
441

442 443 444
TDengine 通过消息队列 [TMQ](../../../taos-sql/tmq/) 启动一个订阅。

从 DSN 开始,构建一个 TMQ 连接器。
445 446

```rust
447
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
448 449
```

450
创建消费者:
451

452 453 454
```rust
let mut consumer = tmq.build()?;
```
455

456
消费者可订阅一个或多个 `TOPIC`。
457

458 459 460
```rust
consumer.subscribe(["tmq_meters"]).await?;
```
461

462
TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) 类型,可以使用相应 API 对每个消息进行消费,并通过 `.commit` 进行已消费标记。
463

464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487
```rust
{
    let mut stream = consumer.stream();

    while let Some((offset, message)) = stream.try_next().await? {
        // get information from offset

        // the topic
        let topic = offset.topic();
        // the vgroup id, like partition id in kafka.
        let vgroup_id = offset.vgroup_id();
        println!("* in vgroup id {vgroup_id} of topic {topic}\n");

        if let Some(data) = message.into_data() {
            while let Some(block) = data.fetch_raw_block().await? {
                // one block for one table, get table name if needed
                let name = block.table_name();
                let records: Vec<Record> = block.deserialize().try_collect()?;
                println!(
                    "** table: {}, got {} records: {:#?}\n",
                    name.unwrap(),
                    records.len(),
                    records
                );
488
            }
489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509
        }
        consumer.commit(offset).await?;
    }
}
```

停止订阅:

```rust
consumer.unsubscribe().await;
```

对于 TMQ DSN, 有以下配置项可以进行设置,需要注意的是,`group.id` 是必须的。

- `group.id`: 同一个消费者组,将以至少消费一次的方式进行消息负载均衡。
- `client.id`: 可选的订阅客户端识别项。
- `auto.offset.reset`: 可选初始化订阅起点, *earliest* 为从头开始订阅, *latest* 为仅从最新数据开始订阅,默认为从头订阅。注意,此选项在同一个 `group.id` 中仅生效一次。
- `enable.auto.commit`: 当设置为 `true` 时,将启用自动标记模式,当对数据一致性不敏感时,可以启用此方式。
- `auto.commit.interval.ms`: 自动标记的时间间隔。
  
完整订阅示例参见 [GitHub 示例文件](https://github.com/taosdata/taos-connector-rust/blob/main/examples/subscribe.rs).
510

511
其他相关结构体 API 使用说明请移步 Rust 文档托管网页:<https://docs.rs/taos>。
512

513
[taos]: https://github.com/taosdata/rust-connector-taos
514
[r2d2]: https://crates.io/crates/r2d2
515 516 517 518
[TaosBuilder]: https://docs.rs/taos/latest/taos/struct.TaosBuilder.html
[TaosCfg]: https://docs.rs/taos/latest/taos/struct.TaosCfg.html
[struct.Taos]: https://docs.rs/taos/latest/taos/struct.Taos.html
[Stmt]: https://docs.rs/taos/latest/taos/struct.Stmt.html