提交 ee0a7271 编写于 作者: L Li Ya Qiang

add the latest go sample for data sharing

上级 8baf3ae0
...@@ -6,11 +6,11 @@ description: Using topics to share data from TDengine. ...@@ -6,11 +6,11 @@ description: Using topics to share data from TDengine.
import Tabs from "@theme/Tabs"; import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem"; import TabItem from "@theme/TabItem";
The topic introduces how to share data from TDengine through the access control management of TDengine Cloud and the subscription interfaces of each supported connectors. The data owner first creates the topic through the topic wizard. Then adds the users or user groups which he wants to share the data with to the subscriber list of the topic. The subscriber of the topic can get the detail information how to access the shared data in TDengine in data subscription way. In this document we will briefly explain these main steps of data sharing. The topic introduces how to share data from TDengine instance through the access control management of TDengine Cloud and the subscription interfaces of each supported connectors. The data owner first creates the topic through the topic wizard. Then adds the users or user groups which he wants to share the data with to the subscribers of the topic. The subscriber of the topic can get the details about how to access the shared data from TDengine in the data subscription way. In this document we will briefly explain these main steps of data sharing.
## Create Topic ## Create Topic
You can create the topic in Topics of TDengine Cloud. In the Create Topic dialog, you can choose wizard or SQL way to create the topic. In the wizard way, you need to input the topic name and select the database of the current TDengine instance. Then select the super table or specify the subquery with the super table or sub table. Also you can add fields selections or add result set and condition set for each field. In the following, you can get the detail of how to create the topic in three levels through wizard way. Additional, for SQL way, you can go to the [Data Subscription](../../tmq/) to get the details. You can create the topic in Topics of TDengine Cloud. In the Create Topic dialog, you can choose wizard or SQL way to create the topic. In the wizard way, you need to input the topic name and select the database of the current TDengine instance. Then select the super table or specify the subquery with the super table or sub table. Also you can add fields selections or add result set and condition set for each field. In the following, you can get the detail of how to create the topic in three levels through wizard way.
### To Database ### To Database
...@@ -58,62 +58,11 @@ You can click User Groups tab to switch to the User Groups page of the Share Top ...@@ -58,62 +58,11 @@ You can click User Groups tab to switch to the User Groups page of the Share Top
The shared user can get all topics which the creator shared with him, when he goes to the Topic page of Data Subscription. The user can click **Sample Code** icon of each topic **Action** area to the **Sample Code** page. Then he can follow the steps of the sample code how to consume the shared topic from TDengine instance. The shared user can get all topics which the creator shared with him, when he goes to the Topic page of Data Subscription. The user can click **Sample Code** icon of each topic **Action** area to the **Sample Code** page. Then he can follow the steps of the sample code how to consume the shared topic from TDengine instance.
### Configure TDengine DSN
<Tabs defaultValue="Bash" groupId="config">
<TabItem value="Bash" label="Bash">
```shell
export TDENGINE_CLOUD_ENDPOINT="{TDC_GATEWAY}"
export TDENGINE_CLOUD_TOKEN="{TDC_TOKEN}"
```
</TabItem>
<TabItem value="CMD" label="CMD">
```shell
set TDENGINE_CLOUD_ENDPOINT="{TDC_GATEWAY}"
set TDENGINE_CLOUD_TOKEN="{TDC_TOKEN}"
```
</TabItem>
<TabItem value="Powershell" label="Powershell">
```shell
$env:TDENGINE_CLOUD_ENDPOINT="{TDC_GATEWAY}"
$env:TDENGINE_CLOUD_TOKEN="{TDC_TOKEN}"
```
</TabItem>
</Tabs>
### Data Schema and API ### Data Schema and API
The related schemas and APIs in various languages are described as follows: The related schemas and APIs in various languages are described as follows:
<Tabs defaultValue="Python" groupId="lang"> <Tabs defaultValue="Go" groupId="lang">
<TabItem value="Python" label="Python">
```python
class TaosConsumer():
def __init__(self, *topics, **configs)
def __iter__(self)
def __next__(self)
def sync_next(self)
def subscription(self)
def unsubscribe(self)
def close(self)
def __del__(self)
```
</TabItem>
<TabItem label="Go" value="Go"> <TabItem label="Go" value="Go">
...@@ -165,9 +114,97 @@ impl AsAsyncConsumer for Consumer ...@@ -165,9 +114,97 @@ impl AsAsyncConsumer for Consumer
For more information, see [Crate taos](https://docs.rs/taos). For more information, see [Crate taos](https://docs.rs/taos).
</TabItem>
<TabItem value="Python" label="Python">
```python
class TaosConsumer():
def __init__(self, *topics, **configs)
def __iter__(self)
def __next__(self)
def sync_next(self)
def subscription(self)
def unsubscribe(self)
def close(self)
def __del__(self)
```
</TabItem>
</Tabs>
### Configure TDengine DSN
You can set the following for Go and Rust:
<Tabs defaultValue="Bash" groupId="config">
<TabItem value="Bash" label="Bash">
```shell
export TDENGINE_CLOUD_TMQ="<TDENGINE_CLOUD_TMQ>"
```
</TabItem>
<TabItem value="CMD" label="CMD">
```shell
set TDENGINE_CLOUD_TMQ="<TDENGINE_CLOUD_TMQ>"
```
</TabItem>
<TabItem value="Powershell" label="Powershell">
```shell
$env:TDENGINE_CLOUD_TMQ="<TDENGINE_CLOUD_TMQ>"
```
</TabItem> </TabItem>
</Tabs> </Tabs>
:::note
Replace <TDENGINE_CLOUD_TMQ> with the real value, the format should be `wss://<cloud_endpoint>)/rest/tmq?token=<token>`.
To obtain the value of `TDENGINE_CLOUD_TMQ`, please log in [TDengine Cloud](https://cloud.tdengine.com) and click **Topcis** on the left menu, then click **Sample Code** action of the each topic to **Example** part.
:::
Especially, for Python, you need to set the following variables:
<Tabs defaultValue="Bash" groupId="config">
<TabItem value="Bash" label="Bash">
```shell
export TDENGINE_CLOUD_ENDPOINT="<TDENGINE_CLOUD_ENDPOINT>"
export TDENGINE_CLOUD_TOKEN="<TDENGINE_CLOUD_TOKEN>"
```
</TabItem>
<TabItem value="CMD" label="CMD">
```shell
set TDENGINE_CLOUD_ENDPOINT="<TDENGINE_CLOUD_ENDPOINT>"
set TDENGINE_CLOUD_TOKEN="<TDENGINE_CLOUD_TOKEN>"
```
</TabItem>
<TabItem value="Powershell" label="Powershell">
```shell
$env:TDENGINE_CLOUD_ENDPOINT="<TDENGINE_CLOUD_ENDPOINT>"
$env:TDENGINE_CLOUD_TOKEN="<TDENGINE_CLOUD_TOKEN>"
```
</TabItem>
</Tabs>
:::note
Replace <TDENGINE_CLOUD_ENDPOINT> and <TDENGINE_CLOUD_TOKEN> with the real values. To obtain the value of these, please log in [TDengine Cloud](https://cloud.tdengine.com) and click **Topcis** on the left menu, then click **Sample Code** action of the each topic to the **Python** tab of the **Example** part.
:::
### Create a Consumer from Instance ### Create a Consumer from Instance
You configure the following parameters when creating a consumer: You configure the following parameters when creating a consumer:
...@@ -189,7 +226,44 @@ You configure the following parameters when creating a consumer: ...@@ -189,7 +226,44 @@ You configure the following parameters when creating a consumer:
The method of specifying these parameters depends on the language used: The method of specifying these parameters depends on the language used:
<Tabs defaultValue="Python" groupId="lang"> <Tabs defaultValue="Go" groupId="lang">
<TabItem label="Go" value="Go">
```go
import (
"github.com/taosdata/driver-go/v3/common"
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
"github.com/taosdata/driver-go/v3/ws/tmq"
)
tmqStr := os.Getenv("TDENGINE_CLOUD_TMQ")
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"ws.url": tmqStr,
"ws.message.channelLen": uint(0),
"ws.message.timeout": common.DefaultMessageTimeout,
"ws.message.writeWait": common.DefaultWriteWait,
"group.id": "test_group",
"client.id": "test_consumer_ws",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
```
</TabItem>
<TabItem label="Rust" value="Rust">
```rust
let tmq_str = std::env::var("TDENGINE_CLOUD_TMQ")?;
let tmq_uri = format!( "{}&group.id=test_group_rs&client.id=test_consumer_ws", tmq_str);
println!("request tmq URI is {tmq_uri}\n");
let tmq = TmqBuilder::from_dsn(tmq_uri,)?;
let mut consumer = tmq.build()?;
```
</TabItem>
<TabItem value="Python" label="Python"> <TabItem value="Python" label="Python">
...@@ -209,57 +283,27 @@ Python programs use the following parameters: ...@@ -209,57 +283,27 @@ Python programs use the following parameters:
| `enable_heartbeat_background` | string | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | Specify `true` or `false`. | | `enable_heartbeat_background` | string | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | Specify `true` or `false`. |
| `experimental_snapshot_enable` | string | Specify whether to consume messages from the WAL or from TSBS | Specify `true` or `false`. | | `experimental_snapshot_enable` | string | Specify whether to consume messages from the WAL or from TSBS | Specify `true` or `false`. |
| `msg_with_table_name` | string | Specify whether to deserialize table names from messages | Specify `true` or `false`. | `msg_with_table_name` | string | Specify whether to deserialize table names from messages | Specify `true` or `false`.
| `timeout` | int | Consumer pull timeout | | | `timeout` | int | Consumer pull timeout | |
</TabItem>
<TabItem label="Go" value="Go">
```go ```python
import ( endpoint = os.environ["TDENGINE_CLOUD_ENDPOINT"]
"github.com/taosdata/driver-go/v3/ws/tmq" token = os.environ["TDENGINE_CLOUD_TOKEN"]
) urlparts = endpoint.split(":", 1)
endpoint := os.Getenv("TDENGINE_CLOUD_ENDPOINT")
token := os.Getenv("TDENGINE_CLOUD_TOKEN") conf = {
tmpDSN := fmt.Sprintf("ws://%s/rest/tmq?token=%s", endpoint, token) # auth options
config := tmq.NewConfig(tmpDSN, 0) "td.connect.websocket.scheme": "wss",
defer config.Destroy() "td.connect.ip": urlparts[0],
err = config.SetGroupID("test_group") "td.connect.port": urlparts[1],
if err != nil { "td.connect.token": token,
panic(err) # consume options
} "group.id": "test_group_py",
err = config.SetClientID("test_consumer_ws") // "client.id": "test_consumer_ws_py",
if err != nil {
panic(err)
}
err = config.EnableHeartBeat()
if err != nil {
panic(err)
}
err = config.SetAutoOffsetReset("earliest")
if err != nil {
panic(err)
} }
consumer = Consumer(conf)
``` ```
</TabItem> </TabItem>
<TabItem label="Rust" value="Rust">
```rust
let mut dsnURL = format!("ws://{}/rest/tmq?token={}", std::env::var("TDENGINE_CLOUD_ENDPOINT"), std::env::var("TDENGINE_CLOUD_TOKEN"));
let mut dsn: Dsn = dsnURL.parse()?;
dsn.set("group.id", "test_group");
dsn.set("client.id", "test_consumer_ws");
dsn.set("auto.offset.reset", "earliest");
let tmq = TmqBuilder::from_dsn(dsn)?;
let mut consumer = tmq.build()?;
```
</TabItem>
</Tabs> </Tabs>
A consumer group is automatically created when multiple consumers are configured with the same consumer group ID. A consumer group is automatically created when multiple consumers are configured with the same consumer group ID.
...@@ -268,24 +312,14 @@ A consumer group is automatically created when multiple consumers are configured ...@@ -268,24 +312,14 @@ A consumer group is automatically created when multiple consumers are configured
A single consumer can subscribe to multiple topics. A single consumer can subscribe to multiple topics.
<Tabs defaultValue="Python" groupId="lang"> <Tabs defaultValue="Go" groupId="lang">
<TabItem value="Python" label="Python">
```python
consumer = TaosConsumer('{TDC_TOPIC}', group_id='test_group')
```
</TabItem>
<TabItem value="Go" label="Go"> <TabItem value="Go" label="Go">
```go ```go
consumer, err := tmq.NewConsumer(config) err = consumer.Subscribe("<TDC_TOPIC>", nil)
if err != nil { if err != nil {
panic(err) panic(err)
}
err = consumer.Subscribe([]string{"{TDC_TOPIC}"})
if err != nil {
panic(err)
} }
``` ```
...@@ -293,39 +327,50 @@ if err != nil { ...@@ -293,39 +327,50 @@ if err != nil {
<TabItem value="Rust" label="Rust"> <TabItem value="Rust" label="Rust">
```rust ```rust
consumer.subscribe(["{TDC_TOPIC}"]).await?; consumer.subscribe(["<TDC_TOPIC>"]).await?;
``` ```
</TabItem> </TabItem>
</Tabs>
## Consume messages
The following code demonstrates how to consume the messages in a queue.
<Tabs defaultValue="Python" groupId="lang">
<TabItem value="Python" label="Python"> <TabItem value="Python" label="Python">
```python ```python
for msg in consumer: consumer.subscribe(["<TDC_TOPIC>"])
for row in msg:
print(row)
``` ```
</TabItem> </TabItem>
</Tabs>
:::note
Replace <TDC_TOPIC\> with the real value. To obtain the value of `TDC_TOPIC`, please log in [TDengine Cloud](https://cloud.tdengine.com) and click **Topcis** on the left menu, then copy the topic name you want to consume.
:::
## Consume messages
The following code demonstrates how to consume the messages in a queue.
<Tabs defaultValue="Go" groupId="lang">
<TabItem value="Go" label="Go"> <TabItem value="Go" label="Go">
```go ```go
for { for {
result, err := consumer.Poll(time.Second) ev := consumer.Poll(0)
if err != nil { if ev != nil {
panic(err) switch e := ev.(type) {
} case *tmqcommon.DataMessage:
fmt.Println(result) fmt.Printf("get message:%v\n", e.String())
consumer.Commit(context.Background(), result.Message) consumer.Commit()
consumer.FreeMessage(result.Message) case tmqcommon.Error:
fmt.Printf("%% Error: %v: %v\n", e.Code(), e)
return
default:
fmt.Printf("unexpected event:%v\n", e)
return
}
}
} }
``` ```
...@@ -334,56 +379,62 @@ for { ...@@ -334,56 +379,62 @@ for {
<TabItem value="Rust" label="Rust"> <TabItem value="Rust" label="Rust">
```rust ```rust
{ // consume loop
let mut stream = consumer.stream(); consumer
.stream()
while let Some((offset, message)) = stream.try_next().await? { .try_for_each_concurrent(10, |(offset, message)| async {
// get information from offset let topic = offset.topic();
// the vgroup id, like partition id in kafka.
// the topic let vgroup_id = offset.vgroup_id();
let topic = offset.topic(); println!("* in vgroup id {vgroup_id} of topic {topic}\n");
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id(); if let Some(data) = message.into_data() {
println!("* in vgroup id {vgroup_id} of topic {topic}\n"); while let Some(block) = data.fetch_raw_block().await? {
// A two-dimension matrix while each cell is a [taos::Value] object.
if let Some(data) = message.into_data() { let values = block.to_values();
while let Some(block) = data.fetch_raw_block().await? { // Number of rows.
// one block for one table, get table name if needed assert_eq!(values.len(), block.nrows());
let name = block.table_name(); // Number of columns
let records: Vec<Record> = block.deserialize().try_collect()?; assert_eq!(values[0].len(), block.ncols());
println!( println!("first row: {}", values[0].iter().join(", "));
"** table: {}, got {} records: {:#?}\n",
name.unwrap(),
records.len(),
records
);
}
} }
consumer.commit(offset).await?; }
} consumer.commit(offset).await?;
} Ok(())
})
.await?;
``` ```
</TabItem> </TabItem>
<TabItem value="Python" label="Python">
```python
while 1:
message = consumer.poll(timeout=1.0)
if message:
id = message.vgroup()
topic = message.topic()
database = message.database()
for block in message:
nrows = block.nrows()
ncols = block.ncols()
for row in block:
print(row)
values = block.fetchall()
print(nrows, ncols)
else:
break
```
</TabItem>
</Tabs> </Tabs>
## Close the consumer ## Close the consumer
After message consumption is finished, the consumer is unsubscribed. After message consumption is finished, the consumer is unsubscribed.
<Tabs defaultValue="Python" groupId="lang"> <Tabs defaultValue="Go" groupId="lang">
<TabItem value="Python" label="Python">
```py
# Unsubscribe
consumer.unsubscribe()
# Close consumer
consumer.close()
```
</TabItem>
<TabItem value="Go" label="Go"> <TabItem value="Go" label="Go">
...@@ -401,26 +452,29 @@ consumer.unsubscribe().await; ...@@ -401,26 +452,29 @@ consumer.unsubscribe().await;
</TabItem> </TabItem>
</Tabs> <TabItem value="Python" label="Python">
### Sample Code ```py
# Unsubscribe
consumer.unsubscribe()
# Close consumer
consumer.close()
```
The following are full sample codes about how to consume the shared topic: </TabItem>
<Tabs defaultValue="Python" groupId="lang"> </Tabs>
<TabItem value="Python" label="Python"> ### Sample Code
```python The following are full sample codes about how to consume the shared topic **test**:
{{#include docs/examples/rust/cloud-example/examples/subscribe_demo.rs}}
```
</TabItem> <Tabs defaultValue="Go" groupId="lang">
<TabItem label="Go" value="Go"> <TabItem label="Go" value="Go">
```go ```go
{{#include docs/examples/rust/cloud-example/examples/subscribe_demo.rs}} {{#include docs/examples/go/sub/cloud/main.go}}
``` ```
</TabItem> </TabItem>
...@@ -428,7 +482,15 @@ The following are full sample codes about how to consume the shared topic: ...@@ -428,7 +482,15 @@ The following are full sample codes about how to consume the shared topic:
<TabItem label="Rust" value="Rust"> <TabItem label="Rust" value="Rust">
```rust ```rust
{{#include docs/examples/rust/cloud-example/examples/subscribe_demo.rs}} {{#include docs/examples/rust/cloud-example/examples/sub.rs}}
```
</TabItem>
<TabItem value="Python" label="Python">
```python
{{#include docs/examples/python/cloud/sub.py}}
``` ```
</TabItem> </TabItem>
......
---
sidebar_label: Data Subscription
description: "The TDengine data subscription service automatically pushes data written in TDengine to subscribing clients."
title: Data Subscription
---
import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
import Java from "./_sub_java.mdx";
import Python from "./_sub_python.mdx";
import Go from "./_sub_go.mdx";
import Rust from "./_sub_rust.mdx";
import Node from "./_sub_node.mdx";
import CSharp from "./_sub_cs.mdx";
import CDemo from "./_sub_c.mdx";
TDengine provides data subscription and consumption interfaces similar to message queue products. These interfaces make it easier for applications to obtain data written to TDengine either in real time and to process data in the order that events occurred. This simplifies your time-series data processing systems and reduces your costs because it is no longer necessary to deploy a message queue product such as Kafka.
To use TDengine data subscription, you define topics like in Kafka. However, a topic in TDengine is based on query conditions for an existing supertable, table, or subtable - in other words, a SELECT statement. You can use SQL to filter data by tag, table name, column, or expression and then perform a scalar function or user-defined function on the data. Aggregate functions are not supported. This gives TDengine data subscription more flexibility than similar products. The granularity of data can be controlled on demand by applications, while filtering and preprocessing are handled by TDengine instead of the application layer. This implementation reduces the amount of data transmitted and the complexity of applications.
By subscribing to a topic, a consumer can obtain the latest data in that topic in real time. Multiple consumers can be formed into a consumer group that consumes messages together. Consumer groups enable faster speed through multi-threaded, distributed data consumption. Note that consumers in different groups that are subscribed to the same topic do not consume messages together. A single consumer can subscribe to multiple topics. If the data in a supertable is sharded across multiple vnodes, consumer groups can consume it much more efficiently than single consumers. TDengine also includes an acknowledgement mechanism that ensures at-least-once delivery in complicated environments where machines may crash or restart.
To implement these features, TDengine indexes its write-ahead log (WAL) file for fast random access and provides configurable methods for replacing and retaining this file. You can define a retention period and size for this file. For information, see the CREATE DATABASE statement. In this way, the WAL file is transformed into a persistent storage engine that remembers the order in which events occur. However, note that configuring an overly long retention period for your WAL files makes database compression inefficient. TDengine then uses the WAL file instead of the time-series database as its storage engine for queries in the form of topics. TDengine reads the data from the WAL file; uses a unified query engine instance to perform filtering, transformations, and other operations; and finally pushes the data to consumers.
## Data Schema and API
The related schemas and APIs in various languages are described as follows:
<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
```c
typedef struct tmq_t tmq_t;
typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t;
typedef void(tmq_commit_cb(tmq_t *, int32_t code, void *param));
DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT const char *tmq_err2str(int32_t code);
DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
enum tmq_conf_res_t {
TMQ_CONF_UNKNOWN = -2,
TMQ_CONF_INVALID = -1,
TMQ_CONF_OK = 0,
};
typedef enum tmq_conf_res_t tmq_conf_res_t;
DLL_EXPORT tmq_conf_t *tmq_conf_new();
DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
```
For more information, see [C/C++ Connector](/reference/connector/cpp).
The following example is based on the smart meter table described in Data Models. For complete sample code, see the C language section below.
</TabItem>
<TabItem value="java" label="Java">
```java
void subscribe(Collection<String> topics) throws SQLException;
void unsubscribe() throws SQLException;
Set<String> subscription() throws SQLException;
ConsumerRecords<V> poll(Duration timeout) throws SQLException;
void commitAsync();
void commitAsync(OffsetCommitCallback callback);
void commitSync() throws SQLException;
void close() throws SQLException;
```
</TabItem>
<TabItem value="Python" label="Python">
```python
class TaosConsumer():
def __init__(self, *topics, **configs)
def __iter__(self)
def __next__(self)
def sync_next(self)
def subscription(self)
def unsubscribe(self)
def close(self)
def __del__(self)
```
</TabItem>
<TabItem label="Go" value="Go">
```go
func NewConsumer(conf *Config) (*Consumer, error)
func (c *Consumer) Close() error
func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error
func (c *Consumer) FreeMessage(message unsafe.Pointer)
func (c *Consumer) Poll(timeout time.Duration) (*Result, error)
func (c *Consumer) Subscribe(topics []string) error
func (c *Consumer) Unsubscribe() error
```
</TabItem>
<TabItem label="Rust" value="Rust">
```rust
impl TBuilder for TmqBuilder
fn from_dsn<D: IntoDsn>(dsn: D) -> Result<Self, Self::Error>
fn build(&self) -> Result<Self::Target, Self::Error>
impl AsAsyncConsumer for Consumer
async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
&mut self,
topics: I,
) -> Result<(), Self::Error>;
fn stream(
&self,
) -> Pin<
Box<
dyn '_
+ Send
+ futures::Stream<
Item = Result<(Self::Offset, MessageSet<Self::Meta, Self::Data>), Self::Error>,
>,
>,
>;
async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>;
async fn unsubscribe(self);
```
For more information, see [Crate taos](https://docs.rs/taos).
</TabItem>
<TabItem label="Node.JS" value="Node.JS">
```js
function TMQConsumer(config)
function subscribe(topic)
function consume(timeout)
function subscription()
function unsubscribe()
function commit(msg)
function close()
```
</TabItem>
<TabItem value="C#" label="C#">
```csharp
ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config)
virtual IConsumer Build()
Consumer(ConsumerBuilder builder)
void Subscribe(IEnumerable<string> topics)
void Subscribe(string topic)
ConsumeResult Consume(int millisecondsTimeout)
List<string> Subscription()
void Unsubscribe()
void Commit(ConsumeResult consumerResult)
void Close()
```
</TabItem>
</Tabs>
## Insert Data into TDengine
A database including one supertable and two subtables is created as follows:
```sql
DROP DATABASE IF EXISTS tmqdb;
CREATE DATABASE tmqdb;
CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16) TAGS(t1 INT, t3 VARCHAR(16));
CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0");
CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1");
INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
```
## Create a Topic
The following SQL statement creates a topic in TDengine:
```sql
CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
```
Multiple subscription types are supported.
#### Subscribe to a Column
Syntax:
```sql
CREATE TOPIC topic_name as subquery
```
You can subscribe to a topic through a SELECT statement. Statements that specify columns, such as `SELECT *` and `SELECT ts, cl` are supported, as are filtering conditions and scalar functions. Aggregate functions and time window aggregation are not supported. Note:
- The schema of topics created in this manner is determined by the subscribed data.
- You cannot modify (`ALTER <table> MODIFY`) or delete (`ALTER <table> DROP`) columns or tags that are used in a subscription or calculation.
- Columns added to a table after the subscription is created are not displayed in the results. Deleting columns will cause an error.
### Subscribe to a Supertable
Syntax:
```sql
CREATE TOPIC topic_name AS STABLE stb_name
```
Creating a topic in this manner differs from a `SELECT * from stbName` statement as follows:
- The table schema can be modified.
- Unstructured data is returned. The format of the data returned changes based on the supertable schema.
- A different table schema may exist for every data block to be processed.
- The data returned does not include tags.
### Subscribe to a Database
Syntax:
```sql
CREATE TOPIC topic_name [WITH META] AS DATABASE db_name;
```
This SQL statement creates a subscription to all tables in the database. You can add the `WITH META` parameter to include schema changes in the subscription, including creating and deleting supertables; adding, deleting, and modifying columns; and creating, deleting, and modifying the tags of subtables. Consumers can determine the message type from the API. Note that this differs from Kafka.
## Create a Consumer
You configure the following parameters when creating a consumer:
| Parameter | Type | Description | Remarks |
| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- |
| `td.connect.ip` | string | Used in establishing a connection; same as `taos_connect` | |
| `td.connect.user` | string | Used in establishing a connection; same as `taos_connect` | |
| `td.connect.pass` | string | Used in establishing a connection; same as `taos_connect` | |
| `td.connect.port` | string | Used in establishing a connection; same as `taos_connect` | |
| `group.id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192. |
| `client.id` | string | Client ID | Maximum length: 192. |
| `auto.offset.reset` | enum | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) |
| `enable.auto.commit` | boolean | Commit automatically | Specify `true` or `false`. |
| `auto.commit.interval.ms` | integer | Interval for automatic commits, in milliseconds |
| `enable.heartbeat.background` | boolean | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | |
| `experimental.snapshot.enable` | boolean | Specify whether to consume messages from the WAL or from TSBS | |
| `msg.with.table.name` | boolean | Specify whether to deserialize table names from messages |
The method of specifying these parameters depends on the language used:
<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
```c
/* Create consumer groups on demand (group.id) and enable automatic commits (enable.auto.commit),
an automatic commit interval (auto.commit.interval.ms), and a username (td.connect.user) and password (td.connect.pass) */
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "cgrpName");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
```
</TabItem>
<TabItem value="java" label="Java">
Java programs use the following parameters:
| Parameter | Type | Description | Remarks |
| ----------------------------- | ------ | ----------------------------------------------------------------------------------------------------------------------------- |
| `bootstrap.servers` | string |Connection address, such as `localhost:6030` |
| `value.deserializer` | string | Value deserializer; to use this method, implement the `com.taosdata.jdbc.tmq.Deserializer` interface or inherit the `com.taosdata.jdbc.tmq.ReferenceDeserializer` type |
| `value.deserializer.encoding` | string | Specify the encoding for string deserialization | |
Note: The `bootstrap.servers` parameter is used instead of `td.connect.ip` and `td.connect.port` to provide an interface that is consistent with Kafka.
```java
Properties properties = new Properties();
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "1000");
properties.setProperty("group.id", "cgrpName");
properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
properties.setProperty("td.connect.user", "root");
properties.setProperty("td.connect.pass", "taosdata");
properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty("msg.with.table.name", "true");
properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer");
TaosConsumer<Meters> consumer = new TaosConsumer<>(properties);
/* value deserializer definition. */
import com.taosdata.jdbc.tmq.ReferenceDeserializer;
public class MetersDeserializer extends ReferenceDeserializer<Meters> {
}
```
</TabItem>
<TabItem label="Go" value="Go">
```go
config := tmq.NewConfig()
defer config.Destroy()
err = config.SetGroupID("test")
if err != nil {
panic(err)
}
err = config.SetAutoOffsetReset("earliest")
if err != nil {
panic(err)
}
err = config.SetConnectIP("127.0.0.1")
if err != nil {
panic(err)
}
err = config.SetConnectUser("root")
if err != nil {
panic(err)
}
err = config.SetConnectPass("taosdata")
if err != nil {
panic(err)
}
err = config.SetConnectPort("6030")
if err != nil {
panic(err)
}
err = config.SetMsgWithTableName(true)
if err != nil {
panic(err)
}
err = config.EnableHeartBeat()
if err != nil {
panic(err)
}
err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) {
if result.ErrCode != 0 {
errStr := wrapper.TMQErr2Str(result.ErrCode)
err := errors.NewError(int(result.ErrCode), errStr)
panic(err)
}
})
if err != nil {
panic(err)
}
```
</TabItem>
<TabItem label="Rust" value="Rust">
```rust
let mut dsn: Dsn = "taos://".parse()?;
dsn.set("group.id", "group1");
dsn.set("client.id", "test");
dsn.set("auto.offset.reset", "earliest");
let tmq = TmqBuilder::from_dsn(dsn)?;
let mut consumer = tmq.build()?;
```
</TabItem>
<TabItem value="Python" label="Python">
Python programs use the following parameters:
| Parameter | Type | Description | Remarks |
| :----------------------------: | :----: | -------------------------------------------------------- | ------------------------------------------- |
| `td_connect_ip` | string | Used in establishing a connection; same as `taos_connect` | |
| `td_connect_user` | string | Used in establishing a connection; same as `taos_connect` | |
| `td_connect_pass` | string | Used in establishing a connection; same as `taos_connect` | |
| `td_connect_port` | string | Used in establishing a connection; same as `taos_connect` | |
| `group_id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192. |
| `client_id` | string | Client ID | Maximum length: 192. |
| `auto_offset_reset` | string | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) |
| `enable_auto_commit` | string | Commit automatically | Specify `true` or `false`. |
| `auto_commit_interval_ms` | string | Interval for automatic commits, in milliseconds |
| `enable_heartbeat_background` | string | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | Specify `true` or `false`. |
| `experimental_snapshot_enable` | string | Specify whether to consume messages from the WAL or from TSBS | Specify `true` or `false`. |
| `msg_with_table_name` | string | Specify whether to deserialize table names from messages | Specify `true` or `false`.
| `timeout` | int | Consumer pull timeout | |
</TabItem>
<TabItem label="Node.JS" value="Node.JS">
```js
// Create consumer groups on demand (group.id) and enable automatic commits (enable.auto.commit),
// an automatic commit interval (auto.commit.interval.ms), and a username (td.connect.user) and password (td.connect.pass)
let consumer = taos.consumer({
'enable.auto.commit': 'true',
'auto.commit.interval.ms','1000',
'group.id': 'tg2',
'td.connect.user': 'root',
'td.connect.pass': 'taosdata',
'auto.offset.reset','earliest',
'msg.with.table.name': 'true',
'td.connect.ip','127.0.0.1',
'td.connect.port','6030'
});
```
</TabItem>
<TabItem value="C#" label="C#">
```csharp
using TDengineTMQ;
// Create consumer groups on demand (GourpID) and enable automatic commits (EnableAutoCommit),
// an automatic commit interval (AutoCommitIntervalMs), and a username (TDConnectUser) and password (TDConnectPasswd)
var cfg = new ConsumerConfig
{
EnableAutoCommit = "true"
AutoCommitIntervalMs = "1000"
GourpId = "TDengine-TMQ-C#",
TDConnectUser = "root",
TDConnectPasswd = "taosdata",
AutoOffsetReset = "earliest"
MsgWithTableName = "true",
TDConnectIp = "127.0.0.1",
TDConnectPort = "6030"
};
var consumer = new ConsumerBuilder(cfg).Build();
```
</TabItem>
</Tabs>
A consumer group is automatically created when multiple consumers are configured with the same consumer group ID.
## Subscribe to a Topic
A single consumer can subscribe to multiple topics.
<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
```c
// Create a list of subscribed topics
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topicName");
// Enable subscription
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
```
</TabItem>
<TabItem value="java" label="Java">
```java
List<String> topics = new ArrayList<>();
topics.add("tmq_topic");
consumer.subscribe(topics);
```
</TabItem>
<TabItem value="Go" label="Go">
```go
consumer, err := tmq.NewConsumer(config)
if err != nil {
panic(err)
}
err = consumer.Subscribe([]string{"example_tmq_topic"})
if err != nil {
panic(err)
}
```
</TabItem>
<TabItem value="Rust" label="Rust">
```rust
consumer.subscribe(["tmq_meters"]).await?;
```
</TabItem>
<TabItem value="Python" label="Python">
```python
consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
```
</TabItem>
<TabItem label="Node.JS" value="Node.JS">
```js
// Create a list of subscribed topics
let topics = ['topic_test']
// Enable subscription
consumer.subscribe(topics);
```
</TabItem>
<TabItem value="C#" label="C#">
```csharp
// Create a list of subscribed topics
List<String> topics = new List<string>();
topics.add("tmq_topic");
// Enable subscription
consumer.Subscribe(topics);
```
</TabItem>
</Tabs>
## Consume messages
The following code demonstrates how to consume the messages in a queue.
<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
```c
## Consume data
while (running) {
TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut);
msg_process(msg);
}
```
The `while` loop obtains a message each time it calls `tmq_consumer_poll()`. This message is exactly the same as the result returned by a query, and the same deserialization API can be used on it.
</TabItem>
<TabItem value="java" label="Java">
```java
while(running){
ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
for (Meters meter : meters) {
processMsg(meter);
}
}
```
</TabItem>
<TabItem value="Go" label="Go">
```go
for {
result, err := consumer.Poll(time.Second)
if err != nil {
panic(err)
}
fmt.Println(result)
consumer.Commit(context.Background(), result.Message)
consumer.FreeMessage(result.Message)
}
```
</TabItem>
<TabItem value="Rust" label="Rust">
```rust
{
let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? {
// get information from offset
// the topic
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
println!(
"** table: {}, got {} records: {:#?}\n",
name.unwrap(),
records.len(),
records
);
}
}
consumer.commit(offset).await?;
}
}
```
</TabItem>
<TabItem value="Python" label="Python">
```python
for msg in consumer:
for row in msg:
print(row)
```
</TabItem>
<TabItem label="Node.JS" value="Node.JS">
```js
while(true){
msg = consumer.consume(200);
// process message(consumeResult)
console.log(msg.topicPartition);
console.log(msg.block);
console.log(msg.fields)
}
```
</TabItem>
<TabItem value="C#" label="C#">
```csharp
## Consume data
while (true)
{
var consumerRes = consumer.Consume(100);
// process ConsumeResult
ProcessMsg(consumerRes);
consumer.Commit(consumerRes);
}
```
</TabItem>
</Tabs>
## Close the consumer
After message consumption is finished, the consumer is unsubscribed.
<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
```c
/* Unsubscribe */
tmq_unsubscribe(tmq);
/* Close consumer object */
tmq_consumer_close(tmq);
```
</TabItem>
<TabItem value="java" label="Java">
```java
/* Unsubscribe */
consumer.unsubscribe();
/* Close consumer */
consumer.close();
```
</TabItem>
<TabItem value="Go" label="Go">
```go
consumer.Close()
```
</TabItem>
<TabItem value="Rust" label="Rust">
```rust
consumer.unsubscribe().await;
```
</TabItem>
<TabItem value="Python" label="Python">
```py
# Unsubscribe
consumer.unsubscribe()
# Close consumer
consumer.close()
```
</TabItem>
<TabItem label="Node.JS" value="Node.JS">
```js
consumer.unsubscribe();
consumer.close();
```
</TabItem>
<TabItem value="C#" label="C#">
```csharp
// Unsubscribe
consumer.Unsubscribe();
// Close consumer
consumer.Close();
```
</TabItem>
</Tabs>
## Delete a Topic
You can delete topics that are no longer useful. Note that you must unsubscribe all consumers from a topic before deleting it.
```sql
/* Delete topic/
DROP TOPIC topic_name;
```
## Check Status
1. Query all existing topics.
```sql
SHOW TOPICS;
```
2. Query the status and subscribed topics of all consumers.
```sql
SHOW CONSUMERS;
```
3. Query the relationships between consumers and vgroups.
```sql
SHOW SUBSCRIPTIONS;
```
## Examples
The following section shows sample code in various languages.
<Tabs defaultValue="java" groupId="lang">
<TabItem label="C" value="c">
<CDemo />
</TabItem>
<TabItem label="Java" value="java">
<Java />
</TabItem>
<TabItem label="Go" value="Go">
<Go/>
</TabItem>
<TabItem label="Rust" value="Rust">
<Rust />
</TabItem>
<TabItem label="Python" value="Python">
<Python />
</TabItem>
<TabItem label="Node.JS" value="Node.JS">
<Node/>
</TabItem>
<TabItem label="C#" value="C#">
<CSharp/>
</TabItem>
</Tabs>
...@@ -54,8 +54,8 @@ $env:TDENGINE_GO_DSN="<goDSN>" ...@@ -54,8 +54,8 @@ $env:TDENGINE_GO_DSN="<goDSN>"
<!-- exclude --> <!-- exclude -->
:::note :::note
Replace <goDSN\> with the real value, the format should be `https(<cloud_host>)/?token=<token>`. Replace <goDSN\> with the real value, the format should be `https(<cloud_endpoint>)/?token=<token>`.
To obtain the value of `goDSN`, please log in [TDengine Cloud](https://cloud.tdengine.com) and click "Data In" on the lef menu. To obtain the value of `goDSN`, please log in [TDengine Cloud](https://cloud.tdengine.com) and click "Data In" on the left menu.
::: :::
<!-- exclude-end --> <!-- exclude-end -->
......
```c
{{#include docs/examples/c/tmq_example.c}}
```
```csharp
{{#include docs/examples/csharp/subscribe/Program.cs}}
```
\ No newline at end of file
```go
{{#include docs/examples/go/sub/main.go}}
```
\ No newline at end of file
```java
{{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}}
{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
```
```java
{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
```
```java
{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
```
\ No newline at end of file
```js
{{#include docs/examples/node/nativeexample/subscribe_demo.js}}
```
\ No newline at end of file
```py
{{#include docs/examples/python/tmq_example.py}}
```
```rust
{{#include docs/examples/rust/nativeexample/examples/subscribe_demo.rs}}
```
package main
import (
"fmt"
"github.com/taosdata/driver-go/v3/common"
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
"github.com/taosdata/driver-go/v3/ws/tmq"
"os"
)
func main() {
tmqStr := os.Getenv("TDENGINE_CLOUD_TMQ")
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"ws.url": tmqStr,
"ws.message.channelLen": uint(0),
"ws.message.timeout": common.DefaultMessageTimeout,
"ws.message.writeWait": common.DefaultWriteWait,
"group.id": "test_group",
"client.id": "test_consumer_ws",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
err = consumer.Subscribe("test", nil)
if err != nil {
panic(err)
}
defer consumer.Close()
for {
ev := consumer.Poll(0)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
fmt.Printf("get message:%v\n", e.String())
consumer.Commit()
case tmqcommon.Error:
fmt.Printf("%% Error: %v: %v\n", e.Code(), e)
return
default:
fmt.Printf("unexpected event:%v\n", e)
return
}
}
}
}
#!/usr/bin/env python
import os
from taosws import Consumer
endpoint = os.environ["TDENGINE_CLOUD_ENDPOINT"]
token = os.environ["TDENGINE_CLOUD_TOKEN"]
urlparts = endpoint.split(":", 1)
conf = {
# auth options
"td.connect.websocket.scheme": "ws",
"td.connect.ip": urlparts[0],
"td.connect.port": urlparts[1],
"td.connect.token": token,
# consume options
"group.id": "test_group_py",
"client.id": "test_consumer_ws_py",
}
consumer = Consumer(conf)
consumer.subscribe(["test"])
while 1:
message = consumer.poll(timeout=1.0)
if message:
id = message.vgroup()
topic = message.topic()
database = message.database()
for block in message:
nrows = block.nrows()
ncols = block.ncols()
for row in block:
print(row)
values = block.fetchall()
print(nrows, ncols)
else:
break
consumer.close()
\ No newline at end of file
use taos::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// subscribe
let tmq_str = std::env::var("TDENGINE_CLOUD_TMQ")?;
let tmq_uri = format!( "{}&group.id=test_group_rs&client.id=test_consumer_ws", tmq_str);
println!("request tmq URI is {tmq_uri}\n");
let tmq = TmqBuilder::from_dsn(tmq_uri,)?;
let mut consumer = tmq.build()?;
consumer.subscribe(["test"]).await?;
// consume loop
consumer
.stream()
.try_for_each_concurrent(10, |(offset, message)| async {
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// A two-dimension matrix while each cell is a [taos::Value] object.
let values = block.to_values();
// Number of rows.
assert_eq!(values.len(), block.nrows());
// Number of columns
assert_eq!(values[0].len(), block.ncols());
println!("first row: {}", values[0].iter().join(", "));
}
}
consumer.commit(offset).await?;
Ok(())
})
.await?;
consumer.unsubscribe().await;
Ok(())
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册