---
title: TDengine Rust Connector
sidebar_label: Rust
description: This document describes the TDengine Rust connector.
toc_max_heading_level: 4
---

import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

import Preparition from "./_preparation.mdx"
import RustInsert from "../../07-develop/03-insert-data/_rust_sql.mdx"
import RustBind from "../../07-develop/03-insert-data/_rust_stmt.mdx"
import RustSml from "../../07-develop/03-insert-data/_rust_schemaless.mdx"
import RustQuery from "../../07-develop/04-query-data/_rust.mdx"

[![Crates.io](https://img.shields.io/crates/v/taos)](https://crates.io/crates/taos) ![Crates.io](https://img.shields.io/crates/d/taos) [![docs.rs](https://img.shields.io/docsrs/taos)](https://docs.rs/taos)

`taos` is the official Rust connector for TDengine. Rust developers can use it to build applications that access data in TDengine instances.

`taos` provides two ways to establish connections. One is the **Native Connection**, which connects to TDengine instances via the TDengine client driver (taosc). The other is the **WebSocket connection**, which connects to TDengine instances via the WebSocket interface provided by taosAdapter. You can specify a connection type with Cargo features. By default, both types are supported. The WebSocket connection can be used on any platform. The native connection can be used on any platform that the TDengine Client supports.

The source code for the Rust connectors is located on [GitHub](https://github.com/taosdata/taos-connector-rust).

## Supported platforms

Native connections are supported on the same platforms as the TDengine client driver.
WebSocket connections are supported on all platforms that can run Go.

## Version history

| connector-rust version | TDengine version | major features |
| :----------------: | :--------------: | :--------------------------------------------------: |
| v0.8.12 | 3.0.5.0 or later | TMQ: Get consuming progress and seek offset to consume. |
| v0.8.0 | 3.0.4.0 | Support schemaless insert. |
| v0.7.6 | 3.0.3.0 | Support req_id in query. |
| v0.6.0 | 3.0.0.0 | Base features. |

The Rust Connector is still under rapid development and is not guaranteed to be backward compatible before 1.0. We recommend using TDengine version 3.0 or higher to avoid known issues.

## Handling exceptions

When an operation fails, you can obtain the specific error information from the returned error:

```rust
match conn.exec(sql) {
    Ok(_) => {
        Ok(())
    }
    Err(e) => {
        // print the error details, then pass the error on to the caller
        eprintln!("ERROR: {:?}", e);
        Err(e)
    }
}
```

## TDengine DataType vs. Rust DataType

TDengine currently supports timestamp, numeric, character, and Boolean types. They map to Rust types as follows:

| TDengine DataType | Rust DataType |
| ----------------- | ----------------- |
| TIMESTAMP | Timestamp |
| INT | i32 |
| BIGINT | i64 |
| FLOAT | f32 |
| DOUBLE | f64 |
| SMALLINT | i16 |
| TINYINT | i8 |
| BOOL | bool |
| BINARY | Vec<u8> |
| NCHAR | String |
| JSON | serde_json::Value |

Note: The JSON type is supported only for tags.

## Installation Steps

### Pre-installation preparation

* Install the Rust development toolchain
* If using the native connection, please install the TDengine client driver. Please refer to [install client driver](/reference/connector#install-client-driver)

### Install the connectors

Depending on the connection method, add the [taos][taos] dependency in your Rust project as follows:

In `Cargo.toml`, add [taos][taos]:

```toml
[dependencies]
# use default feature
taos = "*"
```

In `Cargo.toml`, add [taos][taos] and enable the ws feature:

```toml
[dependencies]
taos = { version = "*", default-features = false, features = ["ws"] }
```

In `Cargo.toml`, add [taos][taos] and enable the native feature:

```toml
[dependencies]
taos = { version = "*", default-features = false, features = ["native"] }
```

## Establishing a connection

[TaosBuilder] creates a connection builder from a DSN connection description string.
```rust
let builder = TaosBuilder::from_dsn("taos://")?;
```

You can now use this object to create the connection.

```rust
let conn = builder.build()?;
```

The builder can create more than one connection.

```rust
let conn1 = builder.build()?;
let conn2 = builder.build()?;
```

The structure of the DSN description string is as follows:

```text
<driver>[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|------|------------|---|-----------|-----------|------|------|------------|-----------------------|
|driver|   protocol |   | username  | password  | host | port |  database  |  params               |
```

The parameters are described as follows:

- **driver**: Specify a driver name so that the connector can choose which method to use to establish the connection. Supported driver names are as follows:
  - **taos**: Use the TDengine client driver (native connection).
  - **tmq**: Use the TMQ to subscribe to data.
  - **http/ws**: Use WebSocket to establish connections.
  - **https/wss**: Use WebSocket to establish connections, and enable SSL/TLS.
- **protocol**: Specify which connection method to use. For example, `taos+ws://localhost:6041` uses WebSocket to establish connections.
- **username/password**: Username and password used to create connections.
- **host/port**: Specifies the server and port to establish a connection. If you do not specify a hostname or port, native connections default to `localhost:6030` and WebSocket connections default to `localhost:6041`.
- **database**: Specify the default database to connect to. It's optional.
- **params**: Optional parameters.

A sample DSN description string is as follows:

```text
taos+ws://localhost:6041/test
```

This DSN uses the WebSocket connection method to connect to the server `localhost` on port 6041 and selects the database `test` by default.

You can create DSNs to connect to servers in your environment.

```rust
use taos::*;

// use native protocol.
let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
let conn1 = builder.build()?;

// use websocket protocol.
let builder2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
let conn2 = builder2.build()?;
```

After the connection is established, you can perform operations on your database.

```rust
async fn demo(taos: &Taos, db: &str) -> Result<(), Error> {
    // prepare database
    taos.exec_many([
        format!("DROP DATABASE IF EXISTS `{db}`"),
        format!("CREATE DATABASE `{db}`"),
        format!("USE `{db}`"),
    ])
    .await?;

    let inserted = taos.exec_many([
        // create super table
        "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
         TAGS (`groupid` INT, `location` BINARY(24))",
        // create child table
        "CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngeles')",
        // insert into child table
        "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
        // insert with NULL values
        "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
        // insert and automatically create table with tags if not exists
        "INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)",
        // insert many records in a single sql
        "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
    ]).await?;

    // six rows were inserted in total by the four INSERT statements
    assert_eq!(inserted, 6);
    let mut result = taos.query("select * from `meters`").await?;

    for field in result.fields() {
        println!("got field: {}", field.name());
    }

    // `result` is consumed by one of the query options shown next
    Ok(())
}
```

There are two ways to query data: using built-in types or the [serde](https://serde.rs) deserialization framework.

```rust
// These statements belong inside an async function that returns a `Result`,
// such as `demo` above, with `result` and `taos` in scope.

// Query option 1, use rows stream.
let mut rows = result.rows();
while let Some(row) = rows.try_next().await? {
    for (name, value) in row {
        println!("got value of {}: {}", name, value);
    }
}

// Query option 2, use deserialization with serde.
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
    // deserialize timestamp to chrono::DateTime<Local>
    ts: DateTime<Local>,
    // float to f32
    current: Option<f32>,
    // int to i32
    voltage: Option<i32>,
    phase: Option<f32>,
    groupid: i32,
    // binary/varchar to String
    location: String,
}

let records: Vec<Record> = taos
    .query("select * from `meters`")
    .await?
    .deserialize()
    .try_collect()
    .await?;

dbg!(records);
Ok(())
```

## Usage examples

### Create database and tables

```rust
use taos::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let dsn = "taos://localhost:6030";
    let builder = TaosBuilder::from_dsn(dsn)?;

    let taos = builder.build()?;

    let db = "query";

    // create database
    taos.exec_many([
        format!("DROP DATABASE IF EXISTS `{db}`"),
        format!("CREATE DATABASE `{db}`"),
        format!("USE `{db}`"),
    ])
    .await?;

    // create table
    taos.exec_many([
        // create super table
        "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
         TAGS (`groupid` INT, `location` BINARY(16))",
        // create child table
        "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angeles')",
    ]).await?;

    Ok(())
}
```

### Insert data

### Query data

### Execute SQL with req_id

The req_id can be used for request link tracing.

```rust
let rs = taos.query_with_req_id("select * from stable where tag1 is null", 1)?;
```

### Writing data via parameter binding

TDengine has significantly improved the bind APIs to support data writing (INSERT) scenarios. Writing data in this way avoids the resource consumption of SQL syntax parsing, resulting in significant write performance improvements in many cases.

### Schemaless Writing

TDengine supports schemaless writing. It is compatible with InfluxDB's Line Protocol, OpenTSDB's telnet line protocol, and OpenTSDB's JSON format protocol. For more information, see [Schemaless Writing](../../schemaless).

### Schemaless with req_id

The req_id can be used for request link tracing.
```rust
let sml_data = SmlDataBuilder::default()
    .protocol(SchemalessProtocol::Line)
    .data(data)
    .req_id(100u64)
    .build()?;

client.put(&sml_data)?;
```

### Data Subscription

TDengine starts subscriptions through [TMQ](../../../taos-sql/tmq/).

#### Create a Topic

```rust
taos.exec_many([
    // create topic for subscription
    format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}")
])
.await?;
```

#### Create a Consumer

You create a TMQ connector by using a DSN.

```rust
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
```

Create a consumer:

```rust
let mut consumer = tmq.build()?;
```

#### Subscribe to consume data

A single consumer can subscribe to one or more topics.

```rust
consumer.subscribe(["tmq_meters"]).await?;
```

The TMQ is of [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) type. You can use the corresponding API to consume each message in the queue and then use `.commit` to mark them as consumed.

```rust
{
    let mut stream = consumer.stream();

    while let Some((offset, message)) = stream.try_next().await? {
        // get information from offset

        // the topic
        let topic = offset.topic();
        // the vgroup id, like partition id in kafka.
        let vgroup_id = offset.vgroup_id();
        println!("* in vgroup id {vgroup_id} of topic {topic}\n");

        if let Some(data) = message.into_data() {
            while let Some(block) = data.fetch_raw_block().await? {
                // one block for one table, get table name if needed
                let name = block.table_name();
                let records: Vec<Record> = block.deserialize().try_collect()?;
                println!(
                    "** table: {}, got {} records: {:#?}\n",
                    name.unwrap(),
                    records.len(),
                    records
                );
            }
        }
        // mark the message as consumed
        consumer.commit(offset).await?;
    }
}
```

Get assignments:

Version requirements: connector-rust >= v0.8.8, TDengine >= 3.0.5.0

```rust
let assignments = consumer.assignments().await.unwrap();
```

#### Assignment subscription Offset

Seek offset:

Version requirements: connector-rust >= v0.8.8, TDengine >= 3.0.5.0

```rust
consumer.offset_seek(topic, vgroup_id, offset).await;
```

#### Close subscriptions

```rust
consumer.unsubscribe().await;
```

The following parameters can be configured for the TMQ DSN. Only `group.id` is mandatory.

- `group.id`: Within a consumer group, load balancing is implemented by consuming messages on an at-least-once basis.
- `client.id`: Subscriber client ID.
- `auto.offset.reset`: Initial point of subscription. *earliest* subscribes from the beginning, and *latest* subscribes from the newest message. The default is earliest. Note: This parameter is set per consumer group.
- `enable.auto.commit`: Commits offsets automatically. This can be enabled when data consistency is not essential.
- `auto.commit.interval.ms`: Interval for automatic commits.

#### Full Sample Code

For more information, see [GitHub sample file](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/rust/nativeexample/examples/subscribe_demo.rs).

### Use with connection pool

In complex applications, we recommend enabling connection pools.

[taos] implements connection pools based on [r2d2].

A connection pool with default parameters can be created as follows:

```rust
let pool = TaosBuilder::from_dsn(dsn)?.pool()?;
```

You can also set the connection pool parameters yourself by using the connection pool's builder.
```rust
let dsn = "taos://localhost:6030";

let opts = PoolBuilder::new()
    .max_size(5000) // max connections
    .max_lifetime(Some(Duration::from_secs(60 * 60))) // lifetime of each connection
    .min_idle(Some(1000)) // minimal idle connections
    .connection_timeout(Duration::from_secs(2));

let pool = TaosBuilder::from_dsn(dsn)?.with_pool_builder(opts)?;
```

In the application code, use `pool.get()?` to get a connection object [Taos].

```rust
let taos = pool.get()?;
```

### More sample programs

The source code of the sample application is under `TDengine/examples/rust`:

[rust example](https://github.com/taosdata/TDengine/tree/3.0/examples/rust)

## Frequently Asked Questions

For additional troubleshooting, see [FAQ](../../../train-faq/faq).

## API Reference

The [Taos][struct.Taos] object provides an API to perform operations on multiple databases.

1. `exec`: Execute some non-query SQL statements, such as `CREATE`, `ALTER`, `INSERT`, etc.

    ```rust
    let affected_rows = taos.exec("INSERT INTO tb1 VALUES(now, NULL)").await?;
    ```

2. `exec_many`: Run multiple SQL statements simultaneously or in order.

    ```rust
    taos.exec_many([
        "CREATE DATABASE test",
        "USE test",
        "CREATE TABLE `tb1` (`ts` TIMESTAMP, `val` INT)",
    ]).await?;
    ```

3. `query`: Run a query statement and return a [ResultSet] object.

    ```rust
    let mut q = taos.query("select * from log.logs").await?;
    ```

    The [ResultSet] object stores query result data and the names, types, and lengths of returned columns.

    You can obtain column information by using [.fields()].

    ```rust
    let cols = q.fields();
    for col in cols {
        println!("name: {}, type: {:?} , bytes: {}", col.name(), col.ty(), col.bytes());
    }
    ```

    You can fetch data row by row.

    ```rust
    let mut rows = q.rows();
    let mut nrows = 0;
    while let Some(row) = rows.try_next().await? {
        for (col, (name, value)) in row.enumerate() {
            println!(
                "[{}] got value in col {} (named `{:>8}`): {}",
                nrows, col, name, value
            );
        }
        nrows += 1;
    }
    ```

    Or use the [serde](https://serde.rs) deserialization framework.

    ```rust
    use chrono::{DateTime, Local};
    use serde::Deserialize;

    #[derive(Debug, Deserialize)]
    struct Record {
        // deserialize timestamp to chrono::DateTime<Local>
        ts: DateTime<Local>,
        // float to f32
        current: Option<f32>,
        // int to i32
        voltage: Option<i32>,
        phase: Option<f32>,
        groupid: i32,
        // binary/varchar to String
        location: String,
    }

    let records: Vec<Record> = taos
        .query("select * from `meters`")
        .await?
        .deserialize()
        .try_collect()
        .await?;
    ```

Note that the examples above use Rust asynchronous functions, so an asynchronous runtime is required.

[Taos][struct.Taos] provides Rust methods for some SQL statements to reduce the number of `format!`s.

- `.describe(table: &str)`: Executes `DESCRIBE` and returns a Rust data structure.
- `.create_database(database: &str)`: Executes the `CREATE DATABASE` statement.
- `.use_database(database: &str)`: Executes the `USE` statement.

In addition, this structure is also the entry point for [Parameter Binding](#bind-interface) and [Line Protocol Interface](#schemaless-writing). Please refer to the specific API descriptions for usage.

### Bind Interface

Similar to the C interface, Rust provides a wrapper for the bind interface. First, the [Taos][struct.taos] object creates a parameter binding object [Stmt] for an SQL statement.

```rust
let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
```

The bind object provides a set of interfaces for implementing parameter binding.

#### `.set_tbname(name)`

Binds the table name.

```rust
let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
stmt.set_tbname("d0")?;
```

#### `.set_tags(&[tag])`

Binds the sub-table name and tag values when the SQL statement uses a super table.

```rust
let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)")?;
stmt.set_tbname("d0")?;
stmt.set_tags(&[Value::VarChar("taos".to_string())])?;
```

#### `.bind(&[column])`

Binds column values. Use the [ColumnView] structure to create and bind the required types.

```rust
let params = vec![
    ColumnView::from_millis_timestamp(vec![164000000000]),
    ColumnView::from_bools(vec![true]),
    ColumnView::from_tiny_ints(vec![i8::MAX]),
    ColumnView::from_small_ints(vec![i16::MAX]),
    ColumnView::from_ints(vec![i32::MAX]),
    ColumnView::from_big_ints(vec![i64::MAX]),
    ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]),
    ColumnView::from_unsigned_small_ints(vec![u16::MAX]),
    ColumnView::from_unsigned_ints(vec![u32::MAX]),
    ColumnView::from_unsigned_big_ints(vec![u64::MAX]),
    ColumnView::from_floats(vec![f32::MAX]),
    ColumnView::from_doubles(vec![f64::MAX]),
    ColumnView::from_varchar(vec!["ABC"]),
    ColumnView::from_nchar(vec!["涛思数据"]),
];
let rows = stmt.bind(&params)?.add_batch()?.execute()?;
```

#### `.execute()`

Executes the SQL statement. After execution, a [Stmt] object can be reused: bind new values and execute it again. Before execution, ensure that all data has been added to the queue with `.add_batch`.

```rust
stmt.execute()?;

// next bind cycle.
//stmt.set_tbname()?;
//stmt.bind()?;
//stmt.execute()?;
```

For a working example, see [GitHub](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs).

For information about other structure APIs, see the [Rust documentation](https://docs.rs/taos).

[taos]: https://github.com/taosdata/rust-connector-taos
[r2d2]: https://crates.io/crates/r2d2
[TaosBuilder]: https://docs.rs/taos/latest/taos/struct.TaosBuilder.html
[TaosCfg]: https://docs.rs/taos/latest/taos/struct.TaosCfg.html
[struct.Taos]: https://docs.rs/taos/latest/taos/struct.Taos.html
[Stmt]: https://docs.rs/taos/latest/taos/struct.Stmt.html
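
The `.bind` interface in this section takes one vector of values per column rather than one value per row, so row-oriented records have to be regrouped by column before they are bound. The following is a minimal plain-Rust sketch of that regrouping. The `Reading` and `Columns` structs and the `to_columns` function are hypothetical names introduced for illustration and are not part of the `taos` crate.

```rust
/// Hypothetical row-oriented record; not a type from the `taos` crate.
#[derive(Debug, Clone, PartialEq)]
pub struct Reading {
    pub ts_millis: i64,
    pub current: f32,
    pub voltage: i32,
}

/// Hypothetical column-oriented layout: one vector per bound column.
#[derive(Debug, Default, PartialEq)]
pub struct Columns {
    pub ts_millis: Vec<i64>,
    pub current: Vec<f32>,
    pub voltage: Vec<i32>,
}

/// Regroup rows into columns, preserving row order within each column.
pub fn to_columns(rows: &[Reading]) -> Columns {
    let mut cols = Columns {
        ts_millis: Vec::with_capacity(rows.len()),
        current: Vec::with_capacity(rows.len()),
        voltage: Vec::with_capacity(rows.len()),
    };
    for row in rows {
        cols.ts_millis.push(row.ts_millis);
        cols.current.push(row.current);
        cols.voltage.push(row.voltage);
    }
    cols
}
```

Under these assumptions, each resulting vector could then be passed to the matching constructor from the `.bind` example, such as `ColumnView::from_millis_timestamp`, `ColumnView::from_floats`, and `ColumnView::from_ints`, so that a whole batch of rows is bound in one call.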