06-rust.mdx 16.3 KB
Newer Older
1 2
---
title: TDengine Rust Connector
D
danielclow 已提交
3 4 5
sidebar_label: Rust
description: This document describes the TDengine Rust connector.
toc_max_heading_level: 4
6 7 8 9 10
---

import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

G
gccgdb1234 已提交
11
import Preparition from "./_preparation.mdx"
12
import RustInsert from "../../07-develop/03-insert-data/_rust_sql.mdx"
13
import RustBind from "../../07-develop/03-insert-data/_rust_stmt.mdx"
A
Adam Ji 已提交
14
import RustSml from "../../07-develop/03-insert-data/_rust_schemaless.mdx"
15 16
import RustQuery from "../../07-develop/04-query-data/_rust.mdx"

D
danielclow 已提交
17
[![Crates.io](https://img.shields.io/crates/v/taos)](https://crates.io/crates/taos) ![Crates.io](https://img.shields.io/crates/d/taos) [![docs.rs](https://img.shields.io/docsrs/taos)](https://docs.rs/taos)
18

D
danielclow 已提交
19
`taos` is the official Rust connector for TDengine. Rust developers can develop applications to access the TDengine instance data.
20

D
danielclow 已提交
21 22 23
`taos` provides two ways to establish connections. One is the **Native Connection**, which connects to TDengine instances via the TDengine client driver (taosc). The other is the **WebSocket connection**, which connects to TDengine instances via the WebSocket interface provided by taosAdapter. You can specify a connection type with Cargo features. By default, both types are supported. The Websocket connection can be used on any platform. The native connection can be used on any platform that the TDengine Client supports.

The source code for the Rust connectors is located on [GitHub](https://github.com/taosdata/taos-connector-rust).
24 25 26

## Supported platforms

D
danielclow 已提交
27 28
Native connections are supported on the same platforms as the TDengine client driver.
Websocket connections are supported on all platforms that can run Go.
29 30 31

## Version support

D
danielclow 已提交
32
Please refer to [version support list](/reference/connector#version-support)
33

34
The Rust Connector is still under rapid development and is not guaranteed to be backward compatible before 1.0. We recommend using TDengine version 3.0 or higher to avoid known issues.
35 36 37

## Installation

D
danielclow 已提交
38
### Pre-installation preparation
39

40 41 42
* Install the Rust development toolchain
* If using the native connection, please install the TDengine client driver. Please refer to [install client driver](/reference/connector#install-client-driver)

43
### Add taos dependency
44

D
danielclow 已提交
45
Depending on the connection method, add the [taos][taos] dependency in your Rust project as follows:
46

47
<Tabs defaultValue="default">
D
danielclow 已提交
48
<TabItem value="default" label="Support Both">
49

D
danielclow 已提交
50
In `cargo.toml`, add [taos][taos]:
51

D
dingbo 已提交
52
```toml
53 54
[dependencies]
# use default feature
55
taos = "*"
56 57
```

D
dingbo 已提交
58
</TabItem>
59

60
<TabItem value="rest" label="Websocket only">
D
danielclow 已提交
61

62
In `cargo.toml`, add [taos][taos] and enable the ws feature:
63 64 65

```toml
[dependencies]
66
taos = { version = "*", default-features = false, features = ["ws"] }
67 68 69 70
```

</TabItem>

71 72 73
<TabItem value="native" label="native connection only">

In `cargo.toml`, add [taos][taos] and enable the native feature:
74 75 76

```toml
[dependencies]
77
taos = { version = "*", default-features = false, features = ["native"] }
78 79
```

80
</TabItem>
81

82 83
</Tabs>

D
danielclow 已提交
84
## Establishing a connection
85

D
danielclow 已提交
86
[TaosBuilder] creates a connection constructor through the DSN connection description string.
87 88

```rust
89
let builder = TaosBuilder::from_dsn("taos://")?;
90 91
```

D
danielclow 已提交
92
You can now use this object to create the connection.
93 94

```rust
95
let conn = builder.build()?;
96 97 98 99 100
```

The connection object can create more than one.

```rust
101 102 103 104
let conn1 = builder.build()?;
let conn2 = builder.build()?;
```

D
danielclow 已提交
105
The structure of the DSN description string is as follows:
106 107 108 109 110

```text
<driver>[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|------|------------|---|-----------|-----------|------|------|------------|-----------------------|
|driver|   protocol |   | username  | password  | host | port |  database  |  params               |
111 112
```

D
danielclow 已提交
113 114 115 116 117 118 119 120 121 122
The parameters are described as follows:

- **driver**: Specify a driver name so that the connector can choose which method to use to establish the connection. Supported driver names are as follows:
  - **taos**: Table names use the TDengine connector driver.
  - **tmq**: Use the TMQ to subscribe to data.
  - **http/ws**: Use Websocket to establish connections.
  - **https/wss**: Use Websocket to establish connections, and enable SSL/TLS.
- **protocol**: Specify which connection method to use. For example, `taos+ws://localhost:6041` uses Websocket to establish connections.
- **username/password**: Username and password used to create connections.
- **host/port**: Specifies the server and port to establish a connection. If you do not specify a hostname or port, native connections default to `localhost:6030` and Websocket connections default to `localhost:6041`.
123
- **database**: Specify the default database to connect to. It's optional.
124
- **params**: Optional parameters.
D
danielclow 已提交
125 126

A sample DSN description string is as follows:
127 128 129 130 131

```text
taos+ws://localhost:6041/test
```

D
danielclow 已提交
132
This indicates that the Websocket connection method is used on port 6041 to connect to the server localhost and use the database `test` by default.
133

D
danielclow 已提交
134
You can create DSNs to connect to servers in your environment.
135 136

```rust
D
danielclow 已提交
137
use taos::*;
138

139 140 141 142 143 144
// use native protocol.
let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
let conn1 = builder.build();

//  use websocket protocol.
let conn2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
145 146
```

D
danielclow 已提交
147
After the connection is established, you can perform operations on your database.
148 149

```rust
150 151 152 153 154 155 156 157 158 159 160 161
async fn demo(taos: &Taos, db: &str) -> Result<(), Error> {
    // prepare database
    taos.exec_many([
        format!("DROP DATABASE IF EXISTS `{db}`"),
        format!("CREATE DATABASE `{db}`"),
        format!("USE `{db}`"),
    ])
    .await?;

    let inserted = taos.exec_many([
        // create super table
        "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
162
         TAGS (`groupid` INT, `location` BINARY(24))",
163
        // create child table
164
        "CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')",
165 166 167 168 169
        // insert into child table
        "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
        // insert with NULL values
        "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
        // insert and automatically create table with tags if not exists
170
        "INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)",
171 172 173 174 175 176 177 178 179
        // insert many records in a single sql
        "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
    ]).await?;

    assert_eq!(inserted, 6);
    let mut result = taos.query("select * from `meters`").await?;

    for field in result.fields() {
        println!("got field: {}", field.name());
180
    }
181 182

    let values = result.
183 184 185
}
```

D
danielclow 已提交
186
There are two ways to query data: Using built-in types or the [serde](https://serde.rs) deserialization framework.
187

188 189 190 191 192 193 194 195
```rust
    // Query option 1, use rows stream.
    let mut rows = result.rows();
    while let Some(row) = rows.try_next().await? {
        for (name, value) in row {
            println!("got value of {}: {}", name, value);
        }
    }
196

197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
    // Query options 2, use deserialization with serde.
    #[derive(Debug, serde::Deserialize)]
    #[allow(dead_code)]
    struct Record {
        // deserialize timestamp to chrono::DateTime<Local>
        ts: DateTime<Local>,
        // float to f32
        current: Option<f32>,
        // int to i32
        voltage: Option<i32>,
        phase: Option<f32>,
        groupid: i32,
        // binary/varchar to String
        location: String,
    }
212

213 214 215 216 217 218
    let records: Vec<Record> = taos
        .query("select * from `meters`")
        .await?
        .deserialize()
        .try_collect()
        .await?;
219

220 221 222
    dbg!(records);
    Ok(())
```
223

224
## Usage examples
225

226
### Write data
227

228
#### SQL Write
229

230
<RustInsert />
231

D
danielclow 已提交
232
#### STMT Write
233

234
<RustBind />
235

A
Adam Ji 已提交
236 237 238 239
#### Schemaless Write

<RustSml />

240
### Query data
241

D
danielclow 已提交
242
<RustQuery />
243 244 245

## API Reference

D
danielclow 已提交
246
### Connector Constructor
247

D
danielclow 已提交
248
You create a connector constructor by using a DSN.
249 250

```rust
D
danielclow 已提交
251
let cfg = TaosBuilder::default().build()?;
252 253
```

D
danielclow 已提交
254
You use the builder object to create multiple connections.
255 256

```rust
257
let conn: Taos = cfg.build();
258 259
```

D
danielclow 已提交
260
### Connection pooling
261

D
danielclow 已提交
262
In complex applications, we recommend enabling connection pools. [taos] implements connection pools based on [r2d2].
263

D
danielclow 已提交
264
As follows, a connection pool with default parameters can be generated.
265 266

```rust
267
let pool = TaosBuilder::from_dsn(dsn)?.pool()?;
268 269
```

D
danielclow 已提交
270
You can set the same connection pool parameters using the connection pool's constructor.
271 272

```rust
273 274 275 276 277 278 279 280 281
let dsn = "taos://localhost:6030";

let opts = PoolBuilder::new()
    .max_size(5000) // max connections
    .max_lifetime(Some(Duration::from_secs(60 * 60))) // lifetime of each connection
    .min_idle(Some(1000)) // minimal idle connections
    .connection_timeout(Duration::from_secs(2));

let pool = TaosBuilder::from_dsn(dsn)?.with_pool_builder(opts)?;
282 283
```

D
danielclow 已提交
284
In the application code, use `pool.get()? ` to get a connection object [Taos].
285 286

```rust
D
danielclow 已提交
287
let taos = pool.get()?;
288 289
```

290
### Connectors
291

D
danielclow 已提交
292
The [Taos][struct.Taos] object provides an API to perform operations on multiple databases.
293

D
danielclow 已提交
294
1. `exec`: Execute some non-query SQL statements, such as `CREATE`, `ALTER`, `INSERT`, etc.
295 296 297 298 299

    ```rust
    let affected_rows = taos.exec("INSERT INTO tb1 VALUES(now, NULL)").await?;
    ```

D
danielclow 已提交
300
2. `exec_many`: Run multiple SQL statements simultaneously or in order.
301 302

    ```rust
303 304 305 306 307
    taos.exec_many([
        "CREATE DATABASE test",
        "USE test",
        "CREATE TABLE `tb1` (`ts` TIMESTAMP, `val` INT)",
    ]).await?;
308 309
    ```

D
danielclow 已提交
310
3. `query`: Run a query statement and return a [ResultSet] object.
311 312

    ```rust
D
danielclow 已提交
313
    let mut q = taos.query("select * from log.logs").await?;
314 315
    ```

D
danielclow 已提交
316
    The [ResultSet] object stores query result data and the names, types, and lengths of returned columns
317

D
danielclow 已提交
318
    You can obtain column information by using [.fields()].
319

D
dingbo 已提交
320
    ```rust
321
    let cols = q.fields();
322
    for col in cols {
323
        println!("name: {}, type: {:?} , bytes: {}", col.name(), col.ty(), col.bytes());
324 325 326
    }
    ```

D
danielclow 已提交
327
    It fetches data line by line.
328 329

    ```rust
330 331 332 333 334 335 336 337
    let mut rows = result.rows();
    let mut nrows = 0;
    while let Some(row) = rows.try_next().await? {
        for (col, (name, value)) in row.enumerate() {
            println!(
                "[{}] got value in col {} (named `{:>8}`): {}",
                nrows, col, name, value
            );
338
        }
339 340 341 342
        nrows += 1;
    }
    ```

D
danielclow 已提交
343
    Or use the [serde](https://serde.rs) deserialization framework.
344 345 346 347 348 349 350 351 352 353 354 355 356 357

    ```rust
    #[derive(Debug, Deserialize)]
    struct Record {
        // deserialize timestamp to chrono::DateTime<Local>
        ts: DateTime<Local>,
        // float to f32
        current: Option<f32>,
        // int to i32
        voltage: Option<i32>,
        phase: Option<f32>,
        groupid: i32,
        // binary/varchar to String
        location: String,
358
    }
359 360 361 362 363 364 365

    let records: Vec<Record> = taos
        .query("select * from `meters`")
        .await?
        .deserialize()
        .try_collect()
        .await?;
366 367 368 369
    ```

Note that Rust asynchronous functions and an asynchronous runtime are required.

D
danielclow 已提交
370
[Taos][struct.Taos] provides Rust methods for some SQL statements to reduce the number of `format!`s.
371 372 373 374 375

- `.describe(table: &str)`: Executes `DESCRIBE` and returns a Rust data structure.
- `.create_database(database: &str)`: Executes the `CREATE DATABASE` statement.
- `.use_database(database: &str)`: Executes the `USE` statement.

D
danielclow 已提交
376 377 378
In addition, this structure is also the entry point for [Parameter Binding](#Parameter Binding Interface) and [Line Protocol Interface](#Line Protocol Interface). Please refer to the specific API descriptions for usage.

### Bind Interface
379

D
danielclow 已提交
380
Similar to the C interface, Rust provides the bind interface's wrapping. First, the [Taos][struct.taos] object creates a parameter binding object [Stmt] for an SQL statement.
381 382

```rust
383 384
let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
385 386 387 388
```

The bind object provides a set of interfaces for implementing parameter binding.

389
#### `.set_tbname(name)`
390 391 392

To bind table names.

393 394 395 396 397 398
```rust
let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
stmt.set_tbname("d0")?;
```

#### `.set_tags(&[tag])`
399

D
danielclow 已提交
400
Bind sub-table table names and tag values when the SQL statement uses a super table.
401 402

```rust
403 404
let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)")?;
stmt.set_tbname("d0")?;
D
danielclow 已提交
405
stmt.set_tags(&[Value::VarChar("taos".to_string())])?;
406 407
```

408
#### `.bind(&[column])`
409

D
danielclow 已提交
410
Bind value types. Use the [ColumnView] structure to create and bind the required types.
411 412

```rust
413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429
let params = vec![
    ColumnView::from_millis_timestamp(vec![164000000000]),
    ColumnView::from_bools(vec![true]),
    ColumnView::from_tiny_ints(vec![i8::MAX]),
    ColumnView::from_small_ints(vec![i16::MAX]),
    ColumnView::from_ints(vec![i32::MAX]),
    ColumnView::from_big_ints(vec![i64::MAX]),
    ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]),
    ColumnView::from_unsigned_small_ints(vec![u16::MAX]),
    ColumnView::from_unsigned_ints(vec![u32::MAX]),
    ColumnView::from_unsigned_big_ints(vec![u64::MAX]),
    ColumnView::from_floats(vec![f32::MAX]),
    ColumnView::from_doubles(vec![f64::MAX]),
    ColumnView::from_varchar(vec!["ABC"]),
    ColumnView::from_nchar(vec!["涛思数据"]),
];
let rows = stmt.bind(&params)?.add_batch()?.execute()?;
430 431
```

432
#### `.execute()`
433

D
danielclow 已提交
434
Execute SQL. [Stmt] objects can be reused, re-binded, and executed after execution. Before execution, ensure that all data has been added to the queue with `.add_batch`.
435 436

```rust
D
danielclow 已提交
437
stmt.execute()?;
438 439

// next bind cycle.
D
danielclow 已提交
440 441 442
//stmt.set_tbname()?;
//stmt.bind()?;
//stmt.execute()?;
443 444
```

D
danielclow 已提交
445
For a working example, see [GitHub](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs).
446

D
danielclow 已提交
447
### Subscriptions
448

D
danielclow 已提交
449
TDengine starts subscriptions through [TMQ](../../../taos-sql/tmq/).
450

D
danielclow 已提交
451
You create a TMQ connector by using a DSN.
452 453

```rust
454
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
455 456
```

D
danielclow 已提交
457
Create a consumer:
458

459 460 461
```rust
let mut consumer = tmq.build()?;
```
462

D
danielclow 已提交
463
A single consumer can subscribe to one or more topics.
464

465 466 467
```rust
consumer.subscribe(["tmq_meters"]).await?;
```
468

D
danielclow 已提交
469
The TMQ is of [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) type. You can use the corresponding API to consume each message in the queue and then use `.commit` to mark them as consumed.
470

471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494
```rust
{
    let mut stream = consumer.stream();

    while let Some((offset, message)) = stream.try_next().await? {
        // get information from offset

        // the topic
        let topic = offset.topic();
        // the vgroup id, like partition id in kafka.
        let vgroup_id = offset.vgroup_id();
        println!("* in vgroup id {vgroup_id} of topic {topic}\n");

        if let Some(data) = message.into_data() {
            while let Some(block) = data.fetch_raw_block().await? {
                // one block for one table, get table name if needed
                let name = block.table_name();
                let records: Vec<Record> = block.deserialize().try_collect()?;
                println!(
                    "** table: {}, got {} records: {:#?}\n",
                    name.unwrap(),
                    records.len(),
                    records
                );
495
            }
496 497 498 499 500 501 502 503 504 505 506 507
        }
        consumer.commit(offset).await?;
    }
}
```

Unsubscribe:

```rust
consumer.unsubscribe().await;
```

D
danielclow 已提交
508
The following parameters can be configured for the TMQ DSN. Only `group.id` is mandatory.
509

D
danielclow 已提交
510 511 512 513 514 515 516
- `group.id`: Within a consumer group, load balancing is implemented by consuming messages on an at-least-once basis.
- `client.id`: Subscriber client ID.
- `auto.offset.reset`: Initial point of subscription. *earliest* subscribes from the beginning, and *latest* subscribes from the newest message. The default is earliest. Note: This parameter is set per consumer group.
- `enable.auto.commit`: Automatically commits. This can be enabled when data consistency is not essential.
- `auto.commit.interval.ms`: Interval for automatic commits.
  
For more information, see [GitHub sample file](https://github.com/taosdata/taos-connector-rust/blob/main/examples/subscribe.rs).
517

D
danielclow 已提交
518
For information about other structure APIs, see the [Rust documentation](https://docs.rs/taos).
519

D
danielclow 已提交
520
[taos]: https://github.com/taosdata/rust-connector-taos
521
[r2d2]: https://crates.io/crates/r2d2
D
danielclow 已提交
522 523 524 525
[TaosBuilder]: https://docs.rs/taos/latest/taos/struct.TaosBuilder.html
[TaosCfg]: https://docs.rs/taos/latest/taos/struct.TaosCfg.html
[struct.Taos]: https://docs.rs/taos/latest/taos/struct.Taos.html
[Stmt]: https://docs.rs/taos/latest/taos/struct.Stmt.html