......@@ -31,21 +31,57 @@ Websocket connections are supported on all platforms that can run Go.
| connector-rust version | TDengine version | major features |
| :----------------: | :--------------: | :--------------------------------------------------: |
| v0.8.12 | or later | TMQ: Get consuming progress and seek offset to consume. |
| v0.8.0 | | Support schemaless insert. |
| v0.7.6 | | Support req_id in query. |
| v0.6.0 | | Base features. |
The Rust Connector is still under rapid development and is not guaranteed to be backward compatible before 1.0. We recommend using TDengine version 3.0 or higher to avoid known issues.
## Installation
## Handling exceptions
After the error is reported, the specific information of the error can be obtained:
match conn.exec(sql) {
Ok(_) => {
Err(e) => {
eprintln!("ERROR: {:?}", e);
## TDengine DataType vs. Rust DataType
TDengine currently supports timestamp, number, character, Boolean type, and the corresponding type conversion with Rust is as follows:
| TDengine DataType | Rust DataType |
| ----------------- | ----------------- |
| TIMESTAMP | Timestamp |
| INT | i32 |
| BIGINT | i64 |
| FLOAT | f32 |
| DOUBLE | f64 |
| SMALLINT | i16 |
| TINYINT | i8 |
| BOOL | bool |
| BINARY | Vec<u8\> |
| NCHAR | String |
| JSON | serde_json::Value |
Note: Only TAG supports JSON types
## Installation Steps
### Pre-installation preparation
* Install the Rust development toolchain
* If using the native connection, please install the TDengine client driver. Please refer to [install client driver](/reference/connector#install-client-driver)
### Add taos dependency
### Install the connectors
Depending on the connection method, add the [taos][taos] dependency in your Rust project as follows:
......@@ -146,7 +182,8 @@ let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
let conn1 = builder.build();
// use websocket protocol.
let conn2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
let builder2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
let conn2 = builder2.build();
After the connection is established, you can perform operations on your database.
......@@ -228,41 +265,183 @@ There are two ways to query data: Using built-in types or the [serde](https://se
## Usage examples
### Write data
### Create database and tables
use taos::*;
async fn main() -> anyhow::Result<()> {
let dsn = "taos://localhost:6030";
let builder = TaosBuilder::from_dsn(dsn)?;
let taos = builder.build()?;
let db = "query";
// create database
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}`"),
format!("USE `{db}`"),
// create table
// create super table
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
TAGS (`groupid` INT, `location` BINARY(16))",
// create child table
"CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')",
#### SQL Write
### Insert data
<RustInsert />
#### STMT Write
### Query data
<RustQuery />
### execute SQL with req_id
This req_id can be used to request link tracing.
let rs = taos.query_with_req_id("select * from stable where tag1 is null", 1)?;
### Writing data via parameter binding
<RustBind />
#### Schemaless Write
### Schemaless Writing
<RustSml />
### Query data
### Schemaless with req_id
<RustQuery />
This req_id can be used to request link tracing.
## API Reference
let sml_data = SmlDataBuilder::default()
### Data Subscription
TDengine starts subscriptions through [TMQ](../../../taos-sql/tmq/).
#### Create a Topic
// create topic for subscription
format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}")
### Connector Constructor
#### Create a Consumer
You create a connector constructor by using a DSN.
You create a TMQ connector by using a DSN.
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
Create a consumer:
let mut consumer = tmq.build()?;
#### Subscribe to consume data
A single consumer can subscribe to one or more topics.
let cfg = TaosBuilder::default().build()?;
The TMQ is of [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) type. You can use the corresponding API to consume each message in the queue and then use `.commit` to mark them as consumed.
let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? {
// get information from offset
// the topic
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
"** table: {}, got {} records: {:#?}\n",
You use the builder object to create multiple connections.
Get assignments:
Version requirements connector-rust >= v0.8.8, TDengine >=
let conn: Taos = cfg.build();
let assignments = consumer.assignments().await.unwrap();
### Connection pooling
#### Assignment subscription Offset
Seek offset:
Version requirements connector-rust >= v0.8.8, TDengine >=
consumer.offset_seek(topic, vgroup_id, offset).await;
#### Close subscriptions
The following parameters can be configured for the TMQ DSN. Only `group.id` is mandatory.
- `group.id`: Within a consumer group, load balancing is implemented by consuming messages on an at-least-once basis.
- `client.id`: Subscriber client ID.
- `auto.offset.reset`: Initial point of subscription. *earliest* subscribes from the beginning, and *latest* subscribes from the newest message. The default is earliest. Note: This parameter is set per consumer group.
- `enable.auto.commit`: Automatically commits. This can be enabled when data consistency is not essential.
- `auto.commit.interval.ms`: Interval for automatic commits.
#### Full Sample Code
For more information, see [GitHub sample file](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/rust/nativeexample/examples/subscribe_demo.rs).
### Use with connection pool
In complex applications, we recommend enabling connection pools. [taos] implements connection pools based on [r2d2].
......@@ -292,7 +471,17 @@ In the application code, use `pool.get()? ` to get a connection object [Taos].
let taos = pool.get()?;
### Connectors
### More sample programs
The source code of the sample application is under `TDengine/examples/rust` :
[rust example](https://github.com/taosdata/TDengine/tree/3.0/examples/rust)
## Frequently Asked Questions
For additional troubleshooting, see [FAQ](../../../train-faq/faq).
## API Reference
The [Taos][struct.Taos] object provides an API to perform operations on multiple databases.
......@@ -380,7 +569,7 @@ Note that Rust asynchronous functions and an asynchronous runtime are required.
In addition, this structure is also the entry point for [Parameter Binding](#Parameter Binding Interface) and [Line Protocol Interface](#Line Protocol Interface). Please refer to the specific API descriptions for usage.
### Bind Interface
Bind Interface
Similar to the C interface, Rust provides the bind interface's wrapping. First, the [Taos][struct.taos] object creates a parameter binding object [Stmt] for an SQL statement.
......@@ -391,7 +580,7 @@ stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
The bind object provides a set of interfaces for implementing parameter binding.
#### `.set_tbname(name)`
To bind table names.
......@@ -400,7 +589,7 @@ let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
#### `.set_tags(&[tag])`
Bind sub-table table names and tag values when the SQL statement uses a super table.
......@@ -410,7 +599,7 @@ stmt.set_tbname("d0")?;
#### `.bind(&[column])`
Bind value types. Use the [ColumnView] structure to create and bind the required types.
......@@ -434,7 +623,7 @@ let params = vec![
let rows = stmt.bind(&params)?.add_batch()?.execute()?;
#### `.execute()`
Execute SQL. [Stmt] objects can be reused, re-binded, and executed after execution. Before execution, ensure that all data has been added to the queue with `.add_batch`.
......@@ -449,92 +638,6 @@ stmt.execute()?;
For a working example, see [GitHub](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs).
### Subscriptions
TDengine starts subscriptions through [TMQ](../../../taos-sql/tmq/).
You create a TMQ connector by using a DSN.
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
Create a consumer:
let mut consumer = tmq.build()?;
A single consumer can subscribe to one or more topics.
The TMQ is of [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) type. You can use the corresponding API to consume each message in the queue and then use `.commit` to mark them as consumed.
let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? {
// get information from offset
// the topic
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
"** table: {}, got {} records: {:#?}\n",
Get assignments:
Version requirements connector-rust >= v0.8.8, TDengine >=
let assignments = consumer.assignments().await.unwrap();
Seek offset:
Version requirements connector-rust >= v0.8.8, TDengine >=
consumer.offset_seek(topic, vgroup_id, offset).await;
The following parameters can be configured for the TMQ DSN. Only `group.id` is mandatory.
- `group.id`: Within a consumer group, load balancing is implemented by consuming messages on an at-least-once basis.
- `client.id`: Subscriber client ID.
- `auto.offset.reset`: Initial point of subscription. *earliest* subscribes from the beginning, and *latest* subscribes from the newest message. The default is earliest. Note: This parameter is set per consumer group.
- `enable.auto.commit`: Automatically commits. This can be enabled when data consistency is not essential.
- `auto.commit.interval.ms`: Interval for automatic commits.
For more information, see [GitHub sample file](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/rust/nativeexample/examples/subscribe_demo.rs).
For information about other structure APIs, see the [Rust documentation](https://docs.rs/taos).
......@@ -30,21 +30,57 @@ Websocket 连接支持所有能运行 Rust 的平台。
| Rust 连接器版本 | TDengine 版本 | 主要功能 |
| :----------------: | :--------------: | :--------------------------------------------------: |
| v0.8.12 | or later | 消息订阅:获取消费进度及按照指定进度开始消费。 |
| v0.8.0 | | 支持无模式写入。 |
| v0.7.6 | | 支持在请求中使用 req_id。 |
| v0.6.0 | | 基础功能。 |
Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine,以避免已知问题。
## 安装
## 处理错误
match conn.exec(sql) {
Ok(_) => {
Err(e) => {
eprintln!("ERROR: {:?}", e);
## TDengine DataType 和 Rust DataType
TDengine 目前支持时间戳、数字、字符、布尔类型,与 Rust 对应类型转换如下:
| TDengine DataType | Rust DataType |
| ----------------- | ----------------- |
| TIMESTAMP | Timestamp |
| INT | i32 |
| BIGINT | i64 |
| FLOAT | f32 |
| DOUBLE | f64 |
| SMALLINT | i16 |
| TINYINT | i8 |
| BOOL | bool |
| BINARY | Vec<u8\> |
| NCHAR | String |
| JSON | serde_json::Value |
**注意**:JSON 类型仅在 tag 中支持。
## 安装步骤
### 安装前准备
* 安装 Rust 开发工具链
* 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动)
### 添加 taos 依赖
### 安装连接器
根据选择的连接方式,按照如下说明在 [Rust](https://rust-lang.org) 项目中添加 [taos][taos] 依赖:
......@@ -151,7 +187,8 @@ let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
let conn1 = builder.build();
// use websocket protocol.
let conn2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
let builder2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
let conn2 = builder2.build();
......@@ -233,41 +270,183 @@ async fn demo(taos: &Taos, db: &str) -> Result<(), Error> {
## 使用示例
### 写入数据
### 创建数据库和表
use taos::*;
async fn main() -> anyhow::Result<()> {
let dsn = "taos://localhost:6030";
let builder = TaosBuilder::from_dsn(dsn)?;
let taos = builder.build()?;
let db = "query";
// create database
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}`"),
format!("USE `{db}`"),
// create table
// create super table
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
TAGS (`groupid` INT, `location` BINARY(16))",
// create child table
"CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')",
#### SQL 写入
### 插入数据
<RustInsert />
#### STMT 写入
### 查询数据
<RustQuery />
### 执行带有 req_id 的 SQL
此 req_id 可用于请求链路追踪。
let rs = taos.query_with_req_id("select * from stable where tag1 is null", 1)?;
### 通过参数绑定写入数据
<RustBind />
#### Schemaless 写入
### 无模式写入
<RustSml />
### 查询数据
### 执行带有 req_id 的无模式写入
<RustQuery />
此 req_id 可用于请求链路追踪。
## API 参考
let sml_data = SmlDataBuilder::default()
### 数据订阅
TDengine 通过消息队列 [TMQ](../../../taos-sql/tmq/) 启动一个订阅。
#### 创建 Topic
// create topic for subscription
format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}")
### 连接构造器
#### 创建 Consumer
通过 DSN 来构建一个连接器构造器。
从 DSN 开始,构建一个 TMQ 连接器。
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
let mut consumer = tmq.build()?;
#### 订阅消费数据
消费者可订阅一个或多个 `TOPIC`。
let cfg = TaosBuilder::default().build()?;
TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) 类型,可以使用相应 API 对每个消息进行消费,并通过 `.commit` 进行已消费标记。
let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? {
// get information from offset
// the topic
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
"** table: {}, got {} records: {:#?}\n",
使用 `builder` 对象创建多个连接:
版本要求 connector-rust >= v0.8.8, TDengine >=
let conn: Taos = cfg.build();
let assignments = consumer.assignments().await.unwrap();
### 连接池
#### 指定订阅 Offset
版本要求 connector-rust >= v0.8.8, TDengine >=
consumer.offset_seek(topic, vgroup_id, offset).await;
#### 关闭订阅
对于 TMQ DSN, 有以下配置项可以进行设置,需要注意的是,`group.id` 是必须的。
- `group.id`: 同一个消费者组,将以至少消费一次的方式进行消息负载均衡。
- `client.id`: 可选的订阅客户端识别项。
- `auto.offset.reset`: 可选初始化订阅起点, *earliest* 为从头开始订阅, *latest* 为仅从最新数据开始订阅,默认为从头订阅。注意,此选项在同一个 `group.id` 中仅生效一次。
- `enable.auto.commit`: 当设置为 `true` 时,将启用自动标记模式,当对数据一致性不敏感时,可以启用此方式。
- `auto.commit.interval.ms`: 自动标记的时间间隔。
#### 完整示例
完整订阅示例参见 [GitHub 示例文件](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/rust/nativeexample/examples/subscribe_demo.rs).
### 与连接池使用
在复杂应用中,建议启用连接池。[taos] 的连接池默认(异步模式)使用 [deadpool] 实现。
......@@ -295,7 +474,17 @@ let pool: Pool<TaosBuilder> = Pool::builder(Manager::from_dsn(self.dsn.clone()).
let taos = pool.get()?;
### 连接
### 更多示例程序
示例程序源码位于 `TDengine/examples/rust` 下:
请参考:[rust example](https://github.com/taosdata/TDengine/tree/3.0/examples/rust)
## 常见问题
请参考 [FAQ](../../../train-faq/faq)
## API 参考
[Taos][struct.Taos] 对象提供了多个数据库操作的 API:
......@@ -383,7 +572,7 @@ let taos = pool.get()?;
除此之外,该结构也是 [参数绑定](#参数绑定接口) 和 [行协议接口](#行协议接口) 的入口,使用方法请参考具体的 API 说明。
### 参数绑定接口
与 C 接口类似,Rust 提供参数绑定接口。首先,通过 [Taos][struct.Taos] 对象创建一个 SQL 语句的参数绑定对象 [Stmt]:
......@@ -394,7 +583,7 @@ stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
#### `.set_tbname(name)`
......@@ -403,7 +592,7 @@ let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
#### `.set_tags(&[tag])`
当 SQL 语句使用超级表时,用于绑定子表表名和标签值:
......@@ -413,7 +602,7 @@ stmt.set_tbname("d0")?;
#### `.bind(&[column])`
用于绑定值类型。使用 [ColumnView] 结构体构建需要的类型并绑定:
......@@ -437,7 +626,7 @@ let params = vec![
let rows = stmt.bind(&params)?.add_batch()?.execute()?;
#### `.execute()`
执行 SQL。[Stmt] 对象可以复用,在执行后可以重新绑定并执行。执行前请确保所有数据已通过 `.add_batch` 加入到执行队列中。
......@@ -452,92 +641,6 @@ stmt.execute()?;
一个可运行的示例请见 [GitHub 上的示例](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs)。
### 订阅
TDengine 通过消息队列 [TMQ](../../../taos-sql/tmq/) 启动一个订阅。
从 DSN 开始,构建一个 TMQ 连接器。
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
let mut consumer = tmq.build()?;
消费者可订阅一个或多个 `TOPIC`。
TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) 类型,可以使用相应 API 对每个消息进行消费,并通过 `.commit` 进行已消费标记。
let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? {
// get information from offset
// the topic
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
"** table: {}, got {} records: {:#?}\n",
版本要求 connector-rust >= v0.8.8, TDengine >=
let assignments = consumer.assignments().await.unwrap();
版本要求 connector-rust >= v0.8.8, TDengine >=
consumer.offset_seek(topic, vgroup_id, offset).await;
对于 TMQ DSN, 有以下配置项可以进行设置,需要注意的是,`group.id` 是必须的。
- `group.id`: 同一个消费者组,将以至少消费一次的方式进行消息负载均衡。
- `client.id`: 可选的订阅客户端识别项。
- `auto.offset.reset`: 可选初始化订阅起点, *earliest* 为从头开始订阅, *latest* 为仅从最新数据开始订阅,默认为从头订阅。注意,此选项在同一个 `group.id` 中仅生效一次。
- `enable.auto.commit`: 当设置为 `true` 时,将启用自动标记模式,当对数据一致性不敏感时,可以启用此方式。
- `auto.commit.interval.ms`: 自动标记的时间间隔。
完整订阅示例参见 [GitHub 示例文件](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/rust/nativeexample/examples/subscribe_demo.rs).
其他相关结构体 API 使用说明请移步 Rust 文档托管网页:<https://docs.rs/taos>。
