提交 6cf09fb5 编写于 作者: C cpwu

Merge branch '3.0' into cpwu/3.0

...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
# TDengine 简介 # TDengine 简介
TDengine 是一款开源、高性能、云原生的时序数据库 (Time-Series Database, TSDB)。TDengine 能被广泛运用于物联网、工业互联网、车联网、IT 运维、金融等领域。除核心的时序数据库功能外,TDengine 还提供缓存、数据订阅、流式计算等功能,是一极简的时序数据处理平台,最大程度的减小系统设计的复杂度,降低研发和运营成本。TDengine 的主要优势如下: TDengine 是一款开源、高性能、云原生的时序数据库 (Time-Series Database, TSDB)。TDengine 能被广泛运用于物联网、工业互联网、车联网、IT 运维、金融等领域。除核心的时序数据库功能外,TDengine 还提供缓存、数据订阅、流式计算等功能,是一极简的时序数据处理平台,最大程度的减小系统设计的复杂度,降低研发和运营成本。与其他时序数据库相比,TDengine 的主要优势如下:
- 高性能:通过创新的存储引擎设计,无论是数据写入还是查询,TDengine 的性能比通用数据库快 10 倍以上,也远超其他时序数据库,存储空间不及通用数据库的1/10。 - 高性能:通过创新的存储引擎设计,无论是数据写入还是查询,TDengine 的性能比通用数据库快 10 倍以上,也远超其他时序数据库,存储空间不及通用数据库的1/10。
...@@ -41,7 +41,9 @@ TDengine 是一款开源、高性能、云原生的时序数据库 (Time-Series ...@@ -41,7 +41,9 @@ TDengine 是一款开源、高性能、云原生的时序数据库 (Time-Series
TDengine 目前可以在 Linux、 Windows 等平台上安装和运行。任何 OS 的应用也可以选择 taosAdapter 的 RESTful 接口连接服务端 taosd。CPU 支持 X64/ARM64,后续会支持 MIPS64、Alpha64、ARM32、RISC-V 等 CPU 架构。 TDengine 目前可以在 Linux、 Windows 等平台上安装和运行。任何 OS 的应用也可以选择 taosAdapter 的 RESTful 接口连接服务端 taosd。CPU 支持 X64/ARM64,后续会支持 MIPS64、Alpha64、ARM32、RISC-V 等 CPU 架构。
用户可根据需求选择通过[源码](https://www.taosdata.com/cn/getting-started/#通过源码安装)或者[安装包](https://www.taosdata.com/cn/getting-started/#通过安装包安装)来安装。本快速指南仅适用于通过源码安装。 用户可根据需求选择通过源码、[容器](https://docs.taosdata.com/3.0/get-started/docker/)[安装包](https://docs.taosdata.com/3.0/get-started/package/)[Kubenetes](https://docs.taosdata.com/3.0/deployment/k8s/)来安装。本快速指南仅适用于通过源码安装。
TDengine 还提供一组辅助工具软件 taosTools,目前它包含 taosBenchmark(曾命名为 taosdemo)和 taosdump 两个软件。默认 TDengine 编译不包含 taosTools, 您可以在编译 TDengine 时使用`cmake .. -DBUILD_TOOLS=true` 来同时编译 taosTools。
## 安装工具 ## 安装工具
...@@ -53,10 +55,6 @@ sudo apt-get install -y gcc cmake build-essential git libssl-dev ...@@ -53,10 +55,6 @@ sudo apt-get install -y gcc cmake build-essential git libssl-dev
#### 为 taos-tools 安装编译需要的软件 #### 为 taos-tools 安装编译需要的软件
taosTools 是用于 TDengine 的辅助工具软件集合。目前它包含 taosBenchmark(曾命名为 taosdemo)和 taosdump 两个软件。
默认 TDengine 编译不包含 taosTools。您可以在编译 TDengine 时使用`cmake .. -DBUILD_TOOLS=true` 来同时编译 taosTools。
为了在 Ubuntu/Debian 系统上编译 [taos-tools](https://github.com/taosdata/taos-tools) 需要安装如下软件: 为了在 Ubuntu/Debian 系统上编译 [taos-tools](https://github.com/taosdata/taos-tools) 需要安装如下软件:
```bash ```bash
...@@ -104,7 +102,7 @@ sudo yum config-manager --set-enabled Powertools ...@@ -104,7 +102,7 @@ sudo yum config-manager --set-enabled Powertools
### 设置 golang 开发环境 ### 设置 golang 开发环境
TDengine 包含数个使用 Go 语言开发的组件,请参考 golang.org 官方文档设置 go 开发环境。 TDengine 包含数个使用 Go 语言开发的组件,比如taosAdapter, 请参考 golang.org 官方文档设置 go 开发环境。
请使用 1.14 及以上版本。对于中国用户,我们建议使用代理来加速软件包下载。 请使用 1.14 及以上版本。对于中国用户,我们建议使用代理来加速软件包下载。
...@@ -113,7 +111,7 @@ go env -w GO111MODULE=on ...@@ -113,7 +111,7 @@ go env -w GO111MODULE=on
go env -w GOPROXY=https://goproxy.cn,direct go env -w GOPROXY=https://goproxy.cn,direct
``` ```
默认情况下,内嵌的 http 服务仍然可以从 TDengine 源码构建。当然您也可以使用以下命令选择构建 taosAdapter 作为 RESTful 接口的服务。 缺省是不会构建 taosAdapter, 但您可以使用以下命令选择构建 taosAdapter 作为 RESTful 接口的服务。
``` ```
cmake .. -DBUILD_HTTP=false cmake .. -DBUILD_HTTP=false
......
...@@ -20,27 +20,29 @@ English | [简体中文](README-CN.md) | We are hiring, check [here](https://tde ...@@ -20,27 +20,29 @@ English | [简体中文](README-CN.md) | We are hiring, check [here](https://tde
# What is TDengine? # What is TDengine?
TDengine is an open source, cloud native time-series database optimized for Internet of Things (IoT), Connected Cars, and Industrial IoT. It enables efficient, real-time data ingestion, processing, and monitoring of TB and even PB scale data per day, generated by billions of sensors and data collectors. TDengine differentiates itself from other TSDBs with the following advantages.: TDengine is an open source, high performance, cloud native time-series database optimized for Internet of Things (IoT), Connected Cars, and Industrial IoT. It enables efficient, real-time data ingestion, processing, and monitoring of TB and even PB scale data per day, generated by billions of sensors and data collectors. TDengine differentiates itself from other TSDBs with the following advantages.:
- High-Performance: TDengine is the only time-series database to solve the high cardinality issue to support billions of data collection points while out performing other time-series databases for data ingestion, querying and data compression. - High-Performance: TDengine is the only time-series database to solve the high cardinality issue to support billions of data collection points while out performing other time-series databases for data ingestion, querying and data compression.
- Simplified Solution: Through built-in caching, stream processing and data subscription features, TDengine provides a simplified solution for time-series data processing. It reduces system design complexity and operation costs significantly. - Simplified Solution: Through built-in caching, stream processing and data subscription features, TDengine provides a simplified solution for time-series data processing. It reduces system design complexity and operation costs significantly.
- Cloud Native: Through native distributed design, sharding and partitioning, separation of compute and storage, RAFT, support for kubernetes deployment and full observability, TDengine can be deployed on public, private or hybrid clouds. - Cloud Native: Through native distributed design, sharding and partitioning, separation of compute and storage, RAFT, support for kubernetes deployment and full observability, TDengine is a cloud native Time-Series Database and can be deployed on public, private or hybrid clouds.
- Open Source: TDengine’s core modules, including cluster feature, are all available under open source licenses. It has gathered 18.8k stars on GitHub, an active developer community, and over 137k running instances worldwide.
- Ease of Use: For administrators, TDengine significantly reduces the effort to deploy and maintain. For developers, it provides a simple interface, simplified solution and seamless integrations for third party tools. For data users, it gives easy data access. - Ease of Use: For administrators, TDengine significantly reduces the effort to deploy and maintain. For developers, it provides a simple interface, simplified solution and seamless integrations for third party tools. For data users, it gives easy data access.
- Easy Data Analytics: Through super tables, storage and compute separation, data partitioning by time interval, pre-computation and other means, TDengine makes it easy to explore, format, and get access to data in a highly efficient way. - Easy Data Analytics: Through super tables, storage and compute separation, data partitioning by time interval, pre-computation and other means, TDengine makes it easy to explore, format, and get access to data in a highly efficient way.
- Open Source: TDengine’s core modules, including cluster feature, are all available under open source licenses. It has gathered 18.8k stars on GitHub, an active developer community, and over 137k running instances worldwide.
# Documentation # Documentation
For user manual, system design and architecture, please refer to [TDengine Documentation](https://docs.tdengine.com) (中文版请点击[这里](https://docs.taosdata.com)) For user manual, system design and architecture, please refer to [TDengine Documentation](https://docs.tdengine.com) ([中文版](https://docs.taosdata.com))
# Building # Building
At the moment, TDengine server supports running on Linux, Windows, and macOS systems. You can choose to [install from packages](https://www.taosdata.com/en/getting-started/#Install-from-Package) or build it from the source code. This quick guide is for installation from the source only. At the moment, TDengine server supports running on Linux, Windows, and macOS systems. You can choose to [install from packages](https://www.tdengine.com/getting-started/#Install-from-Package) or build it from the source code. This quick guide is for installation from the source only.
We provide a few useful tools such as taosBenchmark (was named taosdemo) and taosdump. They were part of TDengine. By default, TDengine compiling does not include taosTools. You can use 'cmake .. -DBUILD_TOOLS=true' to make them be compiled with TDengine.
To build TDengine, use [CMake](https://cmake.org/) 3.0.2 or higher versions in the project directory. To build TDengine, use [CMake](https://cmake.org/) 3.0.2 or higher versions in the project directory.
...@@ -54,9 +56,6 @@ sudo apt-get install -y gcc cmake build-essential git libssl-dev ...@@ -54,9 +56,6 @@ sudo apt-get install -y gcc cmake build-essential git libssl-dev
#### Install build dependencies for taosTools #### Install build dependencies for taosTools
We provide a few useful tools such as taosBenchmark (was named taosdemo) and taosdump. They were part of TDengine. From TDengine 2.4.0.0, taosBenchmark and taosdump were not released together with TDengine.
By default, TDengine compiling does not include taosTools. You can use 'cmake .. -DBUILD_TOOLS=true' to make them be compiled with TDengine.
To build the [taosTools](https://github.com/taosdata/taos-tools) on Ubuntu/Debian, the following packages need to be installed. To build the [taosTools](https://github.com/taosdata/taos-tools) on Ubuntu/Debian, the following packages need to be installed.
```bash ```bash
...@@ -90,7 +89,7 @@ Note: Since snappy lacks pkg-config support (refer to [link](https://github.com/ ...@@ -90,7 +89,7 @@ Note: Since snappy lacks pkg-config support (refer to [link](https://github.com/
### Setup golang environment ### Setup golang environment
TDengine includes a few components developed by Go language. Please refer to golang.org official documentation for golang environment setup. TDengine includes a few components like taosAdapter developed by Go language. Please refer to golang.org official documentation for golang environment setup.
Please use version 1.14+. For the user in China, we recommend using a proxy to accelerate package downloading. Please use version 1.14+. For the user in China, we recommend using a proxy to accelerate package downloading.
...@@ -101,7 +100,7 @@ go env -w GOPROXY=https://goproxy.cn,direct ...@@ -101,7 +100,7 @@ go env -w GOPROXY=https://goproxy.cn,direct
### Setup rust environment ### Setup rust environment
TDengine includees a few compoments developed by Rust language. Please refer to rust-lang.org official documentation for rust environment setup. TDengine includes a few compoments developed by Rust language. Please refer to rust-lang.org official documentation for rust environment setup.
## Get the source codes ## Get the source codes
...@@ -140,14 +139,7 @@ cmake .. -DBUILD_TOOLS=true ...@@ -140,14 +139,7 @@ cmake .. -DBUILD_TOOLS=true
make make
``` ```
Note TDengine 2.3.x.0 and later use a component named 'taosAdapter' to play http daemon role by default instead of the http daemon embedded in the early version of TDengine. The taosAdapter is programmed by go language. If you pull TDengine source code to the latest from an existing codebase, please execute 'git submodule update --init --recursive' to pull taosAdapter source code. Please install go language version 1.14 or above for compiling taosAdapter. If you meet difficulties regarding 'go mod', especially you are from China, you can use a proxy to solve the problem. Note TDengine 2.3.x.0 and later use a component named 'taosAdapter' to play http daemon role. If you pull TDengine source code to the latest from an existing codebase, please execute 'git submodule update --init --recursive' to pull taosAdapter source code, and use the following command to choose to build taosAdapter.
```
go env -w GO111MODULE=on
go env -w GOPROXY=https://goproxy.cn,direct
```
The embedded http daemon still be built from TDengine source code by default. Or you can use the following command to choose to build taosAdapter.
``` ```
cmake .. -DBUILD_HTTP=false cmake .. -DBUILD_HTTP=false
...@@ -172,7 +164,7 @@ cmake .. -DCPUTYPE=aarch64 && cmake --build . ...@@ -172,7 +164,7 @@ cmake .. -DCPUTYPE=aarch64 && cmake --build .
### On Windows platform ### On Windows platform
If you use the Visual Studio 2013, please open a command window by executing "cmd.exe". If you use the Visual Studio 2013, please open a command window by executing "cmd.exe".
Please specify "amd64" for 64 bits Windows or specify "x86" is for 32 bits Windows when you execute vcvarsall.bat. Please specify "amd64" for 64 bits Windows or specify "x86" for 32 bits Windows when you execute vcvarsall.bat.
```cmd ```cmd
mkdir debug && cd debug mkdir debug && cd debug
...@@ -184,7 +176,7 @@ nmake ...@@ -184,7 +176,7 @@ nmake
If you use the Visual Studio 2019 or 2017: If you use the Visual Studio 2019 or 2017:
please open a command window by executing "cmd.exe". please open a command window by executing "cmd.exe".
Please specify "x64" for 64 bits Windows or specify "x86" is for 32 bits Windows when you execute vcvarsall.bat. Please specify "x64" for 64 bits Windows or specify "x86" for 32 bits Windows when you execute vcvarsall.bat.
```cmd ```cmd
mkdir debug && cd debug mkdir debug && cd debug
...@@ -237,19 +229,6 @@ taos ...@@ -237,19 +229,6 @@ taos
If TDengine shell connects the server successfully, welcome messages and version info are printed. Otherwise, an error message is shown. If TDengine shell connects the server successfully, welcome messages and version info are printed. Otherwise, an error message is shown.
### Install TDengine by apt-get
If you use Debian or Ubuntu system, you can use 'apt-get' command to install TDengine from official repository. Please use following commands to setup:
```
wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add -
echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-stable stable main" | sudo tee /etc/apt/sources.list.d/tdengine-stable.list
[Optional] echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-beta beta main" | sudo tee /etc/apt/sources.list.d/tdengine-beta.list
sudo apt-get update
apt-cache policy tdengine
sudo apt-get install tdengine
```
## On Windows platform ## On Windows platform
After building successfully, TDengine can be installed by: After building successfully, TDengine can be installed by:
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# taos-tools # taos-tools
ExternalProject_Add(taos-tools ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 43924b8 GIT_TAG 53a0103
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE
......
...@@ -32,19 +32,6 @@ docker exec -it <container name> bash ...@@ -32,19 +32,6 @@ docker exec -it <container name> bash
然后就可以执行相关的 Linux 命令操作和访问 TDengine 然后就可以执行相关的 Linux 命令操作和访问 TDengine
:::info
Docker 工具自身的下载请参考 [Docker 官网文档](https://docs.docker.com/get-docker/)
安装完毕后可以在命令行终端查看 Docker 版本。如果版本号正常输出,则说明 Docker 环境已经安装成功。
```bash
$ docker -v
Docker version 20.10.3, build 48d30b5
```
:::
## 运行 TDengine CLI ## 运行 TDengine CLI
进入容器,执行 taos 进入容器,执行 taos
...@@ -113,4 +100,4 @@ taos> select avg(current), max(voltage), min(phase) from test.d10 interval(10s); ...@@ -113,4 +100,4 @@ taos> select avg(current), max(voltage), min(phase) from test.d10 interval(10s);
## 其它 ## 其它
更多关于在 Docker 环境下使用 TDengine 的细节,请参考 [在 Docker 下使用 TDengine](../../reference/docker) 更多关于在 Docker 环境下使用 TDengine 的细节,请参考 [在 Docker 下使用 TDengine](../../reference/docker)
\ No newline at end of file
import taos
from taos.tmq import *
conn = taos.connect()
# create database
conn.execute("drop database if exists py_tmq")
conn.execute("create database if not exists py_tmq vgroups 2")
# create table and stables
conn.select_db("py_tmq")
conn.execute("create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
conn.execute("create table if not exists tb1 using stb1 tags(1)")
conn.execute("create table if not exists tb2 using stb1 tags(2)")
conn.execute("create table if not exists tb3 using stb1 tags(3)")
# create topic
conn.execute("drop topic if exists topic_ctb_column")
conn.execute("create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1")
# set consumer configure options
conf = TaosTmqConf()
conf.set("group.id", "tg2")
conf.set("td.connect.user", "root")
conf.set("td.connect.pass", "taosdata")
conf.set("enable.auto.commit", "true")
conf.set("msg.with.table.name", "true")
def tmq_commit_cb_print(tmq, resp, offset, param=None):
print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")
conf.set_auto_commit_cb(tmq_commit_cb_print, None)
# build consumer
tmq = conf.new_consumer()
# build topic list
topic_list = TaosTmqList()
topic_list.append("topic_ctb_column")
# subscribe consumer
tmq.subscribe(topic_list)
# check subscriptions
sub_list = tmq.subscription()
print("subscribed topics: ",sub_list)
# start subscribe
while 1:
res = tmq.poll(1000)
if res:
topic = res.get_topic_name()
vg = res.get_vgroup_id()
db = res.get_db_name()
print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
for row in res:
print(row)
tb = res.get_table_name()
print(f"from table: {tb}")
...@@ -3,7 +3,7 @@ sidebar_label: Docker ...@@ -3,7 +3,7 @@ sidebar_label: Docker
title: 通过 Docker 快速体验 TDengine title: 通过 Docker 快速体验 TDengine
--- ---
:::info :::info
如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. 如果您希望为 TDengine 贡献代码或对内部技术实现感兴趣,请参考[TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装.
::: :::
本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。 本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。
...@@ -32,18 +32,7 @@ docker exec -it <container name> bash ...@@ -32,18 +32,7 @@ docker exec -it <container name> bash
然后就可以执行相关的 Linux 命令操作和访问 TDengine 然后就可以执行相关的 Linux 命令操作和访问 TDengine
:::info 注: Docker 工具自身的下载和使用请参考 [Docker 官网文档](https://docs.docker.com/get-docker/)
Docker 工具自身的下载请参考 [Docker 官网文档](https://docs.docker.com/get-docker/)
安装完毕后可以在命令行终端查看 Docker 版本。如果版本号正常输出,则说明 Docker 环境已经安装成功。
```bash
$ docker -v
Docker version 20.10.3, build 48d30b5
```
:::
## 运行 TDengine CLI ## 运行 TDengine CLI
...@@ -109,4 +98,4 @@ taos> select avg(current), max(voltage), min(phase) from test.d10 interval(10s); ...@@ -109,4 +98,4 @@ taos> select avg(current), max(voltage), min(phase) from test.d10 interval(10s);
## 其它 ## 其它
更多关于在 Docker 环境下使用 TDengine 的细节,请参考 [在 Docker 下使用 TDengine](../../reference/docker) 更多关于在 Docker 环境下使用 TDengine 的细节,请参考 [在 Docker 下使用 TDengine](../../reference/docker)
\ No newline at end of file
...@@ -46,8 +46,8 @@ apt-get 方式只适用于 Debian 或 Ubuntu 系统 ...@@ -46,8 +46,8 @@ apt-get 方式只适用于 Debian 或 Ubuntu 系统
</TabItem> </TabItem>
<TabItem label="Deb 安装" value="debinst"> <TabItem label="Deb 安装" value="debinst">
1、从官网下载获得 deb 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.deb; 1.[发布历史页面](../../releases) 下载获得 deb 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.deb;
2进入到 TDengine-server-3.0.0.0-Linux-x64.deb 安装包所在目录,执行如下的安装命令: 2. 进入到 TDengine-server-3.0.0.0-Linux-x64.deb 安装包所在目录,执行如下的安装命令:
```bash ```bash
sudo dpkg -i TDengine-server-3.0.0.0-Linux-x64.deb sudo dpkg -i TDengine-server-3.0.0.0-Linux-x64.deb
...@@ -57,8 +57,8 @@ sudo dpkg -i TDengine-server-3.0.0.0-Linux-x64.deb ...@@ -57,8 +57,8 @@ sudo dpkg -i TDengine-server-3.0.0.0-Linux-x64.deb
<TabItem label="RPM 安装" value="rpminst"> <TabItem label="RPM 安装" value="rpminst">
1、从官网下载获得 rpm 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.rpm; 1.[发布历史页面](../../releases) 下载获得 rpm 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.rpm;
2进入到 TDengine-server-3.0.0.0-Linux-x64.rpm 安装包所在目录,执行如下的安装命令: 2. 进入到 TDengine-server-3.0.0.0-Linux-x64.rpm 安装包所在目录,执行如下的安装命令:
```bash ```bash
sudo rpm -ivh TDengine-server-3.0.0.0-Linux-x64.rpm sudo rpm -ivh TDengine-server-3.0.0.0-Linux-x64.rpm
...@@ -68,8 +68,8 @@ sudo rpm -ivh TDengine-server-3.0.0.0-Linux-x64.rpm ...@@ -68,8 +68,8 @@ sudo rpm -ivh TDengine-server-3.0.0.0-Linux-x64.rpm
<TabItem label="tar.gz 安装" value="tarinst"> <TabItem label="tar.gz 安装" value="tarinst">
1、从官网下载获得 tar.gz 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.tar.gz; 1.[发布历史页面](../../releases) 下载获得 tar.gz 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.tar.gz;
2进入到 TDengine-server-3.0.0.0-Linux-x64.tar.gz 安装包所在目录,先解压文件后,进入子目录,执行其中的 install.sh 安装脚本: 2. 进入到 TDengine-server-3.0.0.0-Linux-x64.tar.gz 安装包所在目录,先解压文件后,进入子目录,执行其中的 install.sh 安装脚本:
```bash ```bash
tar -zxvf TDengine-server-3.0.0.0-Linux-x64.tar.gz tar -zxvf TDengine-server-3.0.0.0-Linux-x64.tar.gz
...@@ -89,7 +89,9 @@ install.sh 安装脚本在执行过程中,会通过命令行交互界面询问 ...@@ -89,7 +89,9 @@ install.sh 安装脚本在执行过程中,会通过命令行交互界面询问
</TabItem> </TabItem>
<TabItem label="Windows 安装" value="windows"> <TabItem label="Windows 安装" value="windows">
TODO
1.[发布历史页面](../../releases) 下载获得 exe 安装程序,例如 TDengine-server-3.0.0.0-Windows-x64.exe;
2. 运行 TDengine-server-3.0.0.0-Windows-x64.exe 来安装 TDengine。
</TabItem> </TabItem>
</Tabs> </Tabs>
...@@ -152,14 +154,14 @@ systemctl 命令汇总: ...@@ -152,14 +154,14 @@ systemctl 命令汇总:
<TabItem label="Windows 系统" value="windows"> <TabItem label="Windows 系统" value="windows">
TODO 安装后,在 C:\TDengine 目录下,运行 taosd.exe 来启动 TDengine 服务进程。
</TabItem> </TabItem>
</Tabs> </Tabs>
## TDengine 命令行 (CLI) ## TDengine 命令行 (CLI)
为便于检查 TDengine 的状态,执行数据库 (Database) 的各种即席(Ad Hoc)查询,TDengine 提供一命令行应用程序(以下简称为 TDengine CLI) taos。要进入 TDengine 命令行,您只要在安装有 TDengine 的 Linux 终端执行 `taos` 即可。 为便于检查 TDengine 的状态,执行数据库 (Database) 的各种即席(Ad Hoc)查询,TDengine 提供一命令行应用程序(以下简称为 TDengine CLI) taos。要进入 TDengine 命令行,您只要在安装有 TDengine 的 Linux 终端执行 `taos` 即可,也可以在安装有 TDengine 的 Windows 终端的 C:\TDengine 目录下,运行 taos.exe 来启动 TDengine 命令行
```bash ```bash
taos taos
......
...@@ -29,100 +29,77 @@ stream_options: { ...@@ -29,100 +29,77 @@ stream_options: {
首先准备数据,完成建库、建一张超级表和多张子表操作 首先准备数据,完成建库、建一张超级表和多张子表操作
```sql ```sql
drop database if exists stream_db; DROP DATABASE IF EXISTS power;
create database stream_db; CREATE DATABASE power;
USE power;
create stable stream_db.meters (ts timestamp, current float, voltage int) TAGS (location varchar(64), groupId int); CREATE STABLE meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);
create table stream_db.d1001 using stream_db.meters tags("beijing", 1); CREATE TABLE d1001 USING meters TAGS ("California.SanFrancisco", 2);
create table stream_db.d1002 using stream_db.meters tags("guangzhou", 2); CREATE TABLE d1002 USING meters TAGS ("California.SanFrancisco", 3);
create table stream_db.d1003 using stream_db.meters tags("shanghai", 3); CREATE TABLE d1003 USING meters TAGS ("California.LosAngeles", 2);
CREATE TABLE d1004 USING meters TAGS ("California.LosAngeles", 3);
``` ```
### 创建流 ### 创建流
```sql ```sql
create stream stream1 into stream_db.stream1_output_stb as select _wstart as start, _wend as end, max(current) as max_current from stream_db.meters where voltage <= 220 and ts > now - 12h interval (1h); create stream current_stream into current_stream_output_stb as select _wstart as start, _wend as end, max(current) as max_current from meters where voltage <= 220 and ts > now - 12h interval (1h);
``` ```
### 写入数据 ### 写入数据
```sql ```sql
insert into stream_db.d1001 values(now-14h, 10.3, 210); insert into d1001 values(now-13h, 10.30000, 219, 0.31000);
insert into stream_db.d1001 values(now-13h, 13.5, 216); insert into d1001 values(now-11h, 12.60000, 218, 0.33000);
insert into stream_db.d1001 values(now-12h, 12.5, 219); insert into d1001 values(now-10h, 12.30000, 221, 0.31000);
insert into stream_db.d1002 values(now-11h, 14.7, 221); insert into d1002 values(now-9h, 10.30000, 218, 0.25000);
insert into stream_db.d1002 values(now-10h, 10.5, 218); insert into d1003 values(now-8h, 11.80000, 221, 0.28000);
insert into stream_db.d1002 values(now-9h, 11.2, 220); insert into d1003 values(now-7h, 13.40000, 223, 0.29000);
insert into stream_db.d1003 values(now-8h, 11.5, 217); insert into d1004 values(now-6h, 10.80000, 223, 0.29000);
insert into stream_db.d1003 values(now-7h, 12.3, 227); insert into d1004 values(now-5h, 11.50000, 221, 0.35000);
insert into stream_db.d1003 values(now-6h, 12.3, 215);
``` ```
### 查询以观查结果 ### 查询以观查结果
```sql ```sql
taos> select * from stream_db.stream1_output_stb; taos> select start, end, max_current from current_stream_output_stb;
start | end | max_current | group_id | start | end | max_current |
=================================================================================================== ===========================================================================
2022-08-09 14:00:00.000 | 2022-08-09 15:00:00.000 | 10.50000 | 0 | 2022-08-12 04:00:00.000 | 2022-08-12 05:00:00.000 | 12.60000 |
2022-08-09 15:00:00.000 | 2022-08-09 16:00:00.000 | 11.20000 | 0 | 2022-08-12 06:00:00.000 | 2022-08-12 07:00:00.000 | 10.30000 |
2022-08-09 16:00:00.000 | 2022-08-09 17:00:00.000 | 11.50000 | 0 | Query OK, 2 rows in database (0.009580s)
2022-08-09 18:00:00.000 | 2022-08-09 19:00:00.000 | 12.30000 | 0 |
Query OK, 4 rows in database (0.012033s)
``` ```
## 示例二 ## 示例二
某运营商平台要采集机房所有服务器的系统资源指标,包含 cpu、内存、网络延迟等,采集后需要对数据进行四舍五入运算,将地域和服务器名以下划线拼接,然后将结果按时间排序并以服务器名分组输出到新的数据表中。
### 创建 DB 和原始数据表 依然以示例一中的数据为基础,我们已经采集到了每个智能电表的电流和电压数据,现在要求出功率,并将地域和电表名以符号 "." 拼接,然后以电表名称分组输出到新的数据表中。
首先准备数据,完成建库、建一张超级表和多张子表操作
```sql ### 创建 DB 和原始数据表
drop database if exists stream_db;
create database stream_db;
create stable stream_db.idc (ts timestamp, cpu float, mem float, latency float) TAGS (location varchar(64), groupId int);
create table stream_db.server01 using stream_db.idc tags("beijing", 1); 参考示例一 [创建 DB 和原始数据表](#创建-db-和原始数据表)
create table stream_db.server02 using stream_db.idc tags("shanghai", 2);
create table stream_db.server03 using stream_db.idc tags("beijing", 2);
create table stream_db.server04 using stream_db.idc tags("tianjin", 3);
create table stream_db.server05 using stream_db.idc tags("shanghai", 1);
```
### 创建流 ### 创建流
```sql ```sql
create stream stream2 into stream_db.stream2_output_stb as select ts, concat_ws("_", location, tbname) as server_location, round(cpu) as cpu, round(mem) as mem, round(latency) as latency from stream_db.idc partition by tbname order by ts; create stream power_stream into power_stream_output_stb as select ts, concat_ws(".", location, tbname) as meter_location, current*voltage as meter_power from meters partition by tbname;
``` ```
### 写入数据 ### 写入数据
```sql
insert into stream_db.server01 values(now-14h, 50.9, 654.8, 23.11); 参考示例一 [写入数据](#写入数据)
insert into stream_db.server01 values(now-13h, 13.5, 221.2, 11.22);
insert into stream_db.server02 values(now-12h, 154.7, 218.3, 22.33);
insert into stream_db.server02 values(now-11h, 120.5, 111.5, 5.55);
insert into stream_db.server03 values(now-10h, 101.5, 125.6, 5.99);
insert into stream_db.server03 values(now-9h, 12.3, 165.6, 6.02);
insert into stream_db.server04 values(now-8h, 160.9, 120.7, 43.51);
insert into stream_db.server04 values(now-7h, 240.9, 520.7, 54.55);
insert into stream_db.server05 values(now-6h, 190.9, 320.7, 55.43);
insert into stream_db.server05 values(now-5h, 110.9, 600.7, 35.54);
```
### 查询以观查结果 ### 查询以观查结果
```sql ```sql
taos> select ts, server_location, cpu, mem, latency from stream_db.stream2_output_stb; taos> select ts, meter_location, meter_power from power_stream_output_stb;
ts | server_location | cpu | mem | latency | ts | meter_location | meter_power |
================================================================================================================================ =======================================================================================
2022-08-09 21:24:56.785 | beijing_server01 | 51.00000 | 655.00000 | 23.00000 | 2022-08-12 07:44:47.817 | California.SanFrancisco.d1002 | 2245.400041580 |
2022-08-09 22:24:56.795 | beijing_server01 | 14.00000 | 221.00000 | 11.00000 | 2022-08-12 08:44:47.826 | California.LosAngeles.d1003 | 2607.800042152 |
2022-08-09 23:24:56.806 | shanghai_server02 | 155.00000 | 218.00000 | 22.00000 | 2022-08-12 09:44:47.833 | California.LosAngeles.d1003 | 2988.199914932 |
2022-08-10 00:24:56.815 | shanghai_server02 | 121.00000 | 112.00000 | 6.00000 | 2022-08-12 03:44:47.791 | California.SanFrancisco.d1001 | 2255.700041771 |
2022-08-10 01:24:56.826 | beijing_server03 | 102.00000 | 126.00000 | 6.00000 | 2022-08-12 05:44:47.800 | California.SanFrancisco.d1001 | 2746.800083160 |
2022-08-10 02:24:56.838 | beijing_server03 | 12.00000 | 166.00000 | 6.00000 | 2022-08-12 06:44:47.809 | California.SanFrancisco.d1001 | 2718.300042152 |
2022-08-10 03:24:56.846 | tianjin_server04 | 161.00000 | 121.00000 | 44.00000 | 2022-08-12 10:44:47.840 | California.LosAngeles.d1004 | 2408.400042534 |
2022-08-10 04:24:56.853 | tianjin_server04 | 241.00000 | 521.00000 | 55.00000 | 2022-08-12 11:44:48.379 | California.LosAngeles.d1004 | 2541.500000000 |
2022-08-10 05:24:56.866 | shanghai_server05 | 191.00000 | 321.00000 | 55.00000 | Query OK, 8 rows in database (0.014788s)
2022-08-10 06:24:57.301 | shanghai_server05 | 111.00000 | 601.00000 | 36.00000 | ```
Query OK, 10 rows in database (0.022950s) \ No newline at end of file
```
...@@ -4,6 +4,17 @@ description: "数据订阅与推送服务。写入到 TDengine 中的时序数 ...@@ -4,6 +4,17 @@ description: "数据订阅与推送服务。写入到 TDengine 中的时序数
title: 数据订阅 title: 数据订阅
--- ---
import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
import Java from "./_sub_java.mdx";
import Python from "./_sub_python.mdx";
import Go from "./_sub_go.mdx";
import Rust from "./_sub_rust.mdx";
import Node from "./_sub_node.mdx";
import CSharp from "./_sub_cs.mdx";
import CDemo from "./_sub_c.mdx";
为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。 为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。
与 kafka 一样,你需要定义 topic, 但 TDengine 的 topic 是基于一个已经存在的超级表、子表或普通表的查询条件,即一个 SELECT 语句。你可以使用 SQL 对标签、表名、列、表达式等条件进行过滤,以及对数据进行标量函数与 UDF 计算(不包括数据聚合)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度。 与 kafka 一样,你需要定义 topic, 但 TDengine 的 topic 是基于一个已经存在的超级表、子表或普通表的查询条件,即一个 SELECT 语句。你可以使用 SQL 对标签、表名、列、表达式等条件进行过滤,以及对数据进行标量函数与 UDF 计算(不包括数据聚合)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度。
...@@ -51,7 +62,7 @@ DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf); ...@@ -51,7 +62,7 @@ DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param); DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
``` ```
这些 API 的文档请见 [C/C++ Connector](/reference/connector/cpp),下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码可以在 [tmq.c](https://github.com/taosdata/TDengine/blob/3.0/examples/c/tmq.c) 看到 这些 API 的文档请见 [C/C++ Connector](/reference/connector/cpp),下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面C语言的示例代码
## 写入数据 ## 写入数据
...@@ -62,13 +73,9 @@ drop database if exists tmqdb; ...@@ -62,13 +73,9 @@ drop database if exists tmqdb;
create database tmqdb; create database tmqdb;
create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16) tags(t1 int, t3 varchar(16)); create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16) tags(t1 int, t3 varchar(16));
create table tmqdb.ctb0 using tmqdb.stb tags(0, "subtable0"); create table tmqdb.ctb0 using tmqdb.stb tags(0, "subtable0");
create table tmqdb.ctb1 using tmqdb.stb tags(1, "subtable1"); create table tmqdb.ctb1 using tmqdb.stb tags(1, "subtable1");
create table tmqdb.ctb2 using tmqdb.stb tags(2, "subtable2");
create table tmqdb.ctb3 using tmqdb.stb tags(3, "subtable3");
insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00'); insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22');
insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33');
``` ```
## 创建topic: ## 创建topic:
...@@ -130,7 +137,6 @@ TMQ支持多种订阅类型: ...@@ -130,7 +137,6 @@ TMQ支持多种订阅类型:
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf); tmq_conf_destroy(conf);
return tmq;
``` ```
上述配置中包括consumer group ID,如果多个 consumer 指定的 consumer group ID一样,则自动形成一个consumer group,共享消费进度。 上述配置中包括consumer group ID,如果多个 consumer 指定的 consumer group ID一样,则自动形成一个consumer group,共享消费进度。
...@@ -143,66 +149,23 @@ TMQ支持多种订阅类型: ...@@ -143,66 +149,23 @@ TMQ支持多种订阅类型:
```sql ```sql
tmq_list_t* topicList = tmq_list_new(); tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topicName"); tmq_list_append(topicList, "topicName");
return topicList;
``` ```
## 启动订阅并开始消费 ## 启动订阅并开始消费
```sql ```
/* 启动订阅 */ /* 启动订阅 */
tmq_subscribe(tmq, topicList); tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList); tmq_list_destroy(topicList);
/* 循环poll消息 */ /* 循环poll消息 */
int32_t totalRows = 0;
int32_t msgCnt = 0;
int32_t timeOut = 5000;
while (running) { while (running) {
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeOut); TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeOut);
if (tmqmsg) { msg_process(tmqmsg);
msgCnt++; }
totalRows += msg_process(tmqmsg);
taos_free_result(tmqmsg);
} else {
break;
}
}
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
``` ```
这里是一个 **while** 循环,每调用一次tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析API完成消息内容的解析: 这里是一个 **while** 循环,每调用一次tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析API完成消息内容的解析。
```sql
static int32_t msg_process(TAOS_RES* msg) {
char buf[1024];
int32_t rows = 0;
const char* topicName = tmq_get_topic_name(msg);
const char* dbName = tmq_get_db_name(msg);
int32_t vgroupId = tmq_get_vgroup_id(msg);
printf("topic: %s\n", topicName);
printf("db: %s\n", dbName);
printf("vgroup id: %d\n", vgroupId);
while (1) {
TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break;
TAOS_FIELD* fields = taos_fetch_fields(msg);
int32_t numOfFields = taos_field_count(msg);
int32_t* length = taos_fetch_lengths(msg);
int32_t precision = taos_result_precision(msg);
const char* tbName = tmq_get_table_name(msg);
rows++;
taos_print_row(buf, row, fields, numOfFields);
printf("row content from %s: %s\n", (tbName != NULL ? tbName : "null table"), buf);
}
return rows;
}
```
## 结束消费 ## 结束消费
...@@ -243,4 +206,44 @@ TMQ支持多种订阅类型: ...@@ -243,4 +206,44 @@ TMQ支持多种订阅类型:
show subscriptions; show subscriptions;
``` ```
## 示例代码
本节展示各种语言的示例代码。
<Tabs>
<TabItem label="C" value="c">
```c
{{#include examples/c/tmq.c}}
```
</TabItem>
<TabItem label="Java" value="java">
<Java />
</TabItem>
<TabItem label="Go" value="Go">
<Go/>
</TabItem>
<TabItem label="Rust" value="Rust">
<Rust />
</TabItem>
<TabItem label="Python" value="Python">
```python
{{#include docs/examples/python/tmq_example.py}}
```
</TabItem>
<TabItem label="Node.JS" value="Node.JS">
<Node/>
</TabItem>
<TabItem label="C#" value="C#">
<CSharp/>
</TabItem>
</Tabs>
...@@ -8,7 +8,7 @@ title: 权限管理 ...@@ -8,7 +8,7 @@ title: 权限管理
## 创建用户 ## 创建用户
```sql ```sql
CREATE USER use_name PASS password; CREATE USER use_name PASS 'password';
``` ```
创建用户。 创建用户。
...@@ -91,4 +91,4 @@ priv_level : { ...@@ -91,4 +91,4 @@ priv_level : {
``` ```
收回对用户的授权。 收回对用户的授权。
\ No newline at end of file
...@@ -358,7 +358,7 @@ JDBC 连接器可能报错的错误码包括 3 种:JDBC driver 本身的报错 ...@@ -358,7 +358,7 @@ JDBC 连接器可能报错的错误码包括 3 种:JDBC driver 本身的报错
具体的错误码请参考: 具体的错误码请参考:
- [TDengine Java Connector](https://github.com/taosdata/taos-connector-jdbc/blob/main/src/main/java/com/taosdata/jdbc/TSDBErrorNumbers.java) - [TDengine Java Connector](https://github.com/taosdata/taos-connector-jdbc/blob/main/src/main/java/com/taosdata/jdbc/TSDBErrorNumbers.java)
- [TDengine_ERROR_CODE](../error-code) <!-- - [TDengine_ERROR_CODE](../error-code) -->
### 通过参数绑定写入数据 ### 通过参数绑定写入数据
......
...@@ -86,7 +86,7 @@ TDengine is removed successfully! ...@@ -86,7 +86,7 @@ TDengine is removed successfully!
</TabItem> </TabItem>
<TabItem label="Windows 卸载" value="windows"> <TabItem label="Windows 卸载" value="windows">
TODO 在 C:\TDengine 目录下,通过运行 unins000.exe 卸载程序来卸载 TDengine。
</TabItem> </TabItem>
</Tabs> </Tabs>
......
...@@ -241,15 +241,16 @@ dataDir /mnt/data6 2 0 ...@@ -241,15 +241,16 @@ dataDir /mnt/data6 2 0
## 数据查询 ## 数据查询
TDengine 提供了多种多样针对表和超级表的查询处理功能,除了常规的聚合查询之外,还提供针对时序数据的窗口查询、统计聚合等功能。TDengine 的查询处理需要客户端、vnode、mnode 节点协同完成 TDengine 提供了多种多样针对表和超级表的查询处理功能,除了常规的聚合查询之外,还提供针对时序数据的窗口查询、统计聚合等功能。TDengine 的查询处理需要客户端、vnode、qnode、mnode 节点协同完成,一个复杂的超级表聚合查询可能需要多个 vnode 和 qnode 节点公共分担查询和计算任务
### 单表查询 ### 查询基本流程
SQL 语句的解析和校验工作在客户端完成。解析 SQL 语句并生成抽象语法树(Abstract Syntax Tree,AST),然后对其进行校验和检查。以及向管理节点(mnode)请求查询中指定表的元数据信息(table metadata)。 1. 客户端解析输入 SQL 语句并生成抽象语法树(Abstract Syntax Tree,AST),然后根据元数据信息对其进行校验和检查。在此期间,元数据管理模块(Catalog)会向管理节点(mnode)或 vnode 请求查询中指定库和表的元数据信息(table metadata)。
2. 在通过校验检查后,客户端将生成分布式的查询计划并对查询计划进行优化处理。
根据元数据信息中的 End Point 信息,将查询请求序列化后发送到该表所在的数据节点(dnode)。dnode 接收到查询请求后,识别出该查询请求指向的虚拟节点(vnode),将消息转发到 vnode 的查询执行队列。vnode 的查询执行线程建立基础的查询执行环境,并立即返回该查询请求,同时开始执行该查询。 3. 客户端根据配置的查询策略进行任务调度处理,一个查询子任务会根据其数据亲缘关系或负载信息调度到某个 vnode 或 qnode 所属的数据节点(dnode)进行处理。
4. dnode 接收到查询请求后,识别出该查询请求指向的虚拟节点(vnode)或查询节点(qnode),将消息转发到 vnode 或 qnode 的查询执行队列。
客户端在获取查询结果的时候,dnode 的查询执行队列中的工作线程会等待 vnode 执行线程执行完成,才能将查询结果返回到请求的客户端。 5. vnode 或 qnode 的查询执行线程建立基础的查询执行环境,并立即执行该查询,在得到部分可获取查询结果后通知客户端。
6. 客户端将启动下级查询任务或直接获取查询结果。
### 按时间轴聚合、降采样、插值 ### 按时间轴聚合、降采样、插值
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#define _TD_UTIL_SCHED_H_ #define _TD_UTIL_SCHED_H_
#include "os.h" #include "os.h"
#include "tdef.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -30,6 +31,24 @@ typedef struct SSchedMsg { ...@@ -30,6 +31,24 @@ typedef struct SSchedMsg {
void *thandle; void *thandle;
} SSchedMsg; } SSchedMsg;
typedef struct {
char label[TSDB_LABEL_LEN];
tsem_t emptySem;
tsem_t fullSem;
TdThreadMutex queueMutex;
int32_t fullSlot;
int32_t emptySlot;
int32_t queueSize;
int32_t numOfThreads;
TdThread *qthread;
SSchedMsg *queue;
int8_t stop;
void *pTmrCtrl;
void *pTimer;
} SSchedQueue;
/** /**
* Create a thread-safe ring-buffer based task queue and return the instance. A thread * Create a thread-safe ring-buffer based task queue and return the instance. A thread
* pool will be created to consume the messages in the queue. * pool will be created to consume the messages in the queue.
...@@ -38,7 +57,7 @@ typedef struct SSchedMsg { ...@@ -38,7 +57,7 @@ typedef struct SSchedMsg {
* @param label the label of the queue * @param label the label of the queue
* @return the created queue scheduler * @return the created queue scheduler
*/ */
void *taosInitScheduler(int32_t capacity, int32_t numOfThreads, const char *label); void *taosInitScheduler(int32_t capacity, int32_t numOfThreads, const char *label, SSchedQueue* pSched);
/** /**
* Create a thread-safe ring-buffer based task queue and return the instance. * Create a thread-safe ring-buffer based task queue and return the instance.
......
...@@ -322,7 +322,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp( ...@@ -322,7 +322,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(
(*env)->CallVoidMethod(env, rowobj, g_blockdataSetNumOfRowsFp, (jint)numOfRows); (*env)->CallVoidMethod(env, rowobj, g_blockdataSetNumOfRowsFp, (jint)numOfRows);
(*env)->CallVoidMethod(env, rowobj, g_blockdataSetNumOfColsFp, (jint)numOfFields); (*env)->CallVoidMethod(env, rowobj, g_blockdataSetNumOfColsFp, (jint)numOfFields);
int32_t len = *(int32_t *)data; int32_t len = *(int32_t *)(((char *)data) + 4);
(*env)->CallVoidMethod(env, rowobj, g_blockdataSetByteArrayFp, jniFromNCharToByteArray(env, (char *)data, len)); (*env)->CallVoidMethod(env, rowobj, g_blockdataSetByteArrayFp, jniFromNCharToByteArray(env, (char *)data, len));
return JNI_SUCCESS; return JNI_SUCCESS;
} }
...@@ -592,7 +592,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchBlockImp(JNI ...@@ -592,7 +592,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchBlockImp(JNI
(*env)->CallVoidMethod(env, rowobj, g_blockdataSetNumOfRowsFp, (jint)numOfRows); (*env)->CallVoidMethod(env, rowobj, g_blockdataSetNumOfRowsFp, (jint)numOfRows);
(*env)->CallVoidMethod(env, rowobj, g_blockdataSetNumOfColsFp, (jint)numOfFields); (*env)->CallVoidMethod(env, rowobj, g_blockdataSetNumOfColsFp, (jint)numOfFields);
int32_t len = *(int32_t *)data; int32_t len = *(int32_t *)(((char *)data) + 4);
(*env)->CallVoidMethod(env, rowobj, g_blockdataSetByteArrayFp, jniFromNCharToByteArray(env, (char *)data, len)); (*env)->CallVoidMethod(env, rowobj, g_blockdataSetByteArrayFp, jniFromNCharToByteArray(env, (char *)data, len));
return JNI_SUCCESS; return JNI_SUCCESS;
......
...@@ -87,10 +87,11 @@ typedef struct { ...@@ -87,10 +87,11 @@ typedef struct {
typedef struct { typedef struct {
tsem_t syncSem; tsem_t syncSem;
int64_t sync; int64_t sync;
bool standby;
SReplica replica; SReplica replica;
int32_t errCode; int32_t errCode;
int32_t transId; int32_t transId;
SRWLatch lock;
int8_t standby;
int8_t leaderTransferFinish; int8_t leaderTransferFinish;
} SSyncMgmt; } SSyncMgmt;
......
...@@ -238,7 +238,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) { ...@@ -238,7 +238,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
} else { } else {
memcpy(pReq->info.conn.user, TSDB_DEFAULT_USER, strlen(TSDB_DEFAULT_USER) + 1); memcpy(pReq->info.conn.user, TSDB_DEFAULT_USER, strlen(TSDB_DEFAULT_USER) + 1);
} }
if (mndCheckShowPrivilege(pMnode, pReq->info.conn.user, pShow->type, retrieveReq.db) != 0) { if (retrieveReq.db[0] && mndCheckShowPrivilege(pMnode, pReq->info.conn.user, pShow->type, retrieveReq.db) != 0) {
return -1; return -1;
} }
......
...@@ -60,15 +60,19 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM ...@@ -60,15 +60,19 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
sdbSetApplyInfo(pMnode->pSdb, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex); sdbSetApplyInfo(pMnode->pSdb, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex);
} }
taosRLockLatch(&pMgmt->lock);
if (transId <= 0) { if (transId <= 0) {
taosRUnLockLatch(&pMgmt->lock);
mError("trans:%d, invalid commit msg", transId); mError("trans:%d, invalid commit msg", transId);
} else if (transId == pMgmt->transId) { } else if (transId == pMgmt->transId) {
taosRUnLockLatch(&pMgmt->lock);
if (pMgmt->errCode != 0) { if (pMgmt->errCode != 0) {
mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode)); mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode));
} }
pMgmt->transId = 0; pMgmt->transId = 0;
tsem_post(&pMgmt->syncSem); tsem_post(&pMgmt->syncSem);
} else { } else {
taosRUnLockLatch(&pMgmt->lock);
STrans *pTrans = mndAcquireTrans(pMnode, transId); STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) { if (pTrans != NULL) {
mDebug("trans:%d, execute in mnode which not leader", transId); mDebug("trans:%d, execute in mnode which not leader", transId);
...@@ -115,6 +119,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM ...@@ -115,6 +119,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId, mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId,
cbMeta.code, cbMeta.index, cbMeta.term); cbMeta.code, cbMeta.index, cbMeta.term);
taosWLockLatch(&pMgmt->lock);
if (pMgmt->transId == -1) { if (pMgmt->transId == -1) {
if (pMgmt->errCode != 0) { if (pMgmt->errCode != 0) {
mError("trans:-1, failed to propose sync reconfig since %s", tstrerror(pMgmt->errCode)); mError("trans:-1, failed to propose sync reconfig since %s", tstrerror(pMgmt->errCode));
...@@ -122,6 +127,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM ...@@ -122,6 +127,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
pMgmt->transId = 0; pMgmt->transId = 0;
tsem_post(&pMgmt->syncSem); tsem_post(&pMgmt->syncSem);
} }
taosWUnLockLatch(&pMgmt->lock);
} }
int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) { int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
...@@ -170,12 +176,24 @@ static void mndBecomeFollower(struct SSyncFSM *pFsm) { ...@@ -170,12 +176,24 @@ static void mndBecomeFollower(struct SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
mDebug("vgId:1, become follower"); mDebug("vgId:1, become follower");
// clear old leader resource taosWLockLatch(&pMnode->syncMgmt.lock);
if (pMnode->syncMgmt.transId != 0) {
pMnode->syncMgmt.transId = 0;
tsem_post(&pMnode->syncMgmt.syncSem);
}
taosWUnLockLatch(&pMnode->syncMgmt.lock);
} }
static void mndBecomeLeader(struct SSyncFSM *pFsm) { static void mndBecomeLeader(struct SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data;
mDebug("vgId:1, become leader"); mDebug("vgId:1, become leader");
SMnode *pMnode = pFsm->data;
taosWLockLatch(&pMnode->syncMgmt.lock);
if (pMnode->syncMgmt.transId != 0) {
pMnode->syncMgmt.transId = 0;
tsem_post(&pMnode->syncMgmt.syncSem);
}
taosWUnLockLatch(&pMnode->syncMgmt.lock);
} }
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
...@@ -202,6 +220,8 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { ...@@ -202,6 +220,8 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
int32_t mndInitSync(SMnode *pMnode) { int32_t mndInitSync(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
taosInitRWLatch(&pMgmt->lock);
pMgmt->transId = 0;
SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg}; SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg};
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
...@@ -254,11 +274,14 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { ...@@ -254,11 +274,14 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
memcpy(req.pCont, pRaw, req.contLen); memcpy(req.pCont, pRaw, req.contLen);
pMgmt->errCode = 0; pMgmt->errCode = 0;
taosWLockLatch(&pMgmt->lock);
pMgmt->transId = transId; pMgmt->transId = transId;
taosWUnLockLatch(&pMgmt->lock);
mTrace("trans:%d, will be proposed", pMgmt->transId); mTrace("trans:%d, will be proposed", pMgmt->transId);
const bool isWeak = false; const bool isWeak = false;
int32_t code = syncPropose(pMgmt->sync, &req, isWeak); int32_t code = syncPropose(pMgmt->sync, &req, isWeak);
if (code == 0) { if (code == 0) {
tsem_wait(&pMgmt->syncSem); tsem_wait(&pMgmt->syncSem);
} else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { } else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
...@@ -286,10 +309,12 @@ void mndSyncStart(SMnode *pMnode) { ...@@ -286,10 +309,12 @@ void mndSyncStart(SMnode *pMnode) {
} }
void mndSyncStop(SMnode *pMnode) { void mndSyncStop(SMnode *pMnode) {
taosWLockLatch(&pMnode->syncMgmt.lock);
if (pMnode->syncMgmt.transId != 0) { if (pMnode->syncMgmt.transId != 0) {
pMnode->syncMgmt.transId = 0; pMnode->syncMgmt.transId = 0;
tsem_post(&pMnode->syncMgmt.syncSem); tsem_post(&pMnode->syncMgmt.syncSem);
} }
taosWUnLockLatch(&pMnode->syncMgmt.lock);
} }
bool mndIsMaster(SMnode *pMnode) { bool mndIsMaster(SMnode *pMnode) {
......
...@@ -308,7 +308,8 @@ struct SVnode { ...@@ -308,7 +308,8 @@ struct SVnode {
SSink* pSink; SSink* pSink;
tsem_t canCommit; tsem_t canCommit;
int64_t sync; int64_t sync;
int32_t blockCount; SRWLatch lock;
bool blocked;
bool restored; bool restored;
tsem_t syncSem; tsem_t syncSem;
SQHandle* pQuery; SQHandle* pQuery;
......
...@@ -85,7 +85,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -85,7 +85,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
pVnode->state.commitTerm = info.state.commitTerm; pVnode->state.commitTerm = info.state.commitTerm;
pVnode->pTfs = pTfs; pVnode->pTfs = pTfs;
pVnode->msgCb = msgCb; pVnode->msgCb = msgCb;
pVnode->blockCount = 0; taosInitRWLatch(&pVnode->lock);
pVnode->blocked = false;
tsem_init(&pVnode->syncSem, 0, 0); tsem_init(&pVnode->syncSem, 0, 0);
tsem_init(&(pVnode->canCommit), 0, 1); tsem_init(&(pVnode->canCommit), 0, 1);
......
...@@ -28,20 +28,28 @@ static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; } ...@@ -28,20 +28,28 @@ static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) { static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
if (vnodeIsMsgBlock(pMsg->msgType)) { if (vnodeIsMsgBlock(pMsg->msgType)) {
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); taosWLockLatch(&pVnode->lock);
pVnode->blockCount = 1; if (!pVnode->blocked) {
tsem_wait(&pVnode->syncSem); vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
pVnode->blocked = true;
taosWUnLockLatch(&pVnode->lock);
tsem_wait(&pVnode->syncSem);
} else {
taosWUnLockLatch(&pVnode->lock);
}
} }
} }
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) { static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
if (vnodeIsMsgBlock(pMsg->msgType)) { if (vnodeIsMsgBlock(pMsg->msgType)) {
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
if (pVnode->blockCount) { taosWLockLatch(&pVnode->lock);
if (pVnode->blocked) {
vGTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); vGTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
pVnode->blockCount = 0; pVnode->blocked = false;
tsem_post(&pVnode->syncSem); tsem_post(&pVnode->syncSem);
} }
taosWUnLockLatch(&pVnode->lock);
} }
} }
...@@ -677,6 +685,12 @@ static void vnodeBecomeFollower(struct SSyncFSM *pFsm) { ...@@ -677,6 +685,12 @@ static void vnodeBecomeFollower(struct SSyncFSM *pFsm) {
vDebug("vgId:%d, become follower", pVnode->config.vgId); vDebug("vgId:%d, become follower", pVnode->config.vgId);
// clear old leader resource // clear old leader resource
taosWLockLatch(&pVnode->lock);
if (pVnode->blocked) {
pVnode->blocked = false;
tsem_post(&pVnode->syncSem);
}
taosWUnLockLatch(&pVnode->lock);
} }
static void vnodeBecomeLeader(struct SSyncFSM *pFsm) { static void vnodeBecomeLeader(struct SSyncFSM *pFsm) {
......
...@@ -581,6 +581,20 @@ _return: ...@@ -581,6 +581,20 @@ _return:
} }
int32_t ctgChkAuthFromCache(SCatalog* pCtg, char* user, char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass) { int32_t ctgChkAuthFromCache(SCatalog* pCtg, char* user, char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass) {
char *p = strchr(dbFName, '.');
if (p) {
++p;
} else {
p = dbFName;
}
if (IS_SYS_DBNAME(p)) {
*inCache = true;
*pass = true;
ctgDebug("sysdb %s, pass", dbFName);
return TSDB_CODE_SUCCESS;
}
SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, user, strlen(user)); SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, user, strlen(user));
if (NULL == pUser) { if (NULL == pUser) {
ctgDebug("user not in cache, user:%s", user); ctgDebug("user not in cache, user:%s", user);
......
...@@ -916,7 +916,7 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo ...@@ -916,7 +916,7 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo
int32_t vgNum = taosHashGetSize(dbInfo->vgHash); int32_t vgNum = taosHashGetSize(dbInfo->vgHash);
if (vgNum <= 0) { if (vgNum <= 0) {
ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum); ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
tableNameHashFp fp = NULL; tableNameHashFp fp = NULL;
...@@ -931,6 +931,7 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo ...@@ -931,6 +931,7 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo
for (int32_t i = 0; i < tbNum; ++i) { for (int32_t i = 0; i < tbNum; ++i) {
vgInfo = taosMemoryMalloc(sizeof(SVgroupInfo)); vgInfo = taosMemoryMalloc(sizeof(SVgroupInfo));
if (NULL == vgInfo) { if (NULL == vgInfo) {
taosHashCancelIterate(dbInfo->vgHash, pIter);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
...@@ -980,7 +981,6 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo ...@@ -980,7 +981,6 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo
if (NULL == p) { if (NULL == p) {
ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName, taosHashGetSize(dbInfo->vgHash)); ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName, taosHashGetSize(dbInfo->vgHash));
ASSERT(0);
taosArrayDestroy(pVgList); taosArrayDestroy(pVgList);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
......
...@@ -101,12 +101,14 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn ...@@ -101,12 +101,14 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
} }
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) { static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
/*
uint32_t capacity = pDispatcher->pManager->cfg.maxDataBlockNumPerQuery; uint32_t capacity = pDispatcher->pManager->cfg.maxDataBlockNumPerQuery;
if (taosQueueItemSize(pDispatcher->pDataBlocks) > capacity) { if (taosQueueItemSize(pDispatcher->pDataBlocks) > capacity) {
qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity, qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity,
taosQueueItemSize(pDispatcher->pDataBlocks)); taosQueueItemSize(pDispatcher->pDataBlocks));
return false; return false;
} }
*/
pBuf->allocSize = sizeof(SDataCacheEntry) + blockGetEncodeSize(pInput->pData); pBuf->allocSize = sizeof(SDataCacheEntry) + blockGetEncodeSize(pInput->pData);
......
...@@ -1433,7 +1433,8 @@ static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, u ...@@ -1433,7 +1433,8 @@ static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, u
pAggInfo->groupId = groupId; pAggInfo->groupId = groupId;
} }
static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowCellOffset) { static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs,
const int32_t* rowCellOffset) {
bool returnNotNull = false; bool returnNotNull = false;
for (int32_t j = 0; j < numOfExprs; ++j) { for (int32_t j = 0; j < numOfExprs; ++j) {
struct SResultRowEntryInfo* pResInfo = getResultEntryInfo(pRow, j, rowCellOffset); struct SResultRowEntryInfo* pResInfo = getResultEntryInfo(pRow, j, rowCellOffset);
...@@ -1613,7 +1614,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG ...@@ -1613,7 +1614,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
if (!pbInfo->mergeResultBlock) { if (!pbInfo->mergeResultBlock) {
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo); doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
} else { } else {
while(hasRemainResults(pGroupResInfo)) { while (hasRemainResults(pGroupResInfo)) {
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo); doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
if (pBlock->info.rows >= pOperator->resultInfo.threshold) { if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
break; break;
...@@ -2062,10 +2063,10 @@ void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t ...@@ -2062,10 +2063,10 @@ void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t
} }
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, int32_t numOfOutput, SArray* pColList, int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, int32_t numOfOutput, SArray* pColList,
char** pNextStart) { char** pNextStart) {
if (pColList == NULL) { // data from other sources if (pColList == NULL) { // data from other sources
blockDataCleanup(pRes); blockDataCleanup(pRes);
*pNextStart = (char*) blockDecode(pRes, pData); *pNextStart = (char*)blockDecode(pRes, pData);
} else { // extract data according to pColList } else { // extract data according to pColList
ASSERT(numOfOutput == taosArrayGetSize(pColList)); ASSERT(numOfOutput == taosArrayGetSize(pColList));
char* pStart = pData; char* pStart = pData;
...@@ -2161,9 +2162,9 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn ...@@ -2161,9 +2162,9 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
} }
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
int32_t index = 0; int32_t index = 0;
char* pStart = pRetrieveRsp->data; char* pStart = pRetrieveRsp->data;
while(index++ < pRetrieveRsp->numOfBlocks) { while (index++ < pRetrieveRsp->numOfBlocks) {
SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false); SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
code = extractDataBlockFromFetchRsp(pb, pStart, pRetrieveRsp->numOfCols, NULL, &pStart); code = extractDataBlockFromFetchRsp(pb, pStart, pRetrieveRsp->numOfCols, NULL, &pStart);
if (code != 0) { if (code != 0) {
...@@ -2177,8 +2178,10 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn ...@@ -2177,8 +2178,10 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator); updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
if (pRsp->completed == 1) { if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d" qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", total:%.2f Kb," " execId:%d"
" index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
", total:%.2f Kb,"
" completed:%d try next %d/%" PRIzu, " completed:%d try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks,
pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
...@@ -2186,9 +2189,10 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn ...@@ -2186,9 +2189,10 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
completed += 1; completed += 1;
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
} else { } else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
", total:%.2f Kb", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb",
pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize/1024.0); GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks,
pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
} }
taosMemoryFreeClear(pDataInfo->pRsp); taosMemoryFreeClear(pDataInfo->pRsp);
...@@ -3521,7 +3525,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -3521,7 +3525,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
goto _error; goto _error;
} }
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
...@@ -3562,7 +3566,6 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -3562,7 +3566,6 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
return pOperator; return pOperator;
_error: _error:
destroyAggOperatorInfo(pInfo, numOfCols); destroyAggOperatorInfo(pInfo, numOfCols);
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
...@@ -4180,7 +4183,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4180,7 +4183,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId,
pPhyNode->pConditions, pIntervalPhyNode->window.mergeDataBlock, pTaskInfo); pPhyNode->pConditions, pIntervalPhyNode->window.mergeDataBlock,
pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode; SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
...@@ -4195,7 +4199,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4195,7 +4199,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision}; .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pIntervalPhyNode->window.mergeDataBlock, pTaskInfo); pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId,
pIntervalPhyNode->window.mergeDataBlock, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
int32_t children = 0; int32_t children = 0;
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
...@@ -4249,8 +4254,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4249,8 +4254,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
ASSERT(0); ASSERT(0);
} }
taosMemoryFree(ops); taosMemoryFree(ops);
if (pOptr) pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
return pOptr; return pOptr;
} }
......
...@@ -97,7 +97,8 @@ static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) { ...@@ -97,7 +97,8 @@ static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
pRowSup->groupId = groupId; pRowSup->groupId = groupId;
} }
static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId) { static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex,
uint64_t groupId) {
pRowSup->startRowIndex = rowIndex; pRowSup->startRowIndex = rowIndex;
pRowSup->numOfRows = 0; pRowSup->numOfRows = 0;
pRowSup->win.skey = tsList[rowIndex]; pRowSup->win.skey = tsList[rowIndex];
...@@ -869,7 +870,8 @@ static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_ ...@@ -869,7 +870,8 @@ static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_
} }
static int32_t saveWinResultRow(SResultRow* result, uint64_t groupId, SHashObj* pUpdatedMap) { static int32_t saveWinResultRow(SResultRow* result, uint64_t groupId, SHashObj* pUpdatedMap) {
return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap);; return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap);
;
} }
static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) { static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) {
...@@ -910,9 +912,9 @@ int32_t compareWinRes(void* pKey, void* data, int32_t index) { ...@@ -910,9 +912,9 @@ int32_t compareWinRes(void* pKey, void* data, int32_t index) {
static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) { static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
if (!pUpdatedMap || taosHashGetSize(pUpdatedMap) == 0) { if (!pUpdatedMap || taosHashGetSize(pUpdatedMap) == 0) {
return; return;
} }
int32_t delSize = taosArrayGetSize(pDelWins); int32_t delSize = taosArrayGetSize(pDelWins);
void* pIte = NULL; void* pIte = NULL;
while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
SResKeyPos* pResKey = (SResKeyPos*)pIte; SResKeyPos* pResKey = (SResKeyPos*)pIte;
int32_t index = binarySearchCom(pDelWins, delSize, pResKey, TSDB_ORDER_DESC, compareWinRes); int32_t index = binarySearchCom(pDelWins, delSize, pResKey, TSDB_ORDER_DESC, compareWinRes);
...@@ -1592,9 +1594,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1592,9 +1594,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
while (1) { while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) { if (pBlock == NULL) {
...@@ -1874,7 +1876,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -1874,7 +1876,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
_error: _error:
destroyIntervalOperatorInfo(pInfo, numOfCols); destroyIntervalOperatorInfo(pInfo, numOfCols);
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
...@@ -1931,7 +1932,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr ...@@ -1931,7 +1932,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
_error: _error:
destroyIntervalOperatorInfo(pInfo, numOfCols); destroyIntervalOperatorInfo(pInfo, numOfCols);
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
...@@ -1965,7 +1965,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator ...@@ -1965,7 +1965,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
doKeepTuple(pRowSup, tsList[j], gid); doKeepTuple(pRowSup, tsList[j], gid);
} else if ((tsList[j] - pRowSup->prevTs >= 0) && tsList[j] - pRowSup->prevTs <= gap || } else if ((tsList[j] - pRowSup->prevTs >= 0) && tsList[j] - pRowSup->prevTs <= gap ||
(pRowSup->prevTs - tsList[j] >= 0 ) && (pRowSup->prevTs - tsList[j] <= gap)) { (pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap)) {
// The gap is less than the threshold, so it belongs to current session window that has been opened already. // The gap is less than the threshold, so it belongs to current session window that has been opened already.
doKeepTuple(pRowSup, tsList[j], gid); doKeepTuple(pRowSup, tsList[j], gid);
if (j == 0 && pRowSup->startRowIndex != 0) { if (j == 0 && pRowSup->startRowIndex != 0) {
...@@ -2118,7 +2118,7 @@ static void doKeepNextRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock ...@@ -2118,7 +2118,7 @@ static void doKeepNextRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock
static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex, static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex,
bool isLastRow) { bool isLastRow) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
bool fillLastPoint = pSliceInfo->fillLastPoint; bool fillLastPoint = pSliceInfo->fillLastPoint;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
...@@ -2150,11 +2150,9 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo ...@@ -2150,11 +2150,9 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo
} }
pSliceInfo->fillLastPoint = isLastRow ? true : false; pSliceInfo->fillLastPoint = isLastRow ? true : false;
} }
static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock) {
SSDataBlock* pResBlock) {
int32_t rows = pResBlock->info.rows; int32_t rows = pResBlock->info.rows;
// todo set the correct primary timestamp column // todo set the correct primary timestamp column
...@@ -2165,7 +2163,7 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp ...@@ -2165,7 +2163,7 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
int32_t dstSlot = pExprInfo->base.resSchema.slotId; int32_t dstSlot = pExprInfo->base.resSchema.slotId;
//SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot); // SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot);
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
switch (pSliceInfo->fillType) { switch (pSliceInfo->fillType) {
...@@ -2181,15 +2179,15 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp ...@@ -2181,15 +2179,15 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) { if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
float v = 0; float v = 0;
GET_TYPED_DATA(v, float, pVar->nType, &pVar->i); GET_TYPED_DATA(v, float, pVar->nType, &pVar->i);
colDataAppend(pDst, rows, (char *)&v, false); colDataAppend(pDst, rows, (char*)&v, false);
} else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) { } else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) {
double v = 0; double v = 0;
GET_TYPED_DATA(v, double, pVar->nType, &pVar->i); GET_TYPED_DATA(v, double, pVar->nType, &pVar->i);
colDataAppend(pDst, rows, (char *)&v, false); colDataAppend(pDst, rows, (char*)&v, false);
} else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) { } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) {
int64_t v = 0; int64_t v = 0;
GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i); GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
colDataAppend(pDst, rows, (char *)&v, false); colDataAppend(pDst, rows, (char*)&v, false);
} }
pResBlock->info.rows += 1; pResBlock->info.rows += 1;
break; break;
...@@ -2198,8 +2196,8 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp ...@@ -2198,8 +2196,8 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
case TSDB_FILL_LINEAR: { case TSDB_FILL_LINEAR: {
SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, srcSlot); SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, srcSlot);
SPoint start = pLinearInfo->start; SPoint start = pLinearInfo->start;
SPoint end = pLinearInfo->end; SPoint end = pLinearInfo->end;
SPoint current = {.key = pSliceInfo->current}; SPoint current = {.key = pSliceInfo->current};
current.val = taosMemoryCalloc(pLinearInfo->bytes, 1); current.val = taosMemoryCalloc(pLinearInfo->bytes, 1);
...@@ -2212,7 +2210,7 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp ...@@ -2212,7 +2210,7 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
colDataAppendNULL(pDst, rows); colDataAppendNULL(pDst, rows);
} else { } else {
taosGetLinearInterpolationVal(&current, pLinearInfo->type, &start, &end, pLinearInfo->type); taosGetLinearInterpolationVal(&current, pLinearInfo->type, &start, &end, pLinearInfo->type);
colDataAppend(pDst, rows, (char *)current.val, false); colDataAppend(pDst, rows, (char*)current.val, false);
} }
taosMemoryFree(current.val); taosMemoryFree(current.val);
...@@ -2318,11 +2316,11 @@ static int32_t initFillLinearInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pB ...@@ -2318,11 +2316,11 @@ static int32_t initFillLinearInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pB
SFillLinearInfo linearInfo = {0}; SFillLinearInfo linearInfo = {0};
linearInfo.start.key = INT64_MIN; linearInfo.start.key = INT64_MIN;
linearInfo.end.key = INT64_MAX; linearInfo.end.key = INT64_MAX;
linearInfo.start.val = taosMemoryCalloc(1, pColInfo->info.bytes); linearInfo.start.val = taosMemoryCalloc(1, pColInfo->info.bytes);
linearInfo.end.val = taosMemoryCalloc(1, pColInfo->info.bytes); linearInfo.end.val = taosMemoryCalloc(1, pColInfo->info.bytes);
linearInfo.hasNull = false; linearInfo.hasNull = false;
linearInfo.type = pColInfo->info.type; linearInfo.type = pColInfo->info.type;
linearInfo.bytes = pColInfo->info.bytes; linearInfo.bytes = pColInfo->info.bytes;
taosArrayPush(pInfo->pLinearInfo, &linearInfo); taosArrayPush(pInfo->pLinearInfo, &linearInfo);
} }
...@@ -2400,7 +2398,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2400,7 +2398,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
for (int32_t i = 0; i < pBlock->info.rows; ++i) { for (int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t ts = *(int64_t*)colDataGetData(pTsCol, i); int64_t ts = *(int64_t*)colDataGetData(pTsCol, i);
if (i == 0 && needToFillLastPoint(pSliceInfo)) { // first row in current block if (i == 0 && needToFillLastPoint(pSliceInfo)) { // first row in current block
doKeepLinearInfo(pSliceInfo, pBlock, i, false); doKeepLinearInfo(pSliceInfo, pBlock, i, false);
while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) { while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock); genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock);
...@@ -2446,8 +2444,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2446,8 +2444,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
if (nextTs > pSliceInfo->current) { if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock); genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock);
pSliceInfo->current = pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit,
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) { if (pResBlock->info.rows >= pResBlock->info.capacity) {
break; break;
} }
...@@ -2458,11 +2456,11 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2458,11 +2456,11 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
break; break;
} }
} }
} else {// it is the last row of current block } else { // it is the last row of current block
//store ts value as start, and calculate interp value when processing next block // store ts value as start, and calculate interp value when processing next block
doKeepLinearInfo(pSliceInfo, pBlock, i, true); doKeepLinearInfo(pSliceInfo, pBlock, i, true);
} }
} else { // non-linear interpolation } else { // non-linear interpolation
pSliceInfo->current = pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pSliceInfo->current > pSliceInfo->win.ekey) { if (pSliceInfo->current > pSliceInfo->win.ekey) {
...@@ -2480,7 +2478,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2480,7 +2478,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
if (pSliceInfo->fillType == TSDB_FILL_LINEAR) { if (pSliceInfo->fillType == TSDB_FILL_LINEAR) {
// no need to increate pSliceInfo->current here // no need to increate pSliceInfo->current here
//pSliceInfo->current = // pSliceInfo->current =
// taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); // taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (i < pBlock->info.rows - 1) { if (i < pBlock->info.rows - 1) {
doKeepLinearInfo(pSliceInfo, pBlock, i, false); doKeepLinearInfo(pSliceInfo, pBlock, i, false);
...@@ -2488,8 +2486,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2488,8 +2486,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
if (nextTs > pSliceInfo->current) { if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock); genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock);
pSliceInfo->current = pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit,
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) { if (pResBlock->info.rows >= pResBlock->info.capacity) {
break; break;
} }
...@@ -2501,7 +2499,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2501,7 +2499,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
} }
} }
} }
} else { // non-linear interpolation } else { // non-linear interpolation
if (i < pBlock->info.rows - 1) { if (i < pBlock->info.rows - 1) {
// in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate
doKeepNextRows(pSliceInfo, pBlock, i + 1); doKeepNextRows(pSliceInfo, pBlock, i + 1);
...@@ -2509,8 +2507,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2509,8 +2507,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
if (nextTs > pSliceInfo->current) { if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock); genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock);
pSliceInfo->current = pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit,
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) { if (pResBlock->info.rows >= pResBlock->info.capacity) {
break; break;
} }
...@@ -2557,7 +2555,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2557,7 +2555,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
pResBlock->info.rows += 1; pResBlock->info.rows += 1;
doKeepPrevRows(pSliceInfo, pBlock, i); doKeepPrevRows(pSliceInfo, pBlock, i);
if (pSliceInfo->fillType == TSDB_FILL_LINEAR) { if (pSliceInfo->fillType == TSDB_FILL_LINEAR) {
pSliceInfo->current = pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
...@@ -2567,8 +2564,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2567,8 +2564,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
if (nextTs > pSliceInfo->current) { if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock); genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock);
pSliceInfo->current = pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit,
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) { if (pResBlock->info.rows >= pResBlock->info.capacity) {
break; break;
} }
...@@ -2580,7 +2577,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2580,7 +2577,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
} }
} }
} }
} else { // non-linear interpolation } else { // non-linear interpolation
pSliceInfo->current = pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
...@@ -2596,12 +2593,12 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2596,12 +2593,12 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
} }
} }
} }
} }
// check if need to interpolate after last datablock // check if need to interpolate after last datablock
// except for fill(next), fill(linear) // except for fill(next), fill(linear)
while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT && pSliceInfo->fillType != TSDB_FILL_LINEAR) { while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT &&
pSliceInfo->fillType != TSDB_FILL_LINEAR) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock); genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock);
pSliceInfo->current = pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
...@@ -2825,7 +2822,6 @@ _error: ...@@ -2825,7 +2822,6 @@ _error:
destroySWindowOperatorInfo(pInfo, numOfCols); destroySWindowOperatorInfo(pInfo, numOfCols);
} }
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
...@@ -3462,7 +3458,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -3462,7 +3458,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
_error: _error:
destroyStreamFinalIntervalOperatorInfo(pInfo, numOfCols); destroyStreamFinalIntervalOperatorInfo(pInfo, numOfCols);
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
...@@ -3647,7 +3642,6 @@ _error: ...@@ -3647,7 +3642,6 @@ _error:
destroyStreamSessionAggOperatorInfo(pInfo, numOfCols); destroyStreamSessionAggOperatorInfo(pInfo, numOfCols);
} }
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
...@@ -3765,8 +3759,8 @@ SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY star ...@@ -3765,8 +3759,8 @@ SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY star
return insertNewSessionWindow(pWinInfos, startTs, index + 1); return insertNewSessionWindow(pWinInfos, startTs, index + 1);
} }
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t groupId,int32_t rows, int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t groupId,
int32_t start, int64_t gap, SHashObj* pStDeleted) { int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted) {
for (int32_t i = start; i < rows; ++i) { for (int32_t i = start; i < rows; ++i) {
if (!isInWindow(pWinInfo, pStartTs[i], gap) && (!pEndTs || !isInWindow(pWinInfo, pEndTs[i], gap))) { if (!isInWindow(pWinInfo, pStartTs[i], gap) && (!pEndTs || !isInWindow(pWinInfo, pEndTs[i], gap))) {
return i - start; return i - start;
...@@ -3943,8 +3937,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData ...@@ -3943,8 +3937,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
} }
int32_t winIndex = 0; int32_t winIndex = 0;
SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, startTsCols[i], endTsCols[i], groupId, gap, &winIndex); SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, startTsCols[i], endTsCols[i], groupId, gap, &winIndex);
winRows = winRows = updateSessionWindowInfo(pCurWin, startTsCols, endTsCols, groupId, pSDataBlock->info.rows, i, pInfo->gap,
updateSessionWindowInfo(pCurWin, startTsCols, endTsCols, groupId, pSDataBlock->info.rows, i, pInfo->gap, pStDeleted); pStDeleted);
code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pOperator); code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pOperator);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) { if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
...@@ -4051,7 +4045,7 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It ...@@ -4051,7 +4045,7 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It
blockDataEnsureCapacity(pBlock, size); blockDataEnsureCapacity(pBlock, size);
size_t keyLen = 0; size_t keyLen = 0;
while (((*Ite) = taosHashIterate(pStDeleted, *Ite)) != NULL) { while (((*Ite) = taosHashIterate(pStDeleted, *Ite)) != NULL) {
SWinRes* res = *Ite; SWinRes* res = *Ite;
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
colDataAppend(pTsCol, pBlock->info.rows, (const char*)&res->ts, false); colDataAppend(pTsCol, pBlock->info.rows, (const char*)&res->ts, false);
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
...@@ -4182,7 +4176,7 @@ static void copyDeleteWindowInfo(SArray* pResWins, SHashObj* pStDeleted) { ...@@ -4182,7 +4176,7 @@ static void copyDeleteWindowInfo(SArray* pResWins, SHashObj* pStDeleted) {
int32_t size = taosArrayGetSize(pResWins); int32_t size = taosArrayGetSize(pResWins);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
SResultWindowInfo* pWinInfo = taosArrayGet(pResWins, i); SResultWindowInfo* pWinInfo = taosArrayGet(pResWins, i);
SWinRes res = {.ts = pWinInfo->win.skey, .groupId = pWinInfo->groupId}; SWinRes res = {.ts = pWinInfo->win.skey, .groupId = pWinInfo->groupId};
taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinRes)); taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinRes));
} }
} }
...@@ -4479,7 +4473,6 @@ _error: ...@@ -4479,7 +4473,6 @@ _error:
destroyStreamSessionAggOperatorInfo(pInfo, pOperator->exprSupp.numOfExprs); destroyStreamSessionAggOperatorInfo(pInfo, pOperator->exprSupp.numOfExprs);
} }
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
...@@ -4916,7 +4909,6 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -4916,7 +4909,6 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
_error: _error:
destroyStreamStateOperatorInfo(pInfo, numOfCols); destroyStreamStateOperatorInfo(pInfo, numOfCols);
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
...@@ -5026,7 +5018,6 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR ...@@ -5026,7 +5018,6 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder); tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder);
} }
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
...@@ -5101,7 +5092,7 @@ static SSDataBlock* mergeAlignedIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5101,7 +5092,7 @@ static SSDataBlock* mergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
blockDataCleanup(pRes); blockDataCleanup(pRes);
if (iaInfo->binfo.mergeResultBlock) { if (iaInfo->binfo.mergeResultBlock) {
while(1) { while (1) {
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
break; break;
} }
...@@ -5191,7 +5182,6 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -5191,7 +5182,6 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
_error: _error:
destroyMergeAlignedIntervalOperatorInfo(miaInfo, numOfCols); destroyMergeAlignedIntervalOperatorInfo(miaInfo, numOfCols);
taosMemoryFreeClear(miaInfo);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
...@@ -5496,7 +5486,6 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI ...@@ -5496,7 +5486,6 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
_error: _error:
destroyMergeIntervalOperatorInfo(miaInfo, numOfCols); destroyMergeIntervalOperatorInfo(miaInfo, numOfCols);
taosMemoryFreeClear(miaInfo);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
......
...@@ -2176,7 +2176,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -2176,7 +2176,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "top", .name = "top",
.type = FUNCTION_TYPE_TOP, .type = FUNCTION_TYPE_TOP,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_FILL_FUNC,
.translateFunc = translateTopBot, .translateFunc = translateTopBot,
.getEnvFunc = getTopBotFuncEnv, .getEnvFunc = getTopBotFuncEnv,
.initFunc = topBotFunctionSetup, .initFunc = topBotFunctionSetup,
...@@ -2191,7 +2191,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -2191,7 +2191,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "bottom", .name = "bottom",
.type = FUNCTION_TYPE_BOTTOM, .type = FUNCTION_TYPE_BOTTOM,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_FILL_FUNC,
.translateFunc = translateTopBot, .translateFunc = translateTopBot,
.getEnvFunc = getTopBotFuncEnv, .getEnvFunc = getTopBotFuncEnv,
.initFunc = topBotFunctionSetup, .initFunc = topBotFunctionSetup,
...@@ -2590,7 +2590,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -2590,7 +2590,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "sample", .name = "sample",
.type = FUNCTION_TYPE_SAMPLE, .type = FUNCTION_TYPE_SAMPLE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_FILL_FUNC,
.translateFunc = translateSample, .translateFunc = translateSample,
.getEnvFunc = getSampleFuncEnv, .getEnvFunc = getSampleFuncEnv,
.initFunc = sampleFunctionSetup, .initFunc = sampleFunctionSetup,
......
...@@ -62,12 +62,13 @@ static void indexDestroy(void* sIdx); ...@@ -62,12 +62,13 @@ static void indexDestroy(void* sIdx);
void indexInit() { void indexInit() {
// refactor later // refactor later
indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index"); indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index", NULL);
indexRefMgt = taosOpenRef(1000, indexDestroy); indexRefMgt = taosOpenRef(1000, indexDestroy);
} }
void indexCleanup() { void indexCleanup() {
// refacto later // refacto later
taosCleanUpScheduler(indexQhandle); taosCleanUpScheduler(indexQhandle);
taosMemoryFreeClear(indexQhandle);
taosCloseRef(indexRefMgt); taosCloseRef(indexRefMgt);
} }
......
...@@ -2037,7 +2037,17 @@ static int32_t setVnodeSysTableVgroupList(STranslateContext* pCxt, SName* pName, ...@@ -2037,7 +2037,17 @@ static int32_t setVnodeSysTableVgroupList(STranslateContext* pCxt, SName* pName,
code = getDBVgInfoImpl(pCxt, pName, &vgroupList); code = getDBVgInfoImpl(pCxt, pName, &vgroupList);
} }
if (TSDB_CODE_SUCCESS == code && 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES)) { if (TSDB_CODE_SUCCESS == code &&
0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) &&
0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TAGS) &&
isSelectStmt(pCxt->pCurrStmt) &&
0 == taosArrayGetSize(vgroupList)) {
((SSelectStmt*)pCxt->pCurrStmt)->isEmptyResult = true;
}
if (TSDB_CODE_SUCCESS == code &&
0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) &&
0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES)) {
code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &vgroupList); code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &vgroupList);
} }
......
...@@ -96,12 +96,12 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag ...@@ -96,12 +96,12 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag
return true; return true;
} }
static void* pTaskQueue = NULL; static SSchedQueue pTaskQueue = {0};
int32_t initTaskQueue() { int32_t initTaskQueue() {
int32_t queueSize = tsMaxShellConns * 2; int32_t queueSize = tsMaxShellConns * 2;
pTaskQueue = taosInitScheduler(queueSize, tsNumOfTaskQueueThreads, "tsc"); void *p = taosInitScheduler(queueSize, tsNumOfTaskQueueThreads, "tsc", &pTaskQueue);
if (NULL == pTaskQueue) { if (NULL == p) {
qError("failed to init task queue"); qError("failed to init task queue");
return -1; return -1;
} }
...@@ -111,7 +111,7 @@ int32_t initTaskQueue() { ...@@ -111,7 +111,7 @@ int32_t initTaskQueue() {
} }
int32_t cleanupTaskQueue() { int32_t cleanupTaskQueue() {
taosCleanUpScheduler(pTaskQueue); taosCleanUpScheduler(&pTaskQueue);
return 0; return 0;
} }
...@@ -134,7 +134,7 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) ...@@ -134,7 +134,7 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code)
schedMsg.thandle = execParam; schedMsg.thandle = execParam;
schedMsg.msg = code; schedMsg.msg = code;
taosScheduleTask(pTaskQueue, &schedMsg); taosScheduleTask(&pTaskQueue, &schedMsg);
return 0; return 0;
} }
......
...@@ -302,6 +302,7 @@ void *taosMemoryStrDup(const char *ptr) { ...@@ -302,6 +302,7 @@ void *taosMemoryStrDup(const char *ptr) {
} }
void taosMemoryFree(void *ptr) { void taosMemoryFree(void *ptr) {
if (NULL == ptr) return;
#ifdef USE_TD_MEMORY #ifdef USE_TD_MEMORY
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo)); TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo));
if (pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL) { if (pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL) {
......
...@@ -391,7 +391,7 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry * ...@@ -391,7 +391,7 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *
assert(TAOS_LRU_ENTRY_IN_CACHE(old)); assert(TAOS_LRU_ENTRY_IN_CACHE(old));
TAOS_LRU_ENTRY_SET_IN_CACHE(old, false); TAOS_LRU_ENTRY_SET_IN_CACHE(old, false);
if (!TAOS_LRU_ENTRY_HAS_REFS(e)) { if (!TAOS_LRU_ENTRY_HAS_REFS(old)) {
taosLRUCacheShardLRURemove(shard, old); taosLRUCacheShardLRURemove(shard, old);
assert(shard->usage >= old->totalCharge); assert(shard->usage >= old->totalCharge);
shard->usage -= old->totalCharge; shard->usage -= old->totalCharge;
......
...@@ -22,30 +22,16 @@ ...@@ -22,30 +22,16 @@
#define DUMP_SCHEDULER_TIME_WINDOW 30000 // every 30sec, take a snap shot of task queue. #define DUMP_SCHEDULER_TIME_WINDOW 30000 // every 30sec, take a snap shot of task queue.
typedef struct {
char label[TSDB_LABEL_LEN];
tsem_t emptySem;
tsem_t fullSem;
TdThreadMutex queueMutex;
int32_t fullSlot;
int32_t emptySlot;
int32_t queueSize;
int32_t numOfThreads;
TdThread *qthread;
SSchedMsg *queue;
bool stop;
void *pTmrCtrl;
void *pTimer;
} SSchedQueue;
static void *taosProcessSchedQueue(void *param); static void *taosProcessSchedQueue(void *param);
static void taosDumpSchedulerStatus(void *qhandle, void *tmrId); static void taosDumpSchedulerStatus(void *qhandle, void *tmrId);
void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *label) { void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *label, SSchedQueue *pSched) {
SSchedQueue *pSched = (SSchedQueue *)taosMemoryCalloc(sizeof(SSchedQueue), 1); if (NULL == pSched) {
if (pSched == NULL) { pSched = (SSchedQueue *)taosMemoryCalloc(sizeof(SSchedQueue), 1);
uError("%s: no enough memory for pSched", label); if (pSched == NULL) {
return NULL; uError("%s: no enough memory for pSched", label);
return NULL;
}
} }
pSched->queue = (SSchedMsg *)taosMemoryCalloc(sizeof(SSchedMsg), queueSize); pSched->queue = (SSchedMsg *)taosMemoryCalloc(sizeof(SSchedMsg), queueSize);
...@@ -86,7 +72,7 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab ...@@ -86,7 +72,7 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab
return NULL; return NULL;
} }
pSched->stop = false; atomic_store_8(&pSched->stop, 0);
for (int32_t i = 0; i < numOfThreads; ++i) { for (int32_t i = 0; i < numOfThreads; ++i) {
TdThreadAttr attr; TdThreadAttr attr;
taosThreadAttrInit(&attr); taosThreadAttrInit(&attr);
...@@ -107,7 +93,7 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab ...@@ -107,7 +93,7 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab
} }
void *taosInitSchedulerWithInfo(int32_t queueSize, int32_t numOfThreads, const char *label, void *tmrCtrl) { void *taosInitSchedulerWithInfo(int32_t queueSize, int32_t numOfThreads, const char *label, void *tmrCtrl) {
SSchedQueue *pSched = taosInitScheduler(queueSize, numOfThreads, label); SSchedQueue *pSched = taosInitScheduler(queueSize, numOfThreads, label, NULL);
if (tmrCtrl != NULL && pSched != NULL) { if (tmrCtrl != NULL && pSched != NULL) {
pSched->pTmrCtrl = tmrCtrl; pSched->pTmrCtrl = tmrCtrl;
...@@ -131,7 +117,7 @@ void *taosProcessSchedQueue(void *scheduler) { ...@@ -131,7 +117,7 @@ void *taosProcessSchedQueue(void *scheduler) {
uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(errno)); uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
ASSERT(0); ASSERT(0);
} }
if (pSched->stop) { if (atomic_load_8(&pSched->stop)) {
break; break;
} }
...@@ -172,6 +158,11 @@ void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) { ...@@ -172,6 +158,11 @@ void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) {
return; return;
} }
if (atomic_load_8(&pSched->stop)) {
uError("sched is already stopped, msg:%p is dropped", pMsg);
return;
}
if ((ret = tsem_wait(&pSched->emptySem)) != 0) { if ((ret = tsem_wait(&pSched->emptySem)) != 0) {
uFatal("wait %s emptySem failed(%s)", pSched->label, strerror(errno)); uFatal("wait %s emptySem failed(%s)", pSched->label, strerror(errno));
ASSERT(0); ASSERT(0);
...@@ -202,7 +193,10 @@ void taosCleanUpScheduler(void *param) { ...@@ -202,7 +193,10 @@ void taosCleanUpScheduler(void *param) {
uDebug("start to cleanup %s schedQsueue", pSched->label); uDebug("start to cleanup %s schedQsueue", pSched->label);
pSched->stop = true; atomic_store_8(&pSched->stop, 1);
taosMsleep(200);
for (int32_t i = 0; i < pSched->numOfThreads; ++i) { for (int32_t i = 0; i < pSched->numOfThreads; ++i) {
if (taosCheckPthreadValid(pSched->qthread[i])) { if (taosCheckPthreadValid(pSched->qthread[i])) {
tsem_post(&pSched->fullSem); tsem_post(&pSched->fullSem);
...@@ -226,7 +220,7 @@ void taosCleanUpScheduler(void *param) { ...@@ -226,7 +220,7 @@ void taosCleanUpScheduler(void *param) {
if (pSched->queue) taosMemoryFree(pSched->queue); if (pSched->queue) taosMemoryFree(pSched->queue);
if (pSched->qthread) taosMemoryFree(pSched->qthread); if (pSched->qthread) taosMemoryFree(pSched->qthread);
taosMemoryFree(pSched); // fix memory leak //taosMemoryFree(pSched);
} }
// for debug purpose, dump the scheduler status every 1min. // for debug purpose, dump the scheduler status every 1min.
......
...@@ -555,7 +555,7 @@ static void taosTmrModuleInit(void) { ...@@ -555,7 +555,7 @@ static void taosTmrModuleInit(void) {
return; return;
} }
tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr"); tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr", NULL);
taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK); taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK);
tmrDebug("timer module is initialized, number of threads: %d", taosTmrThreads); tmrDebug("timer module is initialized, number of threads: %d", taosTmrThreads);
...@@ -606,6 +606,7 @@ void taosTmrCleanUp(void* handle) { ...@@ -606,6 +606,7 @@ void taosTmrCleanUp(void* handle) {
taosUninitTimer(); taosUninitTimer();
taosCleanUpScheduler(tmrQhandle); taosCleanUpScheduler(tmrQhandle);
taosMemoryFreeClear(tmrQhandle);
for (int32_t i = 0; i < tListLen(wheels); i++) { for (int32_t i = 0; i < tListLen(wheels); i++) {
time_wheel_t* wheel = wheels + i; time_wheel_t* wheel = wheels + i;
......
...@@ -210,6 +210,8 @@ if $rows != 3 then ...@@ -210,6 +210,8 @@ if $rows != 3 then
return -1 return -1
endi endi
sql_error select * from performance_schema.PERF_OFFSETS;
sql show create stable stb; sql show create stable stb;
if $rows != 1 then if $rows != 1 then
return -1 return -1
......
...@@ -45,19 +45,19 @@ sql_error drop database db ...@@ -45,19 +45,19 @@ sql_error drop database db
sql_error use db sql_error use db
sql_error alter database db replica 1; sql_error alter database db replica 1;
sql_error show db.vgroups sql_error show db.vgroups
sql_error select * from information_schema.ins_stables where db_name = 'db' sql select * from information_schema.ins_stables where db_name = 'db'
sql_error select * from information_schema.ins_tables where db_name = 'db' sql select * from information_schema.ins_tables where db_name = 'db'
print =============== check show print =============== check show
sql_error select * from information_schema.ins_users sql select * from information_schema.ins_users
sql_error show cluster sql_error show cluster
sql_error select * from information_schema.ins_dnodes sql select * from information_schema.ins_dnodes
sql_error select * from information_schema.ins_mnodes sql select * from information_schema.ins_mnodes
sql_error show snodes sql_error show snodes
sql_error select * from information_schema.ins_qnodes sql select * from information_schema.ins_qnodes
sql_error show bnodes sql_error show bnodes
sql_error show grants sql_error show grants
sql_error show dnode 1 variables; sql_error show dnode 1 variables;
sql show variables; sql show variables;
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
...@@ -515,7 +515,7 @@ class TDTestCase: ...@@ -515,7 +515,7 @@ class TDTestCase:
# "condition": "where ts>0 and ts < now interval(1h) fill(next)" # "condition": "where ts>0 and ts < now interval(1h) fill(next)"
# } # }
# self.checksample(**err45) # interval # self.checksample(**err45) # interval
tdSql.query("select sample( c1 , 1 ) from t1 where ts>0 and ts < now interval(1h) fill(next)") tdSql.error("select sample( c1 , 1 ) from t1 where ts>0 and ts < now interval(1h) fill(next)")
err46 = { err46 = {
"table_expr": "t1", "table_expr": "t1",
"condition": "group by c6" "condition": "group by c6"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册