07-tmq.mdx 24.7 KB
Newer Older
H
Huo Linhe 已提交
1 2 3 4 5 6 7 8 9
---
sidebar_label: 数据订阅
description: "数据订阅与推送服务。写入到 TDengine 中的时序数据能够被自动推送到订阅客户端。"
title: 数据订阅
---

import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
import Java from "./_sub_java.mdx";
10
import JavaWS from "./_sub_java_ws.mdx";
H
Huo Linhe 已提交
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
import Python from "./_sub_python.mdx";
import Go from "./_sub_go.mdx";
import Rust from "./_sub_rust.mdx";
import Node from "./_sub_node.mdx";
import CSharp from "./_sub_cs.mdx";
import CDemo from "./_sub_c.mdx";

为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine 提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。

与 kafka 一样,你需要定义 *topic*, 但 TDengine 的 *topic* 是基于一个已经存在的超级表、子表或普通表的查询条件,即一个 `SELECT` 语句。你可以使用 SQL 对标签、表名、列、表达式等条件进行过滤,以及对数据进行标量函数与 UDF 计算(不包括数据聚合)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度。

消费者订阅 *topic* 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程、分布式地消费数据,提高消费速度。但不同消费者组中的消费者即使消费同一个 topic, 并不共享消费进度。一个消费者可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的 vnode 上,也就是多个 shard 上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的 ACK 机制,在宕机、重启等复杂环境下确保 at least once 消费。

为了实现上述功能,TDengine 会为 WAL (Write-Ahead-Log) 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制:用户可以按需指定 WAL 文件保留的时间以及大小(详见 create database 语句)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎(但由于 TSDB 具有远比 WAL 更高的压缩率,我们不推荐保留太长时间,一般来说,不超过几天)。 对于以 topic 形式创建的查询,TDengine 将对接 WAL 而不是 TSDB 作为其存储引擎。在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。

本文档不对消息队列本身的基础知识做介绍,如果需要了解,请自行搜索。

## 主要数据结构和 API

不同语言下, TMQ 订阅相关的 API 及数据结构如下:

<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">

```c
typedef struct tmq_t      tmq_t;
typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t;

typedef void(tmq_commit_cb(tmq_t *, int32_t code, void *param));

DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT int32_t     tmq_list_append(tmq_list_t *, const char *);
DLL_EXPORT void        tmq_list_destroy(tmq_list_t *);
DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT const char *tmq_err2str(int32_t code);

DLL_EXPORT int32_t   tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
DLL_EXPORT int32_t   tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
DLL_EXPORT int32_t   tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT int32_t   tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
DLL_EXPORT void      tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);

enum tmq_conf_res_t {
  TMQ_CONF_UNKNOWN = -2,
  TMQ_CONF_INVALID = -1,
  TMQ_CONF_OK = 0,
};
typedef enum tmq_conf_res_t tmq_conf_res_t;

DLL_EXPORT tmq_conf_t    *tmq_conf_new();
DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
DLL_EXPORT void           tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT void           tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
```

G
gccgdb1234 已提交
68
这些 API 的文档请见 [C/C++ Connector](../../connector/cpp),下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面 C 语言的示例代码。
H
Huo Linhe 已提交
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91

</TabItem>
<TabItem value="java" label="Java">

```java
void subscribe(Collection<String> topics) throws SQLException;

void unsubscribe() throws SQLException;

Set<String> subscription() throws SQLException;

ConsumerRecords<V> poll(Duration timeout) throws SQLException;

void commitAsync();

void commitAsync(OffsetCommitCallback callback);

void commitSync() throws SQLException;

void close() throws SQLException;
```

</TabItem>
Z
zhaoyanggh 已提交
92

93
<TabItem value="Python" label="Python">
S
Shuaiqiang Chang 已提交
94

Z
zhaoyanggh 已提交
95
```python
96 97 98
class Consumer:
    def subscribe(self, topics):
        pass
Y
Yang Zhao 已提交
99

100 101
    def unsubscribe(self):
        pass
Y
Yang Zhao 已提交
102

103 104
    def poll(self, timeout: float = 1.0):
        pass
Y
Yang Zhao 已提交
105

106 107
    def close(self):
        pass
Y
Yang Zhao 已提交
108

109 110
    def commit(self, message):
        pass
Z
zhaoyanggh 已提交
111
```
S
Shuaiqiang Chang 已提交
112

