26-rust.mdx 19.0 KB
Newer Older
1 2 3 4 5 6 7 8 9
---
toc_max_heading_level: 4
sidebar_label: Rust
title: TDengine Rust Connector
---

import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

10
import Preparation from "./_preparation.mdx"
G
gccgdb1234 已提交
11 12
import RustInsert from "../07-develop/03-insert-data/_rust_sql.mdx"
import RustBind from "../07-develop/03-insert-data/_rust_stmt.mdx"
A
Adam Ji 已提交
13
import RustSml from "../07-develop/03-insert-data/_rust_schemaless.mdx"
G
gccgdb1234 已提交
14
import RustQuery from "../07-develop/04-query-data/_rust.mdx"
15

16
[![Crates.io](https://img.shields.io/crates/v/taos)](https://crates.io/crates/taos) ![Crates.io](https://img.shields.io/crates/d/taos) [![docs.rs](https://img.shields.io/docsrs/taos)](https://docs.rs/taos)
17

18
`taos` 是 TDengine 的官方 Rust 语言连接器。Rust 开发人员可以通过它开发存取 TDengine 数据库的应用软件。
19

20
`taos` 提供两种建立连接的方式。一种是**原生连接**,它通过 TDengine 客户端驱动程序(taosc)连接 TDengine 运行实例。另外一种是 **Websocket 连接**,它通过 taosAdapter 的 Websocket 接口连接 TDengine 运行实例。你可以通过不同的 “特性(即 Cargo 关键字 `features`)” 来指定使用哪种连接器(默认同时支持)。Websocket 连接支持任何平台,原生连接支持所有 TDengine 客户端能运行的平台。
21

22
该 Rust 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-connector-rust)。
23 24 25 26

## 支持的平台

原生连接支持的平台和 TDengine 客户端驱动支持的平台一致。
27
Websocket 连接支持所有能运行 Rust 的平台。
28

A
Adam Ji 已提交
29 30 31 32
## 版本历史

|   Rust 连接器版本   |   TDengine 版本   |                        主要功能                       |
| :----------------: | :--------------: | :--------------------------------------------------: |
A
Adam Ji 已提交
33
|       v0.8.12       |      3.0.5.0 or later    | 消息订阅:获取消费进度及按照指定进度开始消费。 |
A
Adam Ji 已提交
34 35 36
|       v0.8.0       |      3.0.4.0     | 支持无模式写入。 |
|       v0.7.6       |      3.0.3.0     | 支持在请求中使用 req_id。 |
|       v0.6.0       |      3.0.0.0     | 基础功能。 |
37

38
Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine,以避免已知问题。
39

A
Adam Ji 已提交
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
## 处理错误

在报错后,可以获取到错误的具体信息:

```rust
match conn.exec(sql) {
    Ok(_) => {
        Ok(())
    }
    Err(e) => {
        eprintln!("ERROR: {:?}", e);
        Err(e)
    }
}
```

## TDengine DataType 和 Rust DataType

TDengine 目前支持时间戳、数字、字符、布尔类型,与 Rust 对应类型转换如下:

| TDengine DataType | Rust DataType     |
| ----------------- | ----------------- |
| TIMESTAMP         | Timestamp         |
| INT               | i32               |
| BIGINT            | i64               |
| FLOAT             | f32               |
| DOUBLE            | f64               |
| SMALLINT          | i16               |
| TINYINT           | i8                |
| BOOL              | bool              |
| BINARY            | Vec<u8\>          |
| NCHAR             | String            |
| JSON              | serde_json::Value |

**注意**:JSON 类型仅在 tag 中支持。

## 安装步骤
77 78

### 安装前准备
79

80
* 安装 Rust 开发工具链
G
gccgdb1234 已提交
81
* 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动)
82

A
Adam Ji 已提交
83
### 安装连接器
84

