提交 d1cc52f4 编写于 作者: L Liu Jicong

doc(tmq)

上级 9a9c179d
---
sidebar_label: 消息队列
description: "数据订阅与推送服务。连续写入到 TDengine 中的时序数据能够被自动推送到订阅客户端。"
description: "数据订阅与推送服务。写入到 TDengine 中的时序数据能够被自动推送到订阅客户端。"
title: 消息队列
---
基于数据天然的时间序列特性,TDengine 的数据写入(insert)与消息系统的数据发布(pub)逻辑上一致,均可视为系统中插入一条带时间戳的新记录。同时,TDengine 在内部严格按照数据时间序列单调递增的方式保存数据。本质上来说,TDengine 中每一张表均可视为一个标准的消息队列。
TDengine 内嵌支持消息订阅与推送服务(下文都简称TMQ)。使用系统提供的 API,用户可使用普通查询语句订阅数据库中的一张或多张表,或整个库。客户端启动订阅后,定时或按需轮询服务器是否有新的记录到达,有新的记录到达就会将结果反馈到客户。
为了实时获取写入TDengine的数据,或者以事件到达顺序处理数据,TDengine提供了类似消息队列产品的数据订阅、消费接口,它支持以消费者组的方式,使多个消费者分布式、多线程地同时订阅一个topic,共享消费进度。并且提供了消息的ACK机制,在宕机、重启等复杂环境下确保at least once消费。
使用TDengine一体化的数据订阅功能,除了以上标准的消息队列特性,还能在订阅同时通过标签、表名、列、表达式等多种方法过滤所需数据,并且支持对数据进行函数变换、预处理(包括标量udf计算)。
为了实现上述功能,TDengine提供了多种灵活的WAL文件切换与保留机制:可以按照时间或文件大小来保留WAL文件(详见create database语句)。在消费时,TDengine从WAL中获取数据,并经过过滤、变换等操作,将数据推送给消费者。
TMQ提供了提交机制来保证消息队列的可靠性和正确性。在调用方法上,支持自动提交和手动提交。
TMQ 的 API 中,与订阅相关的主要数据结构和API如下:
......@@ -82,18 +86,11 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
2、超级表订阅
语法:CREATE TOPIC topic_name AS STABLE stbName
- 订阅某超级表的全部数据,schema变更不受限,schema变更后写入的数据将以最新schema返回
- 在tmq的返回消息中schema是块级别的,每块的schema可能不一样
- 列变更后写入的数据若未落盘,将以写入时的schema返回
- 列变更后写入的数据若已落盘,将以落盘时的schema返回
3、db订阅
语法:CREATE TOPIC topic_name AS DATABASE db_name
- 订阅某一db的全部数据,schema变更不受限
- 在tmq的返回消息中schema是块级别的,每块的schema可能不一样
- 列变更后写入的数据若未落盘,将以写入时的schema返回
- 列变更后写入的数据若已落盘,将以落盘时的schema返回
与select * from stbName订阅的区别是:
- 不会限制用户的schema变更
- 返回的是非结构化的数据:返回数据的schema会随之超级表的schema变化而变化
- 用户对于要处理的每一个数据块都可能有不同的schema,因此,必须重新获取schema
- 返回数据不带有tag
三、创建consumer
......@@ -121,7 +118,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
tmq_conf_set(conf, "group.id", "cgrpName");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
......@@ -151,9 +148,9 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
/* 循环poll消息 */
int32_t totalRows = 0;
int32_t msgCnt = 0;
int32_t consumeDelay = 5000;
int32_t timeOut = 5000;
while (running) {
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, consumeDelay);
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeOut);
if (tmqmsg) {
msgCnt++;
totalRows += msg_process(tmqmsg);
......@@ -190,7 +187,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
int32_t* length = taos_fetch_lengths(msg);
int32_t precision = taos_result_precision(msg);
const char* tbName = tmq_get_table_name(msg);
rows++;
rows++;
taos_print_row(buf, row, fields, numOfFields);
printf("row content from %s: %s\n", (tbName != NULL ? tbName : "null table"), buf);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册