113
</TabItem>
S
Shuaiqiang Chang 已提交
114

115 116 117
<TabItem label="Go" value="Go">

```go
118
func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
119

120 121
// 出于兼容目的保留 rebalanceCb 参数,当前未使用
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
122

123 124
// 出于兼容目的保留 rebalanceCb 参数,当前未使用
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
125

126
func (c *Consumer) Poll(timeoutMs int) tmq.Event
127

128 129
// 出于兼容目的保留 tmq.TopicPartition 参数,当前未使用
func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
130 131

func (c *Consumer) Unsubscribe() error
132 133

func (c *Consumer) Close() error
134
```
S
Shuaiqiang Chang 已提交
135

136 137
</TabItem>

138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
<TabItem label="Rust" value="Rust">

```rust
impl TBuilder for TmqBuilder
  fn from_dsn<D: IntoDsn>(dsn: D) -> Result<Self, Self::Error>
  fn build(&self) -> Result<Self::Target, Self::Error>

impl AsAsyncConsumer for Consumer
  async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
        &mut self,
        topics: I,
    ) -> Result<(), Self::Error>;
  fn stream(
        &self,
    ) -> Pin<
        Box<
            dyn '_
                + Send
                + futures::Stream<
                    Item = Result<(Self::Offset, MessageSet<Self::Meta, Self::Data>), Self::Error>,
                >,
        >,
    >;
  async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>;

  async fn unsubscribe(self);
```

可在 <https://docs.rs/taos> 上查看详细 API 说明。

</TabItem>

<TabItem label="Node.JS" value="Node.JS">

```js
function TMQConsumer(config)

function subscribe(topic)

function consume(timeout)

function subscription()

function unsubscribe()

function commit(msg)

function close()
```

</TabItem>

190 191
<TabItem value="C#" label="C#">

X
xleili 已提交
192
```csharp
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config)

virtual IConsumer Build()

Consumer(ConsumerBuilder builder)

void Subscribe(IEnumerable<string> topics)

void Subscribe(string topic) 

ConsumeResult Consume(int millisecondsTimeout)

List<string> Subscription()

void Unsubscribe()
 
void Commit(ConsumeResult consumerResult)

void Close()
212
```
S
Shuaiqiang Chang 已提交
213

H
Huo Linhe 已提交
214 215 216 217 218 219 220 221 222 223
</TabItem>
</Tabs>

## 写入数据

首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如:

```sql
DROP DATABASE IF EXISTS tmqdb;
CREATE DATABASE tmqdb;
Y
Yiqing Liu 已提交
224
CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16)) TAGS(t1 INT, t3 VARCHAR(16));
H
Huo Linhe 已提交
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252
CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0");
CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1");       
INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
```

## 创建 *topic*

TDengine 使用 SQL 创建一个 topic:

```sql
CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
```

TMQ 支持多种订阅类型:

### 列订阅

语法:

```sql
CREATE TOPIC topic_name as subquery
```

通过 `SELECT` 语句订阅(包括 `SELECT *`,或 `SELECT ts, c1` 等指定列订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合)。需要注意的是:

- 该类型 TOPIC 一旦创建则订阅数据的结构确定。
- 被订阅或用于计算的列或标签不可被删除(`ALTER table DROP`)、修改(`ALTER table MODIFY`)。
L
Liu Jicong 已提交
253
- 若发生表结构变更,新增的列不出现在结果中。
H
Huo Linhe 已提交
254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274

### 超级表订阅

语法:

```sql
CREATE TOPIC topic_name AS STABLE stb_name
```

与 `SELECT * from stbName` 订阅的区别是:

- 不会限制用户的表结构变更。
- 返回的是非结构化的数据:返回数据的结构会随之超级表的表结构变化而变化。
- 用户对于要处理的每一个数据块都可能有不同的表结构。
- 返回数据不包含标签。

### 数据库订阅

语法:

```sql
L
Liu Jicong 已提交
275
CREATE TOPIC topic_name AS DATABASE db_name;
H
Huo Linhe 已提交
276 277
```

L
Liu Jicong 已提交
278
通过该语句可创建一个包含数据库所有表数据的订阅
H
Huo Linhe 已提交
279 280 281 282 283 284 285

## 创建消费者 *consumer*

消费者需要通过一系列配置选项创建,基础配置项如下表所示:

