--- sidebar_label: 数据订阅 description: "数据订阅与推送服务。写入到 TDengine 中的时序数据能够被自动推送到订阅客户端。" title: 数据订阅 --- import Tabs from "@theme/Tabs"; import TabItem from "@theme/TabItem"; import Java from "./_sub_java.mdx"; import Python from "./_sub_python.mdx"; import Go from "./_sub_go.mdx"; import Rust from "./_sub_rust.mdx"; import Node from "./_sub_node.mdx"; import CSharp from "./_sub_cs.mdx"; import CDemo from "./_sub_c.mdx"; 为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine 提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。 与 kafka 一样,你需要定义 *topic*, 但 TDengine 的 *topic* 是基于一个已经存在的超级表、子表或普通表的查询条件,即一个 `SELECT` 语句。你可以使用 SQL 对标签、表名、列、表达式等条件进行过滤,以及对数据进行标量函数与 UDF 计算(不包括数据聚合)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度。 消费者订阅 *topic* 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程、分布式地消费数据,提高消费速度。但不同消费者组中的消费者即使消费同一个 topic, 并不共享消费进度。一个消费者可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的 vnode 上,也就是多个 shard 上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的 ACK 机制,在宕机、重启等复杂环境下确保 at least once 消费。 为了实现上述功能,TDengine 会为 WAL (Write-Ahead-Log) 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制:用户可以按需指定 WAL 文件保留的时间以及大小(详见 create database 语句)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎(但由于 TSDB 具有远比 WAL 更高的压缩率,我们不推荐保留太长时间,一般来说,不超过几天)。 对于以 topic 形式创建的查询,TDengine 将对接 WAL 而不是 TSDB 作为其存储引擎。在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。 本文档不对消息队列本身的基础知识做介绍,如果需要了解,请自行搜索。 ## 主要数据结构和 API 不同语言下, TMQ 订阅相关的 API 及数据结构如下: ```c typedef struct tmq_t tmq_t; typedef struct tmq_conf_t tmq_conf_t; typedef struct tmq_list_t tmq_list_t; typedef void(tmq_commit_cb(tmq_t *, int32_t code, void *param)); DLL_EXPORT tmq_list_t *tmq_list_new(); DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *); DLL_EXPORT void tmq_list_destroy(tmq_list_t *); DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen); DLL_EXPORT const char *tmq_err2str(int32_t code); DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list); DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq); DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout); DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq); DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param); enum tmq_conf_res_t { TMQ_CONF_UNKNOWN = -2, TMQ_CONF_INVALID = -1, TMQ_CONF_OK = 0, }; typedef enum tmq_conf_res_t tmq_conf_res_t; DLL_EXPORT tmq_conf_t *tmq_conf_new(); DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value); DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf); DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param); ``` 这些 API 的文档请见 [C/C++ Connector](/reference/connector/cpp),下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面 C 语言的示例代码。 ```java void subscribe(Collection topics) throws SQLException; void unsubscribe() throws SQLException; Set subscription() throws SQLException; ConsumerRecords poll(Duration timeout) throws SQLException; void commitAsync(); void commitAsync(OffsetCommitCallback callback); void commitSync() throws SQLException; void close() throws SQLException; ``` ```python class TaosConsumer(): DEFAULT_CONFIG = { 'group.id', 'client.id', 'enable.auto.commit', 'auto.commit.interval.ms', 'auto.offset.reset', 'msg.with.table.name', 'experimental.snapshot.enable', 'enable.heartbeat.background', 'experimental.snapshot.batch.size', 'td.connect.ip', 'td.connect.user', 'td.connect.pass', 'td.connect.port', 'td.connect.db', 'timeout' } def __init__(self, *topics, **configs): self._closed = True self._conf = None self._list = None self._tmq = None keys = list(configs.keys()) for k in keys: configs.update({k.replace('_','.'): configs.pop(k)}) extra_configs = set(configs).difference(self.DEFAULT_CONFIG) if extra_configs: raise TmqError("Unrecognized configs: %s" % (extra_configs,)) self._conf = tmq_conf_new() self._list = tmq_list_new() # set poll timeout if 'timeout' in configs: self._timeout = configs['timeout'] del configs['timeout'] else: self._timeout = 0 # check if group id is set if 'group.id' not in configs: raise TmqError("missing group.id in consumer config setting") for key, value in configs.items(): tmq_conf_set(self._conf, key, value) self._tmq = tmq_consumer_new(self._conf) if not topics: raise TmqError("Unset topic for Consumer") for topic in topics: tmq_list_append(self._list, topic) tmq_subscribe(self._tmq, self._list) def __iter__(self): return self def __next__(self): if not self._tmq: raise StopIteration('TaosConsumer closed') return next(self.sync_next()) def sync_next(self): while 1: res = tmq_consumer_poll(self._tmq, self._timeout) if res: break yield TaosResult(res) def subscription(self): if self._tmq is None: return None return tmq_subscription(self._tmq) def unsubscribe(self): tmq_unsubscribe(self._tmq) def close(self): if self._tmq: tmq_consumer_close(self._tmq) self._tmq = None def __del__(self): if self._conf: tmq_conf_destroy(self._conf) if self._list: tmq_list_destroy(self._list) if self._tmq: tmq_consumer_close(self._tmq) ``` ## 写入数据 首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如: ```sql DROP DATABASE IF EXISTS tmqdb; CREATE DATABASE tmqdb; CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16) TAGS(t1 INT, t3 VARCHAR(16)); CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0"); CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1"); INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00'); INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); ``` ## 创建 *topic* TDengine 使用 SQL 创建一个 topic: ```sql CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1; ``` TMQ 支持多种订阅类型: ### 列订阅 语法: ```sql CREATE TOPIC topic_name as subquery ``` 通过 `SELECT` 语句订阅(包括 `SELECT *`,或 `SELECT ts, c1` 等指定列订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合)。需要注意的是: - 该类型 TOPIC 一旦创建则订阅数据的结构确定。 - 被订阅或用于计算的列或标签不可被删除(`ALTER table DROP`)、修改(`ALTER table MODIFY`)。 - 若发生表结构变更,新增的列不出现在结果中,若发生列删除则会报错。 ### 超级表订阅 语法: ```sql CREATE TOPIC topic_name AS STABLE stb_name ``` 与 `SELECT * from stbName` 订阅的区别是: - 不会限制用户的表结构变更。 - 返回的是非结构化的数据:返回数据的结构会随之超级表的表结构变化而变化。 - 用户对于要处理的每一个数据块都可能有不同的表结构。 - 返回数据不包含标签。 ### 数据库订阅 语法: ```sql CREATE TOPIC topic_name [WITH META] AS DATABASE db_name; ``` 通过该语句可创建一个包含数据库所有表数据的订阅,`WITH META` 可选择将数据库结构变动信息加入到订阅消息流,TMQ 将消费当前数据库下所有表结构的变动,包括超级表的创建与删除,列添加、删除或修改,子表的创建、删除及 TAG 变动信息等等。消费者可通过 API 来判断具体的消息类型。这一点也是与 Kafka 不同的地方。 ## 创建消费者 *consumer* 消费者需要通过一系列配置选项创建,基础配置项如下表所示: | 参数名称 | 类型 | 参数说明 | 备注 | | :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- | | `td.connect.ip` | string | 用于创建连接,同 `taos_connect` | | | `td.connect.user` | string | 用于创建连接,同 `taos_connect` | | | `td.connect.pass` | string | 用于创建连接,同 `taos_connect` | | `td.connect.port` | integer | 用于创建连接,同 `taos_connect` | | `group.id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 | | `client.id` | string | 客户端 ID | 最大长度:192。 | | `auto.offset.reset` | enum | 消费组订阅的初始位置 | 可选:`earliest`, `latest`, `none`(default) | | `enable.auto.commit` | boolean | 启用自动提交 | 合法值:`true`, `false`。 | | `auto.commit.interval.ms` | integer | 以毫秒为单位的自动提交时间间隔 | | `enable.heartbeat.background` | boolean | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | | | `experimental.snapshot.enable` | boolean | 从 WAL 开始消费,还是从 TSBS 开始消费 | | | `msg.with.table.name` | boolean | 是否允许从消息中解析表名 | 对于不同编程语言,其设置方式如下: ```c /* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */ tmq_conf_t* conf = tmq_conf_new(); tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); tmq_conf_set(conf, "group.id", "cgrpName"); tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "auto.offset.reset", "earliest"); tmq_conf_set(conf, "experimental.snapshot.enable", "true"); tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_conf_destroy(conf); ``` 对于 Java 程序,使用如下配置项: | 参数名称 | 类型 | 参数说明 | | ----------------------------- | ------ | ----------------------------------------------------------------------------------------------------------------------------- | | `bootstrap.servers` | string | 连接地址,如 `localhost:6030` | | `value.deserializer` | string | 值解析方法,使用此方法应实现 `com.taosdata.jdbc.tmq.Deserializer` 接口或继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer` 类 | | `value.deserializer.encoding` | string | 指定字符串解析的字符集 | | 需要注意:此处使用 `bootstrap.servers` 替代 `td.connect.ip` 和 `td.connect.port`,以提供与 Kafka 一致的接口。 ```java Properties properties = new Properties(); properties.setProperty("enable.auto.commit", "true"); properties.setProperty("auto.commit.interval.ms", "1000"); properties.setProperty("group.id", "cgrpName"); properties.setProperty("bootstrap.servers", "127.0.0.1:6030"); properties.setProperty("td.connect.user", "root"); properties.setProperty("td.connect.pass", "taosdata"); properties.setProperty("auto.offset.reset", "earliest"); properties.setProperty("msg.with.table.name", "true"); properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer"); TaosConsumer consumer = new TaosConsumer<>(properties); /* value deserializer definition. */ import com.taosdata.jdbc.tmq.ReferenceDeserializer; public class MetersDeserializer extends ReferenceDeserializer { } ``` | 参数名称 | 类型 | 参数说明 | 备注 | | :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- | | `td_connect_ip` | string | 用于创建连接,同 `taos_connect` | | | `td_connect_user` | string | 用于创建连接,同 `taos_connect` | | | `td_connect_pass` | string | 用于创建连接,同 `taos_connect` | | | `td_connect_port` | string | 用于创建连接,同 `taos_connect` | | | `group_id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 | | `client_id` | string | 客户端 ID | 最大长度:192。 | | `auto_offset_reset` | string | 消费组订阅的初始位置 | 可选:`earliest`, `latest`, `none`(default) | | `enable_auto_commit` | string | 启用自动提交 | 合法值:`true`, `false`。 | | `auto_commit_interval_ms` | string | 以毫秒为单位的自动提交时间间隔 | | | `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` | | `experimental_snapshot_enable` | string | 从 WAL 开始消费,还是从 TSBS 开始消费 | 合法值:`true`, `false` | | `msg_with_table_name` | string | 是否允许从消息中解析表名 | 合法值:`true`, `false` | 上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。 ## 订阅 *topics* 一个 consumer 支持同时订阅多个 topic。 ```c // 创建订阅 topics 列表 tmq_list_t* topicList = tmq_list_new(); tmq_list_append(topicList, "topicName"); // 启动订阅 tmq_subscribe(tmq, topicList); tmq_list_destroy(topicList); ``` ```java List topics = new ArrayList<>(); topics.add("tmq_topic"); consumer.subscribe(topics); ``` ```python consumer = TaosConsumer('topic_ctb_column', group_id='vg2') ``` ## 消费 以下代码展示了不同语言下如何对 TMQ 消息进行消费。 ```c // 消费数据 while (running) { TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut); msg_process(msg); } ``` 这里是一个 **while** 循环,每调用一次 tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析 API 完成消息内容的解析。 ```java while(running){ ConsumerRecords meters = consumer.poll(Duration.ofMillis(100)); for (Meters meter : meters) { processMsg(meter); } } ``` ```python for msg in consumer: for row in msg: print(row) ``` ## 结束消费 消费结束后,应当取消订阅。 ```c /* 取消订阅 */ tmq_unsubscribe(tmq); /* 关闭消费者对象 */ tmq_consumer_close(tmq); ``` ```java /* 取消订阅 */ consumer.unsubscribe(); /* 关闭消费 */ consumer.close(); ``` ```python /* 取消订阅 */ consumer.unsubscribe(); /* 关闭消费 */ consumer.close(); ``` ## 删除 *topic* 如果不再需要订阅数据,可以删除 topic,需要注意:只有当前未在订阅中的 TOPIC 才能被删除。 ```sql /* 删除 topic */ DROP TOPIC topic_name; ``` ## 状态查看 1、*topics*:查询已经创建的 topic ```sql SHOW TOPICS; ``` 2、consumers:查询 consumer 的状态及其订阅的 topic ```sql SHOW CONSUMERS; ``` 3、subscriptions:查询 consumer 与 vgroup 之间的分配关系 ```sql SHOW SUBSCRIPTIONS; ``` ## 示例代码 以下是各语言的完整示例代码。 ```c /* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include #include #include #include #include #include "taos.h" static int running = 1; static char dbName[64] = "tmqdb"; static char stbName[64] = "stb"; static char topicName[64] = "topicname"; static int32_t msg_process(TAOS_RES* msg) { char buf[1024]; int32_t rows = 0; const char* topicName = tmq_get_topic_name(msg); const char* dbName = tmq_get_db_name(msg); int32_t vgroupId = tmq_get_vgroup_id(msg); printf("topic: %s\n", topicName); printf("db: %s\n", dbName); printf("vgroup id: %d\n", vgroupId); while (1) { TAOS_ROW row = taos_fetch_row(msg); if (row == NULL) break; TAOS_FIELD* fields = taos_fetch_fields(msg); int32_t numOfFields = taos_field_count(msg); int32_t* length = taos_fetch_lengths(msg); int32_t precision = taos_result_precision(msg); const char* tbName = tmq_get_table_name(msg); rows++; taos_print_row(buf, row, fields, numOfFields); printf("row content from %s: %s\n", (tbName != NULL ? tbName : "table null"), buf); } return rows; } static int32_t init_env() { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); if (pConn == NULL) { return -1; } TAOS_RES* pRes; // drop database if exists printf("create database\n"); pRes = taos_query(pConn, "drop database if exists tmqdb"); if (taos_errno(pRes) != 0) { printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); // create database pRes = taos_query(pConn, "create database tmqdb"); if (taos_errno(pRes) != 0) { printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); // create super table printf("create super table\n"); pRes = taos_query( pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))"); if (taos_errno(pRes) != 0) { printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); // create sub tables printf("create sub tables\n"); pRes = taos_query(pConn, "create table tmqdb.ctb0 using tmqdb.stb tags(0, 'subtable0')"); if (taos_errno(pRes) != 0) { printf("failed to create super table ctb0, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); pRes = taos_query(pConn, "create table tmqdb.ctb1 using tmqdb.stb tags(1, 'subtable1')"); if (taos_errno(pRes) != 0) { printf("failed to create super table ctb1, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); pRes = taos_query(pConn, "create table tmqdb.ctb2 using tmqdb.stb tags(2, 'subtable2')"); if (taos_errno(pRes) != 0) { printf("failed to create super table ctb2, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); pRes = taos_query(pConn, "create table tmqdb.ctb3 using tmqdb.stb tags(3, 'subtable3')"); if (taos_errno(pRes) != 0) { printf("failed to create super table ctb3, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); // insert data printf("insert data into sub tables\n"); pRes = taos_query(pConn, "insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00')"); if (taos_errno(pRes) != 0) { printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); pRes = taos_query(pConn, "insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11')"); if (taos_errno(pRes) != 0) { printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); pRes = taos_query(pConn, "insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22')"); if (taos_errno(pRes) != 0) { printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); pRes = taos_query(pConn, "insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33')"); if (taos_errno(pRes) != 0) { printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); taos_close(pConn); return 0; } int32_t create_topic() { printf("create topic\n"); TAOS_RES* pRes; TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); if (pConn == NULL) { return -1; } pRes = taos_query(pConn, "use tmqdb"); if (taos_errno(pRes) != 0) { printf("error in use tmqdb, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1"); if (taos_errno(pRes) != 0) { printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); taos_close(pConn); return 0; } void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { printf("tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param); } tmq_t* build_consumer() { tmq_conf_res_t code; tmq_conf_t* conf = tmq_conf_new(); code = tmq_conf_set(conf, "enable.auto.commit", "true"); if (TMQ_CONF_OK != code) return NULL; code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); if (TMQ_CONF_OK != code) return NULL; code = tmq_conf_set(conf, "group.id", "cgrpName"); if (TMQ_CONF_OK != code) return NULL; code = tmq_conf_set(conf, "client.id", "user defined name"); if (TMQ_CONF_OK != code) return NULL; code = tmq_conf_set(conf, "td.connect.user", "root"); if (TMQ_CONF_OK != code) return NULL; code = tmq_conf_set(conf, "td.connect.pass", "taosdata"); if (TMQ_CONF_OK != code) return NULL; code = tmq_conf_set(conf, "auto.offset.reset", "earliest"); if (TMQ_CONF_OK != code) return NULL; code = tmq_conf_set(conf, "experimental.snapshot.enable", "true"); if (TMQ_CONF_OK != code) return NULL; code = tmq_conf_set(conf, "msg.with.table.name", "true"); if (TMQ_CONF_OK != code) return NULL; tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_conf_destroy(conf); return tmq; } tmq_list_t* build_topic_list() { tmq_list_t* topicList = tmq_list_new(); int32_t code = tmq_list_append(topicList, "topicname"); if (code) { return NULL; } return topicList; } void basic_consume_loop(tmq_t* tmq, tmq_list_t* topicList) { int32_t code; if ((code = tmq_subscribe(tmq, topicList))) { fprintf(stderr, "%% Failed to tmq_subscribe(): %s\n", tmq_err2str(code)); return; } int32_t totalRows = 0; int32_t msgCnt = 0; int32_t timeout = 5000; while (running) { TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout); if (tmqmsg) { msgCnt++; totalRows += msg_process(tmqmsg); taos_free_result(tmqmsg); /*} else {*/ /*break;*/ } } fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); } int main(int argc, char* argv[]) { int32_t code; if (init_env() < 0) { return -1; } if (create_topic() < 0) { return -1; } tmq_t* tmq = build_consumer(); if (NULL == tmq) { fprintf(stderr, "%% build_consumer() fail!\n"); return -1; } tmq_list_t* topic_list = build_topic_list(); if (NULL == topic_list) { return -1; } basic_consume_loop(tmq, topic_list); code = tmq_unsubscribe(tmq); if (code) { fprintf(stderr, "%% Failed to unsubscribe: %s\n", tmq_err2str(code)); } else { fprintf(stderr, "%% unsubscribe\n"); } code = tmq_consumer_close(tmq); if (code) { fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code)); } else { fprintf(stderr, "%% Consumer closed\n"); } return 0; } ``` [查看源码](https://github.com/taosdata/TDengine/blob/develop/examples/c/tmq.c) ```python import taos from taos.tmq import * conn = taos.connect() # create database conn.execute("drop database if exists py_tmq") conn.execute("create database if not exists py_tmq vgroups 2") # create table and stables conn.select_db("py_tmq") conn.execute("create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)") conn.execute("create table if not exists tb1 using stb1 tags(1)") conn.execute("create table if not exists tb2 using stb1 tags(2)") conn.execute("create table if not exists tb3 using stb1 tags(3)") # create topic conn.execute("drop topic if exists topic_ctb_column") conn.execute("create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1") # set consumer configure options conf = TaosTmqConf() conf.set("group.id", "tg2") conf.set("td.connect.user", "root") conf.set("td.connect.pass", "taosdata") conf.set("enable.auto.commit", "true") conf.set("msg.with.table.name", "true") def tmq_commit_cb_print(tmq, resp, offset, param=None): print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}") conf.set_auto_commit_cb(tmq_commit_cb_print, None) # build consumer tmq = conf.new_consumer() # build topic list topic_list = TaosTmqList() topic_list.append("topic_ctb_column") # subscribe consumer tmq.subscribe(topic_list) # check subscriptions sub_list = tmq.subscription() print("subscribed topics: ",sub_list) # start subscribe while 1: res = tmq.poll(1000) if res: topic = res.get_topic_name() vg = res.get_vgroup_id() db = res.get_db_name() print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}") for row in res: print(row) tb = res.get_table_name() print(f"from table: {tb}") ``` [查看源码](https://github.com/taosdata/TDengine/blob/develop/docs/examples/python/tmq_example.py)