diff --git a/docs/zh/07-develop/07-tmq.md b/docs/zh/07-develop/07-tmq.mdx
similarity index 60%
rename from docs/zh/07-develop/07-tmq.md
rename to docs/zh/07-develop/07-tmq.mdx
index 25d468cad3658190f6b9409637543061ac22f958..5cf3554f462a1457082eb9131db9991677ac76af 100644
--- a/docs/zh/07-develop/07-tmq.md
+++ b/docs/zh/07-develop/07-tmq.mdx
@@ -29,6 +29,9 @@ import CDemo from "./_sub_c.mdx";
TMQ 的 API 中,与订阅相关的主要数据结构和API如下:
+
+
+
```c
typedef struct tmq_t tmq_t;
typedef struct tmq_conf_t tmq_conf_t;
@@ -64,6 +67,30 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm
这些 API 的文档请见 [C/C++ Connector](/reference/connector/cpp),下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面C语言的示例代码。
+
+
+
+```java
+void subscribe(Collection topics) throws SQLException;
+
+void unsubscribe() throws SQLException;
+
+Set subscription() throws SQLException;
+
+ConsumerRecords poll(Duration timeout) throws SQLException;
+
+void commitAsync();
+
+void commitAsync(OffsetCommitCallback callback);
+
+void commitSync() throws SQLException;
+
+void close() throws SQLException;
+```
+
+
+
+
## 写入数据
首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如:
@@ -108,19 +135,22 @@ TMQ支持多种订阅类型:
对于consumer, 目前支持的config包括:
-| 参数名称 | 参数值 | 备注 |
-| ---------------------------- | ------------------------------ | ------------------------------------------------------ |
-| group.id | 最大长度:192 | |
-| enable.auto.commit | 合法值:true, false | |
-| auto.commit.interval.ms | | |
-| auto.offset.reset | 合法值:earliest, latest, none | |
-| td.connect.ip | 用于连接,同taos_connect的参数 | |
-| td.connect.user | 用于连接,同taos_connect的参数 | |
-| td.connect.pass | 用于连接,同taos_connect的参数 | |
-| td.connect.port | 用于连接,同taos_connect的参数 | |
-| enable.heartbeat.background | 合法值:true, false | 开启后台心跳,即consumer不会因为长时间不poll而认为离线 |
-| experimental.snapshot.enable | 合法值:true, false | 从wal开始消费,还是从tsbs开始消费 |
-| msg.with.table.name | 合法值:true, false | 从消息中能否解析表名 |
+
+
+
+| 参数名称 | 参数说明 | 备注 |
+| ---------------------------- |----------------------------|----------------------------|
+| group.id | | 最大长度:192 |
+| enable.auto.commit | | 合法值:true, false |
+| auto.commit.interval.ms | | |
+| auto.offset.reset | | 合法值:earliest, latest, none |
+| td.connect.ip | | 用于连接,同taos_connect的参数 | |
+| td.connect.user | | 用于连接,同taos_connect的参数 | |
+| td.connect.pass | | 用于连接,同taos_connect的参数 | |
+| td.connect.port | | 用于连接,同taos_connect的参数 | |
+| enable.heartbeat.background |开启后台心跳,即consumer不会因为长时间不poll而认为离线 | 合法值:true, false | |
+| experimental.snapshot.enable |从wal开始消费,还是从tsbs开始消费 | 合法值:true, false | |
+| msg.with.table.name | 从消息中能否解析表名 | 合法值:true, false |
```sql
/* 根据需要,设置消费组(group.id)、自动提交(enable.auto.commit)、自动提交时间间隔(auto.commit.interval.ms)、用户名(td.connect.user)、密码(td.connect.pass)等参数 */
@@ -139,10 +169,54 @@ TMQ支持多种订阅类型:
tmq_conf_destroy(conf);
```
+
+
+
+| 参数名称 | 参数值 | 备注 |
+| ---------------------------- |----------------------------|------------------------------------------------------------------------------------------------|
+| group.id | 最大长度:192,必填 | consumer 所在组 id |
+| client.id | 最大长度:192 | consumer id |
+| enable.auto.commit | 合法值:true, false | 是否允许自动提交 |
+| auto.commit.interval.ms | | 自动提交时间间隔 |
+| auto.offset.reset | 合法值:earliest, latest, none | offset 消费位置 |
+| bootstrap.servers | 用于创建连接 | 服务端 ip + port |
+| td.connect.user | 用于创建连接,同 taos_connect 的参数 | 服务端用户名 |
+| td.connect.pass | 用于创建连接,同 taos_connect 的参数 | 服务端密码 |
+| enable.heartbeat.background | 合法值:true, false | 开启后台心跳,即 consumer 不会因为长时间不 poll 而认为离线 |
+| experimental.snapshot.enable | 合法值:true, false | 从 wal 开始消费,还是从 tsbs 开始消费 |
+| msg.with.table.name | 合法值:true, false | 从消息中能否解析表名 |
+| value.deserializer | value 值解析方法 | 此方法应实现 com.taosdata.jdbc.tmq.Deserializer 接口或是继承 com.taosdata.jdbc.tmq.ReferenceDeserializer 类 |
+| value.deserializer.encoding | value 中使用字符编码集 | |
+
+```java
+import com.taosdata.jdbc.tmq.ReferenceDeserializer;
+
+public class MetersDeserializer extends ReferenceDeserializer {
+}
+```
+
+```java
+ Properties properties = new Properties();
+ properties.setProperty("enable.auto.commit", "true");
+ properties.setProperty("auto.commit.interval.ms", "1000");
+ properties.setProperty("group.id", "cgrpName");
+ properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
+ properties.setProperty("td.connect.user", "root");
+ properties.setProperty("td.connect.pass", "taosdata");
+ properties.setProperty("auto.offset.reset", "earliest");
+ properties.setProperty("msg.with.table.name", "true");
+ properties.setProperty("value.deserializer.encoding", "com.taos.example.MetersDeserializer");
+
+ TaosConsumer consumer = new TaosConsumer<>(properties)
+```
+
+
+
+
上述配置中包括consumer group ID,如果多个 consumer 指定的 consumer group ID一样,则自动形成一个consumer group,共享消费进度。
-## 创建 topic 列表
+## 订阅 topic 列表
单个consumer支持同时订阅多个topic。
@@ -153,6 +227,9 @@ TMQ支持多种订阅类型:
## 启动订阅并开始消费
+
+
+
```
/* 启动订阅 */
tmq_subscribe(tmq, topicList);
@@ -165,10 +242,32 @@ TMQ支持多种订阅类型:
}
```
+
+
+
+```java
+ List topics = new ArrayList<>();
+ topics.add("tmq_topic");
+ consumer.subscribe(topics);
+
+ while(running){
+ ConsumerRecords meters = consumer.poll(Duration.ofMillis(100));
+ for (Meters meter : meters) {
+ processMsg(meter);
+ }
+ }
+```
+
+
+
+
这里是一个 **while** 循环,每调用一次tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析API完成消息内容的解析。
## 结束消费
+
+
+
```sql
/* 取消订阅 */
tmq_unsubscribe(tmq);
@@ -177,6 +276,20 @@ TMQ支持多种订阅类型:
tmq_consumer_close(tmq);
```
+
+
+
+```java
+ /* 取消订阅 */
+ consumer.unsubscribe();
+
+ /* 关闭消费 */
+ consumer.close();
+```
+
+
+
+
## 删除topic
如果不再需要,可以删除创建topic,但注意:只有没有被订阅的topic才能别删除。