06-rust.mdx 17.1 KB
Newer Older
1 2
---
title: TDengine Rust Connector
D
danielclow 已提交
3 4 5
sidebar_label: Rust
description: This document describes the TDengine Rust connector.
toc_max_heading_level: 4
6 7 8 9 10
---

import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

G
gccgdb1234 已提交
11
import Preparition from "./_preparation.mdx"
12
import RustInsert from "../../07-develop/03-insert-data/_rust_sql.mdx"
13
import RustBind from "../../07-develop/03-insert-data/_rust_stmt.mdx"
A
Adam Ji 已提交
14
import RustSml from "../../07-develop/03-insert-data/_rust_schemaless.mdx"
15 16
import RustQuery from "../../07-develop/04-query-data/_rust.mdx"

D
danielclow 已提交
17
[![Crates.io](https://img.shields.io/crates/v/taos)](https://crates.io/crates/taos) ![Crates.io](https://img.shields.io/crates/d/taos) [![docs.rs](https://img.shields.io/docsrs/taos)](https://docs.rs/taos)
18

D
danielclow 已提交
19
`taos` is the official Rust connector for TDengine. Rust developers can develop applications to access the TDengine instance data.
20

D
danielclow 已提交
21 22 23
`taos` provides two ways to establish connections. One is the **Native Connection**, which connects to TDengine instances via the TDengine client driver (taosc). The other is the **WebSocket connection**, which connects to TDengine instances via the WebSocket interface provided by taosAdapter. You can specify a connection type with Cargo features. By default, both types are supported. The Websocket connection can be used on any platform. The native connection can be used on any platform that the TDengine Client supports.

The source code for the Rust connectors is located on [GitHub](https://github.com/taosdata/taos-connector-rust).
24 25 26

## Supported platforms

D
danielclow 已提交
27 28
Native connections are supported on the same platforms as the TDengine client driver.
Websocket connections are supported on all platforms that can run Go.
29

A
Adam Ji 已提交
30 31 32 33 34 35 36 37
## Version history

| connector-rust version |  TDengine version |                   major features                    |
| :----------------: | :--------------: | :--------------------------------------------------: |
|       v0.8.8       |      3.0.5.0     | TMQ: get assignments and seek offset. |
|       v0.8.0       |      3.0.4.0     | Support schemaless insert. |
|       v0.7.6       |      3.0.3.0     | Support req_id in query. |
|       v0.6.0       |      3.0.0.0     | Base features. |
38

39
The Rust Connector is still under rapid development and is not guaranteed to be backward compatible before 1.0. We recommend using TDengine version 3.0 or higher to avoid known issues.
40 41 42

## Installation

D
danielclow 已提交
43
### Pre-installation preparation
44

45 46 47
* Install the Rust development toolchain
* If using the native connection, please install the TDengine client driver. Please refer to [install client driver](/reference/connector#install-client-driver)

48
### Add taos dependency
49

D
danielclow 已提交
50
Depending on the connection method, add the [taos][taos] dependency in your Rust project as follows:
51

52
<Tabs defaultValue="default">
D
danielclow 已提交
53
<TabItem value="default" label="Support Both">
54

D
danielclow 已提交
55
In `cargo.toml`, add [taos][taos]:
56

D
dingbo 已提交
57
```toml
58 59
[dependencies]
# use default feature
60
taos = "*"
61 62
```

D
dingbo 已提交
63
</TabItem>
64

65
<TabItem value="rest" label="Websocket only">
D
danielclow 已提交
66

67
In `cargo.toml`, add [taos][taos] and enable the ws feature:
68 69 70

```toml
[dependencies]
71
taos = { version = "*", default-features = false, features = ["ws"] }
72 73 74 75
```

</TabItem>

76 77 78
<TabItem value="native" label="native connection only">

In `cargo.toml`, add [taos][taos] and enable the native feature:
79 80 81

```toml
[dependencies]
82
taos = { version = "*", default-features = false, features = ["native"] }
83 84
```

85
</TabItem>
86

87 88
</Tabs>

D
danielclow 已提交
89
## Establishing a connection
90

D
danielclow 已提交
91
[TaosBuilder] creates a connection constructor through the DSN connection description string.
92 93

```rust
94
let builder = TaosBuilder::from_dsn("taos://")?;
95 96
```

D
danielclow 已提交
97
You can now use this object to create the connection.
98 99

```rust
100
let conn = builder.build()?;
101 102 103 104 105
```

The connection object can create more than one.

```rust
106 107 108 109
let conn1 = builder.build()?;
let conn2 = builder.build()?;
```

D
danielclow 已提交
110
The structure of the DSN description string is as follows:
111 112 113 114 115

```text
<driver>[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|------|------------|---|-----------|-----------|------|------|------------|-----------------------|
|driver|   protocol |   | username  | password  | host | port |  database  |  params               |
116 117
```

D
danielclow 已提交
118 119 120 121 122 123 124 125 126 127
The parameters are described as follows:

- **driver**: Specify a driver name so that the connector can choose which method to use to establish the connection. Supported driver names are as follows:
  - **taos**: Table names use the TDengine connector driver.
  - **tmq**: Use the TMQ to subscribe to data.
  - **http/ws**: Use Websocket to establish connections.
  - **https/wss**: Use Websocket to establish connections, and enable SSL/TLS.
- **protocol**: Specify which connection method to use. For example, `taos+ws://localhost:6041` uses Websocket to establish connections.
- **username/password**: Username and password used to create connections.
- **host/port**: Specifies the server and port to establish a connection. If you do not specify a hostname or port, native connections default to `localhost:6030` and Websocket connections default to `localhost:6041`.
128
- **database**: Specify the default database to connect to. It's optional.
129
- **params**: Optional parameters.
D
danielclow 已提交
130 131

A sample DSN description string is as follows:
132 133 134 135 136

```text
taos+ws://localhost:6041/test
```

D
danielclow 已提交
137
This indicates that the Websocket connection method is used on port 6041 to connect to the server localhost and use the database `test` by default.
138

D
danielclow 已提交
139
You can create DSNs to connect to servers in your environment.
140 141

```rust
D
danielclow 已提交
142
use taos::*;
143

144 145 146 147 148 149
// use native protocol.
let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
let conn1 = builder.build();

//  use websocket protocol.
let conn2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
150 151
```

D
danielclow 已提交
152
After the connection is established, you can perform operations on your database.
153 154

```rust
155 156 157 158 159 160 161 162 163 164 165 166
async fn demo(taos: &Taos, db: &str) -> Result<(), Error> {
    // prepare database
    taos.exec_many([
        format!("DROP DATABASE IF EXISTS `{db}`"),
        format!("CREATE DATABASE `{db}`"),
        format!("USE `{db}`"),
    ])
    .await?;

    let inserted = taos.exec_many([
        // create super table
        "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
167
         TAGS (`groupid` INT, `location` BINARY(24))",
168
        // create child table
169
        "CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')",
170 171 172 173 174
        // insert into child table
        "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
        // insert with NULL values
        "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
        // insert and automatically create table with tags if not exists
175
        "INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)",
176 177 178 179 180 181 182 183 184
        // insert many records in a single sql
        "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
    ]).await?;

    assert_eq!(inserted, 6);
    let mut result = taos.query("select * from `meters`").await?;

    for field in result.fields() {
        println!("got field: {}", field.name());
185
    }
186 187

    let values = result.
188 189 190
}
```

D
danielclow 已提交
191
There are two ways to query data: Using built-in types or the [serde](https://serde.rs) deserialization framework.
192

193 194 195 196 197 198 199 200
```rust
    // Query option 1, use rows stream.
    let mut rows = result.rows();
    while let Some(row) = rows.try_next().await? {
        for (name, value) in row {
            println!("got value of {}: {}", name, value);
        }
    }
201

202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
    // Query options 2, use deserialization with serde.
    #[derive(Debug, serde::Deserialize)]
    #[allow(dead_code)]
    struct Record {
        // deserialize timestamp to chrono::DateTime<Local>
        ts: DateTime<Local>,
        // float to f32
        current: Option<f32>,
        // int to i32
        voltage: Option<i32>,
        phase: Option<f32>,
        groupid: i32,
        // binary/varchar to String
        location: String,
    }
217

218 219 220 221 222 223
    let records: Vec<Record> = taos
        .query("select * from `meters`")
        .await?
        .deserialize()
        .try_collect()
        .await?;
224

225 226 227
    dbg!(records);
    Ok(())
```
228

229
## Usage examples
230

231
### Write data
232

233
#### SQL Write
234

235
<RustInsert />
236

D
danielclow 已提交
237
#### STMT Write
238

239
<RustBind />
240

A
Adam Ji 已提交
241 242 243 244
#### Schemaless Write

<RustSml />

245
### Query data
246

D
danielclow 已提交
247
<RustQuery />
248 249 250

## API Reference

D
danielclow 已提交
251
### Connector Constructor
252

D
danielclow 已提交
253
You create a connector constructor by using a DSN.
254 255

```rust
D
danielclow 已提交
256
let cfg = TaosBuilder::default().build()?;
257 258
```

D
danielclow 已提交
259
You use the builder object to create multiple connections.
260 261

```rust
262
let conn: Taos = cfg.build();
263 264
```

D
danielclow 已提交
265
### Connection pooling
266

D
danielclow 已提交
267
In complex applications, we recommend enabling connection pools. [taos] implements connection pools based on [r2d2].
268

D
danielclow 已提交
269
As follows, a connection pool with default parameters can be generated.
270 271

```rust
272
let pool = TaosBuilder::from_dsn(dsn)?.pool()?;
273 274
```

D
danielclow 已提交
275
You can set the same connection pool parameters using the connection pool's constructor.
276 277

```rust
278 279 280 281 282 283 284 285 286
let dsn = "taos://localhost:6030";

let opts = PoolBuilder::new()
    .max_size(5000) // max connections
    .max_lifetime(Some(Duration::from_secs(60 * 60))) // lifetime of each connection
    .min_idle(Some(1000)) // minimal idle connections
    .connection_timeout(Duration::from_secs(2));

let pool = TaosBuilder::from_dsn(dsn)?.with_pool_builder(opts)?;
287 288
```

D
danielclow 已提交
289
In the application code, use `pool.get()? ` to get a connection object [Taos].
290 291

```rust
D
danielclow 已提交
292
let taos = pool.get()?;
293 294
```

295
### Connectors
296

D
danielclow 已提交
297
The [Taos][struct.Taos] object provides an API to perform operations on multiple databases.
298

D
danielclow 已提交
299
1. `exec`: Execute some non-query SQL statements, such as `CREATE`, `ALTER`, `INSERT`, etc.
300 301 302 303 304

    ```rust
    let affected_rows = taos.exec("INSERT INTO tb1 VALUES(now, NULL)").await?;
    ```

D
danielclow 已提交
305
2. `exec_many`: Run multiple SQL statements simultaneously or in order.
306 307

    ```rust
308 309 310 311 312
    taos.exec_many([
        "CREATE DATABASE test",
        "USE test",
        "CREATE TABLE `tb1` (`ts` TIMESTAMP, `val` INT)",
    ]).await?;
313 314
    ```

D
danielclow 已提交
315
3. `query`: Run a query statement and return a [ResultSet] object.
316 317

    ```rust
D
danielclow 已提交
318
    let mut q = taos.query("select * from log.logs").await?;
319 320
    ```

D
danielclow 已提交
321
    The [ResultSet] object stores query result data and the names, types, and lengths of returned columns
322

D
danielclow 已提交
323
    You can obtain column information by using [.fields()].
324

D
dingbo 已提交
325
    ```rust
326
    let cols = q.fields();
327
    for col in cols {
328
        println!("name: {}, type: {:?} , bytes: {}", col.name(), col.ty(), col.bytes());
329 330 331
    }
    ```

D
danielclow 已提交
332
    It fetches data line by line.
333 334

    ```rust
335 336 337 338 339 340 341 342
    let mut rows = result.rows();
    let mut nrows = 0;
    while let Some(row) = rows.try_next().await? {
        for (col, (name, value)) in row.enumerate() {
            println!(
                "[{}] got value in col {} (named `{:>8}`): {}",
                nrows, col, name, value
            );
343
        }
344 345 346 347
        nrows += 1;
    }
    ```

D
danielclow 已提交
348
    Or use the [serde](https://serde.rs) deserialization framework.
349 350 351 352 353 354 355 356 357 358 359 360 361 362

    ```rust
    #[derive(Debug, Deserialize)]
    struct Record {
        // deserialize timestamp to chrono::DateTime<Local>
        ts: DateTime<Local>,
        // float to f32
        current: Option<f32>,
        // int to i32
        voltage: Option<i32>,
        phase: Option<f32>,
        groupid: i32,
        // binary/varchar to String
        location: String,
363
    }
364 365 366 367 368 369 370

    let records: Vec<Record> = taos
        .query("select * from `meters`")
        .await?
        .deserialize()
        .try_collect()
        .await?;
371 372 373 374
    ```

Note that Rust asynchronous functions and an asynchronous runtime are required.

D
danielclow 已提交
375
[Taos][struct.Taos] provides Rust methods for some SQL statements to reduce the number of `format!`s.
376 377 378 379 380

- `.describe(table: &str)`: Executes `DESCRIBE` and returns a Rust data structure.
- `.create_database(database: &str)`: Executes the `CREATE DATABASE` statement.
- `.use_database(database: &str)`: Executes the `USE` statement.

D
danielclow 已提交
381 382 383
In addition, this structure is also the entry point for [Parameter Binding](#Parameter Binding Interface) and [Line Protocol Interface](#Line Protocol Interface). Please refer to the specific API descriptions for usage.

### Bind Interface
384

D
danielclow 已提交
385
Similar to the C interface, Rust provides the bind interface's wrapping. First, the [Taos][struct.taos] object creates a parameter binding object [Stmt] for an SQL statement.
386 387

```rust
388 389
let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
390 391 392 393
```

The bind object provides a set of interfaces for implementing parameter binding.

394
#### `.set_tbname(name)`
395 396 397

To bind table names.

398 399 400 401 402 403
```rust
let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
stmt.set_tbname("d0")?;
```

#### `.set_tags(&[tag])`
404

D
danielclow 已提交
405
Bind sub-table table names and tag values when the SQL statement uses a super table.
406 407

```rust
408 409
let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)")?;
stmt.set_tbname("d0")?;
D
danielclow 已提交
410
stmt.set_tags(&[Value::VarChar("taos".to_string())])?;
411 412
```

413
#### `.bind(&[column])`
414

D
danielclow 已提交
415
Bind value types. Use the [ColumnView] structure to create and bind the required types.
416 417

```rust
418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434
let params = vec![
    ColumnView::from_millis_timestamp(vec![164000000000]),
    ColumnView::from_bools(vec![true]),
    ColumnView::from_tiny_ints(vec![i8::MAX]),
    ColumnView::from_small_ints(vec![i16::MAX]),
    ColumnView::from_ints(vec![i32::MAX]),
    ColumnView::from_big_ints(vec![i64::MAX]),
    ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]),
    ColumnView::from_unsigned_small_ints(vec![u16::MAX]),
    ColumnView::from_unsigned_ints(vec![u32::MAX]),
    ColumnView::from_unsigned_big_ints(vec![u64::MAX]),
    ColumnView::from_floats(vec![f32::MAX]),
    ColumnView::from_doubles(vec![f64::MAX]),
    ColumnView::from_varchar(vec!["ABC"]),
    ColumnView::from_nchar(vec!["涛思数据"]),
];
let rows = stmt.bind(&params)?.add_batch()?.execute()?;
435 436
```

437
#### `.execute()`
438

D
danielclow 已提交
439
Execute SQL. [Stmt] objects can be reused, re-binded, and executed after execution. Before execution, ensure that all data has been added to the queue with `.add_batch`.
440 441

```rust
D
danielclow 已提交
442
stmt.execute()?;
443 444

// next bind cycle.
D
danielclow 已提交
445 446 447
//stmt.set_tbname()?;
//stmt.bind()?;
//stmt.execute()?;
448 449
```

D
danielclow 已提交
450
For a working example, see [GitHub](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs).
451

D
danielclow 已提交
452
### Subscriptions
453

D
danielclow 已提交
454
TDengine starts subscriptions through [TMQ](../../../taos-sql/tmq/).
455

D
danielclow 已提交
456
You create a TMQ connector by using a DSN.
457 458

```rust
459
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
460 461
```

D
danielclow 已提交
462
Create a consumer:
463

464 465 466
```rust
let mut consumer = tmq.build()?;
```
467

D
danielclow 已提交
468
A single consumer can subscribe to one or more topics.
469

470 471 472
```rust
consumer.subscribe(["tmq_meters"]).await?;
```
473

D
danielclow 已提交
474
The TMQ is of [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) type. You can use the corresponding API to consume each message in the queue and then use `.commit` to mark them as consumed.
475

476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499
```rust
{
    let mut stream = consumer.stream();

    while let Some((offset, message)) = stream.try_next().await? {
        // get information from offset

        // the topic
        let topic = offset.topic();
        // the vgroup id, like partition id in kafka.
        let vgroup_id = offset.vgroup_id();
        println!("* in vgroup id {vgroup_id} of topic {topic}\n");

        if let Some(data) = message.into_data() {
            while let Some(block) = data.fetch_raw_block().await? {
                // one block for one table, get table name if needed
                let name = block.table_name();
                let records: Vec<Record> = block.deserialize().try_collect()?;
                println!(
                    "** table: {}, got {} records: {:#?}\n",
                    name.unwrap(),
                    records.len(),
                    records
                );
500
            }
501 502 503 504 505 506
        }
        consumer.commit(offset).await?;
    }
}
```

A
Adam Ji 已提交
507 508
Get assignments:

A
Adam Ji 已提交
509
Version requirements connector-rust >= v0.8.8, TDengine >= 3.0.5.0
A
Adam Ji 已提交
510

A
Adam Ji 已提交
511 512 513 514 515 516
```rust
let assignments = consumer.assignments().await.unwrap();
```

Seek offset:

A
Adam Ji 已提交
517
Version requirements connector-rust >= v0.8.8, TDengine >= 3.0.5.0
A
Adam Ji 已提交
518

A
Adam Ji 已提交
519 520 521 522
```rust
consumer.offset_seek(topic, vgroup_id, offset).await;
```

523 524 525 526 527 528
Unsubscribe:

```rust
consumer.unsubscribe().await;
```

D
danielclow 已提交
529
The following parameters can be configured for the TMQ DSN. Only `group.id` is mandatory.
530

D
danielclow 已提交
531 532 533 534 535 536
- `group.id`: Within a consumer group, load balancing is implemented by consuming messages on an at-least-once basis.
- `client.id`: Subscriber client ID.
- `auto.offset.reset`: Initial point of subscription. *earliest* subscribes from the beginning, and *latest* subscribes from the newest message. The default is earliest. Note: This parameter is set per consumer group.
- `enable.auto.commit`: Automatically commits. This can be enabled when data consistency is not essential.
- `auto.commit.interval.ms`: Interval for automatic commits.
  
A
Adam Ji 已提交
537
For more information, see [GitHub sample file](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/rust/nativeexample/examples/subscribe_demo.rs).
538

D
danielclow 已提交
539
For information about other structure APIs, see the [Rust documentation](https://docs.rs/taos).
540

D
danielclow 已提交
541
[taos]: https://github.com/taosdata/rust-connector-taos
542
[r2d2]: https://crates.io/crates/r2d2
D
danielclow 已提交
543 544 545 546
[TaosBuilder]: https://docs.rs/taos/latest/taos/struct.TaosBuilder.html
[TaosCfg]: https://docs.rs/taos/latest/taos/struct.TaosCfg.html
[struct.Taos]: https://docs.rs/taos/latest/taos/struct.Taos.html
[Stmt]: https://docs.rs/taos/latest/taos/struct.Stmt.html