|            参数名称            |  类型   | 参数说明                                                 | 备注                                        |
| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- |
G
gccgdb1234 已提交
286 287 288 289
|        `td.connect.ip`         | string  | 用于创建连接,同 `taos_connect`                          |     仅用于使用 C 语言 API 建立原生连接                                        |
|       `td.connect.user`        | string  | 用于创建连接,同 `taos_connect`                          |   仅用于使用 C 语言 API 建立原生连接                                          |
|       `td.connect.pass`        | string  | 用于创建连接,同 `taos_connect`                          |  仅用于使用 C 语言 API 建立原生连接                                           |
|       `td.connect.port`        | integer | 用于创建连接,同 `taos_connect`                          |    仅用于使用 C 语言 API 建立原生连接                                         |
H
Huo Linhe 已提交
290 291
|           `group.id`           | string  | 消费组 ID,同一消费组共享消费进度                        | **必填项**。最大长度:192。                 |
|          `client.id`           | string  | 客户端 ID                                                | 最大长度:192。                             |
S
songshuqi 已提交
292
|      `auto.offset.reset`       |  enum   | 消费组订阅的初始位置                                     | <br />`earliest`: default;从头开始订阅; <br/>`latest`: 仅从最新数据开始订阅; <br/>`none`: 没有提交的 offset 无法订阅 |
L
Liu Jicong 已提交
293
|      `enable.auto.commit`      | boolean | 是否启用消费位点自动提交                                 | 合法值:`true`, `false`。                   |
S
songshuqi 已提交
294
|   `auto.commit.interval.ms`    | integer | 以毫秒为单位的消费记录自动提交消费位点时间间隔             | 默认 5000 m                                 |
G
gccgdb1234 已提交
295 296
| `experimental.snapshot.enable` | boolean | 是否允许从 TSDB 消费数据。当其关闭时,只能消费依据 WAL 保留策略仍然在WAL中的数据;当其打开时,除WAL中的数据以外,也能够消费已经从WAL中删除但落盘到TSDB中的数据                              | 实验功能,默认关闭                          |
|     `msg.with.table.name`      | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句)               |默认关闭 |
H
Huo Linhe 已提交
297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323

对于不同编程语言,其设置方式如下:

<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">

```c
/* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
   自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "cgrpName");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);

tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
```

</TabItem>
<TabItem value="java" label="Java">

H
Huo Linhe 已提交
324
对于 Java 程序,使用如下配置项:
H
Huo Linhe 已提交
325

326 327 328 329 330
| 参数名称                      | 类型   | 参数说明                                                                                                                      |
| ----------------------------- | ------ | ----------------------------------------------------------------------------------------------------------------------------- |
| `bootstrap.servers`           | string | 连接地址,如 `localhost:6030`                                                                                                 |
| `value.deserializer`          | string | 值解析方法,使用此方法应实现 `com.taosdata.jdbc.tmq.Deserializer` 接口或继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer` 类 |
| `value.deserializer.encoding` | string | 指定字符串解析的字符集                                                                                                        |  |
H
Huo Linhe 已提交
331

H
Huo Linhe 已提交
332
需要注意:此处使用 `bootstrap.servers` 替代 `td.connect.ip` 和 `td.connect.port`,以提供与 Kafka 一致的接口。
H
Huo Linhe 已提交
333 334 335 336 337 338 339 340 341 342 343

```java
Properties properties = new Properties();
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "1000");
properties.setProperty("group.id", "cgrpName");
properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
properties.setProperty("td.connect.user", "root");
properties.setProperty("td.connect.pass", "taosdata");
properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty("msg.with.table.name", "true");
H
huolibo 已提交
344
properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer");
H
Huo Linhe 已提交
345 346

TaosConsumer<Meters> consumer = new TaosConsumer<>(properties);
H
Huo Linhe 已提交
347 348 349 350 351 352

/* value deserializer definition. */
import com.taosdata.jdbc.tmq.ReferenceDeserializer;

