--- toc_max_heading_level: 4 sidebar_label: Rust title: TDengine Rust Connector --- import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; import Preparation from "./_preparation.mdx" import RustInsert from "../07-develop/03-insert-data/_rust_sql.mdx" import RustBind from "../07-develop/03-insert-data/_rust_stmt.mdx" import RustSml from "../07-develop/03-insert-data/_rust_schemaless.mdx" import RustQuery from "../07-develop/04-query-data/_rust.mdx" [![Crates.io](https://img.shields.io/crates/v/taos)](https://crates.io/crates/taos) ![Crates.io](https://img.shields.io/crates/d/taos) [![docs.rs](https://img.shields.io/docsrs/taos)](https://docs.rs/taos) `taos` 是 TDengine 的官方 Rust 语言连接器。Rust 开发人员可以通过它开发存取 TDengine 数据库的应用软件。 `taos` 提供两种建立连接的方式。一种是**原生连接**,它通过 TDengine 客户端驱动程序(taosc)连接 TDengine 运行实例。另外一种是 **Websocket 连接**,它通过 taosAdapter 的 Websocket 接口连接 TDengine 运行实例。你可以通过不同的 “特性(即 Cargo 关键字 `features`)” 来指定使用哪种连接器(默认同时支持)。Websocket 连接支持任何平台,原生连接支持所有 TDengine 客户端能运行的平台。 该 Rust 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-connector-rust)。 ## 支持的平台 原生连接支持的平台和 TDengine 客户端驱动支持的平台一致。 Websocket 连接支持所有能运行 Rust 的平台。 ## 版本历史 | Rust 连接器版本 | TDengine 版本 | 主要功能 | | :----------------: | :--------------: | :--------------------------------------------------: | | v0.8.12 | 3.0.5.0 or later | 消息订阅:获取消费进度及按照指定进度开始消费。 | | v0.8.0 | 3.0.4.0 | 支持无模式写入。 | | v0.7.6 | 3.0.3.0 | 支持在请求中使用 req_id。 | | v0.6.0 | 3.0.0.0 | 基础功能。 | Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine,以避免已知问题。 ## 处理错误 在报错后,可以获取到错误的具体信息: ```rust match conn.exec(sql) { Ok(_) => { Ok(()) } Err(e) => { eprintln!("ERROR: {:?}", e); Err(e) } } ``` ## TDengine DataType 和 Rust DataType TDengine 目前支持时间戳、数字、字符、布尔类型,与 Rust 对应类型转换如下: | TDengine DataType | Rust DataType | | ----------------- | ----------------- | | TIMESTAMP | Timestamp | | INT | i32 | | BIGINT | i64 | | FLOAT | f32 | | DOUBLE | f64 | | SMALLINT | i16 | | TINYINT | i8 | | BOOL | bool | | BINARY | Vec | | NCHAR | String | | JSON | serde_json::Value | **注意**:JSON 类型仅在 tag 中支持。 ## 安装步骤 ### 安装前准备 * 安装 Rust 开发工具链 * 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动) ### 安装连接器 根据选择的连接方式,按照如下说明在 [Rust](https://rust-lang.org) 项目中添加 [taos][taos] 依赖: 在 `Cargo.toml` 文件中添加 [taos][taos]: ```toml [dependencies] # use default feature taos = "*" ``` 在 `Cargo.toml` 文件中添加 [taos][taos],并启用 `ws` 特性。 ```toml [dependencies] taos = { version = "*", default-features = false, features = ["ws"] } ``` 当仅启用 `ws` 特性时,可同时指定 `r2d2` 使得在同步(blocking/sync)模式下使用 [r2d2] 作为连接池: ```toml [dependencies] taos = { version = "*", default-features = false, features = ["r2d2", "ws"] } ``` 在 `Cargo.toml` 文件中添加 [taos][taos],并启用 `native` 特性: ```toml [dependencies] taos = { version = "*", default-features = false, features = ["native"] } ``` ## 建立连接 [TaosBuilder] 通过 DSN 连接描述字符串创建一个连接构造器。 ```rust let builder = TaosBuilder::from_dsn("taos://")?; ``` 现在您可以使用该对象创建连接: ```rust let conn = builder.build()?; ``` 连接对象可以创建多个: ```rust let conn1 = builder.build()?; let conn2 = builder.build()?; ``` DSN 描述字符串基本结构如下: ```text [+]://[[:@]:][/][?=[&=]] |------|------------|---|-----------|-----------|------|------|------------|-----------------------| |driver| protocol | | username | password | host | port | database | params | ``` 各部分意义见下表: - **driver**: 必须指定驱动名以便连接器选择何种方式创建连接,支持如下驱动名: - **taos**: 表名使用 TDengine 连接器驱动。 - **tmq**: 使用 TMQ 订阅数据。 - **http/ws**: 使用 Websocket 创建连接。 - **https/wss**: 在 Websocket 连接方式下显示启用 SSL/TLS 连接。 - **protocol**: 显示指定以何种方式建立连接,例如:`taos+ws://localhost:6041` 指定以 Websocket 方式建立连接。 - **username/password**: 用于创建连接的用户名及密码。 - **host/port**: 指定创建连接的服务器及端口,当不指定服务器地址及端口时(`taos://`),原生连接默认为 `localhost:6030`,Websocket 连接默认为 `localhost:6041` 。 - **database**: 指定默认连接的数据库名,可选参数。 - **params**:其他可选参数。 一个完整的 DSN 描述字符串示例如下: ```text taos+ws://localhost:6041/test ``` 表示使用 Websocket(`ws`)方式通过 `6041` 端口连接服务器 `localhost`,并指定默认数据库为 `test`。 这使得用户可以通过 DSN 指定连接方式: ```rust use taos::*; // use native protocol. let builder = TaosBuilder::from_dsn("taos://localhost:6030")?; let conn1 = builder.build(); // use websocket protocol. let builder2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?; let conn2 = builder2.build(); ``` 建立连接后,您可以进行相关数据库操作: ```rust async fn demo(taos: &Taos, db: &str) -> Result<(), Error> { // prepare database taos.exec_many([ format!("DROP DATABASE IF EXISTS `{db}`"), format!("CREATE DATABASE `{db}`"), format!("USE `{db}`"), ]) .await?; let inserted = taos.exec_many([ // create super table "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \ TAGS (`groupid` INT, `location` BINARY(24))", // create child table "CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')", // insert into child table "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)", // insert with NULL values "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)", // insert and automatically create table with tags if not exists "INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)", // insert many records in a single sql "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)", ]).await?; assert_eq!(inserted, 6); let mut result = taos.query("select * from `meters`").await?; for field in result.fields() { println!("got field: {}", field.name()); } let values = result. } ``` 查询数据可以通过两种方式:使用内建类型或 [serde](https://serde.rs) 序列化框架。 ```rust // Query option 1, use rows stream. let mut rows = result.rows(); while let Some(row) = rows.try_next().await? { for (name, value) in row { println!("got value of {}: {}", name, value); } } // Query options 2, use deserialization with serde. #[derive(Debug, serde::Deserialize)] #[allow(dead_code)] struct Record { // deserialize timestamp to chrono::DateTime ts: DateTime, // float to f32 current: Option, // int to i32 voltage: Option, phase: Option, groupid: i32, // binary/varchar to String location: String, } let records: Vec = taos .query("select * from `meters`") .await? .deserialize() .try_collect() .await?; dbg!(records); Ok(()) ``` ## 使用示例 ### 创建数据库和表 ```rust use taos::*; #[tokio::main] async fn main() -> anyhow::Result<()> { let dsn = "taos://localhost:6030"; let builder = TaosBuilder::from_dsn(dsn)?; let taos = builder.build()?; let db = "query"; // create database taos.exec_many([ format!("DROP DATABASE IF EXISTS `{db}`"), format!("CREATE DATABASE `{db}`"), format!("USE `{db}`"), ]) .await?; // create table taos.exec_many([ // create super table "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \ TAGS (`groupid` INT, `location` BINARY(16))", // create child table "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')", ]).await?; } ``` ### 插入数据 ### 查询数据 ### 执行带有 req_id 的 SQL 此 req_id 可用于请求链路追踪。 ```rust let rs = taos.query_with_req_id("select * from stable where tag1 is null", 1)?; ``` ### 通过参数绑定写入数据 TDengine 的 Rust 连接器实现了参数绑定方式对数据写入(INSERT)场景的支持。采用这种方式写入数据时,能避免 SQL 语法解析的资源消耗,从而在很多情况下显著提升写入性能。 ### 无模式写入 TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协议(Line Protocol)、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见[无模式写入](../../reference/schemaless/)。 ### 执行带有 req_id 的无模式写入 此 req_id 可用于请求链路追踪。 ```rust let sml_data = SmlDataBuilder::default() .protocol(SchemalessProtocol::Line) .data(data) .req_id(100u64) .build()?; client.put(&sml_data)? ``` ### 数据订阅 TDengine 通过消息队列 [TMQ](../../../taos-sql/tmq/) 启动一个订阅。 #### 创建 Topic ```rust taos.exec_many([ // create topic for subscription format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}") ]) .await?; ``` #### 创建 Consumer 从 DSN 开始,构建一个 TMQ 连接器。 ```rust let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?; ``` 创建消费者: ```rust let mut consumer = tmq.build()?; ``` #### 订阅消费数据 消费者可订阅一个或多个 `TOPIC`。 ```rust consumer.subscribe(["tmq_meters"]).await?; ``` TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) 类型,可以使用相应 API 对每个消息进行消费,并通过 `.commit` 进行已消费标记。 ```rust { let mut stream = consumer.stream(); while let Some((offset, message)) = stream.try_next().await? { // get information from offset // the topic let topic = offset.topic(); // the vgroup id, like partition id in kafka. let vgroup_id = offset.vgroup_id(); println!("* in vgroup id {vgroup_id} of topic {topic}\n"); if let Some(data) = message.into_data() { while let Some(block) = data.fetch_raw_block().await? { // one block for one table, get table name if needed let name = block.table_name(); let records: Vec = block.deserialize().try_collect()?; println!( "** table: {}, got {} records: {:#?}\n", name.unwrap(), records.len(), records ); } } consumer.commit(offset).await?; } } ``` 获取消费进度: 版本要求 connector-rust >= v0.8.8, TDengine >= 3.0.5.0 ```rust let assignments = consumer.assignments().await.unwrap(); ``` #### 指定订阅 Offset 按照指定的进度消费: 版本要求 connector-rust >= v0.8.8, TDengine >= 3.0.5.0 ```rust consumer.offset_seek(topic, vgroup_id, offset).await; ``` #### 关闭订阅 ```rust consumer.unsubscribe().await; ``` 对于 TMQ DSN, 有以下配置项可以进行设置,需要注意的是,`group.id` 是必须的。 - `group.id`: 同一个消费者组,将以至少消费一次的方式进行消息负载均衡。 - `client.id`: 可选的订阅客户端识别项。 - `auto.offset.reset`: 可选初始化订阅起点, *earliest* 为从头开始订阅, *latest* 为仅从最新数据开始订阅,默认为从头订阅。注意,此选项在同一个 `group.id` 中仅生效一次。 - `enable.auto.commit`: 当设置为 `true` 时,将启用自动标记模式,当对数据一致性不敏感时,可以启用此方式。 - `auto.commit.interval.ms`: 自动标记的时间间隔。 #### 完整示例 完整订阅示例参见 [GitHub 示例文件](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/rust/nativeexample/examples/subscribe_demo.rs). ### 与连接池使用 在复杂应用中,建议启用连接池。[taos] 的连接池默认(异步模式)使用 [deadpool] 实现。 如下,可以生成一个默认参数的连接池。 ```rust let pool: Pool = TaosBuilder::from_dsn("taos:///") .unwrap() .pool() .unwrap(); ``` 同样可以使用连接池的构造器,对连接池参数进行设置: ```rust let pool: Pool = Pool::builder(Manager::from_dsn(self.dsn.clone()).unwrap().0) .max_size(88) // 最大连接数 .build() .unwrap(); ``` 在应用代码中,使用 `pool.get()?` 来获取一个连接对象 [Taos]。 ```rust let taos = pool.get()?; ``` ### 更多示例程序 示例程序源码位于 `TDengine/examples/rust` 下: 请参考:[rust example](https://github.com/taosdata/TDengine/tree/3.0/examples/rust) ## 常见问题 请参考 [FAQ](../../../train-faq/faq) ## API 参考 [Taos][struct.Taos] 对象提供了多个数据库操作的 API: 1. `exec`: 执行某个非查询类 SQL 语句,例如 `CREATE`,`ALTER`,`INSERT` 等。 ```rust let affected_rows = taos.exec("INSERT INTO tb1 VALUES(now, NULL)").await?; ``` 2. `exec_many`: 同时(顺序)执行多个 SQL 语句。 ```rust taos.exec_many([ "CREATE DATABASE test", "USE test", "CREATE TABLE `tb1` (`ts` TIMESTAMP, `val` INT)", ]).await?; ``` 3. `query`:执行查询语句,返回 [ResultSet] 对象。 ```rust let mut q = taos.query("select * from log.logs").await?; ``` [ResultSet] 对象存储了查询结果数据和返回的列的基本信息(列名,类型,长度): 列信息使用 [.fields()] 方法获取: ```rust let cols = q.fields(); for col in cols { println!("name: {}, type: {:?} , bytes: {}", col.name(), col.ty(), col.bytes()); } ``` 逐行获取数据: ```rust let mut rows = result.rows(); let mut nrows = 0; while let Some(row) = rows.try_next().await? { for (col, (name, value)) in row.enumerate() { println!( "[{}] got value in col {} (named `{:>8}`): {}", nrows, col, name, value ); } nrows += 1; } ``` 或使用 [serde](https://serde.rs) 序列化框架。 ```rust #[derive(Debug, Deserialize)] struct Record { // deserialize timestamp to chrono::DateTime ts: DateTime, // float to f32 current: Option, // int to i32 voltage: Option, phase: Option, groupid: i32, // binary/varchar to String location: String, } let records: Vec = taos .query("select * from `meters`") .await? .deserialize() .try_collect() .await?; ``` 需要注意的是,需要使用 Rust 异步函数和异步运行时。 [Taos][struct.Taos] 提供部分 SQL 的 Rust 方法化以减少 `format!` 代码块的频率: - `.describe(table: &str)`: 执行 `DESCRIBE` 并返回一个 Rust 数据结构。 - `.create_database(database: &str)`: 执行 `CREATE DATABASE` 语句。 - `.use_database(database: &str)`: 执行 `USE` 语句。 除此之外,该结构也是 [参数绑定](#参数绑定接口) 和 [行协议接口](#行协议接口) 的入口,使用方法请参考具体的 API 说明。 参数绑定接口 与 C 接口类似,Rust 提供参数绑定接口。首先,通过 [Taos][struct.Taos] 对象创建一个 SQL 语句的参数绑定对象 [Stmt]: ```rust let mut stmt = Stmt::init(&taos).await?; stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?; ``` 参数绑定对象提供了一组接口用于实现参数绑定: `.set_tbname(name)` 用于绑定表名。 ```rust let mut stmt = taos.stmt("insert into ? values(? ,?)")?; stmt.set_tbname("d0")?; ``` `.set_tags(&[tag])` 当 SQL 语句使用超级表时,用于绑定子表表名和标签值: ```rust let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)")?; stmt.set_tbname("d0")?; stmt.set_tags(&[Value::VarChar("涛思".to_string())])?; ``` `.bind(&[column])` 用于绑定值类型。使用 [ColumnView] 结构体构建需要的类型并绑定: ```rust let params = vec![ ColumnView::from_millis_timestamp(vec![164000000000]), ColumnView::from_bools(vec![true]), ColumnView::from_tiny_ints(vec![i8::MAX]), ColumnView::from_small_ints(vec![i16::MAX]), ColumnView::from_ints(vec![i32::MAX]), ColumnView::from_big_ints(vec![i64::MAX]), ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]), ColumnView::from_unsigned_small_ints(vec![u16::MAX]), ColumnView::from_unsigned_ints(vec![u32::MAX]), ColumnView::from_unsigned_big_ints(vec![u64::MAX]), ColumnView::from_floats(vec![f32::MAX]), ColumnView::from_doubles(vec![f64::MAX]), ColumnView::from_varchar(vec!["ABC"]), ColumnView::from_nchar(vec!["涛思数据"]), ]; let rows = stmt.bind(¶ms)?.add_batch()?.execute()?; ``` `.execute()` 执行 SQL。[Stmt] 对象可以复用,在执行后可以重新绑定并执行。执行前请确保所有数据已通过 `.add_batch` 加入到执行队列中。 ```rust stmt.execute()?; // next bind cycle. //stmt.set_tbname()?; //stmt.bind()?; //stmt.execute()?; ``` 一个可运行的示例请见 [GitHub 上的示例](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs)。 其他相关结构体 API 使用说明请移步 Rust 文档托管网页:。 [taos]: https://github.com/taosdata/rust-connector-taos [deadpool]: https://crates.io/crates/deadpool [r2d2]: https://crates.io/crates/r2d2 [TaosBuilder]: https://docs.rs/taos/latest/taos/struct.TaosBuilder.html [TaosCfg]: https://docs.rs/taos/latest/taos/struct.TaosCfg.html [struct.Taos]: https://docs.rs/taos/latest/taos/struct.Taos.html [Stmt]: https://docs.rs/taos/latest/taos/struct.Stmt.html