---
toc_max_heading_level: 4
sidebar_position: 5
sidebar_label: Rust
title: TDengine Rust Connector
---

import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

import Preparation from "./_preparation.mdx"
import RustInsert from "../../07-develop/03-insert-data/_rust_sql.mdx"
import RustBind from "../../07-develop/03-insert-data/_rust_stmt.mdx"
import RustQuery from "../../07-develop/04-query-data/_rust.mdx"

[`taos`][taos] is the official Rust connector for TDengine. Rust developers can use it to build applications that access data in a TDengine instance.

The Rust connector provides two ways to establish a connection. One is the **native connection**, which connects to TDengine instances through the TDengine client driver (taosc). The other is the **WebSocket connection**, which connects to TDengine instances through the taosAdapter service.

The source code is hosted on [taosdata/taos-connector-rust](https://github.com/taosdata/taos-connector-rust).

## Supported platforms

The platforms supported by the native connection are the same as those supported by the TDengine client driver.
WebSocket connections are supported on all platforms that can run Rust.

## Version support

Please refer to [version support list](/reference/connector#version-support).

The Rust connector is still under rapid development and is not guaranteed to be backward compatible before version 1.0. We recommend using TDengine 3.0 or higher to avoid known issues.

## Installation

### Pre-installation

* Install the Rust development toolchain.
* If you use the native connection, install the TDengine client driver. See [install client driver](/reference/connector#install-client-driver).

### Add dependencies

Add the dependency to your [Rust](https://rust-lang.org) project as follows, depending on the connection method you choose.

<Tabs defaultValue="default">
<TabItem value="default" label="Both">

Add [taos] to the `Cargo.toml` file.

```toml
[dependencies]
# use the default features
taos = "*"
```

</TabItem>
<TabItem value="native" label="Native only">

Add [taos] to the `Cargo.toml` file and enable only the `native` feature.

```toml
[dependencies]
taos = { version = "*", default-features = false, features = ["native"] }
```

</TabItem>
<TabItem value="rest" label="Websocket only">

Add [taos] to the `Cargo.toml` file and enable only the `ws` feature.

```toml
[dependencies]
taos = { version = "*", default-features = false, features = ["ws"] }
```

</TabItem>
</Tabs>

## Create a connection

The Rust connector uses a DSN connection string to create a connection builder. For example:

```rust
let builder = TaosBuilder::from_dsn("taos://")?;
```

You can now use the builder to create a connection.

```rust
let conn = builder.build()?;
```

The same builder can create more than one connection.

```rust
let conn1 = builder.build()?;
let conn2 = builder.build()?;
```

DSN is short for **D**ata **S**ource **N**ame, [a data structure used to describe a connection to a data source](https://en.wikipedia.org/wiki/Data_source_name).

A DSN generally has the following structure:

```text
<driver>[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|------|------------|---|-----------|-----------|------|------|------------|-----------------------|
|driver|   protocol |   | username  | password  | host | port |  database  |  params               |
```

- **Driver**: the main entry point to a processor. **Required**. The Rust connector supports the following driver names:
  - **taos**: the legacy TDengine connection data source.
  - **tmq**: subscription data source from TDengine.
  - **http/ws**: use the WebSocket protocol via the `ws://` scheme.
  - **https/wss**: use the WebSocket protocol via the `wss://` scheme.
- **Protocol**: additional information appended to the driver, which can be used to support different kinds of data sources. Leave it empty for the native driver (available only with the `native` feature), or use `ws`/`wss` for the WebSocket driver (available only with the `ws` feature). **Optional**.
- **Username**: the username for the connection. **Optional**.
- **Password**: the password for the username. **Optional**.
- **Host**: the host address of the data source. **Optional**.
- **Port**: the port of the data source. **Optional**.
- **Database**: the database name or collection name in the data source. **Optional**.
- **Params**: a key-value map of any other information for the data source. **Optional**.
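
To make the layout of these parts concrete, here is a minimal sketch that splits a DSN string with the standard library only. `DsnParts` and `parse_dsn` are hypothetical names used for illustration; they are not part of the `taos` crate, and the connector's own parser may treat edge cases (IPv6 hosts, percent-encoding, multiple hosts) differently.

```rust
/// The parts of a DSN, borrowed from the input string.
#[derive(Debug, PartialEq)]
struct DsnParts<'a> {
    driver: &'a str,
    protocol: Option<&'a str>,
    username: Option<&'a str>,
    password: Option<&'a str>,
    host: Option<&'a str>,
    port: Option<u16>,
    database: Option<&'a str>,
    params: Vec<(&'a str, &'a str)>,
}

/// Hypothetical helper (not a `taos` API): split a DSN into its parts.
/// Returns `None` if the driver is missing or the port is not a number.
fn parse_dsn(dsn: &str) -> Option<DsnParts<'_>> {
    // "<driver>[+<protocol>]" comes before "://".
    let (scheme, rest) = dsn.split_once("://")?;
    let (driver, protocol) = match scheme.split_once('+') {
        Some((d, p)) => (d, Some(p)),
        None => (scheme, None),
    };
    if driver.is_empty() {
        return None; // the driver is the only required part
    }
    // Everything after '?' is the parameter list.
    let (rest, query) = match rest.split_once('?') {
        Some((r, q)) => (r, Some(q)),
        None => (rest, None),
    };
    // Everything after the first '/' is the database name.
    let (authority, database) = match rest.split_once('/') {
        Some((a, d)) => (a, Some(d).filter(|d| !d.is_empty())),
        None => (rest, None),
    };
    // "<username>:<password>@" is optional.
    let (userinfo, address) = match authority.rsplit_once('@') {
        Some((u, a)) => (Some(u), a),
        None => (None, authority),
    };
    let (username, password) = match userinfo {
        Some(u) => match u.split_once(':') {
            Some((name, pass)) => (Some(name), Some(pass)),
            None => (Some(u), None),
        },
        None => (None, None),
    };
    // "<host>:<port>" is optional as well.
    let (host, port) = match address.rsplit_once(':') {
        Some((h, p)) => (h, Some(p.parse::<u16>().ok()?)),
        None => (address, None),
    };
    let host = Some(host).filter(|h| !h.is_empty());
    // Split "k1=v1&k2=v2" into pairs; entries without '=' are skipped.
    let params: Vec<(&str, &str)> = query
        .map(|q| q.split('&').filter_map(|kv| kv.split_once('=')).collect())
        .unwrap_or_default();

    Some(DsnParts { driver, protocol, username, password, host, port, database, params })
}
```

For instance, `parse_dsn("taos://")` yields a value whose only populated field is `driver`, which matches the rule that every other part is optional.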

Here is a simple DSN connection string example:

```text
taos+ws://localhost:6041/test
```

This DSN connects to `localhost` on port `6041` over the `ws` protocol and uses `test` as the default database.

Because the protocol is part of the DSN, you can choose the connection protocol at runtime:

```rust
use taos::*; // used like a prelude module; some of its traits are needed below.

// Use the native protocol.
let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
let conn1 = builder.build()?;

// Use the WebSocket protocol.
let builder2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
let conn2 = builder2.build()?;
```
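
One way to take advantage of this is to read the DSN from configuration instead of hard-coding it. The sketch below uses only the standard library and is an illustration, not connector code: `choose_dsn`, `dsn_from_env`, and the `TDENGINE_DSN` environment variable are hypothetical names.

```rust
use std::env;

/// Fallback used when nothing is configured: native connection to a local server.
const DEFAULT_DSN: &str = "taos://localhost:6030";

/// Pick the DSN from an optional override, falling back to the default
/// when the override is missing or blank.
fn choose_dsn(override_dsn: Option<String>) -> String {
    override_dsn
        .filter(|s| !s.trim().is_empty())
        .unwrap_or_else(|| DEFAULT_DSN.to_string())
}

/// Read the override from the (hypothetical) `TDENGINE_DSN` environment variable.
fn dsn_from_env() -> String {
    choose_dsn(env::var("TDENGINE_DSN").ok())
}
```

The returned string can then be passed to `TaosBuilder::from_dsn`, so switching between `taos://` and `taos+ws://` requires no code change.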

After the connection is established, you can perform the following operations on the database.

```rust
async fn demo(taos: &Taos, db: &str) -> Result<(), Error> {
    // prepare database
    taos.exec_many([
        format!("DROP DATABASE IF EXISTS `{db}`"),
        format!("CREATE DATABASE `{db}`"),
        format!("USE `{db}`"),
    ])
    .await?;

    let inserted = taos.exec_many([
        // create super table
        "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
         TAGS (`groupid` INT, `location` BINARY(16))",
        // create child table
        "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angeles')",
        // insert into child table
        "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
        // insert with NULL values
        "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
        // insert and automatically create table with tags if not exists
        "INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)",
        // insert many records in a single sql
        "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
    ]).await?;

    // 1 + 1 + 1 + 3 rows were inserted by the four INSERT statements
    assert_eq!(inserted, 6);
    let result = taos.query("select * from `meters`").await?;

    for field in result.fields() {
        println!("got field: {}", field.name());
    }

    Ok(())
}
```

The Rust connector provides two ways to fetch data. Both snippets run inside an asynchronous function that has a `taos` connection, such as `demo` above.

```rust
// Query option 1: use the rows stream.
let mut result = taos.query("select * from `meters`").await?;
let mut rows = result.rows();
while let Some(row) = rows.try_next().await? {
    for (name, value) in row {
        println!("got value of {}: {}", name, value);
    }
}

// Query option 2: use deserialization with serde.
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
    // deserialize timestamp to chrono::DateTime<Local>
    ts: DateTime<Local>,
    // float to f32
    current: Option<f32>,
    // int to i32
    voltage: Option<i32>,
    phase: Option<f32>,
    groupid: i32,
    // binary/varchar to String
    location: String,
}

let records: Vec<Record> = taos
    .query("select * from `meters`")
    .await?
    .deserialize()
    .try_collect()
    .await?;

dbg!(records);
```

## Usage examples

### Write data

#### SQL Write

<RustInsert />

#### Stmt bind

<RustBind />

### Query data

<RustQuery />

## API Reference

### Connector builder

Use a DSN to construct a `TaosBuilder` object directly.

```rust
let builder = TaosBuilder::from_dsn("taos://")?;
```

Use `builder` to create as many connections as you need:

```rust
let conn: Taos = builder.build()?;
```

### Connection pool

In complex applications, we recommend enabling connection pools. The connection pool for [taos] is implemented with [r2d2] and is available when the `r2d2` feature is enabled.

A connection pool with default parameters can be created as follows:

```rust
let pool = TaosBuilder::from_dsn(dsn)?.pool()?;
```

You can set the connection pool parameters using the `PoolBuilder`.

```rust
let dsn = "taos://localhost:6030";

let opts = PoolBuilder::new()
    .max_size(5000) // max connections
    .max_lifetime(Some(Duration::from_secs(60 * 60))) // lifetime of each connection
    .min_idle(Some(1000)) // minimal idle connections
    .connection_timeout(Duration::from_secs(2));

let pool = TaosBuilder::from_dsn(dsn)?.with_pool_builder(opts)?;
```

In the application code, use `pool.get()?` to get a [Taos] connection object.

```rust
let taos = pool.get()?;
```

### Connection methods

The [Taos] connection struct provides several APIs for convenient use.

1. `exec`: Execute a non-query SQL statement, such as `CREATE`, `ALTER`, or `INSERT`, and return the number of affected rows (only meaningful for `INSERT`).

    ```rust
    let affected_rows = taos.exec("INSERT INTO tb1 VALUES(now, NULL)").await?;
    ```

2. `exec_many`: Execute multiple SQL statements in order.

    ```rust
    taos.exec_many([
        "CREATE DATABASE test",
        "USE test",
        "CREATE TABLE `tb1` (`ts` TIMESTAMP, `val` INT)",
    ]).await?;
    ```

3. `query`: Execute a query statement and return a [ResultSet] object.

    ```rust
    let mut q = taos.query("select * from log.logs").await?;
    ```

    The [ResultSet] object stores the query result data and basic information about the returned columns (column name, type, and length).

    Get field information with the `fields` method.

    ```rust
    let cols = q.fields();
    for col in cols {
        println!("name: {}, type: {:?}, bytes: {}", col.name(), col.ty(), col.bytes());
    }
    ```

    Fetch data row by row.

    ```rust
    let mut rows = q.rows();
    let mut nrows = 0;
    while let Some(row) = rows.try_next().await? {
        for (col, (name, value)) in row.enumerate() {
            println!(
                "[{}] got value in col {} (named `{:>8}`): {}",
                nrows, col, name, value
            );
        }
        nrows += 1;
    }
    ```

    Or use it with [serde](https://serde.rs) deserialization.

    ```rust
    #[derive(Debug, serde::Deserialize)]
    struct Record {
        // deserialize timestamp to chrono::DateTime<Local>
        ts: DateTime<Local>,
        // float to f32
        current: Option<f32>,
        // int to i32
        voltage: Option<i32>,
        phase: Option<f32>,
        groupid: i32,
        // binary/varchar to String
        location: String,
    }

    let records: Vec<Record> = taos
        .query("select * from `meters`")
        .await?
        .deserialize()
        .try_collect()
        .await?;
    ```

Note that these methods are asynchronous, so they must be called from `async` functions running on an asynchronous runtime.

[Taos] provides a few Rust methods that wrap common SQL statements, which reduces the need for `format!` calls.

- `.describe(table: &str)`: Executes `DESCRIBE` and returns a Rust data structure.
- `.create_database(database: &str)`: Executes the `CREATE DATABASE` statement.
- `.use_database(database: &str)`: Executes the `USE` statement.
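
To illustrate the kind of `format!` boilerplate such wrappers save, here is a minimal standard-library sketch of the SQL text they might build. All function names below are hypothetical and the exact SQL the connector sends is an assumption; the real methods also execute the statement, which this sketch does not.

```rust
/// Wrap an identifier in backticks, as the SQL examples in this document do.
/// Names that are empty or already contain a backtick are rejected rather
/// than escaped, because escaping rules are outside the scope of this sketch.
fn quote_ident(name: &str) -> Option<String> {
    if name.is_empty() || name.contains('`') {
        None
    } else {
        Some(format!("`{name}`"))
    }
}

/// SQL text that a `create_database`-style wrapper could send (assumed shape).
fn create_database_sql(database: &str) -> Option<String> {
    Some(format!("CREATE DATABASE {}", quote_ident(database)?))
}

/// SQL text that a `use_database`-style wrapper could send (assumed shape).
fn use_database_sql(database: &str) -> Option<String> {
    Some(format!("USE {}", quote_ident(database)?))
}

/// SQL text that a `describe`-style wrapper could send (assumed shape).
fn describe_sql(table: &str) -> Option<String> {
    Some(format!("DESCRIBE {}", quote_ident(table)?))
}
```

Keeping the quoting in one place means callers pass plain names and never assemble SQL strings by hand.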

### Bind API

Similar to the C interface, the Rust connector provides a wrapper for the bind interface. First, use the [Taos] object to create a bind object [Stmt] for an SQL statement.

```rust
let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
```

The bind object provides a set of interfaces for implementing parameter binding.

#### `.set_tbname(name)`

Bind the table name.

```rust
let mut stmt = taos.stmt("insert into ? values(?, ?)")?;
stmt.set_tbname("d0")?;
```

#### `.set_tags(&[tag])`

Bind tag values when the SQL statement uses a super table.

```rust
let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(?, ?)")?;
stmt.set_tbname("d0")?;
stmt.set_tags(&[Value::VarChar("涛思".to_string())])?;
```

#### `.bind(&[column])`

Bind column values. Use the [ColumnView] structure to construct a column of the desired type and bind it.

```rust
let params = vec![
    ColumnView::from_millis_timestamp(vec![164000000000]),
    ColumnView::from_bools(vec![true]),
    ColumnView::from_tiny_ints(vec![i8::MAX]),
    ColumnView::from_small_ints(vec![i16::MAX]),
    ColumnView::from_ints(vec![i32::MAX]),
    ColumnView::from_big_ints(vec![i64::MAX]),
    ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]),
    ColumnView::from_unsigned_small_ints(vec![u16::MAX]),
    ColumnView::from_unsigned_ints(vec![u32::MAX]),
    ColumnView::from_unsigned_big_ints(vec![u64::MAX]),
    ColumnView::from_floats(vec![f32::MAX]),
    ColumnView::from_doubles(vec![f64::MAX]),
    ColumnView::from_varchar(vec!["ABC"]),
    ColumnView::from_nchar(vec!["涛思数据"]),
];
let rows = stmt.bind(&params)?.add_batch()?.execute()?;
```
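
Each `ColumnView` constructor above takes a `Vec`, which suggests that binding is column-oriented: one vector per column, with index `i` in every vector belonging to row `i`. Applications usually hold data row by row, so a transposition step is needed first. The sketch below shows that step with the standard library only; `Reading`, `Columns`, and `to_columns` are hypothetical names, and the assumption that each vector maps to one `ColumnView` should be checked against the crate documentation.

```rust
/// One reading, laid out row by row as an application would usually hold it.
struct Reading {
    ts_ms: i64,
    current: f32,
    voltage: i32,
}

/// Column-wise layout: one vector per column, where index `i` in every
/// vector belongs to row `i`.
#[derive(Debug, Default, PartialEq)]
struct Columns {
    ts_ms: Vec<i64>,
    current: Vec<f32>,
    voltage: Vec<i32>,
}

/// Transpose a slice of rows into per-column vectors, preserving row order.
fn to_columns(rows: &[Reading]) -> Columns {
    let mut cols = Columns::default();
    for row in rows {
        cols.ts_ms.push(row.ts_ms);
        cols.current.push(row.current);
        cols.voltage.push(row.voltage);
    }
    cols
}
```

Under that assumption, the resulting vectors would be handed to `ColumnView::from_millis_timestamp`, `ColumnView::from_floats`, and `ColumnView::from_ints` respectively before calling `bind`.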

#### `.execute()`

Execute the statement to insert all bound records. A [Stmt] object can be reused: after execution, you can bind new values and execute again. Remember to call `add_batch` before `execute`.

```rust
stmt.add_batch()?.execute()?;

// next bind cycle.
// stmt.set_tbname()?;
// stmt.bind()?;
// stmt.add_batch()?.execute()?;
```

A runnable example for bind can be found [here](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs).

### Subscription API

Users can subscribe to a [TOPIC](../../../taos-sql/tmq/) with the TMQ (TDengine Message Queue) API.

Start from a TMQ builder:

```rust
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
```

Build a consumer:

```rust
let mut consumer = tmq.build()?;
```

Subscribe to a topic:

```rust
consumer.subscribe(["tmq_meters"]).await?;
```

Consume messages, and commit the offset after each message is processed.

```rust
{
    let mut stream = consumer.stream();

    while let Some((offset, message)) = stream.try_next().await? {
        // get information from offset

        // the topic
        let topic = offset.topic();
        // the vgroup id, like the partition id in Kafka.
        let vgroup_id = offset.vgroup_id();
        println!("* in vgroup id {vgroup_id} of topic {topic}\n");

        if let Some(data) = message.into_data() {
            while let Some(block) = data.fetch_raw_block().await? {
                // one block for one table, get table name if needed
                let name = block.table_name();
                // `Record` is a serde-deserializable struct like the one in the query examples.
                let records: Vec<Record> = block.deserialize().try_collect()?;
                println!(
                    "** table: {}, got {} records: {:#?}\n",
                    name.unwrap(),
                    records.len(),
                    records
                );
            }
        }
        // commit only after the message has been processed
        consumer.commit(offset).await?;
    }
}
```

Unsubscribe:

```rust
consumer.unsubscribe().await;
```

In a TMQ DSN, you must specify a group ID to subscribe with. Several other options can also be set:

- `group.id`: **Required**. A group ID is any visible string you set.
- `client.id`: an optional client description string.
- `auto.offset.reset`: choose to subscribe from *earliest* or *latest*. The default is *none*, which means *earliest*.
- `enable.auto.commit`: commit automatically at the specified time interval. By default, which is also the recommended way, you must call `commit` to confirm that the messages have been consumed correctly; otherwise, consumers will receive repeated messages when they re-subscribe.
- `auto.commit.interval.ms`: the auto commit interval in milliseconds.
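
Since these options travel in the params part of the DSN, they can be appended as `key=value` pairs. The following is a minimal standard-library sketch; `tmq_dsn` is a hypothetical helper, not a connector API, and it does not percent-encode values, so it is only suitable for values without reserved characters such as `&` or `=`.

```rust
/// Hypothetical helper (not a `taos` API): append TMQ options to a base DSN
/// as `?group.id=<id>&key=value&...`.
fn tmq_dsn(base: &str, group_id: &str, extra: &[(&str, &str)]) -> String {
    // `group.id` is the only required option, so it always comes first.
    let mut dsn = format!("{base}?group.id={group_id}");
    for (key, value) in extra {
        dsn.push('&');
        dsn.push_str(key);
        dsn.push('=');
        dsn.push_str(value);
    }
    dsn
}
```

With no extra options, `tmq_dsn("taos://localhost:6030/", "test", &[])` produces the same DSN as the builder example in this section.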

See the complete subscription example on [GitHub](https://github.com/taosdata/taos-connector-rust/blob/main/examples/subscribe.rs).

For the API usage of other related structures, see the Rust documentation hosting page: <https://docs.rs/taos>.

[TDengine]: https://github.com/taosdata/TDengine
[r2d2]: https://crates.io/crates/r2d2
[Taos]: https://docs.rs/taos/latest/taos/struct.Taos.html
[ResultSet]: https://docs.rs/taos/latest/taos/struct.ResultSet.html
[Value]: https://docs.rs/taos/latest/taos/enum.Value.html
[Stmt]: https://docs.rs/taos/latest/taos/stmt/struct.Stmt.html
[taos]: https://crates.io/crates/taos