06-rust.mdx 16.2 KB
Newer Older
1 2
---
title: TDengine Rust Connector
D
danielclow 已提交
3 4 5
sidebar_label: Rust
description: This document describes the TDengine Rust connector.
toc_max_heading_level: 4
6 7 8 9 10
---

import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

G
gccgdb1234 已提交
11
import Preparition from "./_preparation.mdx"
12
import RustInsert from "../../07-develop/03-insert-data/_rust_sql.mdx"
13
import RustBind from "../../07-develop/03-insert-data/_rust_stmt.mdx"
14 15
import RustQuery from "../../07-develop/04-query-data/_rust.mdx"

D
danielclow 已提交
16
[![Crates.io](https://img.shields.io/crates/v/taos)](https://crates.io/crates/taos) ![Crates.io](https://img.shields.io/crates/d/taos) [![docs.rs](https://img.shields.io/docsrs/taos)](https://docs.rs/taos)
17

D
danielclow 已提交
18
`taos` is the official Rust connector for TDengine. Rust developers can develop applications to access the TDengine instance data.
19

D
danielclow 已提交
20 21 22
`taos` provides two ways to establish connections. One is the **Native Connection**, which connects to TDengine instances via the TDengine client driver (taosc). The other is the **WebSocket connection**, which connects to TDengine instances via the WebSocket interface provided by taosAdapter. You can specify a connection type with Cargo features. By default, both types are supported. The Websocket connection can be used on any platform. The native connection can be used on any platform that the TDengine Client supports.

The source code for the Rust connectors is located on [GitHub](https://github.com/taosdata/taos-connector-rust).
23 24 25

## Supported platforms

D
danielclow 已提交
26 27
Native connections are supported on the same platforms as the TDengine client driver.
Websocket connections are supported on all platforms that can run Go.
28 29 30

## Version support

D
danielclow 已提交
31
Please refer to [version support list](/reference/connector#version-support)
32

33
The Rust Connector is still under rapid development and is not guaranteed to be backward compatible before 1.0. We recommend using TDengine version 3.0 or higher to avoid known issues.
34 35 36

## Installation

D
danielclow 已提交
37
### Pre-installation preparation
38

39 40 41
* Install the Rust development toolchain
* If using the native connection, please install the TDengine client driver. Please refer to [install client driver](/reference/connector#install-client-driver)

D
danielclow 已提交
42
# Add taos dependency
43

D
danielclow 已提交
44
Depending on the connection method, add the [taos][taos] dependency in your Rust project as follows:
45

46
<Tabs defaultValue="default">
D
danielclow 已提交
47
<TabItem value="default" label="Support Both">
48

D
danielclow 已提交
49
In `cargo.toml`, add [taos][taos]:
50

D
dingbo 已提交
51
```toml
52 53
[dependencies]
# use default feature
54
taos = "*"
55 56
```

D
dingbo 已提交
57
</TabItem>
58

59
<TabItem value="rest" label="Websocket only">
D
danielclow 已提交
60

61
In `cargo.toml`, add [taos][taos] and enable the ws feature:
62 63 64

```toml
[dependencies]
65
taos = { version = "*", default-features = false, features = ["ws"] }
66 67 68 69
```

</TabItem>

70 71 72
<TabItem value="native" label="native connection only">

In `cargo.toml`, add [taos][taos] and enable the native feature:
73 74 75

```toml
[dependencies]
76
taos = { version = "*", default-features = false, features = ["native"] }
77 78
```

79
</TabItem>
80

81 82
</Tabs>

D
danielclow 已提交
83
## Establishing a connection
84

D
danielclow 已提交
85
[TaosBuilder] creates a connection constructor through the DSN connection description string.
86 87

```rust
88
let builder = TaosBuilder::from_dsn("taos://")?;
89 90
```

D
danielclow 已提交
91
You can now use this object to create the connection.
92 93

```rust
94
let conn = builder.build()?;
95 96 97 98 99
```

The connection object can create more than one.

```rust
100 101 102 103
let conn1 = builder.build()?;
let conn2 = builder.build()?;
```

D
danielclow 已提交
104
The structure of the DSN description string is as follows:
105 106 107 108 109

```text
<driver>[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|------|------------|---|-----------|-----------|------|------|------------|-----------------------|
|driver|   protocol |   | username  | password  | host | port |  database  |  params               |
110 111
```

D
danielclow 已提交
112 113 114 115 116 117 118 119 120 121
The parameters are described as follows:

- **driver**: Specify a driver name so that the connector can choose which method to use to establish the connection. Supported driver names are as follows:
  - **taos**: Table names use the TDengine connector driver.
  - **tmq**: Use the TMQ to subscribe to data.
  - **http/ws**: Use Websocket to establish connections.
  - **https/wss**: Use Websocket to establish connections, and enable SSL/TLS.
- **protocol**: Specify which connection method to use. For example, `taos+ws://localhost:6041` uses Websocket to establish connections.
- **username/password**: Username and password used to create connections.
- **host/port**: Specifies the server and port to establish a connection. If you do not specify a hostname or port, native connections default to `localhost:6030` and Websocket connections default to `localhost:6041`.
122
- **database**: Specify the default database to connect to. It's optional.
D
danielclow 已提交
123 124 125
- **params**:Optional parameters.

A sample DSN description string is as follows:
126 127 128 129 130

```text
taos+ws://localhost:6041/test
```

D
danielclow 已提交
131
This indicates that the Websocket connection method is used on port 6041 to connect to the server localhost and use the database `test` by default.
132

D
danielclow 已提交
133
You can create DSNs to connect to servers in your environment.
134 135

```rust
D
danielclow 已提交
136
use taos::*;
137

138 139 140 141 142 143
// use native protocol.
let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
let conn1 = builder.build();

//  use websocket protocol.
let conn2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
144 145
```

D
danielclow 已提交
146
After the connection is established, you can perform operations on your database.
147 148

```rust
149 150 151 152 153 154 155 156 157 158 159 160
async fn demo(taos: &Taos, db: &str) -> Result<(), Error> {
    // prepare database
    taos.exec_many([
        format!("DROP DATABASE IF EXISTS `{db}`"),
        format!("CREATE DATABASE `{db}`"),
        format!("USE `{db}`"),
    ])
    .await?;

    let inserted = taos.exec_many([
        // create super table
        "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
161
         TAGS (`groupid` INT, `location` BINARY(24))",
162
        // create child table
163
        "CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')",
164 165 166 167 168
        // insert into child table
        "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
        // insert with NULL values
        "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
        // insert and automatically create table with tags if not exists
169
        "INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)",
170 171 172 173 174 175 176 177 178
        // insert many records in a single sql
        "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
    ]).await?;

    assert_eq!(inserted, 6);
    let mut result = taos.query("select * from `meters`").await?;

    for field in result.fields() {
        println!("got field: {}", field.name());
179
    }
180 181

    let values = result.
182 183 184
}
```

D
danielclow 已提交
185
There are two ways to query data: Using built-in types or the [serde](https://serde.rs) deserialization framework.
186

187 188 189 190 191 192 193 194
```rust
    // Query option 1, use rows stream.
    let mut rows = result.rows();
    while let Some(row) = rows.try_next().await? {
        for (name, value) in row {
            println!("got value of {}: {}", name, value);
        }
    }
195

196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
    // Query options 2, use deserialization with serde.
    #[derive(Debug, serde::Deserialize)]
    #[allow(dead_code)]
    struct Record {
        // deserialize timestamp to chrono::DateTime<Local>
        ts: DateTime<Local>,
        // float to f32
        current: Option<f32>,
        // int to i32
        voltage: Option<i32>,
        phase: Option<f32>,
        groupid: i32,
        // binary/varchar to String
        location: String,
    }
211

212 213 214 215 216 217
    let records: Vec<Record> = taos
        .query("select * from `meters`")
        .await?
        .deserialize()
        .try_collect()
        .await?;
218

219 220 221
    dbg!(records);
    Ok(())
```
222

223
## Usage examples
224

225
### Write data
226

227
#### SQL Write
228

229
<RustInsert />
230

D
danielclow 已提交
231
#### STMT Write
232

233
<RustBind />
234

235
### Query data
236

D
danielclow 已提交
237
<RustQuery />
238 239 240

## API Reference

D
danielclow 已提交
241
### Connector Constructor
242

D
danielclow 已提交
243
You create a connector constructor by using a DSN.
244 245

```rust
D
danielclow 已提交
246
let cfg = TaosBuilder::default().build()?;
247 248
```

D
danielclow 已提交
249
You use the builder object to create multiple connections.
250 251

```rust
252
let conn: Taos = cfg.build();
253 254
```

D
danielclow 已提交
255
### Connection pooling
256

D
danielclow 已提交
257
In complex applications, we recommend enabling connection pools. [taos] implements connection pools based on [r2d2].
258

D
danielclow 已提交
259
As follows, a connection pool with default parameters can be generated.
260 261

```rust
262
let pool = TaosBuilder::from_dsn(dsn)?.pool()?;
263 264
```

D
danielclow 已提交
265
You can set the same connection pool parameters using the connection pool's constructor.
266 267

```rust
268 269 270 271 272 273 274 275 276
let dsn = "taos://localhost:6030";

let opts = PoolBuilder::new()
    .max_size(5000) // max connections
    .max_lifetime(Some(Duration::from_secs(60 * 60))) // lifetime of each connection
    .min_idle(Some(1000)) // minimal idle connections
    .connection_timeout(Duration::from_secs(2));

let pool = TaosBuilder::from_dsn(dsn)?.with_pool_builder(opts)?;
277 278
```

D
danielclow 已提交
279
In the application code, use `pool.get()? ` to get a connection object [Taos].
280 281

```rust
D
danielclow 已提交
282
let taos = pool.get()?;
283 284
```

D
danielclow 已提交
285
# Connectors
286

D
danielclow 已提交
287
The [Taos][struct.Taos] object provides an API to perform operations on multiple databases.
288

D
danielclow 已提交
289
1. `exec`: Execute some non-query SQL statements, such as `CREATE`, `ALTER`, `INSERT`, etc.
290 291 292 293 294

    ```rust
    let affected_rows = taos.exec("INSERT INTO tb1 VALUES(now, NULL)").await?;
    ```

D
danielclow 已提交
295
2. `exec_many`: Run multiple SQL statements simultaneously or in order.
296 297

    ```rust
298 299 300 301 302
    taos.exec_many([
        "CREATE DATABASE test",
        "USE test",
        "CREATE TABLE `tb1` (`ts` TIMESTAMP, `val` INT)",
    ]).await?;
303 304
    ```

D
danielclow 已提交
305
3. `query`: Run a query statement and return a [ResultSet] object.
306 307

    ```rust
D
danielclow 已提交
308
    let mut q = taos.query("select * from log.logs").await?;
309 310
    ```

D
danielclow 已提交
311
    The [ResultSet] object stores query result data and the names, types, and lengths of returned columns
312

D
danielclow 已提交
313
    You can obtain column information by using [.fields()].
314

D
dingbo 已提交
315
    ```rust
316
    let cols = q.fields();
317
    for col in cols {
318
        println!("name: {}, type: {:?} , bytes: {}", col.name(), col.ty(), col.bytes());
319 320 321
    }
    ```

D
danielclow 已提交
322
    It fetches data line by line.
323 324

    ```rust
325 326 327 328 329 330 331 332
    let mut rows = result.rows();
    let mut nrows = 0;
    while let Some(row) = rows.try_next().await? {
        for (col, (name, value)) in row.enumerate() {
            println!(
                "[{}] got value in col {} (named `{:>8}`): {}",
                nrows, col, name, value
            );
333
        }
334 335 336 337
        nrows += 1;
    }
    ```

D
danielclow 已提交
338
    Or use the [serde](https://serde.rs) deserialization framework.
339 340 341 342 343 344 345 346 347 348 349 350 351 352

    ```rust
    #[derive(Debug, Deserialize)]
    struct Record {
        // deserialize timestamp to chrono::DateTime<Local>
        ts: DateTime<Local>,
        // float to f32
        current: Option<f32>,
        // int to i32
        voltage: Option<i32>,
        phase: Option<f32>,
        groupid: i32,
        // binary/varchar to String
        location: String,
353
    }
354 355 356 357 358 359 360

    let records: Vec<Record> = taos
        .query("select * from `meters`")
        .await?
        .deserialize()
        .try_collect()
        .await?;
361 362 363 364
    ```

Note that Rust asynchronous functions and an asynchronous runtime are required.

D
danielclow 已提交
365
[Taos][struct.Taos] provides Rust methods for some SQL statements to reduce the number of `format!`s.
366 367 368 369 370

- `.describe(table: &str)`: Executes `DESCRIBE` and returns a Rust data structure.
- `.create_database(database: &str)`: Executes the `CREATE DATABASE` statement.
- `.use_database(database: &str)`: Executes the `USE` statement.

D
danielclow 已提交
371 372 373
In addition, this structure is also the entry point for [Parameter Binding](#Parameter Binding Interface) and [Line Protocol Interface](#Line Protocol Interface). Please refer to the specific API descriptions for usage.

### Bind Interface
374

D
danielclow 已提交
375
Similar to the C interface, Rust provides the bind interface's wrapping. First, the [Taos][struct.taos] object creates a parameter binding object [Stmt] for an SQL statement.
376 377

```rust
378 379
let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
380 381 382 383
```

The bind object provides a set of interfaces for implementing parameter binding.

384
#### `.set_tbname(name)`
385 386 387

To bind table names.

388 389 390 391 392 393
```rust
let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
stmt.set_tbname("d0")?;
```

#### `.set_tags(&[tag])`
394

D
danielclow 已提交
395
Bind sub-table table names and tag values when the SQL statement uses a super table.
396 397

```rust
398 399
let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)")?;
stmt.set_tbname("d0")?;
D
danielclow 已提交
400
stmt.set_tags(&[Value::VarChar("taos".to_string())])?;
401 402
```

403
#### `.bind(&[column])`
404

D
danielclow 已提交
405
Bind value types. Use the [ColumnView] structure to create and bind the required types.
406 407

```rust
408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424
let params = vec![
    ColumnView::from_millis_timestamp(vec![164000000000]),
    ColumnView::from_bools(vec![true]),
    ColumnView::from_tiny_ints(vec![i8::MAX]),
    ColumnView::from_small_ints(vec![i16::MAX]),
    ColumnView::from_ints(vec![i32::MAX]),
    ColumnView::from_big_ints(vec![i64::MAX]),
    ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]),
    ColumnView::from_unsigned_small_ints(vec![u16::MAX]),
    ColumnView::from_unsigned_ints(vec![u32::MAX]),
    ColumnView::from_unsigned_big_ints(vec![u64::MAX]),
    ColumnView::from_floats(vec![f32::MAX]),
    ColumnView::from_doubles(vec![f64::MAX]),
    ColumnView::from_varchar(vec!["ABC"]),
    ColumnView::from_nchar(vec!["涛思数据"]),
];
let rows = stmt.bind(&params)?.add_batch()?.execute()?;
425 426
```

427
#### `.execute()`
428

D
danielclow 已提交
429
Execute SQL. [Stmt] objects can be reused, re-binded, and executed after execution. Before execution, ensure that all data has been added to the queue with `.add_batch`.
430 431

```rust
D
danielclow 已提交
432
stmt.execute()?;
433 434

// next bind cycle.
D
danielclow 已提交
435 436 437
//stmt.set_tbname()?;
//stmt.bind()?;
//stmt.execute()?;
438 439
```

D
danielclow 已提交
440
For a working example, see [GitHub](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs).
441

D
danielclow 已提交
442
### Subscriptions
443

D
danielclow 已提交
444
TDengine starts subscriptions through [TMQ](../../../taos-sql/tmq/).
445

D
danielclow 已提交
446
You create a TMQ connector by using a DSN.
447 448

```rust
449
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
450 451
```

D
danielclow 已提交
452
Create a consumer:
453

454 455 456
```rust
let mut consumer = tmq.build()?;
```
457

D
danielclow 已提交
458
A single consumer can subscribe to one or more topics.
459

460 461 462
```rust
consumer.subscribe(["tmq_meters"]).await?;
```
463

D
danielclow 已提交
464
The TMQ is of [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) type. You can use the corresponding API to consume each message in the queue and then use `.commit` to mark them as consumed.
465

466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489
```rust
{
    let mut stream = consumer.stream();

    while let Some((offset, message)) = stream.try_next().await? {
        // get information from offset

        // the topic
        let topic = offset.topic();
        // the vgroup id, like partition id in kafka.
        let vgroup_id = offset.vgroup_id();
        println!("* in vgroup id {vgroup_id} of topic {topic}\n");

        if let Some(data) = message.into_data() {
            while let Some(block) = data.fetch_raw_block().await? {
                // one block for one table, get table name if needed
                let name = block.table_name();
                let records: Vec<Record> = block.deserialize().try_collect()?;
                println!(
                    "** table: {}, got {} records: {:#?}\n",
                    name.unwrap(),
                    records.len(),
                    records
                );
490
            }
491 492 493 494 495 496 497 498 499 500 501 502
        }
        consumer.commit(offset).await?;
    }
}
```

Unsubscribe:

```rust
consumer.unsubscribe().await;
```

D
danielclow 已提交
503
The following parameters can be configured for the TMQ DSN. Only `group.id` is mandatory.
504

D
danielclow 已提交
505 506 507 508 509 510 511
- `group.id`: Within a consumer group, load balancing is implemented by consuming messages on an at-least-once basis.
- `client.id`: Subscriber client ID.
- `auto.offset.reset`: Initial point of subscription. *earliest* subscribes from the beginning, and *latest* subscribes from the newest message. The default is earliest. Note: This parameter is set per consumer group.
- `enable.auto.commit`: Automatically commits. This can be enabled when data consistency is not essential.
- `auto.commit.interval.ms`: Interval for automatic commits.
  
For more information, see [GitHub sample file](https://github.com/taosdata/taos-connector-rust/blob/main/examples/subscribe.rs).
512

D
danielclow 已提交
513
For information about other structure APIs, see the [Rust documentation](https://docs.rs/taos).
514

D
danielclow 已提交
515
[taos]: https://github.com/taosdata/rust-connector-taos
516
[r2d2]: https://crates.io/crates/r2d2
D
danielclow 已提交
517 518 519 520
[TaosBuilder]: https://docs.rs/taos/latest/taos/struct.TaosBuilder.html
[TaosCfg]: https://docs.rs/taos/latest/taos/struct.TaosCfg.html
[struct.Taos]: https://docs.rs/taos/latest/taos/struct.Taos.html
[Stmt]: https://docs.rs/taos/latest/taos/struct.Stmt.html