public class MetersDeserializer extends ReferenceDeserializer<Meters> {
}
H
Huo Linhe 已提交
353 354
```

355
</TabItem>
Z
zhaoyanggh 已提交
356

T
t_max 已提交
357 358 359
<TabItem label="Go" value="Go">

```go
360 361 362 363 364 365 366 367 368 369 370 371
conf := &tmq.ConfigMap{
 "group.id":                     "test",
 "auto.offset.reset":            "earliest",
 "td.connect.ip":                "127.0.0.1",
 "td.connect.user":              "root",
 "td.connect.pass":              "taosdata",
 "td.connect.port":              "6030",
 "client.id":                    "test_tmq_c",
 "enable.auto.commit":           "false",
 "enable.heartbeat.background":  "true",
 "experimental.snapshot.enable": "true",
 "msg.with.table.name":          "true",
T
t_max 已提交
372
}
373
consumer, err := NewConsumer(conf)
374 375
```

H
Huo Linhe 已提交
376
</TabItem>
377

378
<TabItem label="Rust" value="Rust">
379

380 381 382 383 384
```rust
let mut dsn: Dsn = "taos://".parse()?;
dsn.set("group.id", "group1");
dsn.set("client.id", "test");
dsn.set("auto.offset.reset", "earliest");
385

386
let tmq = TmqBuilder::from_dsn(dsn)?;
387

388
let mut consumer = tmq.build()?;
389
```
S
Shuaiqiang Chang 已提交
390

391 392
</TabItem>

393 394
<TabItem value="Python" label="Python">

395
Python 语言下引入 `taos` 库的 `Consumer` 类,创建一个 Consumer 示例:
396 397

```python
398
from taos.tmq import Consumer
399

400
# Syntax: `consumer = Consumer(configs)`
401 402
#
# Example:
403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421
consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
```

其中,`configs` 为 dict 类型,传递创建 Consumer 的参数。可以配置的参数有:

| 参数名称 | 类型 | 参数说明 | 备注 |
|:------:|:----:|:-------:|:---:|
| `td.connect.ip` | string | 用于创建连接||
| `td.connect.user` | string | 用于创建连接||
| `td.connect.pass` | string | 用于创建连接||
| `td.connect.port` | string | 用于创建连接||
| `group.id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192 |
| `client.id` | string | 客户端 ID | 最大长度:192 |
| `msg.with.table.name` | string | 是否允许从消息中解析表名,不适用于列订阅 | 合法值:`true`, `false` |
| `enable.auto.commit` | string | 启用自动提交 | 合法值:`true`, `false` |
| `auto.commit.interval.ms` | string | 以毫秒为单位的自动提交时间间隔 | 默认值:5000 ms |
| `auto.offset.reset` | string | 消费组订阅的初始位置 | 可选:`earliest`(default), `latest`, `none` |
| `experimental.snapshot.enable` | string | 是否允许从 TSDB 消费数据 | 合法值:`true`, `false` |
| `enable.heartbeat.background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` |
422 423 424

</TabItem>

X
xleili 已提交
425 426
<TabItem label="Node.JS" value="Node.JS">

427
```js
X
xleili 已提交
428 429 430 431 432 433 434 435 436 437 438 439 440 441
// 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
// 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 

let consumer = taos.consumer({
  'enable.auto.commit': 'true',
  'auto.commit.interval.ms','1000',
  'group.id': 'tg2',
  'td.connect.user': 'root',
  'td.connect.pass': 'taosdata',
  'auto.offset.reset','earliest',
  'msg.with.table.name': 'true',
  'td.connect.ip','127.0.0.1',
  'td.connect.port','6030'  
  });
442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466
```

</TabItem>

<TabItem value="C#" label="C#">

```csharp
using TDengineTMQ;

// 根据需要,设置消费组 (GourpId)、自动提交 (EnableAutoCommit)、
// 自动提交时间间隔 (AutoCommitIntervalMs)、用户名 (TDConnectUser)、密码 (TDConnectPasswd) 等参数
var cfg = new ConsumerConfig
 {
    EnableAutoCommit = "true"
    AutoCommitIntervalMs = "1000"
    GourpId = "TDengine-TMQ-C#",
    TDConnectUser = "root",
    TDConnectPasswd = "taosdata",
    AutoOffsetReset = "earliest"
    MsgWithTableName = "true",
    TDConnectIp = "127.0.0.1",
    TDConnectPort = "6030"
 };

var consumer = new ConsumerBuilder(cfg).Build();
X
xleili 已提交
467 468 469 470 471

```

</TabItem>

H
Huo Linhe 已提交
472 473 474 475 476 477 478 479
</Tabs>

上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。

## 订阅 *topics*

一个 consumer 支持同时订阅多个 topic。

