#高级功能 ##连续查询(Continuous Query) 连续查询是TDengine定期自动执行的查询,采用滑动窗口的方式进行计算,是一种简化的时间驱动的流式计算。针对库中的表或超级表,TDengine可提供定期自动执行的连续查询,用户可让TDengine推送查询的结果,也可以将结果再写回到TDengine中。每次执行的查询是一个时间窗口,时间窗口随着时间流动向前滑动。在定义连续查询的时候需要指定时间窗口(time window, 参数interval )大小和每次前向增量时间(forward sliding times, 参数sliding)。 TDengine的连续查询采用时间驱动模式,可以直接使用TAOS SQL进行定义,不需要额外的操作。使用连续查询,可以方便快捷地按照时间窗口生成结果,从而对原始采集数据进行降采样(down sampling)。用户通过TAOS SQL定义连续查询以后,TDengine自动在最后的一个完整的时间周期末端拉起查询,并将计算获得的结果推送给用户或者写回TDengine。 TDengine提供的连续查询与普通流计算中的时间窗口计算具有以下区别: - 不同于流计算的实时反馈计算结果,连续查询只在时间窗口关闭以后才开始计算。例如时间周期是1天,那么当天的结果只会在23:59:59以后才会生成。 - 如果有历史记录写入到已经计算完成的时间区间,连续查询并不会重新进行计算,也不会重新将结果推送给用户。对于写回TDengine的模式,也不会更新已经存在的计算结果。 - 使用连续查询推送结果的模式,服务端并不缓存客户端计算状态,也不提供Exactly-Once的语意保证。如果用户的应用端崩溃,再次拉起的连续查询将只会从再次拉起的时间开始重新计算最近的一个完整的时间窗口。如果使用写回模式,TDengine可确保数据写回的有效性和连续性。 ####使用连续查询 使用TAOS SQL定义连续查询的过程,需要调用API taos_stream在应用端启动连续查询。例如要对统计表FOO_TABLE 每1分钟统计一次记录数量,前向滑动的时间是30秒,SQL语句如下: ```sql SELECT COUNT(*) FROM FOO_TABLE INTERVAL(1M) SLIDING(30S) ``` 其中查询的时间窗口(time window)是1分钟,前向增量(forward sliding time)时间是30秒。也可以不使用sliding来指定前向滑动时间,此时系统将自动向前滑动一个查询时间窗口再开始下一次计算,即时间窗口长度等于前向滑动时间。 ```sql SELECT COUNT(*) FROM FOO_TABLE INTERVAL(1M) ``` 如果需要将连续查询的计算结果写回到数据库中,可以使用如下的SQL语句 ```sql CREATE TABLE QUERY_RES AS SELECT COUNT(*) FROM FOO_TABLE INTERVAL(1M) SLIDING(30S) ``` 此时系统将自动创建表QUERY_RES,然后将连续查询的结果写入到该表。需要注意的是,前向滑动时间不能大于时间窗口的范围。如果用户指定的前向滑动时间超过时间窗口范围,系统将自动将其设置为时间窗口的范围值。如上所示SQL语句,如果用户设置前向滑动时间超过1分钟,系统将强制将其设置为1分钟。 此外,TDengine还支持用户指定连续查询的结束时间,默认如果不输入结束时间,连续查询将永久运行,如果用户指定了结束时间,连续查询在系统时间达到指定的时间以后停止运行。如SQL所示,连续查询将运行1个小时,1小时之后连续查询自动停止。 ```sql CREATE TABLE QUERY_RES AS SELECT COUNT(*) FROM FOO_TABLE WHERE TS > NOW AND TS <= NOW + 1H INTERVAL(1M) SLIDING(30S) ``` 此外,还需要注意的是查询时间窗口的最小值是10毫秒,没有时间窗口范围的上限。 ####管理连续查询 用户可在控制台中通过 *show streams* 命令来查看系统中全部运行的连续查询,并可以通过 *kill stream* 命令杀掉对应的连续查询。在写回模式中,如果用户可以直接将写回的表删除,此时连续查询也会自动停止并关闭。后续版本会提供更细粒度和便捷的连续查询管理命令。 ##数据订阅(Publisher/Subscriber) 基于数据天然的时间序列特性,TDengine的数据写入(insert)与消息系统的数据发布(pub)逻辑上一致,均可视为系统中插入一条带时间戳的新记录。同时,TDengine在内部严格按照数据时间序列单调递增的方式保存数据。本质上来说,TDengine中里每一张表均可视为一个标准的消息队列。 TDengine内嵌支持轻量级的消息订阅与推送服务。使用系统提供的API,用户可订阅数据库中的某一张表(或超级表)。订阅的逻辑和操作状态的维护均是由客户端完成,客户端定时轮询服务器是否有新的记录到达,有新的记录到达就会将结果反馈到客户。 TDengine的订阅与推送服务的状态是客户端维持,TDengine服务器并不维持。因此如果应用重启,从哪个时间点开始获取最新数据,由应用决定。 ####API说明 使用订阅的功能,主要API如下:
TAOS_SUB *taos_subscribe(char *host, char *user, char *pass, char *db, char *table, int64_t time, int mseconds)
该函数负责启动订阅服务。其中参数说明:
host:主机IP地址
user:数据库登录用户名
pass:密码
db:数据库名称
table:(超级) 表的名称
time:启动时间,Unix Epoch时间,单位为毫秒。从1970年1月1日起计算的毫秒数。如果设为0,表示从当前时间开始订阅
mseconds:查询数据库更新的时间间隔,单位为毫秒。一般设置为1000毫秒。返回值为指向TDengine_SUB 结构的指针,如果返回为空,表示失败。
TAOS_ROW taos_consume(TAOS_SUB *tsub)
该函数用来获取订阅的结果,用户应用程序将其置于一个无限循环语句。如果数据库有新记录到达,该API将返回该最新的记录。如果没有新的记录,该API将阻塞。如果返回值为空,说明系统出错。参数说明:
tsub:taos_subscribe的结构体指针。
void taos_unsubscribe(TAOS_SUB *tsub)
取消订阅。应用程序退出时,务必调用该函数以避免资源泄露。
int taos_num_subfields(TAOS_SUB *tsub)
获取返回的一行记录中数据包含多少列。
TAOS_FIELD *taos_fetch_subfields(TAOS_SUB *tsub)
获取每列数据的属性(数据类型、名字、长度),与taos_num_subfileds配合使用,可解析返回的每行数据。