Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
6d4c2239
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
6d4c2239
编写于
8月 23, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into refact/tsdb_optimize
上级
478d5ebb
25ffc934
变更
14
显示空白变更内容
内联
并排
Showing
14 changed file
with
80 addition
and
100 deletion
+80
-100
docs/zh/14-reference/07-tdinsight/assets/TDinsight-1-cluster-status.webp
...rence/07-tdinsight/assets/TDinsight-1-cluster-status.webp
+0
-0
docs/zh/14-reference/07-tdinsight/assets/TDinsight-2-dnodes.webp
.../14-reference/07-tdinsight/assets/TDinsight-2-dnodes.webp
+0
-0
docs/zh/14-reference/07-tdinsight/assets/TDinsight-3-mnodes.webp
.../14-reference/07-tdinsight/assets/TDinsight-3-mnodes.webp
+0
-0
docs/zh/14-reference/07-tdinsight/assets/TDinsight-4-requests.webp
...4-reference/07-tdinsight/assets/TDinsight-4-requests.webp
+0
-0
docs/zh/14-reference/07-tdinsight/assets/TDinsight-5-database.webp
...4-reference/07-tdinsight/assets/TDinsight-5-database.webp
+0
-0
docs/zh/14-reference/07-tdinsight/assets/TDinsight-8-taosadapter.webp
...eference/07-tdinsight/assets/TDinsight-8-taosadapter.webp
+0
-0
docs/zh/14-reference/07-tdinsight/assets/howto-add-datasource.webp
...4-reference/07-tdinsight/assets/howto-add-datasource.webp
+0
-0
docs/zh/14-reference/07-tdinsight/assets/import_dashboard.webp
...zh/14-reference/07-tdinsight/assets/import_dashboard.webp
+0
-0
docs/zh/14-reference/07-tdinsight/assets/import_dashboard_view.webp
...-reference/07-tdinsight/assets/import_dashboard_view.webp
+0
-0
docs/zh/14-reference/07-tdinsight/assets/select_dashboard_db.webp
...14-reference/07-tdinsight/assets/select_dashboard_db.webp
+0
-0
docs/zh/14-reference/07-tdinsight/index.mdx
docs/zh/14-reference/07-tdinsight/index.mdx
+45
-70
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+11
-4
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+23
-25
未找到文件。
docs/zh/14-reference/07-tdinsight/assets/TDinsight-1-cluster-status.webp
浏览文件 @
6d4c2239
无法预览此类型文件
docs/zh/14-reference/07-tdinsight/assets/TDinsight-2-dnodes.webp
浏览文件 @
6d4c2239
无法预览此类型文件
docs/zh/14-reference/07-tdinsight/assets/TDinsight-3-mnodes.webp
浏览文件 @
6d4c2239
无法预览此类型文件
docs/zh/14-reference/07-tdinsight/assets/TDinsight-4-requests.webp
浏览文件 @
6d4c2239
无法预览此类型文件
docs/zh/14-reference/07-tdinsight/assets/TDinsight-5-database.webp
浏览文件 @
6d4c2239
无法预览此类型文件
docs/zh/14-reference/07-tdinsight/assets/TDinsight-8-taosadapter.webp
浏览文件 @
6d4c2239
无法预览此类型文件
docs/zh/14-reference/07-tdinsight/assets/howto-add-datasource.webp
浏览文件 @
6d4c2239
无法预览此类型文件
docs/zh/14-reference/07-tdinsight/assets/import_dashboard.webp
浏览文件 @
6d4c2239
无法预览此类型文件
docs/zh/14-reference/07-tdinsight/assets/import_dashboard_view.webp
0 → 100644
浏览文件 @
6d4c2239
文件已添加
docs/zh/14-reference/07-tdinsight/assets/select_dashboard_db.webp
0 → 100644
浏览文件 @
6d4c2239
文件已添加
docs/zh/14-reference/07-tdinsight/index.md
→
docs/zh/14-reference/07-tdinsight/index.md
x
浏览文件 @
6d4c2239
...
...
@@ -3,19 +3,28 @@ title: TDinsight - 基于Grafana的TDengine零依赖监控解决方案
sidebar_label: TDinsight
---
TDinsight 是使用
内置
监控数据库和 [Grafana] 对 TDengine 进行监控的解决方案。
TDinsight 是使用监控数据库和 [Grafana] 对 TDengine 进行监控的解决方案。
TDengine
启动后,会自动创建一个监测数据库
`log`
,并自动将服务器的 CPU、内存、硬盘空间、带宽、请求数、磁盘读写速度、慢查询等信息定时写入该
数据库,并对重要的系统操作(比如登录、创建、删除数据库等)以及各种错误报警信息进行记录。通过
[
Grafana] 和 [TDengine 数据源插件
](
https://github.com/taosdata/grafanaplugin/releases
)
,TDinsight 将集群状态、节点信息、插入及查询请求、资源使用情况等进行可视化展示,同时还支持 vnode、dnode、mnode 节点状态异常告警,为开发者实时监控 TDengine 集群运行状态提供了便利。本文将指导用户安装 Grafana 服务器并通过
`TDinsight.sh`
安装脚本自动安装 TDengine 数据源插件及部署 TDinsight 可视化面板。
TDengine
通过 [taosKeeper](../../taosKeeper) 将服务器的 CPU、内存、硬盘空间、带宽、请求数、磁盘读写速度、慢查询等信息定时写入指定
数据库,并对重要的系统操作(比如登录、创建、删除数据库等)以及各种错误报警信息进行记录。通过 [Grafana] 和 [TDengine 数据源插件](https://github.com/taosdata/grafanaplugin/releases),TDinsight 将集群状态、节点信息、插入及查询请求、资源使用情况等进行可视化展示,同时还支持 vnode、dnode、mnode 节点状态异常告警,为开发者实时监控 TDengine 集群运行状态提供了便利。本文将指导用户安装 Grafana 服务器并通过 `TDinsight.sh` 安装脚本自动安装 TDengine 数据源插件及部署 TDinsight 可视化面板。
## 系统要求
要部署 TDinsight,需要一个单节点的 TDengine 服务器或一个多节点的 [TDengine] 集群,以及一个[Grafana]服务器。此仪表盘需要 TDengine 2.3.3.0 及以上,并启用
`log`
数据库(
`monitor = 1`
)。
- 单节点的 TDengine 服务器或多节点的 [TDengine] 集群,以及一个[Grafana]服务器。此仪表盘需要 TDengine 3.0.0.0 及以上,并开启监控服务,具体配置请参考:[TDengine 监控配置](../config/#监控相关)。
- taosAdapter 已经安装并正常运行。具体细节请参考:[taosAdapter 使用手册](../../taosadapter)
- taosKeeper 已安装并正常运行。具体细节请参考:[taosKeeper 使用手册](../../taosKeeper)
记录以下信息:
- taosAdapter 集群 REST API 地址,如:`http://tdengine.local:6041`。
- taosAdapter 集群认证信息,可使用用户名及密码。
- taosKeeper 记录监控指标的数据库名称。
## 安装 Grafana
我们建议在此处使用最新的
[
Grafana]
7 或 8
版本。您可以在任何[支持的操作系统
](
https://grafana.com/docs/grafana/latest/installation/requirements/#supported-operating-systems
)
中,按照
[
Grafana 官方文档安装说明
](
https://grafana.com/docs/grafana/latest/installation/
)
安装 [Grafana]。
我们建议在此处使用最新的[Grafana]
8 或 9
版本。您可以在任何[支持的操作系统](https://grafana.com/docs/grafana/latest/installation/requirements/#supported-operating-systems)中,按照 [Grafana 官方文档安装说明](https://grafana.com/docs/grafana/latest/installation/) 安装 [Grafana]。
### 在 Debian 或 Ubuntu 上安装 Grafana
<Tabs defaultValue="debian" groupId="install">
<TabItem value="debian" label="基于 Debian 或 Ubuntu 系统">
对于 Debian 或 Ubuntu 操作系统,建议使用 Grafana 镜像仓库。使用如下命令从零开始安装:
...
...
@@ -31,6 +40,8 @@ sudo apt-get install grafana
```
### 在 CentOS / RHEL 上安装 Grafana
</TabItem>
<TabItem label="redhat" value="基于 CentOS / RHEL 系统">
您可以从官方 YUM 镜像仓库安装。
...
...
@@ -59,7 +70,12 @@ sudo yum install \
https://dl.grafana.com/oss/release/grafana-7.5.11-1.x86_64.rpm
```
## 自动部署 TDinsight
</TabItem>
</Tabs>
<Tabs defaultValue="auto" groupId="deploy">
<TabItem value="auto" label="自动部署 TDinsight">
我们提供了一个自动化安装脚本 [`TDinsight.sh`](https://github.com/taosdata/grafanaplugin/releases/latest/download/TDinsight.sh) 脚本以便用户快速进行安装配置。
...
...
@@ -71,7 +87,7 @@ chmod +x TDinsight.sh
./TDinsight.sh
```
这个脚本会自动下载最新的
[
Grafana TDengine 数据源插件
](
https://github.com/taosdata/grafanaplugin/releases/latest
)
和
[
TDinsight 仪表盘
](
https://g
rafana.com/grafana/dashboards/15167
)
,将命令行选项中的可配置参数转为
[
Grafana Provisioning
](
https://grafana.com/docs/grafana/latest/administration/provisioning/
)
配置文件,以进行自动化部署及更新等操作。利用该脚本提供的告警设置选项,你还可以获得内置的阿里云短信告警通知支持。
这个脚本会自动下载最新的[Grafana TDengine 数据源插件](https://github.com/taosdata/grafanaplugin/releases/latest) 和 [TDinsight 仪表盘](https://g
ithub.com/taosdata/grafanaplugin/blob/master/dashboards/TDinsightV3.json
) ,将命令行选项中的可配置参数转为 [Grafana Provisioning](https://grafana.com/docs/grafana/latest/administration/provisioning/) 配置文件,以进行自动化部署及更新等操作。利用该脚本提供的告警设置选项,你还可以获得内置的阿里云短信告警通知支持。
假设您在同一台主机上使用 TDengine 和 Grafana 的默认服务。运行 `./TDinsight.sh` 并打开 Grafana 浏览器窗口就可以看到 TDinsight 仪表盘了。
...
...
@@ -106,18 +122,6 @@ Install and configure TDinsight dashboard in Grafana on Ubuntu 18.04/20.04 syste
-E, --external-notifier <string> Apply external notifier uid to TDinsight dashboard.
Aliyun SMS as Notifier:
-s, --sms-enabled To enable tdengine-datasource plugin builtin Aliyun SMS webhook.
-N, --sms-notifier-name <string> Provisioning notifier name.[default: TDinsight Builtin SMS]
-U, --sms-notifier-uid <string> Provisioning notifier uid, use lowercase notifier name by default.
-D, --sms-notifier-is-default Set notifier as default.
-I, --sms-access-key-id <string> Aliyun SMS access key id
-K, --sms-access-key-secret <string> Aliyun SMS access key secret
-S, --sms-sign-name <string> Sign name
-C, --sms-template-code <string> Template code
-T, --sms-template-param <string> Template param, a escaped JSON string like '{"alarm_level":"%s","time":"%s","name":"%s","content":"%s"}'
-B, --sms-phone-numbers <string> Comma-separated numbers list, eg "189xxxxxxxx,132xxxxxxxx"
-L, --sms-listen-addr <string> [default: 127.0.0.1:9100]
```
大多数命令行选项都可以通过环境变量获得同样的效果。
...
...
@@ -136,17 +140,6 @@ Aliyun SMS as Notifier:
| -t | --tdinsight-title | TDINSIGHT_DASHBOARD_TITLE | TDinsight 仪表盘标题。 [默认:TDinsight] |
| -e | --tdinsight-可编辑 | TDINSIGHT_DASHBOARD_EDITABLE | 如果配置仪表盘可以编辑。 [默认值:false] |
| -E | --external-notifier | EXTERNAL_NOTIFIER | 将外部通知程序 uid 应用于 TDinsight 仪表盘。 |
| -s | --sms-enabled | SMS_ENABLED | 启用阿里云短信 webhook 内置的 tdengine-datasource 插件。 |
| -N | --sms-notifier-name | SMS_NOTIFIER_NAME | 供应通知程序名称。[默认:
`TDinsight Builtin SMS`
] |
| -U | --sms-notifier-uid | SMS_NOTIFIER_UID | "Notification Channel"
`uid`
,默认使用程序名称的小写,其他字符用 “-” 代替。 |
| -D | --sms-notifier-is-default | SMS_NOTIFIER_IS_DEFAULT | 将内置短信通知设置为默认值。 |
| -I | --sms-access-key-id | SMS_ACCESS_KEY_ID | 阿里云短信访问密钥 id |
| -K | --sms-access-key-secret | SMS_ACCESS_KEY_SECRET | 阿里云短信访问秘钥 |
| -S | --sms-sign-name | SMS_SIGN_NAME | 签名 |
| -C | --sms-template-code | SMS_TEMPLATE_CODE | 模板代码 |
| -T | --sms-template-param | SMS_TEMPLATE_PARAM | 模板参数的 JSON 模板 |
| -B | --sms-phone-numbers | SMS_PHONE_NUMBERS | 逗号分隔的手机号列表,例如
`"189xxxxxxxx,132xxxxxxxx"`
|
| -L | --sms-listen-addr | SMS_LISTEN_ADDR | 内置 SMS webhook 监听地址,默认为
`127.0.0.1:9100`
|
假设您在主机 `tdengine` 上启动 TDengine 数据库,HTTP API 端口为 `6041`,用户为 `root1`,密码为 `pass5ord`。执行脚本:
...
...
@@ -166,31 +159,18 @@ curl --no-progress-meter -u admin:admin http://localhost:3000/api/alert-notifica
sudo ./TDinsight.sh -a http://tdengine:6041 -u root1 -p pass5ord -E existing-notifier
```
如果你想使用
[
阿里云短信
](
https://www.aliyun.com/product/sms
)
服务作为通知渠道,你应该使用
`-s`
标志启用并添加以下参数:
-
`-N`
:Notification Channel 名,默认为
`TDinsight Builtin SMS`
。
-
`-U`
:Channel uid,默认是
`name`
的小写,任何其他字符都替换为 - ,对于默认的
`-N`
,其 uid 为
`tdinsight-builtin-sms`
。
-
`-I`
:阿里云短信访问密钥 id。
-
`-K`
:阿里云短信访问秘钥。
-
`-S`
:阿里云短信签名。
-
`-C`
:阿里云短信模板 ID。
-
`-T`
:阿里云短信模板参数,为 JSON 格式模板,示例如下
`'{"alarm_level":"%s","time":"%s","name":"%s","content":"%s "}'`
。有四个参数:告警级别、时间、名称和告警内容。
-
`-B`
:电话号码列表,以逗号
`,`
分隔。
如果要监控多个 TDengine 集群,则需要设置多个 TDinsight 仪表盘。设置非默认 TDinsight 需要进行一些更改: `-n` `-i` `-t` 选项需要更改为非默认名称,如果使用 内置短信告警功能,`-N` 和 `-L` 也应该改变。
```bash
sudo ./TDengine.sh -n TDengine-Env1 -a http://another:6041 -u root -p taosdata -i tdinsight-env1 -t 'TDinsight Env1'
# 如果使用内置短信通知
sudo
./TDengine.sh
-n
TDengine-Env1
-a
http://another:6041
-u
root
-p
taosdata
-i
tdinsight-env1
-t
'TDinsight Env1'
\
-s
-N
'Env1 SMS'
-I
xx
-K
xx
-S
xx
-C
SMS_XX
-T
''
-B
00000000000
-L
127.0.0.01:10611
```
请注意,配置数据源、通知 Channel 和仪表盘在前端是不可更改的。您应该再次通过此脚本更新配置或手动更改 `/etc/grafana/provisioning` 目录(这是 Grafana 的默认目录,根据需要使用`-P`选项更改)中的配置文件。
特别地,当您使用 Grafana Cloud 或其他组织时,`-O` 可用于设置组织 ID。 `-G` 可指定 Grafana 插件安装目录。 `-e` 参数将仪表盘设置为可编辑。
## 手动设置 TDinsight
</TabItem>
<TabItem label="manual" value="手动设置 TDinsight">
### 安装 TDengine 数据源插件
...
...
@@ -247,23 +227,30 @@ sudo systemctl enable grafana-server
![TDengine Database TDinsight 数据源测试](./assets/howto-add-datasource-test.webp)
</TabItem>
</Tabs>
### 导入仪表盘
指向
**+**
/
**Create**
-
**import**
(或
`/dashboard/import`
url)
。
在配置 TDengine 数据源界面,点击 **Dashboards** tab
。
![TDengine Database TDinsight 导入仪表盘和配置](./assets/import_dashboard.webp)
在
**Import via grafana.com**
位置键入仪表盘 ID
`15167`
并
**Load**
。
选择 `TDengine for 3.x`,并点击 `import`。
导入完成后,在搜索界面已经出现了 **TDinsight for 3.x** dashboard。
![TDengine Database TDinsight 查看导入结果](./assets/import_dashboard_view.webp)
![
通过 grafana.com 导入
](
./assets/import-dashboard-15167.webp
)
进入 TDinsight for 3.x dashboard 后,选择 taosKeeper 中设置的记录监控指标的数据库。
导入完成后,TDinsight 的完整页面视图如下所示。
![TDengine Database TDinsight 选择数据库](./assets/select_dashboard_db.webp)
![
TDengine Database TDinsight 显示
](
./assets/TDinsight-full.webp
)
然后可以看到监控结果。
## TDinsight 仪表盘详细信息
TDinsight 仪表盘旨在提供 TDengine 相关资源
使用情况
[
dnodes, mnodes, vnodes
](
https://www.taosdata.com/cn/documentation/architecture#cluster
)
或数据库的使用情况和状态
。
TDinsight 仪表盘旨在提供 TDengine 相关资源
的使用情况和状态,比如 dnodes、 mnodes、 vnodes 和数据库等
。
指标详情如下:
...
...
@@ -285,7 +272,6 @@ TDinsight 仪表盘旨在提供 TDengine 相关资源使用情况[dnodes, mnodes
- **Measuring Points Used**:启用告警规则的测点数用量(社区版无数据,默认情况下是健康的)。
- **Grants Expire Time**:启用告警规则的企业版过期时间(社区版无数据,默认情况是健康的)。
- **Error Rate**:启用警报的集群总合错误率(每秒平均错误数)。
-
**Variables**
:
`show variables`
表格展示。
### DNodes 状态
...
...
@@ -294,7 +280,6 @@ TDinsight 仪表盘旨在提供 TDengine 相关资源使用情况[dnodes, mnodes
- **DNodes Status**:`show dnodes` 的简单表格视图。
- **DNodes Lifetime**:从创建 dnode 开始经过的时间。
- **DNodes Number**:DNodes 数量变化。
-
**Offline Reason**
:如果有任何 dnode 状态为离线,则以饼图形式展示离线原因。
### MNode 概述
...
...
@@ -309,7 +294,6 @@ TDinsight 仪表盘旨在提供 TDengine 相关资源使用情况[dnodes, mnodes
1. **Requests Rate(Inserts per Second)**:平均每秒插入次数。
2. **Requests (Selects)**:查询请求数及变化率(count of second)。
3.
**Requests (HTTP)**
:HTTP 请求数和请求速率(count of second)。
### 数据库
...
...
@@ -319,9 +303,8 @@ TDinsight 仪表盘旨在提供 TDengine 相关资源使用情况[dnodes, mnodes
1. **STables**:超级表数量。
2. **Total Tables**:所有表数量。
3.
**Sub Tables**
:所有超级表子表的数量。
4.
**Tables**
:所有普通表数量随时间变化图。
5.
**Tables Number Foreach VGroups**
:每个 VGroups 包含的表数量。
3. **Tables**:所有普通表数量随时间变化图。
4. **Tables Number Foreach VGroups**:每个 VGroups 包含的表数量。
### DNode 资源使用情况
...
...
@@ -356,12 +339,11 @@ TDinsight 仪表盘旨在提供 TDengine 相关资源使用情况[dnodes, mnodes
支持监控 taosAdapter 请求统计和状态详情。包括:
1.
**http_request**
: 包含总请求数,请求失败数以及正在处理的请求数
2.
**top 3 request endpoint**
: 按终端分组,请求排名前三的数据
3.
**Memory Used**
: taosAdapter 内存使用情况
4.
**latency_quantile(ms)**
: (1, 2, 5, 9, 99)阶段的分位数
5.
**top 3 failed request endpoint**
: 按终端分组,请求失败排名前三的数据
6.
**CPU Used**
: taosAdapter CPU 使用情况
1. **http_request_inflight**: 即时处理请求数
2. **http_request_total**: 请求总数。
3. **http_request_fail**: 请求总数。
4. **CPU Used**: taosAdapter CPU 使用情况。
5. **Memory Used**: taosAdapter 内存使用情况。
## 升级
...
...
@@ -403,13 +385,6 @@ services:
TDENGINE_API: ${TDENGINE_API}
TDENGINE_USER: ${TDENGINE_USER}
TDENGINE_PASS: ${TDENGINE_PASS}
SMS_ACCESS_KEY_ID
:
${SMS_ACCESS_KEY_ID}
SMS_ACCESS_KEY_SECRET
:
${SMS_ACCESS_KEY_SECRET}
SMS_SIGN_NAME
:
${SMS_SIGN_NAME}
SMS_TEMPLATE_CODE
:
${SMS_TEMPLATE_CODE}
SMS_TEMPLATE_PARAM
:
'
${SMS_TEMPLATE_PARAM}'
SMS_PHONE_NUMBERS
:
$SMS_PHONE_NUMBERS
SMS_LISTEN_ADDR
:
${SMS_LISTEN_ADDR}
ports:
- 3000:3000
volumes:
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
6d4c2239
...
...
@@ -1016,7 +1016,7 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool
isOverdue
(
TSKEY
ts
,
STimeWindowAggSupp
*
pSup
);
bool
isCloseWindow
(
STimeWindow
*
pWin
,
STimeWindowAggSupp
*
pSup
);
bool
isDeletedWindow
(
STimeWindow
*
pWin
,
uint64_t
groupId
,
SAggSupporter
*
pSup
);
void
appendOneRow
(
SSDataBlock
*
pBlock
,
TSKEY
*
pStartTs
,
TSKEY
*
pEndTs
,
uint64_t
*
pUid
);
void
appendOneRow
(
SSDataBlock
*
pBlock
,
TSKEY
*
pStartTs
,
TSKEY
*
pEndTs
,
int32_t
uidCol
,
uint64_t
*
pID
);
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
);
int32_t
finalizeResultRowIntoResultDataBlock
(
SDiskbasedBuf
*
pBuf
,
SResultRowPosition
*
resultRowPosition
,
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
6d4c2239
...
...
@@ -1086,7 +1086,10 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
SColumnInfoData
*
pDestStartCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pDestEndCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
SColumnInfoData
*
pDestUidCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
UID_COLUMN_INDEX
);
SColumnInfoData
*
pDestGpCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
SColumnInfoData
*
pDestCalStartTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
CALCULATE_START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pDestCalEndTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
CALCULATE_END_TS_COLUMN_INDEX
);
int32_t
dummy
=
0
;
for
(
int32_t
i
=
0
;
i
<
pSrcBlock
->
info
.
rows
;
i
++
)
{
uint64_t
groupId
=
getGroupId
(
pInfo
->
pTableScanOp
,
uidCol
[
i
]);
...
...
@@ -1100,9 +1103,13 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
SResultWindowInfo
*
pEndWin
=
getCurSessionWindow
(
pInfo
->
sessionSup
.
pStreamAggSup
,
endData
[
i
],
endData
[
i
],
groupId
,
0
,
&
dummy
);
ASSERT
(
pEndWin
);
TSKEY
ts
=
INT64_MIN
;
colDataAppend
(
pDestStartCol
,
i
,
(
const
char
*
)
&
pStartWin
->
win
.
skey
,
false
);
colDataAppend
(
pDestEndCol
,
i
,
(
const
char
*
)
&
pEndWin
->
win
.
ekey
,
false
);
colDataAppendNULL
(
pDestUidCol
,
i
);
colDataAppend
(
pDestGpCol
,
i
,
(
const
char
*
)
&
groupId
,
false
);
colDataAppendNULL
(
pDestCalStartTsCol
,
i
);
colDataAppendNULL
(
pDestCalEndTsCol
,
i
);
pDestBlock
->
info
.
rows
++
;
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1157,13 +1164,13 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
return
code
;
}
void
appendOneRow
(
SSDataBlock
*
pBlock
,
TSKEY
*
pStartTs
,
TSKEY
*
pEndTs
,
uint64_t
*
pUid
)
{
void
appendOneRow
(
SSDataBlock
*
pBlock
,
TSKEY
*
pStartTs
,
TSKEY
*
pEndTs
,
int32_t
uidCol
,
uint64_t
*
pID
)
{
SColumnInfoData
*
pStartTsCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pEndTsCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
SColumnInfoData
*
pUidCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
UID_COLUMN_INDEX
);
SColumnInfoData
*
pUidCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
uidCol
);
colDataAppend
(
pStartTsCol
,
pBlock
->
info
.
rows
,
(
const
char
*
)
pStartTs
,
false
);
colDataAppend
(
pEndTsCol
,
pBlock
->
info
.
rows
,
(
const
char
*
)
pEndTs
,
false
);
colDataAppend
(
pUidCol
,
pBlock
->
info
.
rows
,
(
const
char
*
)
p
Uid
,
false
);
colDataAppend
(
pUidCol
,
pBlock
->
info
.
rows
,
(
const
char
*
)
p
ID
,
false
);
pBlock
->
info
.
rows
++
;
}
...
...
@@ -1190,7 +1197,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
bool
closedWin
=
isClosed
&&
isSignleIntervalWindow
(
pInfo
)
&&
isDeletedWindow
(
&
win
,
pBlock
->
info
.
groupId
,
pInfo
->
sessionSup
.
pIntervalAggSup
);
if
((
update
||
closedWin
)
&&
out
)
{
appendOneRow
(
pInfo
->
pUpdateDataRes
,
tsCol
+
rowId
,
tsCol
+
rowId
,
&
pBlock
->
info
.
uid
);
appendOneRow
(
pInfo
->
pUpdateDataRes
,
tsCol
+
rowId
,
tsCol
+
rowId
,
UID_COLUMN_INDEX
,
&
pBlock
->
info
.
uid
);
}
}
if
(
out
)
{
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
6d4c2239
...
...
@@ -3951,11 +3951,13 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup,
int32_t
numOfOutput
,
int64_t
gap
,
SArray
*
result
)
{
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
tsIndex
);
TSKEY
*
tsCols
=
(
TSKEY
*
)
pColDataInfo
->
pData
;
SColumnInfoData
*
pGpDataInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
uint64_t
*
gpCols
=
(
uint64_t
*
)
pGpDataInfo
->
pData
;
int32_t
step
=
0
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
i
+=
step
)
{
int32_t
winIndex
=
0
;
SResultWindowInfo
*
pCurWin
=
getCurSessionWindow
(
pAggSup
,
tsCols
[
i
],
INT64_MIN
,
pBlock
->
info
.
groupId
,
gap
,
&
winIndex
);
getCurSessionWindow
(
pAggSup
,
tsCols
[
i
],
INT64_MIN
,
gpCols
[
i
]
,
gap
,
&
winIndex
);
if
(
!
pCurWin
||
pCurWin
->
pos
.
pageId
==
-
1
)
{
// window has been closed.
step
=
1
;
...
...
@@ -4168,13 +4170,13 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if
(
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
SArray
*
pWins
=
taosArrayInit
(
16
,
sizeof
(
SResultWindowInfo
));
doClearSessionWindows
(
&
pInfo
->
streamAggSup
,
&
pOperator
->
exprSupp
,
pBlock
,
0
,
pOperator
->
exprSupp
.
numOfExprs
,
0
,
doClearSessionWindows
(
&
pInfo
->
streamAggSup
,
&
pOperator
->
exprSupp
,
pBlock
,
START_TS_COLUMN_INDEX
,
pOperator
->
exprSupp
.
numOfExprs
,
0
,
pWins
);
if
(
IS_FINAL_OP
(
pInfo
))
{
int32_t
childIndex
=
getChildIndex
(
pBlock
);
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
childIndex
);
SStreamSessionAggOperatorInfo
*
pChildInfo
=
pChildOp
->
info
;
doClearSessionWindows
(
&
pChildInfo
->
streamAggSup
,
&
pChildOp
->
exprSupp
,
pBlock
,
0
,
pChildOp
->
exprSupp
.
numOfExprs
,
doClearSessionWindows
(
&
pChildInfo
->
streamAggSup
,
&
pChildOp
->
exprSupp
,
pBlock
,
START_TS_COLUMN_INDEX
,
pChildOp
->
exprSupp
.
numOfExprs
,
0
,
NULL
);
rebuildTimeWindow
(
pInfo
,
pWins
,
pBlock
->
info
.
groupId
,
pOperator
->
exprSupp
.
numOfExprs
,
pOperator
);
}
...
...
@@ -4285,21 +4287,21 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
}
else
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildResultDatablock
(
pOperator
,
pBInfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
streamAggSup
.
pResultBuf
);
if
(
pBInfo
->
pRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pBInfo
->
pRes
,
"sem
s
session"
);
printDataBlock
(
pBInfo
->
pRes
,
"sem
i
session"
);
return
pBInfo
->
pRes
;
}
// doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
&&
!
pInfo
->
returnDelete
)
{
pInfo
->
returnDelete
=
true
;
printDataBlock
(
pInfo
->
pDelRes
,
"sem
s
session"
);
printDataBlock
(
pInfo
->
pDelRes
,
"sem
i
session"
);
return
pInfo
->
pDelRes
;
}
if
(
pInfo
->
pUpdateRes
->
info
.
rows
>
0
)
{
// process the rest of the data
pOperator
->
status
=
OP_OPENED
;
printDataBlock
(
pInfo
->
pUpdateRes
,
"sem
s
session"
);
printDataBlock
(
pInfo
->
pUpdateRes
,
"sem
i
session"
);
return
pInfo
->
pUpdateRes
;
}
// semi interval operator clear disk buffer
...
...
@@ -4318,13 +4320,14 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
clearSpecialDataBlock
(
pInfo
->
pUpdateRes
);
break
;
}
printDataBlock
(
pBlock
,
"semi session recv"
);
if
(
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
SArray
*
pWins
=
taosArrayInit
(
16
,
sizeof
(
SResultWindowInfo
));
doClearSessionWindows
(
&
pInfo
->
streamAggSup
,
pSup
,
pBlock
,
0
,
pSup
->
numOfExprs
,
0
,
pWins
);
doClearSessionWindows
(
&
pInfo
->
streamAggSup
,
pSup
,
pBlock
,
START_TS_COLUMN_INDEX
,
pSup
->
numOfExprs
,
0
,
pWins
);
removeSessionResults
(
pStUpdated
,
pWins
);
taosArrayDestroy
(
pWins
);
copy
UpdateDataBlock
(
pInfo
->
pUpdateRes
,
pBlock
,
pInfo
->
primaryTsIndex
);
copy
DataBlock
(
pInfo
->
pUpdateRes
,
pBlock
);
break
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
||
pBlock
->
info
.
type
==
STREAM_DELETE_RESULT
)
{
// gap must be 0
...
...
@@ -4364,21 +4367,21 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
doBuildResultDatablock
(
pOperator
,
pBInfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
streamAggSup
.
pResultBuf
);
if
(
pBInfo
->
pRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pBInfo
->
pRes
,
"sem
s
session"
);
printDataBlock
(
pBInfo
->
pRes
,
"sem
i
session"
);
return
pBInfo
->
pRes
;
}
// doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
&&
!
pInfo
->
returnDelete
)
{
pInfo
->
returnDelete
=
true
;
printDataBlock
(
pInfo
->
pDelRes
,
"sem
s
session"
);
printDataBlock
(
pInfo
->
pDelRes
,
"sem
i
session"
);
return
pInfo
->
pDelRes
;
}
if
(
pInfo
->
pUpdateRes
->
info
.
rows
>
0
)
{
// process the rest of the data
pOperator
->
status
=
OP_OPENED
;
printDataBlock
(
pInfo
->
pUpdateRes
,
"sem
s
session"
);
printDataBlock
(
pInfo
->
pUpdateRes
,
"sem
i
session"
);
return
pInfo
->
pUpdateRes
;
}
...
...
@@ -4400,8 +4403,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
pOperator
->
name
=
"StreamSessionFinalAggOperator"
;
}
else
{
pInfo
->
isFinal
=
false
;
pInfo
->
pUpdateRes
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
pInfo
->
pUpdateRes
->
info
.
type
=
STREAM_CLEAR
;
pInfo
->
pUpdateRes
=
createSpecialDataBlock
(
STREAM_CLEAR
);
blockDataEnsureCapacity
(
pInfo
->
pUpdateRes
,
128
);
pOperator
->
name
=
"StreamSessionSemiAggOperator"
;
pOperator
->
fpSet
=
...
...
@@ -4616,23 +4618,20 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, u
}
static
void
doClearStateWindows
(
SStreamAggSupporter
*
pAggSup
,
SSDataBlock
*
pBlock
,
int32_t
tsIndex
,
SColumn
*
pCol
,
int32_t
keyIndex
,
SHashObj
*
pSeUpdated
,
SHashObj
*
pSeDeleted
)
{
SColumnInfoData
*
pTsColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
tsIndex
);
SColumnInfoData
*
p
KeyColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
keyIndex
);
SHashObj
*
pSeUpdated
,
SHashObj
*
pSeDeleted
)
{
SColumnInfoData
*
pTsColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
p
GroupColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
TSKEY
*
tsCol
=
(
TSKEY
*
)
pTsColInfo
->
pData
;
bool
allEqual
=
false
;
int32_t
step
=
1
;
uint64_t
groupId
=
pBlock
->
info
.
groupId
;
uint64_t
*
gpCol
=
(
uint64_t
*
)
pGroupColInfo
->
pData
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
i
+=
step
)
{
char
*
pKeyData
=
colDataGetData
(
pKeyColInfo
,
i
);
int32_t
winIndex
=
0
;
SStateWindowInfo
*
pCurWin
=
getStateWindowByTs
(
pAggSup
,
tsCol
[
i
],
g
roupId
,
&
winIndex
);
SStateWindowInfo
*
pCurWin
=
getStateWindowByTs
(
pAggSup
,
tsCol
[
i
],
g
pCol
[
i
]
,
&
winIndex
);
if
(
!
pCurWin
)
{
continue
;
}
step
=
updateStateWindowInfo
(
pAggSup
->
pCurWins
,
winIndex
,
tsCol
,
groupId
,
pKeyColInfo
,
pBlock
->
info
.
rows
,
i
,
&
allEqual
,
pSeDeleted
);
ASSERT
(
isTsInWindow
(
pCurWin
,
tsCol
[
i
])
||
isEqualStateKey
(
pCurWin
,
pKeyData
));
updateSessionWindowInfo
(
&
pCurWin
->
winInfo
,
tsCol
,
NULL
,
0
,
pBlock
->
info
.
rows
,
i
,
0
,
NULL
);
taosHashRemove
(
pSeUpdated
,
&
pCurWin
->
winInfo
.
pos
,
sizeof
(
SResultRowPosition
));
deleteWindow
(
pAggSup
->
pCurWins
,
winIndex
,
destroyStateWinInfo
);
}
...
...
@@ -4675,7 +4674,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
pSDataBlock
->
info
.
rows
,
i
,
&
allEqual
,
pStDeleted
);
if
(
!
allEqual
)
{
appendOneRow
(
pAggSup
->
pScanBlock
,
&
pCurWin
->
winInfo
.
win
.
skey
,
&
pCurWin
->
winInfo
.
win
.
ekey
,
&
groupId
);
GROUPID_COLUMN_INDEX
,
&
groupId
);
taosHashRemove
(
pSeUpdated
,
&
pCurWin
->
winInfo
.
pos
,
sizeof
(
SResultRowPosition
));
deleteWindow
(
pAggSup
->
pCurWins
,
winIndex
,
destroyStateWinInfo
);
continue
;
...
...
@@ -4730,8 +4729,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
printDataBlock
(
pBlock
,
"single state recv"
);
if
(
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
doClearStateWindows
(
&
pInfo
->
streamAggSup
,
pBlock
,
pInfo
->
primaryTsIndex
,
&
pInfo
->
stateCol
,
pInfo
->
stateCol
.
slotId
,
pSeUpdated
,
pInfo
->
pSeDeleted
);
doClearStateWindows
(
&
pInfo
->
streamAggSup
,
pBlock
,
pSeUpdated
,
pInfo
->
pSeDeleted
);
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
)
{
SArray
*
pWins
=
taosArrayInit
(
16
,
sizeof
(
SResultWindowInfo
));
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录