From 662f194cd96841de479a5f6764051d989c205356 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Sun, 7 Aug 2022 13:20:45 +0800 Subject: [PATCH] Update 07-tmq.md --- docs/zh/07-develop/07-tmq.md | 43 ++++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/docs/zh/07-develop/07-tmq.md b/docs/zh/07-develop/07-tmq.md index a9a658b321..c39249e6d2 100644 --- a/docs/zh/07-develop/07-tmq.md +++ b/docs/zh/07-develop/07-tmq.md @@ -1,18 +1,20 @@ --- -sidebar_label: 消息队列 +sidebar_label: 数据订阅 description: "数据订阅与推送服务。写入到 TDengine 中的时序数据能够被自动推送到订阅客户端。" -title: 消息队列 +title: 数据订阅 --- +为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。 -为了实时获取写入TDengine的数据,或者以事件到达顺序处理数据,TDengine提供了类似消息队列产品的数据订阅、消费接口,它支持以消费者组的方式,使多个消费者分布式、多线程地同时订阅一个topic,共享消费进度。并且提供了消息的ACK机制,在宕机、重启等复杂环境下确保at least once消费。 +与 kafka 一样,你需要定义 topic, 但 TDengine 的 topic 可以是一张超级表,或一张子表。不仅如此,你可以通过标签、表名、列、表达式等多种方法过滤所需数据,并且支持对数据进行函数变换、预处理(包括标量udf计算)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤交给 TDengine,而不是应用完成,有效的减少传输的数据量。 -使用TDengine一体化的数据订阅功能,除了以上标准的消息队列特性,还能在订阅同时通过标签、表名、列、表达式等多种方法过滤所需数据,并且支持对数据进行函数变换、预处理(包括标量udf计算)。 - -为了实现上述功能,TDengine提供了多种灵活的WAL文件切换与保留机制:可以按照时间或文件大小来保留WAL文件(详见create database语句)。在消费时,TDengine从WAL中获取数据,并经过过滤、变换等操作,将数据推送给消费者。 +消费者订阅 topic 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程分布式的消费数据,提高数据通吐率。但不同消费者组即使消费同一个topic, 并不共享消费进度。一个消费者组可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的vnode上,也就是多个shard上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的ACK机制,在宕机、重启等复杂环境下确保at least once消费。 +为了实现上述功能,TDengine 采用了灵活的 WAL (Write-Ahead-Log) 文件切换与保留机制:可以按照时间或文件大小来保留WAL文件(详见create database语句)。在消费时,TDengine 从 WAL 中获取数据,并经过过滤、变换等操作,将数据推送给消费者。 +本文档不对消息队列本身的基础知识做介绍,如果需要了解,请自行搜索。 +## 主要数据结构和API TMQ 的 API 中,与订阅相关的主要数据结构和API如下: @@ -51,7 +53,9 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm 这些 API 的文档请见 [C/C++ Connector](/reference/connector/cpp),下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码可以在 [tmq.c](https://github.com/taosdata/TDengine/blob/3.0/examples/c/tmq.c) 看到。 -一、首先完成建库、建一张超级表和多张子表,并每个子表插入若干条数据记录: +## 写入数据 + +首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如: ```sql drop database if exists tmqdb; @@ -67,14 +71,15 @@ insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22'); insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33'); ``` -二、创建topic: +## 创建topic: ```sql create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1; ``` -注:TMQ支持多种订阅类型: -1、列订阅 +TMQ支持多种订阅类型: + +### 列订阅 语法:CREATE TOPIC topic_name as subquery 通过select语句订阅(包括select *,或select ts, c1等指定列描述订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合) @@ -83,7 +88,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1; - 被订阅或用于计算的column和tag不可被删除、修改 - 若发生schema变更,新增的column不出现在结果中 -2、超级表订阅 +### 超级表订阅 语法:CREATE TOPIC topic_name AS STABLE stbName 与select * from stbName订阅的区别是: @@ -92,7 +97,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1; - 用户对于要处理的每一个数据块都可能有不同的schema,因此,必须重新获取schema - 返回数据不带有tag -三、创建consumer +## 创建 consumer 目前支持的config: @@ -128,7 +133,9 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1; return tmq; ``` -四、创建订阅主题列表 +## 创建 topic 列表 + +单个consumer支持同时订阅多个topic。 ```sql tmq_list_t* topicList = tmq_list_new(); @@ -136,9 +143,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1; return topicList; ``` -单个consumer支持同时订阅多个topic。 - -五、启动订阅并开始消费 +## 启动订阅并开始消费 ```sql /* 启动订阅 */ @@ -196,7 +201,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1; } ``` -五、结束消费 +## 结束消费 ```sql /* 取消订阅 */ @@ -206,7 +211,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1; tmq_consumer_close(tmq); ``` -六、删除topic +## 删除topic 如果不再需要,可以删除创建topic,但注意:只有没有被订阅的topic才能别删除。 @@ -215,7 +220,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1; drop topic topicName; ``` -七、状态查看 +## 状态查看 1、topics:查询已经创建的topic -- GitLab