H
Huo Linhe 已提交
480 481 482
<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">

H
Huo Linhe 已提交
483 484 485 486 487 488 489 490 491 492
```c
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topicName");
// 启动订阅
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
  
```

H
Huo Linhe 已提交
493 494 495 496 497 498 499 500 501
</TabItem>
<TabItem value="java" label="Java">

```java
List<String> topics = new ArrayList<>();
topics.add("tmq_topic");
consumer.subscribe(topics);
```

T
t_max 已提交
502 503 504 505
</TabItem>
<TabItem value="Go" label="Go">

```go
506
err = consumer.Subscribe("example_tmq_topic", nil)
T
t_max 已提交
507 508 509 510 511
if err != nil {
 panic(err)
}
```

H
Huo Linhe 已提交
512
</TabItem>
513
<TabItem value="Rust" label="Rust">
H
Huo Linhe 已提交
514

515 516
```rust
consumer.subscribe(["tmq_meters"]).await?;
517 518 519 520
```

</TabItem>

521
<TabItem value="Python" label="Python">
522

Z
zhaoyanggh 已提交
523
```python
524
consumer.subscribe(['topic1', 'topic2'])
Z
zhaoyanggh 已提交
525
```
526

Z
zhaoyanggh 已提交
527 528
</TabItem>

X
xleili 已提交
529 530
<TabItem label="Node.JS" value="Node.JS">

531
```js
X
xleili 已提交
532 533 534 535 536 537 538 539 540
// 创建订阅 topics 列表
let topics = ['topic_test']

// 启动订阅
consumer.subscribe(topics);
```

</TabItem>

541 542 543 544 545 546 547 548 549 550 551 552
<TabItem value="C#" label="C#">

```csharp
// 创建订阅 topics 列表
List<String> topics = new List<string>();
topics.add("tmq_topic");
// 启动订阅
consumer.Subscribe(topics);
```

</TabItem>

H
Huo Linhe 已提交
553 554
</Tabs>

H
Huo Linhe 已提交
555 556
## 消费

H
Huo Linhe 已提交
557 558
以下代码展示了不同语言下如何对 TMQ 消息进行消费。

H
Huo Linhe 已提交
559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583
<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">

```c
// 消费数据
while (running) {
  TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut);
  msg_process(msg);
}  
```

这里是一个 **while** 循环,每调用一次 tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析 API 完成消息内容的解析。

</TabItem>
<TabItem value="java" label="Java">

```java
while(running){
  ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
    for (Meters meter : meters) {
      processMsg(meter);
    }    
}
```

T
t_max 已提交
584
</TabItem>
Z
zhaoyanggh 已提交
585

T
t_max 已提交
586 587 588 589
<TabItem value="Go" label="Go">

```go
for {
590 591 592 593 594 595 596 597 598 599
 ev := consumer.Poll(0)
 if ev != nil {
  switch e := ev.(type) {
  case *tmqcommon.DataMessage:
   fmt.Println(e.Value())
  case tmqcommon.Error:
   fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
   panic(e)
  }
  consumer.Commit()
T
t_max 已提交
600 601 602 603
 }
}
```

H
Huo Linhe 已提交
604 605
</TabItem>

606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642
<TabItem value="Rust" label="Rust">

```rust
{
    let mut stream = consumer.stream();

    while let Some((offset, message)) = stream.try_next().await? {
        // get information from offset

        // the topic
        let topic = offset.topic();
        // the vgroup id, like partition id in kafka.
        let vgroup_id = offset.vgroup_id();
        println!("* in vgroup id {vgroup_id} of topic {topic}\n");

        if let Some(data) = message.into_data() {
            while let Some(block) = data.fetch_raw_block().await? {
                // one block for one table, get table name if needed
                let name = block.table_name();
                let records: Vec<Record> = block.deserialize().try_collect()?;
                println!(
                    "** table: {}, got {} records: {:#?}\n",
                    name.unwrap(),
                    records.len(),
                    records
                );
            }
        }
        consumer.commit(offset).await?;
    }
}
```

</TabItem>
<TabItem value="Python" label="Python">

```python
643 644 645 646 647 648 649 650 651 652 653
while True:
    res = consumer.poll(100)
    if not res:
        continue
    err = res.error()
    if err is not None:
        raise err
    val = res.value()

    for block in val:
        print(block.fetchall())
654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671
```

