---
sidebar_label: Data Subscription
description: "The TDengine data subscription service automatically pushes data written in TDengine to subscribing clients."
title: Data Subscription
---

import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
import Java from "./_sub_java.mdx";
import JavaWS from "./_sub_java_ws.mdx"
import Python from "./_sub_python.mdx";
import Go from "./_sub_go.mdx";
import Rust from "./_sub_rust.mdx";
import Node from "./_sub_node.mdx";
import CSharp from "./_sub_cs.mdx";
import CDemo from "./_sub_c.mdx";

TDengine provides data subscription and consumption interfaces similar to those of message queue products. These interfaces make it easier for applications to obtain data written to TDengine in real time and to process data in the order in which events occurred. This simplifies your time-series data processing systems and reduces your costs because it is no longer necessary to deploy a message queue product such as Kafka.

To use TDengine data subscription, you define topics, as in Kafka. However, a topic in TDengine is based on query conditions for an existing supertable, table, or subtable; in other words, it is defined by a SELECT statement. You can use SQL to filter data by tag, table name, column, or expression and then apply a scalar function or user-defined function to the data. Aggregate functions are not supported. This gives TDengine data subscription more flexibility than similar products. The granularity of data can be controlled on demand by applications, while filtering and preprocessing are handled by TDengine instead of the application layer. This implementation reduces the amount of data transmitted and the complexity of applications.

By subscribing to a topic, a consumer can obtain the latest data in that topic in real time. Multiple consumers can be formed into a consumer group that consumes messages together. Consumer groups speed up consumption through multi-threaded, distributed processing. Note that consumers in different groups that are subscribed to the same topic do not consume messages together. A single consumer can subscribe to multiple topics. If the data in a supertable is sharded across multiple vnodes, a consumer group can consume it much more efficiently than a single consumer. TDengine also includes an acknowledgement mechanism that ensures at-least-once delivery in complicated environments where machines may crash or restart.
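
The at-least-once guarantee mentioned above can be illustrated with a toy model. The sketch below is not TDengine code: `consume_at_least_once`, the in-memory `log`, and the `crash_before_commit_at` switch are hypothetical names used only for illustration. It shows the general idea that an offset is acknowledged only after processing succeeds, so a crash between processing and acknowledging leads to redelivery rather than loss.

```python
from typing import Callable, List


def consume_at_least_once(
    log: List[str],
    committed: int,
    process: Callable[[str], None],
    crash_before_commit_at: int = -1,
) -> int:
    """Process log entries starting at `committed` and return the new committed offset.

    The offset advances only after `process` returns, so a crash between
    processing and committing causes that entry to be delivered again.
    """
    offset = committed
    while offset < len(log):
        process(log[offset])
        if offset == crash_before_commit_at:
            # Simulated crash: the entry was processed but never acknowledged.
            return committed
        offset += 1
        committed = offset  # acknowledge only after successful processing
    return committed


seen: List[str] = []
log = ["m0", "m1", "m2"]

# The first run crashes after processing "m1" but before committing it.
committed = consume_at_least_once(log, 0, seen.append, crash_before_commit_at=1)
# After the restart, consumption resumes from the last committed offset.
committed = consume_at_least_once(log, committed, seen.append)
```

In this model the entry `m1` is processed twice, which is why consumers that rely on at-least-once delivery usually need idempotent processing.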

To implement these features, TDengine indexes its write-ahead log (WAL) file for fast random access and provides configurable methods for replacing and retaining this file. You can define a retention period and size for this file. For information, see the CREATE DATABASE statement. In this way, the WAL file is transformed into a persistent storage engine that remembers the order in which events occur. However, note that configuring an overly long retention period for your WAL files makes database compression inefficient. TDengine then uses the WAL file instead of the time-series database as its storage engine for queries in the form of topics. TDengine reads the data from the WAL file; uses a unified query engine instance to perform filtering, transformations, and other operations; and finally pushes the data to consumers.

Tip: Data subscription consumes data from the WAL. If WAL files are deleted according to the WAL retention policy, the deleted data can no longer be consumed. Set a reasonable value for the `WAL_RETENTION_PERIOD` or `WAL_RETENTION_SIZE` parameter when creating the database, and make sure that your application consumes data in a timely manner so that no data is lost. This behavior is similar to Kafka and other widely used message queue products.
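
As a rough worked example of the retention rule, the following sketch checks whether a record would still fall inside the retention window. The function name `is_still_consumable` is hypothetical, both arguments are assumed to be expressed in seconds, and the model ignores `WAL_RETENTION_SIZE` and the file-level granularity at which WAL data is actually removed.

```python
def is_still_consumable(written_at: float, now: float, wal_retention_period: float) -> bool:
    """Return True if a record written at `written_at` is still inside the retention window.

    All three values are assumed to be in seconds.
    """
    return (now - written_at) <= wal_retention_period


# With a retention period of 3600, a consumer that has been offline for
# 30 minutes can still catch up, but one that has been offline for 2 hours cannot.
after_30_minutes = is_still_consumable(written_at=0, now=1800, wal_retention_period=3600)
after_2_hours = is_still_consumable(written_at=0, now=7200, wal_retention_period=3600)
```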

## Data Schema and API

The related schemas and APIs in various languages are described as follows:

<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">

```c
typedef struct tmq_t      tmq_t;
typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t;

typedef void(tmq_commit_cb(tmq_t *, int32_t code, void *param));

DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT int32_t     tmq_list_append(tmq_list_t *, const char *);
DLL_EXPORT void        tmq_list_destroy(tmq_list_t *);
DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT const char *tmq_err2str(int32_t code);

DLL_EXPORT int32_t   tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
DLL_EXPORT int32_t   tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
DLL_EXPORT int32_t   tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT int32_t   tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
DLL_EXPORT void      tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);

enum tmq_conf_res_t {
  TMQ_CONF_UNKNOWN = -2,
  TMQ_CONF_INVALID = -1,
  TMQ_CONF_OK = 0,
};
typedef enum tmq_conf_res_t tmq_conf_res_t;

DLL_EXPORT tmq_conf_t    *tmq_conf_new();
DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
DLL_EXPORT void           tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT void           tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
```

For more information, see [C/C++ Connector](/reference/connector/cpp).

The following example is based on the smart meter table described in Data Models. For complete sample code, see the C language section below.

</TabItem>
<TabItem value="java" label="Java">

```java
void subscribe(Collection<String> topics) throws SQLException;

void unsubscribe() throws SQLException;

Set<String> subscription() throws SQLException;

ConsumerRecords<V> poll(Duration timeout) throws SQLException;

void commitAsync();

void commitAsync(OffsetCommitCallback callback);

void commitSync() throws SQLException;

void close() throws SQLException;
```

</TabItem>

<TabItem value="Python" label="Python">

```python
class Consumer:
    def subscribe(self, topics):
        pass

    def unsubscribe(self):
        pass

    def poll(self, timeout: float = 1.0):
        pass

    def close(self):
        pass

    def commit(self, message):
        pass
```

</TabItem>

<TabItem label="Go" value="Go">

```go
func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)

// rebalanceCb is reserved for compatibility purpose
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error

// rebalanceCb is reserved for compatibility purpose
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error

func (c *Consumer) Poll(timeoutMs int) tmq.Event

// tmq.TopicPartition is reserved for compatibility purpose
func (c *Consumer) Commit() ([]tmq.TopicPartition, error)

func (c *Consumer) Unsubscribe() error

func (c *Consumer) Close() error
```

</TabItem>

<TabItem label="Rust" value="Rust">

```rust
impl TBuilder for TmqBuilder
  fn from_dsn<D: IntoDsn>(dsn: D) -> Result<Self, Self::Error>
  fn build(&self) -> Result<Self::Target, Self::Error>

impl AsAsyncConsumer for Consumer
  async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
        &mut self,
        topics: I,
    ) -> Result<(), Self::Error>;
  fn stream(
        &self,
    ) -> Pin<
        Box<
            dyn '_
                + Send
                + futures::Stream<
                    Item = Result<(Self::Offset, MessageSet<Self::Meta, Self::Data>), Self::Error>,
                >,
        >,
    >;
  async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>;

  async fn unsubscribe(self);
```

For more information, see [Crate taos](https://docs.rs/taos).

</TabItem>

<TabItem label="Node.JS" value="Node.JS">

```js
function TMQConsumer(config)

function subscribe(topic)

function consume(timeout)

function subscription()

function unsubscribe()

function commit(msg)

function close()
```

</TabItem>

<TabItem value="C#" label="C#">

```csharp
ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config)

virtual IConsumer Build()

Consumer(ConsumerBuilder builder)

void Subscribe(IEnumerable<string> topics)

void Subscribe(string topic) 

ConsumeResult Consume(int millisecondsTimeout)

List<string> Subscription()

void Unsubscribe()
 
void Commit(ConsumeResult consumerResult)

void Close()
```

</TabItem>
</Tabs>

## Insert Data into TDengine

A database including one supertable and two subtables is created as follows:

```sql
DROP DATABASE IF EXISTS tmqdb;
CREATE DATABASE tmqdb WAL_RETENTION_PERIOD 3600;
CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16)) TAGS(t1 INT, t3 VARCHAR(16));
CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0");
CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1");
INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
```

## Create a Topic

The following SQL statement creates a topic in TDengine:

```sql
CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
```

Multiple subscription types are supported.

### Subscribe to a Column

Syntax:

```sql
CREATE TOPIC topic_name AS subquery
```

You can create a topic from a SELECT statement. Statements that specify columns, such as `SELECT *` and `SELECT ts, c1`, are supported, as are filtering conditions and scalar functions. Aggregate functions and time window aggregation are not supported. Note:

- The schema of topics created in this manner is determined by the subscribed data.
- You cannot modify (`ALTER <table> MODIFY`) or delete (`ALTER <table> DROP`) columns or tags that are used in a subscription or calculation.
- Columns added to a table after the subscription is created are not displayed in the results. Deleting columns will cause an error.

### Subscribe to a Supertable

Syntax:

```sql
CREATE TOPIC topic_name AS STABLE stb_name
```

Creating a topic in this manner differs from a `SELECT * from stbName` statement as follows:

- The table schema can be modified.
- Unstructured data is returned. The format of the data returned changes based on the supertable schema.
- A different table schema may exist for every data block to be processed.
- The data returned does not include tags.

### Subscribe to a Database

Syntax:

```sql
CREATE TOPIC topic_name [WITH META] AS DATABASE db_name;
```

This SQL statement creates a subscription to all tables in the database. You can add the `WITH META` parameter to include schema changes in the subscription, including creating and deleting supertables; adding, deleting, and modifying columns; and creating, deleting, and modifying the tags of subtables. Consumers can determine the message type from the API. Note that this differs from Kafka.
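
The three forms of `CREATE TOPIC` described above can be summarized in a small helper. This is only a sketch: `create_topic_sql` is a hypothetical function, it follows the syntax shown in this section (where `WITH META` appears only in the database form), and it performs no escaping, so topic, table, and database names are assumed to be trusted.

```python
from typing import Optional


def create_topic_sql(
    topic: str,
    *,
    subquery: Optional[str] = None,
    stable: Optional[str] = None,
    database: Optional[str] = None,
    with_meta: bool = False,
) -> str:
    """Build one of the three CREATE TOPIC forms; exactly one source must be given."""
    sources = [s for s in (subquery, stable, database) if s is not None]
    if len(sources) != 1:
        raise ValueError("specify exactly one of subquery, stable, or database")
    if with_meta and database is None:
        raise ValueError("WITH META is only used here for database subscriptions")
    if subquery is not None:
        return f"CREATE TOPIC {topic} AS {subquery};"
    if stable is not None:
        return f"CREATE TOPIC {topic} AS STABLE {stable};"
    meta = " WITH META" if with_meta else ""
    return f"CREATE TOPIC {topic}{meta} AS DATABASE {database};"


column_topic = create_topic_sql("t1", subquery="SELECT ts, c1 FROM tmqdb.stb WHERE c1 > 1")
stable_topic = create_topic_sql("t2", stable="tmqdb.stb")
database_topic = create_topic_sql("t3", database="tmqdb", with_meta=True)
```

The resulting strings can be passed to whichever connector you use to execute SQL.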

## Create a Consumer

You configure the following parameters when creating a consumer:

|         Parameter         |  Type   | Description | Remarks |
| :-----------------------: | :-----: | ----------- | ------- |
| `td.connect.ip` | string | Used in establishing a connection; same as `taos_connect` | Only valid for native connections |
| `td.connect.user` | string | Used in establishing a connection; same as `taos_connect` | Only valid for native connections |
| `td.connect.pass` | string | Used in establishing a connection; same as `taos_connect` | Only valid for native connections |
| `td.connect.port` | string | Used in establishing a connection; same as `taos_connect` | Only valid for native connections |
| `group.id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192. |
| `client.id` | string | Client ID | Maximum length: 192. |
| `auto.offset.reset` | enum | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none` (default) |
| `enable.auto.commit` | boolean | Commit automatically. `true`: the application does not need to commit explicitly; `false`: the application must handle commits itself | Default value: true |
| `auto.commit.interval.ms` | integer | Interval for automatic commits, in milliseconds | |
| `msg.with.table.name` | boolean | Specify whether to deserialize table names from messages | Default value: false |
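
The constraints in this table can be checked before a consumer is created. The following is a minimal sketch, not part of any TDengine connector: `check_consumer_config` is a hypothetical helper, and it assumes that the maximum length of 192 is counted in characters.

```python
from typing import Dict, List

MAX_ID_LENGTH = 192
OFFSET_RESET_VALUES = {"earliest", "latest", "none"}


def check_consumer_config(conf: Dict[str, str]) -> List[str]:
    """Return a list of problems found in `conf`; an empty list means none were found."""
    problems: List[str] = []
    group_id = conf.get("group.id")
    if not group_id:
        problems.append("group.id is required")
    elif len(group_id) > MAX_ID_LENGTH:
        problems.append("group.id exceeds 192 characters")
    if len(conf.get("client.id", "")) > MAX_ID_LENGTH:
        problems.append("client.id exceeds 192 characters")
    # `none` is the documented default when the parameter is omitted.
    reset = conf.get("auto.offset.reset", "none")
    if reset not in OFFSET_RESET_VALUES:
        problems.append(f"auto.offset.reset has unsupported value {reset!r}")
    return problems


ok = check_consumer_config({"group.id": "cgrpName", "auto.offset.reset": "earliest"})
missing_group = check_consumer_config({})
```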

The method of specifying these parameters depends on the language used:

<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">

```c
/* Create consumer groups on demand (group.id) and enable automatic commits (enable.auto.commit),
   an automatic commit interval (auto.commit.interval.ms), and a username (td.connect.user) and password (td.connect.pass) */
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "cgrpName");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);

tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
```

</TabItem>
<TabItem value="java" label="Java">

Java programs use the following parameters:

| Parameter                     | Type   | Description | Remarks |
| ----------------------------- | ------ | ----------- | ------- |
| `bootstrap.servers`           | string | Connection address, such as `localhost:6030` | |
| `value.deserializer`          | string | Value deserializer; to use this method, implement the `com.taosdata.jdbc.tmq.Deserializer` interface or inherit the `com.taosdata.jdbc.tmq.ReferenceDeserializer` type | |
| `value.deserializer.encoding` | string | Specify the encoding for string deserialization | |

Note: The `bootstrap.servers` parameter is used instead of `td.connect.ip` and `td.connect.port` to provide an interface that is consistent with Kafka.

```java
Properties properties = new Properties();
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "1000");
properties.setProperty("group.id", "cgrpName");
properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
properties.setProperty("td.connect.user", "root");
properties.setProperty("td.connect.pass", "taosdata");
properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty("msg.with.table.name", "true");
properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer");

TaosConsumer<Meters> consumer = new TaosConsumer<>(properties);

/* value deserializer definition. */
import com.taosdata.jdbc.tmq.ReferenceDeserializer;

public class MetersDeserializer extends ReferenceDeserializer<Meters> {
}
```

</TabItem>

<TabItem label="Go" value="Go">

```go
conf := &tmq.ConfigMap{
 "group.id":                     "test",
 "auto.offset.reset":            "earliest",
 "td.connect.ip":                "127.0.0.1",
 "td.connect.user":              "root",
 "td.connect.pass":              "taosdata",
 "td.connect.port":              "6030",
 "client.id":                    "test_tmq_c",
 "enable.auto.commit":           "false",
 "msg.with.table.name":          "true",
}
consumer, err := NewConsumer(conf)
```

</TabItem>

<TabItem label="Rust" value="Rust">

```rust
let mut dsn: Dsn = "taos://".parse()?;
dsn.set("group.id", "group1");
dsn.set("client.id", "test");
dsn.set("auto.offset.reset", "earliest");

let tmq = TmqBuilder::from_dsn(dsn)?;

let mut consumer = tmq.build()?;
```

</TabItem>

<TabItem value="Python" label="Python">

```python
from taos.tmq import Consumer

# Syntax: `consumer = Consumer(configs)`
#
# Example:
consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
```

Python programs use the following parameters:

| Parameter | Type | Description | Remarks |
|:---------:|:----:|:-----------:|:-------:|
| `td.connect.ip` | string | Used in establishing a connection | |
| `td.connect.user` | string | Used in establishing a connection | |
| `td.connect.pass` | string | Used in establishing a connection | |
| `td.connect.port` | string | Used in establishing a connection | |
| `group.id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192 |
| `client.id` | string | Client ID | Maximum length: 192 |
| `msg.with.table.name` | string | Specify whether to deserialize table names from messages | Specify `true` or `false` |
| `enable.auto.commit` | string | Commit automatically | Specify `true` or `false` |
| `auto.commit.interval.ms` | string | Interval for automatic commits, in milliseconds | |
| `auto.offset.reset` | string | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none` (default) |
| `enable.heartbeat.background` | string | Background heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | Specify `true` or `false` |

</TabItem>

<TabItem label="Node.JS" value="Node.JS">

```js
// Create consumer groups on demand (group.id) and enable automatic commits (enable.auto.commit),
// an automatic commit interval (auto.commit.interval.ms), and a username (td.connect.user) and password (td.connect.pass) 

let consumer = taos.consumer({
  'enable.auto.commit': 'true',
  'auto.commit.interval.ms': '1000',
  'group.id': 'tg2',
  'td.connect.user': 'root',
  'td.connect.pass': 'taosdata',
  'auto.offset.reset': 'earliest',
  'msg.with.table.name': 'true',
  'td.connect.ip': '127.0.0.1',
  'td.connect.port': '6030'
});
```

</TabItem>

<TabItem value="C#" label="C#">

```csharp
using TDengineTMQ;

// Create consumer groups on demand (GourpID) and enable automatic commits (EnableAutoCommit),
// an automatic commit interval (AutoCommitIntervalMs), and a username (TDConnectUser) and password (TDConnectPasswd)
var cfg = new ConsumerConfig
{
    EnableAutoCommit = "true",
    AutoCommitIntervalMs = "1000",
    GourpId = "TDengine-TMQ-C#",
    TDConnectUser = "root",
    TDConnectPasswd = "taosdata",
    AutoOffsetReset = "earliest",
    MsgWithTableName = "true",
    TDConnectIp = "127.0.0.1",
    TDConnectPort = "6030"
};

var consumer = new ConsumerBuilder(cfg).Build();

```

</TabItem>

</Tabs>

A consumer group is automatically created when multiple consumers are configured with the same consumer group ID.

## Subscribe to a Topic

A single consumer can subscribe to multiple topics.

<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">

```c
// Create a list of subscribed topics
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topicName");
// Enable subscription
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
  
```

</TabItem>
<TabItem value="java" label="Java">

```java
List<String> topics = new ArrayList<>();
topics.add("tmq_topic");
consumer.subscribe(topics);
```

</TabItem>
<TabItem value="Go" label="Go">

```go
err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil {
 panic(err)
}
```

</TabItem>
<TabItem value="Rust" label="Rust">

```rust
consumer.subscribe(["tmq_meters"]).await?;
```

</TabItem>

<TabItem value="Python" label="Python">

```python
consumer.subscribe(['topic1', 'topic2'])
```

</TabItem>

<TabItem label="Node.JS" value="Node.JS">

```js
// Create a list of subscribed topics
let topics = ['topic_test']

// Enable subscription
consumer.subscribe(topics);
```

</TabItem>

<TabItem value="C#" label="C#">

```csharp
// Create a list of subscribed topics
List<string> topics = new List<string>();
topics.Add("tmq_topic");
// Enable subscription
consumer.Subscribe(topics);
```

</TabItem>

</Tabs>

## Consume Messages

The following code demonstrates how to consume the messages in a queue.

<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">

```c
// Consume data
while (running) {
  TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut);
  if (msg) {
    msg_process(msg);
    // Release the result once it has been processed
    taos_free_result(msg);
  }
}
```

The `while` loop obtains a message each time it calls `tmq_consumer_poll()`. This message is exactly the same as the result returned by a query, and the same deserialization API can be used on it.

</TabItem>
<TabItem value="java" label="Java">

```java
while (running) {
  ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
  for (Meters meter : meters) {
    processMsg(meter);
  }
}
```

</TabItem>

<TabItem value="Go" label="Go">

```go
for {
 ev := consumer.Poll(0)
 if ev != nil {
  switch e := ev.(type) {
  case *tmqcommon.DataMessage:
   fmt.Println(e.Value())
  case tmqcommon.Error:
   fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
   panic(e)
  }
  consumer.Commit()
 }
}
```

</TabItem>

<TabItem value="Rust" label="Rust">

```rust
{
    let mut stream = consumer.stream();

    while let Some((offset, message)) = stream.try_next().await? {
        // get information from offset

        // the topic
        let topic = offset.topic();
        // the vgroup id, like partition id in kafka.
        let vgroup_id = offset.vgroup_id();
        println!("* in vgroup id {vgroup_id} of topic {topic}\n");

        if let Some(data) = message.into_data() {
            while let Some(block) = data.fetch_raw_block().await? {
                // one block for one table, get table name if needed
                let name = block.table_name();
                let records: Vec<Record> = block.deserialize().try_collect()?;
                println!(
                    "** table: {}, got {} records: {:#?}\n",
                    name.unwrap(),
                    records.len(),
                    records
                );
            }
        }
        consumer.commit(offset).await?;
    }
}
```

</TabItem>
<TabItem value="Python" label="Python">

```python
while True:
    res = consumer.poll(100)
    if not res:
        continue
    err = res.error()
    if err is not None:
        raise err
    val = res.value()

    for block in val:
        print(block.fetchall())
```

</TabItem>

<TabItem label="Node.JS" value="Node.JS">

```js
while (true) {
  let msg = consumer.consume(200);
  // process message(consumeResult)
  console.log(msg.topicPartition);
  console.log(msg.block);
  console.log(msg.fields);
}
```

</TabItem>

<TabItem value="C#" label="C#">

```csharp
// Consume data
while (true)
{
    var consumerRes = consumer.Consume(100);
    // process ConsumeResult
    ProcessMsg(consumerRes);
    consumer.Commit(consumerRes);
}
```

</TabItem>

</Tabs>

## Close the Consumer

After message consumption is finished, unsubscribe the consumer and then close it.
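
One way to make sure that both steps run even when message handling fails is to wrap the poll loop in `try`/`finally`. The sketch below assumes only the Python consumer methods listed earlier on this page (`subscribe`, `poll`, `commit`, `unsubscribe`, `close`). `run_consumer` and `FakeConsumer` are hypothetical names, and `FakeConsumer` is an in-memory stand-in so that the example runs without a server.

```python
from typing import Any, Callable, List


def run_consumer(consumer: Any, topics: List[str], handle: Callable[[Any], None], max_polls: int) -> int:
    """Subscribe, poll up to `max_polls` times, and always unsubscribe and close.

    Returns the number of messages handled.
    """
    handled = 0
    consumer.subscribe(topics)
    try:
        for _ in range(max_polls):
            message = consumer.poll(1.0)
            if not message:
                continue  # nothing arrived within the timeout
            handle(message)
            consumer.commit(message)  # acknowledge only after handling succeeds
            handled += 1
    finally:
        # Cleanup runs whether the loop finished normally or raised.
        consumer.unsubscribe()
        consumer.close()
    return handled


class FakeConsumer:
    """Hypothetical in-memory stand-in used to exercise run_consumer without a server."""

    def __init__(self, messages):
        self.pending = list(messages)
        self.calls = []

    def subscribe(self, topics):
        self.calls.append("subscribe")

    def poll(self, timeout=1.0):
        return self.pending.pop(0) if self.pending else None

    def commit(self, message):
        self.calls.append(f"commit:{message}")

    def unsubscribe(self):
        self.calls.append("unsubscribe")

    def close(self):
        self.calls.append("close")


received: List[str] = []
fake = FakeConsumer(["a", "b"])
count = run_consumer(fake, ["topic1"], received.append, max_polls=3)
```

With a real consumer object in place of `FakeConsumer`, the same wrapper would unsubscribe and close the consumer when the loop ends or an exception propagates.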

<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">

```c
/* Unsubscribe */
tmq_unsubscribe(tmq);

/* Close consumer object */
tmq_consumer_close(tmq);
```

</TabItem>
<TabItem value="java" label="Java">

```java
/* Unsubscribe */
consumer.unsubscribe();

/* Close consumer */
consumer.close();
```

</TabItem>

<TabItem value="Go" label="Go">

```go
/* Unsubscribe */
_ = consumer.Unsubscribe()

/* Close consumer */
_ = consumer.Close()
```

</TabItem>

<TabItem value="Rust" label="Rust">

```rust
consumer.unsubscribe().await;
```

</TabItem>

<TabItem value="Python" label="Python">

```py
# Unsubscribe
consumer.unsubscribe()
# Close consumer
consumer.close()
```

</TabItem>
<TabItem label="Node.JS" value="Node.JS">

```js
consumer.unsubscribe();
consumer.close();
```

</TabItem>

<TabItem value="C#" label="C#">

```csharp
// Unsubscribe
consumer.Unsubscribe();

// Close consumer
consumer.Close();
```

</TabItem>

</Tabs>

## Delete a Topic

You can delete topics that are no longer useful. Note that you must unsubscribe all consumers from a topic before deleting it.

```sql
/* Delete topic */
DROP TOPIC topic_name;
```

## Check Status

1. Query all existing topics.

```sql
SHOW TOPICS;
```

2. Query the status and subscribed topics of all consumers.

```sql
SHOW CONSUMERS;
```

3. Query the relationships between consumers and vgroups.

```sql
SHOW SUBSCRIPTIONS;
```

## Examples

The following section shows sample code in various languages.

<Tabs defaultValue="java" groupId="lang">

<TabItem label="C" value="c">
  <CDemo />
</TabItem>

<TabItem label="Java" value="java">
<Tabs defaultValue="native">
<TabItem value="native" label="native connection">
<Java />
</TabItem>
<TabItem value="ws" label="WebSocket connection">
<JavaWS />
</TabItem>
</Tabs>
</TabItem>

<TabItem label="Go" value="Go">
   <Go/>
</TabItem>

<TabItem label="Rust" value="Rust">
    <Rust />
</TabItem>

<TabItem label="Python" value="Python">
    <Python />
</TabItem>

<TabItem label="Node.JS" value="Node.JS">
   <Node/>
</TabItem>

<TabItem label="C#" value="C#">
   <CSharp/>
</TabItem>

</Tabs>