diff --git a/docs/zh/14-reference/07-tdinsight/assets/TDinsight-1-cluster-status.webp b/docs/zh/14-reference/07-tdinsight/assets/TDinsight-1-cluster-status.webp
index a78e18028a94c2f6a783b08d992a25c791527407..3bc0d960f1db45ee8d2adcee26de89334e681956 100644
Binary files a/docs/zh/14-reference/07-tdinsight/assets/TDinsight-1-cluster-status.webp and b/docs/zh/14-reference/07-tdinsight/assets/TDinsight-1-cluster-status.webp differ
diff --git a/docs/zh/14-reference/07-tdinsight/assets/TDinsight-2-dnodes.webp b/docs/zh/14-reference/07-tdinsight/assets/TDinsight-2-dnodes.webp
index b152418d0902b8ebdf62ebce6705c10dd5ab4fbf..f5a602d3f9dcecb64ded5e1f463ba460daab0024 100644
Binary files a/docs/zh/14-reference/07-tdinsight/assets/TDinsight-2-dnodes.webp and b/docs/zh/14-reference/07-tdinsight/assets/TDinsight-2-dnodes.webp differ
diff --git a/docs/zh/14-reference/07-tdinsight/assets/TDinsight-3-mnodes.webp b/docs/zh/14-reference/07-tdinsight/assets/TDinsight-3-mnodes.webp
index f58f48b7f17375cb8e62e7c0126ca3aea56a13f6..f155fa42a0fb5df71ee48c8c65a8c7d8851ddc3e 100644
Binary files a/docs/zh/14-reference/07-tdinsight/assets/TDinsight-3-mnodes.webp and b/docs/zh/14-reference/07-tdinsight/assets/TDinsight-3-mnodes.webp differ
diff --git a/docs/zh/14-reference/07-tdinsight/assets/TDinsight-4-requests.webp b/docs/zh/14-reference/07-tdinsight/assets/TDinsight-4-requests.webp
index 00afcce013602dce0da17bfd033f65aaa8e43bb7..dc0b85e262bd4340e986a42105e0ff9838d12fa6 100644
Binary files a/docs/zh/14-reference/07-tdinsight/assets/TDinsight-4-requests.webp and b/docs/zh/14-reference/07-tdinsight/assets/TDinsight-4-requests.webp differ
diff --git a/docs/zh/14-reference/07-tdinsight/assets/TDinsight-5-database.webp b/docs/zh/14-reference/07-tdinsight/assets/TDinsight-5-database.webp
index 567e5694f9d7a035a3eb354493d3df8ed64db251..342c8cfc0a8e852e7cd092aff453ed1fd2ec85a2 100644
Binary files a/docs/zh/14-reference/07-tdinsight/assets/TDinsight-5-database.webp and b/docs/zh/14-reference/07-tdinsight/assets/TDinsight-5-database.webp differ
diff --git a/docs/zh/14-reference/07-tdinsight/assets/TDinsight-8-taosadapter.webp b/docs/zh/14-reference/07-tdinsight/assets/TDinsight-8-taosadapter.webp
index 8666193f59497180574fd2786266e5baabbe9761..942130d4fabf7944c7add10acb3bb42ca7f51e0f 100644
Binary files a/docs/zh/14-reference/07-tdinsight/assets/TDinsight-8-taosadapter.webp and b/docs/zh/14-reference/07-tdinsight/assets/TDinsight-8-taosadapter.webp differ
diff --git a/docs/zh/14-reference/07-tdinsight/assets/howto-add-datasource.webp b/docs/zh/14-reference/07-tdinsight/assets/howto-add-datasource.webp
index 06d0ff6ed50091a6340508bc5b2b3f78b65dcb18..d7fc9e233acd1a4b1bbb940b13bc4296c261a33a 100644
Binary files a/docs/zh/14-reference/07-tdinsight/assets/howto-add-datasource.webp and b/docs/zh/14-reference/07-tdinsight/assets/howto-add-datasource.webp differ
diff --git a/docs/zh/14-reference/07-tdinsight/assets/import_dashboard.webp b/docs/zh/14-reference/07-tdinsight/assets/import_dashboard.webp
index fb7958f1b9fbd43c8f63136024842790e711c490..ae2a1e8e9b7b63a68d56dfcd2187eca614da9a3d 100644
Binary files a/docs/zh/14-reference/07-tdinsight/assets/import_dashboard.webp and b/docs/zh/14-reference/07-tdinsight/assets/import_dashboard.webp differ
diff --git a/docs/zh/14-reference/07-tdinsight/assets/import_dashboard_view.webp b/docs/zh/14-reference/07-tdinsight/assets/import_dashboard_view.webp
new file mode 100644
index 0000000000000000000000000000000000000000..1b10e41c75fbbb9a30bce4aa8d1adb8216fbe127
Binary files /dev/null and b/docs/zh/14-reference/07-tdinsight/assets/import_dashboard_view.webp differ
diff --git a/docs/zh/14-reference/07-tdinsight/assets/select_dashboard_db.webp b/docs/zh/14-reference/07-tdinsight/assets/select_dashboard_db.webp
new file mode 100644
index 0000000000000000000000000000000000000000..956132e37e9df255d3ff82654fd357bec001e695
Binary files /dev/null and b/docs/zh/14-reference/07-tdinsight/assets/select_dashboard_db.webp differ
diff --git a/docs/zh/14-reference/07-tdinsight/index.md b/docs/zh/14-reference/07-tdinsight/index.mdx
similarity index 67%
rename from docs/zh/14-reference/07-tdinsight/index.md
rename to docs/zh/14-reference/07-tdinsight/index.mdx
index 5990a831b8bc1788deaddfb38f717f2723969362..f4b981cb653d5407fe53119b536d1bdc073c6944 100644
--- a/docs/zh/14-reference/07-tdinsight/index.md
+++ b/docs/zh/14-reference/07-tdinsight/index.mdx
@@ -3,19 +3,28 @@ title: TDinsight - 基于Grafana的TDengine零依赖监控解决方案
sidebar_label: TDinsight
---
-TDinsight 是使用内置监控数据库和 [Grafana] 对 TDengine 进行监控的解决方案。
+TDinsight 是使用监控数据库和 [Grafana] 对 TDengine 进行监控的解决方案。
-TDengine 启动后,会自动创建一个监测数据库 `log`,并自动将服务器的 CPU、内存、硬盘空间、带宽、请求数、磁盘读写速度、慢查询等信息定时写入该数据库,并对重要的系统操作(比如登录、创建、删除数据库等)以及各种错误报警信息进行记录。通过 [Grafana] 和 [TDengine 数据源插件](https://github.com/taosdata/grafanaplugin/releases),TDinsight 将集群状态、节点信息、插入及查询请求、资源使用情况等进行可视化展示,同时还支持 vnode、dnode、mnode 节点状态异常告警,为开发者实时监控 TDengine 集群运行状态提供了便利。本文将指导用户安装 Grafana 服务器并通过 `TDinsight.sh` 安装脚本自动安装 TDengine 数据源插件及部署 TDinsight 可视化面板。
+TDengine 通过 [taosKeeper](../../taosKeeper) 将服务器的 CPU、内存、硬盘空间、带宽、请求数、磁盘读写速度、慢查询等信息定时写入指定数据库,并对重要的系统操作(比如登录、创建、删除数据库等)以及各种错误报警信息进行记录。通过 [Grafana] 和 [TDengine 数据源插件](https://github.com/taosdata/grafanaplugin/releases),TDinsight 将集群状态、节点信息、插入及查询请求、资源使用情况等进行可视化展示,同时还支持 vnode、dnode、mnode 节点状态异常告警,为开发者实时监控 TDengine 集群运行状态提供了便利。本文将指导用户安装 Grafana 服务器并通过 `TDinsight.sh` 安装脚本自动安装 TDengine 数据源插件及部署 TDinsight 可视化面板。
## 系统要求
-要部署 TDinsight,需要一个单节点的 TDengine 服务器或一个多节点的 [TDengine] 集群,以及一个[Grafana]服务器。此仪表盘需要 TDengine 2.3.3.0 及以上,并启用 `log` 数据库(`monitor = 1`)。
+- 单节点的 TDengine 服务器或多节点的 [TDengine] 集群,以及一个[Grafana]服务器。此仪表盘需要 TDengine 3.0.0.0 及以上,并开启监控服务,具体配置请参考:[TDengine 监控配置](../config/#监控相关)。
+- taosAdapter 已经安装并正常运行。具体细节请参考:[taosAdapter 使用手册](../../taosadapter)
+- taosKeeper 已安装并正常运行。具体细节请参考:[taosKeeper 使用手册](../../taosKeeper)
+
+记录以下信息:
+
+- taosAdapter 集群 REST API 地址,如:`http://tdengine.local:6041`。
+- taosAdapter 集群认证信息,可使用用户名及密码。
+- taosKeeper 记录监控指标的数据库名称。
## 安装 Grafana
-我们建议在此处使用最新的[Grafana] 7 或 8 版本。您可以在任何[支持的操作系统](https://grafana.com/docs/grafana/latest/installation/requirements/#supported-operating-systems)中,按照 [Grafana 官方文档安装说明](https://grafana.com/docs/grafana/latest/installation/) 安装 [Grafana]。
+我们建议在此处使用最新的[Grafana] 8 或 9 版本。您可以在任何[支持的操作系统](https://grafana.com/docs/grafana/latest/installation/requirements/#supported-operating-systems)中,按照 [Grafana 官方文档安装说明](https://grafana.com/docs/grafana/latest/installation/) 安装 [Grafana]。
-### 在 Debian 或 Ubuntu 上安装 Grafana
+
+
对于 Debian 或 Ubuntu 操作系统,建议使用 Grafana 镜像仓库。使用如下命令从零开始安装:
@@ -31,6 +40,8 @@ sudo apt-get install grafana
```
### 在 CentOS / RHEL 上安装 Grafana
+
+
您可以从官方 YUM 镜像仓库安装。
@@ -59,7 +70,12 @@ sudo yum install \
https://dl.grafana.com/oss/release/grafana-7.5.11-1.x86_64.rpm
```
-## 自动部署 TDinsight
+
+
+
+
+
+
我们提供了一个自动化安装脚本 [`TDinsight.sh`](https://github.com/taosdata/grafanaplugin/releases/latest/download/TDinsight.sh) 脚本以便用户快速进行安装配置。
@@ -71,7 +87,7 @@ chmod +x TDinsight.sh
./TDinsight.sh
```
-这个脚本会自动下载最新的[Grafana TDengine 数据源插件](https://github.com/taosdata/grafanaplugin/releases/latest) 和 [TDinsight 仪表盘](https://grafana.com/grafana/dashboards/15167) ,将命令行选项中的可配置参数转为 [Grafana Provisioning](https://grafana.com/docs/grafana/latest/administration/provisioning/) 配置文件,以进行自动化部署及更新等操作。利用该脚本提供的告警设置选项,你还可以获得内置的阿里云短信告警通知支持。
+这个脚本会自动下载最新的[Grafana TDengine 数据源插件](https://github.com/taosdata/grafanaplugin/releases/latest) 和 [TDinsight 仪表盘](https://github.com/taosdata/grafanaplugin/blob/master/dashboards/TDinsightV3.json) ,将命令行选项中的可配置参数转为 [Grafana Provisioning](https://grafana.com/docs/grafana/latest/administration/provisioning/) 配置文件,以进行自动化部署及更新等操作。利用该脚本提供的告警设置选项,你还可以获得内置的阿里云短信告警通知支持。
假设您在同一台主机上使用 TDengine 和 Grafana 的默认服务。运行 `./TDinsight.sh` 并打开 Grafana 浏览器窗口就可以看到 TDinsight 仪表盘了。
@@ -106,18 +122,6 @@ Install and configure TDinsight dashboard in Grafana on Ubuntu 18.04/20.04 syste
-E, --external-notifier Apply external notifier uid to TDinsight dashboard.
-Aliyun SMS as Notifier:
--s, --sms-enabled To enable tdengine-datasource plugin builtin Aliyun SMS webhook.
--N, --sms-notifier-name Provisioning notifier name.[default: TDinsight Builtin SMS]
--U, --sms-notifier-uid Provisioning notifier uid, use lowercase notifier name by default.
--D, --sms-notifier-is-default Set notifier as default.
--I, --sms-access-key-id Aliyun SMS access key id
--K, --sms-access-key-secret Aliyun SMS access key secret
--S, --sms-sign-name Sign name
--C, --sms-template-code Template code
--T, --sms-template-param Template param, a escaped JSON string like '{"alarm_level":"%s","time":"%s","name":"%s","content":"%s"}'
--B, --sms-phone-numbers Comma-separated numbers list, eg "189xxxxxxxx,132xxxxxxxx"
--L, --sms-listen-addr [default: 127.0.0.1:9100]
```
大多数命令行选项都可以通过环境变量获得同样的效果。
@@ -136,17 +140,6 @@ Aliyun SMS as Notifier:
| -t | --tdinsight-title | TDINSIGHT_DASHBOARD_TITLE | TDinsight 仪表盘标题。 [默认:TDinsight] |
| -e | --tdinsight-可编辑 | TDINSIGHT_DASHBOARD_EDITABLE | 如果配置仪表盘可以编辑。 [默认值:false] |
| -E | --external-notifier | EXTERNAL_NOTIFIER | 将外部通知程序 uid 应用于 TDinsight 仪表盘。 |
-| -s | --sms-enabled | SMS_ENABLED | 启用阿里云短信 webhook 内置的 tdengine-datasource 插件。 |
-| -N | --sms-notifier-name | SMS_NOTIFIER_NAME | 供应通知程序名称。[默认:`TDinsight Builtin SMS`] |
-| -U | --sms-notifier-uid | SMS_NOTIFIER_UID | "Notification Channel" `uid`,默认使用程序名称的小写,其他字符用 “-” 代替。 |
-| -D | --sms-notifier-is-default | SMS_NOTIFIER_IS_DEFAULT | 将内置短信通知设置为默认值。 |
-| -I | --sms-access-key-id | SMS_ACCESS_KEY_ID | 阿里云短信访问密钥 id |
-| -K | --sms-access-key-secret | SMS_ACCESS_KEY_SECRET | 阿里云短信访问秘钥 |
-| -S | --sms-sign-name | SMS_SIGN_NAME | 签名 |
-| -C | --sms-template-code | SMS_TEMPLATE_CODE | 模板代码 |
-| -T | --sms-template-param | SMS_TEMPLATE_PARAM | 模板参数的 JSON 模板 |
-| -B | --sms-phone-numbers | SMS_PHONE_NUMBERS | 逗号分隔的手机号列表,例如`"189xxxxxxxx,132xxxxxxxx"` |
-| -L | --sms-listen-addr | SMS_LISTEN_ADDR | 内置 SMS webhook 监听地址,默认为`127.0.0.1:9100` |
假设您在主机 `tdengine` 上启动 TDengine 数据库,HTTP API 端口为 `6041`,用户为 `root1`,密码为 `pass5ord`。执行脚本:
@@ -166,31 +159,18 @@ curl --no-progress-meter -u admin:admin http://localhost:3000/api/alert-notifica
sudo ./TDinsight.sh -a http://tdengine:6041 -u root1 -p pass5ord -E existing-notifier
```
-如果你想使用[阿里云短信](https://www.aliyun.com/product/sms)服务作为通知渠道,你应该使用`-s`标志启用并添加以下参数:
-
-- `-N`:Notification Channel 名,默认为`TDinsight Builtin SMS`。
-- `-U`:Channel uid,默认是 `name` 的小写,任何其他字符都替换为 - ,对于默认的 `-N`,其 uid 为 `tdinsight-builtin-sms`。
-- `-I`:阿里云短信访问密钥 id。
-- `-K`:阿里云短信访问秘钥。
-- `-S`:阿里云短信签名。
-- `-C`:阿里云短信模板 ID。
-- `-T`:阿里云短信模板参数,为 JSON 格式模板,示例如下 `'{"alarm_level":"%s","time":"%s","name":"%s","content":"%s "}'`。有四个参数:告警级别、时间、名称和告警内容。
-- `-B`:电话号码列表,以逗号`,`分隔。
-
如果要监控多个 TDengine 集群,则需要设置多个 TDinsight 仪表盘。设置非默认 TDinsight 需要进行一些更改: `-n` `-i` `-t` 选项需要更改为非默认名称,如果使用 内置短信告警功能,`-N` 和 `-L` 也应该改变。
```bash
sudo ./TDengine.sh -n TDengine-Env1 -a http://another:6041 -u root -p taosdata -i tdinsight-env1 -t 'TDinsight Env1'
-# 如果使用内置短信通知
-sudo ./TDengine.sh -n TDengine-Env1 -a http://another:6041 -u root -p taosdata -i tdinsight-env1 -t 'TDinsight Env1' \
- -s -N 'Env1 SMS' -I xx -K xx -S xx -C SMS_XX -T '' -B 00000000000 -L 127.0.0.01:10611
```
请注意,配置数据源、通知 Channel 和仪表盘在前端是不可更改的。您应该再次通过此脚本更新配置或手动更改 `/etc/grafana/provisioning` 目录(这是 Grafana 的默认目录,根据需要使用`-P`选项更改)中的配置文件。
特别地,当您使用 Grafana Cloud 或其他组织时,`-O` 可用于设置组织 ID。 `-G` 可指定 Grafana 插件安装目录。 `-e` 参数将仪表盘设置为可编辑。
-## 手动设置 TDinsight
+
+
### 安装 TDengine 数据源插件
@@ -247,23 +227,30 @@ sudo systemctl enable grafana-server
![TDengine Database TDinsight 数据源测试](./assets/howto-add-datasource-test.webp)
+
+
+
### 导入仪表盘
-指向 **+** / **Create** - **import**(或 `/dashboard/import` url)。
+在配置 TDengine 数据源界面,点击 **Dashboards** tab。
![TDengine Database TDinsight 导入仪表盘和配置](./assets/import_dashboard.webp)
-在 **Import via grafana.com** 位置键入仪表盘 ID `15167` 并 **Load**。
+选择 `TDengine for 3.x`,并点击 `import`。
+
+导入完成后,在搜索界面已经出现了 **TDinsight for 3.x** dashboard。
+
+![TDengine Database TDinsight 查看导入结果](./assets/import_dashboard_view.webp)
-![通过 grafana.com 导入](./assets/import-dashboard-15167.webp)
+进入 TDinsight for 3.x dashboard 后,选择 taosKeeper 中设置的记录监控指标的数据库。
-导入完成后,TDinsight 的完整页面视图如下所示。
+![TDengine Database TDinsight 选择数据库](./assets/select_dashboard_db.webp)
-![TDengine Database TDinsight 显示](./assets/TDinsight-full.webp)
+然后可以看到监控结果。
## TDinsight 仪表盘详细信息
-TDinsight 仪表盘旨在提供 TDengine 相关资源使用情况[dnodes, mnodes, vnodes](https://www.taosdata.com/cn/documentation/architecture#cluster)或数据库的使用情况和状态。
+TDinsight 仪表盘旨在提供 TDengine 相关资源的使用情况和状态,比如 dnodes、 mnodes、 vnodes 和数据库等。
指标详情如下:
@@ -285,7 +272,6 @@ TDinsight 仪表盘旨在提供 TDengine 相关资源使用情况[dnodes, mnodes
- **Measuring Points Used**:启用告警规则的测点数用量(社区版无数据,默认情况下是健康的)。
- **Grants Expire Time**:启用告警规则的企业版过期时间(社区版无数据,默认情况是健康的)。
- **Error Rate**:启用警报的集群总合错误率(每秒平均错误数)。
-- **Variables**:`show variables` 表格展示。
### DNodes 状态
@@ -294,7 +280,6 @@ TDinsight 仪表盘旨在提供 TDengine 相关资源使用情况[dnodes, mnodes
- **DNodes Status**:`show dnodes` 的简单表格视图。
- **DNodes Lifetime**:从创建 dnode 开始经过的时间。
- **DNodes Number**:DNodes 数量变化。
-- **Offline Reason**:如果有任何 dnode 状态为离线,则以饼图形式展示离线原因。
### MNode 概述
@@ -309,7 +294,6 @@ TDinsight 仪表盘旨在提供 TDengine 相关资源使用情况[dnodes, mnodes
1. **Requests Rate(Inserts per Second)**:平均每秒插入次数。
2. **Requests (Selects)**:查询请求数及变化率(count of second)。
-3. **Requests (HTTP)**:HTTP 请求数和请求速率(count of second)。
### 数据库
@@ -319,9 +303,8 @@ TDinsight 仪表盘旨在提供 TDengine 相关资源使用情况[dnodes, mnodes
1. **STables**:超级表数量。
2. **Total Tables**:所有表数量。
-3. **Sub Tables**:所有超级表子表的数量。
-4. **Tables**:所有普通表数量随时间变化图。
-5. **Tables Number Foreach VGroups**:每个 VGroups 包含的表数量。
+3. **Tables**:所有普通表数量随时间变化图。
+4. **Tables Number Foreach VGroups**:每个 VGroups 包含的表数量。
### DNode 资源使用情况
@@ -356,12 +339,11 @@ TDinsight 仪表盘旨在提供 TDengine 相关资源使用情况[dnodes, mnodes
支持监控 taosAdapter 请求统计和状态详情。包括:
-1. **http_request**: 包含总请求数,请求失败数以及正在处理的请求数
-2. **top 3 request endpoint**: 按终端分组,请求排名前三的数据
-3. **Memory Used**: taosAdapter 内存使用情况
-4. **latency_quantile(ms)**: (1, 2, 5, 9, 99)阶段的分位数
-5. **top 3 failed request endpoint**: 按终端分组,请求失败排名前三的数据
-6. **CPU Used**: taosAdapter CPU 使用情况
+1. **http_request_inflight**: 即时处理请求数
+2. **http_request_total**: 请求总数。
+3. **http_request_fail**: 请求总数。
+4. **CPU Used**: taosAdapter CPU 使用情况。
+5. **Memory Used**: taosAdapter 内存使用情况。
## 升级
@@ -403,13 +385,6 @@ services:
TDENGINE_API: ${TDENGINE_API}
TDENGINE_USER: ${TDENGINE_USER}
TDENGINE_PASS: ${TDENGINE_PASS}
- SMS_ACCESS_KEY_ID: ${SMS_ACCESS_KEY_ID}
- SMS_ACCESS_KEY_SECRET: ${SMS_ACCESS_KEY_SECRET}
- SMS_SIGN_NAME: ${SMS_SIGN_NAME}
- SMS_TEMPLATE_CODE: ${SMS_TEMPLATE_CODE}
- SMS_TEMPLATE_PARAM: '${SMS_TEMPLATE_PARAM}'
- SMS_PHONE_NUMBERS: $SMS_PHONE_NUMBERS
- SMS_LISTEN_ADDR: ${SMS_LISTEN_ADDR}
ports:
- 3000:3000
volumes:
diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h
index fb4eac991f4d64e2c5477e3b102395ad6c83550b..601c22a3ba793b17a8ccc8de2cd4dfa37d9c7682 100644
--- a/source/libs/executor/inc/executorimpl.h
+++ b/source/libs/executor/inc/executorimpl.h
@@ -1016,7 +1016,7 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
-void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid);
+void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, int32_t uidCol, uint64_t* pID);
void printDataBlock(SSDataBlock* pBlock, const char* flag);
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index 7b13aa8ad8fe5f33a8427d5a4d1e49e2ef1690ff..1377b42b7213187105bcfa21b43064d446dcb9a1 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -1086,7 +1086,10 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
+ SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
+ SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
+ SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
int32_t dummy = 0;
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
uint64_t groupId = getGroupId(pInfo->pTableScanOp, uidCol[i]);
@@ -1100,9 +1103,13 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
SResultWindowInfo* pEndWin =
getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, endData[i], endData[i], groupId, 0, &dummy);
ASSERT(pEndWin);
+ TSKEY ts = INT64_MIN;
colDataAppend(pDestStartCol, i, (const char*)&pStartWin->win.skey, false);
colDataAppend(pDestEndCol, i, (const char*)&pEndWin->win.ekey, false);
+ colDataAppendNULL(pDestUidCol, i);
colDataAppend(pDestGpCol, i, (const char*)&groupId, false);
+ colDataAppendNULL(pDestCalStartTsCol, i);
+ colDataAppendNULL(pDestCalEndTsCol, i);
pDestBlock->info.rows++;
}
return TSDB_CODE_SUCCESS;
@@ -1157,13 +1164,13 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
return code;
}
-void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid) {
+void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, int32_t uidCol, uint64_t* pID) {
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
- SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
+ SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, uidCol);
colDataAppend(pStartTsCol, pBlock->info.rows, (const char*)pStartTs, false);
colDataAppend(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false);
- colDataAppend(pUidCol, pBlock->info.rows, (const char*)pUid, false);
+ colDataAppend(pUidCol, pBlock->info.rows, (const char*)pID, false);
pBlock->info.rows++;
}
@@ -1190,7 +1197,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
bool closedWin = isClosed && isSignleIntervalWindow(pInfo) &&
isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup);
if ((update || closedWin) && out) {
- appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid);
+ appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, UID_COLUMN_INDEX, &pBlock->info.uid);
}
}
if (out) {
diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c
index 0594a727fcaaf8f41a55703e64d7247a2dca6d15..e9298487e760bee4cf28f47ab11a104b82cba509 100644
--- a/source/libs/executor/src/timewindowoperator.c
+++ b/source/libs/executor/src/timewindowoperator.c
@@ -3951,11 +3951,13 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup,
int32_t numOfOutput, int64_t gap, SArray* result) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex);
TSKEY* tsCols = (TSKEY*)pColDataInfo->pData;
+ SColumnInfoData* pGpDataInfo = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
+ uint64_t* gpCols = (uint64_t*)pGpDataInfo->pData;
int32_t step = 0;
for (int32_t i = 0; i < pBlock->info.rows; i += step) {
int32_t winIndex = 0;
SResultWindowInfo* pCurWin =
- getCurSessionWindow(pAggSup, tsCols[i], INT64_MIN, pBlock->info.groupId, gap, &winIndex);
+ getCurSessionWindow(pAggSup, tsCols[i], INT64_MIN, gpCols[i], gap, &winIndex);
if (!pCurWin || pCurWin->pos.pageId == -1) {
// window has been closed.
step = 1;
@@ -4168,13 +4170,13 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if (pBlock->info.type == STREAM_CLEAR) {
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
- doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, 0, pOperator->exprSupp.numOfExprs, 0,
+ doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, START_TS_COLUMN_INDEX, pOperator->exprSupp.numOfExprs, 0,
pWins);
if (IS_FINAL_OP(pInfo)) {
int32_t childIndex = getChildIndex(pBlock);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info;
- doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, 0, pChildOp->exprSupp.numOfExprs,
+ doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, START_TS_COLUMN_INDEX, pChildOp->exprSupp.numOfExprs,
0, NULL);
rebuildTimeWindow(pInfo, pWins, pBlock->info.groupId, pOperator->exprSupp.numOfExprs, pOperator);
}
@@ -4285,21 +4287,21 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
} else if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pBInfo->pRes->info.rows > 0) {
- printDataBlock(pBInfo->pRes, "sems session");
+ printDataBlock(pBInfo->pRes, "semi session");
return pBInfo->pRes;
}
// doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0 && !pInfo->returnDelete) {
pInfo->returnDelete = true;
- printDataBlock(pInfo->pDelRes, "sems session");
+ printDataBlock(pInfo->pDelRes, "semi session");
return pInfo->pDelRes;
}
if (pInfo->pUpdateRes->info.rows > 0) {
// process the rest of the data
pOperator->status = OP_OPENED;
- printDataBlock(pInfo->pUpdateRes, "sems session");
+ printDataBlock(pInfo->pUpdateRes, "semi session");
return pInfo->pUpdateRes;
}
// semi interval operator clear disk buffer
@@ -4318,13 +4320,14 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
clearSpecialDataBlock(pInfo->pUpdateRes);
break;
}
+ printDataBlock(pBlock, "semi session recv");
if (pBlock->info.type == STREAM_CLEAR) {
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
- doClearSessionWindows(&pInfo->streamAggSup, pSup, pBlock, 0, pSup->numOfExprs, 0, pWins);
+ doClearSessionWindows(&pInfo->streamAggSup, pSup, pBlock, START_TS_COLUMN_INDEX, pSup->numOfExprs, 0, pWins);
removeSessionResults(pStUpdated, pWins);
taosArrayDestroy(pWins);
- copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex);
+ copyDataBlock(pInfo->pUpdateRes, pBlock);
break;
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
// gap must be 0
@@ -4364,21 +4367,21 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pBInfo->pRes->info.rows > 0) {
- printDataBlock(pBInfo->pRes, "sems session");
+ printDataBlock(pBInfo->pRes, "semi session");
return pBInfo->pRes;
}
// doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0 && !pInfo->returnDelete) {
pInfo->returnDelete = true;
- printDataBlock(pInfo->pDelRes, "sems session");
+ printDataBlock(pInfo->pDelRes, "semi session");
return pInfo->pDelRes;
}
if (pInfo->pUpdateRes->info.rows > 0) {
// process the rest of the data
pOperator->status = OP_OPENED;
- printDataBlock(pInfo->pUpdateRes, "sems session");
+ printDataBlock(pInfo->pUpdateRes, "semi session");
return pInfo->pUpdateRes;
}
@@ -4400,8 +4403,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
pOperator->name = "StreamSessionFinalAggOperator";
} else {
pInfo->isFinal = false;
- pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
- pInfo->pUpdateRes->info.type = STREAM_CLEAR;
+ pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
pOperator->name = "StreamSessionSemiAggOperator";
pOperator->fpSet =
@@ -4616,23 +4618,20 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, u
}
static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock,
- int32_t tsIndex, SColumn* pCol, int32_t keyIndex, SHashObj* pSeUpdated, SHashObj* pSeDeleted) {
- SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, tsIndex);
- SColumnInfoData* pKeyColInfo = taosArrayGet(pBlock->pDataBlock, keyIndex);
+ SHashObj* pSeUpdated, SHashObj* pSeDeleted) {
+ SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
+ SColumnInfoData* pGroupColInfo = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
TSKEY* tsCol = (TSKEY*)pTsColInfo->pData;
bool allEqual = false;
int32_t step = 1;
- uint64_t groupId = pBlock->info.groupId;
+ uint64_t* gpCol = (uint64_t*) pGroupColInfo->pData;
for (int32_t i = 0; i < pBlock->info.rows; i += step) {
- char* pKeyData = colDataGetData(pKeyColInfo, i);
int32_t winIndex = 0;
- SStateWindowInfo* pCurWin = getStateWindowByTs(pAggSup, tsCol[i], groupId, &winIndex);
+ SStateWindowInfo* pCurWin = getStateWindowByTs(pAggSup, tsCol[i], gpCol[i], &winIndex);
if (!pCurWin) {
continue;
}
- step = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCol, groupId, pKeyColInfo,
- pBlock->info.rows, i, &allEqual, pSeDeleted);
- ASSERT(isTsInWindow(pCurWin, tsCol[i]) || isEqualStateKey(pCurWin, pKeyData));
+ updateSessionWindowInfo(&pCurWin->winInfo, tsCol, NULL, 0, pBlock->info.rows, i, 0, NULL);
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo);
}
@@ -4675,7 +4674,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
pSDataBlock->info.rows, i, &allEqual, pStDeleted);
if (!allEqual) {
appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey,
- &groupId);
+ GROUPID_COLUMN_INDEX, &groupId);
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo);
continue;
@@ -4730,8 +4729,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
printDataBlock(pBlock, "single state recv");
if (pBlock->info.type == STREAM_CLEAR) {
- doClearStateWindows(&pInfo->streamAggSup, pBlock, pInfo->primaryTsIndex, &pInfo->stateCol, pInfo->stateCol.slotId,
- pSeUpdated, pInfo->pSeDeleted);
+ doClearStateWindows(&pInfo->streamAggSup, pBlock, pSeUpdated, pInfo->pSeDeleted);
continue;
} else if (pBlock->info.type == STREAM_DELETE_DATA) {
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));