</TabItem>

<TabItem label="Node.JS" value="Node.JS">

```js
while(true){
  msg = consumer.consume(200);
  // process message(consumeResult)
  console.log(msg.topicPartition);
  console.log(msg.block);
  console.log(msg.fields)
}
```

</TabItem>

672 673
<TabItem value="C#" label="C#">

X
xleili 已提交
674
```csharp
675 676 677 678 679 680 681 682 683 684 685
// 消费数据
while (true)
{
    var consumerRes = consumer.Consume(100);
    // process ConsumeResult
    ProcessMsg(consumerRes);
    consumer.Commit(consumerRes);
}
```

</TabItem>
X
xleili 已提交
686

687 688
</Tabs>

H
Huo Linhe 已提交
689 690
## 结束消费

H
Huo Linhe 已提交
691 692
消费结束后,应当取消订阅。

H
Huo Linhe 已提交
693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714
<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">

```c
/* 取消订阅 */
tmq_unsubscribe(tmq);

/* 关闭消费者对象 */
tmq_consumer_close(tmq);
```

</TabItem>
<TabItem value="java" label="Java">

```java
/* 取消订阅 */
consumer.unsubscribe();

/* 关闭消费 */
consumer.close();
```

715
</TabItem>
Z
zhaoyanggh 已提交
716

717 718 719
<TabItem value="Go" label="Go">

```go
720 721 722 723 724
/* Unsubscribe */
_ = consumer.Unsubscribe()

/* Close consumer */
_ = consumer.Close()
725 726 727
```

</TabItem>
728

729
<TabItem value="Rust" label="Rust">
730

731 732
```rust
consumer.unsubscribe().await;
S
Shuaiqiang Chang 已提交
733
```
734

735 736
</TabItem>

737 738 739 740 741 742 743 744 745 746
<TabItem value="Python" label="Python">

```py
# 取消订阅
consumer.unsubscribe()
# 关闭消费
consumer.close()
```

</TabItem>
X
xleili 已提交
747 748
<TabItem label="Node.JS" value="Node.JS">

749
```js
X
xleili 已提交
750 751 752 753 754 755
consumer.unsubscribe();
consumer.close();
```

</TabItem>

756 757 758 759 760 761 762 763 764
<TabItem value="C#" label="C#">

```csharp
// 取消订阅
consumer.Unsubscribe();

// 关闭消费
consumer.Close();
```
S
ShuaiQChang 已提交
765

766 767
</TabItem>

H
Huo Linhe 已提交
768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804
</Tabs>

## 删除 *topic*

如果不再需要订阅数据,可以删除 topic,需要注意:只有当前未在订阅中的 TOPIC 才能被删除。

```sql
/* 删除 topic */
DROP TOPIC topic_name;
```

## 状态查看

1、*topics*:查询已经创建的 topic

```sql
SHOW TOPICS;
```

2、consumers:查询 consumer 的状态及其订阅的 topic

```sql
SHOW CONSUMERS;
```

3、subscriptions:查询 consumer 与 vgroup 之间的分配关系

```sql
SHOW SUBSCRIPTIONS;
```

## 示例代码

以下是各语言的完整示例代码。

<Tabs defaultValue="java" groupId="lang">

805
<TabItem label="C" value="c">
S
ShuaiQChang 已提交
806
  <CDemo />
H
Huo Linhe 已提交
807 808 809
</TabItem>

<TabItem label="Java" value="java">
810 811 812 813 814 815 816 817
<Tabs defaultValue="native">
<TabItem value="native" label="本地连接">
<Java />
</TabItem>
<TabItem value="ws" label="WebSocket 连接">
<JavaWS />
</TabItem>
</Tabs>
H
Huo Linhe 已提交
818 819 820 821 822 823 824 825 826 827 828
</TabItem>

<TabItem label="Go" value="Go">
   <Go/>
</TabItem>

<TabItem label="Rust" value="Rust">
    <Rust />
</TabItem>

<TabItem label="Python" value="Python">
Y
Yang Zhao 已提交
829
    <Python />
H
Huo Linhe 已提交
830 831 832 833 834 835 836 837 838 839 840
</TabItem>

<TabItem label="Node.JS" value="Node.JS">
   <Node/>
</TabItem>

<TabItem label="C#" value="C#">
   <CSharp/>
</TabItem>

</Tabs>