未验证 提交 f6cd7a03 编写于 作者: L Linhe Huo 提交者: GitHub

Merge pull request #16163 from taosdata/docs/tmq-rust

docs: add rust examples for subscription document
...@@ -132,6 +132,58 @@ func (c *Consumer) Unsubscribe() error ...@@ -132,6 +132,58 @@ func (c *Consumer) Unsubscribe() error
</TabItem> </TabItem>
<TabItem label="Rust" value="Rust">
```rust
impl TBuilder for TmqBuilder
fn from_dsn<D: IntoDsn>(dsn: D) -> Result<Self, Self::Error>
fn build(&self) -> Result<Self::Target, Self::Error>
impl AsAsyncConsumer for Consumer
async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
&mut self,
topics: I,
) -> Result<(), Self::Error>;
fn stream(
&self,
) -> Pin<
Box<
dyn '_
+ Send
+ futures::Stream<
Item = Result<(Self::Offset, MessageSet<Self::Meta, Self::Data>), Self::Error>,
>,
>,
>;
async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>;
async fn unsubscribe(self);
```
可在 <https://docs.rs/taos> 上查看详细 API 说明。
</TabItem>
<TabItem label="Node.JS" value="Node.JS">
```js
function TMQConsumer(config)
function subscribe(topic)
function consume(timeout)
function subscription()
function unsubscribe()
function commit(msg)
function close()
```
</TabItem>
<TabItem value="C#" label="C#"> <TabItem value="C#" label="C#">
```csharp ```csharp
...@@ -157,27 +209,6 @@ void Close() ...@@ -157,27 +209,6 @@ void Close()
``` ```
</TabItem> </TabItem>
<TabItem label="Node.JS" value="Node.JS">
```node
function TMQConsumer(config)
function subscribe(topic)
function consume(timeout)
function subscription()
function unsubscribe()
function commit(msg)
function close()
```
</TabItem>
</Tabs> </Tabs>
## 写入数据 ## 写入数据
...@@ -321,28 +352,6 @@ public class MetersDeserializer extends ReferenceDeserializer<Meters> { ...@@ -321,28 +352,6 @@ public class MetersDeserializer extends ReferenceDeserializer<Meters> {
</TabItem> </TabItem>
<TabItem value="Python" label="Python">
Python 使用以下配置项创建一个 Consumer 实例。
| 参数名称 | 类型 | 参数说明 | 备注 |
| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- |
| `td_connect_ip` | string | 用于创建连接,同 `taos_connect` | |
| `td_connect_user` | string | 用于创建连接,同 `taos_connect` | |
| `td_connect_pass` | string | 用于创建连接,同 `taos_connect` | |
| `td_connect_port` | string | 用于创建连接,同 `taos_connect` | |
| `group_id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 |
| `client_id` | string | 客户端 ID | 最大长度:192。 |
| `auto_offset_reset` | string | 消费组订阅的初始位置 | 可选:`earliest`, `latest`, `none`(default) |
| `enable_auto_commit` | string | 启用自动提交 | 合法值:`true`, `false`。 |
| `auto_commit_interval_ms` | string | 以毫秒为单位的自动提交时间间隔 | |
| `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` |
| `experimental_snapshot_enable` | string | 从 WAL 开始消费,还是从 TSBS 开始消费 | 合法值:`true`, `false` |
| `msg_with_table_name` | string | 是否允许从消息中解析表名 | 合法值:`true`, `false` |
| `timeout` | int | 消费者拉去的超时时间 | |
</TabItem>
<TabItem label="Go" value="Go"> <TabItem label="Go" value="Go">
```go ```go
...@@ -394,35 +403,46 @@ if err != nil { ...@@ -394,35 +403,46 @@ if err != nil {
</TabItem> </TabItem>
<TabItem value="C#" label="C#"> <TabItem label="Rust" value="Rust">
```csharp ```rust
using TDengineTMQ; let mut dsn: Dsn = "taos://".parse()?;
dsn.set("group.id", "group1");
dsn.set("client.id", "test");
dsn.set("auto.offset.reset", "earliest");
// 根据需要,设置消费组 (GourpId)、自动提交 (EnableAutoCommit)、 let tmq = TmqBuilder::from_dsn(dsn)?;
// 自动提交时间间隔 (AutoCommitIntervalMs)、用户名 (TDConnectUser)、密码 (TDConnectPasswd) 等参数
var cfg = new ConsumerConfig
{
EnableAutoCommit = "true"
AutoCommitIntervalMs = "1000"
GourpId = "TDengine-TMQ-C#",
TDConnectUser = "root",
TDConnectPasswd = "taosdata",
AutoOffsetReset = "earliest"
MsgWithTableName = "true",
TDConnectIp = "127.0.0.1",
TDConnectPort = "6030"
};
var consumer = new ConsumerBuilder(cfg).Build();
let mut consumer = tmq.build()?;
``` ```
</TabItem> </TabItem>
<TabItem value="Python" label="Python">
Python 使用以下配置项创建一个 Consumer 实例。
| 参数名称 | 类型 | 参数说明 | 备注 |
| :----------------------------: | :----: | -------------------------------------------------------- | ------------------------------------------- |
| `td_connect_ip` | string | 用于创建连接,同 `taos_connect` | |
| `td_connect_user` | string | 用于创建连接,同 `taos_connect` | |
| `td_connect_pass` | string | 用于创建连接,同 `taos_connect` | |
| `td_connect_port` | string | 用于创建连接,同 `taos_connect` | |
| `group_id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 |
| `client_id` | string | 客户端 ID | 最大长度:192。 |
| `auto_offset_reset` | string | 消费组订阅的初始位置 | 可选:`earliest`, `latest`, `none`(default) |
| `enable_auto_commit` | string | 启用自动提交 | 合法值:`true`, `false`。 |
| `auto_commit_interval_ms` | string | 以毫秒为单位的自动提交时间间隔 | |
| `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` |
| `experimental_snapshot_enable` | string | 从 WAL 开始消费,还是从 TSBS 开始消费 | 合法值:`true`, `false` |
| `msg_with_table_name` | string | 是否允许从消息中解析表名 | 合法值:`true`, `false` |
| `timeout` | int | 消费者拉去的超时时间 | |
</TabItem>
<TabItem label="Node.JS" value="Node.JS"> <TabItem label="Node.JS" value="Node.JS">
``` node ```js
// 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、 // 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
// 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 // 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数
...@@ -437,6 +457,31 @@ let consumer = taos.consumer({ ...@@ -437,6 +457,31 @@ let consumer = taos.consumer({
'td.connect.ip','127.0.0.1', 'td.connect.ip','127.0.0.1',
'td.connect.port','6030' 'td.connect.port','6030'
}); });
```
</TabItem>
<TabItem value="C#" label="C#">
```csharp
using TDengineTMQ;
// 根据需要,设置消费组 (GourpId)、自动提交 (EnableAutoCommit)、
// 自动提交时间间隔 (AutoCommitIntervalMs)、用户名 (TDConnectUser)、密码 (TDConnectPasswd) 等参数
var cfg = new ConsumerConfig
{
EnableAutoCommit = "true"
AutoCommitIntervalMs = "1000"
GourpId = "TDengine-TMQ-C#",
TDConnectUser = "root",
TDConnectPasswd = "taosdata",
AutoOffsetReset = "earliest"
MsgWithTableName = "true",
TDConnectIp = "127.0.0.1",
TDConnectPort = "6030"
};
var consumer = new ConsumerBuilder(cfg).Build();
``` ```
...@@ -487,28 +532,25 @@ if err != nil { ...@@ -487,28 +532,25 @@ if err != nil {
``` ```
</TabItem> </TabItem>
<TabItem value="Rust" label="Rust">
<TabItem value="C#" label="C#"> ```rust
consumer.subscribe(["tmq_meters"]).await?;
```csharp
// 创建订阅 topics 列表
List<String> topics = new List<string>();
topics.add("tmq_topic");
// 启动订阅
consumer.Subscribe(topics);
``` ```
</TabItem> </TabItem>
<TabItem value="Python" label="Python"> <TabItem value="Python" label="Python">
```python ```python
consumer = TaosConsumer('topic_ctb_column', group_id='vg2') consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
``` ```
</TabItem> </TabItem>
<TabItem label="Node.JS" value="Node.JS"> <TabItem label="Node.JS" value="Node.JS">
```node ```js
// 创建订阅 topics 列表 // 创建订阅 topics 列表
let topics = ['topic_test'] let topics = ['topic_test']
...@@ -518,6 +560,18 @@ consumer.subscribe(topics); ...@@ -518,6 +560,18 @@ consumer.subscribe(topics);
</TabItem> </TabItem>
<TabItem value="C#" label="C#">
```csharp
// 创建订阅 topics 列表
List<String> topics = new List<string>();
topics.add("tmq_topic");
// 启动订阅
consumer.Subscribe(topics);
```
</TabItem>
</Tabs> </Tabs>
## 消费 ## 消费
...@@ -551,14 +605,6 @@ while(running){ ...@@ -551,14 +605,6 @@ while(running){
</TabItem> </TabItem>
<TabItem value="Python" label="Python">
```python
for msg in consumer:
for row in msg:
print(row)
```
</TabItem>
<TabItem value="Go" label="Go"> <TabItem value="Go" label="Go">
```go ```go
...@@ -575,6 +621,64 @@ for { ...@@ -575,6 +621,64 @@ for {
</TabItem> </TabItem>
<TabItem value="Rust" label="Rust">
```rust
{
let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? {
// get information from offset
// the topic
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
println!(
"** table: {}, got {} records: {:#?}\n",
name.unwrap(),
records.len(),
records
);
}
}
consumer.commit(offset).await?;
}
}
```
</TabItem>
<TabItem value="Python" label="Python">
```python
for msg in consumer:
for row in msg:
print(row)
```
</TabItem>
<TabItem label="Node.JS" value="Node.JS">
```js
while(true){
msg = consumer.consume(200);
// process message(consumeResult)
console.log(msg.topicPartition);
console.log(msg.block);
console.log(msg.fields)
}
```
</TabItem>
<TabItem value="C#" label="C#"> <TabItem value="C#" label="C#">
```csharp ```csharp
...@@ -590,20 +694,6 @@ while (true) ...@@ -590,20 +694,6 @@ while (true)
</TabItem> </TabItem>
<TabItem label="Node.JS" value="Node.JS">
```node
while(true){
msg = consumer.consume(200);
// process message(consumeResult)
console.log(msg.topicPartition);
console.log(msg.block);
console.log(msg.fields)
}
```
</TabItem>
</Tabs> </Tabs>
## 结束消费 ## 结束消费
...@@ -634,16 +724,6 @@ consumer.close(); ...@@ -634,16 +724,6 @@ consumer.close();
</TabItem> </TabItem>
<TabItem value="Python" label="Python">
```python
/* 取消订阅 */
consumer.unsubscribe();
/* 关闭消费 */
consumer.close();
</TabItem>
<TabItem value="Go" label="Go"> <TabItem value="Go" label="Go">
...@@ -652,26 +732,45 @@ consumer.Close() ...@@ -652,26 +732,45 @@ consumer.Close()
``` ```
</TabItem> </TabItem>
<TabItem value="C#" label="C#">
```csharp <TabItem value="Rust" label="Rust">
// 取消订阅
consumer.Unsubscribe();
// 关闭消费 ```rust
consumer.Close(); consumer.unsubscribe().await;
``` ```
</TabItem> </TabItem>
<TabItem value="Python" label="Python">
```py
# 取消订阅
consumer.unsubscribe()
# 关闭消费
consumer.close()
```
</TabItem>
<TabItem label="Node.JS" value="Node.JS"> <TabItem label="Node.JS" value="Node.JS">
```node ```js
consumer.unsubscribe(); consumer.unsubscribe();
consumer.close(); consumer.close();
``` ```
</TabItem> </TabItem>
<TabItem value="C#" label="C#">
```csharp
// 取消订阅
consumer.Unsubscribe();
// 关闭消费
consumer.Close();
```
</TabItem>
</Tabs> </Tabs>
## 删除 *topic* ## 删除 *topic*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册