You need to sign in or sign up before continuing.
提交 6a9dc072 编写于 作者: H huolibo

docs: modify tmq document

上级 d6362604
......@@ -29,6 +29,9 @@ import CDemo from "./_sub_c.mdx";
TMQ 的 API 中,与订阅相关的主要数据结构和API如下:
<Tabs defaultValue="c_struct">
<TabItem value="c_struct" label="C">
```c
typedef struct tmq_t tmq_t;
typedef struct tmq_conf_t tmq_conf_t;
......@@ -64,6 +67,30 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm
这些 API 的文档请见 [C/C++ Connector](/reference/connector/cpp),下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面C语言的示例代码。
</TabItem>
<TabItem value="java_struct" label="Java">
```java
void subscribe(Collection<String> topics) throws SQLException;
void unsubscribe() throws SQLException;
Set<String> subscription() throws SQLException;
ConsumerRecords<V> poll(Duration timeout) throws SQLException;
void commitAsync();
void commitAsync(OffsetCommitCallback callback);
void commitSync() throws SQLException;
void close() throws SQLException;
```
</TabItem>
</Tabs>
## 写入数据
首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如:
......@@ -108,19 +135,22 @@ TMQ支持多种订阅类型:
对于consumer, 目前支持的config包括:
| 参数名称 | 参数值 | 备注 |
| ---------------------------- | ------------------------------ | ------------------------------------------------------ |
| group.id | 最大长度:192 | |
| enable.auto.commit | 合法值:true, false | |
| auto.commit.interval.ms | | |
| auto.offset.reset | 合法值:earliest, latest, none | |
| td.connect.ip | 用于连接,同taos_connect的参数 | |
| td.connect.user | 用于连接,同taos_connect的参数 | |
| td.connect.pass | 用于连接,同taos_connect的参数 | |
| td.connect.port | 用于连接,同taos_connect的参数 | |
| enable.heartbeat.background | 合法值:true, false | 开启后台心跳,即consumer不会因为长时间不poll而认为离线 |
| experimental.snapshot.enable | 合法值:true, false | 从wal开始消费,还是从tsbs开始消费 |
| msg.with.table.name | 合法值:true, false | 从消息中能否解析表名 |
<Tabs defaultValue="c_param">
<TabItem value="c_param" label="C">
| 参数名称 | 参数说明 | 备注 |
| ---------------------------- |----------------------------|----------------------------|
| group.id | | 最大长度:192 |
| enable.auto.commit | | 合法值:true, false |
| auto.commit.interval.ms | | |
| auto.offset.reset | | 合法值:earliest, latest, none |
| td.connect.ip | | 用于连接,同taos_connect的参数 | |
| td.connect.user | | 用于连接,同taos_connect的参数 | |
| td.connect.pass | | 用于连接,同taos_connect的参数 | |
| td.connect.port | | 用于连接,同taos_connect的参数 | |
| enable.heartbeat.background |开启后台心跳,即consumer不会因为长时间不poll而认为离线 | 合法值:true, false | |
| experimental.snapshot.enable |从wal开始消费,还是从tsbs开始消费 | 合法值:true, false | |
| msg.with.table.name | 从消息中能否解析表名 | 合法值:true, false |
```sql
/* 根据需要,设置消费组(group.id)、自动提交(enable.auto.commit)、自动提交时间间隔(auto.commit.interval.ms)、用户名(td.connect.user)、密码(td.connect.pass)等参数 */
......@@ -139,10 +169,54 @@ TMQ支持多种订阅类型:
tmq_conf_destroy(conf);
```
</TabItem>
<TabItem value="java_param" label="Java">
| 参数名称 | 参数值 | 备注 |
| ---------------------------- |----------------------------|------------------------------------------------------------------------------------------------|
| group.id | 最大长度:192,必填 | consumer 所在组 id |
| client.id | 最大长度:192 | consumer id |
| enable.auto.commit | 合法值:true, false | 是否允许自动提交 |
| auto.commit.interval.ms | | 自动提交时间间隔 |
| auto.offset.reset | 合法值:earliest, latest, none | offset 消费位置 |
| bootstrap.servers | 用于创建连接 | 服务端 ip + port |
| td.connect.user | 用于创建连接,同 taos_connect 的参数 | 服务端用户名 |
| td.connect.pass | 用于创建连接,同 taos_connect 的参数 | 服务端密码 |
| enable.heartbeat.background | 合法值:true, false | 开启后台心跳,即 consumer 不会因为长时间不 poll 而认为离线 |
| experimental.snapshot.enable | 合法值:true, false | 从 wal 开始消费,还是从 tsbs 开始消费 |
| msg.with.table.name | 合法值:true, false | 从消息中能否解析表名 |
| value.deserializer | value 值解析方法 | 此方法应实现 com.taosdata.jdbc.tmq.Deserializer 接口或是继承 com.taosdata.jdbc.tmq.ReferenceDeserializer 类 |
| value.deserializer.encoding | value 中使用字符编码集 | |
```java
import com.taosdata.jdbc.tmq.ReferenceDeserializer;
public class MetersDeserializer extends ReferenceDeserializer<Meters> {
}
```
```java
Properties properties = new Properties();
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "1000");
properties.setProperty("group.id", "cgrpName");
properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
properties.setProperty("td.connect.user", "root");
properties.setProperty("td.connect.pass", "taosdata");
properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty("msg.with.table.name", "true");
properties.setProperty("value.deserializer.encoding", "com.taos.example.MetersDeserializer");
TaosConsumer<Meters> consumer = new TaosConsumer<>(properties)
```
</TabItem>
</Tabs>
上述配置中包括consumer group ID,如果多个 consumer 指定的 consumer group ID一样,则自动形成一个consumer group,共享消费进度。
## 创建 topic 列表
## 订阅 topic 列表
单个consumer支持同时订阅多个topic。
......@@ -153,6 +227,9 @@ TMQ支持多种订阅类型:
## 启动订阅并开始消费
<Tabs defaultValue="c_create">
<TabItem value="c_create" label="C">
```
/* 启动订阅 */
tmq_subscribe(tmq, topicList);
......@@ -165,10 +242,32 @@ TMQ支持多种订阅类型:
}
```
</TabItem>
<TabItem value="java_create" label="Java">
```java
List<String> topics = new ArrayList<>();
topics.add("tmq_topic");
consumer.subscribe(topics);
while(running){
ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
for (Meters meter : meters) {
processMsg(meter);
}
}
```
</TabItem>
</Tabs>
这里是一个 **while** 循环,每调用一次tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析API完成消息内容的解析。
## 结束消费
<Tabs defaultValue="c_close">
<TabItem value="c_close" label="C">
```sql
/* 取消订阅 */
tmq_unsubscribe(tmq);
......@@ -177,6 +276,20 @@ TMQ支持多种订阅类型:
tmq_consumer_close(tmq);
```
</TabItem>
<TabItem value="java_close" label="Java">
```java
/* 取消订阅 */
consumer.unsubscribe();
/* 关闭消费 */
consumer.close();
```
</TabItem>
</Tabs>
## 删除topic
如果不再需要,可以删除创建topic,但注意:只有没有被订阅的topic才能别删除。
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册