From 6a9dc072e5147dd9be16f98750c45e476c8c3289 Mon Sep 17 00:00:00 2001 From: huolibo Date: Mon, 15 Aug 2022 14:59:02 +0800 Subject: [PATCH] docs: modify tmq document --- docs/zh/07-develop/{07-tmq.md => 07-tmq.mdx} | 141 +++++++++++++++++-- 1 file changed, 127 insertions(+), 14 deletions(-) rename docs/zh/07-develop/{07-tmq.md => 07-tmq.mdx} (60%) diff --git a/docs/zh/07-develop/07-tmq.md b/docs/zh/07-develop/07-tmq.mdx similarity index 60% rename from docs/zh/07-develop/07-tmq.md rename to docs/zh/07-develop/07-tmq.mdx index 25d468cad3..5cf3554f46 100644 --- a/docs/zh/07-develop/07-tmq.md +++ b/docs/zh/07-develop/07-tmq.mdx @@ -29,6 +29,9 @@ import CDemo from "./_sub_c.mdx"; TMQ 的 API 中,与订阅相关的主要数据结构和API如下: + + + ```c typedef struct tmq_t tmq_t; typedef struct tmq_conf_t tmq_conf_t; @@ -64,6 +67,30 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm 这些 API 的文档请见 [C/C++ Connector](/reference/connector/cpp),下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面C语言的示例代码。 + + + +```java +void subscribe(Collection topics) throws SQLException; + +void unsubscribe() throws SQLException; + +Set subscription() throws SQLException; + +ConsumerRecords poll(Duration timeout) throws SQLException; + +void commitAsync(); + +void commitAsync(OffsetCommitCallback callback); + +void commitSync() throws SQLException; + +void close() throws SQLException; +``` + + + + ## 写入数据 首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如: @@ -108,19 +135,22 @@ TMQ支持多种订阅类型: 对于consumer, 目前支持的config包括: -| 参数名称 | 参数值 | 备注 | -| ---------------------------- | ------------------------------ | ------------------------------------------------------ | -| group.id | 最大长度:192 | | -| enable.auto.commit | 合法值:true, false | | -| auto.commit.interval.ms | | | -| auto.offset.reset | 合法值:earliest, latest, none | | -| td.connect.ip | 用于连接,同taos_connect的参数 | | -| td.connect.user | 用于连接,同taos_connect的参数 | | -| td.connect.pass | 用于连接,同taos_connect的参数 | | -| td.connect.port | 用于连接,同taos_connect的参数 | | -| enable.heartbeat.background | 合法值:true, false | 开启后台心跳,即consumer不会因为长时间不poll而认为离线 | -| experimental.snapshot.enable | 合法值:true, false | 从wal开始消费,还是从tsbs开始消费 | -| msg.with.table.name | 合法值:true, false | 从消息中能否解析表名 | + + + +| 参数名称 | 参数说明 | 备注 | +| ---------------------------- |----------------------------|----------------------------| +| group.id | | 最大长度:192 | +| enable.auto.commit | | 合法值:true, false | +| auto.commit.interval.ms | | | +| auto.offset.reset | | 合法值:earliest, latest, none | +| td.connect.ip | | 用于连接,同taos_connect的参数 | | +| td.connect.user | | 用于连接,同taos_connect的参数 | | +| td.connect.pass | | 用于连接,同taos_connect的参数 | | +| td.connect.port | | 用于连接,同taos_connect的参数 | | +| enable.heartbeat.background |开启后台心跳,即consumer不会因为长时间不poll而认为离线 | 合法值:true, false | | +| experimental.snapshot.enable |从wal开始消费,还是从tsbs开始消费 | 合法值:true, false | | +| msg.with.table.name | 从消息中能否解析表名 | 合法值:true, false | ```sql /* 根据需要,设置消费组(group.id)、自动提交(enable.auto.commit)、自动提交时间间隔(auto.commit.interval.ms)、用户名(td.connect.user)、密码(td.connect.pass)等参数 */ @@ -139,10 +169,54 @@ TMQ支持多种订阅类型: tmq_conf_destroy(conf); ``` + + + +| 参数名称 | 参数值 | 备注 | +| ---------------------------- |----------------------------|------------------------------------------------------------------------------------------------| +| group.id | 最大长度:192,必填 | consumer 所在组 id | +| client.id | 最大长度:192 | consumer id | +| enable.auto.commit | 合法值:true, false | 是否允许自动提交 | +| auto.commit.interval.ms | | 自动提交时间间隔 | +| auto.offset.reset | 合法值:earliest, latest, none | offset 消费位置 | +| bootstrap.servers | 用于创建连接 | 服务端 ip + port | +| td.connect.user | 用于创建连接,同 taos_connect 的参数 | 服务端用户名 | +| td.connect.pass | 用于创建连接,同 taos_connect 的参数 | 服务端密码 | +| enable.heartbeat.background | 合法值:true, false | 开启后台心跳,即 consumer 不会因为长时间不 poll 而认为离线 | +| experimental.snapshot.enable | 合法值:true, false | 从 wal 开始消费,还是从 tsbs 开始消费 | +| msg.with.table.name | 合法值:true, false | 从消息中能否解析表名 | +| value.deserializer | value 值解析方法 | 此方法应实现 com.taosdata.jdbc.tmq.Deserializer 接口或是继承 com.taosdata.jdbc.tmq.ReferenceDeserializer 类 | +| value.deserializer.encoding | value 中使用字符编码集 | | + +```java +import com.taosdata.jdbc.tmq.ReferenceDeserializer; + +public class MetersDeserializer extends ReferenceDeserializer { +} +``` + +```java + Properties properties = new Properties(); + properties.setProperty("enable.auto.commit", "true"); + properties.setProperty("auto.commit.interval.ms", "1000"); + properties.setProperty("group.id", "cgrpName"); + properties.setProperty("bootstrap.servers", "127.0.0.1:6030"); + properties.setProperty("td.connect.user", "root"); + properties.setProperty("td.connect.pass", "taosdata"); + properties.setProperty("auto.offset.reset", "earliest"); + properties.setProperty("msg.with.table.name", "true"); + properties.setProperty("value.deserializer.encoding", "com.taos.example.MetersDeserializer"); + + TaosConsumer consumer = new TaosConsumer<>(properties) +``` + + + + 上述配置中包括consumer group ID,如果多个 consumer 指定的 consumer group ID一样,则自动形成一个consumer group,共享消费进度。 -## 创建 topic 列表 +## 订阅 topic 列表 单个consumer支持同时订阅多个topic。 @@ -153,6 +227,9 @@ TMQ支持多种订阅类型: ## 启动订阅并开始消费 + + + ``` /* 启动订阅 */ tmq_subscribe(tmq, topicList); @@ -165,10 +242,32 @@ TMQ支持多种订阅类型: } ``` + + + +```java + List topics = new ArrayList<>(); + topics.add("tmq_topic"); + consumer.subscribe(topics); + + while(running){ + ConsumerRecords meters = consumer.poll(Duration.ofMillis(100)); + for (Meters meter : meters) { + processMsg(meter); + } + } +``` + + + + 这里是一个 **while** 循环,每调用一次tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析API完成消息内容的解析。 ## 结束消费 + + + ```sql /* 取消订阅 */ tmq_unsubscribe(tmq); @@ -177,6 +276,20 @@ TMQ支持多种订阅类型: tmq_consumer_close(tmq); ``` + + + +```java + /* 取消订阅 */ + consumer.unsubscribe(); + + /* 关闭消费 */ + consumer.close(); +``` + + + + ## 删除topic 如果不再需要,可以删除创建topic,但注意:只有没有被订阅的topic才能别删除。 -- GitLab