---
toc_max_heading_level: 4
sidebar_position: 5
sidebar_label: Rust
title: TDengine Rust Connector
---

import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

import Preparation from "./_preparation.mdx"
import RustInsert from "../../07-develop/03-insert-data/_rust_sql.mdx"
import RustBind from "../../07-develop/03-insert-data/_rust_stmt.mdx"
import RustQuery from "../../07-develop/04-query-data/_rust.mdx"

[`taos`][taos] is the official Rust language connector for TDengine. Rust developers can use it to build applications that access data in a TDengine instance.

The Rust connector provides two ways to establish connections. One is the **native connection**, which connects to TDengine instances via the TDengine client driver (taosc). The other is the **WebSocket connection**, which connects to TDengine instances via the taosAdapter service.

The source code is hosted on [taosdata/taos-connector-rust](https://github.com/taosdata/taos-connector-rust).

## Supported platforms

The platforms supported by native connections are the same as those supported by the TDengine client driver.
WebSocket connections are supported on all platforms that can run Rust.

## Version support

Please refer to the [version support list](/reference/connector#version-support).

The Rust connector is still under rapid development and is not guaranteed to be backward compatible before 1.0. We recommend using TDengine version 3.0 or higher to avoid known issues.

## Installation

### Pre-installation

* Install the Rust development toolchain.
* If using the native connection, install the TDengine client driver. Please refer to [install client driver](/reference/connector#install-client-driver).

### Add dependencies

Add the dependency to the [Rust](https://rust-lang.org) project as follows, depending on the connection method selected.

To use the default features, add [taos] to the `Cargo.toml` file.

```toml
[dependencies]
# use default feature
taos = "*"
```

To use only the native connection, add [taos] to the `Cargo.toml` file and enable only the `native` feature.
```toml
[dependencies]
taos = { version = "*", default-features = false, features = ["native"] }
```

To use only the WebSocket connection, add [taos] to the `Cargo.toml` file and enable only the `ws` feature.

```toml
[dependencies]
taos = { version = "*", default-features = false, features = ["ws"] }
```

## Create a connection

The Rust connector uses a DSN connection string to configure a connection builder. For example:

```rust
let builder = TaosBuilder::from_dsn("taos://")?;
```

You can now use the builder to create a connection.

```rust
let conn = builder.build()?;
```

The same builder can create more than one connection.

```rust
let conn1 = builder.build()?;
let conn2 = builder.build()?;
```

A DSN (**D**ata **S**ource **N**ame) string is [a data structure used to describe a connection to a data source](https://en.wikipedia.org/wiki/Data_source_name). A common DSN is constructed like this:

```text
<driver>[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|------|------------|---|-----------|-----------|------|------|------------|-----------------------|
|driver|  protocol  |   | username  | password  | host | port |  database  |  params               |
```

- **Driver**: the main entry point to a processor. **Required**. The Rust connector supports the following driver names:
  - **taos**: the legacy TDengine connection data source.
  - **tmq**: subscription data source from TDengine.
  - **http/ws**: use the WebSocket protocol via the `ws://` scheme.
  - **https/wss**: use the WebSocket protocol via the `wss://` scheme.
- **Protocol**: additional information appended to the driver, which can be used to support different kinds of data sources. Leave it empty for the native driver (available only with the `native` feature), or set it to `ws` or `wss` for the WebSocket driver (available only with the `ws` feature). **Optional**.
- **Username**: the username for the connection. **Optional**.
- **Password**: the password for the username. **Optional**.
- **Host**: the host address of the data source. **Optional**.
- **Port**: the port of the data source. **Optional**.
- **Database**: the database name or collection name in the data source. **Optional**.
- **Params**: a key-value map for any other information to pass to the data source. **Optional**.

Here is a simple DSN connection string example:

```text
taos+ws://localhost:6041/test
```

This connects to `localhost` on port `6041` via the `ws` protocol and makes `test` the default database.

This lets you use the DSN to specify the connection protocol at runtime:

```rust
use taos::*; // use it like a `prelude` module; the traits it brings in are needed below.

// use native protocol.
let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
let conn1 = builder.build()?;

// use websocket protocol.
let conn2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?.build()?;
```

Once connected, you can perform the following operations on the database.

```rust
async fn demo(taos: &Taos, db: &str) -> Result<(), Error> {
    // prepare database
    taos.exec_many([
        format!("DROP DATABASE IF EXISTS `{db}`"),
        format!("CREATE DATABASE `{db}`"),
        format!("USE `{db}`"),
    ])
    .await?;

    let inserted = taos.exec_many([
        // create super table
        "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
         TAGS (`groupid` INT, `location` BINARY(16))",
        // create child table
        "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angeles')",
        // insert into child table
        "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
        // insert with NULL values
        "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
        // insert and automatically create table with tags if not exists
        "INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)",
        // insert many records in a single sql
        "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
    ]).await?;

    // six rows were inserted in total; the CREATE statements affect no rows
    assert_eq!(inserted, 6);
    let mut result = taos.query("select * from `meters`").await?;

    for field in result.fields() {
        println!("got field: {}", field.name());
    }

    // rows can now be fetched from `result`
    Ok(())
}
```

The Rust connector provides two ways to fetch data:

```rust
// These statements belong in the body of an async function that returns
// `Result<(), Error>`, with `result` obtained from `taos.query(...)`.

// Query option 1: use the rows stream.
let mut rows = result.rows();
while let Some(row) = rows.try_next().await? {
    for (name, value) in row {
        println!("got value of {}: {}", name, value);
    }
}

// Query option 2: use deserialization with serde.
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
    // deserialize timestamp to chrono::DateTime (here in the `Local` time zone)
    ts: DateTime<Local>,
    // float to f32
    current: Option<f32>,
    // int to i32
    voltage: Option<i32>,
    phase: Option<f32>,
    groupid: i32,
    // binary/varchar to String
    location: String,
}

let records: Vec<Record> = taos
    .query("select * from `meters`")
    .await?
    .deserialize()
    .try_collect()
    .await?;

dbg!(records);
Ok(())
```

## Usage examples

### Write data

#### SQL Write

<RustInsert />

#### Stmt bind

<RustBind />

### Query data

<RustQuery />

## API Reference

### Connector builder

Use a DSN to construct a `TaosBuilder` object directly.

```rust
let builder = TaosBuilder::from_dsn("")?;
```

Use `builder` to create as many connections as needed:

```rust
let conn: Taos = builder.build()?;
```

### Connection pool

In complex applications, we recommend enabling connection pools. The connection pool for [taos] is implemented with [r2d2] and is available when the `r2d2` feature is enabled.

A connection pool with default parameters can be created as follows:

```rust
let pool = TaosBuilder::from_dsn(dsn)?.pool()?;
```

You can set the connection pool parameters using the `PoolBuilder`.

```rust
let dsn = "taos://localhost:6030";

let opts = PoolBuilder::new()
    .max_size(5000) // max connections
    .max_lifetime(Some(Duration::from_secs(60 * 60))) // lifetime of each connection
    .min_idle(Some(1000)) // minimal idle connections
    .connection_timeout(Duration::from_secs(2));

let pool = TaosBuilder::from_dsn(dsn)?.with_pool_builder(opts)?;
```

In the application code, use `pool.get()?` to get a [Taos] connection object.

```rust
let taos = pool.get()?;
```

### Connection methods

The [Taos] connection struct provides several APIs for convenient use.

1. `exec`: Execute a non-query SQL statement, such as `CREATE`, `ALTER`, or `INSERT`, and return the number of affected rows (only meaningful for `INSERT`).

   ```rust
   let affected_rows = taos.exec("INSERT INTO tb1 VALUES(now, NULL)").await?;
   ```

2. `exec_many`: Execute several SQL statements in order.

   ```rust
   taos.exec_many([
       "CREATE DATABASE test",
       "USE test",
       "CREATE TABLE `tb1` (`ts` TIMESTAMP, `val` INT)",
   ]).await?;
   ```

3. `query`: Execute a query statement and return a [ResultSet] object.

   ```rust
   let mut q = taos.query("select * from log.logs").await?;
   ```

   The [ResultSet] object stores the query result data and basic information about the returned columns (column name, type, length).

   Get field information with the `fields` method.

   ```rust
   let cols = q.fields();
   for col in cols {
       println!("name: {}, type: {:?} , bytes: {}", col.name(), col.ty(), col.bytes());
   }
   ```

   Fetch data row by row.

   ```rust
   let mut rows = q.rows();
   let mut nrows = 0;
   while let Some(row) = rows.try_next().await? {
       for (col, (name, value)) in row.enumerate() {
           println!(
               "[{}] got value in col {} (named `{:>8}`): {}",
               nrows, col, name, value
           );
       }
       nrows += 1;
   }
   ```

   Or use it with [serde](https://serde.rs) deserialization.

   ```rust
   #[derive(Debug, Deserialize)]
   struct Record {
       // deserialize timestamp to chrono::DateTime (here in the `Local` time zone)
       ts: DateTime<Local>,
       // float to f32
       current: Option<f32>,
       // int to i32
       voltage: Option<i32>,
       phase: Option<f32>,
       groupid: i32,
       // binary/varchar to String
       location: String,
   }

   let records: Vec<Record> = taos
       .query("select * from `meters`")
       .await?
       .deserialize()
       .try_collect()
       .await?;
   ```

Note that these methods are asynchronous, so they must be called from Rust async functions running on an asynchronous runtime.

[Taos] provides a few Rust methods that encapsulate SQL to reduce the frequency of `format!` code blocks.

- `.describe(table: &str)`: Executes `DESCRIBE` and returns a Rust data structure.
- `.create_database(database: &str)`: Executes the `CREATE DATABASE` statement.
- `.use_database(database: &str)`: Executes the `USE` statement.

### Bind API

Similar to the C interface, the Rust connector provides a wrapper for the bind interface. First, create a bind object [Stmt] for a SQL command with the [Taos] object.

```rust
let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
```

The bind object provides a set of interfaces for implementing parameter binding.

#### `.set_tbname(name)`

Bind the table name.

```rust
let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
stmt.set_tbname("d0")?;
```

#### `.set_tags(&[tag])`

Bind tag values when the SQL statement uses a super table.

```rust
let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)")?;
stmt.set_tbname("d0")?;
stmt.set_tags(&[Value::VarChar("涛思".to_string())])?;
```

#### `.bind(&[column])`

Bind column values. Use the `ColumnView` structure to construct values of the desired type and bind them.

```rust
let params = vec![
    ColumnView::from_millis_timestamp(vec![164000000000]),
    ColumnView::from_bools(vec![true]),
    ColumnView::from_tiny_ints(vec![i8::MAX]),
    ColumnView::from_small_ints(vec![i16::MAX]),
    ColumnView::from_ints(vec![i32::MAX]),
    ColumnView::from_big_ints(vec![i64::MAX]),
    ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]),
    ColumnView::from_unsigned_small_ints(vec![u16::MAX]),
    ColumnView::from_unsigned_ints(vec![u32::MAX]),
    ColumnView::from_unsigned_big_ints(vec![u64::MAX]),
    ColumnView::from_floats(vec![f32::MAX]),
    ColumnView::from_doubles(vec![f64::MAX]),
    ColumnView::from_varchar(vec!["ABC"]),
    ColumnView::from_nchar(vec!["涛思数据"]),
];
let rows = stmt.bind(&params)?.add_batch()?.execute()?;
```

#### `.execute()`

Execute the statement to insert all bound records. A [Stmt] object can be reused: after execution, it can be re-bound and executed again. Remember to call `add_batch` before `execute`.

```rust
stmt.add_batch()?.execute()?;

// next bind cycle.
// stmt.set_tbname()?;
// stmt.bind()?;
// stmt.add_batch()?.execute()?;
```

A runnable example for bind can be found [here](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs).

### Subscription API

Users can subscribe to a [TOPIC](../../../taos-sql/tmq/) with the TMQ (TDengine Message Queue) API.

Start from a TMQ builder:

```rust
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
```

Build a consumer:

```rust
let mut consumer = tmq.build()?;
```

Subscribe to a topic:

```rust
consumer.subscribe(["tmq_meters"]).await?;
```

Consume messages, and commit the offset for each message.

```rust
{
    let mut stream = consumer.stream();

    while let Some((offset, message)) = stream.try_next().await? {
        // get information from offset

        // the topic
        let topic = offset.topic();
        // the vgroup id, like partition id in kafka.
        let vgroup_id = offset.vgroup_id();
        println!("* in vgroup id {vgroup_id} of topic {topic}\n");

        if let Some(data) = message.into_data() {
            while let Some(block) = data.fetch_raw_block().await? {
                // one block for one table, get table name if needed
                let name = block.table_name();
                let records: Vec<Record> = block.deserialize().try_collect()?;
                println!(
                    "** table: {}, got {} records: {:#?}\n",
                    name.unwrap(),
                    records.len(),
                    records
                );
            }
        }
        consumer.commit(offset).await?;
    }
}
```

Unsubscribe:

```rust
consumer.unsubscribe().await;
```

In a TMQ DSN, you must specify a group id to subscribe with. Several other options can also be set:

- `group.id`: **Required**. A group id is any visible string you set.
- `client.id`: an optional client description string.
- `auto.offset.reset`: choose to subscribe from *earliest* or *latest*. The default is *none*, which means 'earliest'.
- `enable.auto.commit`: automatically commit at the specified time interval. By default, and as the recommended practice, you must call `commit` yourself to confirm that the messages have been consumed correctly; otherwise, consumers will receive repeated messages when they re-subscribe.
- `auto.commit.interval.ms`: the auto commit interval in milliseconds.
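The options above are passed as query parameters of the TMQ DSN. As a minimal sketch of that layout, the helper below assembles such a string using only the standard library. The function name `tmq_dsn` and its signature are hypothetical and not part of the `taos` crate, and the sketch assumes that keys and values need no URL escaping.

```rust
/// Hypothetical helper (not part of the `taos` crate): builds a TMQ DSN from
/// an address, the required `group.id`, and any extra options.
/// Assumes keys and values contain no characters that need URL escaping.
fn tmq_dsn(address: &str, group_id: &str, extra: &[(&str, &str)]) -> String {
    // `group.id` is required, so it always comes first in the query string.
    let mut dsn = format!("{address}/?group.id={group_id}");
    // Every further option is appended as `&key=value`.
    for (key, value) in extra {
        dsn.push('&');
        dsn.push_str(key);
        dsn.push('=');
        dsn.push_str(value);
    }
    dsn
}
```

For example, `tmq_dsn("taos://localhost:6030", "test", &[("auto.offset.reset", "earliest")])` yields `taos://localhost:6030/?group.id=test&auto.offset.reset=earliest`, which could then be handed to `TmqBuilder::from_dsn`.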
Check the whole subscription example at [GitHub](https://github.com/taosdata/taos-connector-rust/blob/main/examples/subscribe.rs).

For usage instructions on other related structures, see the Rust documentation hosting page: [https://docs.rs/taos](https://docs.rs/taos).

[TDengine]: https://github.com/taosdata/TDengine
[r2d2]: https://crates.io/crates/r2d2
[Taos]: https://docs.rs/taos/latest/taos/struct.Taos.html
[ResultSet]: https://docs.rs/taos/latest/taos/struct.ResultSet.html
[Value]: https://docs.rs/taos/latest/taos/enum.Value.html
[Stmt]: https://docs.rs/taos/latest/taos/stmt/struct.Stmt.html
[taos]: https://crates.io/crates/taos