diff --git a/docs/zh/07-develop/07-tmq.md b/docs/zh/07-develop/07-tmq.md index 358c824ffa0330b469938a6cee75cd125ddb25c2..a9a658b3219cbee017bdbe25dd906ddc0c41be26 100644 --- a/docs/zh/07-develop/07-tmq.md +++ b/docs/zh/07-develop/07-tmq.md @@ -1,14 +1,18 @@ --- sidebar_label: 消息队列 -description: "数据订阅与推送服务。连续写入到 TDengine 中的时序数据能够被自动推送到订阅客户端。" +description: "数据订阅与推送服务。写入到 TDengine 中的时序数据能够被自动推送到订阅客户端。" title: 消息队列 --- -基于数据天然的时间序列特性,TDengine 的数据写入(insert)与消息系统的数据发布(pub)逻辑上一致,均可视为系统中插入一条带时间戳的新记录。同时,TDengine 在内部严格按照数据时间序列单调递增的方式保存数据。本质上来说,TDengine 中每一张表均可视为一个标准的消息队列。 -TDengine 内嵌支持消息订阅与推送服务(下文都简称TMQ)。使用系统提供的 API,用户可使用普通查询语句订阅数据库中的一张或多张表,或整个库。客户端启动订阅后,定时或按需轮询服务器是否有新的记录到达,有新的记录到达就会将结果反馈到客户。 +为了实时获取写入TDengine的数据,或者以事件到达顺序处理数据,TDengine提供了类似消息队列产品的数据订阅、消费接口,它支持以消费者组的方式,使多个消费者分布式、多线程地同时订阅一个topic,共享消费进度。并且提供了消息的ACK机制,在宕机、重启等复杂环境下确保at least once消费。 + +使用TDengine一体化的数据订阅功能,除了以上标准的消息队列特性,还能在订阅同时通过标签、表名、列、表达式等多种方法过滤所需数据,并且支持对数据进行函数变换、预处理(包括标量udf计算)。 + +为了实现上述功能,TDengine提供了多种灵活的WAL文件切换与保留机制:可以按照时间或文件大小来保留WAL文件(详见create database语句)。在消费时,TDengine从WAL中获取数据,并经过过滤、变换等操作,将数据推送给消费者。 + + -TMQ提供了提交机制来保证消息队列的可靠性和正确性。在调用方法上,支持自动提交和手动提交。 TMQ 的 API 中,与订阅相关的主要数据结构和API如下: @@ -82,18 +86,11 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1; 2、超级表订阅 语法:CREATE TOPIC topic_name AS STABLE stbName -- 订阅某超级表的全部数据,schema变更不受限,schema变更后写入的数据将以最新schema返回 -- 在tmq的返回消息中schema是块级别的,每块的schema可能不一样 -- 列变更后写入的数据若未落盘,将以写入时的schema返回 -- 列变更后写入的数据若已落盘,将以落盘时的schema返回 - -3、db订阅 -语法:CREATE TOPIC topic_name AS DATABASE db_name - -- 订阅某一db的全部数据,schema变更不受限 -- 在tmq的返回消息中schema是块级别的,每块的schema可能不一样 -- 列变更后写入的数据若未落盘,将以写入时的schema返回 -- 列变更后写入的数据若已落盘,将以落盘时的schema返回 +与select * from stbName订阅的区别是: +- 不会限制用户的schema变更 +- 返回的是非结构化的数据:返回数据的schema会随之超级表的schema变化而变化 +- 用户对于要处理的每一个数据块都可能有不同的schema,因此,必须重新获取schema +- 返回数据不带有tag 三、创建consumer @@ -121,7 +118,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1; tmq_conf_set(conf, "group.id", "cgrpName"); tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); - tmq_conf_set(conf, "auto.offset.reset", "earliest"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); tmq_conf_set(conf, "experimental.snapshot.enable", "true"); tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); @@ -151,9 +148,9 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1; /* 循环poll消息 */ int32_t totalRows = 0; int32_t msgCnt = 0; - int32_t consumeDelay = 5000; + int32_t timeOut = 5000; while (running) { - TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, consumeDelay); + TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeOut); if (tmqmsg) { msgCnt++; totalRows += msg_process(tmqmsg); @@ -190,7 +187,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1; int32_t* length = taos_fetch_lengths(msg); int32_t precision = taos_result_precision(msg); const char* tbName = tmq_get_table_name(msg); - rows++; + rows++; taos_print_row(buf, row, fields, numOfFields); printf("row content from %s: %s\n", (tbName != NULL ? tbName : "null table"), buf); }