Commit 90a0de85 authored by Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into fix/mnode

......@@ -61,7 +61,7 @@ sudo apt-get install -y gcc cmake build-essential git libssl-dev
sudo apt install build-essential libjansson-dev libsnappy-dev liblzma-dev libz-dev pkg-config
```
### CentOS 7.9
### CentOS 7.9
```bash
sudo yum install epel-release
......@@ -78,7 +78,7 @@ sudo dnf install -y gcc gcc-c++ make cmake epel-release git openssl-devel
#### Install dependencies for building taosTools on CentOS
#### For CentOS 7/RHEL
#### For CentOS 7.9
```bash
sudo yum install -y zlib-devel xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libstdc++-static openssl-devel
......
......@@ -20,7 +20,7 @@ English | [简体中文](README-CN.md) | We are hiring, check [here](https://tde
# What is TDengine?
TDengine is an open source, cloud native time-series database optimized for Internet of Things (IoT), Connected Cars, and Industrial IoT. It enables efficient, real-time data ingestion, processing, and monitoring of TB and even PB scale data per day, generated by billions of sensors and data collectors. TDengine differentiates itself from other TSDBs with the following advantages:
TDengine is an open source, high performance, cloud native time-series database optimized for Internet of Things (IoT), Connected Cars, and Industrial IoT. It enables efficient, real-time data ingestion, processing, and monitoring of TB and even PB scale data per day, generated by billions of sensors and data collectors. TDengine differentiates itself from other TSDBs with the following advantages:
- High-Performance: TDengine is the only time-series database to solve the high-cardinality issue, supporting billions of data collection points while outperforming other time-series databases in data ingestion, querying, and data compression.
......@@ -40,7 +40,7 @@ For user manual, system design and architecture, please refer to [TDengine Docum
# Building
At the moment, TDengine server supports running on Linux, Windows, and macOS systems. You can choose to [install from packages](https://www.tdengine.com/getting-started/#Install-from-Package) or build it from the source code. This quick guide is for installation from the source only.
At the moment, TDengine server supports running on Linux and Windows systems. You can choose to [install from packages](https://www.taosdata.com/en/getting-started/#Install-from-Package) or build it from the source code. This quick guide is for installation from the source only.
We provide a few useful tools such as taosBenchmark (formerly taosdemo) and taosdump. They used to be part of TDengine. By default, compiling TDengine does not include taosTools. You can use 'cmake .. -DBUILD_TOOLS=true' to build them along with TDengine.
......
......@@ -8,16 +8,13 @@ TDengine provides a rich set of APIs (application development interface). To fac
## Supported platforms
Currently, TDengine's native interface connectors can support platforms such as X64/X86/ARM64/ARM32/MIPS/Alpha hardware platforms and Linux/Win64/Win32 development environments. The comparison matrix is as follows.
Currently, TDengine's native interface connectors can support platforms such as X64/ARM64 hardware platforms and Linux/Win64 development environments. The comparison matrix is as follows.
| **CPU** | **OS** | **JDBC** | **Python** | **Go** | **Node.js** | **C#** | **Rust** | C/C++ |
| ------- | ------ | -------- | ---------- | ------ | ----------- | ------ | -------- | ----- |
| **X86 64bit** | **Linux** | ● | ● | ● | ● | ● | ● | ● |
| **X86 64bit** | **Win64** | ● | ● | ● | ● | ● | ● | ● |
| **X86 64bit** | **Win32** | ● | ● | ● | ● | ○ | ○ | ● |
| **X86 32bit** | **Win32** | ○ | ○ | ○ | ○ | ○ | ○ | ● |
| **ARM64** | **Linux** | ● | ● | ● | ● | ○ | ○ | ● |
| **MIPS** | **Linux** | ○ | ○ | ○ | ○ | ○ | ○ | ○ |
Here ● means the official test verification passed, ○ means the unofficial test verification passed, and -- means not verified.
......
......@@ -5,11 +5,11 @@ description: "List of platforms supported by TDengine server, client, and connec
## List of supported platforms for TDengine server
| | **Windows 10/11** | **CentOS 7.9/8** | **Ubuntu 18/20** | **Other Linux** | **UOS** | **Kylin** | **Ningsi V60/V80** | **HUAWEI EulerOS** |
| ------------------ | ----------------- | ---------------- | ---------------- | --------------- | ------- | --------- | ------------------ | ------------------ |
| X64 | ● | ● | ● | | ● | ● | ● | |
| Raspberry Pi ARM64 | | | | ● | | | | |
| HUAWEI cloud ARM64 | | | | | | | | ● |
| | **Windows Server 2016/2019** | **Windows 10/11** | **CentOS 7.9/8** | **Ubuntu 18/20** | **UOS** | **Kylin** | **Ningsi V60/V80** |
| ------------------ | ---------------------------- | ----------------- | ---------------- | ---------------- | ------- | --------- | ------------------ |
| X64 | ● | ● | ● | ● | ● | ● | ● |
| Raspberry Pi ARM64 | | | ● | | | | |
| HUAWEI Cloud ARM64 | | | | ● | | | |
Note: ● means officially tested and verified, ○ means unofficially tested and verified.
......@@ -19,15 +19,15 @@ TDengine's connector can support a wide range of platforms, including X64/X86/AR
The comparison matrix is as follows.
| **CPU** | **X64 64bit** | | | **X86 32bit** | **ARM64** | **MIPS** | **Alpha** |
| ----------- | ------------- | --------- | --------- | ------------- | --------- | --------- | --------- |
| **OS** | **Linux** | **Win64** | **Win32** | **Win32** | **Linux** | **Linux** | **Linux** |
| **C/C++** | ● | ● | ● | ○ | ● | ● | ● |
| **JDBC** | ● | ● | ● | ○ | ● | ● | ● |
| **Python** | ● | ● | ● | ○ | ● | ● | -- |
| **Go** | ● | ● | ● | ○ | ● | ○ | -- |
| **NodeJs** | ● | ● | ○ | ○ | ● | ○ | -- |
| **C#** | ● | ● | ○ | ○ | ○ | ○ | -- |
| **RESTful** | ● | ● | ● | ● | ● | ● | ● |
| **CPU** | **X64 64bit** | **X64 64bit** | **ARM64** |
| ----------- | ------------- | ------------- | --------- |
| **OS** | **Linux** | **Win64** | **Linux** |
| **C/C++** | ● | ● | ● |
| **JDBC** | ● | ● | ● |
| **Python** | ● | ● | ● |
| **Go** | ● | ● | ● |
| **NodeJs** | ● | ● | ● |
| **C#** | ● | ● | ○ |
| **RESTful** | ● | ● | ● |
Note: ● means officially tested and verified, ○ means unofficially tested and verified, -- means not verified.
```python
import taos
from taos.tmq import *

conn = taos.connect()

# create database
conn.execute("drop database if exists py_tmq")
conn.execute("create database if not exists py_tmq vgroups 2")

# create table and stables
conn.select_db("py_tmq")
conn.execute("create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
conn.execute("create table if not exists tb1 using stb1 tags(1)")
conn.execute("create table if not exists tb2 using stb1 tags(2)")
conn.execute("create table if not exists tb3 using stb1 tags(3)")

# create topic
conn.execute("drop topic if exists topic_ctb_column")
conn.execute("create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1")

# set consumer configuration options
conf = TaosTmqConf()
conf.set("group.id", "tg2")
conf.set("td.connect.user", "root")
conf.set("td.connect.pass", "taosdata")
conf.set("enable.auto.commit", "true")
conf.set("msg.with.table.name", "true")

def tmq_commit_cb_print(tmq, resp, offset, param=None):
    print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")

conf.set_auto_commit_cb(tmq_commit_cb_print, None)

# build consumer
tmq = conf.new_consumer()

# build topic list
topic_list = TaosTmqList()
topic_list.append("topic_ctb_column")

# subscribe consumer
tmq.subscribe(topic_list)

# check subscriptions
sub_list = tmq.subscription()
print("subscribed topics:", sub_list)

# start subscribe: poll messages in a loop
while 1:
    res = tmq.poll(1000)
    if res:
        topic = res.get_topic_name()
        vg = res.get_vgroup_id()
        db = res.get_db_name()
        print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
        for row in res:
            print(row)
        tb = res.get_table_name()
        print(f"from table: {tb}")
```
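The loop above polls forever. For scripts and tests, a bounded variant that stops after a few consecutive empty polls may be more convenient; a minimal sketch using only the tmq.poll() call already shown above:

```python
# Sketch: stop after 5 consecutive empty polls instead of looping forever.
empty_polls = 0
while empty_polls < 5:
    res = tmq.poll(1000)
    if not res:
        empty_polls += 1
        continue
    empty_polls = 0
    for row in res:
        print(row)
```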
......@@ -46,7 +46,7 @@ The apt-get method only applies to Debian or Ubuntu systems
</TabItem>
<TabItem label="Deb 安装" value="debinst">
1. Download the deb package from the official website, e.g. TDengine-server-3.0.0.0-Linux-x64.deb;
1. Download the deb package from the [release history page](../../releases), e.g. TDengine-server-3.0.0.0-Linux-x64.deb;
2. Go to the directory containing the TDengine-server-3.0.0.0-Linux-x64.deb package and run the following install command:
```bash
......@@ -57,7 +57,7 @@ sudo dpkg -i TDengine-server-3.0.0.0-Linux-x64.deb
<TabItem label="RPM 安装" value="rpminst">
1. Download the rpm package from the official website, e.g. TDengine-server-3.0.0.0-Linux-x64.rpm;
1. Download the rpm package from the [release history page](../../releases), e.g. TDengine-server-3.0.0.0-Linux-x64.rpm;
2. Go to the directory containing the TDengine-server-3.0.0.0-Linux-x64.rpm package and run the following install command:
```bash
......@@ -68,7 +68,7 @@ sudo rpm -ivh TDengine-server-3.0.0.0-Linux-x64.rpm
<TabItem label="tar.gz 安装" value="tarinst">
1. Download the tar.gz package from the official website, e.g. TDengine-server-3.0.0.0-Linux-x64.tar.gz;
1. Download the tar.gz package from the [release history page](../../releases), e.g. TDengine-server-3.0.0.0-Linux-x64.tar.gz;
2. Go to the directory containing the TDengine-server-3.0.0.0-Linux-x64.tar.gz package, extract it, enter the subdirectory, and run the install.sh script inside:
```bash
......@@ -90,7 +90,7 @@ While running, the install.sh script prompts for some configuration through an interactive command-line interface
<TabItem label="Windows 安装" value="windows">
1. Download the exe installer from the official website, e.g. TDengine-server-3.0.0.0-Windows-x64.exe;
1. Download the exe installer from the [release history page](../../releases), e.g. TDengine-server-3.0.0.0-Windows-x64.exe;
2. Run TDengine-server-3.0.0.0-Windows-x64.exe to install TDengine.
</TabItem>
......
......@@ -43,7 +43,7 @@ Query OK, 2 row(s) in set (0.001100s)
To meet the needs of IoT scenarios, TDengine supports several special functions, such as twa (time-weighted average), spread (the difference between the maximum and minimum values), and last_row (the most recent record); more IoT-oriented functions will be added over time.
For the detailed query syntax, see the [Data Query section of TAOS SQL](/taos-sql/select).
For the detailed query syntax, see the [Data Query section of TAOS SQL](../../taos-sql/select).
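As a quick illustration, these functions can be called through the Python connector used elsewhere in these docs. A minimal sketch, assuming a local server with the demo power database and meters super table from the data-modeling examples:

```python
import taos

# Assumes a local TDengine with the demo 'power' database and 'meters' super table.
conn = taos.connect(database="power")

# Time-weighted average and min-max spread over the last hour.
for row in conn.query("select twa(current), spread(voltage) from meters where ts > now - 1h"):
    print(row)

# Most recent record.
for row in conn.query("select last_row(current) from meters"):
    print(row)

conn.close()
```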
## Multi-table aggregation queries
......@@ -74,7 +74,7 @@ taos> SELECT count(*), max(current) FROM meters where groupId = 2 and ts > now -
Query OK, 1 row(s) in set (0.002136s)
```
In the [Data Query chapter of TAOS SQL](/taos-sql/select), each class of query operation notes whether super tables are supported.
In the [Data Query chapter of TAOS SQL](../../taos-sql/select), each class of query operation notes whether super tables are supported.
## Downsampling queries and interpolation
......@@ -121,7 +121,7 @@ Query OK, 5 row(s) in set (0.001521s)
If no data was collected during a time interval, TDengine can also interpolate values for it.
For the detailed syntax rules, see the [Time-Window Aggregation section of TAOS SQL](/taos-sql/interval).
For the detailed syntax rules, see the [Time-Window Aggregation section of TAOS SQL](../../taos-sql/distinguished).
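For example, downsampling to 10-minute windows and filling empty windows with the previous value could look like this through the Python connector (a sketch; the power database and meters table are assumptions carried over from the demo data):

```python
import taos

conn = taos.connect(database="power")  # assumes the demo 'power' database

# Downsample to 10-minute windows over the last day; FILL(PREV) interpolates
# empty windows with the previous window's value.
result = conn.query(
    "select _wstart, avg(current), max(voltage) "
    "from meters where ts > now - 1d "
    "interval(10m) fill(prev)"
)
for row in result:
    print(row)
conn.close()
```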
## Sample code
......
......@@ -29,100 +29,77 @@ stream_options: {
First prepare the data: create the database, one super table, and several subtables.
```sql
drop database if exists stream_db;
create database stream_db;
DROP DATABASE IF EXISTS power;
CREATE DATABASE power;
USE power;
create stable stream_db.meters (ts timestamp, current float, voltage int) TAGS (location varchar(64), groupId int);
CREATE STABLE meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);
create table stream_db.d1001 using stream_db.meters tags("beijing", 1);
create table stream_db.d1002 using stream_db.meters tags("guangzhou", 2);
create table stream_db.d1003 using stream_db.meters tags("shanghai", 3);
CREATE TABLE d1001 USING meters TAGS ("California.SanFrancisco", 2);
CREATE TABLE d1002 USING meters TAGS ("California.SanFrancisco", 3);
CREATE TABLE d1003 USING meters TAGS ("California.LosAngeles", 2);
CREATE TABLE d1004 USING meters TAGS ("California.LosAngeles", 3);
```
### Create the stream
```sql
create stream stream1 into stream_db.stream1_output_stb as select _wstart as start, _wend as end, max(current) as max_current from stream_db.meters where voltage <= 220 and ts > now - 12h interval (1h);
create stream current_stream into current_stream_output_stb as select _wstart as start, _wend as end, max(current) as max_current from meters where voltage <= 220 and ts > now - 12h interval (1h);
```
### Write data
```sql
insert into stream_db.d1001 values(now-14h, 10.3, 210);
insert into stream_db.d1001 values(now-13h, 13.5, 216);
insert into stream_db.d1001 values(now-12h, 12.5, 219);
insert into stream_db.d1002 values(now-11h, 14.7, 221);
insert into stream_db.d1002 values(now-10h, 10.5, 218);
insert into stream_db.d1002 values(now-9h, 11.2, 220);
insert into stream_db.d1003 values(now-8h, 11.5, 217);
insert into stream_db.d1003 values(now-7h, 12.3, 227);
insert into stream_db.d1003 values(now-6h, 12.3, 215);
insert into d1001 values(now-13h, 10.30000, 219, 0.31000);
insert into d1001 values(now-11h, 12.60000, 218, 0.33000);
insert into d1001 values(now-10h, 12.30000, 221, 0.31000);
insert into d1002 values(now-9h, 10.30000, 218, 0.25000);
insert into d1003 values(now-8h, 11.80000, 221, 0.28000);
insert into d1003 values(now-7h, 13.40000, 223, 0.29000);
insert into d1004 values(now-6h, 10.80000, 223, 0.29000);
insert into d1004 values(now-5h, 11.50000, 221, 0.35000);
```
### Query to observe the results
```sql
taos> select * from stream_db.stream1_output_stb;
start | end | max_current | group_id |
===================================================================================================
2022-08-09 14:00:00.000 | 2022-08-09 15:00:00.000 | 10.50000 | 0 |
2022-08-09 15:00:00.000 | 2022-08-09 16:00:00.000 | 11.20000 | 0 |
2022-08-09 16:00:00.000 | 2022-08-09 17:00:00.000 | 11.50000 | 0 |
2022-08-09 18:00:00.000 | 2022-08-09 19:00:00.000 | 12.30000 | 0 |
Query OK, 4 rows in database (0.012033s)
```

```sql
taos> select start, end, max_current from current_stream_output_stb;
start | end | max_current |
===========================================================================
2022-08-12 04:00:00.000 | 2022-08-12 05:00:00.000 | 12.60000 |
2022-08-12 06:00:00.000 | 2022-08-12 07:00:00.000 | 10.30000 |
Query OK, 2 rows in database (0.009580s)
```
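The walkthrough above can also be scripted end to end with the Python connector, reusing the same SQL verbatim. A minimal sketch, assuming a local server with default credentials (stream output is written asynchronously, hence the short wait):

```python
import taos
import time

conn = taos.connect()  # assumes a local TDengine with default credentials
conn.execute("drop database if exists power")
conn.execute("create database power")
conn.select_db("power")
conn.execute(
    "create stable meters (ts timestamp, current float, voltage int, phase float) "
    "tags(location binary(64), groupId int)"
)
conn.execute("create table d1001 using meters tags('California.SanFrancisco', 2)")
conn.execute(
    "create stream current_stream into current_stream_output_stb as "
    "select _wstart as start, _wend as end, max(current) as max_current "
    "from meters where voltage <= 220 and ts > now - 12h interval(1h)"
)
conn.execute("insert into d1001 values(now-13h, 10.3, 219, 0.31)")
# ... insert the remaining rows from the example above ...

time.sleep(5)  # give the stream time to materialize its output
for row in conn.query("select start, end, max_current from current_stream_output_stb"):
    print(row)
```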
## Example two
An operator platform needs to collect system resource metrics from all servers in its data centers, including CPU, memory, and network latency. After collection, the data must be rounded, the region and server name concatenated with an underscore, and the results sorted by time, grouped by server name, and written to a new table.
### Create the DB and raw data tables

First prepare the data: create the database, one super table, and several subtables.

```sql
drop database if exists stream_db;
create database stream_db;
create stable stream_db.idc (ts timestamp, cpu float, mem float, latency float) TAGS (location varchar(64), groupId int);
create table stream_db.server01 using stream_db.idc tags("beijing", 1);
create table stream_db.server02 using stream_db.idc tags("shanghai", 2);
create table stream_db.server03 using stream_db.idc tags("beijing", 2);
create table stream_db.server04 using stream_db.idc tags("tianjin", 3);
create table stream_db.server05 using stream_db.idc tags("shanghai", 1);
```

Still based on the data from example one, we have already collected the current and voltage of each smart meter; now we want to compute the power, join the region and meter name with the symbol ".", and write the results to a new table grouped by meter name.

### Create the DB and raw data tables

Refer to [Create the DB and raw data tables](#创建-db-和原始数据表) in example one.
### 创建流
```sql
create stream stream2 into stream_db.stream2_output_stb as select ts, concat_ws("_", location, tbname) as server_location, round(cpu) as cpu, round(mem) as mem, round(latency) as latency from stream_db.idc partition by tbname order by ts;
create stream power_stream into power_stream_output_stb as select ts, concat_ws(".", location, tbname) as meter_location, current*voltage as meter_power from meters partition by tbname;
```
### Write data
```sql
insert into stream_db.server01 values(now-14h, 50.9, 654.8, 23.11);
insert into stream_db.server01 values(now-13h, 13.5, 221.2, 11.22);
insert into stream_db.server02 values(now-12h, 154.7, 218.3, 22.33);
insert into stream_db.server02 values(now-11h, 120.5, 111.5, 5.55);
insert into stream_db.server03 values(now-10h, 101.5, 125.6, 5.99);
insert into stream_db.server03 values(now-9h, 12.3, 165.6, 6.02);
insert into stream_db.server04 values(now-8h, 160.9, 120.7, 43.51);
insert into stream_db.server04 values(now-7h, 240.9, 520.7, 54.55);
insert into stream_db.server05 values(now-6h, 190.9, 320.7, 55.43);
insert into stream_db.server05 values(now-5h, 110.9, 600.7, 35.54);
```
Refer to [Write data](#写入数据) in example one.
### Query to observe the results
```sql
taos> select ts, server_location, cpu, mem, latency from stream_db.stream2_output_stb;
ts | server_location | cpu | mem | latency |
================================================================================================================================
2022-08-09 21:24:56.785 | beijing_server01 | 51.00000 | 655.00000 | 23.00000 |
2022-08-09 22:24:56.795 | beijing_server01 | 14.00000 | 221.00000 | 11.00000 |
2022-08-09 23:24:56.806 | shanghai_server02 | 155.00000 | 218.00000 | 22.00000 |
2022-08-10 00:24:56.815 | shanghai_server02 | 121.00000 | 112.00000 | 6.00000 |
2022-08-10 01:24:56.826 | beijing_server03 | 102.00000 | 126.00000 | 6.00000 |
2022-08-10 02:24:56.838 | beijing_server03 | 12.00000 | 166.00000 | 6.00000 |
2022-08-10 03:24:56.846 | tianjin_server04 | 161.00000 | 121.00000 | 44.00000 |
2022-08-10 04:24:56.853 | tianjin_server04 | 241.00000 | 521.00000 | 55.00000 |
2022-08-10 05:24:56.866 | shanghai_server05 | 191.00000 | 321.00000 | 55.00000 |
2022-08-10 06:24:57.301 | shanghai_server05 | 111.00000 | 601.00000 | 36.00000 |
Query OK, 10 rows in database (0.022950s)
```

```sql
taos> select ts, meter_location, meter_power from power_stream_output_stb;
ts | meter_location | meter_power |
=======================================================================================
2022-08-12 07:44:47.817 | California.SanFrancisco.d1002 | 2245.400041580 |
2022-08-12 08:44:47.826 | California.LosAngeles.d1003 | 2607.800042152 |
2022-08-12 09:44:47.833 | California.LosAngeles.d1003 | 2988.199914932 |
2022-08-12 03:44:47.791 | California.SanFrancisco.d1001 | 2255.700041771 |
2022-08-12 05:44:47.800 | California.SanFrancisco.d1001 | 2746.800083160 |
2022-08-12 06:44:47.809 | California.SanFrancisco.d1001 | 2718.300042152 |
2022-08-12 10:44:47.840 | California.LosAngeles.d1004 | 2408.400042534 |
2022-08-12 11:44:48.379 | California.LosAngeles.d1004 | 2541.500000000 |
Query OK, 8 rows in database (0.014788s)
```
\ No newline at end of file
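Example two can be scripted the same way; only the stream definition changes. A sketch that assumes the database and data from example one are already in place:

```python
import taos
import time

conn = taos.connect(database="power")  # assumes example one's database and data
conn.execute(
    "create stream power_stream into power_stream_output_stb as "
    "select ts, concat_ws('.', location, tbname) as meter_location, "
    "current*voltage as meter_power from meters partition by tbname"
)
time.sleep(5)  # stream output is written asynchronously
for row in conn.query("select ts, meter_location, meter_power from power_stream_output_stb"):
    print(row)
```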
......@@ -4,6 +4,17 @@ description: "Data subscription and push service. Time-series data written to TDengine
title: Data Subscription
---
import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
import Java from "./_sub_java.mdx";
import Python from "./_sub_python.mdx";
import Go from "./_sub_go.mdx";
import Rust from "./_sub_rust.mdx";
import Node from "./_sub_node.mdx";
import CSharp from "./_sub_cs.mdx";
import CDemo from "./_sub_c.mdx";
To help applications consume data written to TDengine in real time, or process data in the order of event arrival, TDengine provides data subscription and consumption interfaces similar to those of message queue products. In many scenarios, a time-series processing system built on TDengine therefore no longer needs to integrate a message queue product such as Kafka, simplifying system design and reducing operations and maintenance costs.
As with Kafka, you need to define a topic, but a TDengine topic is defined by a query condition on an existing super table, subtable, or regular table, that is, by a SELECT statement. You can use SQL to filter by tags, table names, columns, or expressions, and to apply scalar functions and UDFs (but not aggregations) to the data. Compared with other message queue products, this is the biggest advantage of TDengine's data subscription: it offers greater flexibility, the granularity of the data can be adjusted by the application at any time, and filtering and preprocessing are done by TDengine rather than the application, effectively reducing the volume of data transmitted and the complexity of the application.
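For instance, a topic that filters on a tag and applies a scalar function to a column can be defined with a single SELECT. A sketch using the Python connector and the stb1 super table from the earlier example (the topic name here is hypothetical):

```python
import taos

conn = taos.connect(database="py_tmq")  # assumes the database from the Python example above

# A topic is just a SELECT: filter on tag t1 and apply the scalar function abs() to c2.
conn.execute(
    "create topic if not exists topic_filtered as "
    "select ts, c1, abs(c2) from stb1 where t1 > 1"
)
```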
......@@ -51,7 +62,7 @@ DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
```
For documentation of these APIs, see the [C/C++ Connector](/reference/connector/cpp). Their concrete usage is described below (for the super table and subtable schemas, refer to the Data Modeling section); the complete sample code can be found in [tmq.c](https://github.com/taosdata/TDengine/blob/3.0/examples/c/tmq.c).
For documentation of these APIs, see the [C/C++ Connector](/reference/connector/cpp). Their concrete usage is described below (for the super table and subtable schemas, refer to the Data Modeling section); the complete sample code is shown in the C example at the end of this page.
## Write data
......@@ -62,13 +73,9 @@ drop database if exists tmqdb;
create database tmqdb;
create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16));
create table tmqdb.ctb0 using tmqdb.stb tags(0, "subtable0");
create table tmqdb.ctb1 using tmqdb.stb tags(1, "subtable1");
create table tmqdb.ctb2 using tmqdb.stb tags(2, "subtable2");
create table tmqdb.ctb3 using tmqdb.stb tags(3, "subtable3");
create table tmqdb.ctb1 using tmqdb.stb tags(1, "subtable1");
insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22');
insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33');
```
## Create a topic
......@@ -130,7 +137,6 @@ TMQ supports multiple subscription types:
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
return tmq;
```
The configuration above includes the consumer group ID. If multiple consumers specify the same consumer group ID, they automatically form one consumer group and share consumption progress.
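As an illustration, consumers that set the same group.id join one consumer group and share progress, while a different group.id consumes independently. A sketch reusing the TaosTmqConf API from the Python example earlier (the helper function below is hypothetical):

```python
from taos.tmq import TaosTmqConf

def build_consumer(group_id):
    # Hypothetical helper: consumers created with the same group.id
    # join the same consumer group and share consumption progress.
    conf = TaosTmqConf()
    conf.set("group.id", group_id)
    conf.set("td.connect.user", "root")
    conf.set("td.connect.pass", "taosdata")
    return conf.new_consumer()

c1 = build_consumer("tg2")  # c1 and c2 share progress within group "tg2"
c2 = build_consumer("tg2")
c3 = build_consumer("tg3")  # c3 consumes the topic independently
```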
......@@ -143,66 +149,23 @@ TMQ supports multiple subscription types:
```c
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topicName");
return topicList;
```
## Start the subscription and begin consuming
```c
/* Start the subscription */
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);

/* Poll messages in a loop */
int32_t totalRows = 0;
int32_t msgCnt = 0;
int32_t timeOut = 5000;
while (running) {
  TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeOut);
  if (tmqmsg) {
    msgCnt++;
    totalRows += msg_process(tmqmsg);
    taos_free_result(tmqmsg);
  } else {
    break;
  }
}

fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
```
This is a **while** loop: each call to tmq_consumer_poll() fetches one message, which has exactly the same structure as a result set returned by a regular query, so the same parsing APIs can be used to parse its contents:

```c
static int32_t msg_process(TAOS_RES* msg) {
char buf[1024];
int32_t rows = 0;
const char* topicName = tmq_get_topic_name(msg);
const char* dbName = tmq_get_db_name(msg);
int32_t vgroupId = tmq_get_vgroup_id(msg);
printf("topic: %s\n", topicName);
printf("db: %s\n", dbName);
printf("vgroup id: %d\n", vgroupId);
while (1) {
TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break;
TAOS_FIELD* fields = taos_fetch_fields(msg);
int32_t numOfFields = taos_field_count(msg);
int32_t* length = taos_fetch_lengths(msg);
int32_t precision = taos_result_precision(msg);
const char* tbName = tmq_get_table_name(msg);
rows++;
taos_print_row(buf, row, fields, numOfFields);
printf("row content from %s: %s\n", (tbName != NULL ? tbName : "null table"), buf);
}
return rows;
}
```
This is a **while** loop: each call to tmq_consumer_poll() fetches one message, which has exactly the same structure as a result set returned by a regular query; the same parsing APIs can be used to parse its contents.
## End consumption
......@@ -243,4 +206,44 @@ TMQ支持多种订阅类型:
```sql
show subscriptions;
```
## Sample code
This section shows sample code in various languages.
<Tabs>
<TabItem label="C" value="c">
```c
{{#include examples/c/tmq.c}}
```
</TabItem>
<TabItem label="Java" value="java">
<Java />
</TabItem>
<TabItem label="Go" value="Go">
<Go/>
</TabItem>
<TabItem label="Rust" value="Rust">
<Rust />
</TabItem>
<TabItem label="Python" value="Python">
```python
{{#include docs/examples/python/tmq_example.py}}
```
</TabItem>
<TabItem label="Node.JS" value="Node.JS">
<Node/>
</TabItem>
<TabItem label="C#" value="C#">
<CSharp/>
</TabItem>
</Tabs>
......@@ -26,10 +26,19 @@ subquery: SELECT [DISTINCT] select_list
[WHERE condition]
[PARTITION BY tag_list]
[window_clause]
[group_by_clause]
```
The order_by, limit, slimit, and fill clauses are not supported.
Session windows, state windows, and sliding windows are supported; when a session window or state window is used with a super table, it must be combined with partition by tbname.
```sql
window_clause: {
SESSION(ts_col, tol_val)
| STATE_WINDOW(col)
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)]
}
```
Here SESSION is a session window and tol_val is the maximum gap between rows in one window. Rows whose timestamps fall within tol_val of each other belong to the same window; if the gap between two consecutive rows exceeds tol_val, the next window opens automatically.
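For instance, a session-window stream over a super table, which per the restriction above must be combined with partition by tbname, might be created like this (a sketch; the stream and output table names are hypothetical, and the meters table follows the earlier examples):

```python
import taos

conn = taos.connect(database="power")  # assumes the meters super table from earlier examples

# Session window on a super table: must be combined with "partition by tbname".
# A gap of more than 10s between consecutive rows closes the current session.
conn.execute(
    "create stream session_stream into session_output_stb as "
    "select _wstart, _wend, count(*) from meters "
    "partition by tbname session(ts, 10s)"
)
```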
For example, the following statement creates a stream and automatically creates a super table named avg_vol. Using a one-minute time window that slides forward every 30 seconds, the stream computes the average voltage of these meters and writes the results calculated from the meters table into avg_vol, creating a separate subtable for the data of each partition.
......@@ -88,23 +97,3 @@ T = latest event time - watermark
2. Recompute: re-read all the data of the corresponding window from TSDB and recompute to produce the latest result
In either mode, the watermark should be set carefully, to get correct results (in discard mode) or to avoid the performance overhead of frequently triggered recomputation (in recompute mode).
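A small numeric illustration of the closing rule T = latest event time - watermark (plain arithmetic, not connector code):

```python
from datetime import datetime, timedelta

watermark = timedelta(seconds=30)
latest_event = datetime(2022, 8, 12, 10, 0, 45)

# Windows whose end time is earlier than T are treated as closed.
T = latest_event - watermark
print(T)  # 2022-08-12 10:00:15 -> a window ending at 10:00:10 is closed,
          # while one ending at 10:00:20 stays open for late data.
```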
## Stream processing and the session window
```sql
window_clause: {
SESSION(ts_col, tol_val)
| STATE_WINDOW(col)
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [FILL(fill_mod_and_val)]
}
```
Here SESSION is a session window and tol_val is the maximum gap between rows in one window. Rows whose timestamps fall within tol_val of each other belong to the same window; if the gap between two consecutive rows exceeds tol_val, the next window opens automatically.
## Stopping and resuming a stream
```sql
STOP STREAM stream_name;
RESUME STREAM stream_name;
```
......@@ -8,18 +8,13 @@ TDengine provides a rich set of application development interfaces. To help users get started quickly
## Supported platforms
Currently, TDengine's native connectors support hardware platforms such as X64/X86/ARM64/ARM32/MIPS/Alpha and development environments such as Linux/Win64/Win32. The comparison matrix is as follows:
Currently, TDengine's native connectors support hardware platforms such as X64/ARM64 and development environments such as Linux/Win64. The comparison matrix is as follows:
| **CPU** | **OS** | **Java** | **Python** | **Go** | **Node.js** | **C#** | **Rust** | C/C++ |
| -------------- | --------- | -------- | ---------- | ------ | ----------- | ------ | -------- | ----- |
| **X86 64bit** | **Linux** | ● | ● | ● | ● | ● | ● | ● |
| **X86 64bit** | **Win64** | ● | ● | ● | ● | ● | ● | ● |
| **X86 64bit** | **Win32** | ● | ● | ● | ● | ○ | ○ | ● |
| **X86 32bit** | **Win32** | ○ | ○ | ○ | ○ | ○ | ○ | ● |
| **ARM64** | **Linux** | ● | ● | ● | ● | ○ | ○ | ● |
| **MIPS (Loongson)** | **Linux** | ○ | ○ | ○ | ○ | ○ | ○ | ○ |
| **Alpha (Sunway)** | **Linux** | ○ | ○ | -- | -- | -- | -- | ○ |
| **X86 (Hygon)** | **Linux** | ○ | ○ | ○ | -- | -- | -- | ○ |
Here ● means officially tested and verified, ○ means unofficially tested and verified, and -- means not verified.
......
......@@ -5,11 +5,11 @@ description: "List of platforms supported by the TDengine server, client, and connectors"
## List of platforms supported by the TDengine server
| | **Windows 10/11** | **CentOS 7.9/8** | **Ubuntu 18/20** | **Other Linux** | **UOS** | **Kylin** | **Ningsi V60/V80** | **HUAWEI EulerOS** |
| ------------------ | ----------------- | ---------------- | ---------------- | --------------- | ------- | --------- | ------------------ | ------------------ |
| X64 | ● | ● | ● | | ● | ● | ● | |
| Raspberry Pi ARM64 | | | | ● | | | | |
| HUAWEI Cloud ARM64 | | | | | | | | ● |
| | **Windows Server 2016/2019** | **Windows 10/11** | **CentOS 7.9/8** | **Ubuntu 18/20** | **UOS** | **Kylin** | **Ningsi V60/V80** |
| ------------------ | ---------------------------- | ----------------- | ---------------- | ---------------- | ------- | --------- | ------------------ |
| X64 | ● | ● | ● | ● | ● | ● | ● |
| Raspberry Pi ARM64 | | | ● | | | | |
| HUAWEI Cloud ARM64 | | | | ● | | | |
Note: ● means officially tested and verified, ○ means unofficially tested and verified.
......@@ -19,15 +19,15 @@ description: "List of platforms supported by the TDengine server, client, and connectors"
The comparison matrix is as follows:
| **CPU** | **X64 64bit** | | | **X86 32bit** | **ARM64** | **MIPS (Loongson)** | **Alpha (Sunway)** | **X64 (Hygon)** |
| ----------- | ------------- | --------- | --------- | ------------- | --------- | ------------------- | ------------------ | --------------- |
| **OS** | **Linux** | **Win64** | **Win32** | **Win32** | **Linux** | **Linux** | **Linux** | **Linux** |
| **C/C++** | ● | ● | ● | ○ | ● | ● | ● | ● |
| **JDBC** | ● | ● | ● | ○ | ● | ● | ● | ● |
| **Python** | ● | ● | ● | ○ | ● | ● | -- | ● |
| **Go** | ● | ● | ● | ○ | ● | ○ | -- | -- |
| **NodeJs** | ● | ● | ○ | ○ | ● | ○ | -- | -- |
| **C#** | ● | ● | ○ | ○ | ○ | ○ | -- | -- |
| **RESTful** | ● | ● | ● | ● | ● | ● | ● | ● |
| **CPU** | **X64 64bit** | **X64 64bit** | **ARM64** |
| ----------- | ------------- | ------------- | --------- |
| **OS** | **Linux** | **Win64** | **Linux** |
| **C/C++** | ● | ● | ● |
| **JDBC** | ● | ● | ● |
| **Python** | ● | ● | ● |
| **Go** | ● | ● | ● |
| **NodeJs** | ● | ● | ● |
| **C#** | ● | ● | ○ |
| **RESTful** | ● | ● | ● |
Note: ● means officially tested and verified, ○ means unofficially tested and verified, -- means not verified.
......@@ -162,8 +162,6 @@ A vnode maintains a data version number (version) and persists its in-memory data
When a vnode starts, its role (leader or follower) is undetermined and its data is unsynchronized. It must establish TCP connections with the other nodes in its virtual node group, exchange status with them, and complete leader election following the standard Raft consensus algorithm.
For more details on the data replication process, see [TDengine 3.0 Data Replication Module Design](/tdinternal/replica/).
### Synchronous replication
For scenarios with stronger data consistency requirements, the eventual consistency provided by asynchronous replication is not sufficient, so TDengine also offers a synchronous replication mechanism. When creating a database, in addition to the replica count, the user specifies a new parameter, strict. If strict is 1, each time the leader forwards data to the replicas it must wait until more than half of the replicas reach agreement before notifying the application that the data was successfully written on the followers. If confirmation from more than half of the replicas is not received within a certain time, the leader vnode returns an error to the application.
......@@ -241,15 +239,16 @@ dataDir /mnt/data6 2 0
## Data queries
TDengine provides a wide variety of query processing features for tables and super tables. Besides regular aggregation queries, it also offers window queries and statistical aggregations over time-series data. Query processing in TDengine requires the client, vnode, and mnode to work together.
### Single-table queries
SQL parsing and validation are done on the client. The SQL statement is parsed into an abstract syntax tree (AST), which is then validated and checked, and the metadata (table metadata) of the tables referenced by the query is requested from the management node (mnode).
TDengine provides a wide variety of query processing features for tables and super tables. Besides regular aggregation queries, it also offers window queries and statistical aggregations over time-series data. Query processing in TDengine requires the client, vnode, qnode, and mnode to work together; a complex super table aggregation query may need multiple vnodes and qnodes to share the query and computation work.
Based on the endpoint in the metadata, the query request is serialized and sent to the data node (dnode) hosting the table. On receiving the query request, the dnode identifies the target virtual node (vnode) and forwards the message to the vnode's query execution queue. A query execution thread of the vnode sets up the basic query execution environment, immediately acknowledges the query request, and starts executing the query.
### Basic query workflow
When the client fetches the query results, a worker thread in the dnode's query execution queue waits for the vnode execution thread to finish before returning the results to the requesting client.
1. The client parses the input SQL statement and generates an abstract syntax tree (AST), then validates and checks it against the metadata. During this step, the metadata management module (Catalog) requests the metadata of the databases and tables referenced in the query from the management node (mnode) or vnodes.
2. After the checks pass, the client generates a distributed query plan and optimizes it.
3. The client schedules tasks according to the configured query policy; each query subtask is dispatched, based on data affinity or load information, to the data node (dnode) hosting an appropriate vnode or qnode.
4. On receiving the query request, the dnode identifies the target virtual node (vnode) or query node (qnode) and forwards the message to its query execution queue.
5. A query execution thread of the vnode or qnode sets up the basic query execution environment and executes the query immediately, notifying the client once partial results are available.
6. The client then launches downstream query tasks or fetches the results directly.
### Time-axis aggregation, downsampling, and interpolation
......@@ -279,12 +278,14 @@ TDengine creates a separate table for each data collection point, but in practice it is often necessary to
<center> Figure 5: Multi-table aggregation query </center>
1. The application sends a query with its filter conditions to the system;
2. taosc sends the super table name to the meta node (management node);
3. The management node sends the list of vnodes owned by the super table back to taosc;
4. taosc sends the computation request, together with the tag filter conditions, to the data nodes hosting those vnodes;
5. Each vnode first finds, in memory, the set of its tables that match the tag filter, then scans the stored time-series data, performs the aggregation, and returns the result to taosc;
6. taosc performs the final aggregation over the results returned by the data nodes and hands the result to the application.
1. The client fetches the metadata of the database and tables from the mnode;
2. The mnode returns the requested metadata;
3. The client sends a query request to every vnode holding part of the super table;
4. Each vnode runs the query locally and returns a response once it has results;
5. The client sends a query request to the aggregation node (a qnode in this example);
6. The qnode sends data request messages to each vnode to pull data;
7. Each vnode returns its local query and computation results;
8. After aggregating the data from all nodes, the qnode returns the final query result to the client.
Because TDengine stores tag data separately from time-series data inside each vnode, it can filter tag data in memory to first determine the set of tables that must participate in the aggregation, greatly reducing the dataset that needs to be scanned and dramatically speeding up aggregation. And because the data is spread across multiple vnodes/dnodes, the aggregation runs concurrently in multiple vnodes, accelerating it further. Aggregate functions and most other operations that work on regular tables also work on super tables, with exactly the same syntax; see TAOS SQL for details.
......
......@@ -359,7 +359,7 @@ typedef enum ELogicConditionType {
#define TSDB_DEFAULT_DB_SCHEMALESS TSDB_DB_SCHEMALESS_OFF
#define TSDB_DB_MIN_WAL_RETENTION_PERIOD -1
#define TSDB_DEFAULT_DB_WAL_RETENTION_PERIOD (24 * 60 * 60 * 2)
#define TSDB_DEFAULT_DB_WAL_RETENTION_PERIOD (24 * 60 * 60 * 4)
#define TSDB_DB_MIN_WAL_RETENTION_SIZE -1
#define TSDB_DEFAULT_DB_WAL_RETENTION_SIZE -1
#define TSDB_DB_MIN_WAL_ROLL_PERIOD 0
......
......@@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(clientTest clientTests.cpp)
TARGET_LINK_LIBRARIES(
clientTest
PUBLIC os util common transport parser catalog scheduler function gtest taos_static qcom executor
os util common transport parser catalog scheduler gtest taos_static qcom executor function
)
ADD_EXECUTABLE(tmqTest tmqTest.cpp)
......
......@@ -238,7 +238,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
} else {
memcpy(pReq->info.conn.user, TSDB_DEFAULT_USER, strlen(TSDB_DEFAULT_USER) + 1);
}
if (mndCheckShowPrivilege(pMnode, pReq->info.conn.user, pShow->type, retrieveReq.db) != 0) {
if (retrieveReq.db[0] && mndCheckShowPrivilege(pMnode, pReq->info.conn.user, pShow->type, retrieveReq.db) != 0) {
return -1;
}
......
......@@ -1105,6 +1105,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
SName* pName = ctgGetFetchName(ctx->pNames, pFetch);
int32_t flag = pFetch->flag;
int32_t* vgId = &pFetch->vgId;
bool taskDone = false;
CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));
......@@ -1250,6 +1251,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
pOut->tbMeta = NULL;
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
TSWAP(pTask->res, ctx->pResList);
taskDone = true;
}
_return:
......@@ -1264,10 +1266,11 @@ _return:
pRes->pRes = NULL;
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
TSWAP(pTask->res, ctx->pResList);
taskDone = true;
}
}
if (pTask->res) {
if (pTask->res && taskDone) {
ctgHandleTaskEnd(pTask, code);
}
......@@ -1354,6 +1357,7 @@ int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
SCatalog* pCtg = pTask->pJob->pCtg;
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
bool taskDone = false;
CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));
......@@ -1377,6 +1381,7 @@ int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
TSWAP(pTask->res, ctx->pResList);
taskDone = true;
}
_return:
......@@ -1392,10 +1397,11 @@ _return:
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
TSWAP(pTask->res, ctx->pResList);
taskDone = true;
}
}
if (pTask->res) {
if (pTask->res && taskDone) {
ctgHandleTaskEnd(pTask, code);
}
......
......@@ -581,6 +581,20 @@ _return:
}
int32_t ctgChkAuthFromCache(SCatalog* pCtg, char* user, char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass) {
char *p = strchr(dbFName, '.');
if (p) {
++p;
} else {
p = dbFName;
}
if (IS_SYS_DBNAME(p)) {
*inCache = true;
*pass = true;
ctgDebug("sysdb %s, pass", dbFName);
return TSDB_CODE_SUCCESS;
}
SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, user, strlen(user));
if (NULL == pUser) {
ctgDebug("user not in cache, user:%s", user);
......
......@@ -716,7 +716,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
taosArrayDestroy(pBlockList);
}
} else {
ASSERT(0);
return TSDB_CODE_OPS_NOT_SUPPORT;
}
}
......
......@@ -2037,7 +2037,17 @@ static int32_t setVnodeSysTableVgroupList(STranslateContext* pCxt, SName* pName,
code = getDBVgInfoImpl(pCxt, pName, &vgroupList);
}
if (TSDB_CODE_SUCCESS == code && 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES)) {
if (TSDB_CODE_SUCCESS == code &&
0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) &&
0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TAGS) &&
isSelectStmt(pCxt->pCurrStmt) &&
0 == taosArrayGetSize(vgroupList)) {
((SSelectStmt*)pCxt->pCurrStmt)->isEmptyResult = true;
}
if (TSDB_CODE_SUCCESS == code &&
0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) &&
0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES)) {
code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &vgroupList);
}
......
......@@ -33,7 +33,8 @@ target_link_libraries (transportTest
util
common
gtest_main
transport
transport
function
)
target_link_libraries (transUT
......
......@@ -111,7 +111,7 @@ endi
if $data21_db != 1000 then # wal_level fsyncperiod
return -1
endi
if $data22_db != 172800 then # wal_retention_period
if $data22_db != 345600 then # wal_retention_period
return -1
endi
if $data23_db != -1 then # wal_retention_size
......
......@@ -45,19 +45,19 @@ sql_error drop database db
sql_error use db
sql_error alter database db replica 1;
sql_error show db.vgroups
sql_error select * from information_schema.ins_stables where db_name = 'db'
sql_error select * from information_schema.ins_tables where db_name = 'db'
sql select * from information_schema.ins_stables where db_name = 'db'
sql select * from information_schema.ins_tables where db_name = 'db'
print =============== check show
sql_error select * from information_schema.ins_users
sql select * from information_schema.ins_users
sql_error show cluster
sql_error select * from information_schema.ins_dnodes
sql_error select * from information_schema.ins_mnodes
sql select * from information_schema.ins_dnodes
sql select * from information_schema.ins_mnodes
sql_error show snodes
sql_error select * from information_schema.ins_qnodes
sql select * from information_schema.ins_qnodes
sql_error show bnodes
sql_error show grants
sql_error show dnode 1 variables;
sql show variables;
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode1 -s stop -x SIGINT