diff --git a/contrib/cJson b/contrib/cJson
new file mode 160000
index 0000000000000000000000000000000000000000..d348621ca93571343a56862df7de4ff3bc9b5667
--- /dev/null
+++ b/contrib/cJson
@@ -0,0 +1 @@
+Subproject commit d348621ca93571343a56862df7de4ff3bc9b5667
diff --git a/contrib/libuv b/contrib/libuv
new file mode 160000
index 0000000000000000000000000000000000000000..0c1fa696aa502eb749c2c4735005f41ba00a27b8
--- /dev/null
+++ b/contrib/libuv
@@ -0,0 +1 @@
+Subproject commit 0c1fa696aa502eb749c2c4735005f41ba00a27b8
diff --git a/contrib/lz4 b/contrib/lz4
new file mode 160000
index 0000000000000000000000000000000000000000..d44371841a2f1728a3f36839fd4b7e872d0927d3
--- /dev/null
+++ b/contrib/lz4
@@ -0,0 +1 @@
+Subproject commit d44371841a2f1728a3f36839fd4b7e872d0927d3
diff --git a/contrib/zlib b/contrib/zlib
new file mode 160000
index 0000000000000000000000000000000000000000..cacf7f1d4e3d44d871b605da3b647f07d718623f
--- /dev/null
+++ b/contrib/zlib
@@ -0,0 +1 @@
+Subproject commit cacf7f1d4e3d44d871b605da3b647f07d718623f
diff --git a/deps/jemalloc b/deps/jemalloc
deleted file mode 160000
index ea6b3e973b477b8061e0076bb257dbd7f3faa756..0000000000000000000000000000000000000000
--- a/deps/jemalloc
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit ea6b3e973b477b8061e0076bb257dbd7f3faa756
diff --git a/docs/en/01-index.md b/docs/en/01-index.md
index 01a8322ea044f57c5acf790d0742e6273ba0a7d6..71ab2ec318fead4e62287038fc8a0cb11538e745 100644
--- a/docs/en/01-index.md
+++ b/docs/en/01-index.md
@@ -17,17 +17,19 @@ This is the documentation structure for TDengine Cloud.
5. The [Visualization](./visual) section shows you how you can visualize the data that you store in TDengine, as well as how you can visualize and monitor the status of your TDengine Cloud instance(s) and databases.
-6. Data [Subscription](./tmq) is an advanced and useful feature of TDengine. It is similar to asynchronous publish/subscribe where a message published to a topic is immediately received by all subscribers to that topic. TDengine Subscriptions allow you to create event driven applications without having to install an external pub/sub messaging system.
+6. [Data Sharing](./data-sharing) is an advanced and useful feature of TDengine Cloud. This section shows how to share the data in your TDengine Cloud instance with others through a few simple click operations.
-7. [Stream Processing](./stream) is another extremely useful feature of TDengine Cloud that obviates the need to install external stream processing systems like Kafka or Flink. TDengine's Stream Processing feature allows you to process incoming data streams in real time and push data to tables based on rules that you can define easily.
+7. Data [Subscription](./tmq) is an advanced and useful feature of TDengine. It is similar to asynchronous publish/subscribe where a message published to a topic is immediately received by all subscribers to that topic. TDengine Subscriptions allow you to create event driven applications without having to install an external pub/sub messaging system.
-8. TDengine provides sophisticated [Data Replication](./replication) features. You can replicate from Cloud to a private instance and vice versa. You can replicate between Cloud providers regardless of region and you can also replicate between edge instances and Cloud or edge instances and private centralized instances.
+8. [Stream Processing](./stream) is another extremely useful feature of TDengine Cloud that obviates the need to install external stream processing systems like Kafka or Flink. TDengine's Stream Processing feature allows you to process incoming data streams in real time and push data to tables based on rules that you can define easily.
-9. The [Developer Guide](./programming) is a must read if you are developing IoT or Big Data applications for time series data. In this section we introduce the database connection, data modeling, data ingestion, query, stream processing, cache, data subscription, user-defined functions (coming soon), and other functionality in detail. Sample code is provided for a variety of programming languages. In most cases, you can just copy and paste the sample code, make a few changes to accommodate your application, and it will work.
+9. TDengine provides sophisticated [Data Replication](./replication) features. You can replicate from Cloud to a private instance and vice versa. You can replicate between Cloud providers regardless of region and you can also replicate between edge instances and Cloud or edge instances and private centralized instances.
-10. The [TDengine SQL](./taos-sql) section provides comprehensive information about both standard SQL as well as TDengine's extensions for easy time series analysis.
+10. The [Developer Guide](./programming) is a must read if you are developing IoT or Big Data applications for time series data. In this section we introduce the database connection, data modeling, data ingestion, query, stream processing, cache, data subscription, user-defined functions (coming soon), and other functionality in detail. Sample code is provided for a variety of programming languages. In most cases, you can just copy and paste the sample code, make a few changes to accommodate your application, and it will work.
-11. The [Tools](./tools) section introduces the Taos CLI which gives you shell access to easily perform ad hoc queries on your instances and databases. Additionally, taosBenchmark is introduced. It is a tool that can help you generate large amounts of data very easily with simple configurations and test the performance of TDengine Cloud.
+11. The [TDengine SQL](./taos-sql) section provides comprehensive information about both standard SQL as well as TDengine's extensions for easy time series analysis.
+
+12. The [Tools](./tools) section introduces the Taos CLI which gives you shell access to easily perform ad hoc queries on your instances and databases. Additionally, taosBenchmark is introduced. It is a tool that can help you generate large amounts of data very easily with simple configurations and test the performance of TDengine Cloud.
diff --git a/docs/en/02-intro.md b/docs/en/02-intro.md
index dfb4995dc5c2c3736c3eed602c9a6fc29d44f8de..97343cafd1e3a37b1dfc7889ba581cb551c9657b 100644
--- a/docs/en/02-intro.md
+++ b/docs/en/02-intro.md
@@ -3,7 +3,7 @@ sidebar_label: Introduction
title: Introduction to TDengine Cloud Service
---
-TDengine Cloud, is the fast, elastic, serverless and cost effective time-series data processing service based on the popular open source time-series database, TDengine. With TDengine Cloud you get the highly optimized and purpose-built for IoT time-series platform, for which TDengine is known.
+TDengine Cloud is the fast, elastic, serverless and cost-effective time-series data processing service based on the popular open source time-series database, TDengine. With TDengine Cloud you get the highly optimized, purpose-built IoT time-series platform for which TDengine is known.
This section introduces the major features, competitive advantages and typical use-cases to help you get a high level overview of TDengine cloud service.
@@ -11,7 +11,7 @@ This section introduces the major features, competitive advantages and typical u
The major features are listed below:
-1. Data In
+1. Data In
- Supports [using SQL to insert](../data-in/insert-data).
- Supports [Telegraf](../data-in/telegraf/).
- Supports [Prometheus](../data-in/prometheus/).
@@ -21,13 +21,13 @@ The major features are listed below:
- Supports writing data to [Prometheus](../data-out/prometheus/).
- Supports exporting data via [data subscription](../tmq/).
3. Data Explorer: browse through databases and even run SQL queries once you log in.
-4. Visualization:
+4. Visualization:
- Supports [Grafana](../visual/grafana/)
   - Supports Google Data Studio (to be released soon)
- - Supports Grafana cloud (to be released soon)
+  - Supports Grafana Cloud (to be released soon)
6. [Stream Processing](../stream/): Not only is continuous query supported, but TDengine also supports event-driven stream processing, so Flink or Spark is not needed for time-series data processing.
7. [Data Subscription](../tmq/): Applications can subscribe to a table or a set of tables. The API is the same as Kafka's, but you can specify filter conditions.
-8. Enterprise
+8. Enterprise
   - Supports backing up data every day.
- Supports replicating a database to another region or cloud.
- Supports VPC peering.
@@ -45,7 +45,7 @@ For more details on features, please read through the entire documentation.
By making full use of [characteristics of time series data](https://tdengine.com/tsdb/characteristics-of-time-series-data/) and its cloud native design, TDengine Cloud differentiates itself from other time series data cloud services, with the following advantages.
-- **Worry Free**: TDengine Cloud is a fast, elastic, serverless purpose built cloud platform for time-series data. It provides worry-free operations with a fully managed cloud service. You pay as you go.
+- **Worry Free**: TDengine Cloud is a fast, elastic, serverless purpose built cloud platform for time-series data. It provides worry-free operations with a fully managed cloud service. You pay as you go.
- **[Simplified Solution](https://tdengine.com/tdengine/simplified-time-series-data-solution/)**: Through built-in caching, stream processing and data subscription features, TDengine provides a simplified solution for time-series data processing. It reduces system design complexity and operation costs significantly.
@@ -78,4 +78,4 @@ On the left-hand side, there are data collection agents like OPC-UA, MQTT, Teleg
## Typical Use Cases
-As a high-performance and cloud native time-series database, TDengine's typical use case include but are not limited to IoT, Industrial Internet, Connected Vehicles, IT operation and maintenance, energy, financial markets and other fields. TDengine is a purpose-built database optimized for the characteristics of time series data. As such, it cannot be used to process data from web crawlers, social media, e-commerce, ERP, CRM and so on. More generally TDengine is not a suitable storage engine for non-time-series data.
+As a high-performance and cloud native time-series database, TDengine's typical use cases include but are not limited to IoT, Industrial Internet, Connected Vehicles, IT operation and maintenance, energy, financial markets and other fields. TDengine is a purpose-built database optimized for the characteristics of time series data. As such, it cannot be used to process data from web crawlers, social media, e-commerce, ERP, CRM and so on. More generally, TDengine is not a suitable storage engine for non-time-series data.
diff --git a/docs/en/11-data-sharing/index.md b/docs/en/11-data-sharing/index.md
new file mode 100644
index 0000000000000000000000000000000000000000..85dedbb6b244b5332a86c13a57afec3e7c155f5a
--- /dev/null
+++ b/docs/en/11-data-sharing/index.md
@@ -0,0 +1,436 @@
+---
+sidebar_label: Data Sharing
+title: Data Sharing
+description: Using topics to share data from TDengine.
+---
+import Tabs from "@theme/Tabs";
+import TabItem from "@theme/TabItem";
+
+This topic introduces how to share data from TDengine through the access control management of TDengine Cloud and the subscription interfaces of the supported connectors. The data owner first creates a topic through the topic wizard, then adds the users or user groups that the data should be shared with to the subscriber list of the topic. Each subscriber of the topic can then get detailed information about how to access the shared data in TDengine through data subscription. In this document we briefly explain these main steps of data sharing.
+
+## Create Topic
+
+You can create a topic on the Topics page of TDengine Cloud. In the Create Topic dialog, you can create the topic either through a wizard or in SQL. In the wizard, you input the topic name and select a database of the current TDengine instance, then select a supertable or specify a subquery on a supertable or subtable. You can also select individual fields and add a result set or condition set for each field. The following sections describe in detail how to create a topic at each of the three levels through the wizard. For the SQL way, see [Data Subscription](../../tmq/) for details.
+
+### To Database
+
+Database is the default type selected in the Add New Topic dialog. After selecting a database from the list, click the Confirm button to create a topic on the whole database.
+
+![Create a topic on a database](./topic/add-topic-db.webp)
+
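+For reference, a database-level topic corresponds to this SQL statement from the [Data Subscription](../../tmq/) document (`topic_name` and `db_name` are placeholders):
+
+```sql
+-- Subscribe to all tables in a database
+CREATE TOPIC topic_name AS DATABASE db_name;
+```
+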
+### To Super Table
+
+In the Add New Topic dialog, click the STable type and select a supertable from the list. Then click the Confirm button to create a topic on that supertable.
+
+![Create a topic on a supertable](./topic/add-topic-stable.webp)
+
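+The equivalent SQL statement, per the [Data Subscription](../../tmq/) document, is the following (`topic_name` and `stb_name` are placeholders):
+
+```sql
+-- Subscribe to all data in a supertable
+CREATE TOPIC topic_name AS STABLE stb_name;
+```
+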
+### With Subquery
+
+In the Add New Topic dialog, click the Subquery type to show all subquery form items. The first item is Table Type, whose default selection is STable. After you select or input a supertable name, all fields of that supertable are listed. You can check or uncheck each field for the subquery, and you can also set the result set or condition set for each field. To preview the SQL generated from your choices, click SQL Preview to open a SQL dialog.
+
+![Create a topic with a subquery on a supertable](./topic/add-topic-sub-stable.webp)
+
+Alternatively, you can set Table Type to Table and then select a table from the list or input an existing table name. All fields of the selected table are listed. You can check or uncheck each field for the subquery, and you can also set the result set or condition set for each field. To preview the SQL generated from your choices, click SQL Preview to open a SQL dialog.
+
+![Create a topic with a subquery on a table](./topic/add-topic-sub-table.webp)
+
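+In both cases, the SQL Preview shows a statement of the following form (a sketch with placeholder names; the selected fields and conditions come from your choices in the form):
+
+```sql
+-- Subscribe to selected columns with a filter condition
+CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM stb_name WHERE c1 > 1;
+```
+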
+## Share Topic
+
+In each row of the topic list on the Topics page, you can click the Share Topic action icon to open the Share Topic page. You can also click the Share Topic tab directly to switch to the same location. Initially, the Users page of the Share Topic tab contains a single row for yourself.
+
+### Users
+
+In the default Users tab of the Share Topic page, you can click the **Add Users** button to add more users who are active in the current organization. In the Add New Users dialog, select the users you want to share the topic with, and then set the expiration time of the sharing for these users.
+
+![Share Topic users list](./share/share-topic-users.webp)
+
+![Add New Users dialog](./share/share-topic-adduser.webp)
+
+### User Groups
+
+You can click the User Groups tab to switch to the User Groups page of the Share Topic. Then you can click the **Add User Groups** button to add more user groups that are active in the current organization. In the Add New User Groups dialog, select the user groups you want to share the topic with, and then set the expiration time of the sharing for these user groups.
+
+![Share Topic user groups list](./share/share-topic-usergroup.webp)
+
+![Add New User Groups dialog](./share/share-topic-addusergroup.webp)
+
+## Consume Shared Topic
+
+On the Topics page of Data Subscription, a shared user can see all topics that the creator shared with them. The user can click the **Sample Code** icon in the **Action** area of each topic to open the **Sample Code** page, and then follow the steps there to consume the shared topic from the TDengine instance.
+
+### Configure TDengine DSN
+
+
+
+
+```shell
+export TDENGINE_CLOUD_ENDPOINT="{TDC_GATEWAY}"
+export TDENGINE_CLOUD_TOKEN="{TDC_TOKEN}"
+```
+
+
+
+
+```shell
+set TDENGINE_CLOUD_ENDPOINT="{TDC_GATEWAY}"
+set TDENGINE_CLOUD_TOKEN="{TDC_TOKEN}"
+```
+
+
+
+
+```shell
+$env:TDENGINE_CLOUD_ENDPOINT="{TDC_GATEWAY}"
+$env:TDENGINE_CLOUD_TOKEN="{TDC_TOKEN}"
+```
+
+
+
+
+### Data Schema and API
+
+The related schemas and APIs in various languages are described as follows:
+
+
+
+
+```python
+class TaosConsumer():
+ def __init__(self, *topics, **configs)
+
+ def __iter__(self)
+
+ def __next__(self)
+
+ def sync_next(self)
+
+ def subscription(self)
+
+ def unsubscribe(self)
+
+ def close(self)
+
+ def __del__(self)
+```
+
+
+
+
+
+```go
+func NewConsumer(conf *Config) (*Consumer, error)
+
+func (c *Consumer) Close() error
+
+func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error
+
+func (c *Consumer) FreeMessage(message unsafe.Pointer)
+
+func (c *Consumer) Poll(timeout time.Duration) (*Result, error)
+
+func (c *Consumer) Subscribe(topics []string) error
+
+func (c *Consumer) Unsubscribe() error
+```
+
+
+
+
+
+```rust
+impl TBuilder for TmqBuilder
+    fn from_dsn<D: IntoDsn>(dsn: D) -> Result<Self, Self::Error>
+    fn build(&self) -> Result<Self::Target, Self::Error>
+
+impl AsAsyncConsumer for Consumer
+    async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
+        &mut self,
+        topics: I,
+    ) -> Result<(), Self::Error>;
+    fn stream(
+        &self,
+    ) -> Pin<
+        Box<
+            dyn '_
+                + Send
+                + futures::Stream<
+                    Item = Result<(Self::Offset, MessageSet<Self::Meta, Self::Data>), Self::Error>,
+                >,
+        >,
+    >;
+    async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>;
+
+    async fn unsubscribe(self);
+```
+
+For more information, see [Crate taos](https://docs.rs/taos).
+
+
+
+
+### Create a Consumer from Instance
+
+You configure the following parameters when creating a consumer:
+
+| Parameter | Type | Description | Remarks |
+| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- |
+| `td.connect.ip` | string | Used in establishing a connection; same as `taos_connect` | |
+| `td.connect.user` | string | Used in establishing a connection; same as `taos_connect` | |
+| `td.connect.pass` | string | Used in establishing a connection; same as `taos_connect` | |
+| `td.connect.port` | string | Used in establishing a connection; same as `taos_connect` | |
+| `group.id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192. |
+| `client.id` | string | Client ID | Maximum length: 192. |
+| `auto.offset.reset`            | enum    | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none` (default). |
+| `enable.auto.commit`           | boolean | Commit automatically | Specify `true` or `false`. |
+| `auto.commit.interval.ms`      | integer | Interval for automatic commits, in milliseconds | |
+| `enable.heartbeat.background`  | boolean | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | |
+| `experimental.snapshot.enable` | boolean | Specify whether to consume messages from the WAL or from TSDB | |
+| `msg.with.table.name`          | boolean | Specify whether to deserialize table names from messages | |
+
+The method of specifying these parameters depends on the language used:
+
+
+
+
+
+Python programs use the following parameters:
+
+| Parameter | Type | Description | Remarks |
+| :----------------------------: | :----: | -------------------------------------------------------- | ------------------------------------------- |
+| `td_connect_ip` | string | Used in establishing a connection; same as `taos_connect` | |
+| `td_connect_user` | string | Used in establishing a connection; same as `taos_connect` | |
+| `td_connect_pass` | string | Used in establishing a connection; same as `taos_connect` | |
+| `td_connect_port` | string | Used in establishing a connection; same as `taos_connect` | |
+| `group_id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192. |
+| `client_id` | string | Client ID | Maximum length: 192. |
+| `auto_offset_reset`            | string | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none` (default). |
+| `enable_auto_commit`           | string | Commit automatically | Specify `true` or `false`. |
+| `auto_commit_interval_ms`      | string | Interval for automatic commits, in milliseconds | |
+| `enable_heartbeat_background`  | string | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | Specify `true` or `false`. |
+| `experimental_snapshot_enable` | string | Specify whether to consume messages from the WAL or from TSDB | Specify `true` or `false`. |
+| `msg_with_table_name`          | string | Specify whether to deserialize table names from messages | Specify `true` or `false`. |
+| `timeout` | int | Consumer pull timeout | |
+
+
+
+
+
+```go
+import (
+	"fmt"
+	"os"
+
+	"github.com/taosdata/driver-go/v3/ws/tmq"
+)
+endpoint := os.Getenv("TDENGINE_CLOUD_ENDPOINT")
+token := os.Getenv("TDENGINE_CLOUD_TOKEN")
+tmpDSN := fmt.Sprintf("ws://%s/rest/tmq?token=%s", endpoint, token)
+config := tmq.NewConfig(tmpDSN, 0)
+defer config.Destroy()
+err := config.SetGroupID("test_group")
+if err != nil {
+	panic(err)
+}
+err = config.SetClientID("test_consumer_ws")
+if err != nil {
+ panic(err)
+}
+err = config.EnableHeartBeat()
+if err != nil {
+ panic(err)
+}
+err = config.SetAutoOffsetReset("earliest")
+if err != nil {
+ panic(err)
+}
+```
+
+
+
+
+
+```rust
+// Build the DSN from the environment variables configured earlier
+let dsn_url = format!(
+    "ws://{}/rest/tmq?token={}",
+    std::env::var("TDENGINE_CLOUD_ENDPOINT")?,
+    std::env::var("TDENGINE_CLOUD_TOKEN")?
+);
+let mut dsn: Dsn = dsn_url.parse()?;
+dsn.set("group.id", "test_group");
+dsn.set("client.id", "test_consumer_ws");
+dsn.set("auto.offset.reset", "earliest");
+
+let tmq = TmqBuilder::from_dsn(dsn)?;
+
+let mut consumer = tmq.build()?;
+```
+
+
+
+
+
+A consumer group is automatically created when multiple consumers are configured with the same consumer group ID.
+
+### Subscribe to a Topic
+
+A single consumer can subscribe to multiple topics.
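+
+For example, in Python several topic names can be passed to the constructor in one call (a sketch; `{TDC_TOPIC}` is the shared topic placeholder and `another_topic` is a hypothetical second topic):
+
+```python
+# One consumer subscribing to two topics at once
+consumer = TaosConsumer('{TDC_TOPIC}', 'another_topic', group_id='test_group')
+```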
+
+
+
+
+```python
+consumer = TaosConsumer('{TDC_TOPIC}', group_id='test_group')
+```
+
+
+
+
+```go
+consumer, err := tmq.NewConsumer(config)
+if err != nil {
+ panic(err)
+}
+err = consumer.Subscribe([]string{"{TDC_TOPIC}"})
+if err != nil {
+ panic(err)
+}
+```
+
+
+
+
+```rust
+consumer.subscribe(["{TDC_TOPIC}"]).await?;
+```
+
+
+
+
+
+## Consume Messages
+
+The following code demonstrates how to consume the messages in a queue.
+
+
+
+
+```python
+for msg in consumer:
+ for row in msg:
+ print(row)
+```
+
+
+
+
+
+```go
+for {
+ result, err := consumer.Poll(time.Second)
+ if err != nil {
+ panic(err)
+ }
+ fmt.Println(result)
+ consumer.Commit(context.Background(), result.Message)
+ consumer.FreeMessage(result.Message)
+}
+```
+
+
+
+
+
+```rust
+{
+ let mut stream = consumer.stream();
+
+ while let Some((offset, message)) = stream.try_next().await? {
+ // get information from offset
+
+ // the topic
+ let topic = offset.topic();
+ // the vgroup id, like partition id in kafka.
+ let vgroup_id = offset.vgroup_id();
+ println!("* in vgroup id {vgroup_id} of topic {topic}\n");
+
+ if let Some(data) = message.into_data() {
+ while let Some(block) = data.fetch_raw_block().await? {
+ // one block for one table, get table name if needed
+ let name = block.table_name();
+                let records: Vec<Record> = block.deserialize().try_collect()?;
+ println!(
+ "** table: {}, got {} records: {:#?}\n",
+ name.unwrap(),
+ records.len(),
+ records
+ );
+ }
+ }
+ consumer.commit(offset).await?;
+ }
+}
+```
+
+
+
+
+
+## Close the Consumer
+
+After message consumption is finished, the consumer is unsubscribed.
+
+
+
+
+
+```py
+# Unsubscribe
+consumer.unsubscribe()
+# Close consumer
+consumer.close()
+```
+
+
+
+
+
+```go
+consumer.Close()
+```
+
+
+
+
+
+```rust
+consumer.unsubscribe().await;
+```
+
+
+
+
+
+### Sample Code
+
+The following are complete sample programs showing how to consume the shared topic:
+
+
+
+
+
+```python
+{{#include docs/examples/python/tmq_example.py}}
+```
+
+
+
+
+
+```go
+{{#include docs/examples/go/sub/main.go}}
+```
+
+
+
+
+
+```rust
+{{#include docs/examples/rust/cloud-example/examples/subscribe_demo.rs}}
+```
+
+
+
+
diff --git a/docs/en/11-data-sharing/share/share-topic-adduser.webp b/docs/en/11-data-sharing/share/share-topic-adduser.webp
new file mode 100644
index 0000000000000000000000000000000000000000..82b39292af2d8bcf189cf702436aa2b565ca130f
Binary files /dev/null and b/docs/en/11-data-sharing/share/share-topic-adduser.webp differ
diff --git a/docs/en/11-data-sharing/share/share-topic-addusergroup.webp b/docs/en/11-data-sharing/share/share-topic-addusergroup.webp
new file mode 100644
index 0000000000000000000000000000000000000000..93747d41792555ace27813d8c64256ce36353dde
Binary files /dev/null and b/docs/en/11-data-sharing/share/share-topic-addusergroup.webp differ
diff --git a/docs/en/11-data-sharing/share/share-topic-usergroup.webp b/docs/en/11-data-sharing/share/share-topic-usergroup.webp
new file mode 100644
index 0000000000000000000000000000000000000000..1c7c5d2d6bf6635c589bf1af47eadd8be32d414b
Binary files /dev/null and b/docs/en/11-data-sharing/share/share-topic-usergroup.webp differ
diff --git a/docs/en/11-data-sharing/share/share-topic-users.webp b/docs/en/11-data-sharing/share/share-topic-users.webp
new file mode 100644
index 0000000000000000000000000000000000000000..ae80849cdebe05bdc19b27276a91ab339193c487
Binary files /dev/null and b/docs/en/11-data-sharing/share/share-topic-users.webp differ
diff --git a/docs/en/11-data-sharing/topic/add-topic-db.webp b/docs/en/11-data-sharing/topic/add-topic-db.webp
new file mode 100644
index 0000000000000000000000000000000000000000..1aee40bd35e42c946a3f9e659a1370f71f5b8c5e
Binary files /dev/null and b/docs/en/11-data-sharing/topic/add-topic-db.webp differ
diff --git a/docs/en/11-data-sharing/topic/add-topic-stable.webp b/docs/en/11-data-sharing/topic/add-topic-stable.webp
new file mode 100644
index 0000000000000000000000000000000000000000..1cc472439f4cddd68e7a1a3b97dab68216c3b9ec
Binary files /dev/null and b/docs/en/11-data-sharing/topic/add-topic-stable.webp differ
diff --git a/docs/en/11-data-sharing/topic/add-topic-sub-stable.webp b/docs/en/11-data-sharing/topic/add-topic-sub-stable.webp
new file mode 100644
index 0000000000000000000000000000000000000000..1d68c5fb8e972f9916119ed81df2e3b47871ef00
Binary files /dev/null and b/docs/en/11-data-sharing/topic/add-topic-sub-stable.webp differ
diff --git a/docs/en/11-data-sharing/topic/add-topic-sub-table.webp b/docs/en/11-data-sharing/topic/add-topic-sub-table.webp
new file mode 100644
index 0000000000000000000000000000000000000000..526bb86711b315c88a86b74511c38457f6efc314
Binary files /dev/null and b/docs/en/11-data-sharing/topic/add-topic-sub-table.webp differ
diff --git a/docs/en/11-tmq.md b/docs/en/11-tmq.md
index 8208925fea1ee1b219d1738ff607bddece968f3d..17b3f5caa062eaacb4216b7153e899040e702cc1 100644
--- a/docs/en/11-tmq.md
+++ b/docs/en/11-tmq.md
@@ -1,44 +1,262 @@
---
-sidebar_label: Subscription
-title: Data Subscritpion
-description: Use data subscription to get data from TDengine.
+sidebar_label: Data Subscription
+description: "The TDengine data subscription service automatically pushes data written in TDengine to subscribing clients."
+title: Data Subscription
---
import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
+import Java from "./_sub_java.mdx";
+import Python from "./_sub_python.mdx";
+import Go from "./_sub_go.mdx";
+import Rust from "./_sub_rust.mdx";
+import Node from "./_sub_node.mdx";
+import CSharp from "./_sub_cs.mdx";
+import CDemo from "./_sub_c.mdx";
-This topic introduces how to read out data from TDengine using data subscription, which is an advanced feature in TDengine. To access the data in TDengine in data subscription way, you need to create topic, create consumer, subscribe to a topic, and consume data. In this document we will briefly explain these main steps of data subscription.
+TDengine provides data subscription and consumption interfaces similar to those of message queue products. These interfaces make it easier for applications to obtain data written to TDengine in real time and to process it in the order that events occurred. This simplifies your time-series data processing systems and reduces your costs because it is no longer necessary to deploy a message queue product such as Kafka.
-## Create Topic
+To use TDengine data subscription, you define topics like in Kafka. However, a topic in TDengine is based on query conditions for an existing supertable, table, or subtable - in other words, a SELECT statement. You can use SQL to filter data by tag, table name, column, or expression and then perform a scalar function or user-defined function on the data. Aggregate functions are not supported. This gives TDengine data subscription more flexibility than similar products. The granularity of data can be controlled on demand by applications, while filtering and preprocessing are handled by TDengine instead of the application layer. This implementation reduces the amount of data transmitted and the complexity of applications.
-A topic can be created on a database, on some selected columns,or on a supertable.
+By subscribing to a topic, a consumer can obtain the latest data in that topic in real time. Multiple consumers can be formed into a consumer group that consumes messages together. Consumer groups enable faster speed through multi-threaded, distributed data consumption. Note that consumers in different groups that are subscribed to the same topic do not consume messages together. A single consumer can subscribe to multiple topics. If the data in a supertable is sharded across multiple vnodes, consumer groups can consume it much more efficiently than single consumers. TDengine also includes an acknowledgement mechanism that ensures at-least-once delivery in complicated environments where machines may crash or restart.
-### Topic on Columns
+To implement these features, TDengine indexes its write-ahead log (WAL) file for fast random access and provides configurable methods for replacing and retaining this file. You can define a retention period and size for this file. For information, see the CREATE DATABASE statement. In this way, the WAL file is transformed into a persistent storage engine that remembers the order in which events occur. However, note that configuring an overly long retention period for your WAL files makes database compression inefficient. TDengine then uses the WAL file instead of the time-series database as its storage engine for queries in the form of topics. TDengine reads the data from the WAL file; uses a unified query engine instance to perform filtering, transformations, and other operations; and finally pushes the data to consumers.
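+
+For example, WAL retention can be configured when a database is created (a sketch; `WAL_RETENTION_PERIOD` is in seconds, and the full option list is described under the CREATE DATABASE statement):
+
+```sql
+-- Keep WAL files for at least one hour so that subscribers can replay them
+CREATE DATABASE db_name WAL_RETENTION_PERIOD 3600;
+```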
-The most common way to create a topic is to create a topic on some specifically selected columns. The Syntax is like below:
-```sql
-CREATE TOPIC topic_name as subquery;
+
+## Data Schema and API
+
+The related schemas and APIs in various languages are described as follows:
+
+
+
+
+```c
+typedef struct tmq_t tmq_t;
+typedef struct tmq_conf_t tmq_conf_t;
+typedef struct tmq_list_t tmq_list_t;
+
+typedef void(tmq_commit_cb(tmq_t *, int32_t code, void *param));
+
+DLL_EXPORT tmq_list_t *tmq_list_new();
+DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
+DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
+DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
+DLL_EXPORT const char *tmq_err2str(int32_t code);
+
+DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
+DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq);
+DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
+DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
+DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
+DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
+
+enum tmq_conf_res_t {
+ TMQ_CONF_UNKNOWN = -2,
+ TMQ_CONF_INVALID = -1,
+ TMQ_CONF_OK = 0,
+};
+typedef enum tmq_conf_res_t tmq_conf_res_t;
+
+DLL_EXPORT tmq_conf_t *tmq_conf_new();
+DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
+DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
+DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
```
-You can subscribe to a topic through a SELECT statement. Statements that specify columns, such as `SELECT *` and `SELECT ts, cl` are supported, as are filtering conditions and scalar functions. Aggregate functions and time window aggregation are not supported. Note:
+For more information, see [C/C++ Connector](/reference/connector/cpp).
-- The schema of topics created in this manner is determined by the subscribed data.
-- You cannot modify (`ALTER MODIFY`) or delete (`ALTER DROP`) columns or tags that are used in a subscription or calculation.
-- Columns added to a table after the subscription is created are not displayed in the results. Deleting columns will cause an error.
+The following example is based on the smart meter table described in Data Models. For complete sample code, see the C language section below.
+
+
+
+
+```java
+void subscribe(Collection<String> topics) throws SQLException;
+
+void unsubscribe() throws SQLException;
+
+Set<String> subscription() throws SQLException;
+
+ConsumerRecords<V> poll(Duration timeout) throws SQLException;
+
+void commitAsync();
+
+void commitAsync(OffsetCommitCallback callback);
+
+void commitSync() throws SQLException;
+
+void close() throws SQLException;
+```
+
+
+
+
+
+```python
+class TaosConsumer():
+ def __init__(self, *topics, **configs)
+
+ def __iter__(self)
+
+ def __next__(self)
+
+ def sync_next(self)
+
+ def subscription(self)
+
+ def unsubscribe(self)
+
+ def close(self)
+
+ def __del__(self)
+```
+
+
+
+
+
+```go
+func NewConsumer(conf *Config) (*Consumer, error)
+
+func (c *Consumer) Close() error
+
+func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error
+
+func (c *Consumer) FreeMessage(message unsafe.Pointer)
+
+func (c *Consumer) Poll(timeout time.Duration) (*Result, error)
+
+func (c *Consumer) Subscribe(topics []string) error
+
+func (c *Consumer) Unsubscribe() error
+```
+
+
+
+
+
+```rust
+impl TBuilder for TmqBuilder
+    fn from_dsn<D: IntoDsn>(dsn: D) -> Result<Self, Self::Error>
+    fn build(&self) -> Result<Self::Target, Self::Error>
+
+impl AsAsyncConsumer for Consumer
+    async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
+        &mut self,
+        topics: I,
+    ) -> Result<(), Self::Error>;
+    fn stream(
+        &self,
+    ) -> Pin<
+        Box<
+            dyn '_
+                + Send
+                + futures::Stream<
+                    Item = Result<(Self::Offset, MessageSet<Self::Meta, Self::Data>), Self::Error>,
+                >,
+        >,
+    >;
+    async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>;
+
+    async fn unsubscribe(self);
+```
+
+For more information, see [Crate taos](https://docs.rs/taos).
+
+
+
+
+
+```js
+function TMQConsumer(config)
+
+function subscribe(topic)
+
+function consume(timeout)
+
+function subscription()
+
+function unsubscribe()
+
+function commit(msg)
+
+function close()
+```
+
+
+
+
-For example:
+```csharp
+ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config)
+
+virtual IConsumer Build()
+
+Consumer(ConsumerBuilder builder)
+
+void Subscribe(IEnumerable<string> topics)
+
+void Subscribe(string topic)
+
+ConsumeResult Consume(int millisecondsTimeout)
+
+List<string> Subscription()
+
+void Unsubscribe()
+
+void Commit(ConsumeResult consumerResult)
+
+void Close()
+```
+
+
+
+
+## Insert Data into TDengine
+
+A database including one supertable and two subtables is created as follows:
+
+```sql
+DROP DATABASE IF EXISTS tmqdb;
+CREATE DATABASE tmqdb;
+CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16)) TAGS(t1 INT, t3 VARCHAR(16));
+CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0");
+CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1");
+INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
+INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
+```
+
+## Create a Topic
+
+The following SQL statement creates a topic in TDengine:
```sql
CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
```
-### Topic on SuperTable
+Multiple subscription types are supported.
+
+### Subscribe to a Column
Syntax:
```sql
-CREATE TOPIC topic_name AS STABLE stb_name;
+CREATE TOPIC topic_name AS subquery
+```
+
+You can subscribe to a topic through a SELECT statement. Statements that specify columns, such as `SELECT *` and `SELECT ts, c1`, are supported, as are filtering conditions and scalar functions; a sketch follows the notes below. Aggregate functions and time window aggregation are not supported. Note:
+
+- The schema of topics created in this manner is determined by the subscribed data.
+- You cannot modify (`ALTER MODIFY`) or delete (`ALTER DROP`) columns or tags that are used in a subscription or calculation.
+- Columns added to a table after the subscription is created are not displayed in the results. Deleting columns will cause an error.
+
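+A sketch of such a topic, using the `tmqdb.stb` supertable created above (`topic_abs_current` is a hypothetical name, and `ABS` stands in for any supported scalar function):
+
+```sql
+CREATE TOPIC topic_abs_current AS SELECT ts, ABS(c1), c2 FROM tmqdb.stb WHERE c1 > 1;
+```
+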
+### Subscribe to a Supertable
+
+Syntax:
+
+```sql
+CREATE TOPIC topic_name AS STABLE stb_name
```
Creating a topic in this manner differs from a `SELECT * from stbName` statement as follows:
@@ -48,7 +266,7 @@ Creating a topic in this manner differs from a `SELECT * from stbName` statement
- A different table schema may exist for every data block to be processed.
- The data returned does not include tags.
-### Topic on Database
+### Subscribe to a Database
Syntax:
@@ -58,93 +276,516 @@ CREATE TOPIC topic_name [WITH META] AS DATABASE db_name;
This SQL statement creates a subscription to all tables in the database. You can add the `WITH META` parameter to include schema changes in the subscription, including creating and deleting supertables; adding, deleting, and modifying columns; and creating, deleting, and modifying the tags of subtables. Consumers can determine the message type from the API. Note that this differs from Kafka.
-## Programming Model
+## Create a Consumer
-To subscribe the data from a created topic, the client program needs to follow the programming model described in this section.
+You configure the following parameters when creating a consumer:
-1. Create Consumer
+| Parameter | Type | Description | Remarks |
+| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- |
+| `td.connect.ip` | string | Used in establishing a connection; same as `taos_connect` | |
+| `td.connect.user` | string | Used in establishing a connection; same as `taos_connect` | |
+| `td.connect.pass` | string | Used in establishing a connection; same as `taos_connect` | |
+| `td.connect.port` | string | Used in establishing a connection; same as `taos_connect` | |
+| `group.id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192. |
+| `client.id` | string | Client ID | Maximum length: 192. |
+| `auto.offset.reset`            | enum    | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none` (default). |
+| `enable.auto.commit`           | boolean | Commit automatically | Specify `true` or `false`. |
+| `auto.commit.interval.ms`      | integer | Interval for automatic commits, in milliseconds | |
+| `enable.heartbeat.background`  | boolean | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | |
+| `experimental.snapshot.enable` | boolean | Specify whether to consume messages from the WAL or from TSDB | |
+| `msg.with.table.name`          | boolean | Specify whether to deserialize table names from messages | |
-To create a consumer, you must use the APIs provided by TDengine connectors. Below is the sample code of using connectors of different languages.
+The method of specifying these parameters depends on the language used:
-2. Subscribe to a Topic
+
+
-A single consumer can subscribe to multiple topics.
+```c
+/* Create consumer groups on demand (group.id) and enable automatic commits (enable.auto.commit),
+ an automatic commit interval (auto.commit.interval.ms), and a username (td.connect.user) and password (td.connect.pass) */
+tmq_conf_t* conf = tmq_conf_new();
+tmq_conf_set(conf, "enable.auto.commit", "true");
+tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
+tmq_conf_set(conf, "group.id", "cgrpName");
+tmq_conf_set(conf, "td.connect.user", "root");
+tmq_conf_set(conf, "td.connect.pass", "taosdata");
+tmq_conf_set(conf, "auto.offset.reset", "earliest");
+tmq_conf_set(conf, "experimental.snapshot.enable", "true");
+tmq_conf_set(conf, "msg.with.table.name", "true");
+tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
+
+tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
+tmq_conf_destroy(conf);
+```
-3. Consume messages
+
+
-4. Subscribe to a Topic
+Java programs use the following parameters:
-A single consumer can subscribe to multiple topics.
+| Parameter | Type | Description | Remarks |
+| ----------------------------- | ------ | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- |
+| `bootstrap.servers`           | string | Connection address, such as `localhost:6030` | |
+| `value.deserializer`          | string | Value deserializer; to use this method, implement the `com.taosdata.jdbc.tmq.Deserializer` interface or inherit the `com.taosdata.jdbc.tmq.ReferenceDeserializer` type | |
+| `value.deserializer.encoding` | string | Specify the encoding for string deserialization | |
-5. Consume Data
+Note: The `bootstrap.servers` parameter is used instead of `td.connect.ip` and `td.connect.port` to provide an interface that is consistent with Kafka.
-6. Close the consumer
+```java
+Properties properties = new Properties();
+properties.setProperty("enable.auto.commit", "true");
+properties.setProperty("auto.commit.interval.ms", "1000");
+properties.setProperty("group.id", "cgrpName");
+properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
+properties.setProperty("td.connect.user", "root");
+properties.setProperty("td.connect.pass", "taosdata");
+properties.setProperty("auto.offset.reset", "earliest");
+properties.setProperty("msg.with.table.name", "true");
+properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer");
-After message consumption is finished, the consumer is unsubscribed.
+TaosConsumer<Meters> consumer = new TaosConsumer<>(properties);
+
+/* value deserializer definition. */
+import com.taosdata.jdbc.tmq.ReferenceDeserializer;
+
+public class MetersDeserializer extends ReferenceDeserializer<Meters> {
+}
+```
+
+
+
+
+
+```go
+config := tmq.NewConfig()
+defer config.Destroy()
+err := config.SetGroupID("test")
+if err != nil {
+ panic(err)
+}
+err = config.SetAutoOffsetReset("earliest")
+if err != nil {
+ panic(err)
+}
+err = config.SetConnectIP("127.0.0.1")
+if err != nil {
+ panic(err)
+}
+err = config.SetConnectUser("root")
+if err != nil {
+ panic(err)
+}
+err = config.SetConnectPass("taosdata")
+if err != nil {
+ panic(err)
+}
+err = config.SetConnectPort("6030")
+if err != nil {
+ panic(err)
+}
+err = config.SetMsgWithTableName(true)
+if err != nil {
+ panic(err)
+}
+err = config.EnableHeartBeat()
+if err != nil {
+ panic(err)
+}
+err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) {
+ if result.ErrCode != 0 {
+ errStr := wrapper.TMQErr2Str(result.ErrCode)
+ err := errors.NewError(int(result.ErrCode), errStr)
+ panic(err)
+ }
+})
+if err != nil {
+ panic(err)
+}
+```
+
+
+
+
+
+```rust
+let mut dsn: Dsn = "taos://".parse()?;
+dsn.set("group.id", "group1");
+dsn.set("client.id", "test");
+dsn.set("auto.offset.reset", "earliest");
+
+let tmq = TmqBuilder::from_dsn(dsn)?;
+
+let mut consumer = tmq.build()?;
+```
+
+
+
+
+
+Python programs use the following parameters:
+
+| Parameter | Type | Description | Remarks |
+| :----------------------------: | :----: | -------------------------------------------------------- | ------------------------------------------- |
+| `td_connect_ip` | string | Used in establishing a connection; same as `taos_connect` | |
+| `td_connect_user` | string | Used in establishing a connection; same as `taos_connect` | |
+| `td_connect_pass` | string | Used in establishing a connection; same as `taos_connect` | |
+| `td_connect_port` | string | Used in establishing a connection; same as `taos_connect` | |
+| `group_id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192. |
+| `client_id` | string | Client ID | Maximum length: 192. |
+| `auto_offset_reset`            | string | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none` (default). |
+| `enable_auto_commit`           | string | Commit automatically | Specify `true` or `false`. |
+| `auto_commit_interval_ms`      | string | Interval for automatic commits, in milliseconds | |
+| `enable_heartbeat_background`  | string | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | Specify `true` or `false`. |
+| `experimental_snapshot_enable` | string | Specify whether to consume messages from the WAL or from TSDB | Specify `true` or `false`. |
+| `msg_with_table_name`          | string | Specify whether to deserialize table names from messages | Specify `true` or `false`. |
+| `timeout` | int | Consumer pull timeout | |
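+
+A minimal creation sketch in Python: per the `__init__(self, *topics, **configs)` signature shown earlier, the consumer is created and subscribed in one call, with the parameters above passed as keyword arguments (the topic and group names are example values):
+
+```python
+from taos.tmq import TaosConsumer
+
+# group_id is required; auto_offset_reset defaults to "none"
+consumer = TaosConsumer('topic_ctb_column', group_id='tg2', auto_offset_reset='earliest')
+```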
+
+
+
+
+
+```js
+// Create consumer groups on demand (group.id) and enable automatic commits (enable.auto.commit),
+// an automatic commit interval (auto.commit.interval.ms), and a username (td.connect.user) and password (td.connect.pass)
+
+let consumer = taos.consumer({
+    'enable.auto.commit': 'true',
+    'auto.commit.interval.ms': '1000',
+    'group.id': 'tg2',
+    'td.connect.user': 'root',
+    'td.connect.pass': 'taosdata',
+    'auto.offset.reset': 'earliest',
+    'msg.with.table.name': 'true',
+    'td.connect.ip': '127.0.0.1',
+    'td.connect.port': '6030'
+});
+```
-## Sample Code
+
+
+
+
+```csharp
+using TDengineTMQ;
+
+// Create consumer groups on demand (GourpID) and enable automatic commits (EnableAutoCommit),
+// an automatic commit interval (AutoCommitIntervalMs), and a username (TDConnectUser) and password (TDConnectPasswd)
+var cfg = new ConsumerConfig
+{
+    EnableAutoCommit = "true",
+    AutoCommitIntervalMs = "1000",
+    GourpId = "TDengine-TMQ-C#",
+    TDConnectUser = "root",
+    TDConnectPasswd = "taosdata",
+    AutoOffsetReset = "earliest",
+    MsgWithTableName = "true",
+    TDConnectIp = "127.0.0.1",
+    TDConnectPort = "6030"
+};
+
+var consumer = new ConsumerBuilder(cfg).Build();
+
+```
+
+
+
+
+
+A consumer group is automatically created when multiple consumers are configured with the same consumer group ID.
-
+## Subscribe to a Topic
+A single consumer can subscribe to multiple topics.
+
+
-Will be available soon
+```c
+// Create a list of subscribed topics
+tmq_list_t* topicList = tmq_list_new();
+tmq_list_append(topicList, "topicName");
+// Enable subscription
+tmq_subscribe(tmq, topicList);
+tmq_list_destroy(topicList);
+
+```
-Will be available soon
+```java
+List<String> topics = new ArrayList<>();
+topics.add("tmq_topic");
+consumer.subscribe(topics);
+```
+
+
+```go
+consumer, err := tmq.NewConsumer(config)
+if err != nil {
+ panic(err)
+}
+err = consumer.Subscribe([]string{"example_tmq_topic"})
+if err != nil {
+ panic(err)
+}
+```
-
+
+
-Will be available soon
+```rust
+consumer.subscribe(["tmq_meters"]).await?;
+```
-
+
-```rust
-{{#include docs/examples/rust/cloud-example/examples/subscribe_demo.rs}}
+```python
+consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
```
+
+
+```js
+// Create a list of subscribed topics
+let topics = ['topic_test']
+
+// Enable subscription
+consumer.subscribe(topics);
+```
+
+
+
+
+
+```csharp
+// Create a list of subscribed topics
+List<string> topics = new List<string>();
+topics.Add("tmq_topic");
+// Enable subscription
+consumer.Subscribe(topics);
+```
+
+
+
+
+
+## Consume Messages
+
+The following code demonstrates how to consume the messages in a queue.
+
+
+
+
+```c
+// Consume data
+while (running) {
+ TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut);
+ msg_process(msg);
+}
+```
+
+The `while` loop obtains a message each time it calls `tmq_consumer_poll()`. This message is exactly the same as the result returned by a query, and the same deserialization API can be used on it.
+
+
+
+
+```java
+while(running){
+  ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
+ for (Meters meter : meters) {
+ processMsg(meter);
+ }
+}
+```
+
+
+
+
+
+```go
+for {
+ result, err := consumer.Poll(time.Second)
+ if err != nil {
+ panic(err)
+ }
+ fmt.Println(result)
+ consumer.Commit(context.Background(), result.Message)
+ consumer.FreeMessage(result.Message)
+}
+```
+
+
+
+
+
+```rust
+{
+ let mut stream = consumer.stream();
+
+ while let Some((offset, message)) = stream.try_next().await? {
+ // get information from offset
+
+ // the topic
+ let topic = offset.topic();
+ // the vgroup id, like partition id in kafka.
+ let vgroup_id = offset.vgroup_id();
+ println!("* in vgroup id {vgroup_id} of topic {topic}\n");
+
+ if let Some(data) = message.into_data() {
+ while let Some(block) = data.fetch_raw_block().await? {
+ // one block for one table, get table name if needed
+ let name = block.table_name();
+                let records: Vec<Record> = block.deserialize().try_collect()?;
+ println!(
+ "** table: {}, got {} records: {:#?}\n",
+ name.unwrap(),
+ records.len(),
+ records
+ );
+ }
+ }
+ consumer.commit(offset).await?;
+ }
+}
+```
+
+
-Will be available soon
+```python
+for msg in consumer:
+ for row in msg:
+ print(row)
+```
-Will be available soon
+```js
+while(true){
+ msg = consumer.consume(200);
+ // process message(consumeResult)
+ console.log(msg.topicPartition);
+ console.log(msg.block);
+ console.log(msg.fields)
+}
+```
-Will be available soon
+```csharp
+// Consume data
+while (true)
+{
+ var consumerRes = consumer.Consume(100);
+ // process ConsumeResult
+ ProcessMsg(consumerRes);
+ consumer.Commit(consumerRes);
+}
+```
-## Delete Topic
+## Close the Consumer
+
+After message consumption is finished, the consumer is unsubscribed.
+
+
+
-Once a topic becomes useless, it can be deleted.
+```c
+/* Unsubscribe */
+tmq_unsubscribe(tmq);
+
+/* Close consumer object */
+tmq_consumer_close(tmq);
+```
+
+
+
+
+```java
+/* Unsubscribe */
+consumer.unsubscribe();
+
+/* Close consumer */
+consumer.close();
+```
+
+
+
+
+
+```go
+consumer.Close()
+```
+
+
+
+
+
+```rust
+consumer.unsubscribe().await;
+```
+
+
+
+
+
+```py
+# Unsubscribe
+consumer.unsubscribe()
+# Close consumer
+consumer.close()
+```
+
+
+
+
+```js
+consumer.unsubscribe();
+consumer.close();
+```
+
+
+
+
+
+```csharp
+// Unsubscribe
+consumer.Unsubscribe();
+
+// Close consumer
+consumer.Close();
+```
+
+
+
+
+
+## Delete a Topic
You can delete topics that are no longer useful. Note that you must unsubscribe all consumers from a topic before deleting it.
```sql
+/* Delete topic */
DROP TOPIC topic_name;
```
## Check Status
-At any time, you can check the status of existing topics and consumers.
-
1. Query all existing topics.
```sql
@@ -156,3 +797,45 @@ SHOW TOPICS;
```sql
SHOW CONSUMERS;
```
+
+3. Query the relationships between consumers and vgroups.
+
+```sql
+SHOW SUBSCRIPTIONS;
+```
+
+## Examples
+
+The following section shows sample code in various languages.
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/docs/en/_sub_c.mdx b/docs/en/_sub_c.mdx
new file mode 100644
index 0000000000000000000000000000000000000000..b0667268e9978533e84e68ea3fe5f285538df762
--- /dev/null
+++ b/docs/en/_sub_c.mdx
@@ -0,0 +1,3 @@
+```c
+{{#include docs/examples/c/tmq_example.c}}
+```
diff --git a/docs/en/_sub_cs.mdx b/docs/en/_sub_cs.mdx
new file mode 100644
index 0000000000000000000000000000000000000000..093b617e9bb9c7da7bc9392f91316b9f3342bae6
--- /dev/null
+++ b/docs/en/_sub_cs.mdx
@@ -0,0 +1,3 @@
+```csharp
+{{#include docs/examples/csharp/subscribe/Program.cs}}
+```
\ No newline at end of file
diff --git a/docs/en/_sub_go.mdx b/docs/en/_sub_go.mdx
new file mode 100644
index 0000000000000000000000000000000000000000..34b2aefd92c5eef75b59fbbba96b83da091722a7
--- /dev/null
+++ b/docs/en/_sub_go.mdx
@@ -0,0 +1,3 @@
+```go
+{{#include docs/examples/go/sub/main.go}}
+```
\ No newline at end of file
diff --git a/docs/en/_sub_java.mdx b/docs/en/_sub_java.mdx
new file mode 100644
index 0000000000000000000000000000000000000000..d14b5fd6095dd90f89dd2c2e828858585cfddff9
--- /dev/null
+++ b/docs/en/_sub_java.mdx
@@ -0,0 +1,11 @@
+```java
+{{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}}
+{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
+{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
+```
+```java
+{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
+```
+```java
+{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
+```
\ No newline at end of file
diff --git a/docs/en/_sub_node.mdx b/docs/en/_sub_node.mdx
new file mode 100644
index 0000000000000000000000000000000000000000..3eeff0922a31a478dd34a77c6cb6471f51a57a8c
--- /dev/null
+++ b/docs/en/_sub_node.mdx
@@ -0,0 +1,3 @@
+```js
+{{#include docs/examples/node/nativeexample/subscribe_demo.js}}
+```
\ No newline at end of file
diff --git a/docs/en/_sub_python.mdx b/docs/en/_sub_python.mdx
new file mode 100644
index 0000000000000000000000000000000000000000..1309da5b416799492a6b85aae4b775e227c0ad6e
--- /dev/null
+++ b/docs/en/_sub_python.mdx
@@ -0,0 +1,3 @@
+```py
+{{#include docs/examples/python/tmq_example.py}}
+```
diff --git a/docs/en/_sub_rust.mdx b/docs/en/_sub_rust.mdx
new file mode 100644
index 0000000000000000000000000000000000000000..0021666a7024a9b63d6b9c38bf8a57b6eded6d66
--- /dev/null
+++ b/docs/en/_sub_rust.mdx
@@ -0,0 +1,3 @@
+```rust
+{{#include docs/examples/rust/nativeexample/examples/subscribe_demo.rs}}
+```
diff --git a/docs/examples/csharp/subscribe/Program.cs b/docs/examples/csharp/subscribe/Program.cs
new file mode 100644
index 0000000000000000000000000000000000000000..1fba209f22740e4efe5efb6996902159b2809035
--- /dev/null
+++ b/docs/examples/csharp/subscribe/Program.cs
@@ -0,0 +1,96 @@
+using System;
+using TDengineTMQ;
+using TDengineDriver;
+using System.Runtime.InteropServices;
+
+namespace TMQExample
+{
+ internal class SubscribeDemo
+ {
+ static void Main(string[] args)
+ {
+ IntPtr conn = GetConnection();
+ string topic = "topic_example";
+ //create topic
+ IntPtr res = TDengine.Query(conn, $"create topic if not exists {topic} as select * from meters");
+
+            if (TDengine.ErrorNo(res) != 0)
+ {
+ throw new Exception($"create topic failed, reason:{TDengine.Error(res)}");
+ }
+
+ var cfg = new ConsumerConfig
+ {
+ GourpId = "group_1",
+ TDConnectUser = "root",
+ TDConnectPasswd = "taosdata",
+ MsgWithTableName = "true",
+ TDConnectIp = "127.0.0.1",
+ };
+
+ // create consumer
+ var consumer = new ConsumerBuilder(cfg)
+ .Build();
+
+ // subscribe
+ consumer.Subscribe(topic);
+
+ // consume
+ for (int i = 0; i < 5; i++)
+ {
+ var consumeRes = consumer.Consume(300);
+ // print consumeResult
+                foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumeRes.Message)
+ {
+ Console.WriteLine("topic partitions:\n{0}", kv.Key.ToString());
+
+ kv.Value.Metas.ForEach(meta =>
+ {
+ Console.Write("{0} {1}({2}) \t|", meta.name, meta.TypeName(), meta.size);
+ });
+ Console.WriteLine("");
+ kv.Value.Datas.ForEach(data =>
+ {
+ Console.WriteLine(data.ToString());
+ });
+ }
+
+ consumer.Commit(consumeRes);
+ Console.WriteLine("\n================ {0} done ", i);
+
+ }
+
+ // retrieve topic list
+            List<string> topics = consumer.Subscription();
+ topics.ForEach(t => Console.WriteLine("topic name:{0}", t));
+
+ // unsubscribe
+ consumer.Unsubscribe();
+
+            // Close the consumer after use. Otherwise it will lead to a memory leak.
+ consumer.Close();
+ TDengine.Close(conn);
+
+ }
+
+ static IntPtr GetConnection()
+ {
+ string host = "localhost";
+ short port = 6030;
+ string username = "root";
+ string password = "taosdata";
+ string dbname = "power";
+ var conn = TDengine.Connect(host, username, password, dbname, port);
+ if (conn == IntPtr.Zero)
+ {
+ throw new Exception("Connect to TDengine failed");
+ }
+ else
+ {
+ Console.WriteLine("Connect to TDengine success");
+ }
+ return conn;
+ }
+ }
+
+}
diff --git a/docs/examples/csharp/subscribe/subscribe.csproj b/docs/examples/csharp/subscribe/subscribe.csproj
new file mode 100644
index 0000000000000000000000000000000000000000..8ae1cf6bc6023558c28797a0d9fcccb2f2e87653
--- /dev/null
+++ b/docs/examples/csharp/subscribe/subscribe.csproj
@@ -0,0 +1,15 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <OutputType>Exe</OutputType>
+    <TargetFramework>net6.0</TargetFramework>
+    <ImplicitUsings>enable</ImplicitUsings>
+    <Nullable>enable</Nullable>
+    <StartupObject>TMQExample.SubscribeDemo</StartupObject>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <!-- Reference lost in extraction; the TDengine.Connector NuGet package is assumed here -->
+    <PackageReference Include="TDengine.Connector" Version="3.0.0" />
+  </ItemGroup>
+
+</Project>
diff --git a/docs/examples/python/tmq_example.py b/docs/examples/python/tmq_example.py
index cee036454ec4d3f4809576a1eee8ac054fcba056..a4625ca11accfbf7d263f4c1993f712987a136cb 100644
--- a/docs/examples/python/tmq_example.py
+++ b/docs/examples/python/tmq_example.py
@@ -1,6 +1,58 @@
import taos
-from taos.tmq import TaosConsumer
-consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
-for msg in consumer:
- for row in msg:
- print(row)
+from taos.tmq import *
+
+conn = taos.connect()
+
+print("init")
+conn.execute("drop topic if exists topic_ctb_column")
+conn.execute("drop database if exists py_tmq")
+conn.execute("create database if not exists py_tmq vgroups 2")
+conn.select_db("py_tmq")
+conn.execute(
+ "create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)"
+)
+conn.execute("create table if not exists tb1 using stb1 tags(1)")
+conn.execute("create table if not exists tb2 using stb1 tags(2)")
+conn.execute("create table if not exists tb3 using stb1 tags(3)")
+
+print("create topic")
+conn.execute(
+ "create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1"
+)
+
+print("build consumer")
+conf = TaosTmqConf()
+conf.set("group.id", "tg2")
+conf.set("td.connect.user", "root")
+conf.set("td.connect.pass", "taosdata")
+conf.set("enable.auto.commit", "true")
+
+
+def tmq_commit_cb_print(tmq, resp, offset, param=None):
+ print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")
+
+
+conf.set_auto_commit_cb(tmq_commit_cb_print, None)
+tmq = conf.new_consumer()
+
+print("build topic list")
+
+topic_list = TaosTmqList()
+topic_list.append("topic_ctb_column")
+
+print("basic consume loop")
+tmq.subscribe(topic_list)
+
+sub_list = tmq.subscription()
+
+print("subscribed topics: ", sub_list)
+
+while 1:
+ res = tmq.poll(1000)
+ if res:
+ topic = res.get_topic_name()
+ vg = res.get_vgroup_id()
+ db = res.get_db_name()
+ print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
+ for row in res:
+ print(row)
diff --git a/docs/examples/rust/cloud-example/examples/subscribe_demo.rs b/docs/examples/rust/cloud-example/examples/subscribe_demo.rs
index 14498f5cbb0f3398acaba6e8ae2d4c6aa67f6098..7551ad46b139f70c3e966fc3bfaf8e6cb58b17f8 100644
--- a/docs/examples/rust/cloud-example/examples/subscribe_demo.rs
+++ b/docs/examples/rust/cloud-example/examples/subscribe_demo.rs
@@ -18,15 +18,14 @@ struct Record {
async fn prepare(taos: Taos) -> anyhow::Result<()> {
let inserted = taos.exec_many([
- "use tmq",
// create child table
- "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')",
+ "CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')",
// insert into child table
"INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
// insert with NULL values
"INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
// insert and automatically create table with tags if not exists
- "INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)",
+ "INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)",
// insert many records in a single sql
"INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
]).await?;
@@ -36,20 +35,22 @@ async fn prepare(taos: Taos) -> anyhow::Result<()> {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
- std::env::set_var("RUST_LOG", "debug");
- pretty_env_logger::init();
- let dsn = std::env::var("TDENGINE_CLOUD_DSN")?;
-
- let builder = TaosBuilder::from_dsn(&dsn)?;
+ let dsn = "taos://localhost:6030";
+ let builder = TaosBuilder::from_dsn(dsn)?;
let taos = builder.build()?;
+ let db = "tmq";
// prepare database
taos.exec_many([
- "DROP TOPIC IF EXISTS tmq_meters",
- "USE tmq",
- "CREATE STABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))",
- "CREATE TOPIC tmq_meters with META AS DATABASE tmq"
+ format!("DROP TOPIC IF EXISTS tmq_meters"),
+ format!("DROP DATABASE IF EXISTS `{db}`"),
+ format!("CREATE DATABASE `{db}`"),
+ format!("USE `{db}`"),
+ // create super table
+ format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"),
+ // create topic for subscription
+ format!("CREATE TOPIC tmq_meters AS SELECT * FROM `meters`")
])
.await?;
@@ -58,21 +59,14 @@ async fn main() -> anyhow::Result<()> {
tokio::time::sleep(Duration::from_secs(1)).await;
// subscribe
- let dsn2 = format!("{dsn}&group.id=test");
- dbg!(&dsn2);
- let tmq = TmqBuilder::from_dsn(dsn2)?;
+ let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
let mut consumer = tmq.build()?;
consumer.subscribe(["tmq_meters"]).await?;
- println!("start subscription");
-
- {
- let mut stream = consumer.stream();
-
- while let Some((offset, message)) = stream.try_next().await? {
- // get information from offset
- // the topic
+ consumer
+ .stream()
+ .try_for_each(|(offset, message)| async {
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
@@ -80,20 +74,14 @@ async fn main() -> anyhow::Result<()> {
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
- // one block for one table, get table name if needed
- let name = block.table_name();
+                let records: Vec<Record> = block.deserialize().try_collect()?;
- println!(
- "** table: {}, got {} records: {:#?}\n",
- name.unwrap(),
- records.len(),
- records
- );
+ println!("** read {} records: {:#?}\n", records.len(), records);
}
}
consumer.commit(offset).await?;
- }
- }
+ Ok(())
+ })
+ .await?;
consumer.unsubscribe().await;
diff --git a/tools/taosadapter b/tools/taosadapter
new file mode 160000
index 0000000000000000000000000000000000000000..566540d4a7f59d859378ff4ae7bb64ed4f0bc096
--- /dev/null
+++ b/tools/taosadapter
@@ -0,0 +1 @@
+Subproject commit 566540d4a7f59d859378ff4ae7bb64ed4f0bc096
diff --git a/tools/taosws-rs b/tools/taosws-rs
new file mode 160000
index 0000000000000000000000000000000000000000..f406d516dfab06979ca02e4f0abfc4b924264a1d
--- /dev/null
+++ b/tools/taosws-rs
@@ -0,0 +1 @@
+Subproject commit f406d516dfab06979ca02e4f0abfc4b924264a1d