Commit 8baf3ae0 authored by Li Ya Qiang

add the data sharing doc

Parent 4f580681
This is the documentation structure for TDengine Cloud:

5. The [Visualization](./visual) section shows you how you can visualize the data that you store in TDengine, as well as how you can visualize and monitor the status of your TDengine Cloud instance(s) and databases.
6. [Data Sharing](./data-sharing) is an advanced and useful feature of TDengine Cloud. This section shows you an easy way to share your data in TDengine Cloud with others in just a few clicks.
7. Data [Subscription](./tmq) is an advanced and useful feature of TDengine. It is similar to asynchronous publish/subscribe, where a message published to a topic is immediately received by all subscribers to that topic. TDengine subscriptions allow you to create event-driven applications without having to install an external pub/sub messaging system.
8. [Stream Processing](./stream) is another extremely useful feature of TDengine Cloud that obviates the need to install external stream processing systems like Kafka or Flink. TDengine's stream processing feature allows you to process incoming data streams in real time and push data to tables based on rules that you can define easily.
9. TDengine provides sophisticated [Data Replication](./replication) features. You can replicate from Cloud to a private instance and vice versa. You can replicate between Cloud providers regardless of region, and you can also replicate between edge instances and Cloud, or between edge instances and private centralized instances.
10. The [Developer Guide](./programming) is a must-read if you are developing IoT or Big Data applications for time-series data. In this section we introduce database connections, data modeling, data ingestion, queries, stream processing, caching, data subscription, user-defined functions (coming soon), and other functionality in detail. Sample code is provided for a variety of programming languages. In most cases, you can just copy and paste the sample code, make a few changes to accommodate your application, and it will work.
11. The [TDengine SQL](./taos-sql) section provides comprehensive information about both standard SQL and TDengine's extensions for easy time-series analysis.
12. The [Tools](./tools) section introduces the Taos CLI, which gives you shell access to easily perform ad hoc queries on your instances and databases. Additionally, taosBenchmark is introduced: a tool that can help you generate large amounts of data very easily with simple configurations and test the performance of TDengine Cloud.

<!-- 10. Finally, in the [FAQ](./faq) section, we try to preemptively answer questions that we anticipate. Of course, we will continue to add to this section all the time. -->
---
sidebar_label: Introduction
title: Introduction to TDengine Cloud Service
---

TDengine Cloud is a fast, elastic, serverless and cost-effective time-series data processing service based on the popular open source time-series database, TDengine. With TDengine Cloud you get the highly optimized, purpose-built platform for IoT time-series data for which TDengine is known.

This section introduces the major features, competitive advantages and typical use cases to help you get a high-level overview of the TDengine cloud service.

The major features are listed below:

1. Data In
   - Supports [using SQL to insert](../data-in/insert-data).
   - Supports [Telegraf](../data-in/telegraf/).
   - Supports [Prometheus](../data-in/prometheus/).
2. Data Out
   - Supports writing data to [Prometheus](../data-out/prometheus/).
   - Supports exporting data via [data subscription](../tmq/).
3. Data Explorer: browse through databases and even run SQL queries once you log in.
4. Visualization:
   - Supports [Grafana](../visual/grafana/).
   - Supports Google Data Studio (to be released soon).
   - Supports Grafana Cloud (to be released soon).
6. [Stream Processing](../stream/): Not only are continuous queries supported, but TDengine also supports event-driven stream processing, so Flink or Spark is not needed for time-series data processing.
7. [Data Subscription](../tmq/): Applications can subscribe to a table or a set of tables. The API is the same as Kafka's, but you can specify filter conditions.
8. Enterprise
   - Supports backing up data every day.
   - Supports replicating a database to another region or cloud.
   - Supports VPC peering.

For more details on features, please read through the entire documentation.

By making full use of [characteristics of time series data](https://tdengine.com/tsdb/characteristics-of-time-series-data/) and its cloud native design, TDengine Cloud differentiates itself from other time-series data cloud services with the following advantages.

- **Worry Free**: TDengine Cloud is a fast, elastic, serverless, purpose-built cloud platform for time-series data. It provides worry-free operations with a fully managed cloud service. You pay as you go.
- **[Simplified Solution](https://tdengine.com/tdengine/simplified-time-series-data-solution/)**: Through built-in caching, stream processing and data subscription features, TDengine provides a simplified solution for time-series data processing. It reduces system design complexity and operation costs significantly.

## Typical Use Cases

As a high-performance and cloud native time-series database, TDengine's typical use cases include but are not limited to IoT, Industrial Internet, Connected Vehicles, IT operation and maintenance, energy, financial markets and other fields. TDengine is a purpose-built database optimized for the characteristics of time-series data. As such, it cannot be used to process data from web crawlers, social media, e-commerce, ERP, CRM and so on. More generally, TDengine is not a suitable storage engine for non-time-series data.
---
sidebar_label: Data Sharing
title: Data Sharing
description: Using topics to share data from TDengine.
---
import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
This topic introduces how to share data from TDengine through the access control management of TDengine Cloud and the subscription interfaces of the supported connectors. The data owner first creates a topic through the topic wizard, then adds the users or user groups to share the data with to the topic's subscriber list. A subscriber of the topic can then see the details of how to access the shared data through data subscription. This document briefly explains each of these steps.
## Create Topic
You can create a topic on the Topics page of TDengine Cloud. In the Create Topic dialog, you can choose either the wizard or the SQL way to create the topic. In the wizard, you input the topic name and select a database of the current TDengine instance, then select a super table or specify a subquery on a super table or subtable. You can also add field selections, or set the result set and condition set for each field. The following shows how to create a topic at each of the three levels through the wizard. For the SQL way, see [Data Subscription](../../tmq/) for details.
### To Database
The default selection in the Add New Topic dialog is the Database type. After selecting a database, click the Confirm button to create a topic on the whole database.
![Create a new topic](./topic/add-topic-db.webp)
### To Super Table
In the opened Add New Topic dialog, click the STable type and select a super table from the list, then click the Confirm button to create a topic on that super table.
![Create a new topic to stable](./topic/add-topic-stable.webp)
### With Subquery
In the opened Add New Topic dialog, click the Subquery type to show all subquery form items. The first item is Table Type, and the default selection is STable. After you select or input a super table name, all fields of the super table are listed. You can check or uncheck each field for the subquery, and you can also set the result set or condition set for each field. To preview the SQL generated from your choices, click SQL Preview to open a dialog showing it.
![Create a new topic with subquery for stable](./topic/add-topic-sub-stable.webp)
Alternatively, you can select the Table type, then choose a table from the list or input an existing table name to get all of its fields. You can check or uncheck each field for the subquery, and you can also set the result set or condition set for each field. To preview the SQL generated from your choices, click SQL Preview to open a dialog showing it.
![Create a new topic with subquery for table](./topic/add-topic-sub-table.webp)
## Share Topic
In each row of the topic list on the Topics page, you can click the Share Topic action icon to go to the Share Topic page. You can also click the Share Topic tab directly to switch to it. Initially, the Users tab of the Share Topic page contains only one row, for yourself.
### Users
In the default Users tab of the Share Topic page, click the **Add Users** button to add more users who are active in the current organization. In the opened Add New Users dialog, select the users you want to share the topic with, then set the expiration time of the sharing for these users.
![Share topic](./share/share-topic-users.webp)
![Share topic users](./share/share-topic-adduser.webp)
### User Groups
Click the User Groups tab to switch to the User Groups page of Share Topic, then click the **Add User Groups** button to add user groups that are active in the current organization. In the opened Add New User Groups dialog, select the user groups you want to share the topic with, then set the expiration time of the sharing for these user groups.
![Share topic user groups](./share/share-topic-usergroup.webp)
![Share topic add user groups](./share/share-topic-addusergroup.webp)
## Consume Shared Topic
A user can see every topic that has been shared with them on the Topics page of Data Subscription. Clicking the **Sample Code** icon in a topic's **Action** area opens the **Sample Code** page, whose steps show how to consume the shared topic from the TDengine instance.
### Configure TDengine DSN
<Tabs defaultValue="Bash" groupId="config">
<TabItem value="Bash" label="Bash">
```shell
export TDENGINE_CLOUD_ENDPOINT="{TDC_GATEWAY}"
export TDENGINE_CLOUD_TOKEN="{TDC_TOKEN}"
```
</TabItem>
<TabItem value="CMD" label="CMD">
```shell
set TDENGINE_CLOUD_ENDPOINT={TDC_GATEWAY}
set TDENGINE_CLOUD_TOKEN={TDC_TOKEN}
```
</TabItem>
<TabItem value="Powershell" label="Powershell">
```shell
$env:TDENGINE_CLOUD_ENDPOINT="{TDC_GATEWAY}"
$env:TDENGINE_CLOUD_TOKEN="{TDC_TOKEN}"
```
</TabItem>
</Tabs>
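The connectors combine these two variables into a WebSocket DSN of the form `ws://<endpoint>/rest/tmq?token=<token>`. A minimal Python sketch of that assembly, using placeholder values for the endpoint and token:

```python
import os

# Placeholder values for illustration only; use the real endpoint and token
# shown on your TDengine Cloud instance page.
os.environ.setdefault("TDENGINE_CLOUD_ENDPOINT", "example.cloud.tdengine.com")
os.environ.setdefault("TDENGINE_CLOUD_TOKEN", "abc123")

endpoint = os.environ["TDENGINE_CLOUD_ENDPOINT"]
token = os.environ["TDENGINE_CLOUD_TOKEN"]

# WebSocket DSN used by the TMQ (data subscription) interface
dsn = f"ws://{endpoint}/rest/tmq?token={token}"
print(dsn)
```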
### Data Schema and API
The related schemas and APIs in various languages are described as follows:
<Tabs defaultValue="Python" groupId="lang">
<TabItem value="Python" label="Python">
```python
class TaosConsumer():
def __init__(self, *topics, **configs)
def __iter__(self)
def __next__(self)
def sync_next(self)
def subscription(self)
def unsubscribe(self)
def close(self)
def __del__(self)
```
</TabItem>
<TabItem label="Go" value="Go">
```go
func NewConsumer(conf *Config) (*Consumer, error)
func (c *Consumer) Close() error
func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error
func (c *Consumer) FreeMessage(message unsafe.Pointer)
func (c *Consumer) Poll(timeout time.Duration) (*Result, error)
func (c *Consumer) Subscribe(topics []string) error
func (c *Consumer) Unsubscribe() error
```
</TabItem>
<TabItem label="Rust" value="Rust">
```rust
impl TBuilder for TmqBuilder
fn from_dsn<D: IntoDsn>(dsn: D) -> Result<Self, Self::Error>
fn build(&self) -> Result<Self::Target, Self::Error>
impl AsAsyncConsumer for Consumer
async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
&mut self,
topics: I,
) -> Result<(), Self::Error>;
fn stream(
&self,
) -> Pin<
Box<
dyn '_
+ Send
+ futures::Stream<
Item = Result<(Self::Offset, MessageSet<Self::Meta, Self::Data>), Self::Error>,
>,
>,
>;
async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>;
async fn unsubscribe(self);
```
For more information, see [Crate taos](https://docs.rs/taos).
</TabItem>
</Tabs>
### Create a Consumer from Instance
You configure the following parameters when creating a consumer:
| Parameter | Type | Description | Remarks |
| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- |
| `td.connect.ip` | string | Used in establishing a connection; same as `taos_connect` | |
| `td.connect.user` | string | Used in establishing a connection; same as `taos_connect` | |
| `td.connect.pass` | string | Used in establishing a connection; same as `taos_connect` | |
| `td.connect.port` | string | Used in establishing a connection; same as `taos_connect` | |
| `group.id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192. |
| `client.id` | string | Client ID | Maximum length: 192. |
| `auto.offset.reset` | enum | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) |
| `enable.auto.commit` | boolean | Commit automatically | Specify `true` or `false`. |
| `auto.commit.interval.ms` | integer | Interval for automatic commits, in milliseconds | |
| `enable.heartbeat.background` | boolean | Background heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | |
| `experimental.snapshot.enable` | boolean | Specify whether to consume messages from the WAL or from TSDB | |
| `msg.with.table.name` | boolean | Specify whether to deserialize table names from messages | |
The method of specifying these parameters depends on the language used:
<Tabs defaultValue="Python" groupId="lang">
<TabItem value="Python" label="Python">
Python programs use the following parameters:
| Parameter | Type | Description | Remarks |
| :----------------------------: | :----: | -------------------------------------------------------- | ------------------------------------------- |
| `td_connect_ip` | string | Used in establishing a connection; same as `taos_connect` | |
| `td_connect_user` | string | Used in establishing a connection; same as `taos_connect` | |
| `td_connect_pass` | string | Used in establishing a connection; same as `taos_connect` | |
| `td_connect_port` | string | Used in establishing a connection; same as `taos_connect` | |
| `group_id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192. |
| `client_id` | string | Client ID | Maximum length: 192. |
| `auto_offset_reset` | string | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) |
| `enable_auto_commit` | string | Commit automatically | Specify `true` or `false`. |
| `auto_commit_interval_ms` | string | Interval for automatic commits, in milliseconds | |
| `enable_heartbeat_background` | string | Background heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | Specify `true` or `false`. |
| `experimental_snapshot_enable` | string | Specify whether to consume messages from the WAL or from TSDB | Specify `true` or `false`. |
| `msg_with_table_name` | string | Specify whether to deserialize table names from messages | Specify `true` or `false`. |
| `timeout` | int | Consumer pull timeout | |
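As an illustration of the parameters above, a configuration can be collected into a plain dictionary and passed to the consumer as keyword arguments (all values here are hypothetical):

```python
# Hypothetical consumer configuration built from the parameters above.
configs = {
    "group_id": "test_group",         # required; max length 192
    "client_id": "test_consumer",     # optional; max length 192
    "auto_offset_reset": "earliest",  # start from the oldest available message
    "enable_auto_commit": "true",
    "auto_commit_interval_ms": "1000",
    "msg_with_table_name": "true",
    "timeout": 500,                   # pull timeout
}

assert configs["group_id"], "group_id is required"
# The dictionary would then be expanded into the constructor, e.g.:
# consumer = TaosConsumer("{TDC_TOPIC}", **configs)
print(sorted(configs))
```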
</TabItem>
<TabItem label="Go" value="Go">
```go
import (
	"fmt"
	"os"

	"github.com/taosdata/driver-go/v3/ws/tmq"
)

endpoint := os.Getenv("TDENGINE_CLOUD_ENDPOINT")
token := os.Getenv("TDENGINE_CLOUD_TOKEN")
tmqDSN := fmt.Sprintf("ws://%s/rest/tmq?token=%s", endpoint, token)
config := tmq.NewConfig(tmqDSN, 0)
defer config.Destroy()
err := config.SetGroupID("test_group")
if err != nil {
	panic(err)
}
err = config.SetClientID("test_consumer_ws")
if err != nil {
	panic(err)
}
err = config.EnableHeartBeat()
if err != nil {
	panic(err)
}
err = config.SetAutoOffsetReset("earliest")
if err != nil {
	panic(err)
}
```
</TabItem>
<TabItem label="Rust" value="Rust">
```rust
let dsn_url = format!(
    "ws://{}/rest/tmq?token={}",
    std::env::var("TDENGINE_CLOUD_ENDPOINT")?,
    std::env::var("TDENGINE_CLOUD_TOKEN")?
);
let mut dsn: Dsn = dsn_url.parse()?;
dsn.set("group.id", "test_group");
dsn.set("client.id", "test_consumer_ws");
dsn.set("auto.offset.reset", "earliest");
let tmq = TmqBuilder::from_dsn(dsn)?;
let mut consumer = tmq.build()?;
```
</TabItem>
</Tabs>
A consumer group is automatically created when multiple consumers are configured with the same consumer group ID.
### Subscribe to a Topic
A single consumer can subscribe to multiple topics.
<Tabs defaultValue="Python" groupId="lang">
<TabItem value="Python" label="Python">
```python
consumer = TaosConsumer('{TDC_TOPIC}', group_id='test_group')
```
</TabItem>
<TabItem value="Go" label="Go">
```go
consumer, err := tmq.NewConsumer(config)
if err != nil {
panic(err)
}
err = consumer.Subscribe([]string{"{TDC_TOPIC}"})
if err != nil {
panic(err)
}
```
</TabItem>
<TabItem value="Rust" label="Rust">
```rust
consumer.subscribe(["{TDC_TOPIC}"]).await?;
```
</TabItem>
</Tabs>
### Consume Messages
The following code demonstrates how to consume the messages in a queue.
<Tabs defaultValue="Python" groupId="lang">
<TabItem value="Python" label="Python">
```python
for msg in consumer:
for row in msg:
print(row)
```
</TabItem>
<TabItem value="Go" label="Go">
```go
for {
result, err := consumer.Poll(time.Second)
if err != nil {
panic(err)
}
fmt.Println(result)
consumer.Commit(context.Background(), result.Message)
consumer.FreeMessage(result.Message)
}
```
</TabItem>
<TabItem value="Rust" label="Rust">
```rust
{
let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? {
// get information from offset
// the topic
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
println!(
"** table: {}, got {} records: {:#?}\n",
name.unwrap(),
records.len(),
records
);
}
}
consumer.commit(offset).await?;
}
}
```
</TabItem>
</Tabs>
### Close the Consumer
After message consumption is finished, unsubscribe the consumer to release its resources and close it.
<Tabs defaultValue="Python" groupId="lang">
<TabItem value="Python" label="Python">
```py
# Unsubscribe
consumer.unsubscribe()
# Close consumer
consumer.close()
```
</TabItem>
<TabItem value="Go" label="Go">
```go
consumer.Close()
```
</TabItem>
<TabItem value="Rust" label="Rust">
```rust
consumer.unsubscribe().await;
```
</TabItem>
</Tabs>
### Sample Code
The following are full sample programs showing how to consume a shared topic:
<Tabs defaultValue="Python" groupId="lang">
<TabItem value="Python" label="Python">
```python
{{#include docs/examples/python/tmq_example.py}}
```
</TabItem>
<TabItem label="Go" value="Go">
```go
{{#include docs/examples/go/sub/main.go}}
```
</TabItem>
<TabItem label="Rust" value="Rust">
```rust
{{#include docs/examples/rust/cloud-example/examples/subscribe_demo.rs}}
```
</TabItem>
</Tabs>
```c
{{#include docs/examples/c/tmq_example.c}}
```
```csharp
{{#include docs/examples/csharp/subscribe/Program.cs}}
```
```go
{{#include docs/examples/go/sub/main.go}}
```
```java
{{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}}
{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
```
```java
{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
```
```java
{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
```
```js
{{#include docs/examples/node/nativeexample/subscribe_demo.js}}
```
```py
{{#include docs/examples/python/tmq_example.py}}
```
```rust
{{#include docs/examples/rust/nativeexample/examples/subscribe_demo.rs}}
```
```csharp
using System;
using System.Collections.Generic;
using TDengineTMQ;
using TDengineDriver;
using System.Runtime.InteropServices;

namespace TMQExample
{
    internal class SubscribeDemo
    {
        static void Main(string[] args)
        {
            IntPtr conn = GetConnection();
            string topic = "topic_example";
            // create topic
            IntPtr res = TDengine.Query(conn, $"create topic if not exists {topic} as select * from meters");
            if (TDengine.ErrorNo(res) != 0)
            {
                throw new Exception($"create topic failed, reason:{TDengine.Error(res)}");
            }

            var cfg = new ConsumerConfig
            {
                GourpId = "group_1",
                TDConnectUser = "root",
                TDConnectPasswd = "taosdata",
                MsgWithTableName = "true",
                TDConnectIp = "127.0.0.1",
            };

            // create consumer
            var consumer = new ConsumerBuilder(cfg).Build();

            // subscribe
            consumer.Subscribe(topic);

            // consume
            for (int i = 0; i < 5; i++)
            {
                var consumeRes = consumer.Consume(300);
                // print consume result
                foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumeRes.Message)
                {
                    Console.WriteLine("topic partitions:\n{0}", kv.Key.ToString());
                    kv.Value.Metas.ForEach(meta =>
                    {
                        Console.Write("{0} {1}({2}) \t|", meta.name, meta.TypeName(), meta.size);
                    });
                    Console.WriteLine("");
                    kv.Value.Datas.ForEach(data =>
                    {
                        Console.WriteLine(data.ToString());
                    });
                }
                consumer.Commit(consumeRes);
                Console.WriteLine("\n================ {0} done ", i);
            }

            // retrieve topic list
            List<string> topics = consumer.Subscription();
            topics.ForEach(t => Console.WriteLine("topic name:{0}", t));

            // unsubscribe
            consumer.Unsubscribe();

            // close the consumer after use; otherwise it will leak memory
            consumer.Close();
            TDengine.Close(conn);
        }

        static IntPtr GetConnection()
        {
            string host = "localhost";
            short port = 6030;
            string username = "root";
            string password = "taosdata";
            string dbname = "power";
            var conn = TDengine.Connect(host, username, password, dbname, port);
            if (conn == IntPtr.Zero)
            {
                throw new Exception("Connect to TDengine failed");
            }
            else
            {
                Console.WriteLine("Connect to TDengine success");
            }
            return conn;
        }
    }
}
```
```xml
<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>net6.0</TargetFramework>
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
    <StartupObject>TMQExample.SubscribeDemo</StartupObject>
  </PropertyGroup>

  <ItemGroup>
    <PackageReference Include="TDengine.Connector" Version="3.0.1" />
  </ItemGroup>

</Project>
```
```python
import taos
from taos.tmq import *

conn = taos.connect()

print("init")
conn.execute("drop topic if exists topic_ctb_column")
conn.execute("drop database if exists py_tmq")
conn.execute("create database if not exists py_tmq vgroups 2")
conn.select_db("py_tmq")
conn.execute(
    "create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)"
)
conn.execute("create table if not exists tb1 using stb1 tags(1)")
conn.execute("create table if not exists tb2 using stb1 tags(2)")
conn.execute("create table if not exists tb3 using stb1 tags(3)")

print("create topic")
conn.execute(
    "create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1"
)

print("build consumer")
conf = TaosTmqConf()
conf.set("group.id", "tg2")
conf.set("td.connect.user", "root")
conf.set("td.connect.pass", "taosdata")
conf.set("enable.auto.commit", "true")


def tmq_commit_cb_print(tmq, resp, offset, param=None):
    print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")


conf.set_auto_commit_cb(tmq_commit_cb_print, None)
tmq = conf.new_consumer()

print("build topic list")
topic_list = TaosTmqList()
topic_list.append("topic_ctb_column")

print("basic consume loop")
tmq.subscribe(topic_list)

sub_list = tmq.subscription()
print("subscribed topics: ", sub_list)

while 1:
    res = tmq.poll(1000)
    if res:
        topic = res.get_topic_name()
        vg = res.get_vgroup_id()
        db = res.get_db_name()
        print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
        for row in res:
            print(row)
```
```rust
async fn prepare(taos: Taos) -> anyhow::Result<()> {
    let inserted = taos.exec_many([
        // create child table
        "CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')",
        // insert into child table
        "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
        // insert with NULL values
        "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
        // insert and automatically create table with tags if not exists
        "INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)",
        // insert many records in a single sql
        "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
    ]).await?;
    // ...
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let dsn = "taos://localhost:6030";
    let builder = TaosBuilder::from_dsn(dsn)?;
    let taos = builder.build()?;
    let db = "tmq";

    // prepare database
    taos.exec_many([
        format!("DROP TOPIC IF EXISTS tmq_meters"),
        format!("DROP DATABASE IF EXISTS `{db}`"),
        format!("CREATE DATABASE `{db}`"),
        format!("USE `{db}`"),
        // create super table
        format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"),
        // create topic for subscription
        format!("CREATE TOPIC tmq_meters AS SELECT * FROM `meters`")
    ])
    .await?;

    // ...
    tokio::time::sleep(Duration::from_secs(1)).await;

    // subscribe
    let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
    let mut consumer = tmq.build()?;
    consumer.subscribe(["tmq_meters"]).await?;

    consumer
        .stream()
        .try_for_each(|(offset, message)| async {
            let topic = offset.topic();
            // the vgroup id, like partition id in kafka.
            let vgroup_id = offset.vgroup_id();
            // ...
            if let Some(data) = message.into_data() {
                while let Some(block) = data.fetch_raw_block().await? {
                    let records: Vec<Record> = block.deserialize().try_collect()?;
                    println!("** read {} records: {:#?}\n", records.len(), records);
                }
            }
            consumer.commit(offset).await?;
            Ok(())
        })
        .await?;

    consumer.unsubscribe().await;
    // ...
```