From 74cd6a971ec7cfd62dc64d4818e0319bf3924e06 Mon Sep 17 00:00:00 2001 From: Linhe Huo Date: Thu, 11 Aug 2022 19:55:23 +0800 Subject: [PATCH] docs(rust): update rust documentations and examples for 3.0 (#16009) Closes - [TD-17895](https://jira.taosdata.com:18080/browse/TD-17895) - [TD-18191](https://jira.taosdata.com:18080/browse/TD-18191) - [TD-18197](https://jira.taosdata.com:18080/browse/TD-18197) --- .../07-develop/03-insert-data/_rust_line.mdx | 1 - .../03-insert-data/_rust_opts_json.mdx | 1 - .../03-insert-data/_rust_opts_telnet.mdx | 1 - docs/en/14-reference/03-connector/rust.mdx | 512 +++++++++++------- docs/examples/rust/Cargo.toml | 2 +- docs/examples/rust/nativeexample/Cargo.toml | 9 +- .../rust/nativeexample/examples/connect.rs | 22 +- .../nativeexample/examples/stmt_example.rs | 42 +- .../nativeexample/examples/subscribe_demo.rs | 104 +++- docs/examples/rust/restexample/Cargo.toml | 8 +- .../rust/restexample/examples/connect.rs | 21 +- .../restexample/examples/insert_example.rs | 39 +- .../restexample/examples/query_example.rs | 42 +- .../rust/schemalessexample/Cargo.toml | 7 - .../examples/influxdb_line_example.rs | 22 - .../examples/opentsdb_json_example.rs | 25 - .../examples/opentsdb_telnet_example.rs | 28 - .../rust/schemalessexample/src/main.rs | 3 - docs/zh/14-reference/03-connector/rust.mdx | 489 +++++++++++------ 19 files changed, 817 insertions(+), 561 deletions(-) delete mode 100644 docs/examples/rust/schemalessexample/Cargo.toml delete mode 100644 docs/examples/rust/schemalessexample/examples/influxdb_line_example.rs delete mode 100644 docs/examples/rust/schemalessexample/examples/opentsdb_json_example.rs delete mode 100644 docs/examples/rust/schemalessexample/examples/opentsdb_telnet_example.rs delete mode 100644 docs/examples/rust/schemalessexample/src/main.rs diff --git a/docs/en/07-develop/03-insert-data/_rust_line.mdx b/docs/en/07-develop/03-insert-data/_rust_line.mdx index dbb35d76bc..25d322f8a7 100644 --- a/docs/en/07-develop/03-insert-data/_rust_line.mdx +++ b/docs/en/07-develop/03-insert-data/_rust_line.mdx @@ -1,3 +1,2 @@ ```rust -{{#include docs/examples/rust/schemalessexample/examples/influxdb_line_example.rs}} ``` diff --git a/docs/en/07-develop/03-insert-data/_rust_opts_json.mdx b/docs/en/07-develop/03-insert-data/_rust_opts_json.mdx index cc2055510b..25d322f8a7 100644 --- a/docs/en/07-develop/03-insert-data/_rust_opts_json.mdx +++ b/docs/en/07-develop/03-insert-data/_rust_opts_json.mdx @@ -1,3 +1,2 @@ ```rust -{{#include docs/examples/rust/schemalessexample/examples/opentsdb_json_example.rs}} ``` diff --git a/docs/en/07-develop/03-insert-data/_rust_opts_telnet.mdx b/docs/en/07-develop/03-insert-data/_rust_opts_telnet.mdx index 109c0c5d01..25d322f8a7 100644 --- a/docs/en/07-develop/03-insert-data/_rust_opts_telnet.mdx +++ b/docs/en/07-develop/03-insert-data/_rust_opts_telnet.mdx @@ -1,3 +1,2 @@ ```rust -{{#include docs/examples/rust/schemalessexample/examples/opentsdb_telnet_example.rs}} ``` diff --git a/docs/en/14-reference/03-connector/rust.mdx b/docs/en/14-reference/03-connector/rust.mdx index 56ca586c7e..ab06f72069 100644 --- a/docs/en/14-reference/03-connector/rust.mdx +++ b/docs/en/14-reference/03-connector/rust.mdx @@ -10,16 +10,14 @@ import TabItem from '@theme/TabItem'; import Preparation from "./_preparation.mdx" import RustInsert from "../../07-develop/03-insert-data/_rust_sql.mdx" -import RustInfluxLine from "../../07-develop/03-insert-data/_rust_line.mdx" -import RustOpenTSDBTelnet from "../../07-develop/03-insert-data/_rust_opts_telnet.mdx" -import RustOpenTSDBJson from "../../07-develop/03-insert-data/_rust_opts_json.mdx" +import RustBind from "../../07-develop/03-insert-data/_rust_stmt.mdx" import RustQuery from "../../07-develop/04-query-data/_rust.mdx" -`libtaos` is the official Rust language connector for TDengine. Rust developers can develop applications to access the TDengine instance data. +[`taos`][taos] is the official Rust language connector for TDengine. Rust developers can develop applications to access the TDengine instance data. -`libtaos` provides two ways to establish connections. One is the **Native Connection**, which connects to TDengine instances via the TDengine client driver (taosc). The other is **REST connection**, which connects to TDengine instances via taosAdapter's REST interface. +Rust connector provides two ways to establish connections. One is the **Native Connection**, which connects to TDengine instances via the TDengine client driver (taosc). The other is **Websocket connection**, which connects to TDengine instances via taosAdapter service. -The source code for `libtaos` is hosted on [GitHub](https://github.com/taosdata/libtaos-rs). +The source code is hosted on [taosdata/taos-connector-rust](https://github.com/taosdata/taos-connector-rust). ## Supported platforms @@ -30,241 +28,333 @@ REST connections are supported on all platforms that can run Rust. Please refer to [version support list](/reference/connector#version-support). -The Rust Connector is still under rapid development and is not guaranteed to be backward compatible before 1.0. We recommend using TDengine version 2.4 or higher to avoid known issues. +The Rust Connector is still under rapid development and is not guaranteed to be backward compatible before 1.0. We recommend using TDengine version 3.0 or higher to avoid known issues. ## Installation ### Pre-installation + * Install the Rust development toolchain * If using the native connection, please install the TDengine client driver. Please refer to [install client driver](/reference/connector#install-client-driver) -### Adding libtaos dependencies +### Add dependencies -Add the [libtaos][libtaos] dependency to the [Rust](https://rust-lang.org) project as follows, depending on the connection method selected. +Add the dependency to the [Rust](https://rust-lang.org) project as follows, depending on the connection method selected. - - + + -Add [libtaos][libtaos] to the `Cargo.toml` file. +Add [taos] to the `Cargo.toml` file. ```toml [dependencies] # use default feature -libtaos = "*" +taos = "*" ``` - + -Add [libtaos][libtaos] to the `Cargo.toml` file and enable the `rest` feature. +Add [taos] to the `Cargo.toml` file. ```toml [dependencies] -# use rest feature -libtaos = { version = "*", features = ["rest"]} +taos = { version = "*", default-features = false, features = ["native"] } ``` - - + -### Using connection pools - -Please enable the `r2d2` feature in `Cargo.toml`. +Add [taos] to the `Cargo.toml` file and enable the `ws` feature. ```toml [dependencies] -# with taosc -libtaos = { version = "*", features = ["r2d2"] } -# or rest -libtaos = { version = "*", features = ["rest", "r2d2"] } +taos = { version = "*", default-features = false, features = ["ws"] } ``` + + + ## Create a connection -The [TaosCfgBuilder] provides the user with an API in the form of a constructor for the subsequent creation of connections or use of connection pools. +In rust connector, we use a DSN connection string as a connection builder. For example, ```rust -let cfg: TaosCfg = TaosCfgBuilder::default() - .ip("127.0.0.1") - .user("root") - .pass("taosdata") - .db("log") // do not set if not require a default database. - .port(6030u16) - .build() - .expect("TaosCfg builder error"); -} +let builder = TaosBuilder::from_dsn("taos://")?; ``` -You can now use this object to create the connection. +You can now use connection client to create the connection. ```rust -let conn = cfg.connect()? ; +let conn = builder.build()?; ``` The connection object can create more than one. ```rust -let conn = cfg.connect()? ; -let conn2 = cfg.connect()? ; +let conn1 = builder.build()?; +let conn2 = builder.build()?; +``` + +DSN is short for **D**ata **S**ource **N**ame string - [a data structure used to describe a connection to a data source](https://en.wikipedia.org/wiki/Data_source_name). + +A common DSN is basically constructed as this: + +```text +[+]://[[:@]:][/][?=[&=]] +|------|------------|---|-----------|-----------|------|------|------------|-----------------------| +|driver| protocol | | username | password | host | port | database | params | ``` -You can use connection pools in applications. +- **Driver**: the main entrypoint to a processer. **Required**. In Rust connector, the supported driver names are listed here: + - **taos**: the legacy TDengine connection data source. + - **tmq**: subscription data source from TDengine. + - **http/ws**: use websocket protocol via `ws://` scheme. + - **https/wss**: use websocket protocol via `wss://` scheme. +- **Protocol**: the additional information appended to driver, which can be be used to support different kind of data sources. By default, leave it empty for native driver(only under feature "native"), and `ws/wss` for websocket driver (only under feature "ws"). **Optional**. +- **Username**: as its definition, is the username to the connection. **Optional**. +- **Password**: the password of the username. **Optional**. +- **Host**: address host to the datasource. **Optional**. +- **Port**: address port to the datasource. **Optional**. +- **Database**: database name or collection name in the datasource. **Optional**. +- **Params**: a key-value map for any other informations to the datasource. **Optional**. + +Here is a simple DSN connection string example: + +```text +taos+ws://localhost:6041/test +``` + +which means connect `localhost` with port `6041` via `ws` protocol, and make `test` as the default database. + +So that you can use DSN to specify connection protocol at runtime: ```rust -let pool = r2d2::Pool::builder() - .max_size(10000) // max connections - .build(cfg)? ; +use taos::*; // use it like a `prelude` mod, we need some traits at next. -// ... -// Use pool to get connection -let conn = pool.get()? ; +// use native protocol. +let builder = TaosBuilder::from_dsn("taos://localhost:6030")?; +let conn1 = builder.build(); + +// use websocket protocol. +let conn2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?; ``` -After that, you can perform the following operations on the database. +After connected, you can perform the following operations on the database. ```rust -async fn demo() -> Result<(), Error> { - // get connection ... - - // create database - conn.exec("create database if not exists demo").await? - // change database context - conn.exec("use demo").await? - // create table - conn.exec("create table if not exists tb1 (ts timestamp, v int)").await? - // insert - conn.exec("insert into tb1 values(now, 1)").await? - // query - let rows = conn.query("select * from tb1").await? - for row in rows.rows { - println!("{}", row.into_iter().join(",")); +async fn demo(taos: &Taos, db: &str) -> Result<(), Error> { + // prepare database + taos.exec_many([ + format!("DROP DATABASE IF EXISTS `{db}`"), + format!("CREATE DATABASE `{db}`"), + format!("USE `{db}`"), + ]) + .await?; + + let inserted = taos.exec_many([ + // create super table + "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \ + TAGS (`groupid` INT, `location` BINARY(16))", + // create child table + "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')", + // insert into child table + "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)", + // insert with NULL values + "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)", + // insert and automatically create table with tags if not exists + "INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)", + // insert many records in a single sql + "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)", + ]).await?; + + assert_eq!(inserted, 6); + let mut result = taos.query("select * from `meters`").await?; + + for field in result.fields() { + println!("got field: {}", field.name()); } + + let values = result. } ``` -## Usage examples - -### Write data +Rust connector provides two kinds of ways to fetch data: -#### SQL Write +```rust + // Query option 1, use rows stream. + let mut rows = result.rows(); + while let Some(row) = rows.try_next().await? { + for (name, value) in row { + println!("got value of {}: {}", name, value); + } + } - + // Query options 2, use deserialization with serde. + #[derive(Debug, serde::Deserialize)] + #[allow(dead_code)] + struct Record { + // deserialize timestamp to chrono::DateTime + ts: DateTime, + // float to f32 + current: Option, + // int to i32 + voltage: Option, + phase: Option, + groupid: i32, + // binary/varchar to String + location: String, + } -#### InfluxDB line protocol write + let records: Vec = taos + .query("select * from `meters`") + .await? + .deserialize() + .try_collect() + .await?; - + dbg!(records); + Ok(()) +``` -#### OpenTSDB Telnet line protocol write +## Usage examples - +### Write data -#### OpenTSDB JSON line protocol write +#### SQL Write - + -### Query data +#### Stmt bind - + -### More sample programs +### Query data -| Program Path | Program Description | -| -------------- | ----------------------------------------------------------------------------- | -| [demo.rs] | Basic API Usage Examples | -| [bailongma-rs] | Using TDengine as the Prometheus remote storage API adapter for the storage backend, using the r2d2 connection pool | +| ## API Reference -### Connection constructor API - -The [Builder Pattern](https://doc.rust-lang.org/1.0.0/style/ownership/builders.html) constructor pattern is Rust's solution for handling complex data types or optional configuration types. The [libtaos] implementation uses the connection constructor [TaosCfgBuilder] as the entry point for the TDengine Rust connector. The [TaosCfgBuilder] provides optional configuration of servers, ports, databases, usernames, passwords, etc. - -Using the `default()` method, you can construct a [TaosCfg] with default parameters for subsequent connections to the database or establishing connection pools. +### Connector builder -```rust -let cfg = TaosCfgBuilder::default().build()? ; -``` - -Using the constructor pattern, the user can set on-demand. +Use DSN to directly construct a TaosBuilder object. ```rust -let cfg = TaosCfgBuilder::default() - .ip("127.0.0.1") - .user("root") - .pass("taosdata") - .db("log") - .port(6030u16) - .build()? ; +let builder = TaosBuilder::from_dsn("")? ; ``` -Create TDengine connection using [TaosCfg] object. +Use `builder` to create many connections: ```rust -let conn: Taos = cfg.connect(); +let conn: Taos = cfg.build(); ``` -### Connection pooling +### Connection pool -In complex applications, we recommend enabling connection pools. Connection pool for [libtaos] is implemented using [r2d2]. +In complex applications, we recommend enabling connection pools. Connection pool for [taos] is implemented using [r2d2] by enabling "r2d2" feature. -As follows, a connection pool with default parameters can be generated. +Basically, a connection pool with default parameters can be generated as: ```rust -let pool = r2d2::Pool::new(cfg)? ; +let pool = TaosBuilder::from_dsn(dsn)?.pool()?; ``` -You can set the same connection pool parameters using the connection pool's constructor. +You can set the connection pool parameters using the `PoolBuilder`. ```rust - use std::time::Duration; - let pool = r2d2::Pool::builder() - .max_size(5000) // max connections - .max_lifetime(Some(Duration::from_minutes(100))) // lifetime of each connection - .min_idle(Some(1000)) // minimal idle connections - .connection_timeout(Duration::from_minutes(2)) - .build(cfg); +let dsn = "taos://localhost:6030"; + +let opts = PoolBuilder::new() + .max_size(5000) // max connections + .max_lifetime(Some(Duration::from_secs(60 * 60))) // lifetime of each connection + .min_idle(Some(1000)) // minimal idle connections + .connection_timeout(Duration::from_secs(2)); + +let pool = TaosBuilder::from_dsn(dsn)?.with_pool_builder(opts)?; ``` -In the application code, use `pool.get()? ` to get a connection object [Taos]. +In the application code, use `pool.get()?` to get a connection object [Taos]. ```rust let taos = pool.get()? ; ``` -The [Taos] structure is the connection manager in [libtaos] and provides two main APIs. +### Connection methods -1. `exec`: Execute some non-query SQL statements, such as `CREATE`, `ALTER`, `INSERT`, etc. +The [Taos] connection struct provides several APIs for convenient use. + +1. `exec`: Execute some non-query SQL statements, such as `CREATE`, `ALTER`, `INSERT` etc. and return affected rows (only meaningful to `INSERT`). + + ```rust + let affected_rows = taos.exec("INSERT INTO tb1 VALUES(now, NULL)").await?; + ``` + +2. `exec_many`: You can execute many SQL statements in order with `exec_many` method. ```rust - taos.exec().await? + taos.exec_many([ + "CREATE DATABASE test", + "USE test", + "CREATE TABLE `tb1` (`ts` TIMESTAMP, `val` INT)", + ]).await?; ``` -2. `query`: Execute the query statement and return the [TaosQueryData] object. +3. `query`: Execute the query statement and return the [ResultSet] object. ```rust - let q = taos.query("select * from log.logs").await? + let mut q = taos.query("select * from log.logs").await? ``` - The [TaosQueryData] object stores the query result data and basic information about the returned columns (column name, type, length). + The [ResultSet] object stores the query result data and basic information about the returned columns (column name, type, length). - Column information is stored using [ColumnMeta]. + Get filed information with `fields` method. ```rust - let cols = &q.column_meta; + let cols = q.fields(); for col in cols { - println!("name: {}, type: {:?} , bytes: {}", col.name, col.type_, col.bytes); + println!("name: {}, type: {:?} , bytes: {}", col.name(), col.ty(), col.bytes()); } ``` - It fetches data line by line. + Users could fetch data by rows. ```rust - for (i, row) in q.rows.iter().enumerate() { - for (j, cell) in row.iter().enumerate() { - println!("cell({}, {}) data: {}", i, j, cell); + let mut rows = result.rows(); + let mut nrows = 0; + while let Some(row) = rows.try_next().await? { + for (col, (name, value)) in row.enumerate() { + println!( + "[{}] got value in col {} (named `{:>8}`): {}", + nrows, col, name, value + ); } + nrows += 1; + } + ``` + + Or use it with [serde](https://serde.rs) deserialization. + + ```rust + #[derive(Debug, Deserialize)] + struct Record { + // deserialize timestamp to chrono::DateTime + ts: DateTime, + // float to f32 + current: Option, + // int to i32 + voltage: Option, + phase: Option, + groupid: i32, + // binary/varchar to String + location: String, } + + let records: Vec = taos + .query("select * from `meters`") + .await? + .deserialize() + .try_collect() + .await?; ``` Note that Rust asynchronous functions and an asynchronous runtime are required. @@ -275,110 +365,152 @@ Note that Rust asynchronous functions and an asynchronous runtime are required. - `.create_database(database: &str)`: Executes the `CREATE DATABASE` statement. - `.use_database(database: &str)`: Executes the `USE` statement. -In addition, this structure is also the entry point for [Parameter Binding](#Parameter Binding Interface) and [Line Protocol Interface](#Line Protocol Interface). Please refer to the specific API descriptions for usage. - -### Bind Interface +### Bind API -Similar to the C interface, Rust provides the bind interface's wrapping. First, create a bind object [Stmt] for a SQL command from the [Taos] object. +Similar to the C interface, Rust provides the bind interface's wrapping. First, create a bind object [Stmt] for a SQL command with the [Taos] object. ```rust -let mut stmt: Stmt = taos.stmt("insert into ? values(? ,?)") ? ; +let mut stmt = Stmt::init(&taos).await?; +stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?; ``` The bind object provides a set of interfaces for implementing parameter binding. -##### `.set_tbname(tbname: impl ToCString)` +#### `.set_tbname(name)` To bind table names. -##### `.set_tbname_tags(tbname: impl ToCString, tags: impl IntoParams)` +```rust +let mut stmt = taos.stmt("insert into ? values(? ,?)")?; +stmt.set_tbname("d0")?; +``` + +#### `.set_tags(&[tag])` -Bind sub-table table names and tag values when the SQL statement uses a super table. +Bind tag values when the SQL statement uses a super table. ```rust -let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)") ? ; -// tags can be created with any supported type, here is an example using JSON -let v = Field::Json(serde_json::from_str("{\"tag1\":\"one, two, three, four, five, six, seven, eight, nine, ten\"}").unwrap()); -stmt.set_tbname_tags("tb0", [&tag])? ; +let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)")?; +stmt.set_tbname("d0")?; +stmt.set_tags(&[Value::VarChar("涛思".to_string())])?; ``` -##### `.bind(params: impl IntoParams)` +#### `.bind(&[column])` -Bind value types. Use the [Field] structure to construct the desired type and bind. +Bind value types. Use the [ColumnView] structure to construct the desired type and bind. ```rust -let ts = Field::Timestamp(Timestamp::now()); -let value = Field::Float(0.0); -stmt.bind(vec![ts, value].iter())? ; +let params = vec![ + ColumnView::from_millis_timestamp(vec![164000000000]), + ColumnView::from_bools(vec![true]), + ColumnView::from_tiny_ints(vec![i8::MAX]), + ColumnView::from_small_ints(vec![i16::MAX]), + ColumnView::from_ints(vec![i32::MAX]), + ColumnView::from_big_ints(vec![i64::MAX]), + ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]), + ColumnView::from_unsigned_small_ints(vec![u16::MAX]), + ColumnView::from_unsigned_ints(vec![u32::MAX]), + ColumnView::from_unsigned_big_ints(vec![u64::MAX]), + ColumnView::from_floats(vec![f32::MAX]), + ColumnView::from_doubles(vec![f64::MAX]), + ColumnView::from_varchar(vec!["ABC"]), + ColumnView::from_nchar(vec!["涛思数据"]), +]; +let rows = stmt.bind(¶ms)?.add_batch()?.execute()?; ``` -##### `.execute()` +#### `.execute()` -Execute SQL.[Stmt] objects can be reused, re-binded, and executed after execution. +Execute to insert all bind records. [Stmt] objects can be reused, re-bind, and executed after execution. Remember to call `add_batch` before `execute`. ```rust -stmt.execute()? ; +stmt.add_batch()?.execute()?; // next bind cycle. // stmt.set_tbname()? ; //stmt.bind()? ; -//stmt.execute()? ; +//stmt.add_batch().execute()? ; ``` -### Line protocol interface +A runnable example for bind can be found [here](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs). -The line protocol interface supports multiple modes and different precision and requires the introduction of constants in the schemaless module to set. +### Subscription API + +Users can subscribe a [TOPIC](../../../taos-sql/tmq/) with TMQ(the TDengine Message Queue) API. + +Start from a TMQ builder: ```rust -use libtaos::*; -use libtaos::schemaless::*; +let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?; ``` -- InfluxDB line protocol +Build a consumer: - ```rust - let lines = [ - "st,t1=abc,t2=def,t3=anything c1=3i64,c3=L\"pass\",c2=false 1626006833639000000" - "st,t1=abc,t2=def,t3=anything c1=3i64,c3=L\"abc\",c4=4f64 1626006833639000000" - ]; - taos.schemaless_insert(&lines, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANOSECONDS)? ; - ``` +```rust +let mut consumer = tmq.build()?; +``` -- OpenTSDB Telnet Protocol +Subscribe a topic: - ```rust - let lines = ["sys.if.bytes.out 1479496100 1.3E3 host=web01 interface=eth0"]; - taos.schemaless_insert(&lines, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_SECONDS)? ; - ``` +```rust +consumer.subscribe(["tmq_meters"]).await?; +``` -- OpenTSDB JSON protocol +Consume messages, and commit the offset for each message. - ```rust - let lines = [r#" - { - "metric": "st", - "timestamp": 1626006833, - "value": 10, - "tags": { - "t1": true, - "t2": false, - "t3": 10, - "t4": "123_abc_.! @#$%^&*:;,. /? |+-=()[]{}<>" +```rust +{ + let mut stream = consumer.stream(); + + while let Some((offset, message)) = stream.try_next().await? { + // get information from offset + + // the topic + let topic = offset.topic(); + // the vgroup id, like partition id in kafka. + let vgroup_id = offset.vgroup_id(); + println!("* in vgroup id {vgroup_id} of topic {topic}\n"); + + if let Some(data) = message.into_data() { + while let Some(block) = data.fetch_raw_block().await? { + // one block for one table, get table name if needed + let name = block.table_name(); + let records: Vec = block.deserialize().try_collect()?; + println!( + "** table: {}, got {} records: {:#?}\n", + name.unwrap(), + records.len(), + records + ); } - }"#]; - taos.schemaless_insert(&lines, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_SECONDS)? ; - ``` + } + consumer.commit(offset).await?; + } +} +``` + +Unsubscribe: + +```rust +consumer.unsubscribe().await; +``` + +In TMQ DSN, you must choose to subscribe with a group id. Also, there's several options could be set: + +- `group.id`: **Required**, a group id is any visible string you set. +- `client.id`: a optional client description string. +- `auto.offset.reset`: choose to subscribe from *earliest* or *latest*, default is *none* which means 'earliest'. +- `enable.auto.commit`: automatically commit with specified time interval. By default - in the recommended way _ you must use `commit` to ensure that you've consumed the messages correctly, otherwise, consumers will received repeated messages when re-subscribe. +- `auto.commit.interval.ms`: the auto commit interval in milliseconds. + +Check the whole subscription example at [GitHub](https://github.com/taosdata/taos-connector-rust/blob/main/examples/subscribe.rs). -Please move to the Rust documentation hosting page for other related structure API usage instructions: . +Please move to the Rust documentation hosting page for other related structure API usage instructions: . -[libtaos]: https://github.com/taosdata/libtaos-rs -[tdengine]: https://github.com/taosdata/TDengine -[bailongma-rs]: https://github.com/taosdata/bailongma-rs +[TDengine]: https://github.com/taosdata/TDengine [r2d2]: https://crates.io/crates/r2d2 -[demo.rs]: https://github.com/taosdata/libtaos-rs/blob/main/examples/demo.rs -[TaosCfgBuilder]: https://docs.rs/libtaos/latest/libtaos/struct.TaosCfgBuilder.html -[TaosCfg]: https://docs.rs/libtaos/latest/libtaos/struct.TaosCfg.html -[Taos]: https://docs.rs/libtaos/latest/libtaos/struct.Taos.html -[TaosQueryData]: https://docs.rs/libtaos/latest/libtaos/field/struct.TaosQueryData.html -[Field]: https://docs.rs/libtaos/latest/libtaos/field/enum.Field.html -[Stmt]: https://docs.rs/libtaos/latest/libtaos/stmt/struct.Stmt.html +[Taos]: https://docs.rs/taos/latest/taos/struct.Taos.html +[ResultSet]: https://docs.rs/taos/latest/taos/struct.ResultSet.html +[Value]: https://docs.rs/taos/latest/taos/enum.Value.html +[Stmt]: https://docs.rs/taos/latest/taos/stmt/struct.Stmt.html +[taos]: https://crates.io/crates/taos diff --git a/docs/examples/rust/Cargo.toml b/docs/examples/rust/Cargo.toml index 114407e69e..136d09ffbb 100644 --- a/docs/examples/rust/Cargo.toml +++ b/docs/examples/rust/Cargo.toml @@ -1,2 +1,2 @@ [workspace] -members = ["restexample", "nativeexample", "schemalessexample"] +members = ["restexample", "nativeexample"] diff --git a/docs/examples/rust/nativeexample/Cargo.toml b/docs/examples/rust/nativeexample/Cargo.toml index 64fd10a3e9..cdf739d357 100644 --- a/docs/examples/rust/nativeexample/Cargo.toml +++ b/docs/examples/rust/nativeexample/Cargo.toml @@ -5,6 +5,9 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -libtaos = { version = "0.4.3" } -tokio = { version = "*", features = ["rt", "macros", "rt-multi-thread"] } -bstr = { version = "*" } +anyhow = "1" +chrono = "0.4" +serde = { version = "1", features = ["derive"] } +tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread"] } + +taos = { version = "0.*" } diff --git a/docs/examples/rust/nativeexample/examples/connect.rs b/docs/examples/rust/nativeexample/examples/connect.rs index 8e27458de5..fb226d8710 100644 --- a/docs/examples/rust/nativeexample/examples/connect.rs +++ b/docs/examples/rust/nativeexample/examples/connect.rs @@ -1,19 +1,9 @@ -use libtaos::*; +use taos::*; -fn taos_connect() -> Result { - TaosCfgBuilder::default() - .ip("localhost") - .user("root") - .pass("taosdata") - // .db("log") // remove comment if you want to connect to database log by default. - .port(6030u16) - .build() - .expect("TaosCfg builder error") - .connect() -} - -fn main() { +#[tokio::main] +async fn main() -> Result<(), Error> { #[allow(unused_variables)] - let taos = taos_connect().unwrap(); - println!("Connected") + let taos = TaosBuilder::from_dsn("taos://")?.build()?; + println!("Connected"); + Ok(()) } diff --git a/docs/examples/rust/nativeexample/examples/stmt_example.rs b/docs/examples/rust/nativeexample/examples/stmt_example.rs index 190f8c1ef6..26084746f2 100644 --- a/docs/examples/rust/nativeexample/examples/stmt_example.rs +++ b/docs/examples/rust/nativeexample/examples/stmt_example.rs @@ -1,38 +1,40 @@ -use bstr::BString; -use libtaos::*; +use taos::*; #[tokio::main] -async fn main() -> Result<(), Error> { - let taos = TaosCfg::default().connect().expect("fail to connect"); +async fn main() -> anyhow::Result<()> { + let taos = TaosBuilder::from_dsn("taos://")?.build()?; taos.create_database("power").await?; taos.use_database("power").await?; - taos.exec("CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?; - let mut stmt = taos.stmt("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?; + taos.exec("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?; + + let mut stmt = Stmt::init(&taos)?; + stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?; // bind table name and tags stmt.set_tbname_tags( "d1001", - [ - Field::Binary(BString::from("California.SanFrancisco")), - Field::Int(2), - ], + &[Value::VarChar("San Fransico".into()), Value::Int(2)], )?; // bind values. let values = vec![ - Field::Timestamp(Timestamp::new(1648432611249, TimestampPrecision::Milli)), - Field::Float(10.3), - Field::Int(219), - Field::Float(0.31), + ColumnView::from_millis_timestamp(vec![1648432611249]), + ColumnView::from_floats(vec![10.3]), + ColumnView::from_ints(vec![219]), + ColumnView::from_floats(vec![0.31]), ]; stmt.bind(&values)?; // bind one more row let values2 = vec![ - Field::Timestamp(Timestamp::new(1648432611749, TimestampPrecision::Milli)), - Field::Float(12.6), - Field::Int(218), - Field::Float(0.33), + ColumnView::from_millis_timestamp(vec![1648432611749]), + ColumnView::from_floats(vec![12.6]), + ColumnView::from_ints(vec![218]), + ColumnView::from_floats(vec![0.33]), ]; stmt.bind(&values2)?; - // execute - stmt.execute()?; + + stmt.add_batch()?; + + // execute. + let rows = stmt.execute()?; + assert_eq!(rows, 2); Ok(()) } diff --git a/docs/examples/rust/nativeexample/examples/subscribe_demo.rs b/docs/examples/rust/nativeexample/examples/subscribe_demo.rs index 4febff9be7..7e0a347948 100644 --- a/docs/examples/rust/nativeexample/examples/subscribe_demo.rs +++ b/docs/examples/rust/nativeexample/examples/subscribe_demo.rs @@ -1,3 +1,101 @@ -fn main() { - -} \ No newline at end of file +use std::time::Duration; + +use chrono::{DateTime, Local}; +use taos::*; + +// Query options 2, use deserialization with serde. +#[derive(Debug, serde::Deserialize)] +#[allow(dead_code)] +struct Record { + // deserialize timestamp to chrono::DateTime + ts: DateTime, + // float to f32 + current: Option, + // int to i32 + voltage: Option, + phase: Option, +} + +async fn prepare(taos: Taos) -> anyhow::Result<()> { + let inserted = taos.exec_many([ + // create child table + "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')", + // insert into child table + "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)", + // insert with NULL values + "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)", + // insert and automatically create table with tags if not exists + "INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)", + // insert many records in a single sql + "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)", + ]).await?; + assert_eq!(inserted, 6); + Ok(()) +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let dsn = "taos://localhost:6030"; + let builder = TaosBuilder::from_dsn(dsn)?; + + let taos = builder.build()?; + let db = "tmq"; + + // prepare database + taos.exec_many([ + format!("DROP TOPIC IF EXISTS tmq_meters"), + format!("DROP DATABASE IF EXISTS `{db}`"), + format!("CREATE DATABASE `{db}`"), + format!("USE `{db}`"), + // create super table + format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"), + // create topic for subscription + format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}") + ]) + .await?; + + let task = tokio::spawn(prepare(taos)); + + tokio::time::sleep(Duration::from_secs(1)).await; + + // subscribe + let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?; + + let mut consumer = tmq.build()?; + consumer.subscribe(["tmq_meters"]).await?; + + { + let mut stream = consumer.stream(); + + while let Some((offset, message)) = stream.try_next().await? { + // get information from offset + + // the topic + let topic = offset.topic(); + // the vgroup id, like partition id in kafka. + let vgroup_id = offset.vgroup_id(); + println!("* in vgroup id {vgroup_id} of topic {topic}\n"); + + if let Some(data) = message.into_data() { + while let Some(block) = data.fetch_raw_block().await? { + // one block for one table, get table name if needed + let name = block.table_name(); + let records: Vec = block.deserialize().try_collect()?; + println!( + "** table: {}, got {} records: {:#?}\n", + name.unwrap(), + records.len(), + records + ); + } + } + consumer.commit(offset).await?; + } + } + + consumer.unsubscribe().await; + + task.await??; + + Ok(()) +} diff --git a/docs/examples/rust/restexample/Cargo.toml b/docs/examples/rust/restexample/Cargo.toml index a5f89f8a3b..5fffe215d4 100644 --- a/docs/examples/rust/restexample/Cargo.toml +++ b/docs/examples/rust/restexample/Cargo.toml @@ -4,5 +4,9 @@ version = "0.1.0" edition = "2021" [dependencies] -libtaos = { version = "0.4.3", features = ["rest"] } -tokio = { version = "*", features = ["rt", "macros", "rt-multi-thread"] } +anyhow = "1" +chrono = "0.4" +serde = { version = "1", features = ["derive"] } +tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread"] } + +taos = { version = "0.*" } diff --git a/docs/examples/rust/restexample/examples/connect.rs b/docs/examples/rust/restexample/examples/connect.rs index b3718342c4..fb226d8710 100644 --- a/docs/examples/rust/restexample/examples/connect.rs +++ b/docs/examples/rust/restexample/examples/connect.rs @@ -1,20 +1,9 @@ -use libtaos::*; - -fn taos_connect() -> Result { - TaosCfgBuilder::default() - .ip("localhost") - .user("root") - .pass("taosdata") - // .db("log") // remove comment if you want to connect to database log by default. - .port(6030u16) - .build() - .expect("TaosCfg builder error") - .connect() -} +use taos::*; #[tokio::main] -async fn main() { +async fn main() -> Result<(), Error> { #[allow(unused_variables)] - let taos = taos_connect().expect("connect error"); - println!("Connected") + let taos = TaosBuilder::from_dsn("taos://")?.build()?; + println!("Connected"); + Ok(()) } diff --git a/docs/examples/rust/restexample/examples/insert_example.rs b/docs/examples/rust/restexample/examples/insert_example.rs index 9261536f62..27b2bb4788 100644 --- a/docs/examples/rust/restexample/examples/insert_example.rs +++ b/docs/examples/rust/restexample/examples/insert_example.rs @@ -1,18 +1,29 @@ -use libtaos::*; +use taos::*; #[tokio::main] -async fn main() -> Result<(), Error> { - let taos = TaosCfg::default().connect().expect("fail to connect"); - taos.create_database("power").await?; - taos.exec("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?; - let sql = "INSERT INTO power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000) - power.d1002 USING power.meters TAGS(California.SanFrancisco, 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000) - power.d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000) - power.d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)"; - let result = taos.query(sql).await?; - println!("{:?}", result); +async fn main() -> anyhow::Result<()> { + let dsn = "ws://"; + let taos = TaosBuilder::from_dsn(dsn)?.build()?; + + + taos.exec_many([ + "DROP DATABASE IF EXISTS power", + "CREATE DATABASE power", + "USE power", + "CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)" + ]).await?; + + let inserted = taos.exec("INSERT INTO + power.d1001 USING power.meters TAGS('San Francisco', 2) + VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) + ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000) + power.d1002 USING power.meters TAGS('San Francisco', 3) + VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000) + power.d1003 USING power.meters TAGS('Los Angeles', 2) + VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000) + power.d1004 USING power.meters TAGS('Los Angeles', 3) + VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)").await?; + + assert_eq!(inserted, 8); Ok(()) } - -// output: -// TaosQueryData { column_meta: [ColumnMeta { name: "affected_rows", type_: Int, bytes: 4 }], rows: [[Int(8)]] } diff --git a/docs/examples/rust/restexample/examples/query_example.rs b/docs/examples/rust/restexample/examples/query_example.rs index bbe0cfaabf..1a11401bba 100644 --- a/docs/examples/rust/restexample/examples/query_example.rs +++ b/docs/examples/rust/restexample/examples/query_example.rs @@ -1,39 +1,25 @@ -use libtaos::*; +use taos::sync::*; -fn taos_connect() -> Result { - TaosCfgBuilder::default() - .ip("localhost") - .user("root") - .pass("taosdata") - .db("power") - .port(6030u16) - .build() - .expect("TaosCfg builder error") - .connect() -} - -#[tokio::main] -async fn main() -> Result<(), Error> { - let taos = taos_connect().expect("connect error"); - let result = taos.query("SELECT ts, current FROM meters LIMIT 2").await?; +fn main() -> anyhow::Result<()> { + let taos = TaosBuilder::from_dsn("ws:///power")?.build()?; + let mut result = taos.query("SELECT ts, current FROM meters LIMIT 2")?; // print column names - let meta: Vec = result.column_meta; - for column in meta { - print!("{}\t", column.name) - } - println!(); + let meta = result.fields(); + println!("{}", meta.iter().map(|field| field.name()).join("\t")); + // print rows - let rows: Vec> = result.rows; + let rows = result.rows(); for row in rows { - for field in row { - print!("{}\t", field); + let row = row?; + for (_name, value) in row { + print!("{}\t", value); } println!(); } Ok(()) } -// output: +// output(suppose you are in +8 timezone): // ts current -// 2022-03-28 09:56:51.249 10.3 -// 2022-03-28 09:56:51.749 12.6 +// 2018-10-03T14:38:05+08:00 10.3 +// 2018-10-03T14:38:15+08:00 12.6 diff --git a/docs/examples/rust/schemalessexample/Cargo.toml b/docs/examples/rust/schemalessexample/Cargo.toml deleted file mode 100644 index 32c6a12231..0000000000 --- a/docs/examples/rust/schemalessexample/Cargo.toml +++ /dev/null @@ -1,7 +0,0 @@ -[package] -name = "schemalessexample" -version = "0.1.0" -edition = "2021" - -[dependencies] -libtaos = { version = "0.4.3" } diff --git a/docs/examples/rust/schemalessexample/examples/influxdb_line_example.rs b/docs/examples/rust/schemalessexample/examples/influxdb_line_example.rs deleted file mode 100644 index 64d1a3c9ac..0000000000 --- a/docs/examples/rust/schemalessexample/examples/influxdb_line_example.rs +++ /dev/null @@ -1,22 +0,0 @@ -use libtaos::schemaless::*; -use libtaos::*; - -fn main() { - let taos = TaosCfg::default().connect().expect("fail to connect"); - taos.raw_query("CREATE DATABASE test").unwrap(); - taos.raw_query("USE test").unwrap(); - let lines = ["meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249", - "meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611250", - "meters,location=California.LosAngeles,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249", - "meters,location=California.LosAngeles,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611250"]; - let affected_rows = taos - .schemaless_insert( - &lines, - TSDB_SML_LINE_PROTOCOL, - TSDB_SML_TIMESTAMP_MILLISECONDS, - ) - .unwrap(); - println!("affected_rows={}", affected_rows); -} - -// run with: cargo run --example influxdb_line_example diff --git a/docs/examples/rust/schemalessexample/examples/opentsdb_json_example.rs b/docs/examples/rust/schemalessexample/examples/opentsdb_json_example.rs deleted file mode 100644 index e616915967..0000000000 --- a/docs/examples/rust/schemalessexample/examples/opentsdb_json_example.rs +++ /dev/null @@ -1,25 +0,0 @@ -use libtaos::schemaless::*; -use libtaos::*; - -fn main() { - let taos = TaosCfg::default().connect().expect("fail to connect"); - taos.raw_query("CREATE DATABASE test").unwrap(); - taos.raw_query("USE test").unwrap(); - let lines = [ - r#"[{"metric": "meters.current", "timestamp": 1648432611249, "value": 10.3, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, - {"metric": "meters.voltage", "timestamp": 1648432611249, "value": 219, "tags": {"location": "California.LosAngeles", "groupid": 1}}, - {"metric": "meters.current", "timestamp": 1648432611250, "value": 12.6, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, - {"metric": "meters.voltage", "timestamp": 1648432611250, "value": 221, "tags": {"location": "California.LosAngeles", "groupid": 1}}]"#, - ]; - - let affected_rows = taos - .schemaless_insert( - &lines, - TSDB_SML_JSON_PROTOCOL, - TSDB_SML_TIMESTAMP_NOT_CONFIGURED, - ) - .unwrap(); - println!("affected_rows={}", affected_rows); // affected_rows=4 -} - -// run with: cargo run --example opentsdb_json_example diff --git a/docs/examples/rust/schemalessexample/examples/opentsdb_telnet_example.rs b/docs/examples/rust/schemalessexample/examples/opentsdb_telnet_example.rs deleted file mode 100644 index c8cab7655a..0000000000 --- a/docs/examples/rust/schemalessexample/examples/opentsdb_telnet_example.rs +++ /dev/null @@ -1,28 +0,0 @@ -use libtaos::schemaless::*; -use libtaos::*; - -fn main() { - let taos = TaosCfg::default().connect().expect("fail to connect"); - taos.raw_query("CREATE DATABASE test").unwrap(); - taos.raw_query("USE test").unwrap(); - let lines = [ - "meters.current 1648432611249 10.3 location=California.SanFrancisco groupid=2", - "meters.current 1648432611250 12.6 location=California.SanFrancisco groupid=2", - "meters.current 1648432611249 10.8 location=California.LosAngeles groupid=3", - "meters.current 1648432611250 11.3 location=California.LosAngeles groupid=3", - "meters.voltage 1648432611249 219 location=California.SanFrancisco groupid=2", - "meters.voltage 1648432611250 218 location=California.SanFrancisco groupid=2", - "meters.voltage 1648432611249 221 location=California.LosAngeles groupid=3", - "meters.voltage 1648432611250 217 location=California.LosAngeles groupid=3", - ]; - let affected_rows = taos - .schemaless_insert( - &lines, - TSDB_SML_TELNET_PROTOCOL, - TSDB_SML_TIMESTAMP_NOT_CONFIGURED, - ) - .unwrap(); - println!("affected_rows={}", affected_rows); // affected_rows=8 -} - -// run with: cargo run --example opentsdb_telnet_example diff --git a/docs/examples/rust/schemalessexample/src/main.rs b/docs/examples/rust/schemalessexample/src/main.rs deleted file mode 100644 index e7a11a969c..0000000000 --- a/docs/examples/rust/schemalessexample/src/main.rs +++ /dev/null @@ -1,3 +0,0 @@ -fn main() { - println!("Hello, world!"); -} diff --git a/docs/zh/14-reference/03-connector/rust.mdx b/docs/zh/14-reference/03-connector/rust.mdx index 25a8409b6e..cc2a9986e9 100644 --- a/docs/zh/14-reference/03-connector/rust.mdx +++ b/docs/zh/14-reference/03-connector/rust.mdx @@ -10,222 +10,267 @@ import TabItem from '@theme/TabItem'; import Preparition from "./_preparition.mdx" import RustInsert from "../../07-develop/03-insert-data/_rust_sql.mdx" -import RustInfluxLine from "../../07-develop/03-insert-data/_rust_line.mdx" -import RustOpenTSDBTelnet from "../../07-develop/03-insert-data/_rust_opts_telnet.mdx" -import RustOpenTSDBJson from "../../07-develop/03-insert-data/_rust_opts_json.mdx" +import RustBind from "../../07-develop/03-insert-data/_rust_stmt.mdx" import RustQuery from "../../07-develop/04-query-data/_rust.mdx" -[![Crates.io](https://img.shields.io/crates/v/libtaos)](https://crates.io/crates/libtaos) ![Crates.io](https://img.shields.io/crates/d/libtaos) [![docs.rs](https://img.shields.io/docsrs/libtaos)](https://docs.rs/libtaos) +[![Crates.io](https://img.shields.io/crates/v/taos)](https://crates.io/crates/taos) ![Crates.io](https://img.shields.io/crates/d/taos) [![docs.rs](https://img.shields.io/docsrs/taos)](https://docs.rs/taos) -`libtaos` 是 TDengine 的官方 Rust 语言连接器。Rust 开发人员可以通过它开发存取 TDengine 数据库的应用软件。 +`taos` 是 TDengine 的官方 Rust 语言连接器。Rust 开发人员可以通过它开发存取 TDengine 数据库的应用软件。 -`libtaos` 提供两种建立连接的方式。一种是**原生连接**,它通过 TDengine 客户端驱动程序(taosc)连接 TDengine 运行实例。另外一种是 **REST 连接**,它通过 taosAdapter 的 REST 接口连接 TDengine 运行实例。你可以通过不同的 “特性(即 Cargo 关键字 features)” 来指定使用哪种连接器。REST 连接支持任何平台,但原生连接支持所有 TDengine 客户端能运行的平台。 +`taos` 提供两种建立连接的方式。一种是**原生连接**,它通过 TDengine 客户端驱动程序(taosc)连接 TDengine 运行实例。另外一种是 **Websocket 连接**,它通过 taosAdapter 的 Websocket 接口连接 TDengine 运行实例。你可以通过不同的 “特性(即 Cargo 关键字 `features`)” 来指定使用哪种连接器(默认同时支持)。Websocket 连接支持任何平台,原生连接支持所有 TDengine 客户端能运行的平台。 -`libtaos` 的源码托管在 [GitHub](https://github.com/taosdata/libtaos-rs)。 +该 Rust 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-connector-rust)。 ## 支持的平台 原生连接支持的平台和 TDengine 客户端驱动支持的平台一致。 -REST 连接支持所有能运行 Rust 的平台。 +Websocket 连接支持所有能运行 Rust 的平台。 ## 版本支持 请参考[版本支持列表](/reference/connector#版本支持) -Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容。建议使用 2.4 版本以上的 TDengine,以避免已知问题。 +Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine,以避免已知问题。 ## 安装 ### 安装前准备 + * 安装 Rust 开发工具链 * 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](/reference/connector#安装客户端驱动) -### 添加 libtaos 依赖 +### 添加 taos 依赖 -根据选择的连接方式,按照如下说明在 [Rust](https://rust-lang.org) 项目中添加 [libtaos][libtaos] 依赖: +根据选择的连接方式,按照如下说明在 [Rust](https://rust-lang.org) 项目中添加 [taos][taos] 依赖: - - + + -在 `Cargo.toml` 文件中添加 [libtaos][libtaos]: +在 `Cargo.toml` 文件中添加 [taos][taos]: ```toml [dependencies] # use default feature -libtaos = "*" +taos = "*" ``` - - + + -在 `Cargo.toml` 文件中添加 [libtaos][libtaos],并启用 `rest` 特性。 +在 `Cargo.toml` 文件中添加 [taos][taos]: ```toml [dependencies] -# use rest feature -libtaos = { version = "*", features = ["rest"]} +taos = { version = "*", default-features = false, features = ["native"] } ``` - - + -### 使用连接池 - -请在 `Cargo.toml` 中启用 `r2d2` 特性。 +在 `Cargo.toml` 文件中添加 [taos][taos],并启用 `ws` 特性。 ```toml [dependencies] -# with taosc -libtaos = { version = "*", features = ["r2d2"] } -# or rest -libtaos = { version = "*", features = ["rest", "r2d2"] } +taos = { version = "*", default-features = false, features = ["ws"] } ``` + + + ## 建立连接 -[TaosCfgBuilder] 为使用者提供构造器形式的 API,以便于后续创建连接或使用连接池。 +[TaosBuilder] 通过 DSN 连接描述字符串创建一个连接构造器。 ```rust -let cfg: TaosCfg = TaosCfgBuilder::default() - .ip("127.0.0.1") - .user("root") - .pass("taosdata") - .db("log") // do not set if not require a default database. - .port(6030u16) - .build() - .expect("TaosCfg builder error"); -} +let builder = TaosBuilder::from_dsn("taos://")?; ``` 现在您可以使用该对象创建连接: ```rust -let conn = cfg.connect()?; +let conn = builder.build()?; ``` 连接对象可以创建多个: ```rust -let conn = cfg.connect()?; -let conn2 = cfg.connect()?; +let conn1 = builder.build()?; +let conn2 = builder.build()?; ``` -可以在应用中使用连接池: +DSN 描述字符串基本结构如下: + +```text +[+]://[[:@]:][/][?=[&=]] +|------|------------|---|-----------|-----------|------|------|------------|-----------------------| +|driver| protocol | | username | password | host | port | database | params | +``` + +各部分意义见下表: + +- **driver**: 必须指定驱动名以便连接器选择何种方式创建连接,支持如下驱动名: + - **taos**: 表名使用 TDengine 连接器驱动。 + - **tmq**: 使用 TMQ 订阅数据。 + - **http/ws**: 使用 Websocket 创建连接。 + - **https/wss**: 在 Websocket 连接方式下显示启用 SSL/TLS 连接。 +- **protocol**: 显示指定以何种方式建立连接,例如:`taos+ws://localhost:6041` 指定以 Websocket 方式建立连接。 +- **username/password**: 用于创建连接的用户名及密码。 +- **host/port**: 指定创建连接的服务器及端口,当不指定服务器地址及端口时(`taos://`),原生连接默认为 `localhost:6030`,Websocket 连接默认为 `localhost:6041` 。 +- **database**: 指定默认连接的数据库名。 +- **params**:其他可选参数。 + +一个完整的 DSN 描述字符串示例如下: + +```text +taos+ws://localhost:6041/test +``` + +表示使用 Websocket(`ws`)方式通过 `6041` 端口连接服务器 `localhost`,并指定默认数据库为 `test`。 + +这使得用户可以通过 DSN 指定连接方式: ```rust -let pool = r2d2::Pool::builder() - .max_size(10000) // max connections - .build(cfg)?; +use taos::*; + +// use native protocol. +let builder = TaosBuilder::from_dsn("taos://localhost:6030")?; +let conn1 = builder.build(); -// ... -// Use pool to get connection -let conn = pool.get()?; +// use websocket protocol. +let conn2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?; ``` -之后您可以对数据库进行相关操作: +建立连接后,您可以进行相关数据库操作: ```rust -async fn demo() -> Result<(), Error> { - // get connection ... - - // create database - conn.exec("create database if not exists demo").await?; - // change database context - conn.exec("use demo").await?; - // create table - conn.exec("create table if not exists tb1 (ts timestamp, v int)").await?; - // insert - conn.exec("insert into tb1 values(now, 1)").await?; - // query - let rows = conn.query("select * from tb1").await?; - for row in rows.rows { - println!("{}", row.into_iter().join(",")); +async fn demo(taos: &Taos, db: &str) -> Result<(), Error> { + // prepare database + taos.exec_many([ + format!("DROP DATABASE IF EXISTS `{db}`"), + format!("CREATE DATABASE `{db}`"), + format!("USE `{db}`"), + ]) + .await?; + + let inserted = taos.exec_many([ + // create super table + "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \ + TAGS (`groupid` INT, `location` BINARY(16))", + // create child table + "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')", + // insert into child table + "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)", + // insert with NULL values + "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)", + // insert and automatically create table with tags if not exists + "INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)", + // insert many records in a single sql + "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)", + ]).await?; + + assert_eq!(inserted, 6); + let mut result = taos.query("select * from `meters`").await?; + + for field in result.fields() { + println!("got field: {}", field.name()); } + + let values = result. } ``` -## 使用示例 +查询数据可以通过两种方式:使用内建类型或 [serde](https://serde.rs) 序列化框架。 -### 写入数据 +```rust + // Query option 1, use rows stream. + let mut rows = result.rows(); + while let Some(row) = rows.try_next().await? { + for (name, value) in row { + println!("got value of {}: {}", name, value); + } + } -#### SQL 写入 + // Query options 2, use deserialization with serde. + #[derive(Debug, serde::Deserialize)] + #[allow(dead_code)] + struct Record { + // deserialize timestamp to chrono::DateTime + ts: DateTime, + // float to f32 + current: Option, + // int to i32 + voltage: Option, + phase: Option, + groupid: i32, + // binary/varchar to String + location: String, + } - + let records: Vec = taos + .query("select * from `meters`") + .await? + .deserialize() + .try_collect() + .await?; + + dbg!(records); + Ok(()) +``` -#### InfluxDB 行协议写入 +## 使用示例 - +### 写入数据 -#### OpenTSDB Telnet 行协议写入 +#### SQL 写入 - + -#### OpenTSDB JSON 行协议写入 +#### STMT 写入 - + ### 查询数据 -### 更多示例程序 - -| 程序路径 | 程序说明 | -| -------------- | ----------------------------------------------------------------------------- | -| [demo.rs] | 基本API 使用示例 | -| [bailongma-rs] | 使用 TDengine 作为存储后端的 Prometheus 远程存储 API 适配器,使用 r2d2 连接池 | - ## API 参考 -### 连接构造器 API - -[Builder Pattern](https://doc.rust-lang.org/1.0.0/style/ownership/builders.html) 构造器模式是 Rust 处理复杂数据类型或可选配置类型的解决方案。[libtaos] 实现中,使用连接构造器 [TaosCfgBuilder] 作为 TDengine Rust 连接器的入口。[TaosCfgBuilder] 提供对服务器、端口、数据库、用户名和密码等的可选配置。 +### 连接构造器 -使用 `default()` 方法可以构建一个默认参数的 [TaosCfg],用于后续连接数据库或建立连接池。 +通过 DSN 来构建一个连接器构造器。 ```rust -let cfg = TaosCfgBuilder::default().build()?; +let cfg = TaosBuilder::default().build()?; ``` -使用构造器模式,用户可按需设置: +使用 `builder` 对象创建多个连接: ```rust -let cfg = TaosCfgBuilder::default() - .ip("127.0.0.1") - .user("root") - .pass("taosdata") - .db("log") - .port(6030u16) - .build()?; -``` - -使用 [TaosCfg] 对象创建 TDengine 连接: - -```rust -let conn: Taos = cfg.connect(); +let conn: Taos = cfg.build(); ``` ### 连接池 -在复杂应用中,建议启用连接池。[libtaos] 的连接池使用 [r2d2] 实现。 +在复杂应用中,建议启用连接池。[taos] 的连接池使用 [r2d2] 实现。 如下,可以生成一个默认参数的连接池。 ```rust -let pool = r2d2::Pool::new(cfg)?; +let pool = TaosBuilder::from_dsn(dsn)?.pool()?; ``` 同样可以使用连接池的构造器,对连接池参数进行设置: ```rust - use std::time::Duration; - let pool = r2d2::Pool::builder() - .max_size(5000) // max connections - .max_lifetime(Some(Duration::from_minutes(100))) // lifetime of each connection - .min_idle(Some(1000)) // minimal idle connections - .connection_timeout(Duration::from_minutes(2)) - .build(cfg); +let dsn = "taos://localhost:6030"; + +let opts = PoolBuilder::new() + .max_size(5000) // max connections + .max_lifetime(Some(Duration::from_secs(60 * 60))) // lifetime of each connection + .min_idle(Some(1000)) // minimal idle connections + .connection_timeout(Duration::from_secs(2)); + +let pool = TaosBuilder::from_dsn(dsn)?.with_pool_builder(opts)?; ``` 在应用代码中,使用 `pool.get()?` 来获取一个连接对象 [Taos]。 @@ -236,44 +281,85 @@ let taos = pool.get()?; ### 连接 -[Taos] 结构体是 [libtaos] 中的连接管理者,主要提供了两个 API: +[Taos][struct.Taos] 对象提供了多个数据库操作的 API: 1. `exec`: 执行某个非查询类 SQL 语句,例如 `CREATE`,`ALTER`,`INSERT` 等。 ```rust - taos.exec().await?; + let affected_rows = taos.exec("INSERT INTO tb1 VALUES(now, NULL)").await?; + ``` + +2. `exec_many`: 同时(顺序)执行多个 SQL 语句。 + + ```rust + taos.exec_many([ + "CREATE DATABASE test", + "USE test", + "CREATE TABLE `tb1` (`ts` TIMESTAMP, `val` INT)", + ]).await?; ``` -2. `query`:执行查询语句,返回 [TaosQueryData] 对象。 +3. `query`:执行查询语句,返回 [ResultSet] 对象。 ```rust - let q = taos.query("select * from log.logs").await?; + let mut q = taos.query("select * from log.logs").await?; ``` - [TaosQueryData] 对象存储了查询结果数据和返回的列的基本信息(列名,类型,长度): + [ResultSet] 对象存储了查询结果数据和返回的列的基本信息(列名,类型,长度): - 列信息使用 [ColumnMeta] 存储: + 列信息使用 [.fields()] 方法获取: ```rust - let cols = &q.column_meta; + let cols = q.fields(); for col in cols { - println!("name: {}, type: {:?}, bytes: {}", col.name, col.type_, col.bytes); + println!("name: {}, type: {:?} , bytes: {}", col.name(), col.ty(), col.bytes()); } ``` 逐行获取数据: ```rust - for (i, row) in q.rows.iter().enumerate() { - for (j, cell) in row.iter().enumerate() { - println!("cell({}, {}) data: {}", i, j, cell); + let mut rows = result.rows(); + let mut nrows = 0; + while let Some(row) = rows.try_next().await? { + for (col, (name, value)) in row.enumerate() { + println!( + "[{}] got value in col {} (named `{:>8}`): {}", + nrows, col, name, value + ); } + nrows += 1; + } + ``` + + 或使用 [serde](https://serde.rs) 序列化框架。 + + ```rust + #[derive(Debug, Deserialize)] + struct Record { + // deserialize timestamp to chrono::DateTime + ts: DateTime, + // float to f32 + current: Option, + // int to i32 + voltage: Option, + phase: Option, + groupid: i32, + // binary/varchar to String + location: String, } + + let records: Vec = taos + .query("select * from `meters`") + .await? + .deserialize() + .try_collect() + .await?; ``` 需要注意的是,需要使用 Rust 异步函数和异步运行时。 -[Taos] 提供部分 SQL 的 Rust 方法化以减少 `format!` 代码块的频率: +[Taos][struct.Taos] 提供部分 SQL 的 Rust 方法化以减少 `format!` 代码块的频率: - `.describe(table: &str)`: 执行 `DESCRIBE` 并返回一个 Rust 数据结构。 - `.create_database(database: &str)`: 执行 `CREATE DATABASE` 语句。 @@ -283,42 +369,61 @@ let taos = pool.get()?; ### 参数绑定接口 -与 C 接口类似,Rust 提供参数绑定接口。首先,通过 [Taos] 对象创建一个 SQL 语句的参数绑定对象 [Stmt]: +与 C 接口类似,Rust 提供参数绑定接口。首先,通过 [Taos][struct.Taos] 对象创建一个 SQL 语句的参数绑定对象 [Stmt]: ```rust -let mut stmt: Stmt = taos.stmt("insert into ? values(?,?)")?; +let mut stmt = Stmt::init(&taos).await?; +stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?; ``` 参数绑定对象提供了一组接口用于实现参数绑定: -##### `.set_tbname(tbname: impl ToCString)` +#### `.set_tbname(name)` 用于绑定表名。 -##### `.set_tbname_tags(tbname: impl ToCString, tags: impl IntoParams)` +```rust +let mut stmt = taos.stmt("insert into ? values(? ,?)")?; +stmt.set_tbname("d0")?; +``` + +#### `.set_tags(&[tag])` 当 SQL 语句使用超级表时,用于绑定子表表名和标签值: ```rust -let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(?,?)")?; -// tags can be created with any supported type, here is an example using JSON -let v = Field::Json(serde_json::from_str("{\"tag1\":\"一二三四五六七八九十\"}").unwrap()); -stmt.set_tbname_tags("tb0", [&tag])?; +let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)")?; +stmt.set_tbname("d0")?; +stmt.set_tags(&[Value::VarChar("涛思".to_string())])?; ``` -##### `.bind(params: impl IntoParams)` +#### `.bind(&[column])` -用于绑定值类型。使用 [Field] 结构体构建需要的类型并绑定: +用于绑定值类型。使用 [ColumnView] 结构体构建需要的类型并绑定: ```rust -let ts = Field::Timestamp(Timestamp::now()); -let value = Field::Float(0.0); -stmt.bind(vec![ts, value].iter())?; +let params = vec![ + ColumnView::from_millis_timestamp(vec![164000000000]), + ColumnView::from_bools(vec![true]), + ColumnView::from_tiny_ints(vec![i8::MAX]), + ColumnView::from_small_ints(vec![i16::MAX]), + ColumnView::from_ints(vec![i32::MAX]), + ColumnView::from_big_ints(vec![i64::MAX]), + ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]), + ColumnView::from_unsigned_small_ints(vec![u16::MAX]), + ColumnView::from_unsigned_ints(vec![u32::MAX]), + ColumnView::from_unsigned_big_ints(vec![u64::MAX]), + ColumnView::from_floats(vec![f32::MAX]), + ColumnView::from_doubles(vec![f64::MAX]), + ColumnView::from_varchar(vec!["ABC"]), + ColumnView::from_nchar(vec!["涛思数据"]), +]; +let rows = stmt.bind(¶ms)?.add_batch()?.execute()?; ``` -##### `.execute()` +#### `.execute()` -执行 SQL。[Stmt] 对象可以复用,在执行后可以重新绑定并执行。 +执行 SQL。[Stmt] 对象可以复用,在执行后可以重新绑定并执行。执行前请确保所有数据已通过 `.add_batch` 加入到执行队列中。 ```rust stmt.execute()?; @@ -329,60 +434,84 @@ stmt.execute()?; //stmt.execute()?; ``` -### 行协议接口 +一个可运行的示例请见 [GitHub 上的示例](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs)。 + +### 订阅 -行协议接口支持多种模式和不同精度,需要引入 schemaless 模块中的常量以进行设置: +TDengine 通过消息队列 [TMQ](../../../taos-sql/tmq/) 启动一个订阅。 + +从 DSN 开始,构建一个 TMQ 连接器。 ```rust -use libtaos::*; -use libtaos::schemaless::*; +let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?; ``` -- InfluxDB 行协议 +创建消费者: - ```rust - let lines = [ - "st,t1=abc,t2=def,t3=anything c1=3i64,c3=L\"pass\",c2=false 1626006833639000000" - "st,t1=abc,t2=def,t3=anything c1=3i64,c3=L\"abc\",c4=4f64 1626006833639000000" - ]; - taos.schemaless_insert(&lines, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANOSECONDS)?; - ``` +```rust +let mut consumer = tmq.build()?; +``` -- OpenTSDB Telnet 协议 +消费者可订阅一个或多个 `TOPIC`。 - ```rust - let lines = ["sys.if.bytes.out 1479496100 1.3E3 host=web01 interface=eth0"]; - taos.schemaless_insert(&lines, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_SECONDS)?; - ``` +```rust +consumer.subscribe(["tmq_meters"]).await?; +``` -- OpenTSDB JSON 协议 +TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) 类型,可以使用相应 API 对每个消息进行消费,并通过 `.commit` 进行已消费标记。 - ```rust - let lines = [r#" - { - "metric": "st", - "timestamp": 1626006833, - "value": 10, - "tags": { - "t1": true, - "t2": false, - "t3": 10, - "t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>" +```rust +{ + let mut stream = consumer.stream(); + + while let Some((offset, message)) = stream.try_next().await? { + // get information from offset + + // the topic + let topic = offset.topic(); + // the vgroup id, like partition id in kafka. + let vgroup_id = offset.vgroup_id(); + println!("* in vgroup id {vgroup_id} of topic {topic}\n"); + + if let Some(data) = message.into_data() { + while let Some(block) = data.fetch_raw_block().await? { + // one block for one table, get table name if needed + let name = block.table_name(); + let records: Vec = block.deserialize().try_collect()?; + println!( + "** table: {}, got {} records: {:#?}\n", + name.unwrap(), + records.len(), + records + ); } - }"#]; - taos.schemaless_insert(&lines, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_SECONDS)?; - ``` + } + consumer.commit(offset).await?; + } +} +``` + +停止订阅: + +```rust +consumer.unsubscribe().await; +``` + +对于 TMQ DSN, 有以下配置项可以进行设置,需要注意的是,`group.id` 是必须的。 + +- `group.id`: 同一个消费者组,将以至少消费一次的方式进行消息负载均衡。 +- `client.id`: 可选的订阅客户端识别项。 +- `auto.offset.reset`: 可选初始化订阅起点, *earliest* 为从头开始订阅, *latest* 为仅从最新数据开始订阅,默认为从头订阅。注意,此选项在同一个 `group.id` 中仅生效一次。 +- `enable.auto.commit`: 当设置为 `true` 时,将启用自动标记模式,当对数据一致性不敏感时,可以启用此方式。 +- `auto.commit.interval.ms`: 自动标记的时间间隔。 + +完整订阅示例参见 [GitHub 示例文件](https://github.com/taosdata/taos-connector-rust/blob/main/examples/subscribe.rs). -其他相关结构体 API 使用说明请移步 Rust 文档托管网页:。 +其他相关结构体 API 使用说明请移步 Rust 文档托管网页:。 -[libtaos]: https://github.com/taosdata/libtaos-rs -[tdengine]: https://github.com/taosdata/TDengine -[bailongma-rs]: https://github.com/taosdata/bailongma-rs +[taos]: https://github.com/taosdata/rust-connector-taos [r2d2]: https://crates.io/crates/r2d2 -[demo.rs]: https://github.com/taosdata/libtaos-rs/blob/main/examples/demo.rs -[TaosCfgBuilder]: https://docs.rs/libtaos/latest/libtaos/struct.TaosCfgBuilder.html -[TaosCfg]: https://docs.rs/libtaos/latest/libtaos/struct.TaosCfg.html -[Taos]: https://docs.rs/libtaos/latest/libtaos/struct.Taos.html -[TaosQueryData]: https://docs.rs/libtaos/latest/libtaos/field/struct.TaosQueryData.html -[Field]: https://docs.rs/libtaos/latest/libtaos/field/enum.Field.html -[Stmt]: https://docs.rs/libtaos/latest/libtaos/stmt/struct.Stmt.html +[TaosBuilder]: https://docs.rs/taos/latest/taos/struct.TaosBuilder.html +[TaosCfg]: https://docs.rs/taos/latest/taos/struct.TaosCfg.html +[struct.Taos]: https://docs.rs/taos/latest/taos/struct.Taos.html +[Stmt]: https://docs.rs/taos/latest/taos/struct.Stmt.html -- GitLab