85
根据选择的连接方式,按照如下说明在 [Rust](https://rust-lang.org) 项目中添加 [taos][taos] 依赖:
86

87 88
<Tabs defaultValue="default">
<TabItem value="default" label="同时支持">
89

90
在 `Cargo.toml` 文件中添加 [taos][taos]:
91 92 93 94

```toml
[dependencies]
# use default feature
95
taos = "*"
96 97
```

L
Linhe Huo 已提交
98 99
</TabItem>

100
<TabItem value="rest" label="仅 Websocket">
101

102
在 `Cargo.toml` 文件中添加 [taos][taos],并启用 `ws` 特性。
103 104 105

```toml
[dependencies]
106
taos = { version = "*", default-features = false, features = ["ws"] }
107 108
```

109 110 111 112 113 114 115
当仅启用 `ws` 特性时,可同时指定 `r2d2` 使得在同步(blocking/sync)模式下使用 [r2d2] 作为连接池:

```toml
[dependencies]
taos = { version = "*", default-features = false, features = ["r2d2", "ws"] }
```

116 117
</TabItem>

118 119 120
<TabItem value="native" label="仅原生连接">

在 `Cargo.toml` 文件中添加 [taos][taos],并启用 `native` 特性:
121 122 123

```toml
[dependencies]
124
taos = { version = "*", default-features = false, features = ["native"] }
125 126
```

127 128 129
</TabItem>
</Tabs>

130 131
## 建立连接

132
[TaosBuilder] 通过 DSN 连接描述字符串创建一个连接构造器。
133 134

```rust
135
let builder = TaosBuilder::from_dsn("taos://")?;
136 137 138 139 140
```

现在您可以使用该对象创建连接:

```rust
141
let conn = builder.build()?;
142 143 144 145 146
```

连接对象可以创建多个:

```rust
147 148
let conn1 = builder.build()?;
let conn2 = builder.build()?;
149 150
```

151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
DSN 描述字符串基本结构如下:

```text
<driver>[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|------|------------|---|-----------|-----------|------|------|------------|-----------------------|
|driver|   protocol |   | username  | password  | host | port |  database  |  params               |
```

各部分意义见下表:

- **driver**: 必须指定驱动名以便连接器选择何种方式创建连接,支持如下驱动名:
  - **taos**: 表名使用 TDengine 连接器驱动。
  - **tmq**: 使用 TMQ 订阅数据。
  - **http/ws**: 使用 Websocket 创建连接。
  - **https/wss**: 在 Websocket 连接方式下显示启用 SSL/TLS 连接。
- **protocol**: 显示指定以何种方式建立连接,例如:`taos+ws://localhost:6041` 指定以 Websocket 方式建立连接。
- **username/password**: 用于创建连接的用户名及密码。
- **host/port**: 指定创建连接的服务器及端口,当不指定服务器地址及端口时(`taos://`),原生连接默认为 `localhost:6030`,Websocket 连接默认为 `localhost:6041` 。
169
- **database**: 指定默认连接的数据库名,可选参数。
170 171 172 173 174 175 176 177 178 179 180
- **params**:其他可选参数。

一个完整的 DSN 描述字符串示例如下:

```text
taos+ws://localhost:6041/test
```

表示使用 Websocket(`ws`)方式通过 `6041` 端口连接服务器 `localhost`,并指定默认数据库为 `test`。

这使得用户可以通过 DSN 指定连接方式:
181 182

```rust
183 184 185 186 187
use taos::*;

// use native protocol.
let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
let conn1 = builder.build();
188

189
//  use websocket protocol.
A
Adam Ji 已提交
190 191
let builder2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
let conn2 = builder2.build();
192 193
```

194
建立连接后,您可以进行相关数据库操作:
195 196

```rust
197 198 199 200 201 202 203 204 205 206 207 208
async fn demo(taos: &Taos, db: &str) -> Result<(), Error> {
    // prepare database
    taos.exec_many([
        format!("DROP DATABASE IF EXISTS `{db}`"),
        format!("CREATE DATABASE `{db}`"),
        format!("USE `{db}`"),
    ])
    .await?;

    let inserted = taos.exec_many([
        // create super table
        "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
209
         TAGS (`groupid` INT, `location` BINARY(24))",
210
        // create child table
211
        "CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')",
212 213 214 215 216
        // insert into child table
        "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
        // insert with NULL values
        "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
        // insert and automatically create table with tags if not exists
217
        "INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)",
218 219 220 221 222 223 224 225 226
        // insert many records in a single sql
        "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
    ]).await?;

    assert_eq!(inserted, 6);
    let mut result = taos.query("select * from `meters`").await?;

    for field in result.fields() {
        println!("got field: {}", field.name());
227
    }
228 229

    let values = result.
230 231 232
}
```

233
查询数据可以通过两种方式:使用内建类型或 [serde](https://serde.rs) 序列化框架。
234

235 236 237 238 239 240 241 242
```rust
    // Query option 1, use rows stream.
    let mut rows = result.rows();
    while let Some(row) = rows.try_next().await? {
        for (name, value) in row {
            println!("got value of {}: {}", name, value);
        }
    }
243

244 245 246 247 248 249 250 251 252 253 254 255 256 257 258
    // Query options 2, use deserialization with serde.
    #[derive(Debug, serde::Deserialize)]
    #[allow(dead_code)]
    struct Record {
        // deserialize timestamp to chrono::DateTime<Local>
        ts: DateTime<Local>,
        // float to f32
        current: Option<f32>,
        // int to i32
        voltage: Option<i32>,
        phase: Option<f32>,
        groupid: i32,
        // binary/varchar to String
        location: String,
    }
259

260 261 262 263 264 265 266 267 268 269
    let records: Vec<Record> = taos
        .query("select * from `meters`")
        .await?
        .deserialize()
        .try_collect()
        .await?;

    dbg!(records);
    Ok(())
```
270

271
## 使用示例
272

A
Adam Ji 已提交
273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304
### 创建数据库和表

```rust
use taos::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let dsn = "taos://localhost:6030";
    let builder = TaosBuilder::from_dsn(dsn)?;

    let taos = builder.build()?;

    let db = "query";

    // create database
    taos.exec_many([
        format!("DROP DATABASE IF EXISTS `{db}`"),
        format!("CREATE DATABASE `{db}`"),
        format!("USE `{db}`"),
    ])
    .await?;

    // create table
    taos.exec_many([
        // create super table
        "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
         TAGS (`groupid` INT, `location` BINARY(16))",
        // create child table
        "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')",
    ]).await?;
}
```
305

A
Adam Ji 已提交
306
### 插入数据
307

308
<RustInsert />
309

A
Adam Ji 已提交
310 311 312 313 314 315 316 317 318 319 320 321 322
### 查询数据

<RustQuery />

### 执行带有 req_id 的 SQL

此 req_id 可用于请求链路追踪。

```rust
let rs = taos.query_with_req_id("select * from stable where tag1 is null", 1)?;
```

### 通过参数绑定写入数据
323

324
<RustBind />
325

A
Adam Ji 已提交
326
### 无模式写入
A
Adam Ji 已提交
327 328 329

<RustSml />

A
Adam Ji 已提交
330
### 执行带有 req_id 的无模式写入
331

A
Adam Ji 已提交
332
此 req_id 可用于请求链路追踪。
333

A
Adam Ji 已提交
334 335 336 337 338 339 340 341 342 343 344
```rust
let sml_data = SmlDataBuilder::default()
    .protocol(SchemalessProtocol::Line)
    .data(data)
    .req_id(100u64)
    .build()?;

client.put(&sml_data)?
```

### 数据订阅
345

A
Adam Ji 已提交
346
TDengine 通过消息队列 [TMQ](../../../taos-sql/tmq/) 启动一个订阅。
347

A
Adam Ji 已提交
348
#### 创建 Topic
349 350

```rust
A
Adam Ji 已提交
351 352 353 354 355
taos.exec_many([
    // create topic for subscription
    format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}")
])
.await?;
356 357
```

A
Adam Ji 已提交
358 359 360
#### 创建 Consumer

从 DSN 开始,构建一个 TMQ 连接器。
361 362

```rust
A
Adam Ji 已提交
363
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
364 365
```

A
Adam Ji 已提交
366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449
创建消费者:

```rust
let mut consumer = tmq.build()?;
```

#### 订阅消费数据

消费者可订阅一个或多个 `TOPIC`。

```rust
consumer.subscribe(["tmq_meters"]).await?;
```

TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) 类型,可以使用相应 API 对每个消息进行消费,并通过 `.commit` 进行已消费标记。

```rust
{
    let mut stream = consumer.stream();

    while let Some((offset, message)) = stream.try_next().await? {
        // get information from offset

        // the topic
        let topic = offset.topic();
        // the vgroup id, like partition id in kafka.
        let vgroup_id = offset.vgroup_id();
        println!("* in vgroup id {vgroup_id} of topic {topic}\n");

        if let Some(data) = message.into_data() {
            while let Some(block) = data.fetch_raw_block().await? {
                // one block for one table, get table name if needed
                let name = block.table_name();
                let records: Vec<Record> = block.deserialize().try_collect()?;
                println!(
                    "** table: {}, got {} records: {:#?}\n",
                    name.unwrap(),
                    records.len(),
                    records
                );
            }
        }
        consumer.commit(offset).await?;
    }
}
```

获取消费进度:

版本要求 connector-rust >= v0.8.8, TDengine >= 3.0.5.0

```rust
let assignments = consumer.assignments().await.unwrap();
```

#### 指定订阅 Offset

按照指定的进度消费:

版本要求 connector-rust >= v0.8.8, TDengine >= 3.0.5.0

```rust
consumer.offset_seek(topic, vgroup_id, offset).await;
```

#### 关闭订阅

```rust
consumer.unsubscribe().await;
```

对于 TMQ DSN, 有以下配置项可以进行设置,需要注意的是,`group.id` 是必须的。

- `group.id`: 同一个消费者组,将以至少消费一次的方式进行消息负载均衡。
- `client.id`: 可选的订阅客户端识别项。
- `auto.offset.reset`: 可选初始化订阅起点, *earliest* 为从头开始订阅, *latest* 为仅从最新数据开始订阅,默认为从头订阅。注意,此选项在同一个 `group.id` 中仅生效一次。
- `enable.auto.commit`: 当设置为 `true` 时,将启用自动标记模式,当对数据一致性不敏感时,可以启用此方式。
- `auto.commit.interval.ms`: 自动标记的时间间隔。

#### 完整示例

完整订阅示例参见 [GitHub 示例文件](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/rust/nativeexample/examples/subscribe_demo.rs).

### 与连接池使用
450

451
在复杂应用中,建议启用连接池。[taos] 的连接池默认(异步模式)使用 [deadpool] 实现。
452 453 454 455

如下,可以生成一个默认参数的连接池。

```rust
456 457 458 459
let pool: Pool<TaosBuilder> = TaosBuilder::from_dsn("taos:///")
    .unwrap()
    .pool()
    .unwrap();
460 461 462 463 464
```

同样可以使用连接池的构造器,对连接池参数进行设置:

```rust
465 466 467 468
let pool: Pool<TaosBuilder> = Pool::builder(Manager::from_dsn(self.dsn.clone()).unwrap().0)
    .max_size(88)  // 最大连接数
    .build()
    .unwrap();
469 470 471 472 473 474 475 476
```

在应用代码中,使用 `pool.get()?` 来获取一个连接对象 [Taos]。

```rust
let taos = pool.get()?;
```

A
Adam Ji 已提交
477 478 479 480 481 482 483 484 485 486 487
### 更多示例程序

示例程序源码位于 `TDengine/examples/rust` 下:

请参考:[rust example](https://github.com/taosdata/TDengine/tree/3.0/examples/rust)

## 常见问题

请参考 [FAQ](../../../train-faq/faq)

## API 参考
488

489
[Taos][struct.Taos] 对象提供了多个数据库操作的 API:
490 491 492 493

1. `exec`: 执行某个非查询类 SQL 语句,例如 `CREATE`,`ALTER`,`INSERT` 等。

    ```rust
494 495 496 497 498 499 500 501 502 503 504
    let affected_rows = taos.exec("INSERT INTO tb1 VALUES(now, NULL)").await?;
    ```

2. `exec_many`: 同时(顺序)执行多个 SQL 语句。

    ```rust
    taos.exec_many([
        "CREATE DATABASE test",
        "USE test",
        "CREATE TABLE `tb1` (`ts` TIMESTAMP, `val` INT)",
    ]).await?;
505 506
    ```

507
3. `query`:执行查询语句,返回 [ResultSet] 对象。
508 509

    ```rust
510
    let mut q = taos.query("select * from log.logs").await?;
511 512
    ```

513
    [ResultSet] 对象存储了查询结果数据和返回的列的基本信息(列名,类型,长度):
514

515
    列信息使用 [.fields()] 方法获取:
516 517

    ```rust
518
    let cols = q.fields();
519
    for col in cols {
520
        println!("name: {}, type: {:?} , bytes: {}", col.name(), col.ty(), col.bytes());
521 522 523 524 525 526
    }
    ```

    逐行获取数据:

    ```rust
527 528 529 530 531 532 533 534
    let mut rows = result.rows();
    let mut nrows = 0;
    while let Some(row) = rows.try_next().await? {
        for (col, (name, value)) in row.enumerate() {
            println!(
                "[{}] got value in col {} (named `{:>8}`): {}",
                nrows, col, name, value
            );
535
        }
536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554
        nrows += 1;
    }
    ```

    或使用 [serde](https://serde.rs) 序列化框架。

    ```rust
    #[derive(Debug, Deserialize)]
    struct Record {
        // deserialize timestamp to chrono::DateTime<Local>
        ts: DateTime<Local>,
        // float to f32
        current: Option<f32>,
        // int to i32
        voltage: Option<i32>,
        phase: Option<f32>,
        groupid: i32,
        // binary/varchar to String
        location: String,
555
    }
556 557 558 559 560 561 562

    let records: Vec<Record> = taos
        .query("select * from `meters`")
        .await?
        .deserialize()
        .try_collect()
        .await?;
563 564 565 566
    ```

需要注意的是,需要使用 Rust 异步函数和异步运行时。

567
[Taos][struct.Taos] 提供部分 SQL 的 Rust 方法化以减少 `format!` 代码块的频率:
568 569 570 571 572 573 574

- `.describe(table: &str)`: 执行 `DESCRIBE` 并返回一个 Rust 数据结构。
- `.create_database(database: &str)`: 执行 `CREATE DATABASE` 语句。
- `.use_database(database: &str)`: 执行 `USE` 语句。

除此之外,该结构也是 [参数绑定](#参数绑定接口) 和 [行协议接口](#行协议接口) 的入口,使用方法请参考具体的 API 说明。

A
Adam Ji 已提交
575
参数绑定接口
576

577
与 C 接口类似,Rust 提供参数绑定接口。首先,通过 [Taos][struct.Taos] 对象创建一个 SQL 语句的参数绑定对象 [Stmt]:
578 579

```rust
580 581
let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
582 583 584 585
```

参数绑定对象提供了一组接口用于实现参数绑定:

A
Adam Ji 已提交
586
`.set_tbname(name)`
587 588 589

用于绑定表名。

590 591 592 593 594
```rust
let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
stmt.set_tbname("d0")?;
```

A
Adam Ji 已提交
595
`.set_tags(&[tag])`
596 597 598 599

当 SQL 语句使用超级表时,用于绑定子表表名和标签值:

```rust
600 601 602
let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)")?;
stmt.set_tbname("d0")?;
stmt.set_tags(&[Value::VarChar("涛思".to_string())])?;
603 604
```

A
Adam Ji 已提交
605
`.bind(&[column])`
606

607
用于绑定值类型。使用 [ColumnView] 结构体构建需要的类型并绑定:
608 609

```rust
610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626
let params = vec![
    ColumnView::from_millis_timestamp(vec![164000000000]),
    ColumnView::from_bools(vec![true]),
    ColumnView::from_tiny_ints(vec![i8::MAX]),
    ColumnView::from_small_ints(vec![i16::MAX]),
    ColumnView::from_ints(vec![i32::MAX]),
    ColumnView::from_big_ints(vec![i64::MAX]),
    ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]),
    ColumnView::from_unsigned_small_ints(vec![u16::MAX]),
    ColumnView::from_unsigned_ints(vec![u32::MAX]),
    ColumnView::from_unsigned_big_ints(vec![u64::MAX]),
    ColumnView::from_floats(vec![f32::MAX]),
    ColumnView::from_doubles(vec![f64::MAX]),
    ColumnView::from_varchar(vec!["ABC"]),
    ColumnView::from_nchar(vec!["涛思数据"]),
];
let rows = stmt.bind(&params)?.add_batch()?.execute()?;
627 628
```

A
Adam Ji 已提交
629
`.execute()`
630

631
执行 SQL。[Stmt] 对象可以复用,在执行后可以重新绑定并执行。执行前请确保所有数据已通过 `.add_batch` 加入到执行队列中。
632 633 634 635 636 637 638 639 640 641

```rust
stmt.execute()?;

// next bind cycle.
//stmt.set_tbname()?;
//stmt.bind()?;
//stmt.execute()?;
```

642 643
一个可运行的示例请见 [GitHub 上的示例](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs)。

644

645
其他相关结构体 API 使用说明请移步 Rust 文档托管网页:<https://docs.rs/taos>。
646

647
[taos]: https://github.com/taosdata/rust-connector-taos
648
[deadpool]: https://crates.io/crates/deadpool
649
[r2d2]: https://crates.io/crates/r2d2
650 651 652 653
[TaosBuilder]: https://docs.rs/taos/latest/taos/struct.TaosBuilder.html
[TaosCfg]: https://docs.rs/taos/latest/taos/struct.TaosCfg.html
[struct.Taos]: https://docs.rs/taos/latest/taos/struct.Taos.html
[Stmt]: https://docs.rs/taos/latest/taos/struct.Stmt.html