--- sidebar_label: 数据订阅 description: "数据订阅与推送服务。写入到 TDengine 中的时序数据能够被自动推送到订阅客户端。" title: 数据订阅 --- import Tabs from "@theme/Tabs"; import TabItem from "@theme/TabItem"; import Java from "./_sub_java.mdx"; import JavaWS from "./_sub_java_ws.mdx"; import Python from "./_sub_python.mdx"; import Go from "./_sub_go.mdx"; import Rust from "./_sub_rust.mdx"; import Node from "./_sub_node.mdx"; import CSharp from "./_sub_cs.mdx"; import CDemo from "./_sub_c.mdx"; 为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine 提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。 与 kafka 一样,你需要定义 *topic*, 但 TDengine 的 *topic* 是基于一个已经存在的超级表、子表或普通表的查询条件,即一个 `SELECT` 语句。你可以使用 SQL 对标签、表名、列、表达式等条件进行过滤,以及对数据进行标量函数与 UDF 计算(不包括数据聚合)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度。 消费者订阅 *topic* 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程、分布式地消费数据,提高消费速度。但不同消费者组中的消费者即使消费同一个 topic, 并不共享消费进度。一个消费者可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的 vnode 上,也就是多个 shard 上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的 ACK 机制,在宕机、重启等复杂环境下确保 at least once 消费。 为了实现上述功能,TDengine 会为 WAL (Write-Ahead-Log) 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制:用户可以按需指定 WAL 文件保留的时间以及大小(详见 create database 语句)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎(但由于 TSDB 具有远比 WAL 更高的压缩率,我们不推荐保留太长时间,一般来说,不超过几天)。 对于以 topic 形式创建的查询,TDengine 将对接 WAL 而不是 TSDB 作为其存储引擎。在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。 本文档不对消息队列本身的基础知识做介绍,如果需要了解,请自行搜索。 注意:数据订阅是从 WAL 消费数据,如果一些 WAL 文件被基于 WAL 保留策略删除,则已经删除的 WAL 文件中的数据就无法再消费到。需要根据业务需要在创建数据库时合理设置 `WAL_RETENTION_PERIOD` 或 `WAL_RETENTION_SIZE` ,并确保应用及时消费数据,这样才不会产生数据丢失的现象。数据订阅的行为与 Kafka 等广泛使用的消息队列类产品的行为相似。 ## 主要数据结构和 API 不同语言下, TMQ 订阅相关的 API 及数据结构如下: ```c typedef struct tmq_t tmq_t; typedef struct tmq_conf_t tmq_conf_t; typedef struct tmq_list_t tmq_list_t; typedef void(tmq_commit_cb(tmq_t *, int32_t code, void *param)); DLL_EXPORT tmq_list_t *tmq_list_new(); DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *); DLL_EXPORT void tmq_list_destroy(tmq_list_t *); DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen); DLL_EXPORT const char *tmq_err2str(int32_t code); DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list); DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq); DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout); DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq); DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param); enum tmq_conf_res_t { TMQ_CONF_UNKNOWN = -2, TMQ_CONF_INVALID = -1, TMQ_CONF_OK = 0, }; typedef enum tmq_conf_res_t tmq_conf_res_t; DLL_EXPORT tmq_conf_t *tmq_conf_new(); DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value); DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf); DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param); ``` 这些 API 的文档请见 [C/C++ Connector](../../connector/cpp),下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面 C 语言的示例代码。 ```java void subscribe(Collection topics) throws SQLException; void unsubscribe() throws SQLException; Set subscription() throws SQLException; ConsumerRecords poll(Duration timeout) throws SQLException; void commitAsync(); void commitAsync(OffsetCommitCallback callback); void commitSync() throws SQLException; void close() throws SQLException; ``` ```python class Consumer: def subscribe(self, topics): pass def unsubscribe(self): pass def poll(self, timeout: float = 1.0): pass def assignment(self): pass def seek(self, partition): pass def close(self): pass def commit(self, message): pass ``` ```go func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error) // 出于兼容目的保留 rebalanceCb 参数,当前未使用 func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error // 出于兼容目的保留 rebalanceCb 参数,当前未使用 func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error func (c *Consumer) Poll(timeoutMs int) tmq.Event // 出于兼容目的保留 tmq.TopicPartition 参数,当前未使用 func (c *Consumer) Commit() ([]tmq.TopicPartition, error) func (c *Consumer) Unsubscribe() error func (c *Consumer) Close() error ``` ```rust impl TBuilder for TmqBuilder fn from_dsn(dsn: D) -> Result fn build(&self) -> Result impl AsAsyncConsumer for Consumer async fn subscribe, I: IntoIterator + Send>( &mut self, topics: I, ) -> Result<(), Self::Error>; fn stream( &self, ) -> Pin< Box< dyn '_ + Send + futures::Stream< Item = Result<(Self::Offset, MessageSet), Self::Error>, >, >, >; async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>; async fn unsubscribe(self); ``` 可在 上查看详细 API 说明。 ```js function TMQConsumer(config) function subscribe(topic) function consume(timeout) function subscription() function unsubscribe() function commit(msg) function close() ``` ```csharp ConsumerBuilder(IEnumerable> config) virtual IConsumer Build() Consumer(ConsumerBuilder builder) void Subscribe(IEnumerable topics) void Subscribe(string topic) ConsumeResult Consume(int millisecondsTimeout) List Subscription() void Unsubscribe() void Commit(ConsumeResult consumerResult) void Close() ``` ## 写入数据 首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如: ```sql DROP DATABASE IF EXISTS tmqdb; CREATE DATABASE tmqdb WAL_RETENTION_PERIOD 3600; CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16)) TAGS(t1 INT, t3 VARCHAR(16)); CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0"); CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1"); INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00'); INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); ``` ## 创建 *topic* TDengine 使用 SQL 创建一个 topic: ```sql CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1; ``` TMQ 支持多种订阅类型: ### 列订阅 语法: ```sql CREATE TOPIC topic_name as subquery ``` 通过 `SELECT` 语句订阅(包括 `SELECT *`,或 `SELECT ts, c1` 等指定列订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合)。需要注意的是: - 该类型 TOPIC 一旦创建则订阅数据的结构确定。 - 被订阅或用于计算的列或标签不可被删除(`ALTER table DROP`)、修改(`ALTER table MODIFY`)。 - 若发生表结构变更,新增的列不出现在结果中。 ### 超级表订阅 语法: ```sql CREATE TOPIC topic_name AS STABLE stb_name ``` 与 `SELECT * from stbName` 订阅的区别是: - 不会限制用户的表结构变更。 - 返回的是非结构化的数据:返回数据的结构会随之超级表的表结构变化而变化。 - 用户对于要处理的每一个数据块都可能有不同的表结构。 - 返回数据不包含标签。 ### 数据库订阅 语法: ```sql CREATE TOPIC topic_name AS DATABASE db_name; ``` 通过该语句可创建一个包含数据库所有表数据的订阅 ## 创建消费者 *consumer* 消费者需要通过一系列配置选项创建,基础配置项如下表所示: | 参数名称 | 类型 | 参数说明 | 备注 | | :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- | | `td.connect.ip` | string | 服务端的 IP 地址 | | | `td.connect.user` | string | 用户名 | | | `td.connect.pass` | string | 密码 | | | `td.connect.port` | integer | 服务端的端口号 | | | `group.id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 | | `client.id` | string | 客户端 ID | 最大长度:192。 | | `auto.offset.reset` | enum | 消费组订阅的初始位置 |
`earliest`: default;从头开始订阅;
`latest`: 仅从最新数据开始订阅;
`none`: 没有提交的 offset 无法订阅 | | `enable.auto.commit` | boolean | 是否启用消费位点自动提交,true: 自动提交,客户端应用无需commit;false:客户端应用需要自行commit | 默认值为 true | | `auto.commit.interval.ms` | integer | 消费记录自动提交消费位点时间间隔,单位为毫秒 | 默认值为 5000 | | `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句) |默认关闭 | 对于不同编程语言,其设置方式如下: ```c /* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */ tmq_conf_t* conf = tmq_conf_new(); tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); tmq_conf_set(conf, "group.id", "cgrpName"); tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "auto.offset.reset", "earliest"); tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_conf_destroy(conf); ``` 对于 Java 程序,还可以使用如下配置项: | 参数名称 | 类型 | 参数说明 | | ----------------------------- | ------ | ----------------------------------------------------------------------------------------------------------------------------- | | `td.connect.type` | string | 连接类型,"jni" 指原生连接,"ws" 指 websocket 连接,默认值为 "jni" | | `bootstrap.servers` | string | 连接地址,如 `localhost:6030` | | `value.deserializer` | string | 值解析方法,使用此方法应实现 `com.taosdata.jdbc.tmq.Deserializer` 接口或继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer` 类 | | `value.deserializer.encoding` | string | 指定字符串解析的字符集 | | 需要注意:此处使用 `bootstrap.servers` 替代 `td.connect.ip` 和 `td.connect.port`,以提供与 Kafka 一致的接口。 ```java Properties properties = new Properties(); properties.setProperty("enable.auto.commit", "true"); properties.setProperty("auto.commit.interval.ms", "1000"); properties.setProperty("group.id", "cgrpName"); properties.setProperty("bootstrap.servers", "127.0.0.1:6030"); properties.setProperty("td.connect.user", "root"); properties.setProperty("td.connect.pass", "taosdata"); properties.setProperty("auto.offset.reset", "earliest"); properties.setProperty("msg.with.table.name", "true"); properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer"); TaosConsumer consumer = new TaosConsumer<>(properties); /* value deserializer definition. */ import com.taosdata.jdbc.tmq.ReferenceDeserializer; public class MetersDeserializer extends ReferenceDeserializer { } ``` ```go conf := &tmq.ConfigMap{ "group.id": "test", "auto.offset.reset": "earliest", "td.connect.ip": "127.0.0.1", "td.connect.user": "root", "td.connect.pass": "taosdata", "td.connect.port": "6030", "client.id": "test_tmq_c", "enable.auto.commit": "false", "msg.with.table.name": "true", } consumer, err := NewConsumer(conf) ``` ```rust let mut dsn: Dsn = "taos://".parse()?; dsn.set("group.id", "group1"); dsn.set("client.id", "test"); dsn.set("auto.offset.reset", "earliest"); let tmq = TmqBuilder::from_dsn(dsn)?; let mut consumer = tmq.build()?; ``` Python 语言下引入 `taos` 库的 `Consumer` 类,创建一个 Consumer 示例: ```python from taos.tmq import Consumer # Syntax: `consumer = Consumer(configs)` # # Example: consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"}) ``` ```js // 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、 // 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 let consumer = taos.consumer({ 'enable.auto.commit': 'true', 'auto.commit.interval.ms','1000', 'group.id': 'tg2', 'td.connect.user': 'root', 'td.connect.pass': 'taosdata', 'auto.offset.reset','earliest', 'msg.with.table.name': 'true', 'td.connect.ip','127.0.0.1', 'td.connect.port','6030' }); ``` ```csharp using TDengineTMQ; // 根据需要,设置消费组 (GourpId)、自动提交 (EnableAutoCommit)、 // 自动提交时间间隔 (AutoCommitIntervalMs)、用户名 (TDConnectUser)、密码 (TDConnectPasswd) 等参数 var cfg = new ConsumerConfig { EnableAutoCommit = "true" AutoCommitIntervalMs = "1000" GourpId = "TDengine-TMQ-C#", TDConnectUser = "root", TDConnectPasswd = "taosdata", AutoOffsetReset = "earliest" MsgWithTableName = "true", TDConnectIp = "127.0.0.1", TDConnectPort = "6030" }; var consumer = new ConsumerBuilder(cfg).Build(); ``` 上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。 ## 订阅 *topics* 一个 consumer 支持同时订阅多个 topic。 ```c // 创建订阅 topics 列表 tmq_list_t* topicList = tmq_list_new(); tmq_list_append(topicList, "topicName"); // 启动订阅 tmq_subscribe(tmq, topicList); tmq_list_destroy(topicList); ``` ```java List topics = new ArrayList<>(); topics.add("tmq_topic"); consumer.subscribe(topics); ``` ```go err = consumer.Subscribe("example_tmq_topic", nil) if err != nil { panic(err) } ``` ```rust consumer.subscribe(["tmq_meters"]).await?; ``` ```python consumer.subscribe(['topic1', 'topic2']) ``` ```js // 创建订阅 topics 列表 let topics = ['topic_test'] // 启动订阅 consumer.subscribe(topics); ``` ```csharp // 创建订阅 topics 列表 List topics = new List(); topics.add("tmq_topic"); // 启动订阅 consumer.Subscribe(topics); ``` ## 消费 以下代码展示了不同语言下如何对 TMQ 消息进行消费。 ```c // 消费数据 while (running) { TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut); msg_process(msg); } ``` 这里是一个 **while** 循环,每调用一次 tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析 API 完成消息内容的解析。 ```java while(running){ ConsumerRecords meters = consumer.poll(Duration.ofMillis(100)); for (Meters meter : meters) { processMsg(meter); } } ``` ```go for { ev := consumer.Poll(0) if ev != nil { switch e := ev.(type) { case *tmqcommon.DataMessage: fmt.Println(e.Value()) case tmqcommon.Error: fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e) panic(e) } consumer.Commit() } } ``` ```rust { let mut stream = consumer.stream(); while let Some((offset, message)) = stream.try_next().await? { // get information from offset // the topic let topic = offset.topic(); // the vgroup id, like partition id in kafka. let vgroup_id = offset.vgroup_id(); println!("* in vgroup id {vgroup_id} of topic {topic}\n"); if let Some(data) = message.into_data() { while let Some(block) = data.fetch_raw_block().await? { // one block for one table, get table name if needed let name = block.table_name(); let records: Vec = block.deserialize().try_collect()?; println!( "** table: {}, got {} records: {:#?}\n", name.unwrap(), records.len(), records ); } } consumer.commit(offset).await?; } } ``` ```python while True: res = consumer.poll(100) if not res: continue err = res.error() if err is not None: raise err val = res.value() for block in val: print(block.fetchall()) ``` ```js while(true){ msg = consumer.consume(200); // process message(consumeResult) console.log(msg.topicPartition); console.log(msg.block); console.log(msg.fields) } ``` ```csharp // 消费数据 while (true) { var consumerRes = consumer.Consume(100); // process ConsumeResult ProcessMsg(consumerRes); consumer.Commit(consumerRes); } ``` ## 结束消费 消费结束后,应当取消订阅。 ```c /* 取消订阅 */ tmq_unsubscribe(tmq); /* 关闭消费者对象 */ tmq_consumer_close(tmq); ``` ```java /* 取消订阅 */ consumer.unsubscribe(); /* 关闭消费 */ consumer.close(); ``` ```go /* Unsubscribe */ _ = consumer.Unsubscribe() /* Close consumer */ _ = consumer.Close() ``` ```rust consumer.unsubscribe().await; ``` ```py # 取消订阅 consumer.unsubscribe() # 关闭消费 consumer.close() ``` ```js consumer.unsubscribe(); consumer.close(); ``` ```csharp // 取消订阅 consumer.Unsubscribe(); // 关闭消费 consumer.Close(); ``` ## 删除 *topic* 如果不再需要订阅数据,可以删除 topic,需要注意:只有当前未在订阅中的 TOPIC 才能被删除。 ```sql /* 删除 topic */ DROP TOPIC topic_name; ``` ## 状态查看 1、*topics*:查询已经创建的 topic ```sql SHOW TOPICS; ``` 2、consumers:查询 consumer 的状态及其订阅的 topic ```sql SHOW CONSUMERS; ``` 3、subscriptions:查询 consumer 与 vgroup 之间的分配关系 ```sql SHOW SUBSCRIPTIONS; ``` ## 示例代码 以下是各语言的完整示例代码。