--- toc_max_heading_level: 4 sidebar_label: Rust title: TDengine Rust Connector --- import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; import Preparation from "./_preparation.mdx" import RustInsert from "../07-develop/03-insert-data/_rust_sql.mdx" import RustBind from "../07-develop/03-insert-data/_rust_stmt.mdx" import RustSml from "../07-develop/03-insert-data/_rust_schemaless.mdx" import RustQuery from "../07-develop/04-query-data/_rust.mdx" [![Crates.io](https://img.shields.io/crates/v/taos)](https://crates.io/crates/taos) ![Crates.io](https://img.shields.io/crates/d/taos) [![docs.rs](https://img.shields.io/docsrs/taos)](https://docs.rs/taos) `taos` 是 TDengine 的官方 Rust 语言连接器。Rust 开发人员可以通过它开发存取 TDengine 数据库的应用软件。 `taos` 提供两种建立连接的方式。一种是**原生连接**,它通过 TDengine 客户端驱动程序(taosc)连接 TDengine 运行实例。另外一种是 **Websocket 连接**,它通过 taosAdapter 的 Websocket 接口连接 TDengine 运行实例。你可以通过不同的 “特性(即 Cargo 关键字 `features`)” 来指定使用哪种连接器(默认同时支持)。Websocket 连接支持任何平台,原生连接支持所有 TDengine 客户端能运行的平台。 该 Rust 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-connector-rust)。 ## 支持的平台 原生连接支持的平台和 TDengine 客户端驱动支持的平台一致。 Websocket 连接支持所有能运行 Rust 的平台。 ## 版本支持 请参考[版本支持列表](../#版本支持) Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine,以避免已知问题。 ## 安装 ### 安装前准备 * 安装 Rust 开发工具链 * 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动) ### 添加 taos 依赖 根据选择的连接方式,按照如下说明在 [Rust](https://rust-lang.org) 项目中添加 [taos][taos] 依赖: 在 `Cargo.toml` 文件中添加 [taos][taos]: ```toml [dependencies] # use default feature taos = "*" ``` 在 `Cargo.toml` 文件中添加 [taos][taos],并启用 `ws` 特性。 ```toml [dependencies] taos = { version = "*", default-features = false, features = ["ws"] } ``` 当仅启用 `ws` 特性时,可同时指定 `r2d2` 使得在同步(blocking/sync)模式下使用 [r2d2] 作为连接池: ```toml [dependencies] taos = { version = "*", default-features = false, features = ["r2d2", "ws"] } ``` 在 `Cargo.toml` 文件中添加 [taos][taos],并启用 `native` 特性: ```toml [dependencies] taos = { version = "*", default-features = false, features = ["native"] } ``` ## 建立连接 [TaosBuilder] 通过 DSN 连接描述字符串创建一个连接构造器。 ```rust let builder = TaosBuilder::from_dsn("taos://")?; ``` 现在您可以使用该对象创建连接: ```rust let conn = builder.build()?; ``` 连接对象可以创建多个: ```rust let conn1 = builder.build()?; let conn2 = builder.build()?; ``` DSN 描述字符串基本结构如下: ```text [+]://[[:@]:][/][?=[&=]] |------|------------|---|-----------|-----------|------|------|------------|-----------------------| |driver| protocol | | username | password | host | port | database | params | ``` 各部分意义见下表: - **driver**: 必须指定驱动名以便连接器选择何种方式创建连接,支持如下驱动名: - **taos**: 表名使用 TDengine 连接器驱动。 - **tmq**: 使用 TMQ 订阅数据。 - **http/ws**: 使用 Websocket 创建连接。 - **https/wss**: 在 Websocket 连接方式下显示启用 SSL/TLS 连接。 - **protocol**: 显示指定以何种方式建立连接,例如:`taos+ws://localhost:6041` 指定以 Websocket 方式建立连接。 - **username/password**: 用于创建连接的用户名及密码。 - **host/port**: 指定创建连接的服务器及端口,当不指定服务器地址及端口时(`taos://`),原生连接默认为 `localhost:6030`,Websocket 连接默认为 `localhost:6041` 。 - **database**: 指定默认连接的数据库名,可选参数。 - **params**:其他可选参数。 一个完整的 DSN 描述字符串示例如下: ```text taos+ws://localhost:6041/test ``` 表示使用 Websocket(`ws`)方式通过 `6041` 端口连接服务器 `localhost`,并指定默认数据库为 `test`。 这使得用户可以通过 DSN 指定连接方式: ```rust use taos::*; // use native protocol. let builder = TaosBuilder::from_dsn("taos://localhost:6030")?; let conn1 = builder.build(); // use websocket protocol. let conn2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?; ``` 建立连接后,您可以进行相关数据库操作: ```rust async fn demo(taos: &Taos, db: &str) -> Result<(), Error> { // prepare database taos.exec_many([ format!("DROP DATABASE IF EXISTS `{db}`"), format!("CREATE DATABASE `{db}`"), format!("USE `{db}`"), ]) .await?; let inserted = taos.exec_many([ // create super table "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \ TAGS (`groupid` INT, `location` BINARY(24))", // create child table "CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')", // insert into child table "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)", // insert with NULL values "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)", // insert and automatically create table with tags if not exists "INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)", // insert many records in a single sql "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)", ]).await?; assert_eq!(inserted, 6); let mut result = taos.query("select * from `meters`").await?; for field in result.fields() { println!("got field: {}", field.name()); } let values = result. } ``` 查询数据可以通过两种方式:使用内建类型或 [serde](https://serde.rs) 序列化框架。 ```rust // Query option 1, use rows stream. let mut rows = result.rows(); while let Some(row) = rows.try_next().await? { for (name, value) in row { println!("got value of {}: {}", name, value); } } // Query options 2, use deserialization with serde. #[derive(Debug, serde::Deserialize)] #[allow(dead_code)] struct Record { // deserialize timestamp to chrono::DateTime ts: DateTime, // float to f32 current: Option, // int to i32 voltage: Option, phase: Option, groupid: i32, // binary/varchar to String location: String, } let records: Vec = taos .query("select * from `meters`") .await? .deserialize() .try_collect() .await?; dbg!(records); Ok(()) ``` ## 使用示例 ### 写入数据 #### SQL 写入 #### STMT 写入 #### Schemaless 写入 ### 查询数据 ## API 参考 ### 连接构造器 通过 DSN 来构建一个连接器构造器。 ```rust let cfg = TaosBuilder::default().build()?; ``` 使用 `builder` 对象创建多个连接: ```rust let conn: Taos = cfg.build(); ``` ### 连接池 在复杂应用中,建议启用连接池。[taos] 的连接池默认(异步模式)使用 [deadpool] 实现。 如下,可以生成一个默认参数的连接池。 ```rust let pool: Pool = TaosBuilder::from_dsn("taos:///") .unwrap() .pool() .unwrap(); ``` 同样可以使用连接池的构造器,对连接池参数进行设置: ```rust let pool: Pool = Pool::builder(Manager::from_dsn(self.dsn.clone()).unwrap().0) .max_size(88) // 最大连接数 .build() .unwrap(); ``` 在应用代码中,使用 `pool.get()?` 来获取一个连接对象 [Taos]。 ```rust let taos = pool.get()?; ``` ### 连接 [Taos][struct.Taos] 对象提供了多个数据库操作的 API: 1. `exec`: 执行某个非查询类 SQL 语句,例如 `CREATE`,`ALTER`,`INSERT` 等。 ```rust let affected_rows = taos.exec("INSERT INTO tb1 VALUES(now, NULL)").await?; ``` 2. `exec_many`: 同时(顺序)执行多个 SQL 语句。 ```rust taos.exec_many([ "CREATE DATABASE test", "USE test", "CREATE TABLE `tb1` (`ts` TIMESTAMP, `val` INT)", ]).await?; ``` 3. `query`:执行查询语句,返回 [ResultSet] 对象。 ```rust let mut q = taos.query("select * from log.logs").await?; ``` [ResultSet] 对象存储了查询结果数据和返回的列的基本信息(列名,类型,长度): 列信息使用 [.fields()] 方法获取: ```rust let cols = q.fields(); for col in cols { println!("name: {}, type: {:?} , bytes: {}", col.name(), col.ty(), col.bytes()); } ``` 逐行获取数据: ```rust let mut rows = result.rows(); let mut nrows = 0; while let Some(row) = rows.try_next().await? { for (col, (name, value)) in row.enumerate() { println!( "[{}] got value in col {} (named `{:>8}`): {}", nrows, col, name, value ); } nrows += 1; } ``` 或使用 [serde](https://serde.rs) 序列化框架。 ```rust #[derive(Debug, Deserialize)] struct Record { // deserialize timestamp to chrono::DateTime ts: DateTime, // float to f32 current: Option, // int to i32 voltage: Option, phase: Option, groupid: i32, // binary/varchar to String location: String, } let records: Vec = taos .query("select * from `meters`") .await? .deserialize() .try_collect() .await?; ``` 需要注意的是,需要使用 Rust 异步函数和异步运行时。 [Taos][struct.Taos] 提供部分 SQL 的 Rust 方法化以减少 `format!` 代码块的频率: - `.describe(table: &str)`: 执行 `DESCRIBE` 并返回一个 Rust 数据结构。 - `.create_database(database: &str)`: 执行 `CREATE DATABASE` 语句。 - `.use_database(database: &str)`: 执行 `USE` 语句。 除此之外,该结构也是 [参数绑定](#参数绑定接口) 和 [行协议接口](#行协议接口) 的入口,使用方法请参考具体的 API 说明。 ### 参数绑定接口 与 C 接口类似,Rust 提供参数绑定接口。首先,通过 [Taos][struct.Taos] 对象创建一个 SQL 语句的参数绑定对象 [Stmt]: ```rust let mut stmt = Stmt::init(&taos).await?; stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?; ``` 参数绑定对象提供了一组接口用于实现参数绑定: #### `.set_tbname(name)` 用于绑定表名。 ```rust let mut stmt = taos.stmt("insert into ? values(? ,?)")?; stmt.set_tbname("d0")?; ``` #### `.set_tags(&[tag])` 当 SQL 语句使用超级表时,用于绑定子表表名和标签值: ```rust let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)")?; stmt.set_tbname("d0")?; stmt.set_tags(&[Value::VarChar("涛思".to_string())])?; ``` #### `.bind(&[column])` 用于绑定值类型。使用 [ColumnView] 结构体构建需要的类型并绑定: ```rust let params = vec![ ColumnView::from_millis_timestamp(vec![164000000000]), ColumnView::from_bools(vec![true]), ColumnView::from_tiny_ints(vec![i8::MAX]), ColumnView::from_small_ints(vec![i16::MAX]), ColumnView::from_ints(vec![i32::MAX]), ColumnView::from_big_ints(vec![i64::MAX]), ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]), ColumnView::from_unsigned_small_ints(vec![u16::MAX]), ColumnView::from_unsigned_ints(vec![u32::MAX]), ColumnView::from_unsigned_big_ints(vec![u64::MAX]), ColumnView::from_floats(vec![f32::MAX]), ColumnView::from_doubles(vec![f64::MAX]), ColumnView::from_varchar(vec!["ABC"]), ColumnView::from_nchar(vec!["涛思数据"]), ]; let rows = stmt.bind(¶ms)?.add_batch()?.execute()?; ``` #### `.execute()` 执行 SQL。[Stmt] 对象可以复用,在执行后可以重新绑定并执行。执行前请确保所有数据已通过 `.add_batch` 加入到执行队列中。 ```rust stmt.execute()?; // next bind cycle. //stmt.set_tbname()?; //stmt.bind()?; //stmt.execute()?; ``` 一个可运行的示例请见 [GitHub 上的示例](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs)。 ### 订阅 TDengine 通过消息队列 [TMQ](../../../taos-sql/tmq/) 启动一个订阅。 从 DSN 开始,构建一个 TMQ 连接器。 ```rust let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?; ``` 创建消费者: ```rust let mut consumer = tmq.build()?; ``` 消费者可订阅一个或多个 `TOPIC`。 ```rust consumer.subscribe(["tmq_meters"]).await?; ``` TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) 类型,可以使用相应 API 对每个消息进行消费,并通过 `.commit` 进行已消费标记。 ```rust { let mut stream = consumer.stream(); while let Some((offset, message)) = stream.try_next().await? { // get information from offset // the topic let topic = offset.topic(); // the vgroup id, like partition id in kafka. let vgroup_id = offset.vgroup_id(); println!("* in vgroup id {vgroup_id} of topic {topic}\n"); if let Some(data) = message.into_data() { while let Some(block) = data.fetch_raw_block().await? { // one block for one table, get table name if needed let name = block.table_name(); let records: Vec = block.deserialize().try_collect()?; println!( "** table: {}, got {} records: {:#?}\n", name.unwrap(), records.len(), records ); } } consumer.commit(offset).await?; } } ``` 获取消费进度: 版本要求 connector-rust >= v0.8.8, TDengine >= 3.0.5.0 ```rust let assignments = consumer.assignments().await.unwrap(); ``` 按照指定的进度消费: 版本要求 connector-rust >= v0.8.8, TDengine >= 3.0.5.0 ```rust consumer.offset_seek(topic, vgroup_id, offset).await; ``` 停止订阅: ```rust consumer.unsubscribe().await; ``` 对于 TMQ DSN, 有以下配置项可以进行设置,需要注意的是,`group.id` 是必须的。 - `group.id`: 同一个消费者组,将以至少消费一次的方式进行消息负载均衡。 - `client.id`: 可选的订阅客户端识别项。 - `auto.offset.reset`: 可选初始化订阅起点, *earliest* 为从头开始订阅, *latest* 为仅从最新数据开始订阅,默认为从头订阅。注意,此选项在同一个 `group.id` 中仅生效一次。 - `enable.auto.commit`: 当设置为 `true` 时,将启用自动标记模式,当对数据一致性不敏感时,可以启用此方式。 - `auto.commit.interval.ms`: 自动标记的时间间隔。 完整订阅示例参见 [GitHub 示例文件](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/rust/nativeexample/examples/subscribe_demo.rs). 其他相关结构体 API 使用说明请移步 Rust 文档托管网页:。 [taos]: https://github.com/taosdata/rust-connector-taos [deadpool]: https://crates.io/crates/deadpool [r2d2]: https://crates.io/crates/r2d2 [TaosBuilder]: https://docs.rs/taos/latest/taos/struct.TaosBuilder.html [TaosCfg]: https://docs.rs/taos/latest/taos/struct.TaosCfg.html [struct.Taos]: https://docs.rs/taos/latest/taos/struct.Taos.html [Stmt]: https://docs.rs/taos/latest/taos/struct.Stmt.html